• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

getdozer / dozer / 5693561935

pending completion
5693561935

push

github

web-flow
chore: Remove `AppSourceId` which is no longer used (#1803)

* chore: Remove `AppSourceId` which is no longer used

* chore: Remove `AppSource` and simplify source endpoint finding process

* chore: Use `Box` instead of `Arc` for the factories

* chore: Remove unused parameter in `AppPipeline::connect_nodes`

* chore: Remove an unused `Option`

443 of 443 new or added lines in 22 files covered. (100.0%)

45511 of 58843 relevant lines covered (77.34%)

39550.53 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

73.89
/dozer-sql/src/pipeline/builder.rs
1
use crate::pipeline::aggregation::factory::AggregationProcessorFactory;
2
use crate::pipeline::builder::PipelineError::InvalidQuery;
3
use crate::pipeline::errors::PipelineError;
4
use crate::pipeline::expression::builder::{ExpressionBuilder, NameOrAlias};
5
use crate::pipeline::selection::factory::SelectionProcessorFactory;
6
use dozer_core::app::AppPipeline;
7
use dozer_core::app::PipelineEntryPoint;
8
use dozer_core::node::PortHandle;
9
use dozer_core::DEFAULT_PORT_HANDLE;
10
use sqlparser::ast::{Join, SetOperator, SetQuantifier, TableFactor, TableWithJoins};
11

12
use sqlparser::{
13
    ast::{Query, Select, SetExpr, Statement},
14
    dialect::DozerDialect,
15
    parser::Parser,
16
};
17
use std::collections::HashMap;
18

19
use super::errors::UnsupportedSqlError;
20
use super::pipeline_builder::from_builder::insert_from_to_pipeline;
21

22
use super::product::set::set_factory::SetProcessorFactory;
23

24
/// Schema-propagation context attached to SQL pipeline nodes.
/// NOTE(review): currently an empty marker type — presumably a placeholder
/// for future schema state; confirm before relying on it carrying data.
#[derive(Debug, Clone, Default)]
pub struct SchemaSQLContext {}
26

27
/// Where a table's output lives in the DAG: the producing node and the port
/// it is emitted on.
#[derive(Debug, Clone)]
pub struct OutputNodeInfo {
    // Name of the node to connect in the dag
    pub node: String,
    // Port of that node to connect in the dag
    pub port: PortHandle,
    // Whether this table was created in transforms (true) or comes
    // originally from a source (false)
    pub is_derived: bool,
    // TODO add: indexes to the tables
}
37

38
/// Naming information for the table a (sub)query materializes into.
pub struct TableInfo {
    // Resolved name (and optional alias) of the table
    pub name: NameOrAlias,
    // Output-table name forced by the caller; only takes effect when the
    // query has no `INTO` clause (see `select_to_pipeline`)
    pub override_name: Option<String>,
    // True when the table is produced by a transform rather than read
    // directly from a source
    pub is_derived: bool,
}
43
/// Mutable bookkeeping accumulated while translating queries into a pipeline.
#[derive(Debug, Clone, Default)]
pub struct QueryContext {
    // Internal tables map, used to store the tables that are created by the
    // queries; keyed by (statement/pipeline index, table name)
    pub pipeline_map: HashMap<(usize, String), OutputNodeInfo>,

    // Output tables map that are marked with "INTO" used to store the tables, these can be exposed to sinks.
    pub output_tables_map: HashMap<String, OutputNodeInfo>,

    // Source names referenced by the queries that are not produced by any
    // node inside the pipeline
    pub used_sources: Vec<String>,

    // Monotonic counter used to generate unique processor names
    pub processor_counter: usize,
}
58

59
impl QueryContext {
60
    pub fn get_next_processor_id(&mut self) -> usize {
1,617✔
61
        self.processor_counter += 1;
1,617✔
62
        self.processor_counter
1,617✔
63
    }
1,617✔
64
}
65

66
/// A `FROM` clause with every relation resolved to its `NameOrAlias`.
#[derive(Debug, Clone)]
pub struct IndexedTableWithJoins {
    // The base relation together with its resolved name/alias
    pub relation: (NameOrAlias, TableFactor),
    // Each join paired with the resolved name/alias of its relation
    pub joins: Vec<(NameOrAlias, Join)>,
}
71
/// Parses `sql` and lowers every statement in it into processors and edges on
/// `pipeline`.
///
/// Only query statements (`SELECT ...`) are supported; any other statement
/// kind is rejected with an `UnsupportedSqlError`. `override_name`, when
/// given, names the output table of queries that lack an `INTO` clause.
///
/// Returns the `QueryContext` holding the accumulated table/source bookkeeping.
pub fn statement_to_pipeline(
    sql: &str,
    pipeline: &mut AppPipeline<SchemaSQLContext>,
    override_name: Option<String>,
) -> Result<QueryContext, PipelineError> {
    let dialect = DozerDialect {};
    let mut ctx = QueryContext::default();

    let ast = Parser::parse_sql(&dialect, sql)
        .map_err(|err| PipelineError::InternalError(Box::new(err)))?;
    // A single generated root name is shared by all statements; the statement
    // index (`idx`) is what keeps their entries apart in `ctx.pipeline_map`.
    let query_name = NameOrAlias(format!("query_{}", ctx.get_next_processor_id()), None);

    for (idx, statement) in ast.iter().enumerate() {
        match statement {
            Statement::Query(query) => {
                query_to_pipeline(
                    &TableInfo {
                        name: query_name.clone(),
                        is_derived: false,
                        override_name: override_name.clone(),
                    },
                    query,
                    pipeline,
                    &mut ctx,
                    false,
                    idx,
                )?;
            }
            // Anything other than a plain query (DDL, INSERT, ...) is unsupported.
            s => {
                return Err(PipelineError::UnsupportedSqlError(
                    UnsupportedSqlError::GenericError(s.to_string()),
                ))
            }
        }
    }

    Ok(ctx)
}
109

110
/// Recursively lowers one parsed `Query` — including its `WITH` clause — into
/// processors on `pipeline`, registering the result under `table_info.name`.
fn query_to_pipeline(
    table_info: &TableInfo,
    query: &Query,
    pipeline: &mut AppPipeline<SchemaSQLContext>,
    query_ctx: &mut QueryContext,
    stateful: bool,
    pipeline_idx: usize,
) -> Result<(), PipelineError> {
    // return error if there is unsupported syntax: ORDER BY ...
    if !query.order_by.is_empty() {
        return Err(PipelineError::UnsupportedSqlError(
            UnsupportedSqlError::OrderByError,
        ));
    }

    // ... and LIMIT / OFFSET are not supported either
    if query.limit.is_some() || query.offset.is_some() {
        return Err(PipelineError::UnsupportedSqlError(
            UnsupportedSqlError::LimitOffsetError,
        ));
    }

    // Attach the first pipeline if there is with clause.
    // Each CTE is lowered before the main body so the body can refer to it.
    if let Some(with) = &query.with {
        if with.recursive {
            return Err(PipelineError::UnsupportedSqlError(
                UnsupportedSqlError::Recursive,
            ));
        }

        for table in &with.cte_tables {
            if table.from.is_some() {
                return Err(PipelineError::UnsupportedSqlError(
                    UnsupportedSqlError::CteFromError,
                ));
            }
            let table_name = table.alias.name.to_string();
            // Duplicate CTE names within the same statement are an error.
            if query_ctx
                .pipeline_map
                .contains_key(&(pipeline_idx, table_name.clone()))
            {
                return Err(InvalidQuery(format!(
                    "WITH query name {table_name:?} specified more than once"
                )));
            }
            // CTEs are always lowered as derived, stateful sub-queries.
            query_to_pipeline(
                &TableInfo {
                    name: NameOrAlias(table_name.clone(), Some(table_name)),
                    is_derived: true,
                    override_name: None,
                },
                &table.query,
                pipeline,
                query_ctx,
                true,
                pipeline_idx,
            )?;
        }
    };

    match *query.body.clone() {
        // Plain SELECT body.
        SetExpr::Select(select) => {
            select_to_pipeline(
                table_info,
                *select,
                pipeline,
                query_ctx,
                stateful,
                pipeline_idx,
            )?;
        }
        // Parenthesized sub-query body.
        // NOTE(review): this arm recurses with a *fresh* `QueryContext`
        // (`ctx`) rather than `query_ctx`, so the subquery's table
        // registrations are discarded afterwards — confirm this is intended.
        SetExpr::Query(query) => {
            let query_name = format!("subquery_{}", query_ctx.get_next_processor_id());
            let mut ctx = QueryContext::default();
            query_to_pipeline(
                &TableInfo {
                    name: NameOrAlias(query_name, None),
                    is_derived: true,
                    override_name: None,
                },
                &query,
                pipeline,
                &mut ctx,
                stateful,
                pipeline_idx,
            )?
        }
        // Set operations: only UNION is supported.
        SetExpr::SetOperation {
            op,
            set_quantifier,
            left,
            right,
        } => match op {
            SetOperator::Union => {
                set_to_pipeline(
                    table_info,
                    left,
                    right,
                    set_quantifier,
                    pipeline,
                    query_ctx,
                    stateful,
                    pipeline_idx,
                )?;
            }
            _ => return Err(PipelineError::InvalidOperator(op.to_string())),
        },
        _ => {
            return Err(PipelineError::UnsupportedSqlError(
                UnsupportedSqlError::GenericError("Unsupported query body structure".to_string()),
            ))
        }
    };
    Ok(())
}
224

225
/// Lowers a single `SELECT` into the pipeline: FROM (product) node, optional
/// WHERE (selection) node, then an aggregation node whose name is returned.
/// Registers the result in `query_ctx.pipeline_map`, and in
/// `query_ctx.output_tables_map` when the query has an `INTO` clause or the
/// caller supplied an `override_name`.
fn select_to_pipeline(
    table_info: &TableInfo,
    select: Select,
    pipeline: &mut AppPipeline<SchemaSQLContext>,
    query_ctx: &mut QueryContext,
    stateful: bool,
    pipeline_idx: usize,
) -> Result<String, PipelineError> {
    // FROM clause: comma-separated FROM lists are not supported.
    if select.from.len() != 1 {
        return Err(PipelineError::UnsupportedSqlError(
            UnsupportedSqlError::FromCommaSyntax,
        ));
    }

    // (The former two-step `get_input_tables` + `add_from_to_pipeline`
    // approach was replaced by `insert_from_to_pipeline` below.)

    let connection_info =
        insert_from_to_pipeline(&select.from[0], pipeline, pipeline_idx, query_ctx)?;

    let input_nodes = connection_info.input_nodes;
    let output_node = connection_info.output_node;

    // Both names are generated up front, in this fixed order, so processor
    // ids stay stable regardless of whether a WHERE clause exists.
    let gen_agg_name = format!("agg_{}", query_ctx.get_next_processor_id());

    let gen_selection_name = format!("select_{}", query_ctx.get_next_processor_id());
    let (gen_product_name, product_output_port) = output_node;

    // Wire each input either from an internal table (already in pipeline_map)
    // or record it as an external source.
    for (source_name, processor_name, processor_port) in input_nodes.iter() {
        // NOTE: `table_info` here shadows the outer parameter of the same name.
        if let Some(table_info) = query_ctx
            .pipeline_map
            .get(&(pipeline_idx, source_name.clone()))
        {
            pipeline.connect_nodes(
                &table_info.node,
                table_info.port,
                processor_name,
                *processor_port as PortHandle,
            );
            // If not present in pipeline_map, insert into used_sources as this is coming from source
        } else {
            query_ctx.used_sources.push(source_name.clone());
        }
    }

    let aggregation =
        AggregationProcessorFactory::new(gen_agg_name.clone(), select.clone(), stateful);

    pipeline.add_processor(Box::new(aggregation), &gen_agg_name, vec![]);

    // Where clause: product -> selection -> aggregation when present,
    // otherwise product -> aggregation directly.
    if let Some(selection) = select.selection {
        let selection = SelectionProcessorFactory::new(gen_selection_name.to_owned(), selection);

        pipeline.add_processor(Box::new(selection), &gen_selection_name, vec![]);

        pipeline.connect_nodes(
            &gen_product_name,
            product_output_port,
            &gen_selection_name,
            DEFAULT_PORT_HANDLE,
        );

        pipeline.connect_nodes(
            &gen_selection_name,
            DEFAULT_PORT_HANDLE,
            &gen_agg_name,
            DEFAULT_PORT_HANDLE,
        );
    } else {
        pipeline.connect_nodes(
            &gen_product_name,
            product_output_port,
            &gen_agg_name,
            DEFAULT_PORT_HANDLE,
        );
    }

    // The aggregation node is the query's output; register it under the
    // query's table name.
    query_ctx.pipeline_map.insert(
        (pipeline_idx, table_info.name.0.to_string()),
        OutputNodeInfo {
            node: gen_agg_name.clone(),
            port: DEFAULT_PORT_HANDLE,
            is_derived: table_info.is_derived,
        },
    );

    // `INTO` takes precedence over the caller-supplied override name.
    let output_table_name = if let Some(into) = select.into {
        Some(into.name.to_string())
    } else {
        table_info.override_name.clone()
    };
    if let Some(table_name) = output_table_name {
        query_ctx.output_tables_map.insert(
            table_name,
            OutputNodeInfo {
                node: gen_agg_name.clone(),
                port: DEFAULT_PORT_HANDLE,
                is_derived: false,
            },
        );
    }

    Ok(gen_agg_name)
}
337

338
/// Lowers a UNION set operation: builds the left and right inputs (each may
/// itself be a `SELECT` or a nested set operation), then joins them with a
/// set processor — left input on port 0, right input on port 1. Returns the
/// generated set node name.
#[allow(clippy::too_many_arguments)]
fn set_to_pipeline(
    table_info: &TableInfo,
    left_select: Box<SetExpr>,
    right_select: Box<SetExpr>,
    set_quantifier: SetQuantifier,
    pipeline: &mut AppPipeline<SchemaSQLContext>,
    query_ctx: &mut QueryContext,
    stateful: bool,
    pipeline_idx: usize,
) -> Result<String, PipelineError> {
    // Generated names for the two branch outputs; they double as the
    // pipeline_map keys the branches register themselves under.
    let gen_left_set_name = format!("set_left_{}", query_ctx.get_next_processor_id());
    let left_table_info = TableInfo {
        name: NameOrAlias(gen_left_set_name.clone(), None),
        override_name: None,
        is_derived: false,
    };
    let gen_right_set_name = format!("set_right_{}", query_ctx.get_next_processor_id());
    let right_table_info = TableInfo {
        name: NameOrAlias(gen_right_set_name.clone(), None),
        override_name: None,
        is_derived: false,
    };

    // Lower the left branch (SELECT or nested set operation).
    let _left_pipeline_name = match *left_select {
        SetExpr::Select(select) => select_to_pipeline(
            &left_table_info,
            *select,
            pipeline,
            query_ctx,
            stateful,
            pipeline_idx,
        )?,
        SetExpr::SetOperation {
            op: _,
            set_quantifier,
            left,
            right,
        } => set_to_pipeline(
            &left_table_info,
            left,
            right,
            set_quantifier,
            pipeline,
            query_ctx,
            stateful,
            pipeline_idx,
        )?,
        _ => {
            return Err(PipelineError::InvalidQuery(
                "Invalid UNION left Query".to_string(),
            ))
        }
    };

    // Lower the right branch the same way.
    let _right_pipeline_name = match *right_select {
        SetExpr::Select(select) => select_to_pipeline(
            &right_table_info,
            *select,
            pipeline,
            query_ctx,
            stateful,
            pipeline_idx,
        )?,
        SetExpr::SetOperation {
            op: _,
            set_quantifier,
            left,
            right,
        } => set_to_pipeline(
            &right_table_info,
            left,
            right,
            set_quantifier,
            pipeline,
            query_ctx,
            stateful,
            pipeline_idx,
        )?,
        _ => {
            return Err(PipelineError::InvalidQuery(
                "Invalid UNION right Query".to_string(),
            ))
        }
    };

    let mut gen_set_name = format!("set_{}", query_ctx.get_next_processor_id());

    // Look up the output nodes the two branches registered above.
    let left_pipeline_output_node = match query_ctx
        .pipeline_map
        .get(&(pipeline_idx, gen_left_set_name))
    {
        Some(pipeline) => pipeline,
        None => {
            return Err(PipelineError::InvalidQuery(
                "Invalid UNION left Query".to_string(),
            ))
        }
    };

    let right_pipeline_output_node = match query_ctx
        .pipeline_map
        .get(&(pipeline_idx, gen_right_set_name))
    {
        Some(pipeline) => pipeline,
        None => {
            return Err(PipelineError::InvalidQuery(
                "Invalid UNION Right Query".to_string(),
            ))
        }
    };

    // A caller-supplied override name replaces the generated set node name.
    if table_info.override_name.is_some() {
        gen_set_name = table_info.override_name.to_owned().unwrap();
    }

    let set_proc_fac = SetProcessorFactory::new(gen_set_name.clone(), set_quantifier);

    pipeline.add_processor(Box::new(set_proc_fac), &gen_set_name, vec![]);

    // Left branch feeds port 0, right branch feeds port 1.
    pipeline.connect_nodes(
        &left_pipeline_output_node.node,
        left_pipeline_output_node.port,
        &gen_set_name,
        0 as PortHandle,
    );

    pipeline.connect_nodes(
        &right_pipeline_output_node.node,
        right_pipeline_output_node.port,
        &gen_set_name,
        1 as PortHandle,
    );

    // Drop any INTO registration whose name matches an internal table, so
    // only the union result remains exposed.
    // NOTE(review): this purges by bare table name across all pipeline
    // indices — confirm cross-statement collisions are acceptable.
    for (_, table_name) in query_ctx.pipeline_map.keys() {
        query_ctx.output_tables_map.remove_entry(table_name);
    }

    // Register the set node as the query's output table.
    query_ctx.pipeline_map.insert(
        (pipeline_idx, table_info.name.0.to_string()),
        OutputNodeInfo {
            node: gen_set_name.clone(),
            port: DEFAULT_PORT_HANDLE,
            is_derived: table_info.is_derived,
        },
    );

    Ok(gen_set_name)
}
487

488
/// Returns a vector of input port handles and relative table name
489
///
490
/// # Errors
491
///
492
/// This function will return an error if it's not possible to get an input name.
493
pub fn get_input_tables(
×
494
    from: &TableWithJoins,
×
495
    pipeline: &mut AppPipeline<SchemaSQLContext>,
×
496
    query_ctx: &mut QueryContext,
×
497
    pipeline_idx: usize,
×
498
) -> Result<IndexedTableWithJoins, PipelineError> {
×
499
    let name = get_from_source(&from.relation, pipeline, query_ctx, pipeline_idx)?;
×
500
    let mut joins = vec![];
×
501

502
    for join in from.joins.iter() {
×
503
        let input_name = get_from_source(&join.relation, pipeline, query_ctx, pipeline_idx)?;
×
504
        joins.push((input_name.clone(), join.clone()));
×
505
    }
506

507
    Ok(IndexedTableWithJoins {
×
508
        relation: (name, from.relation.clone()),
×
509
        joins,
×
510
    })
×
511
}
×
512

513
pub fn get_input_names(input_tables: &IndexedTableWithJoins) -> Vec<NameOrAlias> {
×
514
    let mut input_names = vec![];
×
515
    input_names.push(input_tables.relation.0.clone());
×
516

517
    for join in &input_tables.joins {
×
518
        input_names.push(join.0.clone());
×
519
    }
×
520
    input_names
×
521
}
×
522

523
pub fn get_entry_points(
×
524
    input_tables: &IndexedTableWithJoins,
×
525
    pipeline_map: &mut HashMap<(usize, String), OutputNodeInfo>,
×
526
    pipeline_idx: usize,
×
527
) -> Result<Vec<PipelineEntryPoint>, PipelineError> {
×
528
    let mut endpoints = vec![];
×
529

×
530
    let input_names = get_input_names(input_tables);
×
531

532
    for (input_port, table) in input_names.iter().enumerate() {
×
533
        let name = table.0.clone();
×
534
        if !pipeline_map.contains_key(&(pipeline_idx, name.clone())) {
×
535
            endpoints.push(PipelineEntryPoint::new(name, input_port as PortHandle));
×
536
        }
×
537
    }
538

539
    Ok(endpoints)
×
540
}
×
541

542
pub fn is_an_entry_point(
×
543
    name: &str,
×
544
    pipeline_map: &mut HashMap<(usize, String), OutputNodeInfo>,
×
545
    pipeline_idx: usize,
×
546
) -> bool {
×
547
    if !pipeline_map.contains_key(&(pipeline_idx, name.to_owned())) {
×
548
        return true;
×
549
    }
×
550
    false
×
551
}
×
552

553
/// Resolves a single `FROM`-clause relation to its `NameOrAlias`.
///
/// Plain tables are resolved by normalizing and dot-joining their identifier
/// parts. Derived tables (subqueries) receive a generated `derived_<id>` name
/// and are lowered into the pipeline immediately via `query_to_pipeline`.
/// Any other table factor (e.g. nested joins) is unsupported.
pub fn get_from_source(
    relation: &TableFactor,
    pipeline: &mut AppPipeline<SchemaSQLContext>,
    query_ctx: &mut QueryContext,
    pipeline_idx: usize,
) -> Result<NameOrAlias, PipelineError> {
    match relation {
        TableFactor::Table { name, alias, .. } => {
            // Possibly-qualified name: normalize each identifier part and
            // re-join with dots.
            let input_name = name
                .0
                .iter()
                .map(ExpressionBuilder::normalize_ident)
                .collect::<Vec<String>>()
                .join(".");
            let alias_name = alias
                .as_ref()
                .map(|a| ExpressionBuilder::fullname_from_ident(&[a.name.clone()]));

            Ok(NameOrAlias(input_name, alias_name))
        }
        TableFactor::Derived {
            lateral: _,
            subquery,
            alias,
        } => {
            // Subquery in FROM: give it a generated name and build its
            // pipeline right away so this query can consume it.
            let name = format!("derived_{}", query_ctx.get_next_processor_id());
            let alias_name = alias.as_ref().map(|alias_ident| {
                ExpressionBuilder::fullname_from_ident(&[alias_ident.name.clone()])
            });

            let name_or = NameOrAlias(name, alias_name);
            query_to_pipeline(
                &TableInfo {
                    name: name_or.clone(),
                    is_derived: true,
                    override_name: None,
                },
                subquery,
                pipeline,
                query_ctx,
                false,
                pipeline_idx,
            )?;

            Ok(name_or)
        }
        _ => Err(PipelineError::UnsupportedSqlError(
            UnsupportedSqlError::JoinTable,
        )),
    }
}
604

605
// Integration-style test: feed a multi-statement SQL script through
// `statement_to_pipeline` and check the registered output tables.
#[cfg(test)]
mod tests {
    use dozer_core::app::AppPipeline;

    use super::statement_to_pipeline;

    // Exercises INTO, derived tables, simple and nested CTEs, a CTE over a
    // derived table, and a join between a source and a CTE.
    #[test]
    fn parse_sql_pipeline() {
        let sql = r#"
                SELECT
                    a.name as "Genre",
                    SUM(amount) as "Gross Revenue(in $)"
                INTO gross_revenue_stats
                FROM
                (
                    SELECT
                        c.name,
                        f.title,
                        p.amount
                    FROM film f
                    LEFT JOIN film_category fc
                        ON fc.film_id = f.film_id
                    LEFT JOIN category c
                        ON fc.category_id = c.category_id
                    LEFT JOIN inventory i
                        ON i.film_id = f.film_id
                    LEFT JOIN rental r
                        ON r.inventory_id = i.inventory_id
                    LEFT JOIN payment p
                        ON p.rental_id = r.rental_id
                    WHERE p.amount IS NOT NULL
                ) a
                GROUP BY name;

                SELECT
                f.name, f.title, p.amount
                INTO film_amounts
                FROM film f
                LEFT JOIN film_category fc;

                WITH tbl as (select id from a)
                select id
                into cte_table
                from tbl;

                WITH tbl as (select id from  a),
                tbl2 as (select id from tbl)
                select id
                into nested_cte_table
                from tbl2;

                WITH cte_table1 as (select id_dt1 from (select id_t1 from table_1) as derived_table_1),
                cte_table2 as (select id_ct1 from cte_table1)
                select id_ct2
                into nested_derived_table
                from cte_table2;

                with tbl as (select id, ticker from stocks)
                select tbl.id
                into nested_stocks_table
                from  stocks join tbl on tbl.id = stocks.id;
            "#;

        let context = statement_to_pipeline(sql, &mut AppPipeline::new(), None).unwrap();

        // Should create as many output tables as into statements
        let mut output_keys = context.output_tables_map.keys().collect::<Vec<_>>();
        output_keys.sort();
        let mut expected_keys = vec![
            "gross_revenue_stats",
            "film_amounts",
            "cte_table",
            "nested_cte_table",
            "nested_derived_table",
            "nested_stocks_table",
        ];
        expected_keys.sort();
        assert_eq!(output_keys, expected_keys);
    }
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc