
getdozer / dozer · build 4351017879 (push, GitHub) · pending completion
Commit: chore: bump sqlparser v0.31.0 (#1137)

29428 of 38369 relevant lines covered (76.7%)
62214.65 hits per line

Source File
/dozer-core/src/executor.rs (99.07% of lines covered)
use crate::builder_dag::{BuilderDag, NodeKind};
use crate::dag_metadata::DagMetadata;
use crate::dag_schemas::DagSchemas;
use crate::errors::ExecutionError;
use crate::Dag;

use daggy::petgraph::visit::IntoNodeIdentifiers;
use dozer_types::node::NodeHandle;
use dozer_types::types::Operation;

use crate::epoch::Epoch;
use std::collections::hash_map::Entry;
use std::collections::HashMap;
use std::fmt::Debug;
use std::panic::panic_any;
use std::path::PathBuf;
use std::sync::atomic::AtomicBool;
use std::sync::Arc;
use std::thread::JoinHandle;
use std::thread::{self, Builder};
use std::time::Duration;

#[derive(Clone)]
pub struct ExecutorOptions {
    pub commit_sz: u32,
    pub channel_buffer_sz: usize,
    pub commit_time_threshold: Duration,

    pub max_map_size: usize,
}

impl Default for ExecutorOptions {
    fn default() -> Self {
        Self {
            commit_sz: 10_000,
            channel_buffer_sz: 20_000,
            commit_time_threshold: Duration::from_millis(50),
            max_map_size: 1024 * 1024 * 1024 * 1024,
        }
    }
}

#[derive(Clone, Debug, PartialEq, Eq)]
pub(crate) enum InputPortState {
    Open,
    Terminated,
}

#[derive(Clone, Debug, PartialEq, Eq)]
pub enum ExecutorOperation {
    Op { op: Operation },
    Commit { epoch: Epoch },
    Terminate,
    SnapshottingDone {},
}

mod execution_dag;
mod name;
mod node;
mod processor_node;
mod receiver_loop;
mod sink_node;
mod source_node;

use node::Node;
use processor_node::ProcessorNode;
use sink_node::SinkNode;

use self::execution_dag::ExecutionDag;
use self::source_node::{create_source_nodes, SourceListenerNode, SourceSenderNode};

pub struct DagExecutor {
    builder_dag: BuilderDag,
    options: ExecutorOptions,
}

pub struct DagExecutorJoinHandle {
    join_handles: HashMap<NodeHandle, JoinHandle<()>>,
}

impl DagExecutor {
    pub fn new<T: Clone + Debug>(
        dag: Dag<T>,
        path: PathBuf,
        options: ExecutorOptions,
    ) -> Result<Self, ExecutionError> {
        let dag_schemas = DagSchemas::new(dag)?;
        let builder_dag = BuilderDag::new(dag_schemas, path, options.max_map_size)?;

        Ok(Self {
            builder_dag,
            options,
        })
    }

    pub fn validate<T: Clone + Debug>(dag: Dag<T>, path: PathBuf) -> Result<(), ExecutionError> {
        let dag_schemas = DagSchemas::new(dag)?;
        DagMetadata::new(dag_schemas, path)?;
        Ok(())
    }

    pub fn start(self, running: Arc<AtomicBool>) -> Result<DagExecutorJoinHandle, ExecutionError> {
        // Construct execution dag.
        let mut execution_dag =
            ExecutionDag::new(self.builder_dag, self.options.channel_buffer_sz)?;
        let node_indexes = execution_dag.graph().node_identifiers().collect::<Vec<_>>();

        // Start the threads.
        let mut join_handles = HashMap::new();
        for node_index in node_indexes {
            let node = execution_dag.graph()[node_index]
                .as_ref()
                .expect("We created all nodes");
            let node_handle = node.handle.clone();
            match &node.kind {
                NodeKind::Source(_, _) => {
                    let (source_sender_node, source_listener_node) = create_source_nodes(
                        &mut execution_dag,
                        node_index,
                        &self.options,
                        running.clone(),
                    );
                    join_handles.insert(
                        node_handle,
                        start_source(source_sender_node, source_listener_node)?,
                    );
                }
                NodeKind::Processor(_) => {
                    let processor_node = ProcessorNode::new(&mut execution_dag, node_index);
                    join_handles.insert(node_handle, start_processor(processor_node)?);
                }
                NodeKind::Sink(_) => {
                    let sink_node = SinkNode::new(&mut execution_dag, node_index);
                    join_handles.insert(node_handle, start_sink(sink_node)?);
                }
            }
        }

        Ok(DagExecutorJoinHandle { join_handles })
    }
}

impl DagExecutorJoinHandle {
    pub fn join(mut self) -> Result<(), ExecutionError> {
        let handles: Vec<NodeHandle> = self.join_handles.iter().map(|e| e.0.clone()).collect();

        loop {
            for handle in &handles {
                if let Entry::Occupied(entry) = self.join_handles.entry(handle.clone()) {
                    if entry.get().is_finished() {
                        if let Err(e) = entry.remove().join() {
                            panic_any(e);
                        }
                    }
                }
            }

            if self.join_handles.is_empty() {
                return Ok(());
            }

            thread::sleep(Duration::from_millis(250));
        }
    }
}

fn start_source(
    source_sender: SourceSenderNode,
    source_listener: SourceListenerNode,
) -> Result<JoinHandle<()>, ExecutionError> {
    let handle = source_sender.handle().clone();

    let _st_handle = Builder::new()
        .name(format!("{handle}-sender"))
        .spawn(move || match source_sender.run() {
            Ok(_) => {}
            // Channel disconnection means the source listener has quit.
            // Maybe it quit gracefully so we don't need to panic.
            Err(ExecutionError::CannotSendToChannel) => {}
            // Other errors result in panic.
            Err(e) => std::panic::panic_any(e),
        })?;

    Ok(Builder::new()
        .name(format!("{handle}-listener"))
        .spawn(move || {
            if let Err(e) = source_listener.run() {
                std::panic::panic_any(e);
            }
        })?)
}

fn start_processor(processor: ProcessorNode) -> Result<JoinHandle<()>, ExecutionError> {
    Ok(Builder::new()
        .name(processor.handle().to_string())
        .spawn(move || {
            if let Err(e) = processor.run() {
                std::panic::panic_any(e);
            }
        })?)
}

fn start_sink(sink: SinkNode) -> Result<JoinHandle<()>, ExecutionError> {
    Ok(Builder::new().name(sink.handle().to_string()).spawn(|| {
        if let Err(e) = sink.run() {
            std::panic::panic_any(e);
        }
    })?)
}
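
Taken together, the file describes a small lifecycle: DagExecutor::new validates the node schemas and builds the DAG, start spawns worker threads for every source, processor, and sink node, and DagExecutorJoinHandle::join polls those threads until all of them have exited, re-raising any worker panic. Below is a minimal sketch of that call sequence. It assumes the items are publicly reachable as dozer_core::executor::{DagExecutor, ExecutorOptions}, dozer_core::errors::ExecutionError, and dozer_core::Dag (paths inferred from the file location, not confirmed here), and it uses a hypothetical build_dag() helper in place of real pipeline construction.

// Sketch only: `build_dag()` and the `dozer_core::...` import paths are
// assumptions made for illustration, not code taken from the repository.
use std::path::PathBuf;
use std::sync::atomic::AtomicBool;
use std::sync::Arc;

use dozer_core::errors::ExecutionError; // assumed public path
use dozer_core::executor::{DagExecutor, ExecutorOptions}; // assumed public path

// Hypothetical helper: assemble the sources, processors, and sinks here.
// `Dag` is assumed to be exported at the crate root (`use crate::Dag;` above).
fn build_dag() -> dozer_core::Dag<()> {
    unimplemented!("construct the pipeline DAG")
}

fn run_pipeline() -> Result<(), ExecutionError> {
    let dag = build_dag();

    // `new` runs DagSchemas::new on the dag and passes the path and
    // `max_map_size` through to BuilderDag::new.
    let executor = DagExecutor::new(
        dag,
        PathBuf::from("./pipeline-data"), // illustrative storage path
        ExecutorOptions::default(),
    )?;

    // `start` clones this flag into every source node; executor.rs only
    // forwards it, so treat it as the cooperative stop signal for sources.
    let running = Arc::new(AtomicBool::new(true));

    // Worker threads are spawned per node (two for each source: sender + listener).
    let join_handle = executor.start(running)?;

    // `join` polls the per-node threads every 250 ms and re-raises any panic
    // that occurred inside a worker thread.
    join_handle.join()
}

Note that start_source spawns two threads but returns only the listener's JoinHandle; the sender's handle is bound to _st_handle and not tracked, so join waits only on the listener side of each source.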