• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

getdozer / dozer / 4112756987

pending completion
4112756987

Pull #817

github

GitHub
Merge d47821d24 into c160ec41f
Pull Request #817: feat: Stateful pipeline for sources without PK

64 of 64 new or added lines in 2 files covered. (100.0%)

23475 of 37805 relevant lines covered (62.09%)

34580.24 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/dozer-sql/src/pipeline/product/tests/pipeline_test.rs
1
use dozer_core::dag::app::{App, AppPipeline};
2
use dozer_core::dag::appsource::{AppSource, AppSourceManager};
3
use dozer_core::dag::channels::SourceChannelForwarder;
4
use dozer_core::dag::epoch::Epoch;
5
use dozer_core::dag::errors::ExecutionError;
6
use dozer_core::dag::executor::{DagExecutor, ExecutorOptions};
7
use dozer_core::dag::node::{
8
    OutputPortDef, OutputPortType, PortHandle, Sink, SinkFactory, Source, SourceFactory,
9
};
10
use dozer_core::dag::record_store::RecordReader;
11
use dozer_core::dag::DEFAULT_PORT_HANDLE;
12
use dozer_core::storage::lmdb_storage::{LmdbEnvironmentManager, SharedTransaction};
13
use dozer_types::ordered_float::OrderedFloat;
14
use dozer_types::tracing::{debug, info};
15
use dozer_types::types::{
16
    Field, FieldDefinition, FieldType, Operation, Record, Schema, SourceDefinition,
17
};
18

19
use std::collections::HashMap;
20
use std::sync::atomic::{AtomicBool, Ordering};
21
use std::sync::Arc;
22
use tempdir::TempDir;
23

24
use crate::pipeline::builder::{statement_to_pipeline, SchemaSQLContext};
25

26
const USER_PORT: u16 = 0 as PortHandle;
27
const DEPARTMENT_PORT: u16 = 1 as PortHandle;
28
const COUNTRY_PORT: u16 = 2 as PortHandle;
29

30
/// Factory that produces [`TestSource`] instances for the pipeline test.
#[derive(Debug)]
pub struct TestSourceFactory {
    /// Shared flag cloned into every source this factory builds.
    running: Arc<AtomicBool>,
}
34

35
impl TestSourceFactory {
36
    pub fn new(running: Arc<AtomicBool>) -> Self {
×
37
        Self { running }
×
38
    }
×
39
}
40

41
impl SourceFactory<SchemaSQLContext> for TestSourceFactory {
42
    fn get_output_ports(&self) -> Result<Vec<OutputPortDef>, ExecutionError> {
×
43
        Ok(vec![
×
44
            OutputPortDef::new(
×
45
                USER_PORT,
×
46
                OutputPortType::StatefulWithPrimaryKeyLookup {
×
47
                    retr_old_records_for_updates: true,
×
48
                    retr_old_records_for_deletes: true,
×
49
                },
×
50
            ),
×
51
            OutputPortDef::new(DEPARTMENT_PORT, OutputPortType::Stateless),
×
52
            OutputPortDef::new(
×
53
                COUNTRY_PORT,
×
54
                OutputPortType::StatefulWithPrimaryKeyLookup {
×
55
                    retr_old_records_for_updates: true,
×
56
                    retr_old_records_for_deletes: true,
×
57
                },
×
58
            ),
×
59
        ])
×
60
    }
×
61

×
62
    fn get_output_schema(
×
63
        &self,
×
64
        port: &PortHandle,
×
65
    ) -> Result<(Schema, SchemaSQLContext), ExecutionError> {
×
66
        if port == &USER_PORT {
×
67
            let source_id = SourceDefinition::Table {
×
68
                connection: "connection".to_string(),
×
69
                name: "user".to_string(),
×
70
            };
×
71
            Ok((
×
72
                Schema::empty()
×
73
                    .field(
×
74
                        FieldDefinition::new(
×
75
                            String::from("id"),
×
76
                            FieldType::Int,
×
77
                            false,
×
78
                            source_id.clone(),
×
79
                        ),
×
80
                        true,
×
81
                    )
×
82
                    .field(
×
83
                        FieldDefinition::new(
×
84
                            String::from("name"),
×
85
                            FieldType::String,
×
86
                            false,
×
87
                            source_id.clone(),
×
88
                        ),
×
89
                        false,
×
90
                    )
×
91
                    .field(
×
92
                        FieldDefinition::new(
×
93
                            String::from("department_id"),
×
94
                            FieldType::Int,
×
95
                            false,
×
96
                            source_id.clone(),
×
97
                        ),
×
98
                        false,
×
99
                    )
×
100
                    .field(
×
101
                        FieldDefinition::new(
×
102
                            String::from("country_id"),
×
103
                            FieldType::String,
×
104
                            false,
×
105
                            source_id.clone(),
×
106
                        ),
×
107
                        false,
×
108
                    )
×
109
                    .field(
×
110
                        FieldDefinition::new(
×
111
                            String::from("salary"),
×
112
                            FieldType::Float,
×
113
                            false,
×
114
                            source_id,
×
115
                        ),
×
116
                        false,
×
117
                    )
×
118
                    .clone(),
×
119
                SchemaSQLContext::default(),
×
120
            ))
×
121
        } else if port == &DEPARTMENT_PORT {
×
122
            let source_id = SourceDefinition::Table {
×
123
                connection: "connection".to_string(),
×
124
                name: "department".to_string(),
×
125
            };
×
126
            Ok((
×
127
                Schema::empty()
×
128
                    .field(
×
129
                        FieldDefinition::new(
×
130
                            String::from("did"),
×
131
                            FieldType::Int,
×
132
                            false,
×
133
                            source_id.clone(),
×
134
                        ),
×
135
                        false,
×
136
                    )
×
137
                    .field(
×
138
                        FieldDefinition::new(
×
139
                            String::from("dname"),
×
140
                            FieldType::String,
×
141
                            false,
×
142
                            source_id,
×
143
                        ),
×
144
                        false,
×
145
                    )
×
146
                    .clone(),
×
147
                SchemaSQLContext::default(),
×
148
            ))
×
149
        } else if port == &COUNTRY_PORT {
×
150
            let source_id = SourceDefinition::Table {
×
151
                connection: "connection".to_string(),
×
152
                name: "country".to_string(),
×
153
            };
×
154
            Ok((
×
155
                Schema::empty()
×
156
                    .field(
×
157
                        FieldDefinition::new(
×
158
                            String::from("cid"),
×
159
                            FieldType::String,
×
160
                            false,
×
161
                            source_id.clone(),
×
162
                        ),
×
163
                        true,
×
164
                    )
×
165
                    .field(
×
166
                        FieldDefinition::new(
×
167
                            String::from("cname"),
×
168
                            FieldType::String,
×
169
                            false,
×
170
                            source_id,
×
171
                        ),
×
172
                        false,
×
173
                    )
×
174
                    .clone(),
×
175
                SchemaSQLContext::default(),
×
176
            ))
×
177
        } else {
×
178
            panic!("Invalid Port Handle {port}");
×
179
        }
×
180
    }
×
181

×
182
    fn build(
×
183
        &self,
×
184
        _output_schemas: HashMap<PortHandle, Schema>,
×
185
    ) -> Result<Box<dyn Source>, ExecutionError> {
×
186
        Ok(Box::new(TestSource {
×
187
            running: self.running.clone(),
×
188
        }))
×
189
    }
×
190

×
191
    fn prepare(
×
192
        &self,
×
193
        _output_schemas: HashMap<PortHandle, (Schema, SchemaSQLContext)>,
×
194
    ) -> Result<(), ExecutionError> {
×
195
        Ok(())
×
196
    }
×
197
}
×
198

×
199
/// Source used by the pipeline test; it emits a fixed list of operations.
#[derive(Debug)]
pub struct TestSource {
    /// The source keeps `start` alive for as long as this flag reads `true`.
    running: Arc<AtomicBool>,
}
×
203

204
impl Source for TestSource {
205
    fn start(
×
206
        &self,
×
207
        fw: &mut dyn SourceChannelForwarder,
×
208
        _from_seq: Option<(u64, u64)>,
×
209
    ) -> Result<(), ExecutionError> {
×
210
        let operations = vec![
×
211
            (
×
212
                Operation::Insert {
×
213
                    new: Record::new(
×
214
                        None,
×
215
                        vec![Field::Int(0), Field::String("IT".to_string())],
×
216
                        Some(1),
×
217
                    ),
×
218
                },
×
219
                DEPARTMENT_PORT,
×
220
            ),
×
221
            (
×
222
                Operation::Insert {
×
223
                    new: Record::new(
×
224
                        None,
×
225
                        vec![Field::Int(1), Field::String("HR".to_string())],
×
226
                        Some(1),
×
227
                    ),
×
228
                },
×
229
                DEPARTMENT_PORT,
×
230
            ),
×
231
            (
×
232
                Operation::Insert {
×
233
                    new: Record::new(
×
234
                        None,
×
235
                        vec![
×
236
                            Field::Int(10000),
×
237
                            Field::String("Alice".to_string()),
×
238
                            Field::Int(0),
×
239
                            Field::String("UK".to_string()),
×
240
                            Field::Float(OrderedFloat(1.1)),
×
241
                        ],
×
242
                        Some(1),
×
243
                    ),
×
244
                },
×
245
                USER_PORT,
×
246
            ),
×
247
            (
×
248
                Operation::Insert {
×
249
                    new: Record::new(
×
250
                        None,
×
251
                        vec![
×
252
                            Field::Int(10001),
×
253
                            Field::String("Bob".to_string()),
×
254
                            Field::Int(0),
×
255
                            Field::String("UK".to_string()),
×
256
                            Field::Float(OrderedFloat(1.1)),
×
257
                        ],
×
258
                        Some(1),
×
259
                    ),
×
260
                },
×
261
                USER_PORT,
×
262
            ),
×
263
            (
×
264
                Operation::Insert {
×
265
                    new: Record::new(
×
266
                        None,
×
267
                        vec![
×
268
                            Field::String("UK".to_string()),
×
269
                            Field::String("United Kingdom".to_string()),
×
270
                        ],
×
271
                        Some(1),
×
272
                    ),
×
273
                },
×
274
                COUNTRY_PORT,
×
275
            ),
×
276
            (
×
277
                Operation::Insert {
×
278
                    new: Record::new(
×
279
                        None,
×
280
                        vec![
×
281
                            Field::String("SG".to_string()),
×
282
                            Field::String("Singapore".to_string()),
×
283
                        ],
×
284
                        Some(1),
×
285
                    ),
×
286
                },
×
287
                COUNTRY_PORT,
×
288
            ),
×
289
            (
×
290
                Operation::Insert {
×
291
                    new: Record::new(
×
292
                        None,
×
293
                        vec![
×
294
                            Field::Int(10002),
×
295
                            Field::String("Craig".to_string()),
×
296
                            Field::Int(1),
×
297
                            Field::String("SG".to_string()),
×
298
                            Field::Float(OrderedFloat(1.1)),
×
299
                        ],
×
300
                        Some(1),
×
301
                    ),
×
302
                },
×
303
                USER_PORT,
×
304
            ),
×
305
            // (
×
306
            //     Operation::Delete {
×
307
            //         old: Record::new(
×
308
            //             None,
×
309
            //             vec![Field::Int(1), Field::String("HR".to_string())],
×
310
            //             Some(1),
×
311
            //         ),
×
312
            //     },
×
313
            //     DEPARTMENT_PORT,
×
314
            // ),
×
315
            (
×
316
                Operation::Insert {
×
317
                    new: Record::new(
×
318
                        None,
×
319
                        vec![
×
320
                            Field::Int(10003),
×
321
                            Field::String("Dan".to_string()),
×
322
                            Field::Int(0),
×
323
                            Field::String("UK".to_string()),
×
324
                            Field::Float(OrderedFloat(1.1)),
×
325
                        ],
×
326
                        Some(1),
×
327
                    ),
×
328
                },
×
329
                USER_PORT,
×
330
            ),
×
331
            (
×
332
                Operation::Insert {
×
333
                    new: Record::new(
×
334
                        None,
×
335
                        vec![
×
336
                            Field::Int(10004),
×
337
                            Field::String("Eve".to_string()),
×
338
                            Field::Int(1),
×
339
                            Field::String("SG".to_string()),
×
340
                            Field::Float(OrderedFloat(1.1)),
×
341
                        ],
×
342
                        Some(1),
×
343
                    ),
×
344
                },
×
345
                USER_PORT,
×
346
            ),
×
347
            // (
×
348
            //     Operation::Delete {
×
349
            //         old: Record::new(
×
350
            //             None,
×
351
            //             vec![
×
352
            //                 Field::Int(10002),
×
353
            //                 Field::String("Craig".to_string()),
×
354
            //                 Field::Int(1),
×
355
            //                 Field::Float(OrderedFloat(1.1)),
×
356
            //             ],
×
357
            //             None,
×
358
            //         ),
×
359
            //     },
×
360
            //     USER_PORT,
×
361
            // ),
×
362
            (
×
363
                Operation::Insert {
×
364
                    new: Record::new(
×
365
                        None,
×
366
                        vec![
×
367
                            Field::Int(10005),
×
368
                            Field::String("Frank".to_string()),
×
369
                            Field::Int(1),
×
370
                            Field::String("SG".to_string()),
×
371
                            Field::Float(OrderedFloat(1.5)),
×
372
                        ],
×
373
                        None,
×
374
                    ),
×
375
                },
×
376
                USER_PORT,
×
377
            ),
×
378
            (
×
379
                Operation::Update {
×
380
                    old: Record::new(
×
381
                        None,
×
382
                        vec![Field::Int(0), Field::String("IT".to_string())],
×
383
                        Some(1),
×
384
                    ),
×
385
                    new: Record::new(
×
386
                        None,
×
387
                        vec![Field::Int(0), Field::String("RD".to_string())],
×
388
                        Some(2),
×
389
                    ),
×
390
                },
×
391
                DEPARTMENT_PORT,
×
392
            ),
×
393
            // (
×
394
            //     Operation::Update {
×
395
            //         old: Record::new(
×
396
            //             None,
×
397
            //             vec![Field::Int(0), Field::String("IT".to_string())],
×
398
            //             None,
×
399
            //         ),
×
400
            //         new: Record::new(
×
401
            //             None,
×
402
            //             vec![Field::Int(0), Field::String("XX".to_string())],
×
403
            //             None,
×
404
            //         ),
×
405
            //     },
×
406
            //     DEPARTMENT_PORT,
×
407
            // ),
×
408
        ];
×
409

×
410
        for operation in operations.iter().enumerate() {
×
411
            // match operation.1.clone().0 {
×
412
            //     Operation::Delete { old } => {
×
413
            //         info!("s{}: - {:?}", operation.1.clone().1, old.values)
×
414
            //     }
×
415
            //     Operation::Insert { new } => {
×
416
            //         info!("s{}: + {:?}", operation.1.clone().1, new.values)
×
417
            //     }
×
418
            //     Operation::Update { old, new } => {
×
419
            //         info!(
×
420
            //             "s{}: - {:?}, + {:?}",
×
421
            //             operation.1.clone().1,
×
422
            //             old.values,
×
423
            //             new.values
×
424
            //         )
×
425
            //     }
×
426
            // }
×
427
            fw.send(
×
428
                operation.0.try_into().unwrap(),
×
429
                0,
×
430
                operation.1.clone().0,
×
431
                operation.1.clone().1,
×
432
            )
×
433
            .unwrap();
×
434
        }
×
435

×
436
        loop {
×
437
            if !self.running.load(Ordering::Relaxed) {
×
438
                break;
×
439
            }
×
440
            // thread::sleep(Duration::from_millis(500));
×
441
        }
442
        Ok(())
×
443
    }
×
444
}
×
445

×
446
/// Factory that produces [`TestSink`] instances for the pipeline test.
#[derive(Debug)]
pub(crate) struct TestSinkFactory {
    /// Number of operations the built sink counts up to before clearing `running`.
    expected: u64,
    /// Shared flag cloned into every sink this factory builds.
    running: Arc<AtomicBool>,
}
451

452
impl TestSinkFactory {
×
453
    pub fn new(expected: u64, barrier: Arc<AtomicBool>) -> Self {
×
454
        Self {
×
455
            expected,
×
456
            running: barrier,
×
457
        }
×
458
    }
×
459
}
×
460

×
461
impl SinkFactory<SchemaSQLContext> for TestSinkFactory {
×
462
    fn get_input_ports(&self) -> Vec<PortHandle> {
×
463
        vec![DEFAULT_PORT_HANDLE]
×
464
    }
×
465

466
    fn build(
×
467
        &self,
×
468
        _input_schemas: HashMap<PortHandle, Schema>,
×
469
    ) -> Result<Box<dyn Sink>, ExecutionError> {
×
470
        Ok(Box::new(TestSink {
×
471
            expected: self.expected,
×
472
            current: 0,
×
473
            running: self.running.clone(),
×
474
        }))
×
475
    }
×
476

×
477
    fn prepare(
×
478
        &self,
×
479
        _input_schemas: HashMap<PortHandle, (Schema, SchemaSQLContext)>,
×
480
    ) -> Result<(), ExecutionError> {
×
481
        Ok(())
×
482
    }
×
483
}
×
484

×
485
/// Sink used by the pipeline test; it logs and counts incoming operations.
#[derive(Debug)]
pub struct TestSink {
    /// Operation count at which `running` is cleared.
    expected: u64,
    /// Number of operations processed so far.
    current: u64,
    /// Flag set to `false` once `current` reaches `expected`.
    running: Arc<AtomicBool>,
}
×
491

×
492
impl Sink for TestSink {
×
493
    fn init(&mut self, _env: &mut LmdbEnvironmentManager) -> Result<(), ExecutionError> {
×
494
        debug!("SINK: Initialising TestSink");
×
495
        Ok(())
×
496
    }
×
497

498
    fn process(
×
499
        &mut self,
×
500
        _from_port: PortHandle,
×
501
        _op: Operation,
×
502
        _state: &SharedTransaction,
×
503
        _reader: &HashMap<PortHandle, Box<dyn RecordReader>>,
×
504
    ) -> Result<(), ExecutionError> {
×
505
        match _op {
×
506
            Operation::Delete { old } => info!("o0:-> - {:?}", old.values),
×
507
            Operation::Insert { new } => info!("o0:-> + {:?}", new.values),
×
508
            Operation::Update { old, new } => {
×
509
                info!("o0:-> - {:?}, + {:?}", old.values, new.values)
×
510
            }
×
511
        }
×
512

×
513
        self.current += 1;
×
514
        if self.current == self.expected {
×
515
            debug!(
×
516
                "Received {} messages. Notifying sender to exit!",
×
517
                self.current
×
518
            );
×
519
            self.running.store(false, Ordering::Relaxed);
×
520
        }
×
521
        Ok(())
×
522
    }
×
523

×
524
    fn commit(&mut self, _epoch: &Epoch, _tx: &SharedTransaction) -> Result<(), ExecutionError> {
×
525
        Ok(())
×
526
    }
×
527
}
×
528

×
529
/// End-to-end run of a three-way JOIN (`user` x `department` x `country`)
/// built from SQL, fed by [`TestSourceFactory`] and drained by
/// [`TestSinkFactory`]. Ignored by default.
#[test]
#[ignore]
fn test_pipeline_builder() {
    dozer_tracing::init_telemetry(false).unwrap();

    let mut pipeline = AppPipeline::new();

    let context = statement_to_pipeline(
        "SELECT  name, dname, salary \
        FROM user JOIN department ON user.department_id = department.did JOIN country ON user.country_id = country.cid ",
        &mut pipeline,
        Some("results".to_string())
    )
    .unwrap();

    let table_info = context.output_tables_map.get("results").unwrap();

    // Shared between source and sink: the sink clears it once it has seen the
    // expected number of operations, which lets the source's `start` return.
    let latch = Arc::new(AtomicBool::new(true));

    let mut asm = AppSourceManager::new();
    asm.add(AppSource::new(
        "conn".to_string(),
        Arc::new(TestSourceFactory::new(latch.clone())),
        vec![
            ("user".to_string(), USER_PORT),
            ("department".to_string(), DEPARTMENT_PORT),
            ("country".to_string(), COUNTRY_PORT),
        ]
        .into_iter()
        .collect(),
    ))
    .unwrap();

    pipeline.add_sink(Arc::new(TestSinkFactory::new(8, latch)), "sink");
    pipeline
        .connect_nodes(
            &table_info.node,
            Some(table_info.port),
            "sink",
            Some(DEFAULT_PORT_HANDLE),
            true,
        )
        .unwrap();

    let mut app = App::new(asm);
    app.add_pipeline(pipeline);

    let dag = app.get_dag().unwrap();

    let started = std::time::Instant::now();

    // `TempDir::new` already creates a fresh directory, so no manual
    // remove/create step is needed before handing it to the executor.
    let tmp_dir = TempDir::new("test").unwrap();

    let mut executor = DagExecutor::new(
        &dag,
        tmp_dir.path(),
        ExecutorOptions::default(),
        Arc::new(AtomicBool::new(true)),
    )
    .unwrap();

    executor
        .start()
        .unwrap_or_else(|e| panic!("Unable to start the Executor: {e}"));
    assert!(executor.join().is_ok());

    let elapsed = started.elapsed();
    debug!("Elapsed: {:.2?}", elapsed);
}
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc