• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

getdozer / dozer / 5888798292

17 Aug 2023 08:51AM UTC coverage: 76.025% (-1.4%) from 77.415%
5888798292

push

github

web-flow
feat: implement graph on live ui (#1847)

* feat: implement progress

* feat: implement enable progress flag

* feat: implement progress in live

* chore: fix clippy

* chore: always use telemetry metrics

* fix: Only run build once

---------

Co-authored-by: sagar <sagar@getdozer.io>
Co-authored-by: chubei <914745487@qq.com>

536 of 536 new or added lines in 21 files covered. (100.0%)

46101 of 60639 relevant lines covered (76.03%)

40410.07 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

86.48
/dozer-core/src/tests/dag_base_create_errors.rs
1
use crate::executor::{DagExecutor, ExecutorOptions};
2
use crate::node::{
3
    OutputPortDef, OutputPortType, PortHandle, Processor, ProcessorFactory, Source, SourceFactory,
4
};
5
use crate::processor_record::ProcessorRecordStore;
6
use crate::{Dag, Endpoint, DEFAULT_PORT_HANDLE};
7

8
use crate::tests::dag_base_run::NoopProcessorFactory;
9
use crate::tests::sinks::{CountingSinkFactory, COUNTING_SINK_INPUT_PORT};
10
use crate::tests::sources::{GeneratorSourceFactory, GENERATOR_SOURCE_OUTPUT_PORT};
11

12
use dozer_types::errors::internal::BoxedError;
13
use dozer_types::node::NodeHandle;
14
use dozer_types::types::{FieldDefinition, FieldType, Schema, SourceDefinition};
15

16
use std::collections::HashMap;
17
use std::sync::atomic::AtomicBool;
18
use std::sync::Arc;
19

20
use crate::tests::app::NoneContext;
21

22
/// Test-only source factory whose `build` never succeeds.
///
/// The `panic` flag selects how `build` fails: by panicking or by returning
/// an error value.
#[derive(Debug)]
struct CreateErrSourceFactory {
    /// `true` makes `build` panic; `false` makes it return `Err`.
    panic: bool,
}
26

27
impl CreateErrSourceFactory {
28
    pub fn new(panic: bool) -> Self {
2✔
29
        Self { panic }
2✔
30
    }
2✔
31
}
×
32

×
33
impl SourceFactory<NoneContext> for CreateErrSourceFactory {
34
    fn get_output_schema(&self, _port: &PortHandle) -> Result<(Schema, NoneContext), BoxedError> {
2✔
35
        Ok((
2✔
36
            Schema::default()
2✔
37
                .field(
2✔
38
                    FieldDefinition::new(
2✔
39
                        "id".to_string(),
2✔
40
                        FieldType::Int,
2✔
41
                        false,
2✔
42
                        SourceDefinition::Dynamic,
2✔
43
                    ),
2✔
44
                    true,
2✔
45
                )
2✔
46
                .clone(),
2✔
47
            NoneContext {},
2✔
48
        ))
2✔
49
    }
2✔
50

×
51
    fn get_output_port_name(&self, _port: &PortHandle) -> String {
×
52
        "error".to_string()
×
53
    }
×
54

×
55
    fn get_output_ports(&self) -> Vec<OutputPortDef> {
6✔
56
        vec![OutputPortDef::new(
6✔
57
            DEFAULT_PORT_HANDLE,
6✔
58
            OutputPortType::Stateless,
6✔
59
        )]
6✔
60
    }
6✔
61

×
62
    fn build(
2✔
63
        &self,
2✔
64
        _output_schemas: HashMap<PortHandle, Schema>,
2✔
65
    ) -> Result<Box<dyn Source>, BoxedError> {
2✔
66
        if self.panic {
2✔
67
            panic!("Generated error");
1✔
68
        } else {
×
69
            Err("Generated Error".to_string().into())
1✔
70
        }
1✔
71
    }
1✔
72
}
×
73

×
74
/// Wires `CreateErrSourceFactory(false)` -> `NoopProcessorFactory` ->
/// `CountingSinkFactory` and runs the DAG.
///
/// The source factory's `build` returns an error, so one of the `unwrap`
/// calls on the executor chain is expected to panic.
#[test]
#[should_panic]
fn test_create_src_err() {
    let expected_records: u64 = 1_000_000;
    let sink_latch = Arc::new(AtomicBool::new(true));

    let source = NodeHandle::new(Some(1), 1.to_string());
    let processor = NodeHandle::new(Some(1), 2.to_string());
    let sink = NodeHandle::new(Some(1), 3.to_string());

    // Register the three nodes.
    let mut dag = Dag::new();
    dag.add_source(source.clone(), Box::new(CreateErrSourceFactory::new(false)));
    dag.add_processor(processor.clone(), Box::new(NoopProcessorFactory {}));
    dag.add_sink(
        sink.clone(),
        Box::new(CountingSinkFactory::new(expected_records, sink_latch)),
    );

    // source -> processor
    let source_out = Endpoint::new(source, DEFAULT_PORT_HANDLE);
    let processor_in = Endpoint::new(processor.clone(), DEFAULT_PORT_HANDLE);
    dag.connect(source_out, processor_in).unwrap();

    // processor -> sink
    let processor_out = Endpoint::new(processor, DEFAULT_PORT_HANDLE);
    let sink_in = Endpoint::new(sink, COUNTING_SINK_INPUT_PORT);
    dag.connect(processor_out, sink_in).unwrap();

    let running = Arc::new(AtomicBool::new(true));
    DagExecutor::new(dag, ExecutorOptions::default())
        .unwrap()
        .start(running)
        .unwrap()
        .join()
        .unwrap();
}
1✔
115

×
116
/// Wires `CreateErrSourceFactory(true)` -> `NoopProcessorFactory` ->
/// `CountingSinkFactory` and runs the DAG.
///
/// The source factory's `build` panics, and the test expects a panic to
/// reach the test thread.
#[test]
#[should_panic]
fn test_create_src_panic() {
    let expected_records: u64 = 1_000_000;
    let sink_latch = Arc::new(AtomicBool::new(true));

    let source = NodeHandle::new(Some(1), 1.to_string());
    let processor = NodeHandle::new(Some(1), 2.to_string());
    let sink = NodeHandle::new(Some(1), 3.to_string());

    // Register the three nodes.
    let mut dag = Dag::new();
    dag.add_source(source.clone(), Box::new(CreateErrSourceFactory::new(true)));
    dag.add_processor(processor.clone(), Box::new(NoopProcessorFactory {}));
    dag.add_sink(
        sink.clone(),
        Box::new(CountingSinkFactory::new(expected_records, sink_latch)),
    );

    // source -> processor
    let source_out = Endpoint::new(source, DEFAULT_PORT_HANDLE);
    let processor_in = Endpoint::new(processor.clone(), DEFAULT_PORT_HANDLE);
    dag.connect(source_out, processor_in).unwrap();

    // processor -> sink
    let processor_out = Endpoint::new(processor, DEFAULT_PORT_HANDLE);
    let sink_in = Endpoint::new(sink, COUNTING_SINK_INPUT_PORT);
    dag.connect(processor_out, sink_in).unwrap();

    let running = Arc::new(AtomicBool::new(true));
    DagExecutor::new(dag, ExecutorOptions::default())
        .unwrap()
        .start(running)
        .unwrap()
        .join()
        .unwrap();
}
1✔
157

×
158
/// Test-only processor factory whose `build` never succeeds.
///
/// The `panic` flag selects how `build` fails: by panicking or by returning
/// an error value.
#[derive(Debug)]
struct CreateErrProcessorFactory {
    /// `true` makes `build` panic; `false` makes it return `Err`.
    panic: bool,
}
162

×
163
impl CreateErrProcessorFactory {
164
    pub fn new(panic: bool) -> Self {
2✔
165
        Self { panic }
2✔
166
    }
2✔
167
}
168

×
169
impl ProcessorFactory<NoneContext> for CreateErrProcessorFactory {
×
170
    fn type_name(&self) -> String {
×
171
        "CreateErr".to_owned()
×
172
    }
×
173

174
    fn get_output_schema(
2✔
175
        &self,
2✔
176
        _port: &PortHandle,
2✔
177
        _input_schemas: &HashMap<PortHandle, (Schema, NoneContext)>,
2✔
178
    ) -> Result<(Schema, NoneContext), BoxedError> {
2✔
179
        Ok((
2✔
180
            Schema::default()
2✔
181
                .field(
2✔
182
                    FieldDefinition::new(
2✔
183
                        "id".to_string(),
2✔
184
                        FieldType::Int,
2✔
185
                        false,
2✔
186
                        SourceDefinition::Dynamic,
2✔
187
                    ),
2✔
188
                    true,
2✔
189
                )
2✔
190
                .clone(),
2✔
191
            NoneContext {},
2✔
192
        ))
2✔
193
    }
2✔
194

×
195
    fn get_input_ports(&self) -> Vec<PortHandle> {
8✔
196
        vec![DEFAULT_PORT_HANDLE]
8✔
197
    }
8✔
198

199
    fn get_output_ports(&self) -> Vec<OutputPortDef> {
6✔
200
        vec![OutputPortDef::new(
6✔
201
            DEFAULT_PORT_HANDLE,
6✔
202
            OutputPortType::Stateless,
6✔
203
        )]
6✔
204
    }
6✔
205

×
206
    fn build(
2✔
207
        &self,
2✔
208
        _input_schemas: HashMap<PortHandle, Schema>,
2✔
209
        _output_schemas: HashMap<PortHandle, Schema>,
2✔
210
        _record_store: &ProcessorRecordStore,
2✔
211
    ) -> Result<Box<dyn Processor>, BoxedError> {
2✔
212
        if self.panic {
2✔
213
            panic!("Generated error");
1✔
214
        } else {
×
215
            Err("Generated Error".to_string().into())
1✔
216
        }
1✔
217
    }
1✔
218

219
    fn id(&self) -> String {
×
220
        "CreateErr".to_owned()
×
221
    }
×
222
}
223

×
224
/// Wires `GeneratorSourceFactory` -> `CreateErrProcessorFactory(false)` ->
/// `CountingSinkFactory` and runs the DAG.
///
/// The processor factory's `build` returns an error, so one of the `unwrap`
/// calls on the executor chain is expected to panic.
#[test]
#[should_panic]
fn test_create_proc_err() {
    let expected_records: u64 = 1_000_000;
    let shared_latch = Arc::new(AtomicBool::new(true));

    let source = NodeHandle::new(Some(1), 1.to_string());
    let processor = NodeHandle::new(Some(1), 2.to_string());
    let sink = NodeHandle::new(Some(1), 3.to_string());

    // Register the three nodes; source and sink share the same latch.
    let mut dag = Dag::new();
    dag.add_source(
        source.clone(),
        Box::new(GeneratorSourceFactory::new(
            expected_records,
            shared_latch.clone(),
            false,
        )),
    );
    dag.add_processor(
        processor.clone(),
        Box::new(CreateErrProcessorFactory::new(false)),
    );
    dag.add_sink(
        sink.clone(),
        Box::new(CountingSinkFactory::new(expected_records, shared_latch)),
    );

    // source -> processor
    let source_out = Endpoint::new(source, GENERATOR_SOURCE_OUTPUT_PORT);
    let processor_in = Endpoint::new(processor.clone(), DEFAULT_PORT_HANDLE);
    dag.connect(source_out, processor_in).unwrap();

    // processor -> sink
    let processor_out = Endpoint::new(processor, DEFAULT_PORT_HANDLE);
    let sink_in = Endpoint::new(sink, COUNTING_SINK_INPUT_PORT);
    dag.connect(processor_out, sink_in).unwrap();

    let running = Arc::new(AtomicBool::new(true));
    DagExecutor::new(dag, ExecutorOptions::default())
        .unwrap()
        .start(running)
        .unwrap()
        .join()
        .unwrap();
}
1✔
268

×
269
/// Wires `GeneratorSourceFactory` -> `CreateErrProcessorFactory(true)` ->
/// `CountingSinkFactory` and runs the DAG.
///
/// The processor factory's `build` panics, and the test expects a panic to
/// reach the test thread.
#[test]
#[should_panic]
fn test_create_proc_panic() {
    let expected_records: u64 = 1_000_000;
    let shared_latch = Arc::new(AtomicBool::new(true));

    let source = NodeHandle::new(Some(1), 1.to_string());
    let processor = NodeHandle::new(Some(1), 2.to_string());
    let sink = NodeHandle::new(Some(1), 3.to_string());

    // Register the three nodes; source and sink share the same latch.
    let mut dag = Dag::new();
    dag.add_source(
        source.clone(),
        Box::new(GeneratorSourceFactory::new(
            expected_records,
            shared_latch.clone(),
            false,
        )),
    );
    dag.add_processor(
        processor.clone(),
        Box::new(CreateErrProcessorFactory::new(true)),
    );
    dag.add_sink(
        sink.clone(),
        Box::new(CountingSinkFactory::new(expected_records, shared_latch)),
    );

    // source -> processor
    let source_out = Endpoint::new(source, GENERATOR_SOURCE_OUTPUT_PORT);
    let processor_in = Endpoint::new(processor.clone(), DEFAULT_PORT_HANDLE);
    dag.connect(source_out, processor_in).unwrap();

    // processor -> sink
    let processor_out = Endpoint::new(processor, DEFAULT_PORT_HANDLE);
    let sink_in = Endpoint::new(sink, COUNTING_SINK_INPUT_PORT);
    dag.connect(processor_out, sink_in).unwrap();

    let running = Arc::new(AtomicBool::new(true));
    DagExecutor::new(dag, ExecutorOptions::default())
        .unwrap()
        .start(running)
        .unwrap()
        .join()
        .unwrap();
}
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc