• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

getdozer / dozer / 5881075993

16 Aug 2023 03:58PM UTC coverage: 77.415% (-0.2%) from 77.649%
5881075993

push

github

web-flow
feat: replace jaeger with xray (#1862)

feat: replace jaeger with xray

23 of 23 new or added lines in 2 files covered. (100.0%)

46085 of 59530 relevant lines covered (77.41%)

55729.22 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

94.69
/dozer-core/src/executor/mod.rs
1
use crate::builder_dag::{BuilderDag, NodeKind};
2
use crate::dag_schemas::DagSchemas;
3
use crate::errors::ExecutionError;
4
use crate::Dag;
5

6
use daggy::petgraph::visit::IntoNodeIdentifiers;
7
use dozer_types::node::NodeHandle;
8

9
use dozer_types::serde::{self, Deserialize, Serialize};
10
use std::collections::hash_map::Entry;
11
use std::collections::HashMap;
12
use std::fmt::Debug;
13
use std::panic::panic_any;
14
use std::sync::atomic::AtomicBool;
15
use std::sync::Arc;
16
use std::thread::JoinHandle;
17
use std::thread::{self, Builder};
18
use std::time::Duration;
19

20
#[derive(Clone)]
×
21
pub struct ExecutorOptions {
22
    pub commit_sz: u32,
23
    pub channel_buffer_sz: usize,
24
    pub commit_time_threshold: Duration,
25
    pub error_threshold: Option<u32>,
26
}
27

28
impl Default for ExecutorOptions {
29
    fn default() -> Self {
656✔
30
        Self {
656✔
31
            commit_sz: 10_000,
656✔
32
            channel_buffer_sz: 20_000,
656✔
33
            commit_time_threshold: Duration::from_millis(50),
656✔
34
            error_threshold: Some(0),
656✔
35
        }
656✔
36
    }
656✔
37
}
38

39
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
3,586✔
40
#[serde(crate = "self::serde")]
41
pub(crate) enum InputPortState {
42
    Open,
43
    Terminated,
44
}
45

46
mod execution_dag;
47
mod name;
48
mod node;
49
mod processor_node;
50
mod receiver_loop;
51
mod sink_node;
52
mod source_node;
53

54
use node::Node;
55
use processor_node::ProcessorNode;
56
use sink_node::SinkNode;
57

58
use self::execution_dag::ExecutionDag;
59
use self::source_node::{create_source_nodes, SourceListenerNode, SourceSenderNode};
60

61
pub struct DagExecutor {
62
    builder_dag: BuilderDag,
63
    options: ExecutorOptions,
64
}
65

66
pub struct DagExecutorJoinHandle {
67
    join_handles: HashMap<NodeHandle, JoinHandle<()>>,
68
}
69

70
impl DagExecutor {
71
    pub fn new<T: Clone + Debug>(
94✔
72
        dag: Dag<T>,
94✔
73
        options: ExecutorOptions,
94✔
74
    ) -> Result<Self, ExecutionError> {
94✔
75
        let dag_schemas = DagSchemas::new(dag)?;
94✔
76
        let builder_dag = BuilderDag::new(dag_schemas)?;
94✔
77

78
        Ok(Self {
90✔
79
            builder_dag,
90✔
80
            options,
90✔
81
        })
90✔
82
    }
92✔
83

84
    pub fn validate<T: Clone + Debug>(dag: Dag<T>) -> Result<(), ExecutionError> {
85
        DagSchemas::new(dag)?;
×
86
        Ok(())
×
87
    }
×
88

89
    pub fn start(self, running: Arc<AtomicBool>) -> Result<DagExecutorJoinHandle, ExecutionError> {
679✔
90
        // Construct execution dag.
91
        let mut execution_dag = ExecutionDag::new(
679✔
92
            self.builder_dag,
679✔
93
            self.options.channel_buffer_sz,
679✔
94
            self.options.error_threshold,
679✔
95
        )?;
679✔
96
        let node_indexes = execution_dag.graph().node_identifiers().collect::<Vec<_>>();
679✔
97

679✔
98
        // Start the threads.
679✔
99
        let mut join_handles = HashMap::new();
679✔
100
        for node_index in node_indexes {
4,029✔
101
            let node = execution_dag.graph()[node_index]
3,350✔
102
                .as_ref()
3,350✔
103
                .expect("We created all nodes");
3,350✔
104
            let node_handle = node.handle.clone();
3,350✔
105
            match &node.kind {
3,350✔
106
                NodeKind::Source(_, _) => {
107
                    let (source_sender_node, source_listener_node) = create_source_nodes(
686✔
108
                        &mut execution_dag,
686✔
109
                        node_index,
686✔
110
                        &self.options,
686✔
111
                        running.clone(),
686✔
112
                    );
686✔
113
                    join_handles.insert(
686✔
114
                        node_handle,
686✔
115
                        start_source(source_sender_node, source_listener_node)?,
686✔
116
                    );
117
                }
118
                NodeKind::Processor(_) => {
119
                    let processor_node = ProcessorNode::new(&mut execution_dag, node_index);
1,981✔
120
                    join_handles.insert(node_handle, start_processor(processor_node)?);
1,981✔
121
                }
122
                NodeKind::Sink(_) => {
123
                    let sink_node = SinkNode::new(&mut execution_dag, node_index);
683✔
124
                    join_handles.insert(node_handle, start_sink(sink_node)?);
683✔
125
                }
126
            }
127
        }
128

129
        Ok(DagExecutorJoinHandle { join_handles })
679✔
130
    }
679✔
131
}
132

133
impl DagExecutorJoinHandle {
134
    pub fn join(mut self) -> Result<(), ExecutionError> {
678✔
135
        let handles: Vec<NodeHandle> = self.join_handles.iter().map(|e| e.0.clone()).collect();
3,347✔
136

137
        loop {
138
            for handle in &handles {
8,958✔
139
                if let Entry::Occupied(entry) = self.join_handles.entry(handle.clone()) {
7,413✔
140
                    if entry.get().is_finished() {
7,413✔
141
                        if let Err(e) = entry.remove().join() {
3,335✔
142
                            panic_any(e);
5✔
143
                        }
3,330✔
144
                    }
4,078✔
145
                }
×
146
            }
147

148
            if self.join_handles.is_empty() {
1,545✔
149
                return Ok(());
673✔
150
            }
872✔
151

872✔
152
            thread::sleep(Duration::from_millis(250));
872✔
153
        }
154
    }
673✔
155
}
156

157
fn start_source(
686✔
158
    source_sender: SourceSenderNode,
686✔
159
    source_listener: SourceListenerNode,
686✔
160
) -> Result<JoinHandle<()>, ExecutionError> {
686✔
161
    let handle = source_sender.handle().clone();
686✔
162

163
    let _st_handle = Builder::new()
686✔
164
        .name(format!("{handle}-sender"))
686✔
165
        .spawn(move || match source_sender.run() {
686✔
166
            Ok(_) => {}
679✔
167
            // Channel disconnection means the source listener has quit.
168
            // Maybe it quit gracefully so we don't need to panic.
169
            Err(ExecutionError::CannotSendToChannel) => {}
×
170
            // Other errors result in panic.
171
            Err(e) => std::panic::panic_any(e),
7✔
172
        })
686✔
173
        .map_err(ExecutionError::CannotSpawnWorkerThread)?;
686✔
174

175
    Builder::new()
686✔
176
        .name(format!("{handle}-listener"))
686✔
177
        .spawn(move || {
686✔
178
            if let Err(e) = source_listener.run() {
686✔
179
                std::panic::panic_any(e);
5✔
180
            }
681✔
181
        })
686✔
182
        .map_err(ExecutionError::CannotSpawnWorkerThread)
686✔
183
}
686✔
184

185
fn start_processor(processor: ProcessorNode) -> Result<JoinHandle<()>, ExecutionError> {
1,981✔
186
    Builder::new()
1,981✔
187
        .name(processor.handle().to_string())
1,981✔
188
        .spawn(move || {
1,981✔
189
            if let Err(e) = processor.run() {
1,981✔
190
                std::panic::panic_any(e);
7✔
191
            }
1,974✔
192
        })
1,981✔
193
        .map_err(ExecutionError::CannotSpawnWorkerThread)
1,981✔
194
}
1,981✔
195

196
fn start_sink(sink: SinkNode) -> Result<JoinHandle<()>, ExecutionError> {
683✔
197
    Builder::new()
683✔
198
        .name(sink.handle().to_string())
683✔
199
        .spawn(|| {
683✔
200
            if let Err(e) = sink.run() {
683✔
201
                std::panic::panic_any(e);
5✔
202
            }
678✔
203
        })
683✔
204
        .map_err(ExecutionError::CannotSpawnWorkerThread)
683✔
205
}
683✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc