• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

getdozer / dozer / 3965135367

pending completion
3965135367

Pull #680

github

GitHub
Merge 1add77327 into 56c0cf2b3
Pull Request #680: feat: Implement nested queries and CTE.

506 of 506 new or added lines in 18 files covered. (100.0%)

21999 of 33062 relevant lines covered (66.54%)

50489.13 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/dozer-sql/src/pipeline/product/tests/pipeline_test.rs
1
use dozer_core::dag::app::App;
2
use dozer_core::dag::appsource::{AppSource, AppSourceManager};
3
use dozer_core::dag::channels::SourceChannelForwarder;
4
use dozer_core::dag::dag::DEFAULT_PORT_HANDLE;
5
use dozer_core::dag::epoch::Epoch;
6
use dozer_core::dag::errors::ExecutionError;
7
use dozer_core::dag::executor::{DagExecutor, ExecutorOptions};
8
use dozer_core::dag::node::{
9
    OutputPortDef, OutputPortType, PortHandle, Sink, SinkFactory, Source, SourceFactory,
10
};
11
use dozer_core::dag::record_store::RecordReader;
12
use dozer_core::storage::lmdb_storage::{LmdbEnvironmentManager, SharedTransaction};
13
use dozer_types::ordered_float::OrderedFloat;
14
use dozer_types::tracing::{debug, info};
15
use dozer_types::types::{Field, FieldDefinition, FieldType, Operation, Record, Schema};
16

17
use std::collections::HashMap;
18
use std::sync::atomic::{AtomicBool, Ordering};
19
use std::sync::Arc;
20
use std::thread;
21
use std::time::Duration;
22
use tempdir::TempDir;
23

24
use crate::pipeline::builder::statement_to_pipeline;
25

26
const USER_PORT: u16 = 0 as PortHandle;
27
const DEPARTMENT_PORT: u16 = 1 as PortHandle;
28

29
/// Factory that builds [`TestSource`] instances for the pipeline test.
///
/// The `running` flag is handed to every source it builds.
#[derive(Debug)]
pub struct TestSourceFactory {
    /// Shared flag cloned into each built source.
    running: Arc<AtomicBool>,
}
33

34
impl TestSourceFactory {
35
    pub fn new(running: Arc<AtomicBool>) -> Self {
×
36
        Self { running }
×
37
    }
×
38
}
39

40
impl SourceFactory for TestSourceFactory {
41
    fn get_output_ports(&self) -> Result<Vec<OutputPortDef>, ExecutionError> {
×
42
        Ok(vec![
×
43
            OutputPortDef::new(
×
44
                USER_PORT,
×
45
                OutputPortType::StatefulWithPrimaryKeyLookup {
×
46
                    retr_old_records_for_updates: true,
×
47
                    retr_old_records_for_deletes: true,
×
48
                },
×
49
            ),
×
50
            OutputPortDef::new(
×
51
                DEPARTMENT_PORT,
×
52
                OutputPortType::StatefulWithPrimaryKeyLookup {
×
53
                    retr_old_records_for_updates: true,
×
54
                    retr_old_records_for_deletes: true,
×
55
                },
×
56
            ),
×
57
        ])
×
58
    }
×
59

60
    fn get_output_schema(&self, port: &PortHandle) -> Result<Schema, ExecutionError> {
×
61
        if port == &USER_PORT {
×
62
            Ok(Schema::empty()
×
63
                .field(
×
64
                    FieldDefinition::new(String::from("id"), FieldType::Int, false),
×
65
                    true,
×
66
                )
×
67
                .field(
×
68
                    FieldDefinition::new(String::from("name"), FieldType::String, false),
×
69
                    false,
×
70
                )
×
71
                .field(
×
72
                    FieldDefinition::new(String::from("department_id"), FieldType::Int, false),
×
73
                    false,
×
74
                )
×
75
                .field(
×
76
                    FieldDefinition::new(String::from("salary"), FieldType::Float, false),
×
77
                    false,
×
78
                )
×
79
                .clone())
×
80
        } else if port == &DEPARTMENT_PORT {
×
81
            Ok(Schema::empty()
×
82
                .field(
×
83
                    FieldDefinition::new(String::from("id"), FieldType::Int, false),
×
84
                    true,
×
85
                )
×
86
                .field(
×
87
                    FieldDefinition::new(String::from("name"), FieldType::String, false),
×
88
                    false,
×
89
                )
×
90
                .clone())
×
91
        } else {
92
            panic!("Invalid Port Handle {}", port);
×
93
        }
94
    }
×
95

96
    fn build(
×
97
        &self,
×
98
        _output_schemas: HashMap<PortHandle, Schema>,
×
99
    ) -> Result<Box<dyn Source>, ExecutionError> {
×
100
        Ok(Box::new(TestSource {
×
101
            running: self.running.clone(),
×
102
        }))
×
103
    }
×
104

105
    fn prepare(&self, _output_schemas: HashMap<PortHandle, Schema>) -> Result<(), ExecutionError> {
×
106
        Ok(())
×
107
    }
×
108
}
109

110
/// Source used by the pipeline test; holds only the shared `running` flag.
#[derive(Debug)]
pub struct TestSource {
    /// Shared flag; the source keeps running while it reads `true`.
    running: Arc<AtomicBool>,
}
114

115
impl Source for TestSource {
116
    fn start(
×
117
        &self,
×
118
        fw: &mut dyn SourceChannelForwarder,
×
119
        _from_seq: Option<(u64, u64)>,
×
120
    ) -> Result<(), ExecutionError> {
×
121
        let operations = vec![
×
122
            (
×
123
                Operation::Insert {
×
124
                    new: Record::new(
×
125
                        None,
×
126
                        vec![Field::Int(0), Field::String("IT".to_string())],
×
127
                        None,
×
128
                    ),
×
129
                },
×
130
                DEPARTMENT_PORT,
×
131
            ),
×
132
            (
×
133
                Operation::Insert {
×
134
                    new: Record::new(
×
135
                        None,
×
136
                        vec![Field::Int(1), Field::String("HR".to_string())],
×
137
                        None,
×
138
                    ),
×
139
                },
×
140
                DEPARTMENT_PORT,
×
141
            ),
×
142
            (
×
143
                Operation::Insert {
×
144
                    new: Record::new(
×
145
                        None,
×
146
                        vec![
×
147
                            Field::Int(10000),
×
148
                            Field::String("Alice".to_string()),
×
149
                            Field::Int(0),
×
150
                            Field::Float(OrderedFloat(1.1)),
×
151
                        ],
×
152
                        None,
×
153
                    ),
×
154
                },
×
155
                USER_PORT,
×
156
            ),
×
157
            (
×
158
                Operation::Insert {
×
159
                    new: Record::new(
×
160
                        None,
×
161
                        vec![Field::Int(1), Field::String("HR".to_string())],
×
162
                        None,
×
163
                    ),
×
164
                },
×
165
                DEPARTMENT_PORT,
×
166
            ),
×
167
            (
×
168
                Operation::Insert {
×
169
                    new: Record::new(
×
170
                        None,
×
171
                        vec![
×
172
                            Field::Int(10001),
×
173
                            Field::String("Bob".to_string()),
×
174
                            Field::Int(0),
×
175
                            Field::Float(OrderedFloat(1.1)),
×
176
                        ],
×
177
                        None,
×
178
                    ),
×
179
                },
×
180
                USER_PORT,
×
181
            ),
×
182
            (
×
183
                Operation::Insert {
×
184
                    new: Record::new(
×
185
                        None,
×
186
                        vec![
×
187
                            Field::Int(10002),
×
188
                            Field::String("Craig".to_string()),
×
189
                            Field::Int(1),
×
190
                            Field::Float(OrderedFloat(1.1)),
×
191
                        ],
×
192
                        None,
×
193
                    ),
×
194
                },
×
195
                USER_PORT,
×
196
            ),
×
197
            (
×
198
                Operation::Insert {
×
199
                    new: Record::new(
×
200
                        None,
×
201
                        vec![
×
202
                            Field::Int(10003),
×
203
                            Field::String("Dan".to_string()),
×
204
                            Field::Int(0),
×
205
                            Field::Float(OrderedFloat(1.1)),
×
206
                        ],
×
207
                        None,
×
208
                    ),
×
209
                },
×
210
                USER_PORT,
×
211
            ),
×
212
            (
×
213
                Operation::Insert {
×
214
                    new: Record::new(
×
215
                        None,
×
216
                        vec![
×
217
                            Field::Int(10004),
×
218
                            Field::String("Eve".to_string()),
×
219
                            Field::Int(1),
×
220
                            Field::Float(OrderedFloat(1.1)),
×
221
                        ],
×
222
                        None,
×
223
                    ),
×
224
                },
×
225
                USER_PORT,
×
226
            ),
×
227
            (
×
228
                Operation::Delete {
×
229
                    old: Record::new(
×
230
                        None,
×
231
                        vec![
×
232
                            Field::Int(10002),
×
233
                            Field::String("Craig".to_string()),
×
234
                            Field::Int(1),
×
235
                            Field::Float(OrderedFloat(1.1)),
×
236
                        ],
×
237
                        None,
×
238
                    ),
×
239
                },
×
240
                USER_PORT,
×
241
            ),
×
242
            (
×
243
                Operation::Insert {
×
244
                    new: Record::new(
×
245
                        None,
×
246
                        vec![
×
247
                            Field::Int(10004),
×
248
                            Field::String("Frank".to_string()),
×
249
                            Field::Int(1),
×
250
                            Field::Float(OrderedFloat(1.5)),
×
251
                        ],
×
252
                        None,
×
253
                    ),
×
254
                },
×
255
                USER_PORT,
×
256
            ),
×
257
            (
×
258
                Operation::Update {
×
259
                    old: Record::new(
×
260
                        None,
×
261
                        vec![Field::Int(0), Field::String("IT".to_string())],
×
262
                        None,
×
263
                    ),
×
264
                    new: Record::new(
×
265
                        None,
×
266
                        vec![Field::Int(0), Field::String("XX".to_string())],
×
267
                        None,
×
268
                    ),
×
269
                },
×
270
                DEPARTMENT_PORT,
×
271
            ),
×
272
        ];
×
273

274
        for operation in operations.iter().enumerate() {
×
275
            match operation.1.clone().0 {
×
276
                Operation::Delete { old } => info!("{}: - {:?}", operation.1.clone().1, old.values),
×
277
                Operation::Insert { new } => info!("{}: + {:?}", operation.1.clone().1, new.values),
×
278
                Operation::Update { old, new } => {
×
279
                    info!(
×
280
                        "{}: - {:?}, + {:?}",
×
281
                        operation.1.clone().1,
×
282
                        old.values,
×
283
                        new.values
×
284
                    )
×
285
                }
286
            }
287
            fw.send(
×
288
                operation.0.try_into().unwrap(),
×
289
                0,
×
290
                operation.1.clone().0,
×
291
                operation.1.clone().1,
×
292
            )
×
293
            .unwrap();
×
294
        }
295

296
        loop {
297
            if !self.running.load(Ordering::Relaxed) {
×
298
                break;
×
299
            }
×
300
            thread::sleep(Duration::from_millis(500));
×
301
        }
302
        Ok(())
×
303
    }
×
304
}
305

306
/// Factory that builds [`TestSink`] instances for the pipeline test.
#[derive(Debug)]
pub(crate) struct TestSinkFactory {
    /// Number of operations each built sink counts up to.
    expected: u64,
    /// Shared flag cloned into each built sink.
    running: Arc<AtomicBool>,
}
311

312
impl TestSinkFactory {
313
    pub fn new(expected: u64, barrier: Arc<AtomicBool>) -> Self {
×
314
        Self {
×
315
            expected,
×
316
            running: barrier,
×
317
        }
×
318
    }
×
319
}
320

321
impl SinkFactory for TestSinkFactory {
322
    fn get_input_ports(&self) -> Vec<PortHandle> {
×
323
        vec![DEFAULT_PORT_HANDLE]
×
324
    }
×
325

326
    fn set_input_schema(
×
327
        &self,
×
328
        _input_schemas: &HashMap<PortHandle, Schema>,
×
329
    ) -> Result<(), ExecutionError> {
×
330
        Ok(())
×
331
    }
×
332

333
    fn build(
×
334
        &self,
×
335
        _input_schemas: HashMap<PortHandle, Schema>,
×
336
    ) -> Result<Box<dyn Sink>, ExecutionError> {
×
337
        Ok(Box::new(TestSink {
×
338
            expected: self.expected,
×
339
            current: 0,
×
340
            running: self.running.clone(),
×
341
        }))
×
342
    }
×
343

344
    fn prepare(&self, _input_schemas: HashMap<PortHandle, Schema>) -> Result<(), ExecutionError> {
×
345
        Ok(())
×
346
    }
×
347
}
348

349
/// Sink used by the pipeline test: counts received operations and clears the
/// shared `running` flag once the expected count is reached.
#[derive(Debug)]
pub struct TestSink {
    /// Number of operations after which `running` is cleared.
    expected: u64,
    /// Number of operations processed so far.
    current: u64,
    /// Shared flag set to `false` when `current` reaches `expected`.
    running: Arc<AtomicBool>,
}
355

356
impl Sink for TestSink {
357
    fn init(&mut self, _env: &mut LmdbEnvironmentManager) -> Result<(), ExecutionError> {
×
358
        debug!("SINK: Initialising TestSink");
×
359
        Ok(())
×
360
    }
×
361

362
    fn process(
×
363
        &mut self,
×
364
        _from_port: PortHandle,
×
365
        _op: Operation,
×
366
        _state: &SharedTransaction,
×
367
        _reader: &HashMap<PortHandle, RecordReader>,
×
368
    ) -> Result<(), ExecutionError> {
×
369
        match _op {
×
370
            Operation::Delete { old } => info!("s: - {:?}", old.values),
×
371
            Operation::Insert { new } => info!("s: + {:?}", new.values),
×
372
            Operation::Update { old, new } => {
×
373
                info!("s: - {:?}, + {:?}", old.values, new.values)
×
374
            }
375
        }
376

377
        self.current += 1;
×
378
        if self.current == self.expected {
×
379
            debug!(
×
380
                "Received {} messages. Notifying sender to exit!",
×
381
                self.current
×
382
            );
×
383
            self.running.store(false, Ordering::Relaxed);
×
384
        }
×
385
        Ok(())
×
386
    }
×
387

388
    fn commit(&mut self, _epoch: &Epoch, _tx: &SharedTransaction) -> Result<(), ExecutionError> {
×
389
        Ok(())
×
390
    }
×
391
}
392

393
/// End-to-end check: builds a JOIN + GROUP BY pipeline from SQL, feeds it from
/// `TestSourceFactory`, attaches a `TestSinkFactory` expecting 8 operations,
/// and runs the DAG to completion.
///
/// Ignored by default; run explicitly with `cargo test -- --ignored`.
#[test]
#[ignore]
fn test_pipeline_builder() {
    use std::time::Instant;

    dozer_tracing::init_telemetry(false).unwrap();

    let (mut pipeline, _) = statement_to_pipeline(
        "SELECT  department.name, SUM(user.salary) \
        FROM user JOIN department ON user.department_id = department.id \
        GROUP BY department.name",
    )
    .unwrap();

    // Shared between source and sink: the sink clears it once it has seen the
    // expected number of operations, which lets the source return.
    let latch = Arc::new(AtomicBool::new(true));

    let mut asm = AppSourceManager::new();
    asm.add(AppSource::new(
        "conn1".to_string(),
        Arc::new(TestSourceFactory::new(latch.clone())),
        vec![
            ("user".to_string(), USER_PORT),
            ("department".to_string(), DEPARTMENT_PORT),
        ]
        .into_iter()
        .collect(),
    ))
    .unwrap();

    pipeline.add_sink(Arc::new(TestSinkFactory::new(8, latch)), "sink");
    pipeline
        .connect_nodes(
            "aggregation",
            Some(DEFAULT_PORT_HANDLE),
            "sink",
            Some(DEFAULT_PORT_HANDLE),
        )
        .unwrap();

    let mut app = App::new(asm);
    app.add_pipeline(pipeline);

    let dag = app.get_dag().unwrap();

    let now = Instant::now();

    // Single scratch directory for the executor; it lives until the end of the test.
    let tmp_dir = TempDir::new("test").unwrap();

    let mut executor = DagExecutor::new(
        &dag,
        tmp_dir.path(),
        ExecutorOptions::default(),
        Arc::new(AtomicBool::new(true)),
    )
    .unwrap();

    executor
        .start()
        .unwrap_or_else(|e| panic!("Unable to start the Executor: {}", e));
    assert!(executor.join().is_ok());

    let elapsed = now.elapsed();
    debug!("Elapsed: {:.2?}", elapsed);
}
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc