• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

getdozer / dozer / 4371276901

pending completion
4371276901

push

github

GitHub
fix: Add sources names used in the query (#1190)

28 of 28 new or added lines in 3 files covered. (100.0%)

27837 of 39159 relevant lines covered (71.09%)

73963.24 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

62.52
/dozer-sql/src/pipeline/builder.rs
1
use crate::pipeline::aggregation::factory::AggregationProcessorFactory;
2
use crate::pipeline::builder::PipelineError::InvalidQuery;
3
use crate::pipeline::errors::PipelineError;
4
use crate::pipeline::expression::builder::{ExpressionBuilder, NameOrAlias};
5
use crate::pipeline::product::set_factory::SetProcessorFactory;
6
use crate::pipeline::selection::factory::SelectionProcessorFactory;
7
use crate::pipeline::window::builder::relation_is_a_window;
8
use dozer_core::app::AppPipeline;
9
use dozer_core::app::PipelineEntryPoint;
10
use dozer_core::appsource::AppSourceId;
11
use dozer_core::node::PortHandle;
12
use dozer_core::DEFAULT_PORT_HANDLE;
13
use sqlparser::ast::{Join, SetOperator, SetQuantifier, TableFactor, TableWithJoins};
14

15
use sqlparser::{
16
    ast::{Query, Select, SetExpr, Statement},
17
    dialect::AnsiDialect,
18
    parser::Parser,
19
};
20
use std::collections::HashMap;
21
use std::sync::Arc;
22

23
use super::errors::UnsupportedSqlError;
24
use super::product::factory::FromProcessorFactory;
×
25
use super::window::factory::WindowProcessorFactory;
26

27
/// Empty schema context used as the type parameter for `AppPipeline`
/// throughout this SQL pipeline builder.
#[derive(Debug, Clone, Default)]
pub struct SchemaSQLContext {}
29

30
/// Identifies a DAG node/port that downstream processors can connect to when
/// a query references the table this node produces.
#[derive(Debug, Clone)]
pub struct OutputNodeInfo {
    // Name to connect in dag
    pub node: String,
    // Port to connect in dag
    pub port: PortHandle,
    // If this table is originally from a source or created in transforms
    pub is_derived: bool,
    // TODO add:indexes to the tables
}
40

41
/// Describes the table that a query (or sub-query) produces while it is being
/// lowered into the pipeline.
pub struct TableInfo {
    // Generated or user-visible name (plus optional alias) of the table.
    pub name: NameOrAlias,
    // When set, the query output is also registered under this name in
    // `QueryContext::output_tables_map` (used when there is no INTO clause).
    pub override_name: Option<String>,
    // If this table is originally from a source or created in transforms.
    pub is_derived: bool,
}
46
/// The struct contains some contexts during query to pipeline.
#[derive(Debug, Clone, Default)]
pub struct QueryContext {
    // Internal tables map, used to store the tables that are created by the queries.
    // Keyed by (pipeline index, table name) so identical names in different
    // top-level statements do not collide.
    pub pipeline_map: HashMap<(usize, String), OutputNodeInfo>,

    // Output tables map that are marked with "INTO" used to store the tables, these can be exposed to sinks.
    pub output_tables_map: HashMap<String, OutputNodeInfo>,

    // Names of sources used directly (as entry points) by the built pipelines.
    pub used_sources: Vec<String>,
}
58

59
/// A FROM clause with resolved names: the main relation plus every JOIN,
/// each paired with the `NameOrAlias` it resolved to (see [`get_from_source`]).
#[derive(Debug, Clone)]
pub struct IndexedTableWithJoins {
    pub relation: (NameOrAlias, TableFactor),
    pub joins: Vec<(NameOrAlias, Join)>,
}
×
64
pub fn statement_to_pipeline(
244✔
65
    sql: &str,
244✔
66
    pipeline: &mut AppPipeline<SchemaSQLContext>,
244✔
67
    override_name: Option<String>,
244✔
68
) -> Result<QueryContext, PipelineError> {
244✔
69
    let dialect = AnsiDialect {};
244✔
70
    let mut ctx = QueryContext::default();
244✔
71

244✔
72
    let ast = Parser::parse_sql(&dialect, sql).unwrap();
244✔
73
    let query_name = NameOrAlias(format!("query_{}", uuid::Uuid::new_v4()), None);
244✔
74

×
75
    for (idx, statement) in ast.iter().enumerate() {
249✔
76
        match statement {
249✔
77
            Statement::Query(query) => {
249✔
78
                query_to_pipeline(
249✔
79
                    &TableInfo {
249✔
80
                        name: query_name.clone(),
249✔
81
                        is_derived: false,
249✔
82
                        override_name: override_name.clone(),
249✔
83
                    },
249✔
84
                    query,
249✔
85
                    pipeline,
249✔
86
                    &mut ctx,
249✔
87
                    false,
249✔
88
                    idx,
249✔
89
                )?;
249✔
90
            }
×
91
            s => {
×
92
                return Err(PipelineError::UnsupportedSqlError(
×
93
                    UnsupportedSqlError::GenericError(s.to_string()),
×
94
                ))
×
95
            }
96
        }
×
97
    }
×
98

99
    Ok(ctx)
244✔
100
}
244✔
101

×
102
/// Lowers a single SQL `Query` (including any WITH/CTE prelude) into pipeline
/// processors, registering its output under `table_info.name`.
///
/// `stateful` is forwarded to aggregation processor construction;
/// `pipeline_idx` scopes table names in `query_ctx.pipeline_map`.
fn query_to_pipeline(
    table_info: &TableInfo,
    query: &Query,
    pipeline: &mut AppPipeline<SchemaSQLContext>,
    query_ctx: &mut QueryContext,
    stateful: bool,
    pipeline_idx: usize,
) -> Result<(), PipelineError> {
    // return error if there is unsupported syntax
    if !query.order_by.is_empty() {
        return Err(PipelineError::UnsupportedSqlError(
            UnsupportedSqlError::OrderByError,
        ));
    }

    if query.limit.is_some() || query.offset.is_some() {
        return Err(PipelineError::UnsupportedSqlError(
            UnsupportedSqlError::LimitOffsetError,
        ));
    }

    // Attach the first pipeline if there is with clause
    if let Some(with) = &query.with {
        if with.recursive {
            return Err(PipelineError::UnsupportedSqlError(
                UnsupportedSqlError::Recursive,
            ));
        }

        for table in &with.cte_tables {
            if table.from.is_some() {
                return Err(PipelineError::UnsupportedSqlError(
                    UnsupportedSqlError::CteFromError,
                ));
            }
            let table_name = table.alias.name.to_string();
            // A CTE name may only be defined once per statement.
            if query_ctx
                .pipeline_map
                .contains_key(&(pipeline_idx, table_name.clone()))
            {
                return Err(InvalidQuery(format!(
                    "WITH query name {table_name:?} specified more than once"
                )));
            }
            // CTEs are lowered first so the main body can resolve them via
            // `pipeline_map`; they are derived tables and built statefully.
            query_to_pipeline(
                &TableInfo {
                    name: NameOrAlias(table_name.clone(), Some(table_name)),
                    is_derived: true,
                    override_name: None,
                },
                &table.query,
                pipeline,
                query_ctx,
                true,
                pipeline_idx,
            )?;
        }
    };

    match *query.body.clone() {
        SetExpr::Select(select) => {
            select_to_pipeline(
                table_info,
                *select,
                pipeline,
                query_ctx,
                stateful,
                pipeline_idx,
            )?;
        }
        SetExpr::Query(query) => {
            // NOTE(review): the nested query is lowered into a throwaway
            // `QueryContext`, so any outputs/sources it registers are dropped
            // here rather than merged into `query_ctx` — confirm intentional.
            let query_name = format!("subquery_{}", uuid::Uuid::new_v4());
            let mut ctx = QueryContext::default();
            query_to_pipeline(
                &TableInfo {
                    name: NameOrAlias(query_name, None),
                    is_derived: true,
                    override_name: None,
                },
                &query,
                pipeline,
                &mut ctx,
                stateful,
                pipeline_idx,
            )?
        }
        SetExpr::SetOperation {
            op,
            set_quantifier,
            left,
            right,
        } => match op {
            // UNION is the only supported set operation.
            SetOperator::Union => {
                set_to_pipeline(
                    table_info,
                    left,
                    right,
                    set_quantifier,
                    pipeline,
                    query_ctx,
                    stateful,
                    pipeline_idx,
                )?;
            }
            _ => return Err(PipelineError::InvalidOperator(op.to_string())),
        },
        _ => {
            return Err(PipelineError::UnsupportedSqlError(
                UnsupportedSqlError::GenericError("Unsupported query body structure".to_string()),
            ))
        }
    };
    Ok(())
}
216

×
217
/// Lowers a single `SELECT` into the pipeline:
/// FROM/JOIN (product + optional window processors) -> optional WHERE
/// (selection processor) -> aggregation processor, whose generated name is
/// returned and registered in `query_ctx.pipeline_map`.
fn select_to_pipeline(
    table_info: &TableInfo,
    select: Select,
    pipeline: &mut AppPipeline<SchemaSQLContext>,
    query_ctx: &mut QueryContext,
    stateful: bool,
    pipeline_idx: usize,
) -> Result<String, PipelineError> {
    // FROM clause
    if select.from.len() != 1 {
        return Err(PipelineError::UnsupportedSqlError(
            UnsupportedSqlError::FromCommaSyntax,
        ));
    }

    let input_tables = get_input_tables(&select.from[0], pipeline, query_ctx, pipeline_idx)?;

    let (input_nodes, output_node, mut used_sources) = add_from_to_pipeline(
        pipeline,
        &input_tables,
        &mut query_ctx.pipeline_map,
        pipeline_idx,
    )?;

    // Record sources consumed directly as entry points by the FROM clause.
    query_ctx.used_sources.append(&mut used_sources);

    let gen_agg_name = format!("agg_{}", uuid::Uuid::new_v4());
    let gen_selection_name = format!("select_{}", uuid::Uuid::new_v4());
    let (gen_product_name, product_output_port) = output_node;

    // Wire inputs that are produced by upstream processors (CTEs, derived
    // tables) rather than fed directly from sources.
    for (source_name, processor_name, processor_port) in input_nodes.iter() {
        if let Some(table_info) = query_ctx
            .pipeline_map
            .get(&(pipeline_idx, source_name.clone()))
        {
            pipeline.connect_nodes(
                &table_info.node,
                Some(table_info.port),
                processor_name,
                Some(*processor_port as PortHandle),
                true,
            )?;
            // If not present in pipeline_map, insert into used_sources as this is coming from source
        } else {
            query_ctx.used_sources.push(source_name.clone());
        }
    }

    let aggregation = AggregationProcessorFactory::new(select.clone(), stateful);

    pipeline.add_processor(Arc::new(aggregation), &gen_agg_name, vec![]);

    // Where clause
    if let Some(selection) = select.selection {
        // product -> selection -> aggregation
        let selection = SelectionProcessorFactory::new(selection);

        pipeline.add_processor(Arc::new(selection), &gen_selection_name, vec![]);

        pipeline.connect_nodes(
            &gen_product_name,
            Some(product_output_port),
            &gen_selection_name,
            Some(DEFAULT_PORT_HANDLE),
            true,
        )?;

        pipeline.connect_nodes(
            &gen_selection_name,
            Some(DEFAULT_PORT_HANDLE),
            &gen_agg_name,
            Some(DEFAULT_PORT_HANDLE),
            true,
        )?;
    } else {
        // No WHERE: product connects straight to aggregation.
        pipeline.connect_nodes(
            &gen_product_name,
            Some(product_output_port),
            &gen_agg_name,
            Some(DEFAULT_PORT_HANDLE),
            true,
        )?;
    }

    // Register this SELECT's output node so later queries can reference it.
    query_ctx.pipeline_map.insert(
        (pipeline_idx, table_info.name.0.to_string()),
        OutputNodeInfo {
            node: gen_agg_name.clone(),
            port: DEFAULT_PORT_HANDLE,
            is_derived: table_info.is_derived,
        },
    );

    // INTO takes precedence; otherwise fall back to the caller's override name.
    let output_table_name = if let Some(into) = select.into {
        Some(into.name.to_string())
    } else {
        table_info.override_name.clone()
    };
    if let Some(table_name) = output_table_name {
        query_ctx.output_tables_map.insert(
            table_name,
            OutputNodeInfo {
                node: gen_agg_name.clone(),
                port: DEFAULT_PORT_HANDLE,
                is_derived: false,
            },
        );
    }

    Ok(gen_agg_name)
}
327

×
328
/// Builds the FROM part of the pipeline: a product (join) processor fed either
/// directly by sources (entry points) or through window processors / upstream
/// nodes. Port assignment: the main relation takes port 0, the N-th JOIN takes
/// port N+1.
///
/// Returns:
/// * `input_nodes` — `(source_name, processor_name, port)` triples the caller
///   still has to wire (inputs not used as direct entry points),
/// * `(name, port)` of the product processor output,
/// * the source names consumed directly as entry points.
#[allow(clippy::type_complexity)]
fn add_from_to_pipeline(
    pipeline: &mut AppPipeline<SchemaSQLContext>,
    input_tables: &IndexedTableWithJoins,
    pipeline_map: &mut HashMap<(usize, String), OutputNodeInfo>,
    pipeline_idx: usize,
) -> Result<
    (
        Vec<(String, String, PortHandle)>,
        (String, PortHandle),
        Vec<String>,
    ),
    PipelineError,
> {
    // the sources names that are used in this pipeline
    let mut used_sources = vec![];

    let mut product_entry_points = vec![];
    let mut input_nodes = vec![];

    let (relation_name_or_alias, relation) = input_tables.relation.clone();

    let product_processor_name = format!("product_{}", uuid::Uuid::new_v4());
    let product_processor = FromProcessorFactory::new(input_tables.clone());

    if relation_is_a_window(&relation).map_err(PipelineError::WindowError)? {
        // Window relation: insert a window processor between its source and
        // the product processor (product input port 0).
        let window_processor = WindowProcessorFactory::new(relation);
        let window_processor_name = format!("window_{}", uuid::Uuid::new_v4());
        let window_source_name = window_processor.get_source_name()?;
        let mut window_entry_points = vec![];

        if is_an_entry_point(&window_source_name, pipeline_map, pipeline_idx) {
            let entry_point = PipelineEntryPoint::new(
                AppSourceId::new(window_source_name.clone(), None),
                DEFAULT_PORT_HANDLE as PortHandle,
            );

            window_entry_points.push(entry_point);
            used_sources.push(window_source_name);
        } else {
            // Produced upstream: let the caller wire it to the window processor.
            input_nodes.push((
                window_source_name,
                window_processor_name.clone(),
                DEFAULT_PORT_HANDLE as PortHandle,
            ));
        }

        pipeline.add_processor(
            Arc::new(window_processor),
            &window_processor_name,
            window_entry_points,
        );

        pipeline.connect_nodes(
            &window_processor_name,
            Some(DEFAULT_PORT_HANDLE as PortHandle),
            &product_processor_name,
            Some(0 as PortHandle),
            true,
        )?;
    } else {
        let product_input_name = relation_name_or_alias.0;

        if is_an_entry_point(&product_input_name, pipeline_map, pipeline_idx) {
            // Fed directly from a source on product input port 0.
            let entry_point = PipelineEntryPoint::new(
                AppSourceId::new(product_input_name.clone(), None),
                0 as PortHandle,
            );

            product_entry_points.push(entry_point);
            used_sources.push(product_input_name);
        } else {
            input_nodes.push((
                product_input_name,
                product_processor_name.clone(),
                0 as PortHandle,
            ));
        }
    }

    // Same wiring logic for every JOINed relation, on port index + 1.
    for (index, (join_relation_alias, join)) in input_tables.joins.iter().enumerate() {
        let (relation_name_or_alias, relation) =
            (join_relation_alias.clone(), join.relation.clone());

        if relation_is_a_window(&relation).map_err(PipelineError::WindowError)? {
            let window_processor = WindowProcessorFactory::new(relation.clone());
            let window_processor_name = format!("window_{}", uuid::Uuid::new_v4());
            let window_input_name = window_processor.get_source_name()?;
            let mut window_entry_points = vec![];

            if is_an_entry_point(&window_input_name, pipeline_map, pipeline_idx) {
                let entry_point = PipelineEntryPoint::new(
                    AppSourceId::new(window_input_name.clone(), None),
                    DEFAULT_PORT_HANDLE as PortHandle,
                );

                window_entry_points.push(entry_point.clone());
                used_sources.push(window_input_name);
            } else {
                input_nodes.push((
                    window_input_name,
                    window_processor_name.clone(),
                    DEFAULT_PORT_HANDLE as PortHandle,
                ));
            }

            pipeline.add_processor(
                Arc::new(window_processor),
                &window_processor_name,
                window_entry_points,
            );

            pipeline.connect_nodes(
                &window_processor_name,
                Some(DEFAULT_PORT_HANDLE as PortHandle),
                &product_processor_name,
                Some((index + 1) as PortHandle),
                true,
            )?;
        } else {
            let product_input_name = relation_name_or_alias.0;

            if is_an_entry_point(&product_input_name, pipeline_map, pipeline_idx) {
                let entry_point = PipelineEntryPoint::new(
                    AppSourceId::new(product_input_name.clone(), None),
                    (index + 1) as PortHandle,
                );

                product_entry_points.push(entry_point.clone());
                used_sources.push(product_input_name);
            } else {
                input_nodes.push((
                    product_input_name,
                    product_processor_name.clone(),
                    (index + 1) as PortHandle,
                ));
            }
        }
    }

    pipeline.add_processor(
        Arc::new(product_processor),
        &product_processor_name,
        product_entry_points,
    );

    Ok((
        input_nodes,
        (product_processor_name, DEFAULT_PORT_HANDLE as PortHandle),
        used_sources,
    ))
}
480

×
481
/// Lowers a UNION set operation: builds both sides as their own SELECT/set
/// sub-pipelines, then joins them with a set processor (left on port 0, right
/// on port 1). Returns the generated set processor name.
#[allow(clippy::too_many_arguments)]
fn set_to_pipeline(
    table_info: &TableInfo,
    left_select: Box<SetExpr>,
    right_select: Box<SetExpr>,
    set_quantifier: SetQuantifier,
    pipeline: &mut AppPipeline<SchemaSQLContext>,
    query_ctx: &mut QueryContext,
    stateful: bool,
    pipeline_idx: usize,
) -> Result<String, PipelineError> {
    // Generated internal table names for the two branches; they are looked
    // up in `pipeline_map` below after each branch has been built.
    let gen_left_set_name = format!("set_left_{}", uuid::Uuid::new_v4());
    let left_table_info = TableInfo {
        name: NameOrAlias(gen_left_set_name.clone(), None),
        override_name: None,
        is_derived: false,
    };
    let gen_right_set_name = format!("set_right_{}", uuid::Uuid::new_v4());
    let right_table_info = TableInfo {
        name: NameOrAlias(gen_right_set_name.clone(), None),
        override_name: None,
        is_derived: false,
    };

    // Each branch is either a plain SELECT or a nested set operation.
    let _left_pipeline_name = match *left_select {
        SetExpr::Select(select) => select_to_pipeline(
            &left_table_info,
            *select,
            pipeline,
            query_ctx,
            stateful,
            pipeline_idx,
        )?,
        SetExpr::SetOperation {
            op: _,
            set_quantifier,
            left,
            right,
        } => set_to_pipeline(
            &left_table_info,
            left,
            right,
            set_quantifier,
            pipeline,
            query_ctx,
            stateful,
            pipeline_idx,
        )?,
        _ => {
            return Err(PipelineError::InvalidQuery(
                "Invalid UNION left Query".to_string(),
            ))
        }
    };

    let _right_pipeline_name = match *right_select {
        SetExpr::Select(select) => select_to_pipeline(
            &right_table_info,
            *select,
            pipeline,
            query_ctx,
            stateful,
            pipeline_idx,
        )?,
        SetExpr::SetOperation {
            op: _,
            set_quantifier,
            left,
            right,
        } => set_to_pipeline(
            &right_table_info,
            left,
            right,
            set_quantifier,
            pipeline,
            query_ctx,
            stateful,
            pipeline_idx,
        )?,
        _ => {
            return Err(PipelineError::InvalidQuery(
                "Invalid UNION right Query".to_string(),
            ))
        }
    };

    let left_pipeline_output_node = match query_ctx
        .pipeline_map
        .get(&(pipeline_idx, gen_left_set_name))
    {
        Some(pipeline) => pipeline,
        None => {
            return Err(PipelineError::InvalidQuery(
                "Invalid UNION left Query".to_string(),
            ))
        }
    };

    let right_pipeline_output_node = match query_ctx
        .pipeline_map
        .get(&(pipeline_idx, gen_right_set_name))
    {
        Some(pipeline) => pipeline,
        None => {
            return Err(PipelineError::InvalidQuery(
                "Invalid UNION Right Query".to_string(),
            ))
        }
    };

    let set_proc_fac = SetProcessorFactory::new(set_quantifier);

    let mut gen_set_name = format!("set_{}", uuid::Uuid::new_v4());

    // An override name (e.g. from the API) replaces the generated node name.
    if table_info.override_name.is_some() {
        gen_set_name = table_info.override_name.to_owned().unwrap();
    }

    pipeline.add_processor(Arc::new(set_proc_fac), &gen_set_name, vec![]);

    pipeline.connect_nodes(
        &left_pipeline_output_node.node,
        Some(left_pipeline_output_node.port),
        &gen_set_name,
        Some(0 as PortHandle),
        true,
    )?;

    pipeline.connect_nodes(
        &right_pipeline_output_node.node,
        Some(right_pipeline_output_node.port),
        &gen_set_name,
        Some(1 as PortHandle),
        true,
    )?;

    // Drop branch tables from the output map: only the combined set result
    // should be exposed, not the intermediate left/right selects.
    for (_, table_name) in query_ctx.pipeline_map.keys() {
        query_ctx.output_tables_map.remove_entry(table_name);
    }

    query_ctx.pipeline_map.insert(
        (pipeline_idx, table_info.name.0.to_string()),
        OutputNodeInfo {
            node: gen_set_name.clone(),
            port: DEFAULT_PORT_HANDLE,
            is_derived: table_info.is_derived,
        },
    );

    Ok(gen_set_name)
}
632

×
633
/// Returns a vector of input port handles and relative table name
×
634
///
×
635
/// # Errors
×
636
///
×
637
/// This function will return an error if it's not possible to get an input name.
×
638
pub fn get_input_tables(
346✔
639
    from: &TableWithJoins,
346✔
640
    pipeline: &mut AppPipeline<SchemaSQLContext>,
346✔
641
    query_ctx: &mut QueryContext,
346✔
642
    pipeline_idx: usize,
346✔
643
) -> Result<IndexedTableWithJoins, PipelineError> {
346✔
644
    let name = get_from_source(&from.relation, pipeline, query_ctx, pipeline_idx)?;
346✔
645
    let mut joins = vec![];
346✔
646

×
647
    for join in from.joins.iter() {
346✔
648
        let input_name = get_from_source(&join.relation, pipeline, query_ctx, pipeline_idx)?;
67✔
649
        joins.push((input_name.clone(), join.clone()));
67✔
650
    }
×
651

×
652
    Ok(IndexedTableWithJoins {
346✔
653
        relation: (name, from.relation.clone()),
346✔
654
        joins,
346✔
655
    })
346✔
656
}
346✔
657

×
658
pub fn get_input_names(input_tables: &IndexedTableWithJoins) -> Vec<NameOrAlias> {
×
659
    let mut input_names = vec![];
×
660
    input_names.push(input_tables.relation.0.clone());
×
661

×
662
    for join in &input_tables.joins {
×
663
        input_names.push(join.0.clone());
×
664
    }
×
665
    input_names
×
666
}
×
667

×
668
pub fn get_entry_points(
×
669
    input_tables: &IndexedTableWithJoins,
×
670
    pipeline_map: &mut HashMap<(usize, String), OutputNodeInfo>,
×
671
    pipeline_idx: usize,
×
672
) -> Result<Vec<PipelineEntryPoint>, PipelineError> {
×
673
    let mut endpoints = vec![];
×
674

×
675
    let input_names = get_input_names(input_tables);
×
676

×
677
    for (input_port, table) in input_names.iter().enumerate() {
×
678
        let name = table.0.clone();
×
679
        if !pipeline_map.contains_key(&(pipeline_idx, name.clone())) {
×
680
            endpoints.push(PipelineEntryPoint::new(
×
681
                AppSourceId::new(name, None),
×
682
                input_port as PortHandle,
×
683
            ));
×
684
        }
×
685
    }
×
686

×
687
    Ok(endpoints)
×
688
}
×
689

×
690
pub fn is_an_entry_point(
413✔
691
    name: &str,
413✔
692
    pipeline_map: &mut HashMap<(usize, String), OutputNodeInfo>,
413✔
693
    pipeline_idx: usize,
413✔
694
) -> bool {
413✔
695
    if !pipeline_map.contains_key(&(pipeline_idx, name.to_owned())) {
413✔
696
        return true;
338✔
697
    }
75✔
698
    false
75✔
699
}
413✔
700

×
701
/// Resolves a FROM-clause relation to a [`NameOrAlias`].
///
/// Plain tables resolve to their (possibly dotted) normalized name plus any
/// alias. Derived tables (`FROM (SELECT …) alias`) are first lowered into the
/// pipeline via [`query_to_pipeline`] under a generated `derived_…` name.
/// Any other relation kind (e.g. table functions) is unsupported.
pub fn get_from_source(
    relation: &TableFactor,
    pipeline: &mut AppPipeline<SchemaSQLContext>,
    query_ctx: &mut QueryContext,
    pipeline_idx: usize,
) -> Result<NameOrAlias, PipelineError> {
    match relation {
        TableFactor::Table { name, alias, .. } => {
            // Normalize each identifier part and rejoin with '.' so
            // `schema.table` keeps its qualified form.
            let input_name = name
                .0
                .iter()
                .map(ExpressionBuilder::normalize_ident)
                .collect::<Vec<String>>()
                .join(".");
            let alias_name = alias
                .as_ref()
                .map(|a| ExpressionBuilder::fullname_from_ident(&[a.name.clone()]));

            Ok(NameOrAlias(input_name, alias_name))
        }
        TableFactor::Derived {
            lateral: _,
            subquery,
            alias,
        } => {
            let name = format!("derived_{}", uuid::Uuid::new_v4());
            let alias_name = alias.as_ref().map(|alias_ident| {
                ExpressionBuilder::fullname_from_ident(&[alias_ident.name.clone()])
            });

            let name_or = NameOrAlias(name, alias_name);
            // Build the subquery into the shared context so the enclosing
            // query can connect to its output node by this generated name.
            query_to_pipeline(
                &TableInfo {
                    name: name_or.clone(),
                    is_derived: true,
                    override_name: None,
                },
                subquery,
                pipeline,
                query_ctx,
                false,
                pipeline_idx,
            )?;

            Ok(name_or)
        }
        _ => Err(PipelineError::UnsupportedSqlError(
            UnsupportedSqlError::JoinTable,
        )),
    }
}
752

×
753
#[cfg(test)]
mod tests {
    use dozer_core::app::AppPipeline;

    use super::statement_to_pipeline;

    // End-to-end check over a multi-statement script covering INTO outputs,
    // nested derived tables, CTEs (including nested and joined CTEs).
    #[test]
    fn parse_sql_pipeline() {
        let sql = r#"
                SELECT
                    a.name as "Genre",
                    SUM(amount) as "Gross Revenue(in $)"
                INTO gross_revenue_stats
                FROM
                (
                    SELECT
                        c.name,
                        f.title,
                        p.amount
                    FROM film f
                    LEFT JOIN film_category fc
                        ON fc.film_id = f.film_id
                    LEFT JOIN category c
                        ON fc.category_id = c.category_id
                    LEFT JOIN inventory i
                        ON i.film_id = f.film_id
                    LEFT JOIN rental r
                        ON r.inventory_id = i.inventory_id
                    LEFT JOIN payment p
                        ON p.rental_id = r.rental_id
                    WHERE p.amount IS NOT NULL
                ) a
                GROUP BY name;

                SELECT
                f.name, f.title, p.amount
                INTO film_amounts
                FROM film f
                LEFT JOIN film_category fc;

                WITH tbl as (select id from a)
                select id
                into cte_table
                from tbl;

                WITH tbl as (select id from  a),
                tbl2 as (select id from tbl)
                select id
                into nested_cte_table
                from tbl2;

                WITH cte_table1 as (select id_dt1 from (select id_t1 from table_1) as derived_table_1),
                cte_table2 as (select id_ct1 from cte_table1)
                select id_ct2
                into nested_derived_table
                from cte_table2;

                with tbl as (select id, ticker from stocks)
                select tbl.id
                into nested_stocks_table
                from  stocks join tbl on tbl.id = stocks.id;
            "#;

        let context = statement_to_pipeline(sql, &mut AppPipeline::new(), None).unwrap();

        // Should create as many output tables as into statements
        let mut output_keys = context.output_tables_map.keys().collect::<Vec<_>>();
        output_keys.sort();
        let mut expected_keys = vec![
            "gross_revenue_stats",
            "film_amounts",
            "cte_table",
            "nested_cte_table",
            "nested_derived_table",
            "nested_stocks_table",
        ];
        expected_keys.sort();
        assert_eq!(output_keys, expected_keys);
    }
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc