• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

getdozer / dozer / 5939715234

22 Aug 2023 01:47PM UTC coverage: 74.755% (-1.3%) from 76.052%
5939715234

push

github

web-flow
chore: Run e2e tests nightly (#1886)

* chore: Run e2e tests nightly

* chore: Run Dozer CI on default runners

46459 of 62148 relevant lines covered (74.76%)

40132.91 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

77.85
/dozer-core/src/executor/mod.rs
1
use crate::builder_dag::{BuilderDag, NodeKind};
2
use crate::checkpoint::CheckpointFactory;
3
use crate::dag_schemas::DagSchemas;
4
use crate::errors::ExecutionError;
5
use crate::Dag;
6

7
use daggy::petgraph::visit::IntoNodeIdentifiers;
8

9
use dozer_types::serde::{self, Deserialize, Serialize};
10
use std::fmt::Debug;
11
use std::panic::panic_any;
12
use std::sync::atomic::AtomicBool;
13
use std::sync::Arc;
14
use std::thread::JoinHandle;
15
use std::thread::{self, Builder};
16
use std::time::Duration;
17

×
18
#[derive(Debug, Clone)]
×
19
pub struct ExecutorOptions {
20
    pub commit_sz: u32,
21
    pub channel_buffer_sz: usize,
22
    pub commit_time_threshold: Duration,
23
    pub error_threshold: Option<u32>,
24
}
25

26
impl Default for ExecutorOptions {
×
27
    fn default() -> Self {
372✔
28
        Self {
372✔
29
            commit_sz: 10_000,
372✔
30
            channel_buffer_sz: 20_000,
372✔
31
            commit_time_threshold: Duration::from_millis(50),
372✔
32
            error_threshold: Some(0),
372✔
33
        }
372✔
34
    }
372✔
35
}
36

×
37
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
2,029✔
38
#[serde(crate = "self::serde")]
39
pub(crate) enum InputPortState {
40
    Open,
41
    Terminated,
42
}
43

44
mod execution_dag;
45
mod name;
46
mod node;
47
mod processor_node;
48
mod receiver_loop;
49
mod sink_node;
50
mod source_node;
51

52
use node::Node;
53
use processor_node::ProcessorNode;
54
use sink_node::SinkNode;
55

56
use self::execution_dag::ExecutionDag;
57
use self::source_node::{create_source_nodes, SourceListenerNode, SourceSenderNode};
58

59
pub struct DagExecutor {
60
    builder_dag: BuilderDag,
61
    options: ExecutorOptions,
62
}
63

64
pub struct DagExecutorJoinHandle {
65
    join_handles: Vec<JoinHandle<()>>,
66
}
67

68
impl DagExecutor {
×
69
    pub fn new<T: Clone + Debug>(
94✔
70
        dag: Dag<T>,
94✔
71
        checkpoint_factory: Arc<CheckpointFactory>,
94✔
72
        options: ExecutorOptions,
94✔
73
    ) -> Result<Self, ExecutionError> {
94✔
74
        let dag_schemas = DagSchemas::new(dag)?;
94✔
75

×
76
        let builder_dag = BuilderDag::new(checkpoint_factory, dag_schemas)?;
94✔
77

×
78
        Ok(Self {
90✔
79
            builder_dag,
90✔
80
            options,
90✔
81
        })
90✔
82
    }
92✔
83

×
84
    pub fn validate<T: Clone + Debug>(dag: Dag<T>) -> Result<(), ExecutionError> {
×
85
        DagSchemas::new(dag)?;
×
86
        Ok(())
×
87
    }
×
88

×
89
    pub fn start(self, running: Arc<AtomicBool>) -> Result<DagExecutorJoinHandle, ExecutionError> {
383✔
90
        // Construct execution dag.
×
91
        let mut execution_dag = ExecutionDag::new(
383✔
92
            self.builder_dag,
383✔
93
            self.options.channel_buffer_sz,
383✔
94
            self.options.error_threshold,
383✔
95
        )?;
383✔
96
        let node_indexes = execution_dag.graph().node_identifiers().collect::<Vec<_>>();
383✔
97

383✔
98
        // Start the threads.
383✔
99
        let mut join_handles = Vec::new();
383✔
100
        for node_index in node_indexes {
2,269✔
101
            let node = execution_dag.graph()[node_index]
1,886✔
102
                .as_ref()
1,886✔
103
                .expect("We created all nodes");
1,886✔
104
            match &node.kind {
1,886✔
105
                NodeKind::Source(_, _) => {
×
106
                    let (source_sender_node, source_listener_node) = create_source_nodes(
390✔
107
                        &mut execution_dag,
390✔
108
                        node_index,
390✔
109
                        &self.options,
390✔
110
                        running.clone(),
390✔
111
                    );
390✔
112
                    let (sender, receiver) =
390✔
113
                        start_source(source_sender_node, source_listener_node)?;
390✔
114
                    join_handles.extend([sender, receiver]);
390✔
115
                }
×
116
                NodeKind::Processor(_) => {
117
                    let processor_node = ProcessorNode::new(&mut execution_dag, node_index);
1,109✔
118
                    join_handles.push(start_processor(processor_node)?);
1,109✔
119
                }
×
120
                NodeKind::Sink(_) => {
121
                    let sink_node = SinkNode::new(&mut execution_dag, node_index);
387✔
122
                    join_handles.push(start_sink(sink_node)?);
387✔
123
                }
124
            }
×
125
        }
×
126

127
        Ok(DagExecutorJoinHandle { join_handles })
383✔
128
    }
383✔
129
}
×
130

131
impl DagExecutorJoinHandle {
×
132
    pub fn join(mut self) -> Result<(), ExecutionError> {
382✔
133
        loop {
×
134
            let Some(finished) = self
2,830✔
135
                .join_handles
2,830✔
136
                .iter()
2,830✔
137
                .enumerate()
2,830✔
138
                .find_map(|(i, handle)| handle.is_finished().then_some(i))
5,421✔
139
            else {
×
140
                thread::sleep(Duration::from_millis(250));
570✔
141

570✔
142
                continue;
570✔
143
            };
×
144
            let handle = self.join_handles.swap_remove(finished);
2,260✔
145
            if let Err(e) = handle.join() {
2,260✔
146
                panic_any(e)
5✔
147
            }
2,255✔
148

2,255✔
149
            if self.join_handles.is_empty() {
2,255✔
150
                return Ok(());
377✔
151
            }
1,878✔
152
        }
153
    }
377✔
154
}
×
155

×
156
fn start_source(
390✔
157
    source_sender: SourceSenderNode,
390✔
158
    source_listener: SourceListenerNode,
390✔
159
) -> Result<(JoinHandle<()>, JoinHandle<()>), ExecutionError> {
390✔
160
    let handle = source_sender.handle().clone();
390✔
161

×
162
    let sender_handle = Builder::new()
390✔
163
        .name(format!("{handle}-sender"))
390✔
164
        .spawn(move || match source_sender.run() {
390✔
165
            Ok(_) => {}
383✔
166
            // Channel disconnection means the source listener has quit.
×
167
            // Maybe it quit gracefully so we don't need to panic.
×
168
            Err(e) => {
7✔
169
                if let ExecutionError::Source(e) = &e {
7✔
170
                    if let Some(ExecutionError::CannotSendToChannel) = e.downcast_ref() {
7✔
171
                        return;
6✔
172
                    }
1✔
173
                }
×
174
                std::panic::panic_any(e);
1✔
175
            }
176
        })
390✔
177
        .map_err(ExecutionError::CannotSpawnWorkerThread)?;
390✔
178

×
179
    let listener_handle = Builder::new()
390✔
180
        .name(format!("{handle}-listener"))
390✔
181
        .spawn(move || {
390✔
182
            if let Err(e) = source_listener.run() {
390✔
183
                std::panic::panic_any(e);
5✔
184
            }
385✔
185
        })
390✔
186
        .map_err(ExecutionError::CannotSpawnWorkerThread)?;
390✔
187

188
    Ok((sender_handle, listener_handle))
390✔
189
}
390✔
190

×
191
fn start_processor(processor: ProcessorNode) -> Result<JoinHandle<()>, ExecutionError> {
1,109✔
192
    Builder::new()
1,109✔
193
        .name(processor.handle().to_string())
1,109✔
194
        .spawn(move || {
1,109✔
195
            if let Err(e) = processor.run() {
1,109✔
196
                std::panic::panic_any(e);
7✔
197
            }
1,102✔
198
        })
1,109✔
199
        .map_err(ExecutionError::CannotSpawnWorkerThread)
1,109✔
200
}
1,109✔
201

×
202
fn start_sink(sink: SinkNode) -> Result<JoinHandle<()>, ExecutionError> {
387✔
203
    Builder::new()
387✔
204
        .name(sink.handle().to_string())
387✔
205
        .spawn(|| {
387✔
206
            if let Err(e) = sink.run() {
387✔
207
                std::panic::panic_any(e);
5✔
208
            }
382✔
209
        })
387✔
210
        .map_err(ExecutionError::CannotSpawnWorkerThread)
387✔
211
}
387✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc