
getdozer / dozer · build 4125713559 (pending completion)
Pull Request #831: chore: Improve Join processor errors (GitHub)
Merge 3efc092d8 into cdb46cf22

134 of 134 new or added lines in 4 files covered (100.0%)

23044 of 34837 relevant lines covered (66.15%)

35039.7 hits per line

Source File: /dozer-core/src/executor.rs (86.61% covered)
#![allow(clippy::type_complexity)]

use crate::dag_metadata::{Consistency, DagMetadata, DagMetadataManager};
use crate::dag_schemas::{DagSchemas, NodeSchemas};
use crate::errors::ExecutionError;
use crate::errors::ExecutionError::{IncompatibleSchemas, InconsistentCheckpointMetadata};
use crate::executor_utils::index_edges;
use crate::node::{NodeHandle, PortHandle, ProcessorFactory, SinkFactory, SourceFactory};
use crate::record_store::RecordReader;
use crate::Dag;

use crossbeam::channel::{bounded, Receiver, Sender};
use dozer_types::parking_lot::RwLock;
use dozer_types::types::{Operation, Record, Schema};

use crate::epoch::{Epoch, EpochManager};
use std::collections::hash_map::Entry;
use std::collections::HashMap;
use std::fmt::{Debug, Display, Formatter};
use std::panic::panic_any;
use std::path::{Path, PathBuf};
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Barrier};
use std::thread::JoinHandle;
use std::thread::{self, Builder};
use std::time::Duration;

#[derive(Clone)]
pub struct ExecutorOptions {
    pub commit_sz: u32,
    pub channel_buffer_sz: usize,
    pub commit_time_threshold: Duration,
}

impl Default for ExecutorOptions {
    fn default() -> Self {
        Self {
            commit_sz: 10_000,
            channel_buffer_sz: 20_000,
            commit_time_threshold: Duration::from_millis(50),
        }
    }
}

#[derive(Clone, Debug, PartialEq, Eq)]
pub(crate) enum InputPortState {
    Open,
    Terminated,
}

#[derive(Clone, Debug, PartialEq, Eq)]
pub enum ExecutorOperation {
    Delete { old: Record },
    Insert { new: Record },
    Update { old: Record, new: Record },
    Commit { epoch: Epoch },
    Terminate,
}

impl ExecutorOperation {
    pub fn from_operation(op: Operation) -> ExecutorOperation {
        match op {
            Operation::Update { old, new } => ExecutorOperation::Update { old, new },
            Operation::Delete { old } => ExecutorOperation::Delete { old },
            Operation::Insert { new } => ExecutorOperation::Insert { new },
        }
    }
}

impl Display for ExecutorOperation {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        let type_str = match self {
            ExecutorOperation::Delete { .. } => "Delete",
            ExecutorOperation::Update { .. } => "Update",
            ExecutorOperation::Insert { .. } => "Insert",
            ExecutorOperation::Terminate { .. } => "Terminate",
            ExecutorOperation::Commit { .. } => "Commit",
        };
        f.write_str(type_str)
    }
}

mod name;
mod node;
mod processor_node;
mod receiver_loop;
mod sink_node;
mod source_node;

use node::Node;
use processor_node::ProcessorNode;
use sink_node::SinkNode;

use self::source_node::{SourceListenerNode, SourceSenderNode};

use super::epoch::OpIdentifier;

pub struct DagExecutor<T: Clone> {
    dag: Dag<T>,
    schemas: HashMap<NodeHandle, NodeSchemas<T>>,
    record_stores: Arc<RwLock<HashMap<NodeHandle, HashMap<PortHandle, Box<dyn RecordReader>>>>>,
    join_handles: HashMap<NodeHandle, JoinHandle<()>>,
    path: PathBuf,
    options: ExecutorOptions,
    running: Arc<AtomicBool>,
    consistency_metadata: HashMap<NodeHandle, Option<OpIdentifier>>,
}

impl<T: Clone + Debug + 'static> DagExecutor<T> {
    fn check_consistency(
        dag: &Dag<T>,
        path: &Path,
    ) -> Result<HashMap<NodeHandle, Option<OpIdentifier>>, ExecutionError> {
        let mut r = HashMap::new();
        let meta = DagMetadataManager::new(dag, path)?;
        let chk = meta.get_checkpoint_consistency()?;
        for (handle, _factory) in dag.sources() {
            match chk.get(handle) {
                Some(Consistency::FullyConsistent(c)) => {
                    r.insert(handle.clone(), *c);
                }
                _ => return Err(InconsistentCheckpointMetadata),
            }
        }
        Ok(r)
    }

    pub fn new(
        dag: Dag<T>,
        path: &Path,
        options: ExecutorOptions,
        running: Arc<AtomicBool>,
    ) -> Result<Self, ExecutionError> {
        let consistency_metadata = match Self::check_consistency(&dag, path) {
            Ok(c) => c,
            Err(_) => {
                DagMetadataManager::new(&dag, path)?.delete_metadata();
                dag.sources()
                    .map(|(handle, _)| (handle.clone(), None))
                    .collect()
            }
        };

        let schemas = Self::load_or_init_schema(&dag, path)?;
        let record_stores = Arc::new(RwLock::new(
            dag.node_handles()
                .map(|node_handle| {
                    (
                        node_handle.clone(),
                        HashMap::<PortHandle, Box<dyn RecordReader>>::new(),
                    )
                })
                .collect(),
        ));

        Ok(Self {
            dag,
            schemas,
            record_stores,
            path: path.to_path_buf(),
            join_handles: HashMap::new(),
            options,
            running,
            consistency_metadata,
        })
    }

    pub fn validate(dag: &Dag<T>, path: &Path) -> Result<(), ExecutionError> {
        let dag_schemas = DagSchemas::new(dag)?;
        let meta_manager = DagMetadataManager::new(dag, path)?;

        let current_schemas = dag_schemas.get_all_schemas();
        let existing_schemas = meta_manager.get_metadata()?;
        for (handle, current) in &current_schemas {
            if let Some(existing) = existing_schemas.get(handle) {
                Self::validate_schemas(current, existing)?;
            } else {
                // Non-existing schemas are OK. `Executor::new` will initialize the schemas.
            }
        }

        Ok(())
    }

    fn validate_schemas(
        current: &NodeSchemas<T>,
        existing: &DagMetadata,
    ) -> Result<(), ExecutionError> {
        if existing.output_schemas.len() != current.output_schemas.len() {
            return Err(IncompatibleSchemas(
                "Output Schemas length mismatch".to_string(),
            ));
        }
        for (port, (schema, _ctx)) in &current.output_schemas {
            let other_schema = existing
                .output_schemas
                .get(port)
                .ok_or(IncompatibleSchemas(format!(
                    "Cannot find output schema on port {port:?}"
                )))?;
            if schema != other_schema {
                schema.print().printstd();

                other_schema.print().printstd();
                return Err(IncompatibleSchemas(format!(
                    "Schema mismatch for port {port:?}:"
                )));
            }
        }
        if existing.input_schemas.len() != current.input_schemas.len() {
            return Err(IncompatibleSchemas(
                "Input Schemas length mismatch".to_string(),
            ));
        }
        for (port, (schema, _ctx)) in &current.input_schemas {
            let other_schema =
                existing
                    .input_schemas
                    .get(port)
                    .ok_or(IncompatibleSchemas(format!(
                        "Cannot find input schema on port {port:?}",
                    )))?;
            if schema != other_schema {
                schema.print().printstd();

                other_schema.print().printstd();
                return Err(IncompatibleSchemas(format!(
                    "Schema mismatch for port {port:?}:",
                )));
            }
        }
        Ok(())
    }

    fn load_or_init_schema(
        dag: &Dag<T>,
        path: &Path,
    ) -> Result<HashMap<NodeHandle, NodeSchemas<T>>, ExecutionError> {
        let dag_schemas = DagSchemas::new(dag)?;
        let meta_manager = DagMetadataManager::new(dag, path)?;

        let current_schemas = dag_schemas.get_all_schemas();
        match meta_manager.get_metadata() {
            Ok(existing_schemas) => {
                for (handle, current) in &current_schemas {
                    if let Some(existing) = existing_schemas.get(handle) {
                        Self::validate_schemas(current, existing)?;
                    } else {
                        meta_manager.delete_metadata();
                        meta_manager.init_metadata(&current_schemas)?;
                    }
                }
            }
            Err(_) => {
                meta_manager.delete_metadata();
                meta_manager.init_metadata(&current_schemas)?;
            }
        };

        Ok(current_schemas)
    }

    fn start_source(
        &self,
        handle: NodeHandle,
        src_factory: Arc<dyn SourceFactory<T>>,
        senders: HashMap<PortHandle, Vec<Sender<ExecutorOperation>>>,
        schemas: &NodeSchemas<T>,
        epoch_manager: Arc<EpochManager>,
        start_barrier: Arc<Barrier>,
    ) -> Result<JoinHandle<()>, ExecutionError> {
        let (sender, receiver) = bounded(self.options.channel_buffer_sz);
        // let (sender, receiver) = bounded(1);

        let start_seq = *self
            .consistency_metadata
            .get(&handle)
            .ok_or_else(|| ExecutionError::InvalidNodeHandle(handle.clone()))?;
        let output_ports = src_factory.get_output_ports()?;

        let st_node_handle = handle.clone();
        let output_schemas: HashMap<PortHandle, Schema> = schemas
            .output_schemas
            .clone()
            .into_iter()
            .map(|e| (e.0, e.1 .0))
            .collect();
        let running = self.running.clone();
        let source_fn = move |handle: NodeHandle| -> Result<(), ExecutionError> {
            let sender = SourceSenderNode::new(
                handle,
                &*src_factory,
                output_schemas,
                start_seq,
                sender,
                running,
            )?;
            sender.run()
        };

        let _st_handle = Builder::new()
            .name(format!("{handle}-sender"))
            .spawn(move || {
                if let Err(e) = source_fn(st_node_handle) {
                    std::panic::panic_any(e);
                }
            })?;

        let timeout = self.options.commit_time_threshold;
        let base_path = self.path.clone();
        let record_readers = self.record_stores.clone();
        let edges = self.dag.edge_handles().cloned().collect::<Vec<_>>();
        let running = self.running.clone();
        let running_listener = running.clone();
        let commit_sz = self.options.commit_sz;
        let max_duration_between_commits = self.options.commit_time_threshold;
        let output_schemas: HashMap<PortHandle, Schema> = schemas
            .output_schemas
            .clone()
            .into_iter()
            .map(|e| (e.0, e.1 .0))
            .collect();
        let retention_queue_size = self.options.channel_buffer_sz + 1;
        let source_fn = move |handle: NodeHandle| -> Result<(), ExecutionError> {
            let listener = SourceListenerNode::new(
                handle,
                receiver,
                timeout,
                &base_path,
                &output_ports,
                record_readers,
                senders,
                &edges,
                running,
                commit_sz,
                max_duration_between_commits,
                epoch_manager,
                output_schemas,
                retention_queue_size,
            )?;
            start_barrier.wait();
            listener.run()
        };
        Ok(Builder::new()
            .name(format!("{handle}-listener"))
            .spawn(move || {
                if let Err(e) = source_fn(handle) {
                    if running_listener.load(Ordering::Relaxed) {
                        std::panic::panic_any(e);
                    }
                }
            })?)
    }

    pub fn start_processor(
        &self,
        handle: NodeHandle,
        proc_factory: Arc<dyn ProcessorFactory<T>>,
        senders: HashMap<PortHandle, Vec<Sender<ExecutorOperation>>>,
        receivers: HashMap<PortHandle, Vec<Receiver<ExecutorOperation>>>,
        schemas: &NodeSchemas<T>,
    ) -> Result<JoinHandle<()>, ExecutionError> {
        let base_path = self.path.clone();
        let record_readers = self.record_stores.clone();
        let edges = self.dag.edge_handles().cloned().collect::<Vec<_>>();
        let input_schemas: HashMap<PortHandle, Schema> = schemas
            .input_schemas
            .clone()
            .into_iter()
            .map(|e| (e.0, e.1 .0))
            .collect();
        let output_schemas: HashMap<PortHandle, Schema> = schemas
            .output_schemas
            .clone()
            .into_iter()
            .map(|e| (e.0, e.1 .0))
            .collect();
        let running = self.running.clone();
        let retention_queue_size = self.options.channel_buffer_sz + 1;
        let processor_fn = move |handle: NodeHandle| -> Result<(), ExecutionError> {
            let processor = ProcessorNode::new(
                handle,
                &*proc_factory,
                &base_path,
                record_readers,
                receivers,
                senders,
                &edges,
                input_schemas,
                output_schemas,
                retention_queue_size,
            )?;
            processor.run()
        };
        Ok(Builder::new().name(handle.to_string()).spawn(move || {
            if let Err(e) = processor_fn(handle) {
                if running.load(Ordering::Relaxed) {
                    std::panic::panic_any(e);
                }
            }
        })?)
    }

    pub fn start_sink(
        &self,
        handle: NodeHandle,
        snk_factory: Arc<dyn SinkFactory<T>>,
        receivers: HashMap<PortHandle, Vec<Receiver<ExecutorOperation>>>,
        schemas: &NodeSchemas<T>,
    ) -> Result<JoinHandle<()>, ExecutionError> {
        let base_path = self.path.clone();
        let record_readers = self.record_stores.clone();
        let input_schemas: HashMap<PortHandle, Schema> = schemas
            .input_schemas
            .clone()
            .into_iter()
            .map(|e| (e.0, e.1 .0))
            .collect();
        let retention_queue_size = self.options.channel_buffer_sz + 1;
        let snk_fn = move |handle| -> Result<(), ExecutionError> {
            let sink = SinkNode::new(
                handle,
                &*snk_factory,
                &base_path,
                record_readers,
                receivers,
                input_schemas,
                retention_queue_size,
            )?;
            sink.run()
        };
        Ok(Builder::new().name(handle.to_string()).spawn(|| {
            if let Err(e) = snk_fn(handle) {
                std::panic::panic_any(e);
            }
        })?)
    }

    pub fn start(&mut self) -> Result<(), ExecutionError> {
        let (mut senders, mut receivers) = index_edges(&self.dag, self.options.channel_buffer_sz);

        for (handle, factory) in self.dag.sinks() {
            let join_handle = self.start_sink(
                handle.clone(),
                factory.clone(),
                receivers.remove(handle).unwrap_or_default(),
                self.schemas
                    .get(handle)
                    .ok_or_else(|| ExecutionError::InvalidNodeHandle(handle.clone()))?,
            )?;
            self.join_handles.insert(handle.clone(), join_handle);
        }

        for (handle, factory) in self.dag.processors() {
            let join_handle = self.start_processor(
                handle.clone(),
                factory.clone(),
                senders.remove(handle).unwrap_or_default(),
                receivers.remove(handle).unwrap_or_default(),
                self.schemas
                    .get(handle)
                    .ok_or_else(|| ExecutionError::InvalidNodeHandle(handle.clone()))?,
            )?;
            self.join_handles.insert(handle.clone(), join_handle);
        }

        let num_sources = self.dag.sources().count();
        let epoch_manager: Arc<EpochManager> = Arc::new(EpochManager::new(num_sources));

        let start_barrier = Arc::new(Barrier::new(num_sources));

        for (handle, factory) in self.dag.sources() {
            let join_handle = self.start_source(
                handle.clone(),
                factory.clone(),
                senders.remove(handle).unwrap_or_default(),
                self.schemas
                    .get(handle)
                    .ok_or_else(|| ExecutionError::InvalidNodeHandle(handle.clone()))?,
                epoch_manager.clone(),
                start_barrier.clone(),
            )?;
            self.join_handles.insert(handle.clone(), join_handle);
        }
        Ok(())
    }

    pub fn stop(&self) {
        self.running.store(false, Ordering::SeqCst);
    }

    pub fn join(mut self) -> Result<(), ExecutionError> {
        let handles: Vec<NodeHandle> = self.join_handles.iter().map(|e| e.0.clone()).collect();

        loop {
            for handle in &handles {
                if let Entry::Occupied(entry) = self.join_handles.entry(handle.clone()) {
                    if entry.get().is_finished() {
                        if let Err(e) = entry.remove().join() {
                            panic_any(e);
                        }
                    }
                }
            }

            if self.join_handles.is_empty() {
                return Ok(());
            }

            thread::sleep(Duration::from_millis(250));
        }
    }
}
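
The listing above is the public surface needed to drive a dataflow: ExecutorOptions for tuning commit batching and channel sizes, and DagExecutor::new / start / stop / join for the executor lifecycle. The sketch below shows one plausible way to wire these together. It is only a sketch: the dozer_core::executor and dozer_core::errors import paths are assumed re-exports, the Dag<()> is presumed to have been assembled elsewhere, and run_dag itself is a hypothetical helper, not part of the file shown above.

use std::path::Path;
use std::sync::atomic::AtomicBool;
use std::sync::Arc;
use std::time::Duration;

// Assumed import paths; adjust to wherever the crate actually exposes these types.
use dozer_core::errors::ExecutionError;
use dozer_core::executor::{DagExecutor, ExecutorOptions};
use dozer_core::Dag;

// Hypothetical helper: runs an already-assembled DAG to completion, using
// `checkpoint_dir` as the metadata/checkpoint directory passed to `new`.
fn run_dag(dag: Dag<()>, checkpoint_dir: &Path) -> Result<(), ExecutionError> {
    // Override two knobs; the remaining field keeps the defaults shown above
    // (commit_sz = 10_000, channel_buffer_sz = 20_000, 50 ms commit threshold).
    let options = ExecutorOptions {
        commit_sz: 1_000,
        commit_time_threshold: Duration::from_millis(100),
        ..Default::default()
    };

    // Flag polled by the executor threads. Retaining a clone of this Arc lets
    // another thread request shutdown by storing `false`, which is exactly
    // what `DagExecutor::stop` does.
    let running = Arc::new(AtomicBool::new(true));

    let mut executor = DagExecutor::new(dag, checkpoint_dir, options, running)?;
    executor.start()?; // spawns sink, processor, and source threads
    executor.join() // polls the join handles until every node thread exits
}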