• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

getdozer / dozer / 5939715234

22 Aug 2023 01:47PM UTC coverage: 74.755% (-1.3%) from 76.052%
5939715234

push

github

web-flow
chore: Run e2e tests nightly (#1886)

* chore: Run e2e tests nightly

* chore: Run Dozer CI on default runners

46459 of 62148 relevant lines covered (74.76%)

40132.91 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

85.05
/dozer-core/src/tests/dag_base_create_errors.rs
1
use crate::checkpoint::create_checkpoint_factory_for_test;
2
use crate::executor::{DagExecutor, ExecutorOptions};
3
use crate::node::{
4
    OutputPortDef, OutputPortType, PortHandle, Processor, ProcessorFactory, Source, SourceFactory,
5
};
6
use crate::processor_record::ProcessorRecordStore;
7
use crate::{Dag, Endpoint, DEFAULT_PORT_HANDLE};
8

9
use crate::tests::dag_base_run::NoopProcessorFactory;
10
use crate::tests::sinks::{CountingSinkFactory, COUNTING_SINK_INPUT_PORT};
11
use crate::tests::sources::{GeneratorSourceFactory, GENERATOR_SOURCE_OUTPUT_PORT};
12

13
use dozer_log::tokio;
14
use dozer_types::errors::internal::BoxedError;
15
use dozer_types::node::NodeHandle;
16
use dozer_types::types::{FieldDefinition, FieldType, Schema, SourceDefinition};
17

18
use std::collections::HashMap;
19
use std::sync::atomic::AtomicBool;
20
use std::sync::Arc;
21

22
use crate::tests::app::NoneContext;
×
23

24
#[derive(Debug)]
×
25
struct CreateErrSourceFactory {
26
    panic: bool,
27
}
28

×
29
impl CreateErrSourceFactory {
×
30
    pub fn new(panic: bool) -> Self {
2✔
31
        Self { panic }
2✔
32
    }
2✔
33
}
34

×
35
impl SourceFactory<NoneContext> for CreateErrSourceFactory {
×
36
    fn get_output_schema(&self, _port: &PortHandle) -> Result<(Schema, NoneContext), BoxedError> {
2✔
37
        Ok((
2✔
38
            Schema::default()
2✔
39
                .field(
2✔
40
                    FieldDefinition::new(
2✔
41
                        "id".to_string(),
2✔
42
                        FieldType::Int,
2✔
43
                        false,
2✔
44
                        SourceDefinition::Dynamic,
2✔
45
                    ),
2✔
46
                    true,
2✔
47
                )
2✔
48
                .clone(),
2✔
49
            NoneContext {},
2✔
50
        ))
2✔
51
    }
2✔
52

×
53
    fn get_output_port_name(&self, _port: &PortHandle) -> String {
×
54
        "error".to_string()
×
55
    }
×
56

×
57
    fn get_output_ports(&self) -> Vec<OutputPortDef> {
6✔
58
        vec![OutputPortDef::new(
6✔
59
            DEFAULT_PORT_HANDLE,
6✔
60
            OutputPortType::Stateless,
6✔
61
        )]
6✔
62
    }
6✔
63

×
64
    fn build(
2✔
65
        &self,
2✔
66
        _output_schemas: HashMap<PortHandle, Schema>,
2✔
67
    ) -> Result<Box<dyn Source>, BoxedError> {
2✔
68
        if self.panic {
2✔
69
            panic!("Generated error");
1✔
70
        } else {
×
71
            Err("Generated Error".to_string().into())
1✔
72
        }
1✔
73
    }
1✔
74
}
×
75

76
#[tokio::test]
1✔
77
#[should_panic]
×
78
async fn test_create_src_err() {
1✔
79
    let count: u64 = 1_000_000;
1✔
80

1✔
81
    let mut dag = Dag::new();
1✔
82
    let latch = Arc::new(AtomicBool::new(true));
1✔
83

1✔
84
    let source_handle = NodeHandle::new(Some(1), 1.to_string());
1✔
85
    let proc_handle = NodeHandle::new(Some(1), 2.to_string());
1✔
86
    let sink_handle = NodeHandle::new(Some(1), 3.to_string());
1✔
87

1✔
88
    dag.add_source(
1✔
89
        source_handle.clone(),
1✔
90
        Box::new(CreateErrSourceFactory::new(false)),
1✔
91
    );
1✔
92
    dag.add_processor(proc_handle.clone(), Box::new(NoopProcessorFactory {}));
1✔
93
    dag.add_sink(
1✔
94
        sink_handle.clone(),
1✔
95
        Box::new(CountingSinkFactory::new(count, latch)),
1✔
96
    );
1✔
97

1✔
98
    dag.connect(
1✔
99
        Endpoint::new(source_handle, DEFAULT_PORT_HANDLE),
1✔
100
        Endpoint::new(proc_handle.clone(), DEFAULT_PORT_HANDLE),
1✔
101
    )
1✔
102
    .unwrap();
1✔
103

1✔
104
    dag.connect(
1✔
105
        Endpoint::new(proc_handle, DEFAULT_PORT_HANDLE),
1✔
106
        Endpoint::new(sink_handle, COUNTING_SINK_INPUT_PORT),
1✔
107
    )
1✔
108
    .unwrap();
1✔
109

×
110
    let (_temp_dir, checkpoint_factory, _) = create_checkpoint_factory_for_test(&[]).await;
1✔
111
    DagExecutor::new(dag, checkpoint_factory, ExecutorOptions::default())
1✔
112
        .unwrap()
1✔
113
        .start(Arc::new(AtomicBool::new(true)))
1✔
114
        .unwrap()
1✔
115
        .join()
1✔
116
        .unwrap();
1✔
117
}
118

×
119
#[tokio::test]
1✔
120
#[should_panic]
×
121
async fn test_create_src_panic() {
1✔
122
    let count: u64 = 1_000_000;
1✔
123

1✔
124
    let mut dag = Dag::new();
1✔
125
    let latch = Arc::new(AtomicBool::new(true));
1✔
126

1✔
127
    let source_handle = NodeHandle::new(Some(1), 1.to_string());
1✔
128
    let proc_handle = NodeHandle::new(Some(1), 2.to_string());
1✔
129
    let sink_handle = NodeHandle::new(Some(1), 3.to_string());
1✔
130

1✔
131
    dag.add_source(
1✔
132
        source_handle.clone(),
1✔
133
        Box::new(CreateErrSourceFactory::new(true)),
1✔
134
    );
1✔
135
    dag.add_processor(proc_handle.clone(), Box::new(NoopProcessorFactory {}));
1✔
136
    dag.add_sink(
1✔
137
        sink_handle.clone(),
1✔
138
        Box::new(CountingSinkFactory::new(count, latch)),
1✔
139
    );
1✔
140

1✔
141
    dag.connect(
1✔
142
        Endpoint::new(source_handle, DEFAULT_PORT_HANDLE),
1✔
143
        Endpoint::new(proc_handle.clone(), DEFAULT_PORT_HANDLE),
1✔
144
    )
1✔
145
    .unwrap();
1✔
146

1✔
147
    dag.connect(
1✔
148
        Endpoint::new(proc_handle, DEFAULT_PORT_HANDLE),
1✔
149
        Endpoint::new(sink_handle, COUNTING_SINK_INPUT_PORT),
1✔
150
    )
1✔
151
    .unwrap();
1✔
152

×
153
    let (_temp_dir, checkpoint_factory, _) = create_checkpoint_factory_for_test(&[]).await;
1✔
154
    DagExecutor::new(dag, checkpoint_factory, ExecutorOptions::default())
1✔
155
        .unwrap()
1✔
156
        .start(Arc::new(AtomicBool::new(true)))
1✔
157
        .unwrap()
1✔
158
        .join()
1✔
159
        .unwrap();
1✔
160
}
161

162
#[derive(Debug)]
×
163
struct CreateErrProcessorFactory {
164
    panic: bool,
×
165
}
×
166

×
167
impl CreateErrProcessorFactory {
168
    pub fn new(panic: bool) -> Self {
2✔
169
        Self { panic }
2✔
170
    }
2✔
171
}
×
172

×
173
impl ProcessorFactory<NoneContext> for CreateErrProcessorFactory {
174
    fn type_name(&self) -> String {
×
175
        "CreateErr".to_owned()
×
176
    }
×
177

×
178
    fn get_output_schema(
2✔
179
        &self,
2✔
180
        _port: &PortHandle,
2✔
181
        _input_schemas: &HashMap<PortHandle, (Schema, NoneContext)>,
2✔
182
    ) -> Result<(Schema, NoneContext), BoxedError> {
2✔
183
        Ok((
2✔
184
            Schema::default()
2✔
185
                .field(
2✔
186
                    FieldDefinition::new(
2✔
187
                        "id".to_string(),
2✔
188
                        FieldType::Int,
2✔
189
                        false,
2✔
190
                        SourceDefinition::Dynamic,
2✔
191
                    ),
2✔
192
                    true,
2✔
193
                )
2✔
194
                .clone(),
2✔
195
            NoneContext {},
2✔
196
        ))
2✔
197
    }
2✔
198

199
    fn get_input_ports(&self) -> Vec<PortHandle> {
8✔
200
        vec![DEFAULT_PORT_HANDLE]
8✔
201
    }
8✔
202

×
203
    fn get_output_ports(&self) -> Vec<OutputPortDef> {
6✔
204
        vec![OutputPortDef::new(
6✔
205
            DEFAULT_PORT_HANDLE,
6✔
206
            OutputPortType::Stateless,
6✔
207
        )]
6✔
208
    }
6✔
209

×
210
    fn build(
2✔
211
        &self,
2✔
212
        _input_schemas: HashMap<PortHandle, Schema>,
2✔
213
        _output_schemas: HashMap<PortHandle, Schema>,
2✔
214
        _record_store: &ProcessorRecordStore,
2✔
215
    ) -> Result<Box<dyn Processor>, BoxedError> {
2✔
216
        if self.panic {
2✔
217
            panic!("Generated error");
1✔
218
        } else {
219
            Err("Generated Error".to_string().into())
1✔
220
        }
1✔
221
    }
1✔
222

223
    fn id(&self) -> String {
×
224
        "CreateErr".to_owned()
×
225
    }
×
226
}
×
227

×
228
#[tokio::test]
1✔
229
#[should_panic]
×
230
async fn test_create_proc_err() {
1✔
231
    let count: u64 = 1_000_000;
1✔
232

1✔
233
    let mut dag = Dag::new();
1✔
234
    let latch = Arc::new(AtomicBool::new(true));
1✔
235

1✔
236
    let source_handle = NodeHandle::new(Some(1), 1.to_string());
1✔
237
    let proc_handle = NodeHandle::new(Some(1), 2.to_string());
1✔
238
    let sink_handle = NodeHandle::new(Some(1), 3.to_string());
1✔
239

1✔
240
    dag.add_source(
1✔
241
        source_handle.clone(),
1✔
242
        Box::new(GeneratorSourceFactory::new(count, latch.clone(), false)),
1✔
243
    );
1✔
244
    dag.add_processor(
1✔
245
        proc_handle.clone(),
1✔
246
        Box::new(CreateErrProcessorFactory::new(false)),
1✔
247
    );
1✔
248
    dag.add_sink(
1✔
249
        sink_handle.clone(),
1✔
250
        Box::new(CountingSinkFactory::new(count, latch)),
1✔
251
    );
1✔
252

1✔
253
    dag.connect(
1✔
254
        Endpoint::new(source_handle, GENERATOR_SOURCE_OUTPUT_PORT),
1✔
255
        Endpoint::new(proc_handle.clone(), DEFAULT_PORT_HANDLE),
1✔
256
    )
1✔
257
    .unwrap();
1✔
258

1✔
259
    dag.connect(
1✔
260
        Endpoint::new(proc_handle, DEFAULT_PORT_HANDLE),
1✔
261
        Endpoint::new(sink_handle, COUNTING_SINK_INPUT_PORT),
1✔
262
    )
1✔
263
    .unwrap();
1✔
264

×
265
    let (_temp_dir, checkpoint_factory, _) = create_checkpoint_factory_for_test(&[]).await;
1✔
266
    DagExecutor::new(dag, checkpoint_factory, ExecutorOptions::default())
1✔
267
        .unwrap()
1✔
268
        .start(Arc::new(AtomicBool::new(true)))
1✔
269
        .unwrap()
1✔
270
        .join()
1✔
271
        .unwrap();
1✔
272
}
×
273

×
274
#[tokio::test]
1✔
275
#[should_panic]
×
276
async fn test_create_proc_panic() {
1✔
277
    let count: u64 = 1_000_000;
1✔
278

1✔
279
    let mut dag = Dag::new();
1✔
280
    let latch = Arc::new(AtomicBool::new(true));
1✔
281

1✔
282
    let source_handle = NodeHandle::new(Some(1), 1.to_string());
1✔
283
    let proc_handle = NodeHandle::new(Some(1), 2.to_string());
1✔
284
    let sink_handle = NodeHandle::new(Some(1), 3.to_string());
1✔
285

1✔
286
    dag.add_source(
1✔
287
        source_handle.clone(),
1✔
288
        Box::new(GeneratorSourceFactory::new(count, latch.clone(), false)),
1✔
289
    );
1✔
290
    dag.add_processor(
1✔
291
        proc_handle.clone(),
1✔
292
        Box::new(CreateErrProcessorFactory::new(true)),
1✔
293
    );
1✔
294
    dag.add_sink(
1✔
295
        sink_handle.clone(),
1✔
296
        Box::new(CountingSinkFactory::new(count, latch)),
1✔
297
    );
1✔
298

1✔
299
    dag.connect(
1✔
300
        Endpoint::new(source_handle, GENERATOR_SOURCE_OUTPUT_PORT),
1✔
301
        Endpoint::new(proc_handle.clone(), DEFAULT_PORT_HANDLE),
1✔
302
    )
1✔
303
    .unwrap();
1✔
304

1✔
305
    dag.connect(
1✔
306
        Endpoint::new(proc_handle, DEFAULT_PORT_HANDLE),
1✔
307
        Endpoint::new(sink_handle, COUNTING_SINK_INPUT_PORT),
1✔
308
    )
1✔
309
    .unwrap();
1✔
310

×
311
    let (_temp_dir, checkpoint_factory, _) = create_checkpoint_factory_for_test(&[]).await;
1✔
312
    DagExecutor::new(dag, checkpoint_factory, ExecutorOptions::default())
1✔
313
        .unwrap()
1✔
314
        .start(Arc::new(AtomicBool::new(true)))
1✔
315
        .unwrap()
1✔
316
        .join()
1✔
317
        .unwrap();
1✔
318
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc