• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

getdozer / dozer / 4061292542

pending completion
4061292542

Pull #729

github

GitHub
Merge 069171d20 into de98caa91
Pull Request #729: feat: Implement multi-way JOIN

1356 of 1356 new or added lines in 10 files covered. (100.0%)

24817 of 38526 relevant lines covered (64.42%)

39509.54 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/dozer-sql/src/pipeline/product/tests/pipeline_test.rs
1
use dozer_core::dag::app::App;
2
use dozer_core::dag::appsource::{AppSource, AppSourceManager};
3
use dozer_core::dag::channels::SourceChannelForwarder;
4
use dozer_core::dag::dag::DEFAULT_PORT_HANDLE;
5
use dozer_core::dag::epoch::Epoch;
6
use dozer_core::dag::errors::ExecutionError;
7
use dozer_core::dag::executor::{DagExecutor, ExecutorOptions};
8
use dozer_core::dag::node::{
9
    OutputPortDef, OutputPortType, PortHandle, Sink, SinkFactory, Source, SourceFactory,
10
};
11
use dozer_core::dag::record_store::RecordReader;
12
use dozer_core::storage::lmdb_storage::{LmdbEnvironmentManager, SharedTransaction};
13
use dozer_types::ordered_float::OrderedFloat;
14
use dozer_types::tracing::{debug, info};
15
use dozer_types::types::{
16
    Field, FieldDefinition, FieldType, Operation, Record, Schema, SourceDefinition,
17
};
18

19
use std::collections::HashMap;
20
use std::sync::atomic::{AtomicBool, Ordering};
21
use std::sync::Arc;
22
use tempdir::TempDir;
23

24
use crate::pipeline::builder::{statement_to_pipeline, SchemaSQLContext};
25

26
const USER_PORT: u16 = 0 as PortHandle;
27
const DEPARTMENT_PORT: u16 = 1 as PortHandle;
28
const COUNTRY_PORT: u16 = 2 as PortHandle;
29

×
30
/// Factory for [`TestSource`] instances used by the pipeline test.
#[derive(Debug)]
pub struct TestSourceFactory {
    /// Shared flag handed to every source built by this factory; the source
    /// keeps running while it is `true`.
    running: Arc<AtomicBool>,
}
34

35
impl TestSourceFactory {
×
36
    pub fn new(running: Arc<AtomicBool>) -> Self {
×
37
        Self { running }
×
38
    }
×
39
}
×
40

41
impl SourceFactory<SchemaSQLContext> for TestSourceFactory {
×
42
    fn get_output_ports(&self) -> Result<Vec<OutputPortDef>, ExecutionError> {
×
43
        Ok(vec![
×
44
            OutputPortDef::new(
×
45
                USER_PORT,
×
46
                OutputPortType::StatefulWithPrimaryKeyLookup {
×
47
                    retr_old_records_for_updates: true,
×
48
                    retr_old_records_for_deletes: true,
×
49
                },
×
50
            ),
×
51
            OutputPortDef::new(
×
52
                DEPARTMENT_PORT,
×
53
                OutputPortType::StatefulWithPrimaryKeyLookup {
×
54
                    retr_old_records_for_updates: true,
×
55
                    retr_old_records_for_deletes: true,
×
56
                },
×
57
            ),
×
58
            OutputPortDef::new(
×
59
                COUNTRY_PORT,
×
60
                OutputPortType::StatefulWithPrimaryKeyLookup {
×
61
                    retr_old_records_for_updates: true,
×
62
                    retr_old_records_for_deletes: true,
×
63
                },
×
64
            ),
×
65
        ])
×
66
    }
×
67

×
68
    fn get_output_schema(
×
69
        &self,
×
70
        port: &PortHandle,
×
71
    ) -> Result<(Schema, SchemaSQLContext), ExecutionError> {
×
72
        if port == &USER_PORT {
×
73
            let source_id = SourceDefinition::Table {
×
74
                connection: "connection".to_string(),
×
75
                name: "user".to_string(),
×
76
            };
×
77
            Ok((
×
78
                Schema::empty()
×
79
                    .field(
×
80
                        FieldDefinition::new(
×
81
                            String::from("id"),
×
82
                            FieldType::Int,
×
83
                            false,
×
84
                            source_id.clone(),
×
85
                        ),
×
86
                        true,
×
87
                    )
×
88
                    .field(
×
89
                        FieldDefinition::new(
×
90
                            String::from("name"),
×
91
                            FieldType::String,
×
92
                            false,
×
93
                            source_id.clone(),
×
94
                        ),
×
95
                        false,
×
96
                    )
×
97
                    .field(
×
98
                        FieldDefinition::new(
×
99
                            String::from("department_id"),
×
100
                            FieldType::Int,
×
101
                            false,
×
102
                            source_id.clone(),
×
103
                        ),
×
104
                        false,
×
105
                    )
×
106
                    .field(
×
107
                        FieldDefinition::new(
×
108
                            String::from("country_id"),
×
109
                            FieldType::String,
×
110
                            false,
×
111
                            source_id.clone(),
×
112
                        ),
×
113
                        false,
×
114
                    )
×
115
                    .field(
×
116
                        FieldDefinition::new(
×
117
                            String::from("salary"),
×
118
                            FieldType::Float,
×
119
                            false,
×
120
                            source_id,
×
121
                        ),
×
122
                        false,
×
123
                    )
×
124
                    .clone(),
×
125
                SchemaSQLContext::default(),
×
126
            ))
×
127
        } else if port == &DEPARTMENT_PORT {
×
128
            let source_id = SourceDefinition::Table {
×
129
                connection: "connection".to_string(),
×
130
                name: "department".to_string(),
×
131
            };
×
132
            Ok((
×
133
                Schema::empty()
×
134
                    .field(
×
135
                        FieldDefinition::new(
×
136
                            String::from("did"),
×
137
                            FieldType::Int,
×
138
                            false,
×
139
                            source_id.clone(),
×
140
                        ),
×
141
                        true,
×
142
                    )
×
143
                    .field(
×
144
                        FieldDefinition::new(
×
145
                            String::from("dname"),
×
146
                            FieldType::String,
×
147
                            false,
×
148
                            source_id,
×
149
                        ),
×
150
                        false,
×
151
                    )
×
152
                    .clone(),
×
153
                SchemaSQLContext::default(),
×
154
            ))
×
155
        } else if port == &COUNTRY_PORT {
×
156
            let source_id = SourceDefinition::Table {
×
157
                connection: "connection".to_string(),
×
158
                name: "country".to_string(),
×
159
            };
×
160
            Ok((
×
161
                Schema::empty()
×
162
                    .field(
×
163
                        FieldDefinition::new(
×
164
                            String::from("cid"),
×
165
                            FieldType::String,
×
166
                            false,
×
167
                            source_id.clone(),
×
168
                        ),
×
169
                        true,
×
170
                    )
×
171
                    .field(
×
172
                        FieldDefinition::new(
×
173
                            String::from("cname"),
×
174
                            FieldType::String,
×
175
                            false,
×
176
                            source_id,
×
177
                        ),
×
178
                        false,
×
179
                    )
×
180
                    .clone(),
×
181
                SchemaSQLContext::default(),
×
182
            ))
×
183
        } else {
×
184
            panic!("Invalid Port Handle {port}");
×
185
        }
×
186
    }
×
187

×
188
    fn build(
×
189
        &self,
×
190
        _output_schemas: HashMap<PortHandle, Schema>,
×
191
    ) -> Result<Box<dyn Source>, ExecutionError> {
×
192
        Ok(Box::new(TestSource {
×
193
            running: self.running.clone(),
×
194
        }))
×
195
    }
×
196

×
197
    fn prepare(
×
198
        &self,
×
199
        _output_schemas: HashMap<PortHandle, (Schema, SchemaSQLContext)>,
×
200
    ) -> Result<(), ExecutionError> {
×
201
        Ok(())
×
202
    }
×
203
}
×
204

×
205
/// Source that emits a fixed list of operations and then waits for shutdown.
#[derive(Debug)]
pub struct TestSource {
    /// Shared flag polled after all operations were sent; the source returns
    /// once it reads `false`.
    running: Arc<AtomicBool>,
}
×
209

×
210
impl Source for TestSource {
×
211
    fn start(
×
212
        &self,
×
213
        fw: &mut dyn SourceChannelForwarder,
×
214
        _from_seq: Option<(u64, u64)>,
×
215
    ) -> Result<(), ExecutionError> {
×
216
        let operations = vec![
×
217
            (
×
218
                Operation::Insert {
×
219
                    new: Record::new(
×
220
                        None,
×
221
                        vec![Field::Int(0), Field::String("IT".to_string())],
×
222
                        Some(1),
×
223
                    ),
×
224
                },
×
225
                DEPARTMENT_PORT,
×
226
            ),
×
227
            (
×
228
                Operation::Insert {
×
229
                    new: Record::new(
×
230
                        None,
×
231
                        vec![Field::Int(1), Field::String("HR".to_string())],
×
232
                        Some(1),
×
233
                    ),
×
234
                },
×
235
                DEPARTMENT_PORT,
×
236
            ),
×
237
            (
×
238
                Operation::Insert {
×
239
                    new: Record::new(
×
240
                        None,
×
241
                        vec![
×
242
                            Field::Int(10000),
×
243
                            Field::String("Alice".to_string()),
×
244
                            Field::Int(0),
×
245
                            Field::String("UK".to_string()),
×
246
                            Field::Float(OrderedFloat(1.1)),
×
247
                        ],
×
248
                        Some(1),
×
249
                    ),
×
250
                },
×
251
                USER_PORT,
×
252
            ),
×
253
            (
×
254
                Operation::Insert {
×
255
                    new: Record::new(
×
256
                        None,
×
257
                        vec![
×
258
                            Field::Int(10001),
×
259
                            Field::String("Bob".to_string()),
×
260
                            Field::Int(0),
×
261
                            Field::String("UK".to_string()),
×
262
                            Field::Float(OrderedFloat(1.1)),
×
263
                        ],
×
264
                        Some(1),
×
265
                    ),
×
266
                },
×
267
                USER_PORT,
×
268
            ),
×
269
            (
×
270
                Operation::Insert {
×
271
                    new: Record::new(
×
272
                        None,
×
273
                        vec![
×
274
                            Field::String("UK".to_string()),
×
275
                            Field::String("United Kingdom".to_string()),
×
276
                        ],
×
277
                        Some(1),
×
278
                    ),
×
279
                },
×
280
                COUNTRY_PORT,
×
281
            ),
×
282
            (
×
283
                Operation::Insert {
×
284
                    new: Record::new(
×
285
                        None,
×
286
                        vec![
×
287
                            Field::String("SG".to_string()),
×
288
                            Field::String("Singapore".to_string()),
×
289
                        ],
×
290
                        Some(1),
×
291
                    ),
×
292
                },
×
293
                COUNTRY_PORT,
×
294
            ),
×
295
            (
×
296
                Operation::Insert {
×
297
                    new: Record::new(
×
298
                        None,
×
299
                        vec![
×
300
                            Field::Int(10002),
×
301
                            Field::String("Craig".to_string()),
×
302
                            Field::Int(1),
×
303
                            Field::String("SG".to_string()),
×
304
                            Field::Float(OrderedFloat(1.1)),
×
305
                        ],
×
306
                        Some(1),
×
307
                    ),
×
308
                },
×
309
                USER_PORT,
×
310
            ),
×
311
            // (
×
312
            //     Operation::Delete {
×
313
            //         old: Record::new(
×
314
            //             None,
×
315
            //             vec![Field::Int(1), Field::String("HR".to_string())],
×
316
            //             Some(1),
×
317
            //         ),
×
318
            //     },
×
319
            //     DEPARTMENT_PORT,
×
320
            // ),
×
321
            (
×
322
                Operation::Insert {
×
323
                    new: Record::new(
×
324
                        None,
×
325
                        vec![
×
326
                            Field::Int(10003),
×
327
                            Field::String("Dan".to_string()),
×
328
                            Field::Int(0),
×
329
                            Field::String("UK".to_string()),
×
330
                            Field::Float(OrderedFloat(1.1)),
×
331
                        ],
×
332
                        Some(1),
×
333
                    ),
×
334
                },
×
335
                USER_PORT,
×
336
            ),
×
337
            (
×
338
                Operation::Insert {
×
339
                    new: Record::new(
×
340
                        None,
×
341
                        vec![
×
342
                            Field::Int(10004),
×
343
                            Field::String("Eve".to_string()),
×
344
                            Field::Int(1),
×
345
                            Field::String("SG".to_string()),
×
346
                            Field::Float(OrderedFloat(1.1)),
×
347
                        ],
×
348
                        Some(1),
×
349
                    ),
×
350
                },
×
351
                USER_PORT,
×
352
            ),
×
353
            // (
×
354
            //     Operation::Delete {
×
355
            //         old: Record::new(
×
356
            //             None,
×
357
            //             vec![
×
358
            //                 Field::Int(10002),
×
359
            //                 Field::String("Craig".to_string()),
×
360
            //                 Field::Int(1),
×
361
            //                 Field::Float(OrderedFloat(1.1)),
×
362
            //             ],
×
363
            //             None,
×
364
            //         ),
×
365
            //     },
×
366
            //     USER_PORT,
×
367
            // ),
×
368
            (
×
369
                Operation::Insert {
×
370
                    new: Record::new(
×
371
                        None,
×
372
                        vec![
×
373
                            Field::Int(10005),
×
374
                            Field::String("Frank".to_string()),
×
375
                            Field::Int(1),
×
376
                            Field::String("SG".to_string()),
×
377
                            Field::Float(OrderedFloat(1.5)),
×
378
                        ],
×
379
                        None,
×
380
                    ),
×
381
                },
×
382
                USER_PORT,
×
383
            ),
×
384
            (
×
385
                Operation::Update {
×
386
                    old: Record::new(
×
387
                        None,
×
388
                        vec![Field::Int(0), Field::String("IT".to_string())],
×
389
                        Some(1),
×
390
                    ),
×
391
                    new: Record::new(
×
392
                        None,
×
393
                        vec![Field::Int(0), Field::String("RD".to_string())],
×
394
                        Some(2),
×
395
                    ),
×
396
                },
×
397
                DEPARTMENT_PORT,
×
398
            ),
×
399
            // (
×
400
            //     Operation::Update {
×
401
            //         old: Record::new(
×
402
            //             None,
×
403
            //             vec![Field::Int(0), Field::String("IT".to_string())],
×
404
            //             None,
×
405
            //         ),
×
406
            //         new: Record::new(
×
407
            //             None,
×
408
            //             vec![Field::Int(0), Field::String("XX".to_string())],
×
409
            //             None,
×
410
            //         ),
×
411
            //     },
×
412
            //     DEPARTMENT_PORT,
×
413
            // ),
×
414
        ];
×
415

×
416
        for operation in operations.iter().enumerate() {
×
417
            // match operation.1.clone().0 {
×
418
            //     Operation::Delete { old } => {
×
419
            //         info!("s{}: - {:?}", operation.1.clone().1, old.values)
×
420
            //     }
×
421
            //     Operation::Insert { new } => {
×
422
            //         info!("s{}: + {:?}", operation.1.clone().1, new.values)
×
423
            //     }
×
424
            //     Operation::Update { old, new } => {
×
425
            //         info!(
×
426
            //             "s{}: - {:?}, + {:?}",
×
427
            //             operation.1.clone().1,
×
428
            //             old.values,
×
429
            //             new.values
×
430
            //         )
×
431
            //     }
×
432
            // }
×
433
            fw.send(
×
434
                operation.0.try_into().unwrap(),
×
435
                0,
×
436
                operation.1.clone().0,
×
437
                operation.1.clone().1,
×
438
            )
×
439
            .unwrap();
×
440
        }
×
441

×
442
        loop {
×
443
            if !self.running.load(Ordering::Relaxed) {
×
444
                break;
×
445
            }
×
446
            // thread::sleep(Duration::from_millis(500));
×
447
        }
×
448
        Ok(())
×
449
    }
×
450
}
×
451

×
452
/// Factory for [`TestSink`] instances used by the pipeline test.
#[derive(Debug)]
pub(crate) struct TestSinkFactory {
    /// Number of operations each built sink waits for before clearing `running`.
    expected: u64,
    /// Shared flag handed to every sink built by this factory.
    running: Arc<AtomicBool>,
}
×
457

×
458
impl TestSinkFactory {
×
459
    pub fn new(expected: u64, barrier: Arc<AtomicBool>) -> Self {
×
460
        Self {
×
461
            expected,
×
462
            running: barrier,
×
463
        }
×
464
    }
×
465
}
×
466

×
467
impl SinkFactory<SchemaSQLContext> for TestSinkFactory {
×
468
    fn get_input_ports(&self) -> Vec<PortHandle> {
×
469
        vec![DEFAULT_PORT_HANDLE]
×
470
    }
×
471

×
472
    fn set_input_schema(
×
473
        &self,
×
474
        _input_schemas: &HashMap<PortHandle, (Schema, SchemaSQLContext)>,
×
475
    ) -> Result<(), ExecutionError> {
×
476
        Ok(())
×
477
    }
×
478

×
479
    fn build(
×
480
        &self,
×
481
        _input_schemas: HashMap<PortHandle, Schema>,
×
482
    ) -> Result<Box<dyn Sink>, ExecutionError> {
×
483
        Ok(Box::new(TestSink {
×
484
            expected: self.expected,
×
485
            current: 0,
×
486
            running: self.running.clone(),
×
487
        }))
×
488
    }
×
489

×
490
    fn prepare(
×
491
        &self,
×
492
        _input_schemas: HashMap<PortHandle, (Schema, SchemaSQLContext)>,
×
493
    ) -> Result<(), ExecutionError> {
×
494
        Ok(())
×
495
    }
×
496
}
×
497

×
498
/// Sink that logs every operation it receives and signals completion once an
/// expected number of operations has been processed.
#[derive(Debug)]
pub struct TestSink {
    /// Number of operations after which `running` is set to `false`.
    expected: u64,
    /// Number of operations processed so far.
    current: u64,
    /// Shared flag cleared when `current` reaches `expected`.
    running: Arc<AtomicBool>,
}
×
504

×
505
impl Sink for TestSink {
506
    fn init(&mut self, _env: &mut LmdbEnvironmentManager) -> Result<(), ExecutionError> {
×
507
        debug!("SINK: Initialising TestSink");
×
508
        Ok(())
×
509
    }
×
510

511
    fn process(
×
512
        &mut self,
×
513
        _from_port: PortHandle,
×
514
        _op: Operation,
×
515
        _state: &SharedTransaction,
×
516
        _reader: &HashMap<PortHandle, Box<dyn RecordReader>>,
×
517
    ) -> Result<(), ExecutionError> {
×
518
        match _op {
×
519
            Operation::Delete { old } => info!("o0:-> - {:?}", old.values),
×
520
            Operation::Insert { new } => info!("o0:-> + {:?}", new.values),
×
521
            Operation::Update { old, new } => {
×
522
                info!("o0:-> - {:?}, + {:?}", old.values, new.values)
×
523
            }
524
        }
525

526
        self.current += 1;
×
527
        if self.current == self.expected {
×
528
            debug!(
×
529
                "Received {} messages. Notifying sender to exit!",
×
530
                self.current
×
531
            );
×
532
            self.running.store(false, Ordering::Relaxed);
×
533
        }
×
534
        Ok(())
×
535
    }
×
536

537
    fn commit(&mut self, _epoch: &Epoch, _tx: &SharedTransaction) -> Result<(), ExecutionError> {
×
538
        Ok(())
×
539
    }
×
540
}
541

542
/// Builds a pipeline for a two-JOIN query over `user`, `department` and
/// `country`, feeds it from [`TestSourceFactory`], and runs it until the sink
/// has received 8 operations and cleared the shared latch.
///
/// The test is `#[ignore]`d, so it only runs when explicitly requested.
#[test]
#[ignore]
fn test_pipeline_builder() {
    use std::time::Instant;

    dozer_tracing::init_telemetry(false).unwrap();

    let (mut pipeline, (node, port)) = statement_to_pipeline(
        "SELECT  name, dname, salary \
        FROM user JOIN department ON user.department_id = department.did JOIN country ON user.country_id = country.cid ",
    )
    .unwrap();

    // Shared between source and sink: the sink clears it, the source exits on it.
    let latch = Arc::new(AtomicBool::new(true));

    let mut asm = AppSourceManager::new();
    asm.add(AppSource::new(
        "conn".to_string(),
        Arc::new(TestSourceFactory::new(latch.clone())),
        vec![
            ("user".to_string(), USER_PORT),
            ("department".to_string(), DEPARTMENT_PORT),
            ("country".to_string(), COUNTRY_PORT),
        ]
        .into_iter()
        .collect(),
    ))
    .unwrap();

    pipeline.add_sink(Arc::new(TestSinkFactory::new(8, latch)), "sink");
    pipeline
        .connect_nodes(&node, Some(port), "sink", Some(DEFAULT_PORT_HANDLE))
        .unwrap();

    let mut app = App::new(asm);
    app.add_pipeline(pipeline);

    let dag = app.get_dag().unwrap();

    let now = Instant::now();

    // A single temporary directory is enough: `TempDir::new` already creates
    // the directory, so there is nothing to remove or recreate beforehand.
    let tmp_dir = TempDir::new("test").unwrap();

    let mut executor = DagExecutor::new(
        &dag,
        tmp_dir.path(),
        ExecutorOptions::default(),
        Arc::new(AtomicBool::new(true)),
    )
    .unwrap();

    executor
        .start()
        .unwrap_or_else(|e| panic!("Unable to start the Executor: {e}"));
    assert!(executor.join().is_ok());

    let elapsed = now.elapsed();
    debug!("Elapsed: {:.2?}", elapsed);
}
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc