• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

getdozer / dozer / 5923086724

21 Aug 2023 07:05AM UTC coverage: 74.763% (-1.2%) from 75.988%
5923086724

push

github

web-flow
chore: Remove short form of `enable_progress` because it's conflicting with `dozer cloud` (#1876)

46105 of 61668 relevant lines covered (74.76%)

39792.37 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

76.16
/dozer-core/src/executor/mod.rs
1
use crate::builder_dag::{BuilderDag, NodeKind};
2
use crate::dag_schemas::DagSchemas;
3
use crate::errors::ExecutionError;
4
use crate::Dag;
5

6
use daggy::petgraph::visit::IntoNodeIdentifiers;
7

8
use dozer_types::serde::{self, Deserialize, Serialize};
9
use std::fmt::Debug;
10
use std::panic::panic_any;
11
use std::sync::atomic::AtomicBool;
12
use std::sync::Arc;
13
use std::thread::JoinHandle;
14
use std::thread::{self, Builder};
15
use std::time::Duration;
16

17
#[derive(Clone)]
×
18
pub struct ExecutorOptions {
19
    pub commit_sz: u32,
20
    pub channel_buffer_sz: usize,
21
    pub commit_time_threshold: Duration,
×
22
    pub error_threshold: Option<u32>,
23
}
24

25
impl Default for ExecutorOptions {
26
    fn default() -> Self {
372✔
27
        Self {
372✔
28
            commit_sz: 10_000,
372✔
29
            channel_buffer_sz: 20_000,
372✔
30
            commit_time_threshold: Duration::from_millis(50),
372✔
31
            error_threshold: Some(0),
372✔
32
        }
372✔
33
    }
372✔
34
}
×
35

×
36
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
2,026✔
37
#[serde(crate = "self::serde")]
×
38
pub(crate) enum InputPortState {
39
    Open,
40
    Terminated,
×
41
}
42

43
mod execution_dag;
44
mod name;
45
mod node;
46
mod processor_node;
47
mod receiver_loop;
48
mod sink_node;
49
mod source_node;
50

51
use node::Node;
52
use processor_node::ProcessorNode;
53
use sink_node::SinkNode;
54

55
use self::execution_dag::ExecutionDag;
56
use self::source_node::{create_source_nodes, SourceListenerNode, SourceSenderNode};
57

58
pub struct DagExecutor {
59
    builder_dag: BuilderDag,
60
    options: ExecutorOptions,
61
}
62

63
pub struct DagExecutorJoinHandle {
64
    join_handles: Vec<JoinHandle<()>>,
65
}
66

67
impl DagExecutor {
68
    pub fn new<T: Clone + Debug>(
94✔
69
        dag: Dag<T>,
94✔
70
        options: ExecutorOptions,
94✔
71
    ) -> Result<Self, ExecutionError> {
94✔
72
        let dag_schemas = DagSchemas::new(dag)?;
94✔
73
        let builder_dag = BuilderDag::new(dag_schemas)?;
94✔
74

×
75
        Ok(Self {
90✔
76
            builder_dag,
90✔
77
            options,
90✔
78
        })
90✔
79
    }
92✔
80

81
    pub fn validate<T: Clone + Debug>(dag: Dag<T>) -> Result<(), ExecutionError> {
×
82
        DagSchemas::new(dag)?;
×
83
        Ok(())
×
84
    }
×
85

×
86
    pub fn start(self, running: Arc<AtomicBool>) -> Result<DagExecutorJoinHandle, ExecutionError> {
383✔
87
        // Construct execution dag.
88
        let mut execution_dag = ExecutionDag::new(
383✔
89
            self.builder_dag,
383✔
90
            self.options.channel_buffer_sz,
383✔
91
            self.options.error_threshold,
383✔
92
        )?;
383✔
93
        let node_indexes = execution_dag.graph().node_identifiers().collect::<Vec<_>>();
383✔
94

383✔
95
        // Start the threads.
383✔
96
        let mut join_handles = Vec::new();
383✔
97
        for node_index in node_indexes {
2,269✔
98
            let node = execution_dag.graph()[node_index]
1,886✔
99
                .as_ref()
1,886✔
100
                .expect("We created all nodes");
1,886✔
101
            match &node.kind {
1,886✔
102
                NodeKind::Source(_, _) => {
×
103
                    let (source_sender_node, source_listener_node) = create_source_nodes(
390✔
104
                        &mut execution_dag,
390✔
105
                        node_index,
390✔
106
                        &self.options,
390✔
107
                        running.clone(),
390✔
108
                    );
390✔
109
                    let (sender, receiver) =
390✔
110
                        start_source(source_sender_node, source_listener_node)?;
390✔
111
                    join_handles.extend([sender, receiver]);
390✔
112
                }
×
113
                NodeKind::Processor(_) => {
×
114
                    let processor_node = ProcessorNode::new(&mut execution_dag, node_index);
1,109✔
115
                    join_handles.push(start_processor(processor_node)?);
1,109✔
116
                }
×
117
                NodeKind::Sink(_) => {
×
118
                    let sink_node = SinkNode::new(&mut execution_dag, node_index);
387✔
119
                    join_handles.push(start_sink(sink_node)?);
387✔
120
                }
121
            }
122
        }
×
123

×
124
        Ok(DagExecutorJoinHandle { join_handles })
383✔
125
    }
383✔
126
}
×
127

×
128
impl DagExecutorJoinHandle {
129
    pub fn join(mut self) -> Result<(), ExecutionError> {
382✔
130
        loop {
131
            let Some(finished) = self
2,808✔
132
                .join_handles
2,808✔
133
                .iter()
2,808✔
134
                .enumerate()
2,808✔
135
                .find_map(|(i, handle)| handle.is_finished().then_some(i))
5,288✔
136
            else {
137
                thread::sleep(Duration::from_millis(250));
548✔
138

548✔
139
                continue;
548✔
140
            };
141
            let handle = self.join_handles.swap_remove(finished);
2,260✔
142
            if let Err(e) = handle.join() {
2,260✔
143
                panic_any(e)
5✔
144
            }
2,255✔
145

2,255✔
146
            if self.join_handles.is_empty() {
2,255✔
147
                return Ok(());
377✔
148
            }
1,878✔
149
        }
150
    }
377✔
151
}
×
152

×
153
fn start_source(
390✔
154
    source_sender: SourceSenderNode,
390✔
155
    source_listener: SourceListenerNode,
390✔
156
) -> Result<(JoinHandle<()>, JoinHandle<()>), ExecutionError> {
390✔
157
    let handle = source_sender.handle().clone();
390✔
158

159
    let sender_handle = Builder::new()
390✔
160
        .name(format!("{handle}-sender"))
390✔
161
        .spawn(move || match source_sender.run() {
390✔
162
            Ok(_) => {}
383✔
163
            // Channel disconnection means the source listener has quit.
×
164
            // Maybe it quit gracefully so we don't need to panic.
×
165
            Err(e) => {
7✔
166
                if let ExecutionError::Source(e) = &e {
7✔
167
                    if let Some(ExecutionError::CannotSendToChannel) = e.downcast_ref() {
7✔
168
                        return;
6✔
169
                    }
1✔
170
                }
×
171
                std::panic::panic_any(e);
1✔
172
            }
×
173
        })
390✔
174
        .map_err(ExecutionError::CannotSpawnWorkerThread)?;
390✔
175

×
176
    let listener_handle = Builder::new()
390✔
177
        .name(format!("{handle}-listener"))
390✔
178
        .spawn(move || {
390✔
179
            if let Err(e) = source_listener.run() {
390✔
180
                std::panic::panic_any(e);
5✔
181
            }
385✔
182
        })
390✔
183
        .map_err(ExecutionError::CannotSpawnWorkerThread)?;
390✔
184

×
185
    Ok((sender_handle, listener_handle))
390✔
186
}
390✔
187

×
188
fn start_processor(processor: ProcessorNode) -> Result<JoinHandle<()>, ExecutionError> {
1,109✔
189
    Builder::new()
1,109✔
190
        .name(processor.handle().to_string())
1,109✔
191
        .spawn(move || {
1,109✔
192
            if let Err(e) = processor.run() {
1,109✔
193
                std::panic::panic_any(e);
7✔
194
            }
1,102✔
195
        })
1,109✔
196
        .map_err(ExecutionError::CannotSpawnWorkerThread)
1,109✔
197
}
1,109✔
198

×
199
fn start_sink(sink: SinkNode) -> Result<JoinHandle<()>, ExecutionError> {
387✔
200
    Builder::new()
387✔
201
        .name(sink.handle().to_string())
387✔
202
        .spawn(|| {
387✔
203
            if let Err(e) = sink.run() {
387✔
204
                std::panic::panic_any(e);
5✔
205
            }
382✔
206
        })
387✔
207
        .map_err(ExecutionError::CannotSpawnWorkerThread)
387✔
208
}
387✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc