• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

getdozer / dozer / 5832882754

pending completion
5832882754

push

github

web-flow
feat: Publish DAG to JSON (#1824)

Co-authored-by: chubei <914745487@qq.com>

251 of 251 new or added lines in 14 files covered. (100.0%)

45594 of 62527 relevant lines covered (72.92%)

39203.16 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/dozer-sql/src/pipeline/table_operator/tests/pipeline_test.rs
1
use dozer_core::app::{App, AppPipeline};
2
use dozer_core::appsource::{AppSourceManager, AppSourceMappings};
3
use dozer_core::channels::SourceChannelForwarder;
4
use dozer_core::executor::{DagExecutor, ExecutorOptions};
5
use dozer_core::node::{OutputPortDef, OutputPortType, PortHandle, Source, SourceFactory};
6

7
use dozer_core::DEFAULT_PORT_HANDLE;
8
use dozer_types::chrono::{TimeZone, Utc};
9
use dozer_types::errors::internal::BoxedError;
10
use dozer_types::ingestion_types::IngestionMessage;
11
use dozer_types::tracing::debug;
12
use dozer_types::types::{
13
    Field, FieldDefinition, FieldType, Operation, Record, Schema, SourceDefinition,
14
};
15

16
use std::collections::HashMap;
17
use std::sync::atomic::AtomicBool;
18
use std::sync::Arc;
19
use std::thread;
20
use std::time::Duration;
21

22
use crate::pipeline::builder::{statement_to_pipeline, SchemaSQLContext};
23
use crate::pipeline::product::tests::pipeline_test::TestSinkFactory;
24

25
const TRIPS_PORT: u16 = 0 as PortHandle;
26
const ZONES_PORT: u16 = 1 as PortHandle;
27
const EXPECTED_SINK_OP_COUNT: u64 = 12;
28
const DATE_FORMAT: &str = "%Y-%m-%d %H:%M:%S";
29

30
/// End-to-end run of a `TTL(...)` table operator feeding a JOIN.
///
/// Compiles the SQL into an `AppPipeline`, puts `TestSourceFactory` (trips + zones) in
/// front of it and a `TestSinkFactory` behind the `results` output, executes the DAG to
/// completion and logs the elapsed wall-clock time. Ignored by default.
#[test]
#[ignore]
fn test_lifetime_pipeline() {
    dozer_tracing::init_telemetry(None, None);

    let mut pipeline = AppPipeline::new();
    let sql = "SELECT trips.taxi_id, puz.zone, trips.completed_at \
                FROM TTL(taxi_trips, completed_at, '3 MINUTES') trips \
                JOIN zones puz ON trips.pu_location_id = puz.location_id";
    let context =
        statement_to_pipeline(sql, &mut pipeline, Some("results".to_string())).unwrap();
    let table_info = context.output_tables_map.get("results").unwrap();

    // Same flag instance goes to both the source factory and the sink factory.
    let latch = Arc::new(AtomicBool::new(true));

    // Route the two SQL table names to the matching source output ports.
    let source_mappings = AppSourceMappings::new(
        "connection".to_string(),
        vec![
            ("taxi_trips".to_string(), TRIPS_PORT),
            ("zones".to_string(), ZONES_PORT),
        ]
        .into_iter()
        .collect(),
    );
    let mut asm = AppSourceManager::new();
    asm.add(
        Box::new(TestSourceFactory::new(Arc::clone(&latch))),
        source_mappings,
    )
    .unwrap();

    let sink_factory = TestSinkFactory::new(EXPECTED_SINK_OP_COUNT, latch);
    pipeline.add_sink(Box::new(sink_factory), "sink", None);
    pipeline.connect_nodes(
        &table_info.node,
        table_info.port,
        "sink",
        DEFAULT_PORT_HANDLE,
    );

    let mut app = App::new(asm);
    app.add_pipeline(pipeline);
    let dag = app.into_dag().unwrap();
    dag.print_dot();

    let started = std::time::Instant::now();
    DagExecutor::new(dag, ExecutorOptions::default())
        .unwrap()
        .start(Arc::new(AtomicBool::new(true)))
        .unwrap()
        .join()
        .unwrap();
    debug!("Elapsed: {:.2?}", started.elapsed());
}
×
96

97
/// Source factory for `test_lifetime_pipeline`, exposing a `taxi_trips` port and a
/// `zones` port.
#[derive(Debug)]
pub struct TestSourceFactory {
    /// Flag cloned into every `TestSource` this factory builds.
    running: Arc<AtomicBool>,
}

impl TestSourceFactory {
    /// Creates a factory holding `running`; the same `Arc` is shared with built sources.
    pub fn new(running: Arc<AtomicBool>) -> Self {
        TestSourceFactory { running }
    }
}
107

×
108
impl SourceFactory<SchemaSQLContext> for TestSourceFactory {
×
109
    fn get_output_ports(&self) -> Vec<OutputPortDef> {
×
110
        vec![
×
111
            OutputPortDef::new(TRIPS_PORT, OutputPortType::Stateless),
×
112
            OutputPortDef::new(ZONES_PORT, OutputPortType::Stateless),
×
113
        ]
×
114
    }
×
115

×
116
    fn get_output_schema(
×
117
        &self,
×
118
        port: &PortHandle,
×
119
    ) -> Result<(Schema, SchemaSQLContext), BoxedError> {
×
120
        if port == &TRIPS_PORT {
×
121
            let taxi_trips_source = SourceDefinition::Table {
×
122
                connection: "connection".to_string(),
×
123
                name: "taxi_trips".to_string(),
×
124
            };
×
125
            Ok((
×
126
                Schema::default()
×
127
                    .field(
×
128
                        FieldDefinition::new(
×
129
                            String::from("taxi_id"),
×
130
                            FieldType::UInt,
×
131
                            false,
×
132
                            taxi_trips_source.clone(),
×
133
                        ),
×
134
                        true,
×
135
                    )
×
136
                    .field(
×
137
                        FieldDefinition::new(
×
138
                            String::from("completed_at"),
×
139
                            FieldType::Timestamp,
×
140
                            false,
×
141
                            taxi_trips_source.clone(),
×
142
                        ),
×
143
                        false,
×
144
                    )
×
145
                    .field(
×
146
                        FieldDefinition::new(
×
147
                            String::from("pu_location_id"),
×
148
                            FieldType::UInt,
×
149
                            false,
×
150
                            taxi_trips_source,
×
151
                        ),
×
152
                        false,
×
153
                    )
×
154
                    .clone(),
×
155
                SchemaSQLContext::default(),
×
156
            ))
×
157
        } else if port == &ZONES_PORT {
×
158
            let source_id = SourceDefinition::Table {
×
159
                connection: "connection".to_string(),
×
160
                name: "zones".to_string(),
×
161
            };
×
162
            Ok((
×
163
                Schema::default()
×
164
                    .field(
×
165
                        FieldDefinition::new(
×
166
                            String::from("location_id"),
×
167
                            FieldType::UInt,
×
168
                            false,
×
169
                            source_id.clone(),
×
170
                        ),
×
171
                        true,
×
172
                    )
×
173
                    .field(
×
174
                        FieldDefinition::new(
×
175
                            String::from("zone"),
×
176
                            FieldType::String,
×
177
                            false,
×
178
                            source_id,
×
179
                        ),
×
180
                        false,
×
181
                    )
×
182
                    .clone(),
×
183
                SchemaSQLContext::default(),
×
184
            ))
×
185
        } else {
186
            panic!("Invalid Port Handle {port}");
×
187
        }
188
    }
×
189

×
190
    fn get_output_port_name(&self, port: &PortHandle) -> String {
×
191
        match *port {
×
192
            TRIPS_PORT => "trips".to_string(),
×
193
            ZONES_PORT => "zones".to_string(),
×
194
            _ => panic!("Invalid Port Handle {port}"),
×
195
        }
×
196
    }
×
197

198
    fn build(
×
199
        &self,
×
200
        _output_schemas: HashMap<PortHandle, Schema>,
×
201
    ) -> Result<Box<dyn Source>, BoxedError> {
×
202
        Ok(Box::new(TestSource {
×
203
            _running: self.running.clone(),
×
204
        }))
×
205
    }
×
206
}
×
207

208
#[derive(Debug)]
×
209
pub struct TestSource {
×
210
    _running: Arc<AtomicBool>,
×
211
}
×
212

×
213
impl Source for TestSource {
×
214
    fn can_start_from(&self, _last_checkpoint: (u64, u64)) -> Result<bool, BoxedError> {
×
215
        Ok(false)
×
216
    }
×
217

×
218
    fn start(
×
219
        &self,
×
220
        fw: &mut dyn SourceChannelForwarder,
×
221
        _last_checkpoint: Option<(u64, u64)>,
×
222
    ) -> Result<(), BoxedError> {
×
223
        let operations = vec![
×
224
            (
×
225
                Operation::Insert {
×
226
                    new: Record::new(vec![
×
227
                        Field::UInt(1001),
×
228
                        Field::Timestamp(
×
229
                            Utc.datetime_from_str("2023-02-01 22:00:00", DATE_FORMAT)
×
230
                                .unwrap()
×
231
                                .into(),
×
232
                        ),
×
233
                        Field::UInt(1),
×
234
                    ]),
×
235
                },
×
236
                TRIPS_PORT,
×
237
            ),
×
238
            (
×
239
                Operation::Insert {
×
240
                    new: Record::new(vec![
×
241
                        Field::UInt(1002),
×
242
                        Field::Timestamp(
×
243
                            Utc.datetime_from_str("2023-02-01 22:01:00", DATE_FORMAT)
×
244
                                .unwrap()
×
245
                                .into(),
×
246
                        ),
×
247
                        Field::UInt(2),
×
248
                    ]),
×
249
                },
×
250
                TRIPS_PORT,
×
251
            ),
×
252
            (
×
253
                Operation::Insert {
×
254
                    new: Record::new(vec![
×
255
                        Field::UInt(1003),
×
256
                        Field::Timestamp(
×
257
                            Utc.datetime_from_str("2023-02-01 22:02:10", DATE_FORMAT)
×
258
                                .unwrap()
×
259
                                .into(),
×
260
                        ),
×
261
                        Field::UInt(3),
×
262
                    ]),
×
263
                },
×
264
                TRIPS_PORT,
×
265
            ),
×
266
            (
×
267
                Operation::Insert {
×
268
                    new: Record::new(vec![
×
269
                        Field::UInt(1004),
×
270
                        Field::Timestamp(
×
271
                            Utc.datetime_from_str("2023-02-01 22:03:00", DATE_FORMAT)
×
272
                                .unwrap()
×
273
                                .into(),
×
274
                        ),
×
275
                        Field::UInt(2),
×
276
                    ]),
×
277
                },
×
278
                TRIPS_PORT,
×
279
            ),
×
280
            (
×
281
                Operation::Insert {
×
282
                    new: Record::new(vec![
×
283
                        Field::UInt(1005),
×
284
                        Field::Timestamp(
×
285
                            Utc.datetime_from_str("2023-02-01 22:05:00", DATE_FORMAT)
×
286
                                .unwrap()
×
287
                                .into(),
×
288
                        ),
×
289
                        Field::UInt(1),
×
290
                    ]),
×
291
                },
×
292
                TRIPS_PORT,
×
293
            ),
×
294
            (
×
295
                Operation::Insert {
×
296
                    new: Record::new(vec![
×
297
                        Field::UInt(1006),
×
298
                        Field::Timestamp(
×
299
                            Utc.datetime_from_str("2023-02-01 22:06:00", DATE_FORMAT)
×
300
                                .unwrap()
×
301
                                .into(),
×
302
                        ),
×
303
                        Field::UInt(2),
×
304
                    ]),
×
305
                },
×
306
                TRIPS_PORT,
×
307
            ),
×
308
            (
×
309
                Operation::Insert {
×
310
                    new: Record::new(vec![
×
311
                        Field::UInt(1),
×
312
                        Field::String("Newark Airport".to_string()),
×
313
                    ]),
×
314
                },
×
315
                ZONES_PORT,
×
316
            ),
×
317
            (
×
318
                Operation::Insert {
×
319
                    new: Record::new(vec![
×
320
                        Field::UInt(2),
×
321
                        Field::String("Jamaica Bay".to_string()),
×
322
                    ]),
×
323
                },
×
324
                ZONES_PORT,
×
325
            ),
×
326
            (
×
327
                Operation::Insert {
×
328
                    new: Record::new(vec![
×
329
                        Field::UInt(3),
×
330
                        Field::String("Allerton/Pelham Gardens".to_string()),
×
331
                    ]),
×
332
                },
×
333
                ZONES_PORT,
×
334
            ),
×
335
        ];
×
336

×
337
        for (index, (op, port)) in operations.into_iter().enumerate() {
×
338
            fw.send(IngestionMessage::new_op(index as u64, 0, 0, op), port)
×
339
                .unwrap();
×
340
            //thread::sleep(Duration::from_millis(500));
×
341
        }
×
342

343
        thread::sleep(Duration::from_millis(500));
×
344

×
345
        Ok(())
×
346
    }
×
347
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc