• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

getdozer / dozer / 4020902227

pending completion
4020902227

Pull #743

github

GitHub
Merge 57279c6b6 into a12da35a5
Pull Request #743: Chore clippy fix

165 of 165 new or added lines in 60 files covered. (100.0%)

23638 of 35485 relevant lines covered (66.61%)

38417.79 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/dozer-sql/src/pipeline/product/tests/pipeline_test.rs
1
use dozer_core::dag::app::App;
2
use dozer_core::dag::appsource::{AppSource, AppSourceManager};
3
use dozer_core::dag::channels::SourceChannelForwarder;
4
use dozer_core::dag::dag::DEFAULT_PORT_HANDLE;
5
use dozer_core::dag::epoch::Epoch;
6
use dozer_core::dag::errors::ExecutionError;
7
use dozer_core::dag::executor::{DagExecutor, ExecutorOptions};
8
use dozer_core::dag::node::{
9
    OutputPortDef, OutputPortType, PortHandle, Sink, SinkFactory, Source, SourceFactory,
10
};
11
use dozer_core::dag::record_store::RecordReader;
12
use dozer_core::storage::lmdb_storage::{LmdbEnvironmentManager, SharedTransaction};
13
use dozer_types::ordered_float::OrderedFloat;
14
use dozer_types::tracing::{debug, info};
15
use dozer_types::types::{
16
    Field, FieldDefinition, FieldType, Operation, Record, Schema, SourceDefinition,
17
};
18

19
use std::collections::HashMap;
20
use std::sync::atomic::{AtomicBool, Ordering};
21
use std::sync::Arc;
22
use std::thread;
23
use std::time::Duration;
24
use tempdir::TempDir;
25

26
use crate::pipeline::builder::{statement_to_pipeline, SchemaSQLContext};
27

28
const USER_PORT: u16 = 0 as PortHandle;
29
const DEPARTMENT_PORT: u16 = 1 as PortHandle;
×
30

31
#[derive(Debug)]
×
32
pub struct TestSourceFactory {
33
    running: Arc<AtomicBool>,
34
}
35

×
36
impl TestSourceFactory {
×
37
    pub fn new(running: Arc<AtomicBool>) -> Self {
×
38
        Self { running }
×
39
    }
×
40
}
41

×
42
impl SourceFactory<SchemaSQLContext> for TestSourceFactory {
×
43
    fn get_output_ports(&self) -> Result<Vec<OutputPortDef>, ExecutionError> {
×
44
        Ok(vec![
×
45
            OutputPortDef::new(
×
46
                USER_PORT,
×
47
                OutputPortType::StatefulWithPrimaryKeyLookup {
×
48
                    retr_old_records_for_updates: true,
×
49
                    retr_old_records_for_deletes: true,
×
50
                },
×
51
            ),
×
52
            OutputPortDef::new(
×
53
                DEPARTMENT_PORT,
×
54
                OutputPortType::StatefulWithPrimaryKeyLookup {
×
55
                    retr_old_records_for_updates: true,
×
56
                    retr_old_records_for_deletes: true,
×
57
                },
×
58
            ),
×
59
        ])
×
60
    }
×
61

×
62
    fn get_output_schema(
×
63
        &self,
×
64
        port: &PortHandle,
×
65
    ) -> Result<(Schema, SchemaSQLContext), ExecutionError> {
×
66
        if port == &USER_PORT {
×
67
            Ok((
×
68
                Schema::empty()
×
69
                    .field(
×
70
                        FieldDefinition::new(
×
71
                            String::from("id"),
×
72
                            FieldType::Int,
×
73
                            false,
×
74
                            SourceDefinition::Dynamic,
×
75
                        ),
×
76
                        true,
×
77
                    )
×
78
                    .field(
×
79
                        FieldDefinition::new(
×
80
                            String::from("name"),
×
81
                            FieldType::String,
×
82
                            false,
×
83
                            SourceDefinition::Dynamic,
×
84
                        ),
×
85
                        false,
×
86
                    )
×
87
                    .field(
×
88
                        FieldDefinition::new(
×
89
                            String::from("department_id"),
×
90
                            FieldType::Int,
×
91
                            false,
×
92
                            SourceDefinition::Dynamic,
×
93
                        ),
×
94
                        false,
×
95
                    )
×
96
                    .field(
×
97
                        FieldDefinition::new(
×
98
                            String::from("salary"),
×
99
                            FieldType::Float,
×
100
                            false,
×
101
                            SourceDefinition::Dynamic,
×
102
                        ),
×
103
                        false,
×
104
                    )
×
105
                    .clone(),
×
106
                SchemaSQLContext::default(),
×
107
            ))
×
108
        } else if port == &DEPARTMENT_PORT {
×
109
            Ok((
×
110
                Schema::empty()
×
111
                    .field(
×
112
                        FieldDefinition::new(
×
113
                            String::from("id"),
×
114
                            FieldType::Int,
×
115
                            false,
×
116
                            SourceDefinition::Dynamic,
×
117
                        ),
×
118
                        true,
×
119
                    )
×
120
                    .field(
×
121
                        FieldDefinition::new(
×
122
                            String::from("name"),
×
123
                            FieldType::String,
×
124
                            false,
×
125
                            SourceDefinition::Dynamic,
×
126
                        ),
×
127
                        false,
×
128
                    )
×
129
                    .clone(),
×
130
                SchemaSQLContext::default(),
×
131
            ))
×
132
        } else {
×
133
            panic!("Invalid Port Handle {port}");
×
134
        }
×
135
    }
×
136

×
137
    fn build(
×
138
        &self,
×
139
        _output_schemas: HashMap<PortHandle, Schema>,
×
140
    ) -> Result<Box<dyn Source>, ExecutionError> {
×
141
        Ok(Box::new(TestSource {
×
142
            running: self.running.clone(),
×
143
        }))
×
144
    }
×
145

×
146
    fn prepare(
×
147
        &self,
×
148
        _output_schemas: HashMap<PortHandle, (Schema, SchemaSQLContext)>,
×
149
    ) -> Result<(), ExecutionError> {
×
150
        Ok(())
×
151
    }
×
152
}
×
153

×
154
#[derive(Debug)]
×
155
pub struct TestSource {
×
156
    running: Arc<AtomicBool>,
×
157
}
×
158

×
159
impl Source for TestSource {
×
160
    fn start(
×
161
        &self,
×
162
        fw: &mut dyn SourceChannelForwarder,
×
163
        _from_seq: Option<(u64, u64)>,
×
164
    ) -> Result<(), ExecutionError> {
×
165
        let operations = vec![
×
166
            (
×
167
                Operation::Insert {
×
168
                    new: Record::new(
×
169
                        None,
×
170
                        vec![Field::Int(0), Field::String("IT".to_string())],
×
171
                        None,
×
172
                    ),
×
173
                },
×
174
                DEPARTMENT_PORT,
×
175
            ),
×
176
            (
×
177
                Operation::Insert {
×
178
                    new: Record::new(
×
179
                        None,
×
180
                        vec![Field::Int(1), Field::String("HR".to_string())],
×
181
                        None,
×
182
                    ),
×
183
                },
×
184
                DEPARTMENT_PORT,
×
185
            ),
×
186
            (
×
187
                Operation::Insert {
×
188
                    new: Record::new(
×
189
                        None,
×
190
                        vec![
×
191
                            Field::Int(10000),
×
192
                            Field::String("Alice".to_string()),
×
193
                            Field::Int(0),
×
194
                            Field::Float(OrderedFloat(1.1)),
×
195
                        ],
×
196
                        None,
×
197
                    ),
×
198
                },
×
199
                USER_PORT,
×
200
            ),
×
201
            (
×
202
                Operation::Insert {
×
203
                    new: Record::new(
×
204
                        None,
×
205
                        vec![Field::Int(1), Field::String("HR".to_string())],
×
206
                        None,
×
207
                    ),
×
208
                },
×
209
                DEPARTMENT_PORT,
×
210
            ),
×
211
            (
×
212
                Operation::Insert {
×
213
                    new: Record::new(
×
214
                        None,
×
215
                        vec![
×
216
                            Field::Int(10001),
×
217
                            Field::String("Bob".to_string()),
×
218
                            Field::Int(0),
×
219
                            Field::Float(OrderedFloat(1.1)),
×
220
                        ],
×
221
                        None,
×
222
                    ),
×
223
                },
×
224
                USER_PORT,
×
225
            ),
×
226
            (
×
227
                Operation::Insert {
×
228
                    new: Record::new(
×
229
                        None,
×
230
                        vec![
×
231
                            Field::Int(10002),
×
232
                            Field::String("Craig".to_string()),
×
233
                            Field::Int(1),
×
234
                            Field::Float(OrderedFloat(1.1)),
×
235
                        ],
×
236
                        None,
×
237
                    ),
×
238
                },
×
239
                USER_PORT,
×
240
            ),
×
241
            (
×
242
                Operation::Insert {
×
243
                    new: Record::new(
×
244
                        None,
×
245
                        vec![
×
246
                            Field::Int(10003),
×
247
                            Field::String("Dan".to_string()),
×
248
                            Field::Int(0),
×
249
                            Field::Float(OrderedFloat(1.1)),
×
250
                        ],
×
251
                        None,
×
252
                    ),
×
253
                },
×
254
                USER_PORT,
×
255
            ),
×
256
            (
×
257
                Operation::Insert {
×
258
                    new: Record::new(
×
259
                        None,
×
260
                        vec![
×
261
                            Field::Int(10004),
×
262
                            Field::String("Eve".to_string()),
×
263
                            Field::Int(1),
×
264
                            Field::Float(OrderedFloat(1.1)),
×
265
                        ],
×
266
                        None,
×
267
                    ),
×
268
                },
×
269
                USER_PORT,
×
270
            ),
×
271
            (
×
272
                Operation::Delete {
×
273
                    old: Record::new(
×
274
                        None,
×
275
                        vec![
×
276
                            Field::Int(10002),
×
277
                            Field::String("Craig".to_string()),
×
278
                            Field::Int(1),
×
279
                            Field::Float(OrderedFloat(1.1)),
×
280
                        ],
×
281
                        None,
×
282
                    ),
×
283
                },
×
284
                USER_PORT,
×
285
            ),
×
286
            (
×
287
                Operation::Insert {
×
288
                    new: Record::new(
×
289
                        None,
×
290
                        vec![
×
291
                            Field::Int(10004),
×
292
                            Field::String("Frank".to_string()),
×
293
                            Field::Int(1),
×
294
                            Field::Float(OrderedFloat(1.5)),
×
295
                        ],
×
296
                        None,
×
297
                    ),
×
298
                },
×
299
                USER_PORT,
×
300
            ),
×
301
            (
×
302
                Operation::Update {
×
303
                    old: Record::new(
×
304
                        None,
×
305
                        vec![Field::Int(0), Field::String("IT".to_string())],
×
306
                        None,
×
307
                    ),
×
308
                    new: Record::new(
×
309
                        None,
×
310
                        vec![Field::Int(0), Field::String("XX".to_string())],
×
311
                        None,
×
312
                    ),
×
313
                },
×
314
                DEPARTMENT_PORT,
×
315
            ),
×
316
        ];
×
317

318
        for operation in operations.iter().enumerate() {
×
319
            match operation.1.clone().0 {
×
320
                Operation::Delete { old } => info!("{}: - {:?}", operation.1.clone().1, old.values),
×
321
                Operation::Insert { new } => info!("{}: + {:?}", operation.1.clone().1, new.values),
×
322
                Operation::Update { old, new } => {
×
323
                    info!(
×
324
                        "{}: - {:?}, + {:?}",
×
325
                        operation.1.clone().1,
×
326
                        old.values,
×
327
                        new.values
×
328
                    )
×
329
                }
×
330
            }
×
331
            fw.send(
×
332
                operation.0.try_into().unwrap(),
×
333
                0,
×
334
                operation.1.clone().0,
×
335
                operation.1.clone().1,
×
336
            )
×
337
            .unwrap();
×
338
        }
×
339

×
340
        loop {
×
341
            if !self.running.load(Ordering::Relaxed) {
×
342
                break;
×
343
            }
×
344
            thread::sleep(Duration::from_millis(500));
×
345
        }
×
346
        Ok(())
×
347
    }
×
348
}
×
349

×
350
#[derive(Debug)]
×
351
pub(crate) struct TestSinkFactory {
×
352
    expected: u64,
×
353
    running: Arc<AtomicBool>,
×
354
}
×
355

356
impl TestSinkFactory {
×
357
    pub fn new(expected: u64, barrier: Arc<AtomicBool>) -> Self {
×
358
        Self {
×
359
            expected,
×
360
            running: barrier,
×
361
        }
×
362
    }
×
363
}
364

×
365
impl SinkFactory<SchemaSQLContext> for TestSinkFactory {
366
    fn get_input_ports(&self) -> Vec<PortHandle> {
×
367
        vec![DEFAULT_PORT_HANDLE]
×
368
    }
×
369

370
    fn set_input_schema(
×
371
        &self,
×
372
        _input_schemas: &HashMap<PortHandle, (Schema, SchemaSQLContext)>,
×
373
    ) -> Result<(), ExecutionError> {
×
374
        Ok(())
×
375
    }
×
376

377
    fn build(
×
378
        &self,
×
379
        _input_schemas: HashMap<PortHandle, Schema>,
×
380
    ) -> Result<Box<dyn Sink>, ExecutionError> {
×
381
        Ok(Box::new(TestSink {
×
382
            expected: self.expected,
×
383
            current: 0,
×
384
            running: self.running.clone(),
×
385
        }))
×
386
    }
×
387

×
388
    fn prepare(
×
389
        &self,
×
390
        _input_schemas: HashMap<PortHandle, (Schema, SchemaSQLContext)>,
×
391
    ) -> Result<(), ExecutionError> {
×
392
        Ok(())
×
393
    }
×
394
}
×
395

×
396
#[derive(Debug)]
×
397
pub struct TestSink {
×
398
    expected: u64,
×
399
    current: u64,
×
400
    running: Arc<AtomicBool>,
×
401
}
×
402

403
impl Sink for TestSink {
×
404
    fn init(&mut self, _env: &mut LmdbEnvironmentManager) -> Result<(), ExecutionError> {
×
405
        debug!("SINK: Initialising TestSink");
×
406
        Ok(())
×
407
    }
×
408

×
409
    fn process(
×
410
        &mut self,
×
411
        _from_port: PortHandle,
×
412
        _op: Operation,
×
413
        _state: &SharedTransaction,
×
414
        _reader: &HashMap<PortHandle, Box<dyn RecordReader>>,
×
415
    ) -> Result<(), ExecutionError> {
×
416
        match _op {
×
417
            Operation::Delete { old } => info!("s: - {:?}", old.values),
×
418
            Operation::Insert { new } => info!("s: + {:?}", new.values),
×
419
            Operation::Update { old, new } => {
×
420
                info!("s: - {:?}, + {:?}", old.values, new.values)
×
421
            }
×
422
        }
×
423

×
424
        self.current += 1;
×
425
        if self.current == self.expected {
×
426
            debug!(
×
427
                "Received {} messages. Notifying sender to exit!",
×
428
                self.current
×
429
            );
×
430
            self.running.store(false, Ordering::Relaxed);
×
431
        }
×
432
        Ok(())
×
433
    }
×
434

×
435
    fn commit(&mut self, _epoch: &Epoch, _tx: &SharedTransaction) -> Result<(), ExecutionError> {
×
436
        Ok(())
×
437
    }
×
438
}
×
439

×
440
#[test]
×
441
#[ignore]
×
442
fn test_pipeline_builder() {
×
443
    dozer_tracing::init_telemetry(false).unwrap();
×
444

×
445
    let (mut pipeline, (node, port)) = statement_to_pipeline(
×
446
        "SELECT  department.name, SUM(user.salary) \
×
447
        FROM user JOIN department ON user.department_id = department.id \
×
448
        GROUP BY department.name",
×
449
    )
×
450
    .unwrap();
×
451

×
452
    let latch = Arc::new(AtomicBool::new(true));
×
453

×
454
    let mut asm = AppSourceManager::new();
×
455
    asm.add(AppSource::new(
×
456
        "conn1".to_string(),
×
457
        Arc::new(TestSourceFactory::new(latch.clone())),
×
458
        vec![
×
459
            ("user".to_string(), USER_PORT),
×
460
            ("department".to_string(), DEPARTMENT_PORT),
×
461
        ]
×
462
        .into_iter()
×
463
        .collect(),
×
464
    ))
×
465
    .unwrap();
×
466

×
467
    pipeline.add_sink(Arc::new(TestSinkFactory::new(8, latch)), "sink");
×
468
    pipeline
×
469
        .connect_nodes(&node, Some(port), "sink", Some(DEFAULT_PORT_HANDLE))
×
470
        .unwrap();
×
471

×
472
    let mut app = App::new(asm);
×
473
    app.add_pipeline(pipeline);
×
474

×
475
    let dag = app.get_dag().unwrap();
×
476

×
477
    let tmp_dir = TempDir::new("example").unwrap_or_else(|_e| panic!("Unable to create temp dir"));
×
478
    if tmp_dir.path().exists() {
×
479
        std::fs::remove_dir_all(tmp_dir.path())
×
480
            .unwrap_or_else(|_e| panic!("Unable to remove old dir"));
×
481
    }
×
482
    std::fs::create_dir(tmp_dir.path()).unwrap_or_else(|_e| panic!("Unable to create temp dir"));
×
483

×
484
    use std::time::Instant;
×
485
    let now = Instant::now();
×
486

×
487
    let tmp_dir = TempDir::new("test").unwrap();
×
488

×
489
    let mut executor = DagExecutor::new(
×
490
        &dag,
×
491
        tmp_dir.path(),
×
492
        ExecutorOptions::default(),
×
493
        Arc::new(AtomicBool::new(true)),
×
494
    )
×
495
    .unwrap();
×
496

×
497
    executor
×
498
        .start()
×
499
        .unwrap_or_else(|e| panic!("Unable to start the Executor: {e}"));
×
500
    assert!(executor.join().is_ok());
×
501

502
    let elapsed = now.elapsed();
×
503
    debug!("Elapsed: {:.2?}", elapsed);
×
504
}
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc