• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

getdozer / dozer / 4112409903

pending completion
4112409903

Pull #818

github

GitHub
Merge 0e6d61bff into c160ec41f
Pull Request #818: chore: fix dag issues

212 of 212 new or added lines in 23 files covered. (100.0%)

23352 of 37718 relevant lines covered (61.91%)

31647.45 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

75.0
/dozer-core/src/dag/executor.rs
1
#![allow(clippy::type_complexity)]
2

3
use crate::dag::dag_metadata::{Consistency, DagMetadata, DagMetadataManager};
4
use crate::dag::dag_schemas::{DagSchemas, NodeSchemas};
5
use crate::dag::errors::ExecutionError;
6
use crate::dag::errors::ExecutionError::{IncompatibleSchemas, InconsistentCheckpointMetadata};
7
use crate::dag::executor_utils::index_edges;
8
use crate::dag::node::{NodeHandle, PortHandle, ProcessorFactory, SinkFactory, SourceFactory};
9
use crate::dag::record_store::RecordReader;
10
use crate::dag::Dag;
11

12
use crossbeam::channel::{bounded, Receiver, Sender};
13
use dozer_types::parking_lot::RwLock;
14
use dozer_types::types::{Operation, Record, Schema};
15

16
use crate::dag::epoch::{Epoch, EpochManager};
17
use std::collections::hash_map::Entry;
18
use std::collections::HashMap;
19
use std::fmt::{Debug, Display, Formatter};
20
use std::panic::panic_any;
21
use std::path::{Path, PathBuf};
22
use std::sync::atomic::{AtomicBool, Ordering};
23
use std::sync::{Arc, Barrier};
24
use std::thread::JoinHandle;
25
use std::thread::{self, Builder};
26
use std::time::Duration;
27

28
/// Tunable parameters for a [`DagExecutor`] run.
#[derive(Clone)]
pub struct ExecutorOptions {
    /// Number of operations per commit batch (forwarded to the source
    /// listener as `commit_sz`).
    pub commit_sz: u32,
    /// Capacity of each bounded channel between nodes; also determines the
    /// retention queue size (`channel_buffer_sz + 1`) used by the nodes.
    pub channel_buffer_sz: usize,
    /// Time-based commit threshold, forwarded to the source listener both as
    /// its timeout and as the maximum duration between commits.
    pub commit_time_threshold: Duration,
}
34

35
impl Default for ExecutorOptions {
36
    fn default() -> Self {
132✔
37
        Self {
132✔
38
            commit_sz: 10_000,
132✔
39
            channel_buffer_sz: 20_000,
132✔
40
            commit_time_threshold: Duration::from_millis(50),
132✔
41
        }
132✔
42
    }
132✔
43
}
×
44

×
45
/// Receive-side state of a node's input port.
#[derive(Clone, Debug, PartialEq, Eq)]
pub(crate) enum InputPortState {
    // Operations may still arrive on this port.
    Open,
    // No further operations are expected on this port.
    Terminated,
}
50

51
/// Message exchanged over the inter-node channels: either a data operation
/// (insert/update/delete) or a control message (commit/terminate).
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum ExecutorOperation {
    Delete { old: Record },
    Insert { new: Record },
    Update { old: Record, new: Record },
    // Carries the epoch being committed downstream.
    Commit { epoch: Epoch },
    // Signals the receiving node to shut down.
    Terminate,
}
59

60
impl ExecutorOperation {
61
    pub fn from_operation(op: Operation) -> ExecutorOperation {
×
62
        match op {
×
63
            Operation::Update { old, new } => ExecutorOperation::Update { old, new },
×
64
            Operation::Delete { old } => ExecutorOperation::Delete { old },
×
65
            Operation::Insert { new } => ExecutorOperation::Insert { new },
×
66
        }
×
67
    }
×
68
}
69

×
70
impl Display for ExecutorOperation {
71
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
×
72
        let type_str = match self {
×
73
            ExecutorOperation::Delete { .. } => "Delete",
×
74
            ExecutorOperation::Update { .. } => "Update",
×
75
            ExecutorOperation::Insert { .. } => "Insert",
×
76
            ExecutorOperation::Terminate { .. } => "Terminate",
×
77
            ExecutorOperation::Commit { .. } => "Commit",
×
78
        };
×
79
        f.write_str(type_str)
×
80
    }
×
81
}
×
82

×
83
mod name;
84
mod node;
85
mod processor_node;
86
mod receiver_loop;
87
mod sink_node;
88
mod source_node;
89

90
use node::Node;
91
use processor_node::ProcessorNode;
×
92
use sink_node::SinkNode;
×
93

×
94
use self::source_node::{SourceListenerNode, SourceSenderNode};
95

96
use super::epoch::OpIdentifier;
97

98
/// Drives execution of a [`Dag`]: spawns a thread per sink and processor and
/// two threads per source (sender + listener), then tracks their join handles.
pub struct DagExecutor<T: Clone> {
    dag: Dag<T>,
    // Input/output schemas per node, loaded from or initialized into metadata.
    schemas: HashMap<NodeHandle, NodeSchemas<T>>,
    // Shared record readers, keyed by node and then by port; populated by the
    // node threads after start.
    record_stores: Arc<RwLock<HashMap<NodeHandle, HashMap<PortHandle, Box<dyn RecordReader>>>>>,
    // Join handles of spawned node threads; drained by `join` as they finish.
    join_handles: HashMap<NodeHandle, JoinHandle<()>>,
    // Base directory for metadata and node storage.
    path: PathBuf,
    options: ExecutorOptions,
    // Cooperative shutdown flag shared with every node thread.
    running: Arc<AtomicBool>,
    // Per-source resume point: last checkpointed operation id, or None when
    // starting fresh (e.g. after inconsistent metadata was wiped).
    consistency_metadata: HashMap<NodeHandle, Option<OpIdentifier>>,
}
108

109
impl<T: Clone + Debug + 'static> DagExecutor<T> {
110
    fn check_consistency(
62✔
111
        dag: &Dag<T>,
62✔
112
        path: &Path,
62✔
113
    ) -> Result<HashMap<NodeHandle, Option<OpIdentifier>>, ExecutionError> {
62✔
114
        let mut r = HashMap::new();
62✔
115
        let meta = DagMetadataManager::new(dag, path)?;
62✔
116
        let chk = meta.get_checkpoint_consistency()?;
62✔
117
        for (handle, _factory) in dag.sources() {
75✔
118
            match chk.get(handle) {
75✔
119
                Some(Consistency::FullyConsistent(c)) => {
75✔
120
                    r.insert(handle.clone(), *c);
75✔
121
                }
75✔
122
                _ => return Err(InconsistentCheckpointMetadata),
×
123
            }
×
124
        }
×
125
        Ok(r)
62✔
126
    }
62✔
127

×
128
    /// Builds an executor for `dag`, storing state under `path`.
    ///
    /// Checkpoint recovery: if the persisted checkpoints are consistent, each
    /// source resumes from its recorded operation id; otherwise ALL metadata
    /// under `path` is deleted and every source starts from scratch (`None`).
    /// Schemas are then validated against (or initialized into) metadata via
    /// `load_or_init_schema`, which can fail with `IncompatibleSchemas`.
    pub fn new(
        dag: Dag<T>,
        path: &Path,
        options: ExecutorOptions,
        running: Arc<AtomicBool>,
    ) -> Result<Self, ExecutionError> {
        // Any consistency-check failure is treated as "no usable checkpoint":
        // wipe metadata and map every source to a fresh start.
        let consistency_metadata = match Self::check_consistency(&dag, path) {
            Ok(c) => c,
            Err(_) => {
                DagMetadataManager::new(&dag, path)?.delete_metadata();
                dag.sources()
                    .map(|(handle, _)| (handle.clone(), None))
                    .collect()
            }
        };

        let schemas = Self::load_or_init_schema(&dag, path)?;
        // One (initially empty) reader map per node; node threads fill these in.
        let record_stores = Arc::new(RwLock::new(
            dag.node_handles()
                .map(|node_handle| {
                    (
                        node_handle.clone(),
                        HashMap::<PortHandle, Box<dyn RecordReader>>::new(),
                    )
                })
                .collect(),
        ));

        Ok(Self {
            dag,
            schemas,
            record_stores,
            path: path.to_path_buf(),
            join_handles: HashMap::new(),
            options,
            running,
            consistency_metadata,
        })
    }
62✔
169

×
170
    pub fn validate(dag: &Dag<T>, path: &Path) -> Result<(), ExecutionError> {
×
171
        let dag_schemas = DagSchemas::new(dag)?;
×
172
        let meta_manager = DagMetadataManager::new(dag, path)?;
×
173

×
174
        let current_schemas = dag_schemas.get_all_schemas();
×
175
        let existing_schemas = meta_manager.get_metadata()?;
×
176
        for (handle, current) in &current_schemas {
×
177
            if let Some(existing) = existing_schemas.get(handle) {
×
178
                Self::validate_schemas(current, existing)?;
×
179
            } else {
×
180
                // Non-existing schemas is OK. `Executor::new` will initialize the schemas.
×
181
            }
×
182
        }
×
183

×
184
        Ok(())
×
185
    }
×
186

×
187
    /// Compares the freshly computed schemas of one node (`current`) against
    /// the schemas recorded in persisted metadata (`existing`), for both
    /// output and input ports.
    ///
    /// Returns `IncompatibleSchemas` when the port counts differ, a port is
    /// missing on the metadata side, or a schema differs. On a schema
    /// mismatch, both schemas are printed to stdout before the error is
    /// returned, as a diagnostic aid.
    fn validate_schemas(
        current: &NodeSchemas<T>,
        existing: &DagMetadata,
    ) -> Result<(), ExecutionError> {
        // --- Output ports ---
        if existing.output_schemas.len() != current.output_schemas.len() {
            return Err(IncompatibleSchemas(
                "Output Schemas length mismatch".to_string(),
            ));
        }
        for (port, (schema, _ctx)) in &current.output_schemas {
            let other_schema = existing
                .output_schemas
                .get(port)
                .ok_or(IncompatibleSchemas(format!(
                    "Cannot find output schema on port {port:?}"
                )))?;
            if schema != other_schema {
                // Dump both schemas so the mismatch can be inspected.
                schema.print().printstd();

                other_schema.print().printstd();
                return Err(IncompatibleSchemas(format!(
                    "Schema mismatch for port {port:?}:"
                )));
            }
        }
        // --- Input ports (same checks, mirrored) ---
        if existing.input_schemas.len() != current.input_schemas.len() {
            return Err(IncompatibleSchemas(
                "Input Schemas length mismatch".to_string(),
            ));
        }
        for (port, (schema, _ctx)) in &current.input_schemas {
            let other_schema =
                existing
                    .input_schemas
                    .get(port)
                    .ok_or(IncompatibleSchemas(format!(
                        "Cannot find input schema on port {port:?}",
                    )))?;
            if schema != other_schema {
                schema.print().printstd();

                other_schema.print().printstd();
                return Err(IncompatibleSchemas(format!(
                    "Schema mismatch for port {port:?}:",
                )));
            }
        }
        Ok(())
    }
9✔
236

×
237
    fn load_or_init_schema(
62✔
238
        dag: &Dag<T>,
62✔
239
        path: &Path,
62✔
240
    ) -> Result<HashMap<NodeHandle, NodeSchemas<T>>, ExecutionError> {
62✔
241
        let dag_schemas = DagSchemas::new(dag)?;
62✔
242
        let meta_manager = DagMetadataManager::new(dag, path)?;
62✔
243

244
        let current_schemas = dag_schemas.get_all_schemas();
62✔
245
        match meta_manager.get_metadata() {
62✔
246
            Ok(existing_schemas) => {
62✔
247
                for (handle, current) in &current_schemas {
318✔
248
                    if let Some(existing) = existing_schemas.get(handle) {
257✔
249
                        Self::validate_schemas(current, existing)?;
9✔
250
                    } else {
×
251
                        meta_manager.delete_metadata();
248✔
252
                        meta_manager.init_metadata(&current_schemas)?;
248✔
253
                    }
×
254
                }
×
255
            }
×
256
            Err(_) => {
×
257
                meta_manager.delete_metadata();
×
258
                meta_manager.init_metadata(&current_schemas)?;
×
259
            }
×
260
        };
×
261

×
262
        Ok(current_schemas)
61✔
263
    }
62✔
264

×
265
    /// Spawns the two threads that make up one source node:
    ///
    /// 1. `{handle}-sender` — runs a `SourceSenderNode` that pulls operations
    ///    from the source (resuming from `start_seq`) and pushes them into an
    ///    internal bounded channel.
    /// 2. `{handle}-listener` — runs a `SourceListenerNode` that drains that
    ///    channel and forwards operations/commits to downstream `senders`.
    ///
    /// All listeners wait on `start_barrier` after construction, so no source
    /// starts forwarding until every listener is ready. Only the listener's
    /// `JoinHandle` is returned; the sender thread is detached and reports
    /// failure by panicking.
    fn start_source(
        &self,
        handle: NodeHandle,
        src_factory: Arc<dyn SourceFactory<T>>,
        senders: HashMap<PortHandle, Vec<Sender<ExecutorOperation>>>,
        schemas: &NodeSchemas<T>,
        epoch_manager: Arc<EpochManager>,
        start_barrier: Arc<Barrier>,
    ) -> Result<JoinHandle<()>, ExecutionError> {
        // Internal sender -> listener channel, same capacity as node channels.
        let (sender, receiver) = bounded(self.options.channel_buffer_sz);

        // Resume point recorded for this source at construction time.
        let start_seq = *self
            .consistency_metadata
            .get(&handle)
            .ok_or_else(|| ExecutionError::InvalidNodeHandle(handle.clone()))?;
        let output_ports = src_factory.get_output_ports()?;

        let st_node_handle = handle.clone();
        // Strip the per-port context, keeping only the Schema.
        let output_schemas: HashMap<PortHandle, Schema> = schemas
            .output_schemas
            .clone()
            .into_iter()
            .map(|e| (e.0, e.1 .0))
            .collect();
        let running = self.running.clone();
        let source_fn = move |handle: NodeHandle| -> Result<(), ExecutionError> {
            let sender = SourceSenderNode::new(
                handle,
                &*src_factory,
                output_schemas,
                start_seq,
                sender,
                running,
            )?;
            sender.run()
        };

        // Detached: any error in the sender thread becomes a panic,
        // unconditionally (unlike the listener below).
        let _st_handle = Builder::new()
            .name(format!("{handle}-sender"))
            .spawn(move || {
                if let Err(e) = source_fn(st_node_handle) {
                    std::panic::panic_any(e);
                }
            })?;

        // NOTE(review): `timeout` and `max_duration_between_commits` are both
        // `commit_time_threshold` — presumably intentional, but worth confirming
        // against SourceListenerNode's parameter semantics.
        let timeout = self.options.commit_time_threshold;
        let base_path = self.path.clone();
        let record_readers = self.record_stores.clone();
        let edges = self.dag.edge_handles().cloned().collect::<Vec<_>>();
        let running = self.running.clone();
        let running_listener = running.clone();
        let commit_sz = self.options.commit_sz;
        let max_duration_between_commits = self.options.commit_time_threshold;
        // Recomputed because the first copy was moved into the sender closure.
        let output_schemas: HashMap<PortHandle, Schema> = schemas
            .output_schemas
            .clone()
            .into_iter()
            .map(|e| (e.0, e.1 .0))
            .collect();
        let retention_queue_size = self.options.channel_buffer_sz + 1;
        let source_fn = move |handle: NodeHandle| -> Result<(), ExecutionError> {
            let listener = SourceListenerNode::new(
                handle,
                receiver,
                timeout,
                &base_path,
                &output_ports,
                record_readers,
                senders,
                &edges,
                running,
                commit_sz,
                max_duration_between_commits,
                epoch_manager,
                output_schemas,
                retention_queue_size,
            )?;
            // Synchronize with all other source listeners before processing.
            start_barrier.wait();
            listener.run()
        };
        Ok(Builder::new()
            .name(format!("{handle}-listener"))
            .spawn(move || {
                if let Err(e) = source_fn(handle) {
                    // Errors during an orderly shutdown (running == false)
                    // are swallowed; anything else is a real failure.
                    if running_listener.load(Ordering::Relaxed) {
                        std::panic::panic_any(e);
                    }
                }
            })?)
    }
69✔
356

×
357
    pub fn start_processor(
116✔
358
        &self,
116✔
359
        handle: NodeHandle,
116✔
360
        proc_factory: Arc<dyn ProcessorFactory<T>>,
116✔
361
        senders: HashMap<PortHandle, Vec<Sender<ExecutorOperation>>>,
116✔
362
        receivers: HashMap<PortHandle, Vec<Receiver<ExecutorOperation>>>,
116✔
363
        schemas: &NodeSchemas<T>,
116✔
364
    ) -> Result<JoinHandle<()>, ExecutionError> {
116✔
365
        let base_path = self.path.clone();
116✔
366
        let record_readers = self.record_stores.clone();
116✔
367
        let edges = self.dag.edge_handles().cloned().collect::<Vec<_>>();
116✔
368
        let input_schemas: HashMap<PortHandle, Schema> = schemas
116✔
369
            .input_schemas
116✔
370
            .clone()
116✔
371
            .into_iter()
116✔
372
            .map(|e| (e.0, e.1 .0))
138✔
373
            .collect();
116✔
374
        let output_schemas: HashMap<PortHandle, Schema> = schemas
116✔
375
            .output_schemas
116✔
376
            .clone()
116✔
377
            .into_iter()
116✔
378
            .map(|e| (e.0, e.1 .0))
116✔
379
            .collect();
116✔
380
        let running = self.running.clone();
116✔
381
        let retention_queue_size = self.options.channel_buffer_sz + 1;
116✔
382
        let processor_fn = move |handle: NodeHandle| -> Result<(), ExecutionError> {
116✔
383
            let processor = ProcessorNode::new(
116✔
384
                handle,
116✔
385
                &*proc_factory,
116✔
386
                &base_path,
116✔
387
                record_readers,
116✔
388
                receivers,
116✔
389
                senders,
116✔
390
                &edges,
116✔
391
                input_schemas,
116✔
392
                output_schemas,
116✔
393
                retention_queue_size,
116✔
394
            )?;
116✔
395
            processor.run()
114✔
396
        };
116✔
397
        Ok(Builder::new().name(handle.to_string()).spawn(move || {
116✔
398
            if let Err(e) = processor_fn(handle) {
116✔
399
                if running.load(Ordering::Relaxed) {
12✔
400
                    std::panic::panic_any(e);
10✔
401
                }
2✔
402
            }
104✔
403
        })?)
116✔
404
    }
116✔
405

×
406
    pub fn start_sink(
63✔
407
        &self,
63✔
408
        handle: NodeHandle,
63✔
409
        snk_factory: Arc<dyn SinkFactory<T>>,
63✔
410
        receivers: HashMap<PortHandle, Vec<Receiver<ExecutorOperation>>>,
63✔
411
        schemas: &NodeSchemas<T>,
63✔
412
    ) -> Result<JoinHandle<()>, ExecutionError> {
63✔
413
        let base_path = self.path.clone();
63✔
414
        let record_readers = self.record_stores.clone();
63✔
415
        let input_schemas: HashMap<PortHandle, Schema> = schemas
63✔
416
            .input_schemas
63✔
417
            .clone()
63✔
418
            .into_iter()
63✔
419
            .map(|e| (e.0, e.1 .0))
63✔
420
            .collect();
63✔
421
        let retention_queue_size = self.options.channel_buffer_sz + 1;
63✔
422
        let snk_fn = move |handle| -> Result<(), ExecutionError> {
63✔
423
            let sink = SinkNode::new(
63✔
424
                handle,
63✔
425
                &*snk_factory,
63✔
426
                &base_path,
63✔
427
                record_readers,
63✔
428
                receivers,
63✔
429
                input_schemas,
63✔
430
                retention_queue_size,
63✔
431
            )?;
63✔
432
            sink.run()
62✔
433
        };
63✔
434
        Ok(Builder::new().name(handle.to_string()).spawn(|| {
63✔
435
            if let Err(e) = snk_fn(handle) {
63✔
436
                std::panic::panic_any(e);
10✔
437
            }
53✔
438
        })?)
63✔
439
    }
63✔
440

×
441
    /// Spawns all node threads and begins execution.
    ///
    /// Start order is deliberate: sinks first, then processors, then sources —
    /// so every channel has a live receiver before any source starts sending.
    /// The sources additionally share a `Barrier` (see `start_source`) so none
    /// forwards data until all source listeners are constructed.
    pub fn start(&mut self) -> Result<(), ExecutionError> {
        // Pre-built channel endpoints for every edge, keyed by node handle;
        // each node removes (takes ownership of) its own endpoints below.
        let (mut senders, mut receivers) = index_edges(&self.dag, self.options.channel_buffer_sz);

        for (handle, factory) in self.dag.sinks() {
            let join_handle = self.start_sink(
                handle.clone(),
                factory.clone(),
                receivers.remove(handle).expect("BUG in DagExecutor"),
                self.schemas
                    .get(handle)
                    .ok_or_else(|| ExecutionError::InvalidNodeHandle(handle.clone()))?,
            )?;
            self.join_handles.insert(handle.clone(), join_handle);
        }

        for (handle, factory) in self.dag.processors() {
            let join_handle = self.start_processor(
                handle.clone(),
                factory.clone(),
                senders.remove(handle).expect("BUG in DagExecutor"),
                receivers.remove(handle).expect("BUG in DagExecutor"),
                self.schemas
                    .get(handle)
                    .ok_or_else(|| ExecutionError::InvalidNodeHandle(handle.clone()))?,
            )?;
            self.join_handles.insert(handle.clone(), join_handle);
        }

        // The epoch manager and barrier are sized to the number of sources.
        let num_sources = self.dag.sources().count();
        let epoch_manager: Arc<EpochManager> = Arc::new(EpochManager::new(num_sources));

        let start_barrier = Arc::new(Barrier::new(num_sources));

        for (handle, factory) in self.dag.sources() {
            let join_handle = self.start_source(
                handle.clone(),
                factory.clone(),
                senders.remove(handle).expect("BUG in DagExecutor"),
                self.schemas
                    .get(handle)
                    .ok_or_else(|| ExecutionError::InvalidNodeHandle(handle.clone()))?,
                epoch_manager.clone(),
                start_barrier.clone(),
            )?;
            self.join_handles.insert(handle.clone(), join_handle);
        }
        Ok(())
    }
59✔
489

×
490
    /// Requests a cooperative shutdown by clearing the shared `running` flag,
    /// which all node threads were given at spawn time. Does not wait for the
    /// threads to exit — use [`DagExecutor::join`] for that.
    pub fn stop(&self) {
        self.running.store(false, Ordering::SeqCst);
    }
1✔
493

×
494
    pub fn join(mut self) -> Result<(), ExecutionError> {
58✔
495
        let handles: Vec<NodeHandle> = self.join_handles.iter().map(|e| e.0.clone()).collect();
245✔
496

×
497
        loop {
×
498
            for handle in &handles {
2,270✔
499
                if let Entry::Occupied(entry) = self.join_handles.entry(handle.clone()) {
1,839✔
500
                    if entry.get().is_finished() {
1,811✔
501
                        if let Err(e) = entry.remove().join() {
226✔
502
                            panic_any(e);
9✔
503
                        }
217✔
504
                    }
1,585✔
505
                }
28✔
506
            }
×
507

508
            if self.join_handles.is_empty() {
431✔
509
                return Ok(());
49✔
510
            }
382✔
511

382✔
512
            thread::sleep(Duration::from_millis(250));
382✔
513
        }
514
    }
49✔
515
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc