• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

getdozer / dozer / 4370280743

pending completion
4370280743

push

github

GitHub
Bump async-trait from 0.1.65 to 0.1.66 (#1179)

27808 of 38702 relevant lines covered (71.85%)

25323.55 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

63.95
/dozer-sql/src/pipeline/builder.rs
1
use crate::pipeline::aggregation::factory::AggregationProcessorFactory;
2
use crate::pipeline::builder::PipelineError::InvalidQuery;
3
use crate::pipeline::errors::PipelineError;
4
use crate::pipeline::expression::builder::{ExpressionBuilder, NameOrAlias};
5
use crate::pipeline::product::set_factory::SetProcessorFactory;
6
use crate::pipeline::selection::factory::SelectionProcessorFactory;
7
use crate::pipeline::window::builder::relation_is_a_window;
8
use dozer_core::app::AppPipeline;
9
use dozer_core::app::PipelineEntryPoint;
10
use dozer_core::appsource::AppSourceId;
11
use dozer_core::node::PortHandle;
12
use dozer_core::DEFAULT_PORT_HANDLE;
13
use sqlparser::ast::{Join, SetOperator, SetQuantifier, TableFactor, TableWithJoins};
14

15
use sqlparser::{
16
    ast::{Query, Select, SetExpr, Statement},
17
    dialect::AnsiDialect,
18
    parser::Parser,
19
};
20
use std::collections::HashMap;
21
use std::sync::Arc;
22

23
use super::errors::UnsupportedSqlError;
24
use super::product::factory::FromProcessorFactory;
×
25
use super::window::factory::WindowProcessorFactory;
26

27
/// Schema-phase context attached to every node of a SQL `AppPipeline`.
#[derive(Debug, Clone, Default)]
pub struct SchemaSQLContext {}
29

30
#[derive(Debug, Clone)]
×
31
pub struct OutputNodeInfo {
32
    // Name to connect in dag
33
    pub node: String,
34
    // Port to connect in dag
35
    pub port: PortHandle,
36
    // If this table is originally from a source or created in transforms
37
    pub is_derived: bool,
38
    // TODO add:indexes to the tables
39
}
40

41
pub struct TableInfo {
42
    pub name: NameOrAlias,
43
    pub override_name: Option<String>,
44
    pub is_derived: bool,
×
45
}
46
/// The struct contains some contexts during query to pipeline.
47
#[derive(Debug, Clone, Default)]
244✔
48
pub struct QueryContext {
49
    // Internal tables map, used to store the tables that are created by the queries
50
    pub pipeline_map: HashMap<(usize, String), OutputNodeInfo>,
51

52
    // Output tables map that are marked with "INTO" used to store the tables, these can be exposed to sinks.
53
    pub output_tables_map: HashMap<String, OutputNodeInfo>,
54

55
    // Used Sources
56
    pub used_sources: Vec<String>,
×
57
}
58

59
#[derive(Debug, Clone)]
346✔
60
pub struct IndexedTableWithJoins {
61
    pub relation: (NameOrAlias, TableFactor),
×
62
    pub joins: Vec<(NameOrAlias, Join)>,
×
63
}
×
64
pub fn statement_to_pipeline(
244✔
65
    sql: &str,
244✔
66
    pipeline: &mut AppPipeline<SchemaSQLContext>,
244✔
67
    override_name: Option<String>,
244✔
68
) -> Result<QueryContext, PipelineError> {
244✔
69
    let dialect = AnsiDialect {};
244✔
70
    let mut ctx = QueryContext::default();
244✔
71

244✔
72
    let ast = Parser::parse_sql(&dialect, sql).unwrap();
244✔
73
    let query_name = NameOrAlias(format!("query_{}", uuid::Uuid::new_v4()), None);
244✔
74

×
75
    for (idx, statement) in ast.iter().enumerate() {
249✔
76
        match statement {
249✔
77
            Statement::Query(query) => {
249✔
78
                query_to_pipeline(
249✔
79
                    &TableInfo {
249✔
80
                        name: query_name.clone(),
249✔
81
                        is_derived: false,
249✔
82
                        override_name: override_name.clone(),
249✔
83
                    },
249✔
84
                    query,
249✔
85
                    pipeline,
249✔
86
                    &mut ctx,
249✔
87
                    false,
249✔
88
                    idx,
249✔
89
                )?;
249✔
90
            }
×
91
            s => {
×
92
                return Err(PipelineError::UnsupportedSqlError(
×
93
                    UnsupportedSqlError::GenericError(s.to_string()),
×
94
                ))
×
95
            }
96
        }
×
97
    }
×
98

99
    Ok(ctx)
244✔
100
}
244✔
101

×
102
fn query_to_pipeline(
324✔
103
    table_info: &TableInfo,
324✔
104
    query: &Query,
324✔
105
    pipeline: &mut AppPipeline<SchemaSQLContext>,
324✔
106
    query_ctx: &mut QueryContext,
324✔
107
    stateful: bool,
324✔
108
    pipeline_idx: usize,
324✔
109
) -> Result<(), PipelineError> {
324✔
110
    // return error if there is unsupported syntax
324✔
111
    if !query.order_by.is_empty() {
324✔
112
        return Err(PipelineError::UnsupportedSqlError(
×
113
            UnsupportedSqlError::OrderByError,
×
114
        ));
×
115
    }
324✔
116

324✔
117
    if query.limit.is_some() || query.offset.is_some() {
324✔
118
        return Err(PipelineError::UnsupportedSqlError(
×
119
            UnsupportedSqlError::LimitOffsetError,
×
120
        ));
×
121
    }
324✔
122

×
123
    // Attach the first pipeline if there is with clause
×
124
    if let Some(with) = &query.with {
324✔
125
        if with.recursive {
51✔
126
            return Err(PipelineError::UnsupportedSqlError(
×
127
                UnsupportedSqlError::Recursive,
×
128
            ));
×
129
        }
51✔
130

×
131
        for table in &with.cte_tables {
104✔
132
            if table.from.is_some() {
53✔
133
                return Err(PipelineError::UnsupportedSqlError(
×
134
                    UnsupportedSqlError::CteFromError,
×
135
                ));
×
136
            }
53✔
137
            let table_name = table.alias.name.to_string();
53✔
138
            if query_ctx
53✔
139
                .pipeline_map
53✔
140
                .contains_key(&(pipeline_idx, table_name.clone()))
53✔
141
            {
×
142
                return Err(InvalidQuery(format!(
×
143
                    "WITH query name {table_name:?} specified more than once"
×
144
                )));
×
145
            }
53✔
146
            query_to_pipeline(
53✔
147
                &TableInfo {
53✔
148
                    name: NameOrAlias(table_name.clone(), Some(table_name)),
53✔
149
                    is_derived: true,
53✔
150
                    override_name: None,
53✔
151
                },
53✔
152
                &table.query,
53✔
153
                pipeline,
53✔
154
                query_ctx,
53✔
155
                true,
53✔
156
                pipeline_idx,
53✔
157
            )?;
53✔
158
        }
×
159
    };
273✔
160

×
161
    match *query.body.clone() {
324✔
162
        SetExpr::Select(select) => {
302✔
163
            select_to_pipeline(
302✔
164
                table_info,
302✔
165
                *select,
302✔
166
                pipeline,
302✔
167
                query_ctx,
302✔
168
                stateful,
302✔
169
                pipeline_idx,
302✔
170
            )?;
302✔
171
        }
×
172
        SetExpr::Query(query) => {
×
173
            let query_name = format!("subquery_{}", uuid::Uuid::new_v4());
×
174
            let mut ctx = QueryContext::default();
×
175
            query_to_pipeline(
×
176
                &TableInfo {
×
177
                    name: NameOrAlias(query_name, None),
×
178
                    is_derived: true,
×
179
                    override_name: None,
×
180
                },
×
181
                &query,
×
182
                pipeline,
×
183
                &mut ctx,
×
184
                stateful,
×
185
                pipeline_idx,
×
186
            )?
×
187
        }
×
188
        SetExpr::SetOperation {
×
189
            op,
22✔
190
            set_quantifier,
22✔
191
            left,
22✔
192
            right,
22✔
193
        } => match op {
22✔
194
            SetOperator::Union => {
×
195
                set_to_pipeline(
22✔
196
                    table_info,
22✔
197
                    left,
22✔
198
                    right,
22✔
199
                    set_quantifier,
22✔
200
                    pipeline,
22✔
201
                    query_ctx,
22✔
202
                    stateful,
22✔
203
                    pipeline_idx,
22✔
204
                )?;
22✔
205
            }
206
            _ => return Err(PipelineError::InvalidOperator(op.to_string())),
×
207
        },
×
208
        _ => {
×
209
            return Err(PipelineError::UnsupportedSqlError(
×
210
                UnsupportedSqlError::GenericError("Unsupported query body structure".to_string()),
×
211
            ))
×
212
        }
×
213
    };
214
    Ok(())
324✔
215
}
324✔
216

×
217
fn select_to_pipeline(
346✔
218
    table_info: &TableInfo,
346✔
219
    select: Select,
346✔
220
    pipeline: &mut AppPipeline<SchemaSQLContext>,
346✔
221
    query_ctx: &mut QueryContext,
346✔
222
    stateful: bool,
346✔
223
    pipeline_idx: usize,
346✔
224
) -> Result<String, PipelineError> {
346✔
225
    // FROM clause
346✔
226
    if select.from.len() != 1 {
346✔
227
        return Err(PipelineError::UnsupportedSqlError(
×
228
            UnsupportedSqlError::FromCommaSyntax,
×
229
        ));
×
230
    }
346✔
231

×
232
    let input_tables = get_input_tables(&select.from[0], pipeline, query_ctx, pipeline_idx)?;
346✔
233

×
234
    let (input_nodes, output_node) = add_from_to_pipeline(
346✔
235
        pipeline,
346✔
236
        &input_tables,
346✔
237
        &mut query_ctx.pipeline_map,
346✔
238
        pipeline_idx,
346✔
239
    )?;
346✔
240

×
241
    let gen_agg_name = format!("agg_{}", uuid::Uuid::new_v4());
346✔
242
    let gen_selection_name = format!("select_{}", uuid::Uuid::new_v4());
346✔
243
    let (gen_product_name, product_output_port) = output_node;
346✔
244

×
245
    for (source_name, processor_name, processor_port) in input_nodes.iter() {
346✔
246
        if let Some(table_info) = query_ctx
75✔
247
            .pipeline_map
75✔
248
            .get(&(pipeline_idx, source_name.clone()))
75✔
249
        {
×
250
            pipeline.connect_nodes(
75✔
251
                &table_info.node,
75✔
252
                Some(table_info.port),
75✔
253
                processor_name,
75✔
254
                Some(*processor_port as PortHandle),
75✔
255
                true,
75✔
256
            )?;
75✔
257
            // If not present in pipeline_map, insert into used_sources as this is coming from source
×
258
        } else {
×
259
            query_ctx.used_sources.push(source_name.clone());
×
260
        }
×
261
    }
×
262

×
263
    let aggregation = AggregationProcessorFactory::new(select.clone(), stateful);
346✔
264

346✔
265
    pipeline.add_processor(Arc::new(aggregation), &gen_agg_name, vec![]);
346✔
266

×
267
    // Where clause
×
268
    if let Some(selection) = select.selection {
346✔
269
        let selection = SelectionProcessorFactory::new(selection);
72✔
270

72✔
271
        pipeline.add_processor(Arc::new(selection), &gen_selection_name, vec![]);
72✔
272

72✔
273
        pipeline.connect_nodes(
72✔
274
            &gen_product_name,
72✔
275
            Some(product_output_port),
72✔
276
            &gen_selection_name,
72✔
277
            Some(DEFAULT_PORT_HANDLE),
72✔
278
            true,
72✔
279
        )?;
72✔
280

×
281
        pipeline.connect_nodes(
72✔
282
            &gen_selection_name,
72✔
283
            Some(DEFAULT_PORT_HANDLE),
72✔
284
            &gen_agg_name,
72✔
285
            Some(DEFAULT_PORT_HANDLE),
72✔
286
            true,
72✔
287
        )?;
72✔
288
    } else {
×
289
        pipeline.connect_nodes(
274✔
290
            &gen_product_name,
274✔
291
            Some(product_output_port),
274✔
292
            &gen_agg_name,
274✔
293
            Some(DEFAULT_PORT_HANDLE),
274✔
294
            true,
274✔
295
        )?;
274✔
296
    }
×
297

×
298
    query_ctx.pipeline_map.insert(
346✔
299
        (pipeline_idx, table_info.name.0.to_string()),
346✔
300
        OutputNodeInfo {
346✔
301
            node: gen_agg_name.clone(),
346✔
302
            port: DEFAULT_PORT_HANDLE,
346✔
303
            is_derived: table_info.is_derived,
346✔
304
        },
346✔
305
    );
346✔
306

307
    let output_table_name = if let Some(into) = select.into {
346✔
308
        Some(into.name.to_string())
8✔
309
    } else {
×
310
        table_info.override_name.clone()
338✔
311
    };
×
312
    if let Some(table_name) = output_table_name {
346✔
313
        query_ctx.output_tables_map.insert(
249✔
314
            table_name,
249✔
315
            OutputNodeInfo {
249✔
316
                node: gen_agg_name.clone(),
249✔
317
                port: DEFAULT_PORT_HANDLE,
249✔
318
                is_derived: false,
249✔
319
            },
249✔
320
        );
249✔
321
    }
252✔
322

323
    Ok(gen_agg_name)
346✔
324
}
346✔
325

×
326
#[allow(clippy::type_complexity)]
×
327
fn add_from_to_pipeline(
346✔
328
    pipeline: &mut AppPipeline<SchemaSQLContext>,
346✔
329
    input_tables: &IndexedTableWithJoins,
346✔
330
    pipeline_map: &mut HashMap<(usize, String), OutputNodeInfo>,
346✔
331
    pipeline_idx: usize,
346✔
332
) -> Result<(Vec<(String, String, PortHandle)>, (String, PortHandle)), PipelineError> {
346✔
333
    let mut pipeline_entry_points = vec![];
346✔
334
    let mut product_entry_points = vec![];
346✔
335
    let mut input_nodes = vec![];
346✔
336

346✔
337
    let (relation_name_or_alias, relation) = input_tables.relation.clone();
346✔
338

346✔
339
    let product_processor_name = format!("product_{}", uuid::Uuid::new_v4());
346✔
340
    let product_processor = FromProcessorFactory::new(input_tables.clone());
346✔
341

346✔
342
    if relation_is_a_window(&relation).map_err(PipelineError::WindowError)? {
346✔
343
        let window_processor = WindowProcessorFactory::new(relation);
×
344
        let window_processor_name = format!("window_{}", uuid::Uuid::new_v4());
×
345
        let window_source_name = window_processor.get_source_name()?;
×
346
        let mut window_entry_points = vec![];
×
347

×
348
        if is_an_entry_point(&window_source_name, pipeline_map, pipeline_idx) {
×
349
            let entry_point = PipelineEntryPoint::new(
×
350
                AppSourceId::new(window_source_name, None),
×
351
                DEFAULT_PORT_HANDLE as PortHandle,
×
352
            );
×
353

×
354
            window_entry_points.push(entry_point.clone());
×
355
            pipeline_entry_points.push(entry_point);
×
356
        } else {
×
357
            input_nodes.push((
×
358
                window_source_name,
×
359
                window_processor_name.clone(),
×
360
                DEFAULT_PORT_HANDLE as PortHandle,
×
361
            ));
×
362
        }
×
363

×
364
        pipeline.add_processor(
×
365
            Arc::new(window_processor),
×
366
            &window_processor_name,
×
367
            window_entry_points,
×
368
        );
×
369

×
370
        pipeline.connect_nodes(
×
371
            &window_processor_name,
×
372
            Some(DEFAULT_PORT_HANDLE as PortHandle),
×
373
            &product_processor_name,
×
374
            Some(0 as PortHandle),
×
375
            true,
×
376
        )?;
×
377
    } else {
378
        let product_input_name = relation_name_or_alias.0;
346✔
379

346✔
380
        if is_an_entry_point(&product_input_name, pipeline_map, pipeline_idx) {
346✔
381
            let entry_point = PipelineEntryPoint::new(
272✔
382
                AppSourceId::new(product_input_name, None),
272✔
383
                0 as PortHandle,
272✔
384
            );
272✔
385

272✔
386
            product_entry_points.push(entry_point.clone());
272✔
387
            pipeline_entry_points.push(entry_point);
272✔
388
        } else {
272✔
389
            input_nodes.push((
74✔
390
                product_input_name,
74✔
391
                product_processor_name.clone(),
74✔
392
                0 as PortHandle,
74✔
393
            ));
74✔
394
        }
74✔
395
    }
×
396

×
397
    for (index, (join_relation_alias, join)) in input_tables.joins.iter().enumerate() {
346✔
398
        let (relation_name_or_alias, relation) =
67✔
399
            (join_relation_alias.clone(), join.relation.clone());
67✔
400

67✔
401
        if relation_is_a_window(&relation).map_err(PipelineError::WindowError)? {
67✔
402
            let window_processor = WindowProcessorFactory::new(relation.clone());
×
403
            let window_processor_name = format!("window_{}", uuid::Uuid::new_v4());
×
404
            let window_input_name = window_processor.get_source_name()?;
×
405
            let mut window_entry_points = vec![];
×
406

×
407
            if is_an_entry_point(&window_input_name, pipeline_map, pipeline_idx) {
×
408
                let entry_point = PipelineEntryPoint::new(
×
409
                    AppSourceId::new(window_input_name, None),
×
410
                    DEFAULT_PORT_HANDLE as PortHandle,
×
411
                );
×
412

×
413
                window_entry_points.push(entry_point.clone());
×
414
                pipeline_entry_points.push(entry_point);
×
415
            } else {
×
416
                input_nodes.push((
×
417
                    window_input_name,
×
418
                    window_processor_name.clone(),
×
419
                    DEFAULT_PORT_HANDLE as PortHandle,
×
420
                ));
×
421
            }
×
422

×
423
            pipeline.add_processor(
×
424
                Arc::new(window_processor),
×
425
                &window_processor_name,
×
426
                window_entry_points,
×
427
            );
×
428

×
429
            pipeline.connect_nodes(
×
430
                &window_processor_name,
×
431
                Some(DEFAULT_PORT_HANDLE as PortHandle),
×
432
                &product_processor_name,
×
433
                Some((index + 1) as PortHandle),
×
434
                true,
×
435
            )?;
×
436
        } else {
×
437
            let product_input_name = relation_name_or_alias.0;
67✔
438

67✔
439
            if is_an_entry_point(&product_input_name, pipeline_map, pipeline_idx) {
67✔
440
                let entry_point = PipelineEntryPoint::new(
66✔
441
                    AppSourceId::new(product_input_name, None),
66✔
442
                    (index + 1) as PortHandle,
66✔
443
                );
66✔
444

66✔
445
                product_entry_points.push(entry_point.clone());
66✔
446
                pipeline_entry_points.push(entry_point);
66✔
447
            } else {
66✔
448
                input_nodes.push((
1✔
449
                    product_input_name,
1✔
450
                    product_processor_name.clone(),
1✔
451
                    (index + 1) as PortHandle,
1✔
452
                ));
1✔
453
            }
1✔
454
        }
×
455
    }
×
456

×
457
    pipeline.add_processor(
346✔
458
        Arc::new(product_processor),
346✔
459
        &product_processor_name,
346✔
460
        product_entry_points,
346✔
461
    );
346✔
462

346✔
463
    Ok((
346✔
464
        input_nodes,
346✔
465
        (product_processor_name, DEFAULT_PORT_HANDLE as PortHandle),
346✔
466
    ))
346✔
467
}
346✔
468

×
469
#[allow(clippy::too_many_arguments)]
×
470
fn set_to_pipeline(
22✔
471
    table_info: &TableInfo,
22✔
472
    left_select: Box<SetExpr>,
22✔
473
    right_select: Box<SetExpr>,
22✔
474
    set_quantifier: SetQuantifier,
22✔
475
    pipeline: &mut AppPipeline<SchemaSQLContext>,
22✔
476
    query_ctx: &mut QueryContext,
22✔
477
    stateful: bool,
22✔
478
    pipeline_idx: usize,
22✔
479
) -> Result<String, PipelineError> {
22✔
480
    let gen_left_set_name = format!("set_left_{}", uuid::Uuid::new_v4());
22✔
481
    let left_table_info = TableInfo {
22✔
482
        name: NameOrAlias(gen_left_set_name.clone(), None),
22✔
483
        override_name: None,
22✔
484
        is_derived: false,
22✔
485
    };
22✔
486
    let gen_right_set_name = format!("set_right_{}", uuid::Uuid::new_v4());
22✔
487
    let right_table_info = TableInfo {
22✔
488
        name: NameOrAlias(gen_right_set_name.clone(), None),
22✔
489
        override_name: None,
22✔
490
        is_derived: false,
22✔
491
    };
22✔
492

493
    let _left_pipeline_name = match *left_select {
22✔
494
        SetExpr::Select(select) => select_to_pipeline(
22✔
495
            &left_table_info,
22✔
496
            *select,
22✔
497
            pipeline,
22✔
498
            query_ctx,
22✔
499
            stateful,
22✔
500
            pipeline_idx,
22✔
501
        )?,
22✔
502
        SetExpr::SetOperation {
×
503
            op: _,
504
            set_quantifier,
×
505
            left,
×
506
            right,
×
507
        } => set_to_pipeline(
×
508
            &left_table_info,
×
509
            left,
×
510
            right,
×
511
            set_quantifier,
×
512
            pipeline,
×
513
            query_ctx,
×
514
            stateful,
×
515
            pipeline_idx,
×
516
        )?,
×
517
        _ => {
×
518
            return Err(PipelineError::InvalidQuery(
×
519
                "Invalid UNION left Query".to_string(),
×
520
            ))
×
521
        }
×
522
    };
×
523

×
524
    let _right_pipeline_name = match *right_select {
22✔
525
        SetExpr::Select(select) => select_to_pipeline(
22✔
526
            &right_table_info,
22✔
527
            *select,
22✔
528
            pipeline,
22✔
529
            query_ctx,
22✔
530
            stateful,
22✔
531
            pipeline_idx,
22✔
532
        )?,
22✔
533
        SetExpr::SetOperation {
×
534
            op: _,
×
535
            set_quantifier,
×
536
            left,
×
537
            right,
×
538
        } => set_to_pipeline(
×
539
            &right_table_info,
×
540
            left,
×
541
            right,
×
542
            set_quantifier,
×
543
            pipeline,
×
544
            query_ctx,
×
545
            stateful,
×
546
            pipeline_idx,
×
547
        )?,
×
548
        _ => {
×
549
            return Err(PipelineError::InvalidQuery(
×
550
                "Invalid UNION right Query".to_string(),
×
551
            ))
×
552
        }
553
    };
554

×
555
    let left_pipeline_output_node = match query_ctx
22✔
556
        .pipeline_map
22✔
557
        .get(&(pipeline_idx, gen_left_set_name))
22✔
558
    {
×
559
        Some(pipeline) => pipeline,
22✔
560
        None => {
×
561
            return Err(PipelineError::InvalidQuery(
×
562
                "Invalid UNION left Query".to_string(),
×
563
            ))
×
564
        }
×
565
    };
×
566

×
567
    let right_pipeline_output_node = match query_ctx
22✔
568
        .pipeline_map
22✔
569
        .get(&(pipeline_idx, gen_right_set_name))
22✔
570
    {
×
571
        Some(pipeline) => pipeline,
22✔
572
        None => {
×
573
            return Err(PipelineError::InvalidQuery(
×
574
                "Invalid UNION Right Query".to_string(),
×
575
            ))
×
576
        }
×
577
    };
578

×
579
    let set_proc_fac = SetProcessorFactory::new(set_quantifier);
22✔
580

22✔
581
    let mut gen_set_name = format!("set_{}", uuid::Uuid::new_v4());
22✔
582

22✔
583
    if table_info.override_name.is_some() {
22✔
584
        gen_set_name = table_info.override_name.to_owned().unwrap();
×
585
    }
22✔
586

587
    pipeline.add_processor(Arc::new(set_proc_fac), &gen_set_name, vec![]);
22✔
588

22✔
589
    pipeline.connect_nodes(
22✔
590
        &left_pipeline_output_node.node,
22✔
591
        Some(left_pipeline_output_node.port),
22✔
592
        &gen_set_name,
22✔
593
        Some(0 as PortHandle),
22✔
594
        true,
22✔
595
    )?;
22✔
596

×
597
    pipeline.connect_nodes(
22✔
598
        &right_pipeline_output_node.node,
22✔
599
        Some(right_pipeline_output_node.port),
22✔
600
        &gen_set_name,
22✔
601
        Some(1 as PortHandle),
22✔
602
        true,
22✔
603
    )?;
22✔
604

×
605
    for (_, table_name) in query_ctx.pipeline_map.keys() {
44✔
606
        query_ctx.output_tables_map.remove_entry(table_name);
44✔
607
    }
44✔
608

×
609
    query_ctx.pipeline_map.insert(
22✔
610
        (pipeline_idx, table_info.name.0.to_string()),
22✔
611
        OutputNodeInfo {
22✔
612
            node: gen_set_name.clone(),
22✔
613
            port: DEFAULT_PORT_HANDLE,
22✔
614
            is_derived: table_info.is_derived,
22✔
615
        },
22✔
616
    );
22✔
617

22✔
618
    Ok(gen_set_name)
22✔
619
}
22✔
620

×
621
/// Returns a vector of input port handles and relative table name
×
622
///
×
623
/// # Errors
×
624
///
×
625
/// This function will return an error if it's not possible to get an input name.
×
626
pub fn get_input_tables(
346✔
627
    from: &TableWithJoins,
346✔
628
    pipeline: &mut AppPipeline<SchemaSQLContext>,
346✔
629
    query_ctx: &mut QueryContext,
346✔
630
    pipeline_idx: usize,
346✔
631
) -> Result<IndexedTableWithJoins, PipelineError> {
346✔
632
    let name = get_from_source(&from.relation, pipeline, query_ctx, pipeline_idx)?;
346✔
633
    let mut joins = vec![];
346✔
634

×
635
    for join in from.joins.iter() {
346✔
636
        let input_name = get_from_source(&join.relation, pipeline, query_ctx, pipeline_idx)?;
67✔
637
        joins.push((input_name.clone(), join.clone()));
67✔
638
    }
×
639

×
640
    Ok(IndexedTableWithJoins {
346✔
641
        relation: (name, from.relation.clone()),
346✔
642
        joins,
346✔
643
    })
346✔
644
}
346✔
645

×
646
pub fn get_input_names(input_tables: &IndexedTableWithJoins) -> Vec<NameOrAlias> {
×
647
    let mut input_names = vec![];
×
648
    input_names.push(input_tables.relation.0.clone());
×
649

×
650
    for join in &input_tables.joins {
×
651
        input_names.push(join.0.clone());
×
652
    }
×
653
    input_names
×
654
}
×
655

×
656
pub fn get_entry_points(
×
657
    input_tables: &IndexedTableWithJoins,
×
658
    pipeline_map: &mut HashMap<(usize, String), OutputNodeInfo>,
×
659
    pipeline_idx: usize,
×
660
) -> Result<Vec<PipelineEntryPoint>, PipelineError> {
×
661
    let mut endpoints = vec![];
×
662

×
663
    let input_names = get_input_names(input_tables);
×
664

665
    for (input_port, table) in input_names.iter().enumerate() {
×
666
        let name = table.0.clone();
×
667
        if !pipeline_map.contains_key(&(pipeline_idx, name.clone())) {
×
668
            endpoints.push(PipelineEntryPoint::new(
×
669
                AppSourceId::new(name, None),
×
670
                input_port as PortHandle,
×
671
            ));
×
672
        }
×
673
    }
674

675
    Ok(endpoints)
×
676
}
×
677

678
pub fn is_an_entry_point(
413✔
679
    name: &str,
413✔
680
    pipeline_map: &mut HashMap<(usize, String), OutputNodeInfo>,
413✔
681
    pipeline_idx: usize,
413✔
682
) -> bool {
413✔
683
    if !pipeline_map.contains_key(&(pipeline_idx, name.to_owned())) {
413✔
684
        return true;
338✔
685
    }
75✔
686
    false
75✔
687
}
413✔
688

689
pub fn get_from_source(
413✔
690
    relation: &TableFactor,
413✔
691
    pipeline: &mut AppPipeline<SchemaSQLContext>,
413✔
692
    query_ctx: &mut QueryContext,
413✔
693
    pipeline_idx: usize,
413✔
694
) -> Result<NameOrAlias, PipelineError> {
413✔
695
    match relation {
413✔
696
        TableFactor::Table { name, alias, .. } => {
391✔
697
            let input_name = name
391✔
698
                .0
391✔
699
                .iter()
391✔
700
                .map(ExpressionBuilder::normalize_ident)
391✔
701
                .collect::<Vec<String>>()
391✔
702
                .join(".");
391✔
703
            let alias_name = alias
391✔
704
                .as_ref()
391✔
705
                .map(|a| ExpressionBuilder::fullname_from_ident(&[a.name.clone()]));
391✔
706

391✔
707
            Ok(NameOrAlias(input_name, alias_name))
391✔
708
        }
709
        TableFactor::Derived {
710
            lateral: _,
711
            subquery,
22✔
712
            alias,
22✔
713
        } => {
22✔
714
            let name = format!("derived_{}", uuid::Uuid::new_v4());
22✔
715
            let alias_name = alias.as_ref().map(|alias_ident| {
22✔
716
                ExpressionBuilder::fullname_from_ident(&[alias_ident.name.clone()])
12✔
717
            });
22✔
718

22✔
719
            let name_or = NameOrAlias(name, alias_name);
22✔
720
            query_to_pipeline(
22✔
721
                &TableInfo {
22✔
722
                    name: name_or.clone(),
22✔
723
                    is_derived: true,
22✔
724
                    override_name: None,
22✔
725
                },
22✔
726
                subquery,
22✔
727
                pipeline,
22✔
728
                query_ctx,
22✔
729
                false,
22✔
730
                pipeline_idx,
22✔
731
            )?;
22✔
732

733
            Ok(name_or)
22✔
734
        }
735
        _ => Err(PipelineError::UnsupportedSqlError(
×
736
            UnsupportedSqlError::JoinTable,
×
737
        )),
×
738
    }
739
}
413✔
740

741
#[cfg(test)]
mod tests {
    use dozer_core::app::AppPipeline;

    use super::statement_to_pipeline;

    #[test]
    fn parse_sql_pipeline() {
        // Multi-statement script exercising INTO, derived tables, CTEs,
        // nested CTEs and joins against CTE outputs.
        let sql = r#"
                SELECT
                    a.name as "Genre",
                    SUM(amount) as "Gross Revenue(in $)"
                INTO gross_revenue_stats
                FROM
                (
                    SELECT
                        c.name,
                        f.title,
                        p.amount
                    FROM film f
                    LEFT JOIN film_category fc
                        ON fc.film_id = f.film_id
                    LEFT JOIN category c
                        ON fc.category_id = c.category_id
                    LEFT JOIN inventory i
                        ON i.film_id = f.film_id
                    LEFT JOIN rental r
                        ON r.inventory_id = i.inventory_id
                    LEFT JOIN payment p
                        ON p.rental_id = r.rental_id
                    WHERE p.amount IS NOT NULL
                ) a
                GROUP BY name;

                SELECT
                f.name, f.title, p.amount
                INTO film_amounts
                FROM film f
                LEFT JOIN film_category fc;

                WITH tbl as (select id from a)
                select id
                into cte_table
                from tbl;

                WITH tbl as (select id from  a),
                tbl2 as (select id from tbl)
                select id
                into nested_cte_table
                from tbl2;

                WITH cte_table1 as (select id_dt1 from (select id_t1 from table_1) as derived_table_1),
                cte_table2 as (select id_ct1 from cte_table1)
                select id_ct2
                into nested_derived_table
                from cte_table2;

                with tbl as (select id, ticker from stocks)
                select tbl.id
                into nested_stocks_table
                from  stocks join tbl on tbl.id = stocks.id;
            "#;

        let context = statement_to_pipeline(sql, &mut AppPipeline::new(), None).unwrap();

        // Exactly one output table should exist per INTO clause.
        let mut output_keys = context.output_tables_map.keys().collect::<Vec<_>>();
        output_keys.sort();
        let mut expected_keys = vec![
            "cte_table",
            "film_amounts",
            "gross_revenue_stats",
            "nested_cte_table",
            "nested_derived_table",
            "nested_stocks_table",
        ];
        expected_keys.sort();
        assert_eq!(output_keys, expected_keys);
    }
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc