• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

getdozer / dozer / 5725710489

pending completion
5725710489

push

github

web-flow
chore: Add `SourceFactory::get_output_port_name` to simplify ui graph generation (#1812)

140 of 140 new or added lines in 13 files covered. (100.0%)

45519 of 60083 relevant lines covered (75.76%)

39458.21 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

88.64
/dozer-core/src/tests/dag_base_create_errors.rs
1
use crate::executor::{DagExecutor, ExecutorOptions};
2
use crate::node::{
3
    OutputPortDef, OutputPortType, PortHandle, Processor, ProcessorFactory, Source, SourceFactory,
4
};
5
use crate::{Dag, Endpoint, DEFAULT_PORT_HANDLE};
6

7
use crate::tests::dag_base_run::NoopProcessorFactory;
8
use crate::tests::sinks::{CountingSinkFactory, COUNTING_SINK_INPUT_PORT};
9
use crate::tests::sources::{GeneratorSourceFactory, GENERATOR_SOURCE_OUTPUT_PORT};
10

11
use dozer_types::errors::internal::BoxedError;
12
use dozer_types::node::NodeHandle;
13
use dozer_types::types::{FieldDefinition, FieldType, Schema, SourceDefinition};
14

15
use std::collections::HashMap;
16
use std::sync::atomic::AtomicBool;
17
use std::sync::Arc;
18

19
use crate::tests::app::NoneContext;
20

21
/// Test-only source factory whose `build` never produces a source.
///
/// The `panic` flag selects the failure mode exercised by the tests in this
/// file: `true` makes `build` panic, `false` makes it return an `Err`.
#[derive(Debug)]
struct CreateErrSourceFactory {
    panic: bool,
}
25

26
impl CreateErrSourceFactory {
27
    pub fn new(panic: bool) -> Self {
2✔
28
        Self { panic }
2✔
29
    }
2✔
30
}
31

32
impl SourceFactory<NoneContext> for CreateErrSourceFactory {
33
    fn get_output_schema(&self, _port: &PortHandle) -> Result<(Schema, NoneContext), BoxedError> {
2✔
34
        Ok((
2✔
35
            Schema::default()
2✔
36
                .field(
2✔
37
                    FieldDefinition::new(
2✔
38
                        "id".to_string(),
2✔
39
                        FieldType::Int,
2✔
40
                        false,
2✔
41
                        SourceDefinition::Dynamic,
2✔
42
                    ),
2✔
43
                    true,
2✔
44
                )
2✔
45
                .clone(),
2✔
46
            NoneContext {},
2✔
47
        ))
2✔
48
    }
2✔
49

50
    fn get_output_port_name(&self, _port: &PortHandle) -> String {
×
51
        "error".to_string()
×
52
    }
×
53

×
54
    fn get_output_ports(&self) -> Vec<OutputPortDef> {
6✔
55
        vec![OutputPortDef::new(
6✔
56
            DEFAULT_PORT_HANDLE,
6✔
57
            OutputPortType::Stateless,
6✔
58
        )]
6✔
59
    }
6✔
60

×
61
    fn build(
2✔
62
        &self,
2✔
63
        _output_schemas: HashMap<PortHandle, Schema>,
2✔
64
    ) -> Result<Box<dyn Source>, BoxedError> {
2✔
65
        if self.panic {
2✔
66
            panic!("Generated error");
1✔
67
        } else {
68
            Err("Generated Error".to_string().into())
1✔
69
        }
1✔
70
    }
1✔
71
}
×
72

×
73
/// Builds a source -> processor -> sink DAG whose source factory returns an
/// `Err` from `build`, then runs it through the executor.
///
/// The test passes only if something in the body panics; the expectation is
/// that one of the `unwrap` calls on the executor chain surfaces the error.
#[test]
#[should_panic]
fn test_create_src_err() {
    let count: u64 = 1_000_000;

    let mut dag = Dag::new();
    let latch = Arc::new(AtomicBool::new(true));

    let source_handle = NodeHandle::new(Some(1), 1.to_string());
    let proc_handle = NodeHandle::new(Some(1), 2.to_string());
    let sink_handle = NodeHandle::new(Some(1), 3.to_string());

    // `false` selects the "return Err" failure mode of the source factory.
    dag.add_source(
        source_handle.clone(),
        Box::new(CreateErrSourceFactory::new(false)),
    );
    dag.add_processor(proc_handle.clone(), Box::new(NoopProcessorFactory {}));
    dag.add_sink(
        sink_handle.clone(),
        Box::new(CountingSinkFactory::new(count, latch)),
    );

    // Wire source -> processor.
    dag.connect(
        Endpoint::new(source_handle, DEFAULT_PORT_HANDLE),
        Endpoint::new(proc_handle.clone(), DEFAULT_PORT_HANDLE),
    )
    .unwrap();

    // Wire processor -> sink.
    dag.connect(
        Endpoint::new(proc_handle, DEFAULT_PORT_HANDLE),
        Endpoint::new(sink_handle, COUNTING_SINK_INPUT_PORT),
    )
    .unwrap();

    // NOTE(review): which of these `unwrap`s panics is not visible from this file — confirm
    // against `DagExecutor`.
    DagExecutor::new(dag, ExecutorOptions::default())
        .unwrap()
        .start(Arc::new(AtomicBool::new(true)))
        .unwrap()
        .join()
        .unwrap();
}
1✔
114

×
115
/// Builds a source -> processor -> sink DAG whose source factory panics
/// inside `build`, then runs it through the executor.
///
/// The test passes only if something in the body panics.
#[test]
#[should_panic]
fn test_create_src_panic() {
    let count: u64 = 1_000_000;

    let mut dag = Dag::new();
    let latch = Arc::new(AtomicBool::new(true));

    let source_handle = NodeHandle::new(Some(1), 1.to_string());
    let proc_handle = NodeHandle::new(Some(1), 2.to_string());
    let sink_handle = NodeHandle::new(Some(1), 3.to_string());

    // `true` selects the "panic" failure mode of the source factory.
    dag.add_source(
        source_handle.clone(),
        Box::new(CreateErrSourceFactory::new(true)),
    );
    dag.add_processor(proc_handle.clone(), Box::new(NoopProcessorFactory {}));
    dag.add_sink(
        sink_handle.clone(),
        Box::new(CountingSinkFactory::new(count, latch)),
    );

    // Wire source -> processor.
    dag.connect(
        Endpoint::new(source_handle, DEFAULT_PORT_HANDLE),
        Endpoint::new(proc_handle.clone(), DEFAULT_PORT_HANDLE),
    )
    .unwrap();

    // Wire processor -> sink.
    dag.connect(
        Endpoint::new(proc_handle, DEFAULT_PORT_HANDLE),
        Endpoint::new(sink_handle, COUNTING_SINK_INPUT_PORT),
    )
    .unwrap();

    // NOTE(review): whether the panic propagates directly or via one of these `unwrap`s is not
    // visible from this file — confirm against `DagExecutor`.
    DagExecutor::new(dag, ExecutorOptions::default())
        .unwrap()
        .start(Arc::new(AtomicBool::new(true)))
        .unwrap()
        .join()
        .unwrap();
}
1✔
156

157
/// Test-only processor factory whose `build` never produces a processor.
///
/// The `panic` flag selects the failure mode exercised by the tests in this
/// file: `true` makes `build` panic, `false` makes it return an `Err`.
#[derive(Debug)]
struct CreateErrProcessorFactory {
    panic: bool,
}
×
161

×
162
impl CreateErrProcessorFactory {
163
    pub fn new(panic: bool) -> Self {
2✔
164
        Self { panic }
2✔
165
    }
2✔
166
}
×
167

×
168
impl ProcessorFactory<NoneContext> for CreateErrProcessorFactory {
169
    fn type_name(&self) -> String {
×
170
        "CreateErr".to_owned()
×
171
    }
×
172

×
173
    fn get_output_schema(
2✔
174
        &self,
2✔
175
        _port: &PortHandle,
2✔
176
        _input_schemas: &HashMap<PortHandle, (Schema, NoneContext)>,
2✔
177
    ) -> Result<(Schema, NoneContext), BoxedError> {
2✔
178
        Ok((
2✔
179
            Schema::default()
2✔
180
                .field(
2✔
181
                    FieldDefinition::new(
2✔
182
                        "id".to_string(),
2✔
183
                        FieldType::Int,
2✔
184
                        false,
2✔
185
                        SourceDefinition::Dynamic,
2✔
186
                    ),
2✔
187
                    true,
2✔
188
                )
2✔
189
                .clone(),
2✔
190
            NoneContext {},
2✔
191
        ))
2✔
192
    }
2✔
193

194
    fn get_input_ports(&self) -> Vec<PortHandle> {
8✔
195
        vec![DEFAULT_PORT_HANDLE]
8✔
196
    }
8✔
197

×
198
    fn get_output_ports(&self) -> Vec<OutputPortDef> {
6✔
199
        vec![OutputPortDef::new(
6✔
200
            DEFAULT_PORT_HANDLE,
6✔
201
            OutputPortType::Stateless,
6✔
202
        )]
6✔
203
    }
6✔
204

×
205
    fn build(
2✔
206
        &self,
2✔
207
        _input_schemas: HashMap<PortHandle, Schema>,
2✔
208
        _output_schemas: HashMap<PortHandle, Schema>,
2✔
209
    ) -> Result<Box<dyn Processor>, BoxedError> {
2✔
210
        if self.panic {
2✔
211
            panic!("Generated error");
1✔
212
        } else {
213
            Err("Generated Error".to_string().into())
1✔
214
        }
1✔
215
    }
1✔
216

217
    fn id(&self) -> String {
×
218
        "CreateErr".to_owned()
×
219
    }
×
220
}
×
221

×
222
/// Builds a source -> processor -> sink DAG whose processor factory returns
/// an `Err` from `build`, then runs it through the executor.
///
/// The test passes only if something in the body panics; the expectation is
/// that one of the `unwrap` calls on the executor chain surfaces the error.
#[test]
#[should_panic]
fn test_create_proc_err() {
    let count: u64 = 1_000_000;

    let mut dag = Dag::new();
    let latch = Arc::new(AtomicBool::new(true));

    let source_handle = NodeHandle::new(Some(1), 1.to_string());
    let proc_handle = NodeHandle::new(Some(1), 2.to_string());
    let sink_handle = NodeHandle::new(Some(1), 3.to_string());

    dag.add_source(
        source_handle.clone(),
        Box::new(GeneratorSourceFactory::new(count, latch.clone(), false)),
    );
    // `false` selects the "return Err" failure mode of the processor factory.
    dag.add_processor(
        proc_handle.clone(),
        Box::new(CreateErrProcessorFactory::new(false)),
    );
    dag.add_sink(
        sink_handle.clone(),
        Box::new(CountingSinkFactory::new(count, latch)),
    );

    // Wire source -> processor.
    dag.connect(
        Endpoint::new(source_handle, GENERATOR_SOURCE_OUTPUT_PORT),
        Endpoint::new(proc_handle.clone(), DEFAULT_PORT_HANDLE),
    )
    .unwrap();

    // Wire processor -> sink.
    dag.connect(
        Endpoint::new(proc_handle, DEFAULT_PORT_HANDLE),
        Endpoint::new(sink_handle, COUNTING_SINK_INPUT_PORT),
    )
    .unwrap();

    // NOTE(review): which of these `unwrap`s panics is not visible from this file — confirm
    // against `DagExecutor`.
    DagExecutor::new(dag, ExecutorOptions::default())
        .unwrap()
        .start(Arc::new(AtomicBool::new(true)))
        .unwrap()
        .join()
        .unwrap();
}
1✔
266

×
267
/// Builds a source -> processor -> sink DAG whose processor factory panics
/// inside `build`, then runs it through the executor.
///
/// The test passes only if something in the body panics.
#[test]
#[should_panic]
fn test_create_proc_panic() {
    let count: u64 = 1_000_000;

    let mut dag = Dag::new();
    let latch = Arc::new(AtomicBool::new(true));

    let source_handle = NodeHandle::new(Some(1), 1.to_string());
    let proc_handle = NodeHandle::new(Some(1), 2.to_string());
    let sink_handle = NodeHandle::new(Some(1), 3.to_string());

    dag.add_source(
        source_handle.clone(),
        Box::new(GeneratorSourceFactory::new(count, latch.clone(), false)),
    );
    // `true` selects the "panic" failure mode of the processor factory.
    dag.add_processor(
        proc_handle.clone(),
        Box::new(CreateErrProcessorFactory::new(true)),
    );
    dag.add_sink(
        sink_handle.clone(),
        Box::new(CountingSinkFactory::new(count, latch)),
    );

    // Wire source -> processor.
    dag.connect(
        Endpoint::new(source_handle, GENERATOR_SOURCE_OUTPUT_PORT),
        Endpoint::new(proc_handle.clone(), DEFAULT_PORT_HANDLE),
    )
    .unwrap();

    // Wire processor -> sink.
    dag.connect(
        Endpoint::new(proc_handle, DEFAULT_PORT_HANDLE),
        Endpoint::new(sink_handle, COUNTING_SINK_INPUT_PORT),
    )
    .unwrap();

    // NOTE(review): whether the panic propagates directly or via one of these `unwrap`s is not
    // visible from this file — confirm against `DagExecutor`.
    DagExecutor::new(dag, ExecutorOptions::default())
        .unwrap()
        .start(Arc::new(AtomicBool::new(true)))
        .unwrap()
        .join()
        .unwrap();
}
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc