• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

getdozer / dozer / 4112409903

pending completion
4112409903

Pull #818

github

GitHub
Merge 0e6d61bff into c160ec41f
Pull Request #818: chore: fix dag issues

212 of 212 new or added lines in 23 files covered. (100.0%)

23352 of 37718 relevant lines covered (61.91%)

31647.45 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

73.61
/dozer-core/src/dag/tests/checkpoint_ns.rs
1
use crate::chk;
2
use crate::dag::{Dag, Endpoint, DEFAULT_PORT_HANDLE};
3

4
use crate::dag::executor::{DagExecutor, ExecutorOptions};
5
use crate::dag::node::NodeHandle;
6
use crate::dag::tests::dag_base_run::NoopJoinProcessorFactory;
7
use crate::dag::tests::sinks::{CountingSinkFactory, COUNTING_SINK_INPUT_PORT};
8
use crate::dag::tests::sources::{GeneratorSourceFactory, GENERATOR_SOURCE_OUTPUT_PORT};
9
use std::sync::atomic::AtomicBool;
10
use std::sync::Arc;
11
use tempdir::TempDir;
12

13
/// Builds a parent DAG with five generator sources and four merged child DAGs,
/// runs it through `DagExecutor`, and asserts that `join()` returns `Ok`.
///
/// Each child DAG holds a `NoopJoinProcessorFactory` processor ("proc") wired to
/// a `CountingSinkFactory` sink ("sink"). Child `i` is merged into the parent
/// with `Some(i as u16)` (presumably a namespace id — confirm against
/// `Dag::merge`), after which its "proc" node is connected to `sources[i]` on
/// port 1 and to `sources[i + 1]` on port 2.
///
/// NOTE(review): the bare numbers and `N✔` / `×` lines interleaved with the
/// statements look like coverage-report gutter output rather than Rust; they
/// would need to be stripped before this listing can compile.
#[test]
1✔
14
fn test_checkpoint_consistency_ns() {
1✔
15
    // dozer_tracing::init_telemetry(false).unwrap();
1✔
16

1✔
17
    // Count handed to every generator source; each sink is built with twice this value.
    const MESSAGES_COUNT: u64 = 25_000;
1✔
18

1✔
19
    let mut dag = Dag::new();
1✔
20

1✔
21
    // Handles for the five sources; all are created with `None` as the first argument.
    let sources: Vec<NodeHandle> = vec![
1✔
22
        NodeHandle::new(None, "src1".to_string()),
1✔
23
        NodeHandle::new(None, "src2".to_string()),
1✔
24
        NodeHandle::new(None, "src3".to_string()),
1✔
25
        NodeHandle::new(None, "src4".to_string()),
1✔
26
        NodeHandle::new(None, "src5".to_string()),
1✔
27
    ];
1✔
28

1✔
29
    // One flag, cloned into every source and sink factory below.
    // NOTE(review): how the factories use it is not visible here — confirm.
    let latch = Arc::new(AtomicBool::new(true));
1✔
30

31
    for src_handle in &sources {
6✔
32
        dag.add_source(
5✔
33
            src_handle.clone(),
5✔
34
            Arc::new(GeneratorSourceFactory::new(
5✔
35
                MESSAGES_COUNT,
5✔
36
                latch.clone(),
5✔
37
                // NOTE(review): meaning of this flag is not visible here — confirm
                // against `GeneratorSourceFactory::new`.
                true,
5✔
38
            )),
5✔
39
        );
5✔
40
    }
5✔
41

42
    // Create sources.len()-1 sub dags, one per adjacent pair of sources
43
    for i in 0..sources.len() - 1 {
4✔
44
        let mut child_dag = Dag::new();
4✔
45

4✔
46
        let proc_handle = NodeHandle::new(None, "proc".to_string());
4✔
47
        let sink_handle = NodeHandle::new(None, "sink".to_string());
4✔
48

4✔
49
        child_dag.add_processor(proc_handle.clone(), Arc::new(NoopJoinProcessorFactory {}));
4✔
50
        child_dag.add_sink(
4✔
51
            sink_handle.clone(),
4✔
52
            // Presumably 2x because each processor is fed by two sources — TODO confirm.
            Arc::new(CountingSinkFactory::new(MESSAGES_COUNT * 2, latch.clone())),
4✔
53
        );
4✔
54
        // Wire "proc" (DEFAULT_PORT_HANDLE) to "sink" (COUNTING_SINK_INPUT_PORT) inside the child.
        chk!(child_dag.connect(
4✔
55
            Endpoint::new(proc_handle, DEFAULT_PORT_HANDLE),
×
56
            Endpoint::new(sink_handle, COUNTING_SINK_INPUT_PORT),
×
57
        ));
4✔
58

4✔
59
        // Merge the DAG with the parent dag
4✔
60
        dag.merge(Some(i as u16), child_dag);
4✔
61

4✔
62
        // After the merge the processor is addressed as (Some(i), "proc"):
        // sources[i] goes to its port 1.
        chk!(dag.connect(
4✔
63
            Endpoint::new(sources[i].clone(), GENERATOR_SOURCE_OUTPUT_PORT),
×
64
            Endpoint::new(NodeHandle::new(Some(i as u16), "proc".to_string()), 1),
×
65
        ));
4✔
66
        // sources[i + 1] goes to port 2, so each interior source is connected to
        // two adjacent child DAGs.
        chk!(dag.connect(
4✔
67
            Endpoint::new(sources[i + 1].clone(), GENERATOR_SOURCE_OUTPUT_PORT),
×
68
            Endpoint::new(NodeHandle::new(Some(i as u16), "proc".to_string()), 2),
×
69
        ));
4✔
70
    }
4✔
71

×
72
    // Temporary directory whose path is handed to the executor.
    let tmp_dir = chk!(TempDir::new("test"));
1✔
73
    let mut executor = chk!(DagExecutor::new(
1✔
74
        dag,
×
75
        tmp_dir.path(),
×
76
        ExecutorOptions::default(),
×
77
        // NOTE(review): a fresh flag, distinct from `latch`; its role inside
        // `DagExecutor` is not visible here — confirm.
        Arc::new(AtomicBool::new(true))
×
78
    ));
×
79

×
80
    chk!(executor.start());
×
81
    // The only explicit assertion in this test: joining the executor must succeed.
    assert!(executor.join().is_ok());
1✔
82
}
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc