• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

getdozer / dozer / 4283961027

pending completion
4283961027

push

github

GitHub
feat: Blue green cache (#1061)

645 of 645 new or added lines in 45 files covered. (100.0%)

27779 of 39307 relevant lines covered (70.67%)

52489.81 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

64.81
/dozer-core/src/executor.rs
1
use crate::builder_dag::{BuilderDag, NodeKind};
2
use crate::dag_metadata::DagMetadata;
3
use crate::dag_schemas::DagSchemas;
4
use crate::errors::ExecutionError;
5
use crate::Dag;
6

7
use daggy::petgraph::visit::IntoNodeIdentifiers;
8
use dozer_types::node::NodeHandle;
9
use dozer_types::types::Operation;
10

11
use crate::epoch::Epoch;
12
use std::collections::hash_map::Entry;
13
use std::collections::HashMap;
14
use std::fmt::Debug;
15
use std::panic::panic_any;
16
use std::path::PathBuf;
17
use std::sync::atomic::AtomicBool;
18
use std::sync::Arc;
19
use std::thread::JoinHandle;
20
use std::thread::{self, Builder};
21
use std::time::Duration;
22

23
#[derive(Clone)]
×
24
pub struct ExecutorOptions {
25
    pub commit_sz: u32,
26
    pub channel_buffer_sz: usize,
27
    pub commit_time_threshold: Duration,
28

29
    pub max_map_size: usize,
30
}
31

32
impl Default for ExecutorOptions {
33
    fn default() -> Self {
330✔
34
        Self {
330✔
35
            commit_sz: 10_000,
330✔
36
            channel_buffer_sz: 20_000,
330✔
37
            commit_time_threshold: Duration::from_millis(50),
330✔
38
            max_map_size: 1024 * 1024 * 1024 * 1024,
330✔
39
        }
330✔
40
    }
330✔
41
}
42

43
#[derive(Clone, Debug, PartialEq, Eq)]
1,667✔
44
pub(crate) enum InputPortState {
45
    Open,
46
    Terminated,
47
}
48

49
#[derive(Clone, Debug, PartialEq, Eq)]
84,994✔
50
pub enum ExecutorOperation {
51
    Op { op: Operation },
52
    Commit { epoch: Epoch },
53
    Terminate,
54
}
55

56
mod execution_dag;
57
mod name;
58
mod node;
59
mod processor_node;
×
60
mod receiver_loop;
×
61
mod sink_node;
×
62
mod source_node;
×
63

×
64
use node::Node;
65
use processor_node::ProcessorNode;
×
66
use sink_node::SinkNode;
67

68
use self::execution_dag::ExecutionDag;
69
use self::source_node::{create_source_nodes, SourceListenerNode, SourceSenderNode};
×
70

×
71
pub struct DagExecutor {
×
72
    builder_dag: BuilderDag,
×
73
    options: ExecutorOptions,
×
74
}
×
75

×
76
pub struct DagExecutorJoinHandle {
77
    join_handles: HashMap<NodeHandle, JoinHandle<()>>,
×
78
}
×
79

80
impl DagExecutor {
81
    pub fn new<T: Clone + Debug>(
223✔
82
        dag: &Dag<T>,
223✔
83
        path: PathBuf,
223✔
84
        options: ExecutorOptions,
223✔
85
    ) -> Result<Self, ExecutionError> {
223✔
86
        let dag_schemas = DagSchemas::new(dag)?;
223✔
87
        let builder_dag = BuilderDag::new(&dag_schemas, path, options.max_map_size)?;
223✔
88

89
        Ok(Self {
219✔
90
            builder_dag,
219✔
91
            options,
219✔
92
        })
219✔
93
    }
221✔
94

95
    pub fn validate<T: Clone + Debug>(dag: &Dag<T>, path: PathBuf) -> Result<(), ExecutionError> {
8✔
96
        let dag_schemas = DagSchemas::new(dag)?;
8✔
97
        DagMetadata::new(&dag_schemas, path)?;
8✔
98
        Ok(())
8✔
99
    }
8✔
100

101
    pub fn start(self, running: Arc<AtomicBool>) -> Result<DagExecutorJoinHandle, ExecutionError> {
332✔
102
        // Construct execution dag.
103
        let mut execution_dag =
332✔
104
            ExecutionDag::new(self.builder_dag, self.options.channel_buffer_sz)?;
332✔
105
        let node_indexes = execution_dag.graph().node_identifiers().collect::<Vec<_>>();
332✔
106

332✔
107
        // Start the threads.
332✔
108
        let mut join_handles = HashMap::new();
332✔
109
        for node_index in node_indexes {
2,027✔
110
            let node = &execution_dag.graph()[node_index];
1,695✔
111
            let node_handle = node.handle.clone();
1,695✔
112
            match &node.kind.as_ref().expect("We created all nodes") {
1,695✔
113
                NodeKind::Source(_, _) => {
114
                    let (source_sender_node, source_listener_node) = create_source_nodes(
342✔
115
                        &mut execution_dag,
342✔
116
                        node_index,
342✔
117
                        &self.options,
342✔
118
                        running.clone(),
342✔
119
                    );
342✔
120
                    join_handles.insert(
342✔
121
                        node_handle,
342✔
122
                        start_source(source_sender_node, source_listener_node)?,
342✔
123
                    );
×
124
                }
×
125
                NodeKind::Processor(_) => {
126
                    let processor_node = ProcessorNode::new(&mut execution_dag, node_index);
1,017✔
127
                    join_handles.insert(node_handle, start_processor(processor_node)?);
1,017✔
128
                }
×
129
                NodeKind::Sink(_) => {
×
130
                    let sink_node = SinkNode::new(&mut execution_dag, node_index);
336✔
131
                    join_handles.insert(node_handle, start_sink(sink_node)?);
336✔
132
                }
×
133
            }
×
134
        }
×
135

×
136
        Ok(DagExecutorJoinHandle { join_handles })
332✔
137
    }
332✔
138
}
139

×
140
impl DagExecutorJoinHandle {
×
141
    pub fn join(mut self) -> Result<(), ExecutionError> {
331✔
142
        let handles: Vec<NodeHandle> = self.join_handles.iter().map(|e| e.0.clone()).collect();
1,692✔
143

×
144
        loop {
×
145
            for handle in &handles {
9,029✔
146
                if let Entry::Occupied(entry) = self.join_handles.entry(handle.clone()) {
7,512✔
147
                    if entry.get().is_finished() {
7,242✔
148
                        if let Err(e) = entry.remove().join() {
1,680✔
149
                            panic_any(e);
5✔
150
                        }
1,675✔
151
                    }
5,562✔
152
                }
270✔
153
            }
154

155
            if self.join_handles.is_empty() {
1,517✔
156
                return Ok(());
326✔
157
            }
1,191✔
158

1,191✔
159
            thread::sleep(Duration::from_millis(250));
1,191✔
160
        }
161
    }
326✔
162
}
×
163

164
fn start_source(
342✔
165
    source_sender: SourceSenderNode,
342✔
166
    source_listener: SourceListenerNode,
342✔
167
) -> Result<JoinHandle<()>, ExecutionError> {
342✔
168
    let handle = source_sender.handle().clone();
342✔
169

170
    let _st_handle = Builder::new()
342✔
171
        .name(format!("{handle}-sender"))
342✔
172
        .spawn(move || match source_sender.run() {
342✔
173
            Ok(_) => {}
335✔
174
            // Channel disconnection means the source listener has quit.
×
175
            // Maybe it quit gracefully so we don't need to panic.
×
176
            Err(ExecutionError::CannotSendToChannel) => {}
6✔
177
            // Other errors result in panic.
×
178
            Err(e) => std::panic::panic_any(e),
1✔
179
        })?;
342✔
180

×
181
    Ok(Builder::new()
342✔
182
        .name(format!("{handle}-listener"))
342✔
183
        .spawn(move || {
342✔
184
            if let Err(e) = source_listener.run() {
342✔
185
                std::panic::panic_any(e);
5✔
186
            }
337✔
187
        })?)
342✔
188
}
342✔
189

×
190
fn start_processor(processor: ProcessorNode) -> Result<JoinHandle<()>, ExecutionError> {
1,017✔
191
    Ok(Builder::new()
1,017✔
192
        .name(processor.handle().to_string())
1,017✔
193
        .spawn(move || {
1,017✔
194
            if let Err(e) = processor.run() {
1,017✔
195
                std::panic::panic_any(e);
7✔
196
            }
1,010✔
197
        })?)
1,017✔
198
}
1,017✔
199

200
fn start_sink(sink: SinkNode) -> Result<JoinHandle<()>, ExecutionError> {
336✔
201
    Ok(Builder::new().name(sink.handle().to_string()).spawn(|| {
336✔
202
        if let Err(e) = sink.run() {
336✔
203
            std::panic::panic_any(e);
5✔
204
        }
331✔
205
    })?)
336✔
206
}
336✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc