• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

getdozer / dozer / 5724602231

pending completion
5724602231

push

github

web-flow
chore: LogClient automatically retries on network error (#1811)

* chore: `LogClient` automatically retries on network error

* chore: Split `ReaderBuilderError` out from `ReaderError`

55 of 55 new or added lines in 2 files covered. (100.0%)

45530 of 59847 relevant lines covered (76.08%)

38880.54 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

94.69
/dozer-core/src/executor.rs
1
use crate::builder_dag::{BuilderDag, NodeKind};
2
use crate::dag_schemas::DagSchemas;
3
use crate::errors::ExecutionError;
4
use crate::Dag;
5

6
use daggy::petgraph::visit::IntoNodeIdentifiers;
7
use dozer_types::node::NodeHandle;
8

9
use dozer_types::serde::{self, Deserialize, Serialize};
10
use std::collections::hash_map::Entry;
11
use std::collections::HashMap;
12
use std::fmt::Debug;
13
use std::panic::panic_any;
14
use std::sync::atomic::AtomicBool;
15
use std::sync::Arc;
16
use std::thread::JoinHandle;
17
use std::thread::{self, Builder};
18
use std::time::Duration;
19

20
#[derive(Clone)]
×
21
pub struct ExecutorOptions {
22
    pub commit_sz: u32,
23
    pub channel_buffer_sz: usize,
24
    pub commit_time_threshold: Duration,
25
    pub error_threshold: Option<u32>,
26
}
27

28
impl Default for ExecutorOptions {
29
    fn default() -> Self {
372✔
30
        Self {
372✔
31
            commit_sz: 10_000,
372✔
32
            channel_buffer_sz: 20_000,
372✔
33
            commit_time_threshold: Duration::from_millis(50),
372✔
34
            error_threshold: Some(0),
372✔
35
        }
372✔
36
    }
372✔
37
}
38

39
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
2,047✔
40
#[serde(crate = "self::serde")]
41
pub(crate) enum InputPortState {
42
    Open,
43
    Terminated,
44
}
45

46
mod execution_dag;
47
mod name;
48
mod node;
49
mod processor_node;
50
mod receiver_loop;
51
mod sink_node;
52
mod source_node;
53

54
use node::Node;
55
use processor_node::ProcessorNode;
56
use sink_node::SinkNode;
57

58
use self::execution_dag::ExecutionDag;
59
use self::source_node::{create_source_nodes, SourceListenerNode, SourceSenderNode};
60

61
pub struct DagExecutor {
62
    builder_dag: BuilderDag,
63
    options: ExecutorOptions,
64
}
65

66
pub struct DagExecutorJoinHandle {
67
    join_handles: HashMap<NodeHandle, JoinHandle<()>>,
68
}
69

70
impl DagExecutor {
71
    pub fn new<T: Clone + Debug>(
94✔
72
        dag: Dag<T>,
94✔
73
        options: ExecutorOptions,
94✔
74
    ) -> Result<Self, ExecutionError> {
94✔
75
        let dag_schemas = DagSchemas::new(dag)?;
94✔
76
        let builder_dag = BuilderDag::new(dag_schemas)?;
94✔
77

78
        Ok(Self {
90✔
79
            builder_dag,
90✔
80
            options,
90✔
81
        })
90✔
82
    }
92✔
83

84
    pub fn validate<T: Clone + Debug>(dag: Dag<T>) -> Result<(), ExecutionError> {
85
        DagSchemas::new(dag)?;
×
86
        Ok(())
×
87
    }
×
88

89
    pub fn start(self, running: Arc<AtomicBool>) -> Result<DagExecutorJoinHandle, ExecutionError> {
383✔
90
        // Construct execution dag.
91
        let mut execution_dag = ExecutionDag::new(
383✔
92
            self.builder_dag,
383✔
93
            self.options.channel_buffer_sz,
383✔
94
            self.options.error_threshold,
383✔
95
        )?;
383✔
96
        let node_indexes = execution_dag.graph().node_identifiers().collect::<Vec<_>>();
383✔
97

383✔
98
        // Start the threads.
383✔
99
        let mut join_handles = HashMap::new();
383✔
100
        for node_index in node_indexes {
2,269✔
101
            let node = execution_dag.graph()[node_index]
1,886✔
102
                .as_ref()
1,886✔
103
                .expect("We created all nodes");
1,886✔
104
            let node_handle = node.handle.clone();
1,886✔
105
            match &node.kind {
1,886✔
106
                NodeKind::Source(_, _) => {
107
                    let (source_sender_node, source_listener_node) = create_source_nodes(
390✔
108
                        &mut execution_dag,
390✔
109
                        node_index,
390✔
110
                        &self.options,
390✔
111
                        running.clone(),
390✔
112
                    );
390✔
113
                    join_handles.insert(
390✔
114
                        node_handle,
390✔
115
                        start_source(source_sender_node, source_listener_node)?,
390✔
116
                    );
117
                }
118
                NodeKind::Processor(_) => {
119
                    let processor_node = ProcessorNode::new(&mut execution_dag, node_index);
1,109✔
120
                    join_handles.insert(node_handle, start_processor(processor_node)?);
1,109✔
121
                }
122
                NodeKind::Sink(_) => {
123
                    let sink_node = SinkNode::new(&mut execution_dag, node_index);
387✔
124
                    join_handles.insert(node_handle, start_sink(sink_node)?);
387✔
125
                }
126
            }
127
        }
128

129
        Ok(DagExecutorJoinHandle { join_handles })
383✔
130
    }
383✔
131
}
132

133
impl DagExecutorJoinHandle {
134
    pub fn join(mut self) -> Result<(), ExecutionError> {
382✔
135
        let handles: Vec<NodeHandle> = self.join_handles.iter().map(|e| e.0.clone()).collect();
1,883✔
136

137
        loop {
138
            for handle in &handles {
5,148✔
139
                if let Entry::Occupied(entry) = self.join_handles.entry(handle.clone()) {
4,262✔
140
                    if entry.get().is_finished() {
4,262✔
141
                        if let Err(e) = entry.remove().join() {
1,871✔
142
                            panic_any(e);
5✔
143
                        }
1,866✔
144
                    }
2,391✔
145
                }
×
146
            }
147

148
            if self.join_handles.is_empty() {
886✔
149
                return Ok(());
377✔
150
            }
509✔
151

509✔
152
            thread::sleep(Duration::from_millis(250));
509✔
153
        }
154
    }
377✔
155
}
156

157
fn start_source(
390✔
158
    source_sender: SourceSenderNode,
390✔
159
    source_listener: SourceListenerNode,
390✔
160
) -> Result<JoinHandle<()>, ExecutionError> {
390✔
161
    let handle = source_sender.handle().clone();
390✔
162

163
    let _st_handle = Builder::new()
390✔
164
        .name(format!("{handle}-sender"))
390✔
165
        .spawn(move || match source_sender.run() {
390✔
166
            Ok(_) => {}
383✔
167
            // Channel disconnection means the source listener has quit.
168
            // Maybe it quit gracefully so we don't need to panic.
169
            Err(ExecutionError::CannotSendToChannel) => {}
×
170
            // Other errors result in panic.
171
            Err(e) => std::panic::panic_any(e),
7✔
172
        })
390✔
173
        .map_err(ExecutionError::CannotSpawnWorkerThread)?;
390✔
174

175
    Builder::new()
390✔
176
        .name(format!("{handle}-listener"))
390✔
177
        .spawn(move || {
390✔
178
            if let Err(e) = source_listener.run() {
390✔
179
                std::panic::panic_any(e);
5✔
180
            }
385✔
181
        })
390✔
182
        .map_err(ExecutionError::CannotSpawnWorkerThread)
390✔
183
}
390✔
184

185
fn start_processor(processor: ProcessorNode) -> Result<JoinHandle<()>, ExecutionError> {
1,109✔
186
    Builder::new()
1,109✔
187
        .name(processor.handle().to_string())
1,109✔
188
        .spawn(move || {
1,109✔
189
            if let Err(e) = processor.run() {
1,109✔
190
                std::panic::panic_any(e);
7✔
191
            }
1,102✔
192
        })
1,109✔
193
        .map_err(ExecutionError::CannotSpawnWorkerThread)
1,109✔
194
}
1,109✔
195

196
fn start_sink(sink: SinkNode) -> Result<JoinHandle<()>, ExecutionError> {
387✔
197
    Builder::new()
387✔
198
        .name(sink.handle().to_string())
387✔
199
        .spawn(|| {
387✔
200
            if let Err(e) = sink.run() {
387✔
201
                std::panic::panic_any(e);
5✔
202
            }
382✔
203
        })
387✔
204
        .map_err(ExecutionError::CannotSpawnWorkerThread)
387✔
205
}
387✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc