• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

getdozer / dozer / 5651881554

pending completion
5651881554

push

github

web-flow
feat: refactor records (#1781)

* feat: add processor record

* chore: refactor processor record

* chore: introduce processor operation

* chore: refactor processors

* chore: refactor log operations

* fix import errors

* chore: fix index

* chore: fix recursive fetch

* fix: Don't allow cloning `ProcessorRecord`

* chore: fix errors

* fix: Don't expose `ProcessorRecordRef` implementation detail

* fix: Add `ProcessorRecord::total_len` to correctly handle `extend_` functions

* fix: Fix tests compilation by @chloeminkyung

* fix: Fix window and lifetime sql unit tests

* fix: clippy fix

* chore: fmt fix

* chore: Move `ProcessorRecord` to `dozer-core`

---------

Co-authored-by: Chloe Kim <chloeminkyung@gmail.com>
Co-authored-by: chubei <914745487@qq.com>

849 of 849 new or added lines in 64 files covered. (100.0%)

43094 of 55381 relevant lines covered (77.81%)

29359.43 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/dozer-sql/src/pipeline/window/tests/pipeline_test.rs
1
use dozer_core::app::{App, AppPipeline};
2
use dozer_core::appsource::{AppSource, AppSourceManager};
3
use dozer_core::channels::SourceChannelForwarder;
4
use dozer_core::executor::{DagExecutor, ExecutorOptions};
5
use dozer_core::executor_operation::ProcessorOperation;
6
use dozer_core::node::{
7
    OutputPortDef, OutputPortType, PortHandle, Sink, SinkFactory, Source, SourceFactory,
8
};
9

10
use dozer_core::processor_record::{ProcessorRecord, ProcessorRecordRef};
11
use dozer_core::DEFAULT_PORT_HANDLE;
12
use dozer_types::chrono::{TimeZone, Utc};
13
use dozer_types::epoch::Epoch;
14
use dozer_types::errors::internal::BoxedError;
15
use dozer_types::ingestion_types::IngestionMessage;
16
use dozer_types::tracing::{debug, info};
17
use dozer_types::types::{Field, FieldDefinition, FieldType, Record, Schema, SourceDefinition};
18

19
use std::collections::HashMap;
20
use std::sync::atomic::{AtomicBool, Ordering};
21
use std::sync::Arc;
22
use std::thread;
23
use std::time::Duration;
24

25
use crate::pipeline::builder::{statement_to_pipeline, SchemaSQLContext};
26

27
const TRIPS_PORT: u16 = 0 as PortHandle;
28
const ZONES_PORT: u16 = 1 as PortHandle;
29
const EXPECTED_SINK_OP_COUNT: u64 = 12;
30
const DATE_FORMAT: &str = "%Y-%m-%d %H:%M:%S";
31

32
/// End-to-end pipeline test: compiles a HOP-window SQL query over a fake
/// `taxi_trips` source, wires it to a counting sink, and runs the DAG until
/// the sink has observed the expected number of operations.
/// Marked `#[ignore]`; run explicitly when needed.
#[test]
#[ignore]
fn test_pipeline_builder() {
    dozer_tracing::init_telemetry(None, None);

    let mut pipeline = AppPipeline::new();

    // Translate the windowed SQL statement into pipeline nodes, materialized
    // under the output table name "results".
    let context = statement_to_pipeline(
        "SELECT trips.taxi_id, trips.completed_at, trips.window_start, trips.window_end \
        FROM HOP(taxi_trips, completed_at, '1 MINUTE', '2 MINUTES') trips ",
        &mut pipeline,
        Some("results".to_string()),
    )
    .unwrap();

    let table_info = context.output_tables_map.get("results").unwrap();

    // Shared flag: the sink clears it after it has seen enough operations,
    // which lets the source's wait loop terminate and the DAG shut down.
    let keep_running = Arc::new(AtomicBool::new(true));

    let mut source_manager = AppSourceManager::new();
    let source_ports = vec![
        ("taxi_trips".to_string(), TRIPS_PORT),
        ("zones".to_string(), ZONES_PORT),
    ];
    source_manager
        .add(AppSource::new(
            "connection".to_string(),
            Arc::new(TestSourceFactory::new(keep_running.clone())),
            source_ports.into_iter().collect(),
        ))
        .unwrap();

    pipeline.add_sink(
        Arc::new(TestSinkFactory::new(EXPECTED_SINK_OP_COUNT, keep_running)),
        "sink",
    );
    pipeline.connect_nodes(
        &table_info.node,
        Some(table_info.port),
        "sink",
        Some(DEFAULT_PORT_HANDLE),
        true,
    );

    let mut app = App::new(source_manager);
    app.add_pipeline(pipeline);

    let dag = app.get_dag().unwrap();

    let started_at = std::time::Instant::now();

    DagExecutor::new(dag, ExecutorOptions::default())
        .unwrap()
        .start(Arc::new(AtomicBool::new(true)))
        .unwrap()
        .join()
        .unwrap();

    debug!("Elapsed: {:.2?}", started_at.elapsed());
}
×
96

97
/// Factory that produces [`TestSource`] instances for the pipeline test.
#[derive(Debug)]
pub struct TestSourceFactory {
    // Shared shutdown flag; the created source idles while this stays `true`.
    running: Arc<AtomicBool>,
}
101

102
impl TestSourceFactory {
103
    pub fn new(running: Arc<AtomicBool>) -> Self {
×
104
        Self { running }
×
105
    }
×
106
}
107

108
impl SourceFactory<SchemaSQLContext> for TestSourceFactory {
109
    fn get_output_ports(&self) -> Vec<OutputPortDef> {
×
110
        vec![
×
111
            OutputPortDef::new(TRIPS_PORT, OutputPortType::Stateless),
×
112
            OutputPortDef::new(ZONES_PORT, OutputPortType::Stateless),
×
113
        ]
×
114
    }
×
115

116
    fn get_output_schema(
×
117
        &self,
×
118
        port: &PortHandle,
×
119
    ) -> Result<(Schema, SchemaSQLContext), BoxedError> {
×
120
        if port == &TRIPS_PORT {
×
121
            let taxi_trips_source = SourceDefinition::Table {
×
122
                connection: "connection".to_string(),
×
123
                name: "taxi_trips".to_string(),
×
124
            };
×
125
            Ok((
×
126
                Schema::default()
×
127
                    .field(
×
128
                        FieldDefinition::new(
×
129
                            String::from("taxi_id"),
×
130
                            FieldType::UInt,
×
131
                            false,
×
132
                            taxi_trips_source.clone(),
×
133
                        ),
×
134
                        true,
×
135
                    )
×
136
                    .field(
×
137
                        FieldDefinition::new(
×
138
                            String::from("completed_at"),
×
139
                            FieldType::Timestamp,
×
140
                            false,
×
141
                            taxi_trips_source.clone(),
×
142
                        ),
×
143
                        false,
×
144
                    )
×
145
                    .field(
×
146
                        FieldDefinition::new(
×
147
                            String::from("pu_location_id"),
×
148
                            FieldType::UInt,
×
149
                            false,
×
150
                            taxi_trips_source,
×
151
                        ),
×
152
                        false,
×
153
                    )
×
154
                    .clone(),
×
155
                SchemaSQLContext::default(),
×
156
            ))
×
157
        } else if port == &ZONES_PORT {
×
158
            let source_id = SourceDefinition::Table {
×
159
                connection: "connection".to_string(),
×
160
                name: "zones".to_string(),
×
161
            };
×
162
            Ok((
×
163
                Schema::default()
×
164
                    .field(
×
165
                        FieldDefinition::new(
×
166
                            String::from("location_id"),
×
167
                            FieldType::UInt,
×
168
                            false,
×
169
                            source_id.clone(),
×
170
                        ),
×
171
                        true,
×
172
                    )
×
173
                    .field(
×
174
                        FieldDefinition::new(
×
175
                            String::from("zone"),
×
176
                            FieldType::String,
×
177
                            false,
×
178
                            source_id,
×
179
                        ),
×
180
                        false,
×
181
                    )
×
182
                    .clone(),
×
183
                SchemaSQLContext::default(),
×
184
            ))
×
185
        } else {
186
            panic!("Invalid Port Handle {port}");
×
187
        }
188
    }
×
189

190
    fn build(
×
191
        &self,
×
192
        _output_schemas: HashMap<PortHandle, Schema>,
×
193
    ) -> Result<Box<dyn Source>, BoxedError> {
×
194
        Ok(Box::new(TestSource {
×
195
            running: self.running.clone(),
×
196
        }))
×
197
    }
×
198
}
199

200
/// Test source that emits a fixed batch of taxi-trip insert operations and
/// then idles until told to stop.
#[derive(Debug)]
pub struct TestSource {
    // Shared flag; the source's wait loop exits once this becomes `false`.
    running: Arc<AtomicBool>,
}
204

205
impl Source for TestSource {
206
    fn can_start_from(&self, _last_checkpoint: (u64, u64)) -> Result<bool, BoxedError> {
×
207
        Ok(false)
×
208
    }
×
209

210
    fn start(
×
211
        &self,
×
212
        fw: &mut dyn SourceChannelForwarder,
×
213
        _last_checkpoint: Option<(u64, u64)>,
×
214
    ) -> Result<(), BoxedError> {
×
215
        let operations = vec![
×
216
            (
×
217
                ProcessorOperation::Insert {
×
218
                    new: ProcessorRecordRef::new(ProcessorRecord::from(Record::new(vec![
×
219
                        Field::UInt(1001),
×
220
                        Field::Timestamp(
×
221
                            Utc.datetime_from_str("2023-02-01 22:00:00", DATE_FORMAT)
×
222
                                .unwrap()
×
223
                                .into(),
×
224
                        ),
×
225
                        Field::UInt(1),
×
226
                    ]))),
×
227
                },
×
228
                TRIPS_PORT,
×
229
            ),
×
230
            (
×
231
                ProcessorOperation::Insert {
×
232
                    new: ProcessorRecordRef::new(ProcessorRecord::from(Record::new(vec![
×
233
                        Field::UInt(1002),
×
234
                        Field::Timestamp(
×
235
                            Utc.datetime_from_str("2023-02-01 22:01:00", DATE_FORMAT)
×
236
                                .unwrap()
×
237
                                .into(),
×
238
                        ),
×
239
                        Field::UInt(2),
×
240
                    ]))),
×
241
                },
×
242
                TRIPS_PORT,
×
243
            ),
×
244
            (
×
245
                ProcessorOperation::Insert {
×
246
                    new: ProcessorRecordRef::new(ProcessorRecord::from(Record::new(vec![
×
247
                        Field::UInt(1003),
×
248
                        Field::Timestamp(
×
249
                            Utc.datetime_from_str("2023-02-01 22:02:10", DATE_FORMAT)
×
250
                                .unwrap()
×
251
                                .into(),
×
252
                        ),
×
253
                        Field::UInt(3),
×
254
                    ]))),
×
255
                },
×
256
                TRIPS_PORT,
×
257
            ),
×
258
            (
×
259
                ProcessorOperation::Insert {
×
260
                    new: ProcessorRecordRef::new(ProcessorRecord::from(Record::new(vec![
×
261
                        Field::UInt(1004),
×
262
                        Field::Timestamp(
×
263
                            Utc.datetime_from_str("2023-02-01 22:03:00", DATE_FORMAT)
×
264
                                .unwrap()
×
265
                                .into(),
×
266
                        ),
×
267
                        Field::UInt(2),
×
268
                    ]))),
×
269
                },
×
270
                TRIPS_PORT,
×
271
            ),
×
272
            (
×
273
                ProcessorOperation::Insert {
×
274
                    new: ProcessorRecordRef::new(ProcessorRecord::from(Record::new(vec![
×
275
                        Field::UInt(1005),
×
276
                        Field::Timestamp(
×
277
                            Utc.datetime_from_str("2023-02-01 22:05:00", DATE_FORMAT)
×
278
                                .unwrap()
×
279
                                .into(),
×
280
                        ),
×
281
                        Field::UInt(1),
×
282
                    ]))),
×
283
                },
×
284
                TRIPS_PORT,
×
285
            ),
×
286
            (
×
287
                ProcessorOperation::Insert {
×
288
                    new: ProcessorRecordRef::new(ProcessorRecord::from(Record::new(vec![
×
289
                        Field::UInt(1006),
×
290
                        Field::Timestamp(
×
291
                            Utc.datetime_from_str("2023-02-01 22:06:00", DATE_FORMAT)
×
292
                                .unwrap()
×
293
                                .into(),
×
294
                        ),
×
295
                        Field::UInt(2),
×
296
                    ]))),
×
297
                },
×
298
                TRIPS_PORT,
×
299
            ),
×
300
            // (
×
301
            //     Operation::Insert {
×
302
            //         new: Record::new(
×
303
            //             None,
×
304
            //             vec![Field::UInt(1), Field::String("Newark Airport".to_string())],
×
305
            //         ),
×
306
            //     },
×
307
            //     ZONES_PORT,
×
308
            // ),
×
309
            // (
×
310
            //     Operation::Insert {
×
311
            //         new: Record::new(
×
312
            //             None,
×
313
            //             vec![Field::UInt(2), Field::String("Jamaica Bay".to_string())],
×
314
            //         ),
×
315
            //     },
×
316
            //     ZONES_PORT,
×
317
            // ),
×
318
            // (
×
319
            //     Operation::Insert {
×
320
            //         new: Record::new(
×
321
            //             None,
×
322
            //             vec![
×
323
            //                 Field::UInt(3),
×
324
            //                 Field::String("Allerton/Pelham Gardens".to_string()),
×
325
            //             ],
×
326
            //         ),
×
327
            //     },
×
328
            //     ZONES_PORT,
×
329
            // ),
×
330
        ];
×
331

332
        for (index, (op, port)) in operations.into_iter().enumerate() {
×
333
            fw.send(
×
334
                IngestionMessage::new_op(index as u64, 0, 0, op.clone_deref()),
×
335
                port,
×
336
            )
×
337
            .unwrap();
×
338
        }
×
339

340
        loop {
341
            if !self.running.load(Ordering::Relaxed) {
×
342
                break;
×
343
            }
×
344
            thread::sleep(Duration::from_millis(500));
×
345
        }
346
        Ok(())
×
347
    }
×
348
}
349

350
/// Factory producing the [`TestSink`] that counts received operations.
#[derive(Debug)]
pub(crate) struct TestSinkFactory {
    // Number of operations the sink expects before signalling shutdown.
    expected: u64,
    // Shared flag cleared once `expected` operations have been received.
    running: Arc<AtomicBool>,
}
355

356
impl TestSinkFactory {
357
    pub fn new(expected: u64, barrier: Arc<AtomicBool>) -> Self {
×
358
        Self {
×
359
            expected,
×
360
            running: barrier,
×
361
        }
×
362
    }
×
363
}
364

365
impl SinkFactory<SchemaSQLContext> for TestSinkFactory {
366
    fn get_input_ports(&self) -> Vec<PortHandle> {
×
367
        vec![DEFAULT_PORT_HANDLE]
×
368
    }
×
369

370
    fn build(
×
371
        &self,
×
372
        _input_schemas: HashMap<PortHandle, Schema>,
×
373
    ) -> Result<Box<dyn Sink>, BoxedError> {
×
374
        Ok(Box::new(TestSink {
×
375
            expected: self.expected,
×
376
            current: 0,
×
377
            running: self.running.clone(),
×
378
        }))
×
379
    }
×
380

381
    fn prepare(
×
382
        &self,
×
383
        _input_schemas: HashMap<PortHandle, (Schema, SchemaSQLContext)>,
×
384
    ) -> Result<(), BoxedError> {
×
385
        Ok(())
×
386
    }
×
387
}
388

389
/// Sink that logs every operation and stops the pipeline once the expected
/// number of operations has been received.
#[derive(Debug)]
pub struct TestSink {
    // Total number of operations to wait for.
    expected: u64,
    // Operations received so far.
    current: u64,
    // Shared flag; cleared when `current` reaches `expected`.
    running: Arc<AtomicBool>,
}
395

396
impl Sink for TestSink {
397
    fn process(
×
398
        &mut self,
×
399
        _from_port: PortHandle,
×
400
        _op: ProcessorOperation,
×
401
    ) -> Result<(), BoxedError> {
×
402
        match _op {
×
403
            ProcessorOperation::Delete { old } => {
×
404
                info!("o0:-> - {:?}", old.get_record().get_fields())
×
405
            }
406
            ProcessorOperation::Insert { new } => {
×
407
                info!("o0:-> + {:?}", new.get_record().get_fields())
×
408
            }
409
            ProcessorOperation::Update { old, new } => {
×
410
                info!(
×
411
                    "o0:-> - {:?}, + {:?}",
×
412
                    old.get_record().get_fields(),
×
413
                    new.get_record().get_fields()
×
414
                )
×
415
            }
416
        }
417

418
        self.current += 1;
×
419
        if self.current == self.expected {
×
420
            debug!(
×
421
                "Received {} messages. Notifying sender to exit!",
×
422
                self.current
×
423
            );
×
424
            self.running.store(false, Ordering::Relaxed);
×
425
        }
×
426
        Ok(())
×
427
    }
×
428

429
    fn commit(&mut self, _epoch_details: &Epoch) -> Result<(), BoxedError> {
×
430
        Ok(())
×
431
    }
×
432

433
    fn on_source_snapshotting_done(&mut self, _connection_name: String) -> Result<(), BoxedError> {
×
434
        Ok(())
×
435
    }
×
436
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc