• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

getdozer / dozer / 4112409903

pending completion
4112409903

Pull #818

github

GitHub
Merge 0e6d61bff into c160ec41f
Pull Request #818: chore: fix dag issues

212 of 212 new or added lines in 23 files covered. (100.0%)

23352 of 37718 relevant lines covered (61.91%)

31647.45 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

54.86
/dozer-core/src/dag/tests/checkpoint.rs
1
use crate::chk;
2
use crate::dag::dag_metadata::{Consistency, DagMetadataManager};
3
use crate::dag::epoch::OpIdentifier;
4
use crate::dag::executor::{DagExecutor, ExecutorOptions};
5
use crate::dag::node::NodeHandle;
6
use crate::dag::tests::dag_base_run::NoopJoinProcessorFactory;
7
use crate::dag::tests::sinks::{CountingSinkFactory, COUNTING_SINK_INPUT_PORT};
8
use crate::dag::tests::sources::{GeneratorSourceFactory, GENERATOR_SOURCE_OUTPUT_PORT};
9
use crate::dag::{Dag, Endpoint, DEFAULT_PORT_HANDLE};
10
use crate::storage::lmdb_storage::LmdbEnvironmentManager;
11

12
use std::collections::HashMap;
13
use std::sync::atomic::AtomicBool;
14
use std::sync::Arc;
15

16
use tempdir::TempDir;
17

18
/// Checks the checkpoint consistency reported by `DagMetadataManager` after a DAG run.
///
/// The DAG built here has two generator sources (`SRC1`, `SRC2`), one
/// `NoopJoinProcessorFactory` processor (`PROC`) and one counting sink (`SINK`).
/// After the executor has been started and joined, both sources are expected to be
/// `FullyConsistent`. The processor's LMDB environment is then removed and both
/// sources are expected to be `PartiallyConsistent`.
#[test]
1✔
19
fn test_checkpoint_consistency() {
1✔
20
    //  dozer_tracing::init_telemetry(false).unwrap();
1✔
21
    let mut dag = Dag::new();
1✔
22
    let latch = Arc::new(AtomicBool::new(true));
1✔
23

1✔
24
    const SRC1_MSG_COUNT: u64 = 5000;
1✔
25
    const SRC2_MSG_COUNT: u64 = 5000;
1✔
26

1✔
27
    const SRC1_HANDLE_ID: &str = "SRC1";
1✔
28
    const SRC2_HANDLE_ID: &str = "SRC2";
1✔
29
    const PROC_HANDLE_ID: &str = "PROC";
1✔
30
    const SINK_HANDLE_ID: &str = "SINK";
1✔
31

1✔
32
    let source1_handle = NodeHandle::new(Some(1), SRC1_HANDLE_ID.to_string());
1✔
33
    let source2_handle = NodeHandle::new(Some(1), SRC2_HANDLE_ID.to_string());
1✔
34
    let proc_handle = NodeHandle::new(Some(1), PROC_HANDLE_ID.to_string());
1✔
35
    let sink_handle = NodeHandle::new(Some(1), SINK_HANDLE_ID.to_string());
1✔
36

1✔
37
    // NOTE(review): the meaning of the trailing `true` passed to
    // `GeneratorSourceFactory::new` is not visible in this file - confirm against its definition.
    dag.add_source(
1✔
38
        source1_handle.clone(),
1✔
39
        Arc::new(GeneratorSourceFactory::new(
1✔
40
            SRC1_MSG_COUNT,
1✔
41
            latch.clone(),
1✔
42
            true,
1✔
43
        )),
1✔
44
    );
1✔
45
    dag.add_source(
1✔
46
        source2_handle.clone(),
1✔
47
        Arc::new(GeneratorSourceFactory::new(
1✔
48
            SRC2_MSG_COUNT,
1✔
49
            latch.clone(),
1✔
50
            true,
1✔
51
        )),
1✔
52
    );
1✔
53
    dag.add_processor(proc_handle.clone(), Arc::new(NoopJoinProcessorFactory {}));
1✔
54
    // The sink is constructed with the sum of both source message counts.
    dag.add_sink(
1✔
55
        sink_handle.clone(),
1✔
56
        Arc::new(CountingSinkFactory::new(
1✔
57
            SRC1_MSG_COUNT + SRC2_MSG_COUNT,
1✔
58
            latch,
1✔
59
        )),
1✔
60
    );
1✔
61

×
62
    // Wiring: SRC1 -> PROC port 1, SRC2 -> PROC port 2, PROC default port -> SINK.
    chk!(dag.connect(
×
63
        Endpoint::new(source1_handle.clone(), GENERATOR_SOURCE_OUTPUT_PORT),
×
64
        Endpoint::new(proc_handle.clone(), 1),
×
65
    ));
×
66

×
67
    chk!(dag.connect(
×
68
        Endpoint::new(source2_handle.clone(), GENERATOR_SOURCE_OUTPUT_PORT),
×
69
        Endpoint::new(proc_handle.clone(), 2),
×
70
    ));
×
71

×
72
    chk!(dag.connect(
×
73
        Endpoint::new(proc_handle.clone(), DEFAULT_PORT_HANDLE),
×
74
        Endpoint::new(sink_handle.clone(), COUNTING_SINK_INPUT_PORT),
×
75
    ));
×
76

×
77
    // Start the executor against a fresh temporary directory and require `join()` to return Ok.
    let tmp_dir = chk!(TempDir::new("test"));
1✔
78
    let mut executor = chk!(DagExecutor::new(
1✔
79
        dag.clone(),
×
80
        tmp_dir.path(),
×
81
        ExecutorOptions::default(),
×
82
        Arc::new(AtomicBool::new(true))
×
83
    ));
×
84

×
85
    chk!(executor.start());
×
86
    assert!(executor.join().is_ok());
1✔
87

88
    // Phase 1: read the metadata from the same directory; each source must be
    // FullyConsistent at OpIdentifier::new(<its message count>, 0).
    let r = chk!(DagMetadataManager::new(&dag, tmp_dir.path()));
1✔
89
    let c = r.get_checkpoint_consistency().unwrap();
1✔
90

1✔
91
    match c.get(&source1_handle).unwrap() {
1✔
92
        Consistency::PartiallyConsistent(_r) => panic!("Wrong consistency"),
×
93
        Consistency::FullyConsistent(r) => {
1✔
94
            assert_eq!(r.unwrap(), OpIdentifier::new(SRC1_MSG_COUNT, 0))
1✔
95
        }
×
96
    }
×
97

×
98
    match c.get(&source2_handle).unwrap() {
1✔
99
        Consistency::PartiallyConsistent(_r) => panic!("Wrong consistency"),
×
100
        Consistency::FullyConsistent(r) => {
1✔
101
            assert_eq!(r.unwrap(), OpIdentifier::new(SRC2_MSG_COUNT, 0))
1✔
102
        }
×
103
    }
×
104

×
105
    // Phase 2: remove the LMDB environment named after the processor handle, then
    // re-read the metadata. Each source is now expected to be PartiallyConsistent.
    LmdbEnvironmentManager::remove(tmp_dir.path(), format!("{proc_handle}").as_str());
1✔
106
    let r = chk!(DagMetadataManager::new(&dag, tmp_dir.path()));
1✔
107
    let c = r.get_checkpoint_consistency().unwrap();
1✔
108

1✔
109
    // Expected grouping for SRC1: the source and the sink under the source's last
    // OpIdentifier, and the processor under `None`.
    let mut expected = HashMap::new();
1✔
110
    expected.insert(
1✔
111
        Some(OpIdentifier::new(SRC1_MSG_COUNT, 0)),
1✔
112
        vec![source1_handle.clone(), sink_handle.clone()],
1✔
113
    );
1✔
114
    expected.insert(None, vec![proc_handle.clone()]);
1✔
115
    match c.get(&source1_handle).unwrap() {
1✔
116
        Consistency::PartiallyConsistent(r) => assert_eq!(r, &expected),
1✔
117
        Consistency::FullyConsistent(_r) => panic!("Wrong consistency"),
×
118
    }
×
119

×
120
    // Same expectation for SRC2.
    let mut expected = HashMap::new();
1✔
121
    expected.insert(
1✔
122
        Some(OpIdentifier::new(SRC2_MSG_COUNT, 0)),
1✔
123
        vec![source2_handle.clone(), sink_handle],
1✔
124
    );
1✔
125
    expected.insert(None, vec![proc_handle]);
1✔
126
    match c.get(&source2_handle).unwrap() {
1✔
127
        Consistency::PartiallyConsistent(r) => assert_eq!(r, &expected),
1✔
128
        Consistency::FullyConsistent(_r) => panic!("Wrong consistency"),
×
129
    }
×
130
}
1✔
131

×
132
/// Runs the same DAG twice against one storage directory and checks the reported
/// checkpoint positions after each run.
///
/// The DAG has two generator sources (25_000 and 50_000 messages), one
/// `NoopJoinProcessorFactory` processor and one counting sink. After the first run the
/// sources must be `FullyConsistent` at 25_000 and 50_000. An identical DAG is then
/// rebuilt and executed in the same temporary directory, after which the expected
/// positions are 50_000 and 100_000.
#[test]
1✔
133
fn test_checkpoint_consistency_resume() {
1✔
134
    //   dozer_tracing::init_telemetry(false).unwrap();
1✔
135
    let mut dag = Dag::new();
1✔
136
    let latch = Arc::new(AtomicBool::new(true));
1✔
137

1✔
138
    // Node handles are named "1".."4" here; the second DAG below reuses the same names.
    let source1_handle = NodeHandle::new(Some(1), 1.to_string());
1✔
139
    let source2_handle = NodeHandle::new(Some(1), 2.to_string());
1✔
140
    let proc_handle = NodeHandle::new(Some(1), 3.to_string());
1✔
141
    let sink_handle = NodeHandle::new(Some(1), 4.to_string());
1✔
142

1✔
143
    dag.add_source(
1✔
144
        source1_handle.clone(),
1✔
145
        Arc::new(GeneratorSourceFactory::new(25_000, latch.clone(), true)),
1✔
146
    );
1✔
147
    dag.add_source(
1✔
148
        source2_handle.clone(),
1✔
149
        Arc::new(GeneratorSourceFactory::new(50_000, latch.clone(), true)),
1✔
150
    );
1✔
151
    dag.add_processor(proc_handle.clone(), Arc::new(NoopJoinProcessorFactory {}));
1✔
152
    // The sink is constructed with the sum of both source message counts.
    dag.add_sink(
1✔
153
        sink_handle.clone(),
1✔
154
        Arc::new(CountingSinkFactory::new(50_000 + 25_000, latch)),
1✔
155
    );
1✔
156

×
157
    // Wiring: source 1 -> processor port 1, source 2 -> processor port 2,
    // processor default port -> sink.
    chk!(dag.connect(
×
158
        Endpoint::new(source1_handle.clone(), GENERATOR_SOURCE_OUTPUT_PORT),
×
159
        Endpoint::new(proc_handle.clone(), 1),
×
160
    ));
×
161

×
162
    chk!(dag.connect(
×
163
        Endpoint::new(source2_handle.clone(), GENERATOR_SOURCE_OUTPUT_PORT),
×
164
        Endpoint::new(proc_handle.clone(), 2),
×
165
    ));
×
166

×
167
    chk!(dag.connect(
×
168
        Endpoint::new(proc_handle, DEFAULT_PORT_HANDLE),
×
169
        Endpoint::new(sink_handle, COUNTING_SINK_INPUT_PORT),
×
170
    ));
×
171

×
172
    // First run. `tmp_dir` is kept alive and reused by the second executor below.
    let tmp_dir = chk!(TempDir::new("test"));
1✔
173
    let mut executor = chk!(DagExecutor::new(
1✔
174
        dag.clone(),
×
175
        tmp_dir.path(),
×
176
        ExecutorOptions::default(),
×
177
        Arc::new(AtomicBool::new(true))
×
178
    ));
×
179

×
180
    chk!(executor.start());
×
181
    assert!(executor.join().is_ok());
1✔
182

×
183
    // After the first run each source must be FullyConsistent at its own message count.
    let r = chk!(DagMetadataManager::new(&dag, tmp_dir.path()));
1✔
184
    let c = r.get_checkpoint_consistency().unwrap();
1✔
185

1✔
186
    match c.get(&source1_handle).unwrap() {
1✔
187
        Consistency::PartiallyConsistent(_r) => panic!("Wrong consistency"),
×
188
        Consistency::FullyConsistent(r) => assert_eq!(r.unwrap(), OpIdentifier::new(25_000, 0)),
1✔
189
    }
×
190

×
191
    match c.get(&source2_handle).unwrap() {
1✔
192
        Consistency::PartiallyConsistent(_r) => panic!("Wrong consistency"),
×
193
        Consistency::FullyConsistent(r) => assert_eq!(r.unwrap(), OpIdentifier::new(50_000, 0)),
1✔
194
    }
×
195

×
196
    // Second run: rebuild an identical DAG (same handle names, counts and wiring) with a
    // fresh latch, shadowing the bindings from the first run.
    let mut dag = Dag::new();
1✔
197
    let latch = Arc::new(AtomicBool::new(true));
1✔
198

1✔
199
    let source1_handle = NodeHandle::new(Some(1), 1.to_string());
1✔
200
    let source2_handle = NodeHandle::new(Some(1), 2.to_string());
1✔
201
    let proc_handle = NodeHandle::new(Some(1), 3.to_string());
1✔
202
    let sink_handle = NodeHandle::new(Some(1), 4.to_string());
1✔
203

1✔
204
    dag.add_source(
1✔
205
        source1_handle.clone(),
1✔
206
        Arc::new(GeneratorSourceFactory::new(25_000, latch.clone(), true)),
1✔
207
    );
1✔
208
    dag.add_source(
1✔
209
        source2_handle.clone(),
1✔
210
        Arc::new(GeneratorSourceFactory::new(50_000, latch.clone(), true)),
1✔
211
    );
1✔
212
    dag.add_processor(proc_handle.clone(), Arc::new(NoopJoinProcessorFactory {}));
1✔
213
    dag.add_sink(
1✔
214
        sink_handle.clone(),
1✔
215
        Arc::new(CountingSinkFactory::new(50_000 + 25_000, latch)),
1✔
216
    );
1✔
217

×
218
    chk!(dag.connect(
×
219
        Endpoint::new(source1_handle.clone(), GENERATOR_SOURCE_OUTPUT_PORT),
×
220
        Endpoint::new(proc_handle.clone(), 1),
×
221
    ));
×
222

×
223
    chk!(dag.connect(
×
224
        Endpoint::new(source2_handle.clone(), GENERATOR_SOURCE_OUTPUT_PORT),
×
225
        Endpoint::new(proc_handle.clone(), 2),
×
226
    ));
×
227

×
228
    chk!(dag.connect(
×
229
        Endpoint::new(proc_handle, DEFAULT_PORT_HANDLE),
×
230
        Endpoint::new(sink_handle, COUNTING_SINK_INPUT_PORT),
×
231
    ));
×
232

×
233
    // The second executor is pointed at the same `tmp_dir` as the first one.
    let mut executor = chk!(DagExecutor::new(
1✔
234
        dag.clone(),
×
235
        tmp_dir.path(),
×
236
        ExecutorOptions::default(),
×
237
        Arc::new(AtomicBool::new(true))
×
238
    ));
×
239

×
240
    chk!(executor.start());
×
241
    assert!(executor.join().is_ok());
1✔
242

×
243
    // Expected positions are now twice the per-run message counts.
    // NOTE(review): presumably the second run continues from the offsets stored by the
    // first run rather than starting at zero - confirm against the executor/source code.
    let r = chk!(DagMetadataManager::new(&dag, tmp_dir.path()));
1✔
244
    let c = r.get_checkpoint_consistency().unwrap();
1✔
245

1✔
246
    match c.get(&source1_handle).unwrap() {
1✔
247
        Consistency::PartiallyConsistent(_r) => panic!("Wrong consistency"),
×
248
        Consistency::FullyConsistent(r) => assert_eq!(r.unwrap(), OpIdentifier::new(50_000, 0)),
1✔
249
    }
×
250

×
251
    match c.get(&source2_handle).unwrap() {
1✔
252
        Consistency::PartiallyConsistent(_r) => panic!("Wrong consistency"),
×
253
        Consistency::FullyConsistent(r) => assert_eq!(r.unwrap(), OpIdentifier::new(100_000, 0)),
1✔
254
    }
×
255
}
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc