• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

getdozer / dozer / 6011779781

29 Aug 2023 11:17AM UTC coverage: 76.528% (-1.5%) from 78.07%
6011779781

push

github

web-flow
fix: Include connection type in `GenerateDot`. Fix `AggregationProcessorFactory::type_name` (#1934)

170 of 170 new or added lines in 5 files covered. (100.0%)

49017 of 64051 relevant lines covered (76.53%)

48279.0 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

82.62
/dozer-core/src/tests/dag_base_create_errors.rs
1
use crate::checkpoint::create_checkpoint_factory_for_test;
2
use crate::executor::{DagExecutor, ExecutorOptions};
3
use crate::node::{
4
    OutputPortDef, OutputPortType, PortHandle, Processor, ProcessorFactory, Source, SourceFactory,
5
};
6
use crate::processor_record::ProcessorRecordStore;
7
use crate::{Dag, Endpoint, DEFAULT_PORT_HANDLE};
8

9
use crate::tests::dag_base_run::NoopProcessorFactory;
10
use crate::tests::sinks::{CountingSinkFactory, COUNTING_SINK_INPUT_PORT};
11
use crate::tests::sources::{GeneratorSourceFactory, GENERATOR_SOURCE_OUTPUT_PORT};
12

13
use dozer_log::tokio;
14
use dozer_types::errors::internal::BoxedError;
15
use dozer_types::node::NodeHandle;
16
use dozer_types::types::{FieldDefinition, FieldType, Schema, SourceDefinition};
17

18
use std::collections::HashMap;
19
use std::sync::atomic::AtomicBool;
20
use std::sync::Arc;
21

22
#[derive(Debug)]
×
23
struct CreateErrSourceFactory {
24
    panic: bool,
×
25
}
26

27
impl CreateErrSourceFactory {
28
    pub fn new(panic: bool) -> Self {
2✔
29
        Self { panic }
2✔
30
    }
2✔
31
}
×
32

×
33
impl SourceFactory for CreateErrSourceFactory {
34
    fn get_output_schema(&self, _port: &PortHandle) -> Result<Schema, BoxedError> {
2✔
35
        Ok(Schema::default()
2✔
36
            .field(
2✔
37
                FieldDefinition::new(
2✔
38
                    "id".to_string(),
2✔
39
                    FieldType::Int,
2✔
40
                    false,
2✔
41
                    SourceDefinition::Dynamic,
2✔
42
                ),
2✔
43
                true,
2✔
44
            )
2✔
45
            .clone())
2✔
46
    }
2✔
47

×
48
    fn get_output_port_name(&self, _port: &PortHandle) -> String {
×
49
        "error".to_string()
×
50
    }
×
51

×
52
    fn get_output_ports(&self) -> Vec<OutputPortDef> {
6✔
53
        vec![OutputPortDef::new(
6✔
54
            DEFAULT_PORT_HANDLE,
6✔
55
            OutputPortType::Stateless,
6✔
56
        )]
6✔
57
    }
6✔
58

×
59
    fn build(
2✔
60
        &self,
2✔
61
        _output_schemas: HashMap<PortHandle, Schema>,
2✔
62
    ) -> Result<Box<dyn Source>, BoxedError> {
2✔
63
        if self.panic {
2✔
64
            panic!("Generated error");
1✔
65
        } else {
×
66
            Err("Generated Error".to_string().into())
1✔
67
        }
1✔
68
    }
1✔
69
}
×
70

71
#[tokio::test]
1✔
72
#[should_panic]
×
73
async fn test_create_src_err() {
1✔
74
    let count: u64 = 1_000_000;
1✔
75

1✔
76
    let mut dag = Dag::new();
1✔
77
    let latch = Arc::new(AtomicBool::new(true));
1✔
78

1✔
79
    let source_handle = NodeHandle::new(Some(1), 1.to_string());
1✔
80
    let proc_handle = NodeHandle::new(Some(1), 2.to_string());
1✔
81
    let sink_handle = NodeHandle::new(Some(1), 3.to_string());
1✔
82

1✔
83
    dag.add_source(
1✔
84
        source_handle.clone(),
1✔
85
        Box::new(CreateErrSourceFactory::new(false)),
1✔
86
    );
1✔
87
    dag.add_processor(proc_handle.clone(), Box::new(NoopProcessorFactory {}));
1✔
88
    dag.add_sink(
1✔
89
        sink_handle.clone(),
1✔
90
        Box::new(CountingSinkFactory::new(count, latch)),
1✔
91
    );
1✔
92

1✔
93
    dag.connect(
1✔
94
        Endpoint::new(source_handle, DEFAULT_PORT_HANDLE),
1✔
95
        Endpoint::new(proc_handle.clone(), DEFAULT_PORT_HANDLE),
1✔
96
    )
1✔
97
    .unwrap();
1✔
98

1✔
99
    dag.connect(
1✔
100
        Endpoint::new(proc_handle, DEFAULT_PORT_HANDLE),
1✔
101
        Endpoint::new(sink_handle, COUNTING_SINK_INPUT_PORT),
1✔
102
    )
1✔
103
    .unwrap();
1✔
104

×
105
    let (_temp_dir, checkpoint_factory, _) = create_checkpoint_factory_for_test(&[]).await;
1✔
106
    DagExecutor::new(dag, checkpoint_factory, ExecutorOptions::default())
1✔
107
        .unwrap()
1✔
108
        .start(Arc::new(AtomicBool::new(true)))
1✔
109
        .unwrap()
1✔
110
        .join()
1✔
111
        .unwrap();
1✔
112
}
×
113

×
114
#[tokio::test]
1✔
115
#[should_panic]
×
116
async fn test_create_src_panic() {
1✔
117
    let count: u64 = 1_000_000;
1✔
118

1✔
119
    let mut dag = Dag::new();
1✔
120
    let latch = Arc::new(AtomicBool::new(true));
1✔
121

1✔
122
    let source_handle = NodeHandle::new(Some(1), 1.to_string());
1✔
123
    let proc_handle = NodeHandle::new(Some(1), 2.to_string());
1✔
124
    let sink_handle = NodeHandle::new(Some(1), 3.to_string());
1✔
125

1✔
126
    dag.add_source(
1✔
127
        source_handle.clone(),
1✔
128
        Box::new(CreateErrSourceFactory::new(true)),
1✔
129
    );
1✔
130
    dag.add_processor(proc_handle.clone(), Box::new(NoopProcessorFactory {}));
1✔
131
    dag.add_sink(
1✔
132
        sink_handle.clone(),
1✔
133
        Box::new(CountingSinkFactory::new(count, latch)),
1✔
134
    );
1✔
135

1✔
136
    dag.connect(
1✔
137
        Endpoint::new(source_handle, DEFAULT_PORT_HANDLE),
1✔
138
        Endpoint::new(proc_handle.clone(), DEFAULT_PORT_HANDLE),
1✔
139
    )
1✔
140
    .unwrap();
1✔
141

1✔
142
    dag.connect(
1✔
143
        Endpoint::new(proc_handle, DEFAULT_PORT_HANDLE),
1✔
144
        Endpoint::new(sink_handle, COUNTING_SINK_INPUT_PORT),
1✔
145
    )
1✔
146
    .unwrap();
1✔
147

×
148
    let (_temp_dir, checkpoint_factory, _) = create_checkpoint_factory_for_test(&[]).await;
1✔
149
    DagExecutor::new(dag, checkpoint_factory, ExecutorOptions::default())
1✔
150
        .unwrap()
1✔
151
        .start(Arc::new(AtomicBool::new(true)))
1✔
152
        .unwrap()
1✔
153
        .join()
1✔
154
        .unwrap();
1✔
155
}
×
156

×
157
#[derive(Debug)]
×
158
struct CreateErrProcessorFactory {
×
159
    panic: bool,
×
160
}
161

162
impl CreateErrProcessorFactory {
×
163
    pub fn new(panic: bool) -> Self {
2✔
164
        Self { panic }
2✔
165
    }
2✔
166
}
167

168
impl ProcessorFactory for CreateErrProcessorFactory {
×
169
    fn type_name(&self) -> String {
×
170
        "CreateErr".to_owned()
×
171
    }
×
172

173
    fn get_output_schema(
2✔
174
        &self,
2✔
175
        _port: &PortHandle,
2✔
176
        _input_schemas: &HashMap<PortHandle, Schema>,
2✔
177
    ) -> Result<Schema, BoxedError> {
2✔
178
        Ok(Schema::default()
2✔
179
            .field(
2✔
180
                FieldDefinition::new(
2✔
181
                    "id".to_string(),
2✔
182
                    FieldType::Int,
2✔
183
                    false,
2✔
184
                    SourceDefinition::Dynamic,
2✔
185
                ),
2✔
186
                true,
2✔
187
            )
2✔
188
            .clone())
2✔
189
    }
2✔
190

×
191
    fn get_input_ports(&self) -> Vec<PortHandle> {
8✔
192
        vec![DEFAULT_PORT_HANDLE]
8✔
193
    }
8✔
194

×
195
    fn get_output_ports(&self) -> Vec<OutputPortDef> {
6✔
196
        vec![OutputPortDef::new(
6✔
197
            DEFAULT_PORT_HANDLE,
6✔
198
            OutputPortType::Stateless,
6✔
199
        )]
6✔
200
    }
6✔
201

×
202
    fn build(
2✔
203
        &self,
2✔
204
        _input_schemas: HashMap<PortHandle, Schema>,
2✔
205
        _output_schemas: HashMap<PortHandle, Schema>,
2✔
206
        _record_store: &ProcessorRecordStore,
2✔
207
    ) -> Result<Box<dyn Processor>, BoxedError> {
2✔
208
        if self.panic {
2✔
209
            panic!("Generated error");
1✔
210
        } else {
×
211
            Err("Generated Error".to_string().into())
1✔
212
        }
1✔
213
    }
1✔
214

×
215
    fn id(&self) -> String {
×
216
        "CreateErr".to_owned()
×
217
    }
×
218
}
219

×
220
#[tokio::test]
1✔
221
#[should_panic]
×
222
async fn test_create_proc_err() {
1✔
223
    let count: u64 = 1_000_000;
1✔
224

1✔
225
    let mut dag = Dag::new();
1✔
226
    let latch = Arc::new(AtomicBool::new(true));
1✔
227

1✔
228
    let source_handle = NodeHandle::new(Some(1), 1.to_string());
1✔
229
    let proc_handle = NodeHandle::new(Some(1), 2.to_string());
1✔
230
    let sink_handle = NodeHandle::new(Some(1), 3.to_string());
1✔
231

1✔
232
    dag.add_source(
1✔
233
        source_handle.clone(),
1✔
234
        Box::new(GeneratorSourceFactory::new(count, latch.clone(), false)),
1✔
235
    );
1✔
236
    dag.add_processor(
1✔
237
        proc_handle.clone(),
1✔
238
        Box::new(CreateErrProcessorFactory::new(false)),
1✔
239
    );
1✔
240
    dag.add_sink(
1✔
241
        sink_handle.clone(),
1✔
242
        Box::new(CountingSinkFactory::new(count, latch)),
1✔
243
    );
1✔
244

1✔
245
    dag.connect(
1✔
246
        Endpoint::new(source_handle, GENERATOR_SOURCE_OUTPUT_PORT),
1✔
247
        Endpoint::new(proc_handle.clone(), DEFAULT_PORT_HANDLE),
1✔
248
    )
1✔
249
    .unwrap();
1✔
250

1✔
251
    dag.connect(
1✔
252
        Endpoint::new(proc_handle, DEFAULT_PORT_HANDLE),
1✔
253
        Endpoint::new(sink_handle, COUNTING_SINK_INPUT_PORT),
1✔
254
    )
1✔
255
    .unwrap();
1✔
256

×
257
    let (_temp_dir, checkpoint_factory, _) = create_checkpoint_factory_for_test(&[]).await;
1✔
258
    DagExecutor::new(dag, checkpoint_factory, ExecutorOptions::default())
1✔
259
        .unwrap()
1✔
260
        .start(Arc::new(AtomicBool::new(true)))
1✔
261
        .unwrap()
1✔
262
        .join()
1✔
263
        .unwrap();
1✔
264
}
265

×
266
#[tokio::test]
1✔
267
#[should_panic]
×
268
async fn test_create_proc_panic() {
1✔
269
    let count: u64 = 1_000_000;
1✔
270

1✔
271
    let mut dag = Dag::new();
1✔
272
    let latch = Arc::new(AtomicBool::new(true));
1✔
273

1✔
274
    let source_handle = NodeHandle::new(Some(1), 1.to_string());
1✔
275
    let proc_handle = NodeHandle::new(Some(1), 2.to_string());
1✔
276
    let sink_handle = NodeHandle::new(Some(1), 3.to_string());
1✔
277

1✔
278
    dag.add_source(
1✔
279
        source_handle.clone(),
1✔
280
        Box::new(GeneratorSourceFactory::new(count, latch.clone(), false)),
1✔
281
    );
1✔
282
    dag.add_processor(
1✔
283
        proc_handle.clone(),
1✔
284
        Box::new(CreateErrProcessorFactory::new(true)),
1✔
285
    );
1✔
286
    dag.add_sink(
1✔
287
        sink_handle.clone(),
1✔
288
        Box::new(CountingSinkFactory::new(count, latch)),
1✔
289
    );
1✔
290

1✔
291
    dag.connect(
1✔
292
        Endpoint::new(source_handle, GENERATOR_SOURCE_OUTPUT_PORT),
1✔
293
        Endpoint::new(proc_handle.clone(), DEFAULT_PORT_HANDLE),
1✔
294
    )
1✔
295
    .unwrap();
1✔
296

1✔
297
    dag.connect(
1✔
298
        Endpoint::new(proc_handle, DEFAULT_PORT_HANDLE),
1✔
299
        Endpoint::new(sink_handle, COUNTING_SINK_INPUT_PORT),
1✔
300
    )
1✔
301
    .unwrap();
1✔
302

×
303
    let (_temp_dir, checkpoint_factory, _) = create_checkpoint_factory_for_test(&[]).await;
1✔
304
    DagExecutor::new(dag, checkpoint_factory, ExecutorOptions::default())
1✔
305
        .unwrap()
1✔
306
        .start(Arc::new(AtomicBool::new(true)))
1✔
307
        .unwrap()
1✔
308
        .join()
1✔
309
        .unwrap();
1✔
310
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc