• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

getdozer / dozer / 3978628498

pending completion
3978628498

Pull #705

github

GitHub
Merge 8775fcda7 into e2f9ad287
Pull Request #705: chore: support for generic schema context in `Sink`, `Processor` and `Source` factories

572 of 572 new or added lines in 35 files covered. (100.0%)

22294 of 34850 relevant lines covered (63.97%)

40332.28 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/dozer-sql/src/pipeline/product/tests/pipeline_test.rs
1
use dozer_core::dag::app::App;
2
use dozer_core::dag::appsource::{AppSource, AppSourceManager};
3
use dozer_core::dag::channels::SourceChannelForwarder;
4
use dozer_core::dag::dag::DEFAULT_PORT_HANDLE;
5
use dozer_core::dag::epoch::Epoch;
6
use dozer_core::dag::errors::ExecutionError;
7
use dozer_core::dag::executor::{DagExecutor, ExecutorOptions};
8
use dozer_core::dag::node::{
9
    OutputPortDef, OutputPortType, PortHandle, Sink, SinkFactory, Source, SourceFactory,
10
};
11
use dozer_core::dag::record_store::RecordReader;
12
use dozer_core::storage::lmdb_storage::{LmdbEnvironmentManager, SharedTransaction};
13
use dozer_types::ordered_float::OrderedFloat;
14
use dozer_types::tracing::{debug, info};
15
use dozer_types::types::{Field, FieldDefinition, FieldType, Operation, Record, Schema};
16

17
use std::collections::HashMap;
18
use std::sync::atomic::{AtomicBool, Ordering};
19
use std::sync::Arc;
20
use std::thread;
21
use std::time::Duration;
22
use tempdir::TempDir;
23

24
use crate::pipeline::builder::{statement_to_pipeline, SchemaSQLContext};
25

26
const USER_PORT: u16 = 0 as PortHandle;
27
const DEPARTMENT_PORT: u16 = 1 as PortHandle;
28

29
/// Factory for [`TestSource`] instances.
///
/// Holds the shared `running` flag that is handed to every source it builds.
#[derive(Debug)]
pub struct TestSourceFactory {
    /// Flag shared with the built sources.
    running: Arc<AtomicBool>,
}

impl TestSourceFactory {
    /// Creates a factory around the given shared `running` flag.
    pub fn new(running: Arc<AtomicBool>) -> Self {
        TestSourceFactory { running }
    }
}
39

40
impl SourceFactory<SchemaSQLContext> for TestSourceFactory {
41
    fn get_output_ports(&self) -> Result<Vec<OutputPortDef>, ExecutionError> {
×
42
        Ok(vec![
×
43
            OutputPortDef::new(
×
44
                USER_PORT,
×
45
                OutputPortType::StatefulWithPrimaryKeyLookup {
×
46
                    retr_old_records_for_updates: true,
×
47
                    retr_old_records_for_deletes: true,
×
48
                },
×
49
            ),
×
50
            OutputPortDef::new(
×
51
                DEPARTMENT_PORT,
×
52
                OutputPortType::StatefulWithPrimaryKeyLookup {
×
53
                    retr_old_records_for_updates: true,
×
54
                    retr_old_records_for_deletes: true,
×
55
                },
×
56
            ),
×
57
        ])
×
58
    }
×
59

60
    fn get_output_schema(
×
61
        &self,
×
62
        port: &PortHandle,
×
63
    ) -> Result<(Schema, SchemaSQLContext), ExecutionError> {
×
64
        if port == &USER_PORT {
×
65
            Ok((
×
66
                Schema::empty()
×
67
                    .field(
×
68
                        FieldDefinition::new(String::from("id"), FieldType::Int, false),
×
69
                        true,
×
70
                    )
×
71
                    .field(
×
72
                        FieldDefinition::new(String::from("name"), FieldType::String, false),
×
73
                        false,
×
74
                    )
×
75
                    .field(
×
76
                        FieldDefinition::new(String::from("department_id"), FieldType::Int, false),
×
77
                        false,
×
78
                    )
×
79
                    .field(
×
80
                        FieldDefinition::new(String::from("salary"), FieldType::Float, false),
×
81
                        false,
×
82
                    )
×
83
                    .clone(),
×
84
                SchemaSQLContext {},
×
85
            ))
×
86
        } else if port == &DEPARTMENT_PORT {
×
87
            Ok((
×
88
                Schema::empty()
×
89
                    .field(
×
90
                        FieldDefinition::new(String::from("id"), FieldType::Int, false),
×
91
                        true,
×
92
                    )
×
93
                    .field(
×
94
                        FieldDefinition::new(String::from("name"), FieldType::String, false),
×
95
                        false,
×
96
                    )
×
97
                    .clone(),
×
98
                SchemaSQLContext {},
×
99
            ))
×
100
        } else {
×
101
            panic!("Invalid Port Handle {}", port);
×
102
        }
×
103
    }
×
104

105
    fn build(
×
106
        &self,
×
107
        _output_schemas: HashMap<PortHandle, Schema>,
×
108
    ) -> Result<Box<dyn Source>, ExecutionError> {
×
109
        Ok(Box::new(TestSource {
×
110
            running: self.running.clone(),
×
111
        }))
×
112
    }
×
113

114
    fn prepare(
×
115
        &self,
×
116
        _output_schemas: HashMap<PortHandle, (Schema, SchemaSQLContext)>,
×
117
    ) -> Result<(), ExecutionError> {
×
118
        Ok(())
×
119
    }
×
120
}
×
121

×
122
#[derive(Debug)]
×
123
pub struct TestSource {
×
124
    running: Arc<AtomicBool>,
×
125
}
×
126

×
127
impl Source for TestSource {
×
128
    fn start(
×
129
        &self,
×
130
        fw: &mut dyn SourceChannelForwarder,
×
131
        _from_seq: Option<(u64, u64)>,
×
132
    ) -> Result<(), ExecutionError> {
×
133
        let operations = vec![
×
134
            (
×
135
                Operation::Insert {
×
136
                    new: Record::new(
×
137
                        None,
×
138
                        vec![Field::Int(0), Field::String("IT".to_string())],
×
139
                        None,
×
140
                    ),
×
141
                },
×
142
                DEPARTMENT_PORT,
×
143
            ),
×
144
            (
×
145
                Operation::Insert {
×
146
                    new: Record::new(
×
147
                        None,
×
148
                        vec![Field::Int(1), Field::String("HR".to_string())],
×
149
                        None,
×
150
                    ),
×
151
                },
×
152
                DEPARTMENT_PORT,
×
153
            ),
×
154
            (
×
155
                Operation::Insert {
×
156
                    new: Record::new(
×
157
                        None,
×
158
                        vec![
×
159
                            Field::Int(10000),
×
160
                            Field::String("Alice".to_string()),
×
161
                            Field::Int(0),
×
162
                            Field::Float(OrderedFloat(1.1)),
×
163
                        ],
×
164
                        None,
×
165
                    ),
×
166
                },
×
167
                USER_PORT,
×
168
            ),
×
169
            (
×
170
                Operation::Insert {
×
171
                    new: Record::new(
×
172
                        None,
×
173
                        vec![Field::Int(1), Field::String("HR".to_string())],
×
174
                        None,
×
175
                    ),
×
176
                },
×
177
                DEPARTMENT_PORT,
×
178
            ),
×
179
            (
×
180
                Operation::Insert {
×
181
                    new: Record::new(
×
182
                        None,
×
183
                        vec![
×
184
                            Field::Int(10001),
×
185
                            Field::String("Bob".to_string()),
×
186
                            Field::Int(0),
×
187
                            Field::Float(OrderedFloat(1.1)),
×
188
                        ],
×
189
                        None,
×
190
                    ),
×
191
                },
×
192
                USER_PORT,
×
193
            ),
×
194
            (
×
195
                Operation::Insert {
×
196
                    new: Record::new(
×
197
                        None,
×
198
                        vec![
×
199
                            Field::Int(10002),
×
200
                            Field::String("Craig".to_string()),
×
201
                            Field::Int(1),
×
202
                            Field::Float(OrderedFloat(1.1)),
×
203
                        ],
×
204
                        None,
×
205
                    ),
×
206
                },
×
207
                USER_PORT,
×
208
            ),
×
209
            (
×
210
                Operation::Insert {
×
211
                    new: Record::new(
×
212
                        None,
×
213
                        vec![
×
214
                            Field::Int(10003),
×
215
                            Field::String("Dan".to_string()),
×
216
                            Field::Int(0),
×
217
                            Field::Float(OrderedFloat(1.1)),
×
218
                        ],
×
219
                        None,
×
220
                    ),
×
221
                },
×
222
                USER_PORT,
×
223
            ),
×
224
            (
×
225
                Operation::Insert {
×
226
                    new: Record::new(
×
227
                        None,
×
228
                        vec![
×
229
                            Field::Int(10004),
×
230
                            Field::String("Eve".to_string()),
×
231
                            Field::Int(1),
×
232
                            Field::Float(OrderedFloat(1.1)),
×
233
                        ],
×
234
                        None,
×
235
                    ),
×
236
                },
×
237
                USER_PORT,
×
238
            ),
×
239
            (
×
240
                Operation::Delete {
×
241
                    old: Record::new(
×
242
                        None,
×
243
                        vec![
×
244
                            Field::Int(10002),
×
245
                            Field::String("Craig".to_string()),
×
246
                            Field::Int(1),
×
247
                            Field::Float(OrderedFloat(1.1)),
×
248
                        ],
×
249
                        None,
×
250
                    ),
×
251
                },
×
252
                USER_PORT,
×
253
            ),
×
254
            (
×
255
                Operation::Insert {
×
256
                    new: Record::new(
×
257
                        None,
×
258
                        vec![
×
259
                            Field::Int(10004),
×
260
                            Field::String("Frank".to_string()),
×
261
                            Field::Int(1),
×
262
                            Field::Float(OrderedFloat(1.5)),
×
263
                        ],
×
264
                        None,
×
265
                    ),
×
266
                },
×
267
                USER_PORT,
×
268
            ),
×
269
            (
×
270
                Operation::Update {
×
271
                    old: Record::new(
×
272
                        None,
×
273
                        vec![Field::Int(0), Field::String("IT".to_string())],
×
274
                        None,
×
275
                    ),
×
276
                    new: Record::new(
×
277
                        None,
×
278
                        vec![Field::Int(0), Field::String("XX".to_string())],
×
279
                        None,
×
280
                    ),
×
281
                },
×
282
                DEPARTMENT_PORT,
×
283
            ),
×
284
        ];
×
285

286
        for operation in operations.iter().enumerate() {
×
287
            match operation.1.clone().0 {
×
288
                Operation::Delete { old } => info!("{}: - {:?}", operation.1.clone().1, old.values),
×
289
                Operation::Insert { new } => info!("{}: + {:?}", operation.1.clone().1, new.values),
×
290
                Operation::Update { old, new } => {
×
291
                    info!(
×
292
                        "{}: - {:?}, + {:?}",
×
293
                        operation.1.clone().1,
×
294
                        old.values,
×
295
                        new.values
×
296
                    )
×
297
                }
×
298
            }
×
299
            fw.send(
×
300
                operation.0.try_into().unwrap(),
×
301
                0,
×
302
                operation.1.clone().0,
×
303
                operation.1.clone().1,
×
304
            )
×
305
            .unwrap();
×
306
        }
×
307

308
        loop {
309
            if !self.running.load(Ordering::Relaxed) {
×
310
                break;
×
311
            }
×
312
            thread::sleep(Duration::from_millis(500));
×
313
        }
×
314
        Ok(())
×
315
    }
×
316
}
×
317

×
318
/// Factory for [`TestSink`] instances.
///
/// Carries the number of operations the sink should wait for and the shared
/// flag the sink clears once that many have been processed.
#[derive(Debug)]
pub(crate) struct TestSinkFactory {
    /// Number of operations after which the built sink clears `running`.
    expected: u64,
    /// Flag shared with the built sinks.
    running: Arc<AtomicBool>,
}

impl TestSinkFactory {
    /// Creates a factory whose sinks clear `barrier` after `expected` operations.
    pub fn new(expected: u64, barrier: Arc<AtomicBool>) -> Self {
        let running = barrier;
        TestSinkFactory { expected, running }
    }
}
×
332

333
impl SinkFactory<SchemaSQLContext> for TestSinkFactory {
×
334
    fn get_input_ports(&self) -> Vec<PortHandle> {
×
335
        vec![DEFAULT_PORT_HANDLE]
×
336
    }
×
337

×
338
    fn set_input_schema(
×
339
        &self,
×
340
        _input_schemas: &HashMap<PortHandle, (Schema, SchemaSQLContext)>,
×
341
    ) -> Result<(), ExecutionError> {
×
342
        Ok(())
×
343
    }
×
344

×
345
    fn build(
×
346
        &self,
×
347
        _input_schemas: HashMap<PortHandle, Schema>,
×
348
    ) -> Result<Box<dyn Sink>, ExecutionError> {
×
349
        Ok(Box::new(TestSink {
×
350
            expected: self.expected,
×
351
            current: 0,
×
352
            running: self.running.clone(),
×
353
        }))
×
354
    }
×
355

356
    fn prepare(
×
357
        &self,
×
358
        _input_schemas: HashMap<PortHandle, (Schema, SchemaSQLContext)>,
×
359
    ) -> Result<(), ExecutionError> {
×
360
        Ok(())
×
361
    }
×
362
}
×
363

×
364
#[derive(Debug)]
×
365
pub struct TestSink {
×
366
    expected: u64,
×
367
    current: u64,
×
368
    running: Arc<AtomicBool>,
×
369
}
×
370

×
371
impl Sink for TestSink {
×
372
    fn init(&mut self, _env: &mut LmdbEnvironmentManager) -> Result<(), ExecutionError> {
×
373
        debug!("SINK: Initialising TestSink");
×
374
        Ok(())
×
375
    }
×
376

377
    fn process(
×
378
        &mut self,
×
379
        _from_port: PortHandle,
×
380
        _op: Operation,
×
381
        _state: &SharedTransaction,
×
382
        _reader: &HashMap<PortHandle, Box<dyn RecordReader>>,
×
383
    ) -> Result<(), ExecutionError> {
×
384
        match _op {
×
385
            Operation::Delete { old } => info!("s: - {:?}", old.values),
×
386
            Operation::Insert { new } => info!("s: + {:?}", new.values),
×
387
            Operation::Update { old, new } => {
×
388
                info!("s: - {:?}, + {:?}", old.values, new.values)
×
389
            }
×
390
        }
×
391

392
        self.current += 1;
×
393
        if self.current == self.expected {
×
394
            debug!(
×
395
                "Received {} messages. Notifying sender to exit!",
×
396
                self.current
×
397
            );
×
398
            self.running.store(false, Ordering::Relaxed);
×
399
        }
×
400
        Ok(())
×
401
    }
×
402

×
403
    fn commit(&mut self, _epoch: &Epoch, _tx: &SharedTransaction) -> Result<(), ExecutionError> {
×
404
        Ok(())
×
405
    }
×
406
}
×
407

×
408
/// Builds a pipeline from a JOIN + GROUP BY statement, wires it between
/// `TestSourceFactory` (ports `user` / `department`) and a `TestSinkFactory`
/// expecting 8 operations, runs it through a `DagExecutor` and asserts that
/// the executor joins without error.
///
/// Marked `#[ignore]`, so it only runs when explicitly requested.
#[test]
#[ignore]
fn test_pipeline_builder() {
    use std::time::Instant;

    dozer_tracing::init_telemetry(false).unwrap();

    let (mut pipeline, (node, port)) = statement_to_pipeline(
        "SELECT  department.name, SUM(user.salary) \
        FROM user JOIN department ON user.department_id = department.id \
        GROUP BY department.name",
    )
    .unwrap();

    // Shared between source and sink: the sink clears it, the source then exits.
    let latch = Arc::new(AtomicBool::new(true));

    let mut asm = AppSourceManager::new();
    asm.add(AppSource::new(
        "conn1".to_string(),
        Arc::new(TestSourceFactory::new(latch.clone())),
        vec![
            ("user".to_string(), USER_PORT),
            ("department".to_string(), DEPARTMENT_PORT),
        ]
        .into_iter()
        .collect(),
    ))
    .unwrap();

    pipeline.add_sink(Arc::new(TestSinkFactory::new(8, latch)), "sink");
    pipeline
        .connect_nodes(&node, Some(port), "sink", Some(DEFAULT_PORT_HANDLE))
        .unwrap();

    let mut app = App::new(asm);
    app.add_pipeline(pipeline);

    let dag = app.get_dag().unwrap();

    let now = Instant::now();

    // `TempDir::new` already creates the directory; it is the only one the
    // executor is given.
    let tmp_dir = TempDir::new("test").unwrap();

    let mut executor = DagExecutor::new(
        &dag,
        tmp_dir.path(),
        ExecutorOptions::default(),
        Arc::new(AtomicBool::new(true)),
    )
    .unwrap();

    executor
        .start()
        .unwrap_or_else(|e| panic!("Unable to start the Executor: {}", e));
    assert!(executor.join().is_ok());

    let elapsed = now.elapsed();
    debug!("Elapsed: {:.2?}", elapsed);
}
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc