• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

getdozer / dozer / 6105410942

07 Sep 2023 04:28AM UTC coverage: 77.562% (-0.1%) from 77.686%
6105410942

push

github

chloeminkyung
feat: onnx image

1141 of 1141 new or added lines in 66 files covered. (100.0%)

49957 of 64409 relevant lines covered (77.56%)

50900.25 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

94.62
/dozer-core/src/executor/mod.rs
1
use crate::builder_dag::{BuilderDag, NodeKind};
2
use crate::checkpoint::{CheckpointFactory, OptionCheckpoint};
3
use crate::dag_schemas::DagSchemas;
4
use crate::errors::ExecutionError;
5
use crate::Dag;
6

7
use daggy::petgraph::visit::IntoNodeIdentifiers;
8

9
use dozer_tracing::LabelsAndProgress;
10
use dozer_types::serde::{self, Deserialize, Serialize};
11
use std::fmt::Debug;
12
use std::panic::panic_any;
13
use std::sync::atomic::AtomicBool;
14
use std::sync::Arc;
15
use std::thread::JoinHandle;
16
use std::thread::{self, Builder};
17
use std::time::Duration;
18

19
/// Tuning options for `DagExecutor`.
///
/// `ExecutorOptions::default()` yields `commit_sz = 10_000`, `channel_buffer_sz = 20_000`,
/// `commit_time_threshold = 50ms` and `error_threshold = Some(0)`.
#[derive(Clone, Debug)]
pub struct ExecutorOptions {
    /// Not read in `executor/mod.rs` itself; the whole options struct is handed to
    /// `create_source_nodes`. Presumably a commit batch size — confirm in `source_node`.
    pub commit_sz: u32,
    /// Passed to `ExecutionDag::new` as its channel buffer size (presumably the bound of the
    /// channels connecting nodes — confirm in `execution_dag`).
    pub channel_buffer_sz: usize,
    /// Not read in `executor/mod.rs` itself. Presumably the maximum time between commits —
    /// TODO confirm where it is consumed.
    pub commit_time_threshold: Duration,
    /// Passed to `ExecutionDag::new`; how `None` versus `Some(n)` is interpreted is decided
    /// there.
    pub error_threshold: Option<u32>,
}

impl Default for ExecutorOptions {
    fn default() -> Self {
        const COMMIT_SZ: u32 = 10_000;
        const CHANNEL_BUFFER_SZ: usize = 20_000;
        const COMMIT_TIME_THRESHOLD_MS: u64 = 50;

        Self {
            error_threshold: Some(0),
            commit_time_threshold: Duration::from_millis(COMMIT_TIME_THRESHOLD_MS),
            channel_buffer_sz: CHANNEL_BUFFER_SZ,
            commit_sz: COMMIT_SZ,
        }
    }
}
37

38
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
3,215✔
39
#[serde(crate = "self::serde")]
40
pub(crate) enum InputPortState {
41
    Open,
42
    Terminated,
43
}
44

45
mod execution_dag;
46
mod name;
47
mod node;
48
mod processor_node;
49
mod receiver_loop;
50
mod sink_node;
51
mod source_node;
52

53
use node::Node;
54
use processor_node::ProcessorNode;
55
use sink_node::SinkNode;
56

57
use self::execution_dag::ExecutionDag;
58
use self::source_node::{create_source_nodes, SourceListenerNode, SourceSenderNode};
59

60
pub struct DagExecutor {
61
    builder_dag: BuilderDag,
62
    options: ExecutorOptions,
63
}
64

65
/// Handle to the worker threads spawned by `DagExecutor::start`.
///
/// Wait for all of them with `DagExecutorJoinHandle::join`.
pub struct DagExecutorJoinHandle {
    /// One handle per spawned worker thread (a source node contributes two: sender and
    /// listener).
    join_handles: Vec<JoinHandle<()>>,
}
68

69
impl DagExecutor {
70
    pub async fn new(
633✔
71
        dag: Dag,
633✔
72
        checkpoint_factory: Arc<CheckpointFactory>,
633✔
73
        checkpoint: OptionCheckpoint,
633✔
74
        options: ExecutorOptions,
633✔
75
    ) -> Result<Self, ExecutionError> {
633✔
76
        let dag_schemas = DagSchemas::new(dag)?;
118✔
77

78
        let builder_dag = BuilderDag::new(checkpoint_factory, checkpoint, dag_schemas).await?;
118✔
79

80
        Ok(Self {
114✔
81
            builder_dag,
114✔
82
            options,
114✔
83
        })
114✔
84
    }
116✔
85

86
    pub fn validate<T: Clone + Debug>(dag: Dag) -> Result<(), ExecutionError> {
87
        DagSchemas::new(dag)?;
×
88
        Ok(())
×
89
    }
×
90

91
    pub fn start(
629✔
92
        self,
629✔
93
        running: Arc<AtomicBool>,
629✔
94
        labels: LabelsAndProgress,
629✔
95
    ) -> Result<DagExecutorJoinHandle, ExecutionError> {
629✔
96
        // Construct execution dag.
629✔
97
        let initial_epoch_id = self.builder_dag.initial_epoch_id();
629✔
98
        let mut execution_dag = ExecutionDag::new(
629✔
99
            self.builder_dag,
629✔
100
            labels,
629✔
101
            self.options.channel_buffer_sz,
629✔
102
            self.options.error_threshold,
629✔
103
        )?;
629✔
104
        let node_indexes = execution_dag.graph().node_identifiers().collect::<Vec<_>>();
629✔
105

629✔
106
        // Start the threads.
629✔
107
        let mut join_handles = Vec::new();
629✔
108
        for node_index in node_indexes {
3,661✔
109
            let node = execution_dag.graph()[node_index]
3,032✔
110
                .as_ref()
3,032✔
111
                .expect("We created all nodes");
3,032✔
112
            match &node.kind {
3,032✔
113
                NodeKind::Source(_, _) => {
114
                    let (source_sender_node, source_listener_node) = create_source_nodes(
636✔
115
                        &mut execution_dag,
636✔
116
                        node_index,
636✔
117
                        &self.options,
636✔
118
                        running.clone(),
636✔
119
                    );
636✔
120
                    let (sender, receiver) =
636✔
121
                        start_source(source_sender_node, source_listener_node)?;
636✔
122
                    join_handles.extend([sender, receiver]);
636✔
123
                }
124
                NodeKind::Processor(_) => {
125
                    let processor_node =
1,763✔
126
                        ProcessorNode::new(&mut execution_dag, node_index, initial_epoch_id);
1,763✔
127
                    join_handles.push(start_processor(processor_node)?);
1,763✔
128
                }
129
                NodeKind::Sink(_) => {
130
                    let sink_node = SinkNode::new(&mut execution_dag, node_index, initial_epoch_id);
633✔
131
                    join_handles.push(start_sink(sink_node)?);
633✔
132
                }
133
            }
134
        }
135

136
        Ok(DagExecutorJoinHandle { join_handles })
629✔
137
    }
629✔
138
}
139

140
impl DagExecutorJoinHandle {
141
    pub fn join(mut self) -> Result<(), ExecutionError> {
628✔
142
        loop {
143
            let Some(finished) = self
4,605✔
144
                .join_handles
4,605✔
145
                .iter()
4,605✔
146
                .enumerate()
4,605✔
147
                .find_map(|(i, handle)| handle.is_finished().then_some(i))
8,738✔
148
            else {
149
                thread::sleep(Duration::from_millis(250));
953✔
150

953✔
151
                continue;
953✔
152
            };
153
            let handle = self.join_handles.swap_remove(finished);
3,652✔
154
            if let Err(e) = handle.join() {
3,652✔
155
                panic_any(e)
5✔
156
            }
3,647✔
157

3,647✔
158
            if self.join_handles.is_empty() {
3,647✔
159
                return Ok(());
623✔
160
            }
3,024✔
161
        }
162
    }
623✔
163
}
164

165
fn start_source(
636✔
166
    source_sender: SourceSenderNode,
636✔
167
    source_listener: SourceListenerNode,
636✔
168
) -> Result<(JoinHandle<()>, JoinHandle<()>), ExecutionError> {
636✔
169
    let handle = source_sender.handle().clone();
636✔
170

171
    let sender_handle = Builder::new()
636✔
172
        .name(format!("{handle}-sender"))
636✔
173
        .spawn(move || match source_sender.run() {
636✔
174
            Ok(_) => {}
629✔
175
            // Channel disconnection means the source listener has quit.
176
            // Maybe it quit gracefully so we don't need to panic.
177
            Err(e) => {
7✔
178
                if let ExecutionError::Source(e) = &e {
7✔
179
                    if let Some(ExecutionError::CannotSendToChannel) = e.downcast_ref() {
7✔
180
                        return;
7✔
181
                    }
×
182
                }
×
183
                std::panic::panic_any(e);
×
184
            }
185
        })
636✔
186
        .map_err(ExecutionError::CannotSpawnWorkerThread)?;
636✔
187

188
    let listener_handle = Builder::new()
636✔
189
        .name(format!("{handle}-listener"))
636✔
190
        .spawn(move || {
636✔
191
            if let Err(e) = source_listener.run() {
636✔
192
                std::panic::panic_any(e);
6✔
193
            }
630✔
194
        })
636✔
195
        .map_err(ExecutionError::CannotSpawnWorkerThread)?;
636✔
196

197
    Ok((sender_handle, listener_handle))
636✔
198
}
636✔
199

200
fn start_processor(processor: ProcessorNode) -> Result<JoinHandle<()>, ExecutionError> {
1,763✔
201
    Builder::new()
1,763✔
202
        .name(processor.handle().to_string())
1,763✔
203
        .spawn(move || {
1,763✔
204
            if let Err(e) = processor.run() {
1,763✔
205
                std::panic::panic_any(e);
8✔
206
            }
1,755✔
207
        })
1,763✔
208
        .map_err(ExecutionError::CannotSpawnWorkerThread)
1,763✔
209
}
1,763✔
210

211
fn start_sink(sink: SinkNode) -> Result<JoinHandle<()>, ExecutionError> {
633✔
212
    Builder::new()
633✔
213
        .name(sink.handle().to_string())
633✔
214
        .spawn(|| {
633✔
215
            if let Err(e) = sink.run() {
633✔
216
                std::panic::panic_any(e);
6✔
217
            }
627✔
218
        })
633✔
219
        .map_err(ExecutionError::CannotSpawnWorkerThread)
633✔
220
}
633✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc