• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

getdozer / dozer / 4112409903

pending completion
4112409903

Pull #818

github

GitHub
Merge 0e6d61bff into c160ec41f
Pull Request #818: chore: fix dag issues

212 of 212 new or added lines in 23 files covered. (100.0%)

23352 of 37718 relevant lines covered (61.91%)

31647.45 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

59.42
/dozer-core/src/dag/tests/dag_schemas.rs
1
use crate::dag::dag_schemas::DagSchemas;
2
use crate::dag::errors::ExecutionError;
3
use crate::dag::executor::{DagExecutor, ExecutorOptions};
4
use crate::dag::node::{
5
    NodeHandle, OutputPortDef, OutputPortType, PortHandle, Processor, ProcessorFactory,
6
    SinkFactory, Source, SourceFactory,
7
};
8
use crate::dag::{Dag, Endpoint, DEFAULT_PORT_HANDLE};
9

10
use dozer_types::types::{FieldDefinition, FieldType, Schema, SourceDefinition};
11
use std::collections::HashMap;
12

13
use crate::dag::tests::app::NoneContext;
14
use std::sync::atomic::AtomicBool;
15
use std::sync::Arc;
16
use tempdir::TempDir;
17

18
/// Unwraps a `Result`, panicking with the error's `Display` text on `Err`.
///
/// Test-only shorthand: `chk!(expr)` evaluates to the `Ok` value of `expr`,
/// or aborts the test with the error message if `expr` is an `Err`.
macro_rules! chk {
    ($stmt:expr) => {
        // `panic!` formats its argument through `Display` already, so there is
        // no need to build an intermediate `String` with `to_string()`.
        $stmt.unwrap_or_else(|e| panic!("{}", e))
    };
}
23

24
#[derive(Debug)]
×
25
struct TestUsersSourceFactory {}
26

27
impl SourceFactory<NoneContext> for TestUsersSourceFactory {
28
    fn get_output_schema(
5✔
29
        &self,
5✔
30
        _port: &PortHandle,
5✔
31
    ) -> Result<(Schema, NoneContext), ExecutionError> {
5✔
32
        Ok((
5✔
33
            Schema::empty()
5✔
34
                .field(
5✔
35
                    FieldDefinition::new(
5✔
36
                        "user_id".to_string(),
5✔
37
                        FieldType::String,
5✔
38
                        false,
5✔
39
                        SourceDefinition::Dynamic,
5✔
40
                    ),
5✔
41
                    true,
5✔
42
                )
5✔
43
                .field(
5✔
44
                    FieldDefinition::new(
5✔
45
                        "username".to_string(),
5✔
46
                        FieldType::String,
5✔
47
                        false,
5✔
48
                        SourceDefinition::Dynamic,
5✔
49
                    ),
5✔
50
                    true,
5✔
51
                )
5✔
52
                .field(
5✔
53
                    FieldDefinition::new(
5✔
54
                        "country_id".to_string(),
5✔
55
                        FieldType::String,
5✔
56
                        false,
5✔
57
                        SourceDefinition::Dynamic,
5✔
58
                    ),
5✔
59
                    true,
5✔
60
                )
5✔
61
                .clone(),
5✔
62
            NoneContext {},
5✔
63
        ))
5✔
64
    }
5✔
65

66
    fn get_output_ports(&self) -> Result<Vec<OutputPortDef>, ExecutionError> {
9✔
67
        Ok(vec![OutputPortDef::new(
9✔
68
            DEFAULT_PORT_HANDLE,
9✔
69
            OutputPortType::Stateless,
9✔
70
        )])
9✔
71
    }
9✔
72

73
    fn prepare(
×
74
        &self,
×
75
        _output_schemas: HashMap<PortHandle, (Schema, NoneContext)>,
×
76
    ) -> Result<(), ExecutionError> {
×
77
        Ok(())
×
78
    }
×
79

80
    fn build(
×
81
        &self,
×
82
        _input_schemas: HashMap<PortHandle, Schema>,
×
83
    ) -> Result<Box<dyn Source>, ExecutionError> {
×
84
        todo!()
×
85
    }
×
86
}
87

88
#[derive(Debug)]
×
89
struct TestCountriesSourceFactory {}
90

91
impl SourceFactory<NoneContext> for TestCountriesSourceFactory {
92
    fn get_output_schema(
3✔
93
        &self,
3✔
94
        _port: &PortHandle,
3✔
95
    ) -> Result<(Schema, NoneContext), ExecutionError> {
3✔
96
        Ok((
3✔
97
            Schema::empty()
3✔
98
                .field(
3✔
99
                    FieldDefinition::new(
3✔
100
                        "country_id".to_string(),
3✔
101
                        FieldType::String,
3✔
102
                        false,
3✔
103
                        SourceDefinition::Dynamic,
3✔
104
                    ),
3✔
105
                    true,
3✔
106
                )
3✔
107
                .field(
3✔
108
                    FieldDefinition::new(
3✔
109
                        "country_name".to_string(),
3✔
110
                        FieldType::String,
3✔
111
                        false,
3✔
112
                        SourceDefinition::Dynamic,
3✔
113
                    ),
3✔
114
                    true,
3✔
115
                )
3✔
116
                .clone(),
3✔
117
            NoneContext {},
3✔
118
        ))
3✔
119
    }
3✔
120

121
    fn get_output_ports(&self) -> Result<Vec<OutputPortDef>, ExecutionError> {
5✔
122
        Ok(vec![OutputPortDef::new(
5✔
123
            DEFAULT_PORT_HANDLE,
5✔
124
            OutputPortType::Stateless,
5✔
125
        )])
5✔
126
    }
5✔
127

128
    fn prepare(
×
129
        &self,
×
130
        _output_schemas: HashMap<PortHandle, (Schema, NoneContext)>,
×
131
    ) -> Result<(), ExecutionError> {
×
132
        Ok(())
×
133
    }
×
134

135
    fn build(
×
136
        &self,
×
137
        _input_schemas: HashMap<PortHandle, Schema>,
×
138
    ) -> Result<Box<dyn Source>, ExecutionError> {
×
139
        todo!()
×
140
    }
×
141
}
142

143
#[derive(Debug)]
×
144
struct TestJoinProcessorFactory {}
145

146
impl ProcessorFactory<NoneContext> for TestJoinProcessorFactory {
147
    fn get_output_schema(
4✔
148
        &self,
4✔
149
        _output_port: &PortHandle,
4✔
150
        input_schemas: &HashMap<PortHandle, (Schema, NoneContext)>,
4✔
151
    ) -> Result<(Schema, NoneContext), ExecutionError> {
4✔
152
        let mut joined: Vec<FieldDefinition> = Vec::new();
4✔
153
        joined.extend(input_schemas.get(&1).unwrap().0.fields.clone());
4✔
154
        joined.extend(input_schemas.get(&2).unwrap().0.fields.clone());
4✔
155
        Ok((
4✔
156
            Schema {
4✔
157
                fields: joined,
4✔
158
                primary_index: vec![],
4✔
159
                identifier: None,
4✔
160
            },
4✔
161
            NoneContext {},
4✔
162
        ))
4✔
163
    }
4✔
164

165
    fn get_input_ports(&self) -> Vec<PortHandle> {
10✔
166
        vec![1, 2]
10✔
167
    }
10✔
168

169
    fn get_output_ports(&self) -> Vec<OutputPortDef> {
7✔
170
        vec![OutputPortDef::new(
7✔
171
            DEFAULT_PORT_HANDLE,
7✔
172
            OutputPortType::Stateless,
7✔
173
        )]
7✔
174
    }
7✔
175

176
    fn prepare(
×
177
        &self,
×
178
        _input_schemas: HashMap<PortHandle, (Schema, NoneContext)>,
×
179
        _output_schemas: HashMap<PortHandle, (Schema, NoneContext)>,
×
180
    ) -> Result<(), ExecutionError> {
×
181
        Ok(())
×
182
    }
×
183

184
    fn build(
×
185
        &self,
×
186
        _input_schemas: HashMap<PortHandle, Schema>,
×
187
        _output_schemas: HashMap<PortHandle, Schema>,
×
188
    ) -> Result<Box<dyn Processor>, ExecutionError> {
×
189
        todo!()
×
190
    }
×
191
}
192

193
#[derive(Debug)]
×
194
struct TestSinkFactory {}
195

196
impl SinkFactory<NoneContext> for TestSinkFactory {
197
    fn get_input_ports(&self) -> Vec<PortHandle> {
7✔
198
        vec![DEFAULT_PORT_HANDLE]
7✔
199
    }
7✔
200

×
201
    fn prepare(
×
202
        &self,
×
203
        _input_schemas: HashMap<PortHandle, (Schema, NoneContext)>,
×
204
    ) -> Result<(), ExecutionError> {
×
205
        Ok(())
×
206
    }
×
207

208
    fn build(
×
209
        &self,
×
210
        _input_schemas: HashMap<PortHandle, Schema>,
×
211
    ) -> Result<Box<dyn crate::dag::node::Sink>, ExecutionError> {
×
212
        todo!()
×
213
    }
×
214
}
215

×
216
/// Builds a users + countries -> join -> sink DAG and checks the number of
/// fields `DagSchemas` reports on each node's input and output ports.
#[test]
fn test_extract_dag_schemas() {
    let mut dag = Dag::new();

    let users_handle = NodeHandle::new(Some(1), 1.to_string());
    let countries_handle = NodeHandle::new(Some(1), 2.to_string());
    let join_handle = NodeHandle::new(Some(1), 3.to_string());
    let sink_handle = NodeHandle::new(Some(1), 4.to_string());

    // Register the four nodes, keeping the returned indexes for the schema lookups below.
    let users_index = dag.add_source(users_handle.clone(), Arc::new(TestUsersSourceFactory {}));
    let countries_index = dag.add_source(
        countries_handle.clone(),
        Arc::new(TestCountriesSourceFactory {}),
    );
    let join_index = dag.add_processor(join_handle.clone(), Arc::new(TestJoinProcessorFactory {}));
    let sink_index = dag.add_sink(sink_handle.clone(), Arc::new(TestSinkFactory {}));

    // users -> join (port 1)
    let users_out = Endpoint::new(users_handle, DEFAULT_PORT_HANDLE);
    let join_in_users = Endpoint::new(join_handle.clone(), 1);
    chk!(dag.connect(users_out, join_in_users));

    // countries -> join (port 2)
    let countries_out = Endpoint::new(countries_handle, DEFAULT_PORT_HANDLE);
    let join_in_countries = Endpoint::new(join_handle.clone(), 2);
    chk!(dag.connect(countries_out, join_in_countries));

    // join -> sink
    let join_out = Endpoint::new(join_handle, DEFAULT_PORT_HANDLE);
    let sink_in = Endpoint::new(sink_handle, DEFAULT_PORT_HANDLE);
    chk!(dag.connect(join_out, sink_in));

    let dag_schemas = chk!(DagSchemas::new(&dag));

    // The users source exposes three fields.
    let users_output = dag_schemas.get_node_output_schemas(users_index);
    let users_schema = &users_output.get(&DEFAULT_PORT_HANDLE).unwrap().0;
    assert_eq!(users_schema.fields.len(), 3);

    // The countries source exposes two fields.
    let countries_output = dag_schemas.get_node_output_schemas(countries_index);
    let countries_schema = &countries_output.get(&DEFAULT_PORT_HANDLE).unwrap().0;
    assert_eq!(countries_schema.fields.len(), 2);

    // The join sees each source's schema on the port it was connected to.
    let join_input = dag_schemas.get_node_input_schemas(join_index);
    let join_left = &join_input.get(&1).unwrap().0;
    assert_eq!(join_left.fields.len(), 3);
    let join_right = &join_input.get(&2).unwrap().0;
    assert_eq!(join_right.fields.len(), 2);

    // The join output concatenates both inputs: 3 + 2 fields.
    let join_output = dag_schemas.get_node_output_schemas(join_index);
    let join_schema = &join_output.get(&DEFAULT_PORT_HANDLE).unwrap().0;
    assert_eq!(join_schema.fields.len(), 5);

    // The sink receives the join output unchanged.
    let sink_input = dag_schemas.get_node_input_schemas(sink_index);
    let sink_schema = &sink_input.get(&DEFAULT_PORT_HANDLE).unwrap().0;
    assert_eq!(sink_schema.fields.len(), 5);
}
1✔
291

×
292
/// Creates a `DagExecutor` twice for the same DAG and directory (both must
/// succeed), then once more for a DAG whose countries node uses a different
/// source factory (must fail).
#[test]
fn test_init_metadata() {
    let users_handle = NodeHandle::new(Some(1), 1.to_string());
    let countries_handle = NodeHandle::new(Some(1), 2.to_string());
    let join_handle = NodeHandle::new(Some(1), 3.to_string());
    let sink_handle = NodeHandle::new(Some(1), 4.to_string());

    // --- First DAG: users + countries -> join -> sink ---
    let mut dag = Dag::new();
    dag.add_source(users_handle.clone(), Arc::new(TestUsersSourceFactory {}));
    dag.add_source(
        countries_handle.clone(),
        Arc::new(TestCountriesSourceFactory {}),
    );
    dag.add_processor(join_handle.clone(), Arc::new(TestJoinProcessorFactory {}));
    dag.add_sink(sink_handle.clone(), Arc::new(TestSinkFactory {}));

    let users_to_join = (
        Endpoint::new(users_handle.clone(), DEFAULT_PORT_HANDLE),
        Endpoint::new(join_handle.clone(), 1),
    );
    chk!(dag.connect(users_to_join.0, users_to_join.1));

    let countries_to_join = (
        Endpoint::new(countries_handle.clone(), DEFAULT_PORT_HANDLE),
        Endpoint::new(join_handle.clone(), 2),
    );
    chk!(dag.connect(countries_to_join.0, countries_to_join.1));

    let join_to_sink = (
        Endpoint::new(join_handle.clone(), DEFAULT_PORT_HANDLE),
        Endpoint::new(sink_handle.clone(), DEFAULT_PORT_HANDLE),
    );
    chk!(dag.connect(join_to_sink.0, join_to_sink.1));

    let tmp_dir = chk!(TempDir::new("example"));

    // Two executors over the same directory and the same DAG must both be constructible.
    // Both are bound to named variables so they stay alive until the end of the test.
    let _first_exec = chk!(DagExecutor::new(
        dag.clone(),
        tmp_dir.path(),
        ExecutorOptions::default(),
        Arc::new(AtomicBool::new(true))
    ));
    let _second_exec = chk!(DagExecutor::new(
        dag,
        tmp_dir.path(),
        ExecutorOptions::default(),
        Arc::new(AtomicBool::new(true))
    ));

    // --- Second DAG: same topology, but the countries node deliberately uses
    // `TestUsersSourceFactory`, so its output schema differs from the first DAG. ---
    let mut dag = Dag::new();
    dag.add_source(users_handle.clone(), Arc::new(TestUsersSourceFactory {}));
    dag.add_source(
        countries_handle.clone(),
        Arc::new(TestUsersSourceFactory {}),
    );
    dag.add_processor(join_handle.clone(), Arc::new(TestJoinProcessorFactory {}));
    dag.add_sink(sink_handle.clone(), Arc::new(TestSinkFactory {}));

    let users_to_join = (
        Endpoint::new(users_handle, DEFAULT_PORT_HANDLE),
        Endpoint::new(join_handle.clone(), 1),
    );
    chk!(dag.connect(users_to_join.0, users_to_join.1));

    let countries_to_join = (
        Endpoint::new(countries_handle, DEFAULT_PORT_HANDLE),
        Endpoint::new(join_handle.clone(), 2),
    );
    chk!(dag.connect(countries_to_join.0, countries_to_join.1));

    let join_to_sink = (
        Endpoint::new(join_handle, DEFAULT_PORT_HANDLE),
        Endpoint::new(sink_handle, DEFAULT_PORT_HANDLE),
    );
    chk!(dag.connect(join_to_sink.0, join_to_sink.1));

    // NOTE(review): the failure is presumably caused by the metadata already stored under
    // `tmp_dir` no longer matching the changed schema -- confirm against `DagExecutor::new`.
    let mismatched = DagExecutor::new(
        dag,
        tmp_dir.path(),
        ExecutorOptions::default(),
        Arc::new(AtomicBool::new(true)),
    );
    assert!(mismatched.is_err());
}
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc