• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

getdozer / dozer / 6013595925

29 Aug 2023 02:22PM UTC coverage: 76.363% (-1.6%) from 77.986%
6013595925

push

github

web-flow
chore: Increase e2e test timeout to 90 minutes (#1936)

Signed-off-by: Bei Chu <914745487@qq.com>

49036 of 64214 relevant lines covered (76.36%)

48121.67 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

66.18
/dozer-sql/src/pipeline/builder.rs
1
use crate::pipeline::aggregation::factory::AggregationProcessorFactory;
2
use crate::pipeline::builder::PipelineError::InvalidQuery;
3
use crate::pipeline::errors::PipelineError;
4
use crate::pipeline::expression::builder::{ExpressionBuilder, NameOrAlias};
5
use crate::pipeline::selection::factory::SelectionProcessorFactory;
6
use dozer_core::app::AppPipeline;
7
use dozer_core::app::PipelineEntryPoint;
8
use dozer_core::node::PortHandle;
9
use dozer_core::DEFAULT_PORT_HANDLE;
10
use sqlparser::ast::{Join, SetOperator, SetQuantifier, TableFactor, TableWithJoins};
11

12
use sqlparser::{
13
    ast::{Query, Select, SetExpr, Statement},
14
    dialect::DozerDialect,
15
    parser::Parser,
16
};
17
use std::collections::HashMap;
18

19
use super::errors::UnsupportedSqlError;
20
use super::pipeline_builder::from_builder::insert_from_to_pipeline;
21

22
use super::product::set::set_factory::SetProcessorFactory;
23

24
/// Identifies where a table's output can be consumed inside the DAG:
/// the processor node that emits it and the port to connect to.
#[derive(Debug, Clone)]
pub struct OutputNodeInfo {
    // Name to connect in dag
    pub node: String,
    // Port to connect in dag
    pub port: PortHandle,
    // If this table is originally from a source or created in transforms
    pub is_derived: bool,
    // TODO add:indexes to the tables
}
34

35
/// Naming information for the table produced by a (sub)query.
pub struct TableInfo {
    // Name (or alias) under which the query's output is registered
    pub name: NameOrAlias,
    // Caller-supplied output-table name; used by `select_to_pipeline`
    // only when the query has no INTO clause
    pub override_name: Option<String>,
    // True when the table is created by a transform rather than read
    // directly from a source
    pub is_derived: bool,
}
40
/// The struct contains some contexts during query to pipeline.
#[derive(Debug, Clone, Default)]
pub struct QueryContext {
    // Internal tables map, used to store the tables that are created by the queries.
    // Keyed by (pipeline index, table name).
    pub pipeline_map: HashMap<(usize, String), OutputNodeInfo>,

    // Output tables map that are marked with "INTO" used to store the tables, these can be exposed to sinks.
    pub output_tables_map: HashMap<String, OutputNodeInfo>,

    // Used Sources: names referenced in FROM/JOIN that no pipeline node
    // produces, i.e. they must come from external sources
    pub used_sources: Vec<String>,

    // Processors counter: monotonically increasing, used to generate
    // unique names for processors (see `get_next_processor_id`)
    pub processor_counter: usize,
}
55

56
impl QueryContext {
57
    pub fn get_next_processor_id(&mut self) -> usize {
2,832✔
58
        self.processor_counter += 1;
2,832✔
59
        self.processor_counter
2,832✔
60
    }
2,832✔
61
}
×
62

×
63
/// A FROM clause broken into its primary relation and its joins, each
/// paired with the resolved name/alias it is referenced by.
#[derive(Debug, Clone)]
pub struct IndexedTableWithJoins {
    // The first (left-most) relation of the FROM clause
    pub relation: (NameOrAlias, TableFactor),
    // Every subsequent JOIN, paired with its resolved input name
    pub joins: Vec<(NameOrAlias, Join)>,
}
68
pub fn statement_to_pipeline(
555✔
69
    sql: &str,
555✔
70
    pipeline: &mut AppPipeline,
555✔
71
    override_name: Option<String>,
555✔
72
) -> Result<QueryContext, PipelineError> {
555✔
73
    let dialect = DozerDialect {};
555✔
74
    let mut ctx = QueryContext::default();
555✔
75
    let is_top_select = true;
555✔
76
    let ast = Parser::parse_sql(&dialect, sql)
555✔
77
        .map_err(|err| PipelineError::InternalError(Box::new(err)))?;
555✔
78
    let query_name = NameOrAlias(format!("query_{}", ctx.get_next_processor_id()), None);
555✔
79

×
80
    for (idx, statement) in ast.iter().enumerate() {
560✔
81
        match statement {
560✔
82
            Statement::Query(query) => {
560✔
83
                query_to_pipeline(
560✔
84
                    &TableInfo {
560✔
85
                        name: query_name.clone(),
560✔
86
                        is_derived: false,
560✔
87
                        override_name: override_name.clone(),
560✔
88
                    },
560✔
89
                    query,
560✔
90
                    pipeline,
560✔
91
                    &mut ctx,
560✔
92
                    false,
560✔
93
                    idx,
560✔
94
                    is_top_select,
560✔
95
                )?;
560✔
96
            }
×
97
            s => {
×
98
                return Err(PipelineError::UnsupportedSqlError(
×
99
                    UnsupportedSqlError::GenericError(s.to_string()),
×
100
                ))
×
101
            }
×
102
        }
×
103
    }
×
104

105
    Ok(ctx)
551✔
106
}
555✔
107

108
/// Lowers one SQL `Query` AST into pipeline processors.
///
/// Rejects unsupported syntax up front (ORDER BY, LIMIT/OFFSET, recursive
/// CTEs, CTEs with a FROM clause), materializes every WITH-clause table
/// first so later references resolve, then dispatches on the query body:
/// a plain SELECT, a nested query, or a UNION set operation.
///
/// `is_top_select` marks a top-level statement, for which a missing INTO
/// clause is an error (enforced in `select_to_pipeline`).
fn query_to_pipeline(
    table_info: &TableInfo,
    query: &Query,
    pipeline: &mut AppPipeline,
    query_ctx: &mut QueryContext,
    stateful: bool,
    pipeline_idx: usize,
    is_top_select: bool,
) -> Result<(), PipelineError> {
    // return error if there is unsupported syntax
    if !query.order_by.is_empty() {
        return Err(PipelineError::UnsupportedSqlError(
            UnsupportedSqlError::OrderByError,
        ));
    }

    if query.limit.is_some() || query.offset.is_some() {
        return Err(PipelineError::UnsupportedSqlError(
            UnsupportedSqlError::LimitOffsetError,
        ));
    }

    // Attach the first pipeline if there is with clause
    if let Some(with) = &query.with {
        if with.recursive {
            return Err(PipelineError::UnsupportedSqlError(
                UnsupportedSqlError::Recursive,
            ));
        }

        for table in &with.cte_tables {
            if table.from.is_some() {
                return Err(PipelineError::UnsupportedSqlError(
                    UnsupportedSqlError::CteFromError,
                ));
            }
            let table_name = table.alias.name.to_string();
            // A CTE name may be defined only once within a pipeline.
            if query_ctx
                .pipeline_map
                .contains_key(&(pipeline_idx, table_name.clone()))
            {
                return Err(InvalidQuery(format!(
                    "WITH query name {table_name:?} specified more than once"
                )));
            }
            // Lower the CTE body; it is registered as a derived, stateful
            // table under both its name and alias.
            query_to_pipeline(
                &TableInfo {
                    name: NameOrAlias(table_name.clone(), Some(table_name)),
                    is_derived: true,
                    override_name: None,
                },
                &table.query,
                pipeline,
                query_ctx,
                true,
                pipeline_idx,
                false, //Inside a with clause, so not top select
            )?;
        }
    };

    match *query.body.clone() {
        SetExpr::Select(select) => {
            select_to_pipeline(
                table_info,
                *select,
                pipeline,
                query_ctx,
                stateful,
                pipeline_idx,
                is_top_select,
            )?;
        }
        SetExpr::Query(query) => {
            let query_name = format!("subquery_{}", query_ctx.get_next_processor_id());
            // NOTE(review): this branch recurses with a fresh, throwaway
            // `QueryContext` rather than `query_ctx`, so anything the
            // nested query registers is discarded — confirm this is
            // intended (the branch is currently uncovered by tests).
            let mut ctx = QueryContext::default();
            query_to_pipeline(
                &TableInfo {
                    name: NameOrAlias(query_name, None),
                    is_derived: true,
                    override_name: None,
                },
                &query,
                pipeline,
                &mut ctx,
                stateful,
                pipeline_idx,
                false, //Inside a subquery, so not top select
            )?
        }
        SetExpr::SetOperation {
            op,
            set_quantifier,
            left,
            right,
        } => match op {
            SetOperator::Union => {
                set_to_pipeline(
                    table_info,
                    left,
                    right,
                    set_quantifier,
                    pipeline,
                    query_ctx,
                    stateful,
                    pipeline_idx,
                    is_top_select,
                )?;
            }
            // UNION is the only supported set operation.
            _ => return Err(PipelineError::InvalidOperator(op.to_string())),
        },
        _ => {
            return Err(PipelineError::UnsupportedSqlError(
                UnsupportedSqlError::GenericError("Unsupported query body structure".to_string()),
            ))
        }
    };
    Ok(())
}
663✔
227

228
/// Builds the processor chain for a single SELECT: FROM/JOIN product,
/// optional WHERE selection, then an aggregation node, wired in that
/// order. Registers the result in `pipeline_map` and, when an INTO clause
/// (or a caller override name) is present, in `output_tables_map`.
///
/// Returns the generated aggregation-node name — the node other
/// processors connect to for this table's output.
fn select_to_pipeline(
    table_info: &TableInfo,
    select: Select,
    pipeline: &mut AppPipeline,
    query_ctx: &mut QueryContext,
    stateful: bool,
    pipeline_idx: usize,
    is_top_select: bool,
) -> Result<String, PipelineError> {
    // FROM clause: exactly one TableWithJoins is supported (comma-joins
    // are rejected).
    if select.from.len() != 1 {
        return Err(PipelineError::UnsupportedSqlError(
            UnsupportedSqlError::FromCommaSyntax,
        ));
    }

    // let input_tables = get_input_tables(&select.from[0], pipeline, query_ctx, pipeline_idx)?;
    //
    // let (input_nodes, output_node, mut used_sources) = add_from_to_pipeline(
    //     pipeline,
    //     &input_tables,
    //     &mut query_ctx.pipeline_map,
    //     pipeline_idx,
    // )?;

    let connection_info =
        insert_from_to_pipeline(&select.from[0], pipeline, pipeline_idx, query_ctx)?;

    let input_nodes = connection_info.input_nodes;
    let output_node = connection_info.output_node;

    let gen_agg_name = format!("agg--{}", query_ctx.get_next_processor_id());

    let gen_selection_name = format!("select--{}", query_ctx.get_next_processor_id());
    let (gen_product_name, product_output_port) = output_node;

    // Wire each input either to the pipeline node that already produces
    // it, or record it as an external source.
    for (source_name, processor_name, processor_port) in input_nodes.iter() {
        if let Some(table_info) = query_ctx
            .pipeline_map
            .get(&(pipeline_idx, source_name.clone()))
        {
            pipeline.connect_nodes(
                &table_info.node,
                table_info.port,
                processor_name,
                *processor_port as PortHandle,
            );
            // If not present in pipeline_map, insert into used_sources as this is coming from source
        } else {
            query_ctx.used_sources.push(source_name.clone());
        }
    }

    let aggregation = AggregationProcessorFactory::new(
        gen_agg_name.clone(),
        select.clone(),
        stateful,
        pipeline
            .flags()
            .enable_probabilistic_optimizations
            .in_aggregations,
    );

    pipeline.add_processor(Box::new(aggregation), &gen_agg_name, vec![]);

    // Where clause
    if let Some(selection) = select.selection {
        // WHERE present: product -> selection -> aggregation.
        let selection = SelectionProcessorFactory::new(gen_selection_name.to_owned(), selection);

        pipeline.add_processor(Box::new(selection), &gen_selection_name, vec![]);

        pipeline.connect_nodes(
            &gen_product_name,
            product_output_port,
            &gen_selection_name,
            DEFAULT_PORT_HANDLE,
        );

        pipeline.connect_nodes(
            &gen_selection_name,
            DEFAULT_PORT_HANDLE,
            &gen_agg_name,
            DEFAULT_PORT_HANDLE,
        );
    } else {
        // No WHERE clause: the product feeds the aggregation directly.
        pipeline.connect_nodes(
            &gen_product_name,
            product_output_port,
            &gen_agg_name,
            DEFAULT_PORT_HANDLE,
        );
    }

    // The aggregation node is this table's output within the pipeline.
    query_ctx.pipeline_map.insert(
        (pipeline_idx, table_info.name.0.to_string()),
        OutputNodeInfo {
            node: gen_agg_name.clone(),
            port: DEFAULT_PORT_HANDLE,
            is_derived: table_info.is_derived,
        },
    );

    // INTO takes precedence over the caller-supplied override name.
    let output_table_name = if let Some(into) = select.into {
        Some(into.name.to_string())
    } else {
        table_info.override_name.clone()
    };

    // A top-level SELECT must name a sink table.
    if is_top_select && output_table_name.is_none() {
        return Err(PipelineError::MissingIntoClause);
    }

    if let Some(table_name) = output_table_name {
        query_ctx.output_tables_map.insert(
            table_name,
            OutputNodeInfo {
                node: gen_agg_name.clone(),
                port: DEFAULT_PORT_HANDLE,
                is_derived: false,
            },
        );
    }

    Ok(gen_agg_name)
}
691✔
353

354
/// Builds a UNION of two set-expression branches.
///
/// Each branch is lowered into its own sub-pipeline under a generated
/// name, then both are connected to a set processor (left on port 0,
/// right on port 1). Intermediate branch tables are removed from
/// `output_tables_map` so only the combined table is exposed to sinks.
///
/// Returns the generated (or overridden) set-node name.
#[allow(clippy::too_many_arguments)]
fn set_to_pipeline(
    table_info: &TableInfo,
    left_select: Box<SetExpr>,
    right_select: Box<SetExpr>,
    set_quantifier: SetQuantifier,
    pipeline: &mut AppPipeline,
    query_ctx: &mut QueryContext,
    stateful: bool,
    pipeline_idx: usize,
    is_top_select: bool,
) -> Result<String, PipelineError> {
    let gen_left_set_name = format!("set_left_{}", query_ctx.get_next_processor_id());
    let left_table_info = TableInfo {
        name: NameOrAlias(gen_left_set_name.clone(), None),
        override_name: None,
        is_derived: false,
    };
    let gen_right_set_name = format!("set_right_{}", query_ctx.get_next_processor_id());
    let right_table_info = TableInfo {
        name: NameOrAlias(gen_right_set_name.clone(), None),
        override_name: None,
        is_derived: false,
    };

    // Lower the left branch; nested set operations recurse.
    let _left_pipeline_name = match *left_select {
        SetExpr::Select(select) => select_to_pipeline(
            &left_table_info,
            *select,
            pipeline,
            query_ctx,
            stateful,
            pipeline_idx,
            is_top_select,
        )?,
        SetExpr::SetOperation {
            op: _,
            set_quantifier,
            left,
            right,
        } => set_to_pipeline(
            &left_table_info,
            left,
            right,
            set_quantifier,
            pipeline,
            query_ctx,
            stateful,
            pipeline_idx,
            is_top_select,
        )?,
        _ => {
            return Err(PipelineError::InvalidQuery(
                "Invalid UNION left Query".to_string(),
            ))
        }
    };

    // Lower the right branch the same way.
    let _right_pipeline_name = match *right_select {
        SetExpr::Select(select) => select_to_pipeline(
            &right_table_info,
            *select,
            pipeline,
            query_ctx,
            stateful,
            pipeline_idx,
            is_top_select,
        )?,
        SetExpr::SetOperation {
            op: _,
            set_quantifier,
            left,
            right,
        } => set_to_pipeline(
            &right_table_info,
            left,
            right,
            set_quantifier,
            pipeline,
            query_ctx,
            stateful,
            pipeline_idx,
            is_top_select,
        )?,
        _ => {
            return Err(PipelineError::InvalidQuery(
                "Invalid UNION right Query".to_string(),
            ))
        }
    };

    let mut gen_set_name = format!("set_{}", query_ctx.get_next_processor_id());

    // Both branches must have registered their output nodes above.
    let left_pipeline_output_node = match query_ctx
        .pipeline_map
        .get(&(pipeline_idx, gen_left_set_name))
    {
        Some(pipeline) => pipeline,
        None => {
            return Err(PipelineError::InvalidQuery(
                "Invalid UNION left Query".to_string(),
            ))
        }
    };

    let right_pipeline_output_node = match query_ctx
        .pipeline_map
        .get(&(pipeline_idx, gen_right_set_name))
    {
        Some(pipeline) => pipeline,
        None => {
            return Err(PipelineError::InvalidQuery(
                "Invalid UNION Right Query".to_string(),
            ))
        }
    };

    // An explicit override name replaces the generated set-node name.
    if table_info.override_name.is_some() {
        gen_set_name = table_info.override_name.to_owned().unwrap();
    }

    let set_proc_fac = SetProcessorFactory::new(
        gen_set_name.clone(),
        set_quantifier,
        pipeline.flags().enable_probabilistic_optimizations.in_sets,
    );

    pipeline.add_processor(Box::new(set_proc_fac), &gen_set_name, vec![]);

    // Left branch feeds port 0, right branch feeds port 1.
    pipeline.connect_nodes(
        &left_pipeline_output_node.node,
        left_pipeline_output_node.port,
        &gen_set_name,
        0 as PortHandle,
    );

    pipeline.connect_nodes(
        &right_pipeline_output_node.node,
        right_pipeline_output_node.port,
        &gen_set_name,
        1 as PortHandle,
    );

    // Drop every internally registered table from the sink-visible map so
    // only the combined UNION table surfaces.
    for (_, table_name) in query_ctx.pipeline_map.keys() {
        query_ctx.output_tables_map.remove_entry(table_name);
    }

    query_ctx.pipeline_map.insert(
        (pipeline_idx, table_info.name.0.to_string()),
        OutputNodeInfo {
            node: gen_set_name.clone(),
            port: DEFAULT_PORT_HANDLE,
            is_derived: table_info.is_derived,
        },
    );

    Ok(gen_set_name)
}
28✔
512

×
513
/// Returns a vector of input port handles and relative table name
×
514
///
×
515
/// # Errors
516
///
517
/// This function will return an error if it's not possible to get an input name.
518
pub fn get_input_tables(
×
519
    from: &TableWithJoins,
×
520
    pipeline: &mut AppPipeline,
×
521
    query_ctx: &mut QueryContext,
×
522
    pipeline_idx: usize,
×
523
) -> Result<IndexedTableWithJoins, PipelineError> {
×
524
    let name = get_from_source(&from.relation, pipeline, query_ctx, pipeline_idx)?;
×
525
    let mut joins = vec![];
×
526

×
527
    for join in from.joins.iter() {
×
528
        let input_name = get_from_source(&join.relation, pipeline, query_ctx, pipeline_idx)?;
×
529
        joins.push((input_name.clone(), join.clone()));
×
530
    }
×
531

×
532
    Ok(IndexedTableWithJoins {
×
533
        relation: (name, from.relation.clone()),
×
534
        joins,
×
535
    })
×
536
}
×
537

×
538
pub fn get_input_names(input_tables: &IndexedTableWithJoins) -> Vec<NameOrAlias> {
×
539
    let mut input_names = vec![];
×
540
    input_names.push(input_tables.relation.0.clone());
×
541

×
542
    for join in &input_tables.joins {
×
543
        input_names.push(join.0.clone());
×
544
    }
×
545
    input_names
×
546
}
×
547

×
548
pub fn get_entry_points(
×
549
    input_tables: &IndexedTableWithJoins,
×
550
    pipeline_map: &mut HashMap<(usize, String), OutputNodeInfo>,
×
551
    pipeline_idx: usize,
×
552
) -> Result<Vec<PipelineEntryPoint>, PipelineError> {
×
553
    let mut endpoints = vec![];
×
554

×
555
    let input_names = get_input_names(input_tables);
×
556

×
557
    for (input_port, table) in input_names.iter().enumerate() {
×
558
        let name = table.0.clone();
×
559
        if !pipeline_map.contains_key(&(pipeline_idx, name.clone())) {
×
560
            endpoints.push(PipelineEntryPoint::new(name, input_port as PortHandle));
×
561
        }
×
562
    }
×
563

×
564
    Ok(endpoints)
×
565
}
×
566

567
pub fn is_an_entry_point(
×
568
    name: &str,
×
569
    pipeline_map: &mut HashMap<(usize, String), OutputNodeInfo>,
×
570
    pipeline_idx: usize,
×
571
) -> bool {
×
572
    if !pipeline_map.contains_key(&(pipeline_idx, name.to_owned())) {
×
573
        return true;
×
574
    }
×
575
    false
×
576
}
×
577

×
578
/// Resolves a FROM-clause relation to the name (and optional alias) it is
/// referenced by within the pipeline.
///
/// Plain tables are normalized to a dotted identifier; derived tables
/// (subqueries in FROM) are lowered recursively into the pipeline under a
/// generated `derived_N` name. Any other relation kind (e.g. nested
/// joins) is rejected as unsupported.
pub fn get_from_source(
    relation: &TableFactor,
    pipeline: &mut AppPipeline,
    query_ctx: &mut QueryContext,
    pipeline_idx: usize,
) -> Result<NameOrAlias, PipelineError> {
    match relation {
        TableFactor::Table { name, alias, .. } => {
            // Normalize every identifier part and join with '.' to form
            // the fully-qualified table name.
            let input_name = name
                .0
                .iter()
                .map(ExpressionBuilder::normalize_ident)
                .collect::<Vec<String>>()
                .join(".");
            let alias_name = alias
                .as_ref()
                .map(|a| ExpressionBuilder::fullname_from_ident(&[a.name.clone()]));

            Ok(NameOrAlias(input_name, alias_name))
        }
        TableFactor::Derived {
            lateral: _,
            subquery,
            alias,
        } => {
            let name = format!("derived_{}", query_ctx.get_next_processor_id());
            let alias_name = alias.as_ref().map(|alias_ident| {
                ExpressionBuilder::fullname_from_ident(&[alias_ident.name.clone()])
            });
            let is_top_select = false; //inside FROM clause, so not top select
            let name_or = NameOrAlias(name, alias_name);
            // Build the subquery's processors and register its output
            // under the generated derived-table name.
            query_to_pipeline(
                &TableInfo {
                    name: name_or.clone(),
                    is_derived: true,
                    override_name: None,
                },
                subquery,
                pipeline,
                query_ctx,
                false,
                pipeline_idx,
                is_top_select,
            )?;

            Ok(name_or)
        }
        _ => Err(PipelineError::UnsupportedSqlError(
            UnsupportedSqlError::JoinTable,
        )),
    }
}
999✔
630

×
631
#[cfg(test)]
mod tests {
    use dozer_core::app::AppPipeline;

    use super::statement_to_pipeline;

    /// A statement with no INTO clause produces no output table, which the
    /// builder must reject (observed here via the panicking `unwrap`).
    /// Renamed from `disallow_zero_outgoing_ndes` to fix the typo.
    #[test]
    #[should_panic]
    fn disallow_zero_outgoing_nodes() {
        let sql = "select * from film";
        statement_to_pipeline(sql, &mut AppPipeline::new_with_default_flags(), None).unwrap();
    }

    /// End-to-end parse of a multi-statement script covering derived
    /// tables, CTEs, nested CTEs and joins; every INTO target must show up
    /// in the output-tables map.
    #[test]
    fn parse_sql_pipeline() {
        let sql = r#"
                SELECT
                    a.name as "Genre",
                    SUM(amount) as "Gross Revenue(in $)"
                INTO gross_revenue_stats
                FROM
                (
                    SELECT
                        c.name,
                        f.title,
                        p.amount
                    FROM film f
                    LEFT JOIN film_category fc
                        ON fc.film_id = f.film_id
                    LEFT JOIN category c
                        ON fc.category_id = c.category_id
                    LEFT JOIN inventory i
                        ON i.film_id = f.film_id
                    LEFT JOIN rental r
                        ON r.inventory_id = i.inventory_id
                    LEFT JOIN payment p
                        ON p.rental_id = r.rental_id
                    WHERE p.amount IS NOT NULL
                ) a
                GROUP BY name;

                SELECT
                f.name, f.title, p.amount
                INTO film_amounts
                FROM film f
                LEFT JOIN film_category fc;

                WITH tbl as (select id from a)
                select id
                into cte_table
                from tbl;

                WITH tbl as (select id from  a),
                tbl2 as (select id from tbl)
                select id
                into nested_cte_table
                from tbl2;

                WITH cte_table1 as (select id_dt1 from (select id_t1 from table_1) as derived_table_1),
                cte_table2 as (select id_ct1 from cte_table1)
                select id_ct2
                into nested_derived_table
                from cte_table2;

                with tbl as (select id, ticker from stocks)
                select tbl.id
                into nested_stocks_table
                from  stocks join tbl on tbl.id = stocks.id;
            "#;

        let context =
            statement_to_pipeline(sql, &mut AppPipeline::new_with_default_flags(), None).unwrap();

        // Should create as many output tables as into statements
        let mut output_keys = context.output_tables_map.keys().collect::<Vec<_>>();
        output_keys.sort();
        let mut expected_keys = vec![
            "gross_revenue_stats",
            "film_amounts",
            "cte_table",
            "nested_cte_table",
            "nested_derived_table",
            "nested_stocks_table",
        ];
        expected_keys.sort();
        assert_eq!(output_keys, expected_keys);
    }
}
×
717

×
718
#[test]
fn test_missing_into_in_simple_from_clause() {
    // A top-level SELECT without an INTO clause has no sink target and
    // must be rejected.
    let mut pipeline = AppPipeline::new_with_default_flags();
    let outcome = statement_to_pipeline(r#"SELECT a FROM B "#, &mut pipeline, None);
    assert!(matches!(outcome, Err(PipelineError::MissingIntoClause)));
}
1✔
725

726
#[test]
fn test_correct_into_clause() {
    // With an INTO clause the pipeline builds successfully.
    let mut pipeline = AppPipeline::new_with_default_flags();
    assert!(statement_to_pipeline(r#"SELECT a INTO C FROM B"#, &mut pipeline, None).is_ok());
}
1✔
733

×
734
#[test]
fn test_missing_into_in_nested_from_clause() {
    // INTO is required on the outer SELECT even when FROM is a subquery.
    let mut pipeline = AppPipeline::new_with_default_flags();
    let outcome = statement_to_pipeline(r#"SELECT a FROM (SELECT a from b)"#, &mut pipeline, None);
    assert!(matches!(outcome, Err(PipelineError::MissingIntoClause)));
}
1✔
741

742
#[test]
fn test_correct_into_in_nested_from() {
    // A derived table in FROM builds fine once the outer SELECT has INTO.
    let mut pipeline = AppPipeline::new_with_default_flags();
    let outcome =
        statement_to_pipeline(r#"SELECT a INTO c FROM (SELECT a from b)"#, &mut pipeline, None);
    assert!(outcome.is_ok());
}
1✔
749

×
750
#[test]
fn test_missing_into_in_with_clause() {
    let sql = r#"WITH tbl as (select a from B)
    select B
    from tbl;"#;
    // The CTE alone does not satisfy the INTO requirement on the outer
    // SELECT.
    let mut pipeline = AppPipeline::new_with_default_flags();
    let outcome = statement_to_pipeline(sql, &mut pipeline, None);
    assert!(matches!(outcome, Err(PipelineError::MissingIntoClause)));
}
1✔
759

760
#[test]
fn test_correct_into_in_with_clause() {
    let sql = r#"WITH tbl as (select a from B)
    select B
    into C
    from tbl;"#;
    // A CTE query with INTO on the outer SELECT builds successfully.
    let mut pipeline = AppPipeline::new_with_default_flags();
    assert!(statement_to_pipeline(sql, &mut pipeline, None).is_ok());
}
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc