• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

getdozer / dozer / 4112409903

pending completion
4112409903

Pull #818

github

GitHub
Merge 0e6d61bff into c160ec41f
Pull Request #818: chore: fix dag issues

212 of 212 new or added lines in 23 files covered. (100.0%)

23352 of 37718 relevant lines covered (61.91%)

31647.45 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

62.08
/dozer-core/src/dag/tests/dag_base_create_errors.rs
1
use crate::chk;
2

3
use crate::dag::errors::ExecutionError;
4
use crate::dag::executor::{DagExecutor, ExecutorOptions};
5
use crate::dag::node::{
6
    NodeHandle, OutputPortDef, OutputPortType, PortHandle, Processor, ProcessorFactory, Source,
7
    SourceFactory,
8
};
9
use crate::dag::{Dag, Endpoint, DEFAULT_PORT_HANDLE};
10

11
use crate::dag::tests::dag_base_run::NoopProcessorFactory;
12
use crate::dag::tests::sinks::{CountingSinkFactory, COUNTING_SINK_INPUT_PORT};
13
use crate::dag::tests::sources::{GeneratorSourceFactory, GENERATOR_SOURCE_OUTPUT_PORT};
14

15
use dozer_types::types::{FieldDefinition, FieldType, Schema, SourceDefinition};
16

17
use std::collections::HashMap;
18
use std::sync::atomic::AtomicBool;
19
use std::sync::Arc;
20

21
use crate::dag::tests::app::NoneContext;
22
use tempdir::TempDir;
23

24
#[derive(Debug)]
×
25
struct CreateErrSourceFactory {
26
    panic: bool,
27
}
28

29
impl CreateErrSourceFactory {
30
    pub fn new(panic: bool) -> Self {
2✔
31
        Self { panic }
2✔
32
    }
2✔
33
}
34

35
impl SourceFactory<NoneContext> for CreateErrSourceFactory {
36
    fn get_output_schema(
2✔
37
        &self,
2✔
38
        _port: &PortHandle,
2✔
39
    ) -> Result<(Schema, NoneContext), ExecutionError> {
2✔
40
        Ok((
2✔
41
            Schema::empty()
2✔
42
                .field(
2✔
43
                    FieldDefinition::new(
2✔
44
                        "id".to_string(),
2✔
45
                        FieldType::Int,
2✔
46
                        false,
2✔
47
                        SourceDefinition::Dynamic,
2✔
48
                    ),
2✔
49
                    true,
2✔
50
                )
2✔
51
                .clone(),
2✔
52
            NoneContext {},
2✔
53
        ))
2✔
54
    }
2✔
55

56
    fn get_output_ports(&self) -> Result<Vec<OutputPortDef>, ExecutionError> {
6✔
57
        Ok(vec![OutputPortDef::new(
6✔
58
            DEFAULT_PORT_HANDLE,
6✔
59
            OutputPortType::Stateless,
6✔
60
        )])
6✔
61
    }
6✔
62

63
    fn prepare(
×
64
        &self,
×
65
        _output_schemas: HashMap<PortHandle, (Schema, NoneContext)>,
×
66
    ) -> Result<(), ExecutionError> {
×
67
        Ok(())
×
68
    }
×
69

70
    fn build(
2✔
71
        &self,
2✔
72
        _output_schemas: HashMap<PortHandle, Schema>,
2✔
73
    ) -> Result<Box<dyn Source>, ExecutionError> {
2✔
74
        if self.panic {
2✔
75
            panic!("Generated error");
1✔
76
        } else {
77
            Err(ExecutionError::InvalidOperation(
1✔
78
                "Generated Error".to_string(),
1✔
79
            ))
1✔
80
        }
1✔
81
    }
1✔
82
}
83

84
#[test]
1✔
85
#[should_panic]
86
fn test_create_src_err() {
1✔
87
    let count: u64 = 1_000_000;
1✔
88

1✔
89
    let mut dag = Dag::new();
1✔
90
    let latch = Arc::new(AtomicBool::new(true));
1✔
91

1✔
92
    let source_handle = NodeHandle::new(Some(1), 1.to_string());
1✔
93
    let proc_handle = NodeHandle::new(Some(1), 2.to_string());
1✔
94
    let sink_handle = NodeHandle::new(Some(1), 3.to_string());
1✔
95

1✔
96
    dag.add_source(
1✔
97
        source_handle.clone(),
1✔
98
        Arc::new(CreateErrSourceFactory::new(false)),
1✔
99
    );
1✔
100
    dag.add_processor(proc_handle.clone(), Arc::new(NoopProcessorFactory {}));
1✔
101
    dag.add_sink(
1✔
102
        sink_handle.clone(),
1✔
103
        Arc::new(CountingSinkFactory::new(count, latch)),
1✔
104
    );
1✔
105

×
106
    chk!(dag.connect(
×
107
        Endpoint::new(source_handle, DEFAULT_PORT_HANDLE),
×
108
        Endpoint::new(proc_handle.clone(), DEFAULT_PORT_HANDLE),
×
109
    ));
×
110

×
111
    chk!(dag.connect(
×
112
        Endpoint::new(proc_handle, DEFAULT_PORT_HANDLE),
×
113
        Endpoint::new(sink_handle, COUNTING_SINK_INPUT_PORT),
×
114
    ));
×
115

×
116
    let tmp_dir = chk!(TempDir::new("test"));
1✔
117
    let mut executor = chk!(DagExecutor::new(
1✔
118
        dag,
×
119
        tmp_dir.path(),
×
120
        ExecutorOptions::default(),
×
121
        Arc::new(AtomicBool::new(true))
×
122
    ));
×
123

×
124
    chk!(executor.start());
×
125
    assert!(executor.join().is_err());
1✔
126
}
1✔
127

×
128
#[test]
1✔
129
#[should_panic]
×
130
fn test_create_src_panic() {
1✔
131
    let count: u64 = 1_000_000;
1✔
132

1✔
133
    let mut dag = Dag::new();
1✔
134
    let latch = Arc::new(AtomicBool::new(true));
1✔
135

1✔
136
    let source_handle = NodeHandle::new(Some(1), 1.to_string());
1✔
137
    let proc_handle = NodeHandle::new(Some(1), 2.to_string());
1✔
138
    let sink_handle = NodeHandle::new(Some(1), 3.to_string());
1✔
139

1✔
140
    dag.add_source(
1✔
141
        source_handle.clone(),
1✔
142
        Arc::new(CreateErrSourceFactory::new(true)),
1✔
143
    );
1✔
144
    dag.add_processor(proc_handle.clone(), Arc::new(NoopProcessorFactory {}));
1✔
145
    dag.add_sink(
1✔
146
        sink_handle.clone(),
1✔
147
        Arc::new(CountingSinkFactory::new(count, latch)),
1✔
148
    );
1✔
149

×
150
    chk!(dag.connect(
×
151
        Endpoint::new(source_handle, DEFAULT_PORT_HANDLE),
×
152
        Endpoint::new(proc_handle.clone(), DEFAULT_PORT_HANDLE),
×
153
    ));
×
154

×
155
    chk!(dag.connect(
×
156
        Endpoint::new(proc_handle, DEFAULT_PORT_HANDLE),
×
157
        Endpoint::new(sink_handle, COUNTING_SINK_INPUT_PORT),
×
158
    ));
×
159

×
160
    let tmp_dir = chk!(TempDir::new("test"));
1✔
161
    let mut executor = chk!(DagExecutor::new(
1✔
162
        dag,
×
163
        tmp_dir.path(),
×
164
        ExecutorOptions::default(),
×
165
        Arc::new(AtomicBool::new(true))
×
166
    ));
×
167

×
168
    chk!(executor.start());
×
169
    assert!(executor.join().is_err());
1✔
170
}
1✔
171

×
172
#[derive(Debug)]
×
173
struct CreateErrProcessorFactory {
174
    panic: bool,
×
175
}
×
176

×
177
impl CreateErrProcessorFactory {
178
    pub fn new(panic: bool) -> Self {
2✔
179
        Self { panic }
2✔
180
    }
2✔
181
}
182

183
impl ProcessorFactory<NoneContext> for CreateErrProcessorFactory {
184
    fn get_output_schema(
2✔
185
        &self,
2✔
186
        _port: &PortHandle,
2✔
187
        _input_schemas: &HashMap<PortHandle, (Schema, NoneContext)>,
2✔
188
    ) -> Result<(Schema, NoneContext), ExecutionError> {
2✔
189
        Ok((
2✔
190
            Schema::empty()
2✔
191
                .field(
2✔
192
                    FieldDefinition::new(
2✔
193
                        "id".to_string(),
2✔
194
                        FieldType::Int,
2✔
195
                        false,
2✔
196
                        SourceDefinition::Dynamic,
2✔
197
                    ),
2✔
198
                    true,
2✔
199
                )
2✔
200
                .clone(),
2✔
201
            NoneContext {},
2✔
202
        ))
2✔
203
    }
2✔
204

×
205
    fn get_input_ports(&self) -> Vec<PortHandle> {
4✔
206
        vec![DEFAULT_PORT_HANDLE]
4✔
207
    }
4✔
208

×
209
    fn get_output_ports(&self) -> Vec<OutputPortDef> {
4✔
210
        vec![OutputPortDef::new(
4✔
211
            DEFAULT_PORT_HANDLE,
4✔
212
            OutputPortType::Stateless,
4✔
213
        )]
4✔
214
    }
4✔
215

×
216
    fn prepare(
×
217
        &self,
×
218
        _input_schemas: HashMap<PortHandle, (Schema, NoneContext)>,
×
219
        _output_schemas: HashMap<PortHandle, (Schema, NoneContext)>,
×
220
    ) -> Result<(), ExecutionError> {
×
221
        Ok(())
×
222
    }
×
223

×
224
    fn build(
2✔
225
        &self,
2✔
226
        _input_schemas: HashMap<PortHandle, Schema>,
2✔
227
        _output_schemas: HashMap<PortHandle, Schema>,
2✔
228
    ) -> Result<Box<dyn Processor>, ExecutionError> {
2✔
229
        if self.panic {
2✔
230
            panic!("Generated error");
1✔
231
        } else {
×
232
            Err(ExecutionError::InvalidOperation(
1✔
233
                "Generated Error".to_string(),
1✔
234
            ))
1✔
235
        }
1✔
236
    }
1✔
237
}
238

×
239
#[test]
1✔
240
#[should_panic]
×
241
fn test_create_proc_err() {
1✔
242
    let count: u64 = 1_000_000;
1✔
243

1✔
244
    let mut dag = Dag::new();
1✔
245
    let latch = Arc::new(AtomicBool::new(true));
1✔
246

1✔
247
    let source_handle = NodeHandle::new(Some(1), 1.to_string());
1✔
248
    let proc_handle = NodeHandle::new(Some(1), 2.to_string());
1✔
249
    let sink_handle = NodeHandle::new(Some(1), 3.to_string());
1✔
250

1✔
251
    dag.add_source(
1✔
252
        source_handle.clone(),
1✔
253
        Arc::new(GeneratorSourceFactory::new(count, latch.clone(), false)),
1✔
254
    );
1✔
255
    dag.add_processor(
1✔
256
        proc_handle.clone(),
1✔
257
        Arc::new(CreateErrProcessorFactory::new(false)),
1✔
258
    );
1✔
259
    dag.add_sink(
1✔
260
        sink_handle.clone(),
1✔
261
        Arc::new(CountingSinkFactory::new(count, latch)),
1✔
262
    );
1✔
263

×
264
    chk!(dag.connect(
×
265
        Endpoint::new(source_handle, GENERATOR_SOURCE_OUTPUT_PORT),
×
266
        Endpoint::new(proc_handle.clone(), DEFAULT_PORT_HANDLE),
×
267
    ));
×
268

×
269
    chk!(dag.connect(
×
270
        Endpoint::new(proc_handle, DEFAULT_PORT_HANDLE),
×
271
        Endpoint::new(sink_handle, COUNTING_SINK_INPUT_PORT),
×
272
    ));
×
273

274
    let tmp_dir = chk!(TempDir::new("test"));
1✔
275
    let mut executor = chk!(DagExecutor::new(
1✔
276
        dag,
×
277
        tmp_dir.path(),
×
278
        ExecutorOptions::default(),
×
279
        Arc::new(AtomicBool::new(true))
×
280
    ));
×
281

×
282
    chk!(executor.start());
×
283
    assert!(executor.join().is_err());
1✔
284
}
1✔
285

×
286
#[test]
1✔
287
#[should_panic]
×
288
fn test_create_proc_panic() {
1✔
289
    let count: u64 = 1_000_000;
1✔
290

1✔
291
    let mut dag = Dag::new();
1✔
292
    let latch = Arc::new(AtomicBool::new(true));
1✔
293

1✔
294
    let source_handle = NodeHandle::new(Some(1), 1.to_string());
1✔
295
    let proc_handle = NodeHandle::new(Some(1), 2.to_string());
1✔
296
    let sink_handle = NodeHandle::new(Some(1), 3.to_string());
1✔
297

1✔
298
    dag.add_source(
1✔
299
        source_handle.clone(),
1✔
300
        Arc::new(GeneratorSourceFactory::new(count, latch.clone(), false)),
1✔
301
    );
1✔
302
    dag.add_processor(
1✔
303
        proc_handle.clone(),
1✔
304
        Arc::new(CreateErrProcessorFactory::new(true)),
1✔
305
    );
1✔
306
    dag.add_sink(
1✔
307
        sink_handle.clone(),
1✔
308
        Arc::new(CountingSinkFactory::new(count, latch)),
1✔
309
    );
1✔
310

×
311
    chk!(dag.connect(
×
312
        Endpoint::new(source_handle, GENERATOR_SOURCE_OUTPUT_PORT),
×
313
        Endpoint::new(proc_handle.clone(), DEFAULT_PORT_HANDLE),
×
314
    ));
×
315

×
316
    chk!(dag.connect(
×
317
        Endpoint::new(proc_handle, DEFAULT_PORT_HANDLE),
×
318
        Endpoint::new(sink_handle, COUNTING_SINK_INPUT_PORT),
×
319
    ));
×
320

×
321
    let tmp_dir = chk!(TempDir::new("test"));
1✔
322
    let mut executor = chk!(DagExecutor::new(
1✔
323
        dag,
×
324
        tmp_dir.path(),
×
325
        ExecutorOptions::default(),
×
326
        Arc::new(AtomicBool::new(true))
×
327
    ));
×
328

×
329
    chk!(executor.start());
×
330
    assert!(executor.join().is_err());
1✔
331
}
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc