• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

getdozer / dozer / 4763381855

pending completion
4763381855

Pull #1461

github

GitHub
Merge 50bf72be2 into c58df4a0b
Pull Request #1461: feat: Make secondary index configurable

135 of 135 new or added lines in 6 files covered. (100.0%)

34877 of 44466 relevant lines covered (78.44%)

11367.99 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

98.1
/dozer-core/src/executor.rs
1
use crate::builder_dag::{BuilderDag, NodeKind};
2
use crate::dag_schemas::DagSchemas;
3
use crate::errors::ExecutionError;
4
use crate::Dag;
5

6
use daggy::petgraph::visit::IntoNodeIdentifiers;
7
use dozer_types::node::NodeHandle;
8
use dozer_types::types::Operation;
9

10
use crate::epoch::Epoch;
11

12
use dozer_types::serde::{self, Deserialize, Serialize};
13
use std::collections::hash_map::Entry;
14
use std::collections::HashMap;
15
use std::fmt::Debug;
16
use std::panic::panic_any;
17
use std::path::PathBuf;
18
use std::sync::atomic::AtomicBool;
19
use std::sync::Arc;
20
use std::thread::JoinHandle;
21
use std::thread::{self, Builder};
22
use std::time::Duration;
23

24
#[derive(Clone)]
×
25
pub struct ExecutorOptions {
26
    pub commit_sz: u32,
27
    pub channel_buffer_sz: usize,
28
    pub commit_time_threshold: Duration,
29
}
30

31
impl Default for ExecutorOptions {
32
    fn default() -> Self {
575✔
33
        Self {
575✔
34
            commit_sz: 10_000,
575✔
35
            channel_buffer_sz: 20_000,
575✔
36
            commit_time_threshold: Duration::from_millis(50),
575✔
37
        }
575✔
38
    }
575✔
39
}
40

41
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
3,154✔
42
#[serde(crate = "self::serde")]
43
pub(crate) enum InputPortState {
44
    Open,
45
    Terminated,
46
}
47

48
#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize)]
86,655✔
49
#[serde(crate = "self::serde")]
50
pub enum ExecutorOperation {
51
    Op { op: Operation },
52
    Commit { epoch: Epoch },
53
    Terminate,
54
    SnapshottingDone {},
55
}
56

57
mod execution_dag;
58
mod name;
59
mod node;
60
mod processor_node;
61
mod receiver_loop;
62
mod sink_node;
63
mod source_node;
64

65
use node::Node;
66
use processor_node::ProcessorNode;
67
use sink_node::SinkNode;
68

69
use self::execution_dag::ExecutionDag;
70
use self::source_node::{create_source_nodes, SourceListenerNode, SourceSenderNode};
71

72
pub struct DagExecutor {
73
    builder_dag: BuilderDag,
74
    options: ExecutorOptions,
75
}
76

77
pub struct DagExecutorJoinHandle {
78
    join_handles: HashMap<NodeHandle, JoinHandle<()>>,
79
}
80

81
impl DagExecutor {
82
    pub fn new<T: Clone + Debug>(
94✔
83
        dag: Dag<T>,
94✔
84
        path: PathBuf,
94✔
85
        options: ExecutorOptions,
94✔
86
    ) -> Result<Self, ExecutionError> {
94✔
87
        let dag_schemas = DagSchemas::new(dag)?;
94✔
88
        let builder_dag = BuilderDag::new(dag_schemas, path)?;
94✔
89

90
        Ok(Self {
90✔
91
            builder_dag,
90✔
92
            options,
90✔
93
        })
90✔
94
    }
92✔
95

96
    pub fn validate<T: Clone + Debug>(dag: Dag<T>, _path: PathBuf) -> Result<(), ExecutionError> {
97
        DagSchemas::new(dag)?;
18✔
98
        Ok(())
18✔
99
    }
18✔
100

101
    pub fn start(self, running: Arc<AtomicBool>) -> Result<DagExecutorJoinHandle, ExecutionError> {
595✔
102
        // Construct execution dag.
103
        let mut execution_dag =
595✔
104
            ExecutionDag::new(self.builder_dag, self.options.channel_buffer_sz)?;
595✔
105
        let node_indexes = execution_dag.graph().node_identifiers().collect::<Vec<_>>();
595✔
106

595✔
107
        // Start the threads.
595✔
108
        let mut join_handles = HashMap::new();
595✔
109
        for node_index in node_indexes {
3,539✔
110
            let node = execution_dag.graph()[node_index]
2,944✔
111
                .as_ref()
2,944✔
112
                .expect("We created all nodes");
2,944✔
113
            let node_handle = node.handle.clone();
2,944✔
114
            match &node.kind {
2,944✔
115
                NodeKind::Source(_, _) => {
116
                    let (source_sender_node, source_listener_node) = create_source_nodes(
602✔
117
                        &mut execution_dag,
602✔
118
                        node_index,
602✔
119
                        &self.options,
602✔
120
                        running.clone(),
602✔
121
                    );
602✔
122
                    join_handles.insert(
602✔
123
                        node_handle,
602✔
124
                        start_source(source_sender_node, source_listener_node)?,
602✔
125
                    );
126
                }
127
                NodeKind::Processor(_) => {
128
                    let processor_node = ProcessorNode::new(&mut execution_dag, node_index);
1,743✔
129
                    join_handles.insert(node_handle, start_processor(processor_node)?);
1,743✔
130
                }
131
                NodeKind::Sink(_) => {
132
                    let sink_node = SinkNode::new(&mut execution_dag, node_index);
599✔
133
                    join_handles.insert(node_handle, start_sink(sink_node)?);
599✔
134
                }
135
            }
136
        }
137

138
        Ok(DagExecutorJoinHandle { join_handles })
595✔
139
    }
595✔
140
}
141

142
impl DagExecutorJoinHandle {
143
    pub fn join(mut self) -> Result<(), ExecutionError> {
594✔
144
        let handles: Vec<NodeHandle> = self.join_handles.iter().map(|e| e.0.clone()).collect();
2,941✔
145

146
        loop {
147
            for handle in &handles {
7,805✔
148
                if let Entry::Occupied(entry) = self.join_handles.entry(handle.clone()) {
6,481✔
149
                    if entry.get().is_finished() {
6,481✔
150
                        if let Err(e) = entry.remove().join() {
2,935✔
151
                            panic_any(e);
3✔
152
                        }
2,932✔
153
                    }
3,546✔
154
                }
×
155
            }
156

157
            if self.join_handles.is_empty() {
1,324✔
158
                return Ok(());
591✔
159
            }
733✔
160

733✔
161
            thread::sleep(Duration::from_millis(250));
733✔
162
        }
163
    }
591✔
164
}
165

166
fn start_source(
602✔
167
    source_sender: SourceSenderNode,
602✔
168
    source_listener: SourceListenerNode,
602✔
169
) -> Result<JoinHandle<()>, ExecutionError> {
602✔
170
    let handle = source_sender.handle().clone();
602✔
171

172
    let _st_handle = Builder::new()
602✔
173
        .name(format!("{handle}-sender"))
602✔
174
        .spawn(move || match source_sender.run() {
602✔
175
            Ok(_) => {}
597✔
176
            // Channel disconnection means the source listener has quit.
177
            // Maybe it quit gracefully so we don't need to panic.
178
            Err(ExecutionError::CannotSendToChannel) => {}
4✔
179
            // Other errors result in panic.
180
            Err(e) => std::panic::panic_any(e),
1✔
181
        })?;
602✔
182

183
    Ok(Builder::new()
602✔
184
        .name(format!("{handle}-listener"))
602✔
185
        .spawn(move || {
602✔
186
            if let Err(e) = source_listener.run() {
602✔
187
                std::panic::panic_any(e);
3✔
188
            }
599✔
189
        })?)
602✔
190
}
602✔
191

192
fn start_processor(processor: ProcessorNode) -> Result<JoinHandle<()>, ExecutionError> {
1,743✔
193
    Ok(Builder::new()
1,743✔
194
        .name(processor.handle().to_string())
1,743✔
195
        .spawn(move || {
1,743✔
196
            if let Err(e) = processor.run() {
1,743✔
197
                std::panic::panic_any(e);
3✔
198
            }
1,740✔
199
        })?)
1,743✔
200
}
1,743✔
201

202
fn start_sink(sink: SinkNode) -> Result<JoinHandle<()>, ExecutionError> {
599✔
203
    Ok(Builder::new().name(sink.handle().to_string()).spawn(|| {
599✔
204
        if let Err(e) = sink.run() {
599✔
205
            std::panic::panic_any(e);
3✔
206
        }
596✔
207
    })?)
599✔
208
}
599✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc