• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

getdozer / dozer / 5725710489

pending completion
5725710489

push

github

web-flow
chore: Add `SourceFactory::get_output_port_name` to simplify ui graph generation (#1812)

140 of 140 new or added lines in 13 files covered. (100.0%)

45519 of 60083 relevant lines covered (75.76%)

39458.21 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/dozer-sql/src/pipeline/product/tests/pipeline_test.rs
1
use dozer_core::app::{App, AppPipeline};
2
use dozer_core::appsource::{AppSourceManager, AppSourceMappings};
3
use dozer_core::channels::SourceChannelForwarder;
4
use dozer_core::executor::{DagExecutor, ExecutorOptions};
5
use dozer_core::node::{
6
    OutputPortDef, OutputPortType, PortHandle, Sink, SinkFactory, Source, SourceFactory,
7
};
8
use dozer_core::processor_record::ProcessorRecord;
9
use dozer_core::DEFAULT_PORT_HANDLE;
10
use dozer_types::errors::internal::BoxedError;
11
use dozer_types::ingestion_types::IngestionMessage;
12
use dozer_types::ordered_float::OrderedFloat;
13
use dozer_types::tracing::debug;
14
use dozer_types::types::{
15
    Field, FieldDefinition, FieldType, Operation, Record, Schema, SourceDefinition,
16
};
17

18
use std::collections::HashMap;
19
use std::sync::atomic::{AtomicBool, Ordering};
20
use std::sync::Arc;
21

22
use crate::pipeline::builder::{statement_to_pipeline, SchemaSQLContext};
23
use crate::pipeline::window::tests::pipeline_test::TestSink;
24

25
const USER_PORT: u16 = 0 as PortHandle;
26
const DEPARTMENT_PORT: u16 = 1 as PortHandle;
27
const COUNTRY_PORT: u16 = 2 as PortHandle;
28

29
#[derive(Debug)]
×
30
pub struct TestSourceFactory {
31
    running: Arc<AtomicBool>,
32
}
33

34
impl TestSourceFactory {
35
    pub fn new(running: Arc<AtomicBool>) -> Self {
×
36
        Self { running }
×
37
    }
×
38
}
39

40
impl SourceFactory<SchemaSQLContext> for TestSourceFactory {
41
    fn get_output_ports(&self) -> Vec<OutputPortDef> {
×
42
        vec![
×
43
            OutputPortDef::new(USER_PORT, OutputPortType::Stateless),
×
44
            OutputPortDef::new(DEPARTMENT_PORT, OutputPortType::Stateless),
×
45
            OutputPortDef::new(COUNTRY_PORT, OutputPortType::Stateless),
×
46
        ]
×
47
    }
×
48

49
    fn get_output_schema(
×
50
        &self,
×
51
        port: &PortHandle,
×
52
    ) -> Result<(Schema, SchemaSQLContext), BoxedError> {
×
53
        if port == &USER_PORT {
×
54
            let source_id = SourceDefinition::Table {
×
55
                connection: "connection".to_string(),
×
56
                name: "user".to_string(),
×
57
            };
×
58
            Ok((
×
59
                Schema::default()
×
60
                    .field(
×
61
                        FieldDefinition::new(
×
62
                            String::from("id"),
×
63
                            FieldType::Int,
×
64
                            false,
×
65
                            source_id.clone(),
×
66
                        ),
×
67
                        false,
×
68
                    )
×
69
                    .field(
×
70
                        FieldDefinition::new(
×
71
                            String::from("name"),
×
72
                            FieldType::String,
×
73
                            false,
×
74
                            source_id.clone(),
×
75
                        ),
×
76
                        false,
×
77
                    )
×
78
                    .field(
×
79
                        FieldDefinition::new(
×
80
                            String::from("department_id"),
×
81
                            FieldType::Int,
×
82
                            false,
×
83
                            source_id.clone(),
×
84
                        ),
×
85
                        false,
×
86
                    )
×
87
                    .field(
×
88
                        FieldDefinition::new(
×
89
                            String::from("country_id"),
×
90
                            FieldType::String,
×
91
                            false,
×
92
                            source_id.clone(),
×
93
                        ),
×
94
                        false,
×
95
                    )
×
96
                    .field(
×
97
                        FieldDefinition::new(
×
98
                            String::from("salary"),
×
99
                            FieldType::Float,
×
100
                            false,
×
101
                            source_id,
×
102
                        ),
×
103
                        false,
×
104
                    )
×
105
                    .clone(),
×
106
                SchemaSQLContext::default(),
×
107
            ))
×
108
        } else if port == &DEPARTMENT_PORT {
×
109
            let source_id = SourceDefinition::Table {
×
110
                connection: "connection".to_string(),
×
111
                name: "department".to_string(),
×
112
            };
×
113
            Ok((
×
114
                Schema::default()
×
115
                    .field(
×
116
                        FieldDefinition::new(
×
117
                            String::from("did"),
×
118
                            FieldType::Int,
×
119
                            false,
×
120
                            source_id.clone(),
×
121
                        ),
×
122
                        false,
×
123
                    )
×
124
                    .field(
×
125
                        FieldDefinition::new(
×
126
                            String::from("dname"),
×
127
                            FieldType::String,
×
128
                            false,
×
129
                            source_id,
×
130
                        ),
×
131
                        false,
×
132
                    )
×
133
                    .clone(),
×
134
                SchemaSQLContext::default(),
×
135
            ))
×
136
        } else if port == &COUNTRY_PORT {
×
137
            let source_id = SourceDefinition::Table {
×
138
                connection: "connection".to_string(),
×
139
                name: "country".to_string(),
×
140
            };
×
141
            Ok((
×
142
                Schema::default()
×
143
                    .field(
×
144
                        FieldDefinition::new(
×
145
                            String::from("cid"),
×
146
                            FieldType::String,
×
147
                            false,
×
148
                            source_id.clone(),
×
149
                        ),
×
150
                        false,
×
151
                    )
×
152
                    .field(
×
153
                        FieldDefinition::new(
×
154
                            String::from("cname"),
×
155
                            FieldType::String,
×
156
                            false,
×
157
                            source_id,
×
158
                        ),
×
159
                        false,
×
160
                    )
×
161
                    .clone(),
×
162
                SchemaSQLContext::default(),
×
163
            ))
×
164
        } else {
165
            panic!("Invalid Port Handle {port}");
×
166
        }
167
    }
×
168

169
    fn get_output_port_name(&self, port: &PortHandle) -> String {
×
170
        match *port {
×
171
            USER_PORT => "user".to_string(),
×
172
            DEPARTMENT_PORT => "department".to_string(),
×
173
            COUNTRY_PORT => "country".to_string(),
×
174
            _ => panic!("Invalid Port Handle {port}"),
×
175
        }
×
176
    }
×
177

178
    fn build(
×
179
        &self,
×
180
        _output_schemas: HashMap<PortHandle, Schema>,
×
181
    ) -> Result<Box<dyn Source>, BoxedError> {
×
182
        Ok(Box::new(TestSource {
×
183
            running: self.running.clone(),
×
184
        }))
×
185
    }
×
186
}
×
187

×
188
#[derive(Debug)]
×
189
pub struct TestSource {
×
190
    running: Arc<AtomicBool>,
×
191
}
×
192

×
193
impl Source for TestSource {
×
194
    fn can_start_from(&self, _last_checkpoint: (u64, u64)) -> Result<bool, BoxedError> {
×
195
        Ok(false)
×
196
    }
×
197

×
198
    fn start(
×
199
        &self,
×
200
        fw: &mut dyn SourceChannelForwarder,
×
201
        _last_checkpoint: Option<(u64, u64)>,
×
202
    ) -> Result<(), BoxedError> {
×
203
        let mut new_rec = ProcessorRecord::new();
×
204
        new_rec.extend_direct_field(Field::Int(0));
×
205
        new_rec.extend_direct_field(Field::String("IT".to_string()));
×
206
        let operations = vec![
×
207
            (
×
208
                Operation::Insert {
×
209
                    new: Record::new(vec![]),
×
210
                },
×
211
                DEPARTMENT_PORT,
×
212
            ),
×
213
            (
×
214
                Operation::Insert {
×
215
                    new: Record::new(vec![Field::Int(1), Field::String("HR".to_string())]),
×
216
                },
×
217
                DEPARTMENT_PORT,
×
218
            ),
×
219
            (
×
220
                Operation::Insert {
×
221
                    new: Record::new(vec![
×
222
                        Field::Int(10000),
×
223
                        Field::String("Alice".to_string()),
×
224
                        Field::Int(0),
×
225
                        Field::String("UK".to_string()),
×
226
                        Field::Float(OrderedFloat(1.1)),
×
227
                    ]),
×
228
                },
×
229
                USER_PORT,
×
230
            ),
×
231
            (
×
232
                Operation::Insert {
×
233
                    new: Record::new(vec![
×
234
                        Field::Int(10001),
×
235
                        Field::String("Bob".to_string()),
×
236
                        Field::Int(0),
×
237
                        Field::String("UK".to_string()),
×
238
                        Field::Float(OrderedFloat(1.1)),
×
239
                    ]),
×
240
                },
×
241
                USER_PORT,
×
242
            ),
×
243
            (
×
244
                Operation::Insert {
×
245
                    new: Record::new(vec![
×
246
                        Field::String("UK".to_string()),
×
247
                        Field::String("United Kingdom".to_string()),
×
248
                    ]),
×
249
                },
×
250
                COUNTRY_PORT,
×
251
            ),
×
252
            (
×
253
                Operation::Insert {
×
254
                    new: Record::new(vec![
×
255
                        Field::String("SG".to_string()),
×
256
                        Field::String("Singapore".to_string()),
×
257
                    ]),
×
258
                },
×
259
                COUNTRY_PORT,
×
260
            ),
×
261
            (
×
262
                Operation::Insert {
×
263
                    new: Record::new(vec![
×
264
                        Field::Int(10002),
×
265
                        Field::String("Craig".to_string()),
×
266
                        Field::Int(1),
×
267
                        Field::String("SG".to_string()),
×
268
                        Field::Float(OrderedFloat(1.1)),
×
269
                    ]),
×
270
                },
×
271
                USER_PORT,
×
272
            ),
×
273
            // (
×
274
            //     Operation::Delete {
×
275
            //         old: Record::new(
×
276
            //             None,
×
277
            //             vec![Field::Int(1), Field::String("HR".to_string())],
×
278
            //         ),
×
279
            //     },
×
280
            //     DEPARTMENT_PORT,
×
281
            // ),
×
282
            (
×
283
                Operation::Insert {
×
284
                    new: Record::new(vec![
×
285
                        Field::Int(10003),
×
286
                        Field::String("Dan".to_string()),
×
287
                        Field::Int(0),
×
288
                        Field::String("UK".to_string()),
×
289
                        Field::Float(OrderedFloat(1.1)),
×
290
                    ]),
×
291
                },
×
292
                USER_PORT,
×
293
            ),
×
294
            (
×
295
                Operation::Insert {
×
296
                    new: Record::new(vec![
×
297
                        Field::Int(10004),
×
298
                        Field::String("Eve".to_string()),
×
299
                        Field::Int(1),
×
300
                        Field::String("SG".to_string()),
×
301
                        Field::Float(OrderedFloat(1.1)),
×
302
                    ]),
×
303
                },
×
304
                USER_PORT,
×
305
            ),
×
306
            // (
×
307
            //     Operation::Delete {
×
308
            //         old: Record::new(
×
309
            //             None,
×
310
            //             vec![
×
311
            //                 Field::Int(10002),
×
312
            //                 Field::String("Craig".to_string()),
×
313
            //                 Field::Int(1),
×
314
            //                 Field::Float(OrderedFloat(1.1)),
×
315
            //             ],
×
316
            //         ),
×
317
            //     },
×
318
            //     USER_PORT,
×
319
            // ),
×
320
            (
×
321
                Operation::Insert {
×
322
                    new: Record::new(vec![
×
323
                        Field::Int(10005),
×
324
                        Field::String("Frank".to_string()),
×
325
                        Field::Int(1),
×
326
                        Field::String("SG".to_string()),
×
327
                        Field::Float(OrderedFloat(1.5)),
×
328
                    ]),
×
329
                },
×
330
                USER_PORT,
×
331
            ),
×
332
            (
×
333
                Operation::Update {
×
334
                    old: Record::new(vec![Field::Int(0), Field::String("IT".to_string())]),
×
335
                    new: Record::new(vec![Field::Int(0), Field::String("RD".to_string())]),
×
336
                },
×
337
                DEPARTMENT_PORT,
×
338
            ),
×
339
            // (
×
340
            //     Operation::Update {
×
341
            //         old: Record::new(
×
342
            //             None,
×
343
            //             vec![Field::Int(0), Field::String("IT".to_string())],
×
344
            //         ),
×
345
            //         new: Record::new(
×
346
            //             None,
×
347
            //             vec![Field::Int(0), Field::String("XX".to_string())],
×
348
            //         ),
×
349
            //     },
×
350
            //     DEPARTMENT_PORT,
×
351
            // ),
×
352
        ];
×
353

×
354
        for (index, (op, port)) in operations.into_iter().enumerate() {
×
355
            // match operation.1.clone().0 {
×
356
            //     Operation::Delete { old } => {
×
357
            //         info!("s{}: - {:?}", operation.1.clone().1, old.values)
×
358
            //     }
×
359
            //     Operation::Insert { new } => {
×
360
            //         info!("s{}: + {:?}", operation.1.clone().1, new.values)
×
361
            //     }
×
362
            //     Operation::Update { old, new } => {
×
363
            //         info!(
×
364
            //             "s{}: - {:?}, + {:?}",
×
365
            //             operation.1.clone().1,
×
366
            //             old.values,
×
367
            //             new.values
×
368
            //         )
×
369
            //     }
×
370
            // }
×
371
            fw.send(IngestionMessage::new_op(index as u64, 0, 0, op), port)
×
372
                .unwrap();
×
373
        }
×
374

375
        loop {
×
376
            if !self.running.load(Ordering::Relaxed) {
×
377
                break;
×
378
            }
×
379
            // thread::sleep(Duration::from_millis(500));
380
        }
381
        Ok(())
×
382
    }
×
383
}
×
384

×
385
#[derive(Debug)]
×
386
pub(crate) struct TestSinkFactory {
×
387
    expected: u64,
×
388
    running: Arc<AtomicBool>,
×
389
}
390

391
impl TestSinkFactory {
392
    pub fn new(expected: u64, barrier: Arc<AtomicBool>) -> Self {
×
393
        Self {
×
394
            expected,
×
395
            running: barrier,
×
396
        }
×
397
    }
×
398
}
×
399

×
400
impl SinkFactory<SchemaSQLContext> for TestSinkFactory {
×
401
    fn get_input_ports(&self) -> Vec<PortHandle> {
×
402
        vec![DEFAULT_PORT_HANDLE]
×
403
    }
×
404

×
405
    fn build(
×
406
        &self,
×
407
        _input_schemas: HashMap<PortHandle, Schema>,
×
408
    ) -> Result<Box<dyn Sink>, BoxedError> {
×
409
        Ok(Box::new(TestSink {
×
410
            expected: self.expected,
×
411
            current: 0,
×
412
            running: self.running.clone(),
×
413
        }))
×
414
    }
×
415

×
416
    fn prepare(
×
417
        &self,
×
418
        _input_schemas: HashMap<PortHandle, (Schema, SchemaSQLContext)>,
×
419
    ) -> Result<(), BoxedError> {
×
420
        Ok(())
×
421
    }
×
422
}
×
423

×
424
#[test]
×
425
#[ignore]
×
426
fn test_pipeline_builder() {
×
427
    dozer_tracing::init_telemetry(None, None);
×
428

×
429
    let mut pipeline = AppPipeline::new();
×
430

×
431
    let context = statement_to_pipeline(
×
432
        "SELECT  name, dname, salary \
×
433
        FROM user JOIN department ON user.department_id = department.did JOIN country ON user.country_id = country.cid ",
×
434
        &mut pipeline,
×
435
        Some("results".to_string()),
×
436
    )
×
437
        .unwrap();
×
438

×
439
    let table_info = context.output_tables_map.get("results").unwrap();
×
440

×
441
    let latch = Arc::new(AtomicBool::new(true));
×
442

×
443
    let mut asm = AppSourceManager::new();
×
444
    asm.add(
×
445
        Box::new(TestSourceFactory::new(latch.clone())),
×
446
        AppSourceMappings::new(
×
447
            "conn".to_string(),
×
448
            vec![
×
449
                ("user".to_string(), USER_PORT),
×
450
                ("department".to_string(), DEPARTMENT_PORT),
×
451
                ("country".to_string(), COUNTRY_PORT),
×
452
            ]
×
453
            .into_iter()
×
454
            .collect(),
×
455
        ),
×
456
    )
×
457
    .unwrap();
×
458

×
459
    pipeline.add_sink(Box::new(TestSinkFactory::new(8, latch)), "sink", None);
×
460
    pipeline.connect_nodes(
×
461
        &table_info.node,
×
462
        table_info.port,
×
463
        "sink",
×
464
        DEFAULT_PORT_HANDLE,
×
465
    );
×
466

×
467
    let mut app = App::new(asm);
×
468
    app.add_pipeline(pipeline);
×
469

×
470
    let dag = app.into_dag().unwrap();
×
471

×
472
    let now = std::time::Instant::now();
×
473

×
474
    DagExecutor::new(dag, ExecutorOptions::default())
×
475
        .unwrap()
×
476
        .start(Arc::new(AtomicBool::new(true)))
×
477
        .unwrap()
×
478
        .join()
×
479
        .unwrap();
×
480

×
481
    let elapsed = now.elapsed();
×
482
    debug!("Elapsed: {:.2?}", elapsed);
×
483
}
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc