• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

getdozer / dozer / 5863879382

pending completion
5863879382

Pull #1854

github

chubei
feat: Write record store checkpoints
Pull Request #1854: feat: Write record store checkpoints

598 of 598 new or added lines in 29 files covered. (100.0%)

46228 of 59930 relevant lines covered (77.14%)

41576.42 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

81.88
/dozer-core/src/tests/dag_base_create_errors.rs
1
use crate::executor::{DagExecutor, ExecutorOptions};
2
use crate::node::{
3
    OutputPortDef, OutputPortType, PortHandle, Processor, ProcessorFactory, Source, SourceFactory,
4
};
5
use crate::processor_record::ProcessorRecordStore;
6
use crate::{Dag, Endpoint, DEFAULT_PORT_HANDLE};
7

8
use crate::tests::dag_base_run::NoopProcessorFactory;
9
use crate::tests::sinks::{CountingSinkFactory, COUNTING_SINK_INPUT_PORT};
10
use crate::tests::sources::{GeneratorSourceFactory, GENERATOR_SOURCE_OUTPUT_PORT};
11

12
use dozer_log::tokio;
13
use dozer_types::errors::internal::BoxedError;
14
use dozer_types::node::NodeHandle;
15
use dozer_types::types::{FieldDefinition, FieldType, Schema, SourceDefinition};
16
use tempdir::TempDir;
17

18
use std::collections::HashMap;
19
use std::sync::atomic::AtomicBool;
20
use std::sync::Arc;
21

22
use crate::tests::app::NoneContext;
×
23

24
/// Test-only source factory whose `build` never succeeds: depending on `panic`
/// it either panics or returns an error.
#[derive(Debug)]
×
25
struct CreateErrSourceFactory {
26
    /// When `true`, `build` panics; when `false`, `build` returns an `Err`.
    panic: bool,
27
}
28

×
29
impl CreateErrSourceFactory {
×
30
    /// Creates a factory; `panic` selects whether `build` panics (`true`) or
    /// returns an error (`false`).
    pub fn new(panic: bool) -> Self {
2✔
31
        Self { panic }
2✔
32
    }
2✔
33
}
34

×
35
// `SourceFactory` implementation that describes one output port and a
// one-field schema, but always fails when asked to build the actual source.
impl SourceFactory<NoneContext> for CreateErrSourceFactory {
×
36
    /// Returns a schema with a single `id` field of type `FieldType::Int`,
    /// paired with an empty `NoneContext`. The requested port is ignored.
    fn get_output_schema(&self, _port: &PortHandle) -> Result<(Schema, NoneContext), BoxedError> {
2✔
37
        Ok((
2✔
38
            Schema::default()
2✔
39
                .field(
2✔
40
                    FieldDefinition::new(
2✔
41
                        "id".to_string(),
2✔
42
                        FieldType::Int,
2✔
43
                        // NOTE(review): presumably the "nullable" flag — confirm against `FieldDefinition::new`.
                        false,
2✔
44
                        SourceDefinition::Dynamic,
2✔
45
                    ),
2✔
46
                    // NOTE(review): presumably marks the field as part of the primary key — confirm against `Schema::field`.
                    true,
2✔
47
                )
2✔
48
                .clone(),
2✔
49
            NoneContext {},
2✔
50
        ))
2✔
51
    }
2✔
52

×
53
    /// Returns the fixed port name `"error"` for every port.
    fn get_output_port_name(&self, _port: &PortHandle) -> String {
×
54
        "error".to_string()
×
55
    }
×
56

×
57
    /// Declares a single stateless output port on `DEFAULT_PORT_HANDLE`.
    fn get_output_ports(&self) -> Vec<OutputPortDef> {
6✔
58
        vec![OutputPortDef::new(
6✔
59
            DEFAULT_PORT_HANDLE,
6✔
60
            OutputPortType::Stateless,
6✔
61
        )]
6✔
62
    }
6✔
63

×
64
    /// Never produces a source: panics with "Generated error" when
    /// `self.panic` is set, otherwise returns `Err("Generated Error")`.
    fn build(
2✔
65
        &self,
2✔
66
        _output_schemas: HashMap<PortHandle, Schema>,
2✔
67
    ) -> Result<Box<dyn Source>, BoxedError> {
2✔
68
        if self.panic {
2✔
69
            panic!("Generated error");
1✔
70
        } else {
×
71
            Err("Generated Error".to_string().into())
1✔
72
        }
1✔
73
    }
1✔
74
}
×
75

76
/// Builds a source -> processor -> sink DAG whose source factory returns an
/// error from `build`, then creates, starts and joins the executor.
///
/// The test passes only if the body panics (`#[should_panic]`); presumably the
/// build error surfaces through one of the `unwrap` calls on the executor
/// chain — confirm against `DagExecutor`.
#[tokio::test]
1✔
77
#[should_panic]
×
78
async fn test_create_src_err() {
1✔
79
    let count: u64 = 1_000_000;
1✔
80

1✔
81
    let mut dag = Dag::new();
1✔
82
    let latch = Arc::new(AtomicBool::new(true));
1✔
83

1✔
84
    let source_handle = NodeHandle::new(Some(1), 1.to_string());
1✔
85
    let proc_handle = NodeHandle::new(Some(1), 2.to_string());
1✔
86
    let sink_handle = NodeHandle::new(Some(1), 3.to_string());
1✔
87

1✔
88
    // Source under test: `new(false)` makes `build` return `Err` rather than panic.
    dag.add_source(
1✔
89
        source_handle.clone(),
1✔
90
        Box::new(CreateErrSourceFactory::new(false)),
1✔
91
    );
1✔
92
    dag.add_processor(proc_handle.clone(), Box::new(NoopProcessorFactory {}));
1✔
93
    dag.add_sink(
1✔
94
        sink_handle.clone(),
1✔
95
        Box::new(CountingSinkFactory::new(count, latch)),
1✔
96
    );
1✔
97

1✔
98
    // Wire source -> processor.
    dag.connect(
1✔
99
        Endpoint::new(source_handle, DEFAULT_PORT_HANDLE),
1✔
100
        Endpoint::new(proc_handle.clone(), DEFAULT_PORT_HANDLE),
1✔
101
    )
1✔
102
    .unwrap();
1✔
103

1✔
104
    // Wire processor -> sink.
    dag.connect(
1✔
105
        Endpoint::new(proc_handle, DEFAULT_PORT_HANDLE),
1✔
106
        Endpoint::new(sink_handle, COUNTING_SINK_INPUT_PORT),
1✔
107
    )
1✔
108
    .unwrap();
1✔
109

1✔
110
    // The executor is given a fresh temporary directory path.
    let temp_dir = TempDir::new("test_create_src_err").unwrap();
1✔
111
    DagExecutor::new(
1✔
112
        dag,
1✔
113
        temp_dir.path().to_str().unwrap().to_string(),
1✔
114
        ExecutorOptions::default(),
1✔
115
    )
1✔
116
    .await
1✔
117
    .unwrap()
1✔
118
    .start(Arc::new(AtomicBool::new(true)))
1✔
119
    .unwrap()
1✔
120
    .join()
1✔
121
    .unwrap();
1✔
122
}
×
123

×
124
/// Builds a source -> processor -> sink DAG whose source factory panics in
/// `build`, then creates, starts and joins the executor.
///
/// The test passes only if the body panics (`#[should_panic]`).
#[tokio::test]
1✔
125
#[should_panic]
×
126
async fn test_create_src_panic() {
1✔
127
    let count: u64 = 1_000_000;
1✔
128

1✔
129
    let mut dag = Dag::new();
1✔
130
    let latch = Arc::new(AtomicBool::new(true));
1✔
131

1✔
132
    let source_handle = NodeHandle::new(Some(1), 1.to_string());
1✔
133
    let proc_handle = NodeHandle::new(Some(1), 2.to_string());
1✔
134
    let sink_handle = NodeHandle::new(Some(1), 3.to_string());
1✔
135

1✔
136
    // Source under test: `new(true)` makes `build` panic.
    dag.add_source(
1✔
137
        source_handle.clone(),
1✔
138
        Box::new(CreateErrSourceFactory::new(true)),
1✔
139
    );
1✔
140
    dag.add_processor(proc_handle.clone(), Box::new(NoopProcessorFactory {}));
1✔
141
    dag.add_sink(
1✔
142
        sink_handle.clone(),
1✔
143
        Box::new(CountingSinkFactory::new(count, latch)),
1✔
144
    );
1✔
145

1✔
146
    // Wire source -> processor.
    dag.connect(
1✔
147
        Endpoint::new(source_handle, DEFAULT_PORT_HANDLE),
1✔
148
        Endpoint::new(proc_handle.clone(), DEFAULT_PORT_HANDLE),
1✔
149
    )
1✔
150
    .unwrap();
1✔
151

1✔
152
    // Wire processor -> sink.
    dag.connect(
1✔
153
        Endpoint::new(proc_handle, DEFAULT_PORT_HANDLE),
1✔
154
        Endpoint::new(sink_handle, COUNTING_SINK_INPUT_PORT),
1✔
155
    )
1✔
156
    .unwrap();
1✔
157

1✔
158
    // The executor is given a fresh temporary directory path.
    let temp_dir = TempDir::new("test_create_src_panic").unwrap();
1✔
159
    DagExecutor::new(
1✔
160
        dag,
1✔
161
        temp_dir.path().to_str().unwrap().to_string(),
1✔
162
        ExecutorOptions::default(),
1✔
163
    )
1✔
164
    .await
1✔
165
    .unwrap()
×
166
    .start(Arc::new(AtomicBool::new(true)))
×
167
    .unwrap()
×
168
    .join()
×
169
    .unwrap();
×
170
}
×
171

×
172
/// Test-only processor factory whose `build` never succeeds: depending on
/// `panic` it either panics or returns an error.
#[derive(Debug)]
×
173
struct CreateErrProcessorFactory {
174
    /// When `true`, `build` panics; when `false`, `build` returns an `Err`.
    panic: bool,
×
175
}
×
176

×
177
impl CreateErrProcessorFactory {
×
178
    /// Creates a factory; `panic` selects whether `build` panics (`true`) or
    /// returns an error (`false`).
    pub fn new(panic: bool) -> Self {
2✔
179
        Self { panic }
2✔
180
    }
2✔
181
}
×
182

×
183
// `ProcessorFactory` implementation that describes one input port, one output
// port and a one-field schema, but always fails when asked to build the
// actual processor.
impl ProcessorFactory<NoneContext> for CreateErrProcessorFactory {
×
184
    /// Returns the fixed type name `"CreateErr"`.
    fn type_name(&self) -> String {
×
185
        "CreateErr".to_owned()
×
186
    }
×
187

×
188
    /// Returns a schema with a single `id` field of type `FieldType::Int`,
    /// paired with an empty `NoneContext`. Both the port and the input
    /// schemas are ignored.
    fn get_output_schema(
2✔
189
        &self,
2✔
190
        _port: &PortHandle,
2✔
191
        _input_schemas: &HashMap<PortHandle, (Schema, NoneContext)>,
2✔
192
    ) -> Result<(Schema, NoneContext), BoxedError> {
2✔
193
        Ok((
2✔
194
            Schema::default()
2✔
195
                .field(
2✔
196
                    FieldDefinition::new(
2✔
197
                        "id".to_string(),
2✔
198
                        FieldType::Int,
2✔
199
                        // NOTE(review): presumably the "nullable" flag — confirm against `FieldDefinition::new`.
                        false,
2✔
200
                        SourceDefinition::Dynamic,
2✔
201
                    ),
2✔
202
                    // NOTE(review): presumably marks the field as part of the primary key — confirm against `Schema::field`.
                    true,
2✔
203
                )
2✔
204
                .clone(),
2✔
205
            NoneContext {},
2✔
206
        ))
2✔
207
    }
2✔
208

×
209
    /// Declares a single input port on `DEFAULT_PORT_HANDLE`.
    fn get_input_ports(&self) -> Vec<PortHandle> {
8✔
210
        vec![DEFAULT_PORT_HANDLE]
8✔
211
    }
8✔
212

×
213
    /// Declares a single stateless output port on `DEFAULT_PORT_HANDLE`.
    fn get_output_ports(&self) -> Vec<OutputPortDef> {
6✔
214
        vec![OutputPortDef::new(
6✔
215
            DEFAULT_PORT_HANDLE,
6✔
216
            OutputPortType::Stateless,
6✔
217
        )]
6✔
218
    }
6✔
219

×
220
    /// Never produces a processor: panics with "Generated error" when
    /// `self.panic` is set, otherwise returns `Err("Generated Error")`.
    /// All arguments are ignored.
    fn build(
2✔
221
        &self,
2✔
222
        _input_schemas: HashMap<PortHandle, Schema>,
2✔
223
        _output_schemas: HashMap<PortHandle, Schema>,
2✔
224
        _record_store: &ProcessorRecordStore,
2✔
225
    ) -> Result<Box<dyn Processor>, BoxedError> {
2✔
226
        if self.panic {
2✔
227
            panic!("Generated error");
1✔
228
        } else {
×
229
            Err("Generated Error".to_string().into())
1✔
230
        }
1✔
231
    }
1✔
232

×
233
    /// Returns the fixed identifier `"CreateErr"`.
    fn id(&self) -> String {
×
234
        "CreateErr".to_owned()
×
235
    }
×
236
}
×
237

×
238
/// Builds a generator source -> processor -> sink DAG whose processor factory
/// returns an error from `build`, then creates, starts and joins the executor.
///
/// The test passes only if the body panics (`#[should_panic]`); presumably the
/// build error surfaces through one of the `unwrap` calls on the executor
/// chain — confirm against `DagExecutor`.
#[tokio::test]
1✔
239
#[should_panic]
×
240
async fn test_create_proc_err() {
1✔
241
    let count: u64 = 1_000_000;
1✔
242

1✔
243
    let mut dag = Dag::new();
1✔
244
    let latch = Arc::new(AtomicBool::new(true));
1✔
245

1✔
246
    let source_handle = NodeHandle::new(Some(1), 1.to_string());
1✔
247
    let proc_handle = NodeHandle::new(Some(1), 2.to_string());
1✔
248
    let sink_handle = NodeHandle::new(Some(1), 3.to_string());
1✔
249

1✔
250
    // The source and the sink share the same `latch` and `count`.
    dag.add_source(
1✔
251
        source_handle.clone(),
1✔
252
        Box::new(GeneratorSourceFactory::new(count, latch.clone(), false)),
1✔
253
    );
1✔
254
    // Processor under test: `new(false)` makes `build` return `Err` rather than panic.
    dag.add_processor(
1✔
255
        proc_handle.clone(),
1✔
256
        Box::new(CreateErrProcessorFactory::new(false)),
1✔
257
    );
1✔
258
    dag.add_sink(
1✔
259
        sink_handle.clone(),
1✔
260
        Box::new(CountingSinkFactory::new(count, latch)),
1✔
261
    );
1✔
262

1✔
263
    // Wire source -> processor.
    dag.connect(
1✔
264
        Endpoint::new(source_handle, GENERATOR_SOURCE_OUTPUT_PORT),
1✔
265
        Endpoint::new(proc_handle.clone(), DEFAULT_PORT_HANDLE),
1✔
266
    )
1✔
267
    .unwrap();
1✔
268

1✔
269
    // Wire processor -> sink.
    dag.connect(
1✔
270
        Endpoint::new(proc_handle, DEFAULT_PORT_HANDLE),
1✔
271
        Endpoint::new(sink_handle, COUNTING_SINK_INPUT_PORT),
1✔
272
    )
1✔
273
    .unwrap();
1✔
274

1✔
275
    // The executor is given a fresh temporary directory path.
    let temp_dir = TempDir::new("test_create_proc_err").unwrap();
1✔
276
    DagExecutor::new(
1✔
277
        dag,
1✔
278
        temp_dir.path().to_str().unwrap().to_string(),
1✔
279
        ExecutorOptions::default(),
1✔
280
    )
1✔
281
    .await
1✔
282
    .unwrap()
1✔
283
    .start(Arc::new(AtomicBool::new(true)))
1✔
284
    .unwrap()
1✔
285
    .join()
1✔
286
    .unwrap();
1✔
287
}
×
288

×
289
/// Builds a generator source -> processor -> sink DAG whose processor factory
/// panics in `build`, then creates, starts and joins the executor.
///
/// The test passes only if the body panics (`#[should_panic]`).
#[tokio::test]
1✔
290
#[should_panic]
×
291
async fn test_create_proc_panic() {
1✔
292
    let count: u64 = 1_000_000;
1✔
293

1✔
294
    let mut dag = Dag::new();
1✔
295
    let latch = Arc::new(AtomicBool::new(true));
1✔
296

1✔
297
    let source_handle = NodeHandle::new(Some(1), 1.to_string());
1✔
298
    let proc_handle = NodeHandle::new(Some(1), 2.to_string());
1✔
299
    let sink_handle = NodeHandle::new(Some(1), 3.to_string());
1✔
300

1✔
301
    // The source and the sink share the same `latch` and `count`.
    dag.add_source(
1✔
302
        source_handle.clone(),
1✔
303
        Box::new(GeneratorSourceFactory::new(count, latch.clone(), false)),
1✔
304
    );
1✔
305
    // Processor under test: `new(true)` makes `build` panic.
    dag.add_processor(
1✔
306
        proc_handle.clone(),
1✔
307
        Box::new(CreateErrProcessorFactory::new(true)),
1✔
308
    );
1✔
309
    dag.add_sink(
1✔
310
        sink_handle.clone(),
1✔
311
        Box::new(CountingSinkFactory::new(count, latch)),
1✔
312
    );
1✔
313

1✔
314
    // Wire source -> processor.
    dag.connect(
1✔
315
        Endpoint::new(source_handle, GENERATOR_SOURCE_OUTPUT_PORT),
1✔
316
        Endpoint::new(proc_handle.clone(), DEFAULT_PORT_HANDLE),
1✔
317
    )
1✔
318
    .unwrap();
1✔
319

1✔
320
    // Wire processor -> sink.
    dag.connect(
1✔
321
        Endpoint::new(proc_handle, DEFAULT_PORT_HANDLE),
1✔
322
        Endpoint::new(sink_handle, COUNTING_SINK_INPUT_PORT),
1✔
323
    )
1✔
324
    .unwrap();
1✔
325

1✔
326
    // The executor is given a fresh temporary directory path.
    let temp_dir = TempDir::new("test_create_proc_panic").unwrap();
1✔
327
    DagExecutor::new(
1✔
328
        dag,
1✔
329
        temp_dir.path().to_str().unwrap().to_string(),
1✔
330
        ExecutorOptions::default(),
1✔
331
    )
1✔
332
    .await
1✔
333
    .unwrap()
×
334
    .start(Arc::new(AtomicBool::new(true)))
×
335
    .unwrap()
×
336
    .join()
×
337
    .unwrap();
×
338
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc