• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

getdozer / dozer / 4315007357

pending completion
4315007357

push

github

GitHub
fix: Sink should only be built after all source checkpoints are checked (#1112)

280 of 280 new or added lines in 24 files covered. (100.0%)

28292 of 38914 relevant lines covered (72.7%)

64132.7 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

91.67
/dozer-core/src/tests/dag_base_create_errors.rs
1
use crate::chk;
2

3
use crate::errors::ExecutionError;
4
use crate::executor::{DagExecutor, ExecutorOptions};
5
use crate::node::{
6
    OutputPortDef, OutputPortType, PortHandle, Processor, ProcessorFactory, Source, SourceFactory,
7
};
8
use crate::{Dag, Endpoint, DEFAULT_PORT_HANDLE};
9

10
use crate::tests::dag_base_run::NoopProcessorFactory;
11
use crate::tests::sinks::{CountingSinkFactory, COUNTING_SINK_INPUT_PORT};
12
use crate::tests::sources::{GeneratorSourceFactory, GENERATOR_SOURCE_OUTPUT_PORT};
13

14
use dozer_storage::lmdb_storage::LmdbExclusiveTransaction;
15
use dozer_types::node::NodeHandle;
16
use dozer_types::types::{FieldDefinition, FieldType, Schema, SourceDefinition};
17

18
use std::collections::HashMap;
19
use std::sync::atomic::AtomicBool;
20
use std::sync::Arc;
21

22
use crate::tests::app::NoneContext;
23
use tempdir::TempDir;
24

25
#[derive(Debug)]
×
26
struct CreateErrSourceFactory {
27
    panic: bool,
28
}
29

30
impl CreateErrSourceFactory {
31
    pub fn new(panic: bool) -> Self {
2✔
32
        Self { panic }
2✔
33
    }
2✔
34
}
35

36
impl SourceFactory<NoneContext> for CreateErrSourceFactory {
37
    fn get_output_schema(
2✔
38
        &self,
2✔
39
        _port: &PortHandle,
2✔
40
    ) -> Result<(Schema, NoneContext), ExecutionError> {
2✔
41
        Ok((
2✔
42
            Schema::empty()
2✔
43
                .field(
2✔
44
                    FieldDefinition::new(
2✔
45
                        "id".to_string(),
2✔
46
                        FieldType::Int,
2✔
47
                        false,
2✔
48
                        SourceDefinition::Dynamic,
2✔
49
                    ),
2✔
50
                    true,
2✔
51
                )
2✔
52
                .clone(),
2✔
53
            NoneContext {},
2✔
54
        ))
2✔
55
    }
2✔
56

57
    fn get_output_ports(&self) -> Result<Vec<OutputPortDef>, ExecutionError> {
6✔
58
        Ok(vec![OutputPortDef::new(
6✔
59
            DEFAULT_PORT_HANDLE,
6✔
60
            OutputPortType::Stateless,
6✔
61
        )])
6✔
62
    }
6✔
63

64
    fn build(
2✔
65
        &self,
2✔
66
        _output_schemas: HashMap<PortHandle, Schema>,
2✔
67
    ) -> Result<Box<dyn Source>, ExecutionError> {
2✔
68
        if self.panic {
2✔
69
            panic!("Generated error");
1✔
70
        } else {
71
            Err(ExecutionError::InvalidOperation(
1✔
72
                "Generated Error".to_string(),
1✔
73
            ))
1✔
74
        }
1✔
75
    }
1✔
76
}
77

78
#[test]
1✔
79
#[should_panic]
80
fn test_create_src_err() {
1✔
81
    let count: u64 = 1_000_000;
1✔
82

1✔
83
    let mut dag = Dag::new();
1✔
84
    let latch = Arc::new(AtomicBool::new(true));
1✔
85

1✔
86
    let source_handle = NodeHandle::new(Some(1), 1.to_string());
1✔
87
    let proc_handle = NodeHandle::new(Some(1), 2.to_string());
1✔
88
    let sink_handle = NodeHandle::new(Some(1), 3.to_string());
1✔
89

1✔
90
    dag.add_source(
1✔
91
        source_handle.clone(),
1✔
92
        Arc::new(CreateErrSourceFactory::new(false)),
1✔
93
    );
1✔
94
    dag.add_processor(proc_handle.clone(), Arc::new(NoopProcessorFactory {}));
1✔
95
    dag.add_sink(
1✔
96
        sink_handle.clone(),
1✔
97
        Arc::new(CountingSinkFactory::new(count, latch)),
1✔
98
    );
1✔
99

1✔
100
    chk!(dag.connect(
1✔
101
        Endpoint::new(source_handle, DEFAULT_PORT_HANDLE),
×
102
        Endpoint::new(proc_handle.clone(), DEFAULT_PORT_HANDLE),
×
103
    ));
1✔
104

1✔
105
    chk!(dag.connect(
1✔
106
        Endpoint::new(proc_handle, DEFAULT_PORT_HANDLE),
×
107
        Endpoint::new(sink_handle, COUNTING_SINK_INPUT_PORT),
×
108
    ));
×
109

1✔
110
    let tmp_dir = chk!(TempDir::new("test"));
1✔
111
    DagExecutor::new(
1✔
112
        dag,
1✔
113
        tmp_dir.path().to_path_buf(),
1✔
114
        ExecutorOptions::default(),
1✔
115
    )
1✔
116
    .unwrap()
1✔
117
    .start(Arc::new(AtomicBool::new(true)))
1✔
118
    .unwrap()
1✔
119
    .join()
1✔
120
    .unwrap();
1✔
121
}
1✔
122

123
#[test]
1✔
124
#[should_panic]
125
fn test_create_src_panic() {
1✔
126
    let count: u64 = 1_000_000;
1✔
127

1✔
128
    let mut dag = Dag::new();
1✔
129
    let latch = Arc::new(AtomicBool::new(true));
1✔
130

1✔
131
    let source_handle = NodeHandle::new(Some(1), 1.to_string());
1✔
132
    let proc_handle = NodeHandle::new(Some(1), 2.to_string());
1✔
133
    let sink_handle = NodeHandle::new(Some(1), 3.to_string());
1✔
134

1✔
135
    dag.add_source(
1✔
136
        source_handle.clone(),
1✔
137
        Arc::new(CreateErrSourceFactory::new(true)),
1✔
138
    );
1✔
139
    dag.add_processor(proc_handle.clone(), Arc::new(NoopProcessorFactory {}));
1✔
140
    dag.add_sink(
1✔
141
        sink_handle.clone(),
1✔
142
        Arc::new(CountingSinkFactory::new(count, latch)),
1✔
143
    );
1✔
144

1✔
145
    chk!(dag.connect(
1✔
146
        Endpoint::new(source_handle, DEFAULT_PORT_HANDLE),
×
147
        Endpoint::new(proc_handle.clone(), DEFAULT_PORT_HANDLE),
×
148
    ));
1✔
149

1✔
150
    chk!(dag.connect(
1✔
151
        Endpoint::new(proc_handle, DEFAULT_PORT_HANDLE),
×
152
        Endpoint::new(sink_handle, COUNTING_SINK_INPUT_PORT),
×
153
    ));
×
154

1✔
155
    let tmp_dir = chk!(TempDir::new("test"));
1✔
156
    DagExecutor::new(
1✔
157
        dag,
1✔
158
        tmp_dir.path().to_path_buf(),
1✔
159
        ExecutorOptions::default(),
1✔
160
    )
1✔
161
    .unwrap()
1✔
162
    .start(Arc::new(AtomicBool::new(true)))
1✔
163
    .unwrap()
1✔
164
    .join()
1✔
165
    .unwrap();
1✔
166
}
1✔
167

168
#[derive(Debug)]
×
169
struct CreateErrProcessorFactory {
170
    panic: bool,
171
}
172

173
impl CreateErrProcessorFactory {
174
    pub fn new(panic: bool) -> Self {
2✔
175
        Self { panic }
2✔
176
    }
2✔
177
}
178

179
impl ProcessorFactory<NoneContext> for CreateErrProcessorFactory {
180
    fn get_output_schema(
2✔
181
        &self,
2✔
182
        _port: &PortHandle,
2✔
183
        _input_schemas: &HashMap<PortHandle, (Schema, NoneContext)>,
2✔
184
    ) -> Result<(Schema, NoneContext), ExecutionError> {
2✔
185
        Ok((
2✔
186
            Schema::empty()
2✔
187
                .field(
2✔
188
                    FieldDefinition::new(
2✔
189
                        "id".to_string(),
2✔
190
                        FieldType::Int,
2✔
191
                        false,
2✔
192
                        SourceDefinition::Dynamic,
2✔
193
                    ),
2✔
194
                    true,
2✔
195
                )
2✔
196
                .clone(),
2✔
197
            NoneContext {},
2✔
198
        ))
2✔
199
    }
2✔
200

201
    fn get_input_ports(&self) -> Vec<PortHandle> {
8✔
202
        vec![DEFAULT_PORT_HANDLE]
8✔
203
    }
8✔
204

205
    fn get_output_ports(&self) -> Vec<OutputPortDef> {
6✔
206
        vec![OutputPortDef::new(
6✔
207
            DEFAULT_PORT_HANDLE,
6✔
208
            OutputPortType::Stateless,
6✔
209
        )]
6✔
210
    }
6✔
211

212
    fn build(
2✔
213
        &self,
2✔
214
        _input_schemas: HashMap<PortHandle, Schema>,
2✔
215
        _output_schemas: HashMap<PortHandle, Schema>,
2✔
216
        _txn: &mut LmdbExclusiveTransaction,
2✔
217
    ) -> Result<Box<dyn Processor>, ExecutionError> {
2✔
218
        if self.panic {
2✔
219
            panic!("Generated error");
1✔
220
        } else {
221
            Err(ExecutionError::InvalidOperation(
1✔
222
                "Generated Error".to_string(),
1✔
223
            ))
1✔
224
        }
1✔
225
    }
1✔
226
}
227

228
#[test]
1✔
229
#[should_panic]
230
fn test_create_proc_err() {
1✔
231
    let count: u64 = 1_000_000;
1✔
232

1✔
233
    let mut dag = Dag::new();
1✔
234
    let latch = Arc::new(AtomicBool::new(true));
1✔
235

1✔
236
    let source_handle = NodeHandle::new(Some(1), 1.to_string());
1✔
237
    let proc_handle = NodeHandle::new(Some(1), 2.to_string());
1✔
238
    let sink_handle = NodeHandle::new(Some(1), 3.to_string());
1✔
239

1✔
240
    dag.add_source(
1✔
241
        source_handle.clone(),
1✔
242
        Arc::new(GeneratorSourceFactory::new(count, latch.clone(), false)),
1✔
243
    );
1✔
244
    dag.add_processor(
1✔
245
        proc_handle.clone(),
1✔
246
        Arc::new(CreateErrProcessorFactory::new(false)),
1✔
247
    );
1✔
248
    dag.add_sink(
1✔
249
        sink_handle.clone(),
1✔
250
        Arc::new(CountingSinkFactory::new(count, latch)),
1✔
251
    );
1✔
252

1✔
253
    chk!(dag.connect(
1✔
254
        Endpoint::new(source_handle, GENERATOR_SOURCE_OUTPUT_PORT),
×
255
        Endpoint::new(proc_handle.clone(), DEFAULT_PORT_HANDLE),
×
256
    ));
1✔
257

1✔
258
    chk!(dag.connect(
1✔
259
        Endpoint::new(proc_handle, DEFAULT_PORT_HANDLE),
×
260
        Endpoint::new(sink_handle, COUNTING_SINK_INPUT_PORT),
×
261
    ));
×
262

1✔
263
    let tmp_dir = chk!(TempDir::new("test"));
1✔
264
    DagExecutor::new(
1✔
265
        dag,
1✔
266
        tmp_dir.path().to_path_buf(),
1✔
267
        ExecutorOptions::default(),
1✔
268
    )
1✔
269
    .unwrap()
1✔
270
    .start(Arc::new(AtomicBool::new(true)))
1✔
271
    .unwrap()
1✔
272
    .join()
1✔
273
    .unwrap();
1✔
274
}
1✔
275

276
#[test]
1✔
277
#[should_panic]
278
fn test_create_proc_panic() {
1✔
279
    let count: u64 = 1_000_000;
1✔
280

1✔
281
    let mut dag = Dag::new();
1✔
282
    let latch = Arc::new(AtomicBool::new(true));
1✔
283

1✔
284
    let source_handle = NodeHandle::new(Some(1), 1.to_string());
1✔
285
    let proc_handle = NodeHandle::new(Some(1), 2.to_string());
1✔
286
    let sink_handle = NodeHandle::new(Some(1), 3.to_string());
1✔
287

1✔
288
    dag.add_source(
1✔
289
        source_handle.clone(),
1✔
290
        Arc::new(GeneratorSourceFactory::new(count, latch.clone(), false)),
1✔
291
    );
1✔
292
    dag.add_processor(
1✔
293
        proc_handle.clone(),
1✔
294
        Arc::new(CreateErrProcessorFactory::new(true)),
1✔
295
    );
1✔
296
    dag.add_sink(
1✔
297
        sink_handle.clone(),
1✔
298
        Arc::new(CountingSinkFactory::new(count, latch)),
1✔
299
    );
1✔
300

1✔
301
    chk!(dag.connect(
1✔
302
        Endpoint::new(source_handle, GENERATOR_SOURCE_OUTPUT_PORT),
×
303
        Endpoint::new(proc_handle.clone(), DEFAULT_PORT_HANDLE),
×
304
    ));
1✔
305

1✔
306
    chk!(dag.connect(
1✔
307
        Endpoint::new(proc_handle, DEFAULT_PORT_HANDLE),
×
308
        Endpoint::new(sink_handle, COUNTING_SINK_INPUT_PORT),
×
309
    ));
×
310

1✔
311
    let tmp_dir = chk!(TempDir::new("test"));
1✔
312
    DagExecutor::new(
1✔
313
        dag,
1✔
314
        tmp_dir.path().to_path_buf(),
1✔
315
        ExecutorOptions::default(),
1✔
316
    )
1✔
317
    .unwrap()
1✔
318
    .start(Arc::new(AtomicBool::new(true)))
1✔
319
    .unwrap()
1✔
320
    .join()
1✔
321
    .unwrap();
1✔
322
}
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc