getdozer / dozer / 5978430793

25 Aug 2023 04:54PM UTC coverage: 75.575% (-0.7%) from 76.279%
Build 5978430793 · push · github · web-flow
Bump ordered-float from 3.4.0 to 3.9.1 (#1919)

Bumps [ordered-float](https://github.com/reem/rust-ordered-float) from 3.4.0 to 3.9.1.
- [Release notes](https://github.com/reem/rust-ordered-float/releases)
- [Commits](https://github.com/reem/rust-ordered-float/compare/v3.4.0...v3.9.1)

---
updated-dependencies:
- dependency-name: ordered-float
  dependency-type: direct:production
  update-type: version-update:semver-minor
...

Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>

47272 of 62550 relevant lines covered (75.57%)

49425.38 hits per line
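For reference, the headline percentage is just the ratio of covered to relevant lines. A minimal sketch of that arithmetic (illustrative only; `coverage_percent` is not part of the dozer codebase or the Coveralls API):

fn coverage_percent(covered: u64, relevant: u64) -> f64 {
    // covered / relevant, expressed as a percentage
    covered as f64 / relevant as f64 * 100.0
}

fn main() {
    let pct = coverage_percent(47_272, 62_550); // ≈ 75.575
    let delta = pct - 76.279;                   // ≈ -0.704, displayed as -0.7%
    println!("coverage: {pct:.3}% ({delta:+.1}%)");
}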

Source File

/dozer-sql/src/pipeline/builder.rs (69.14% covered)
use crate::pipeline::aggregation::factory::AggregationProcessorFactory;
use crate::pipeline::builder::PipelineError::InvalidQuery;
use crate::pipeline::errors::PipelineError;
use crate::pipeline::expression::builder::{ExpressionBuilder, NameOrAlias};
use crate::pipeline::selection::factory::SelectionProcessorFactory;
use dozer_core::app::AppPipeline;
use dozer_core::app::PipelineEntryPoint;
use dozer_core::node::PortHandle;
use dozer_core::DEFAULT_PORT_HANDLE;
use sqlparser::ast::{Join, SetOperator, SetQuantifier, TableFactor, TableWithJoins};

use sqlparser::{
    ast::{Query, Select, SetExpr, Statement},
    dialect::DozerDialect,
    parser::Parser,
};
use std::collections::HashMap;

use super::errors::UnsupportedSqlError;
use super::pipeline_builder::from_builder::insert_from_to_pipeline;

use super::product::set::set_factory::SetProcessorFactory;

#[derive(Debug, Clone, Default)]
pub struct SchemaSQLContext {}

#[derive(Debug, Clone)]
pub struct OutputNodeInfo {
    // Name to connect in dag
    pub node: String,
    // Port to connect in dag
    pub port: PortHandle,
    // If this table is originally from a source or created in transforms
    pub is_derived: bool,
    // TODO add:indexes to the tables
}

pub struct TableInfo {
    pub name: NameOrAlias,
    pub override_name: Option<String>,
    pub is_derived: bool,
}
/// The struct contains some contexts during query to pipeline.
#[derive(Debug, Clone, Default)]
pub struct QueryContext {
    // Internal tables map, used to store the tables that are created by the queries
    pub pipeline_map: HashMap<(usize, String), OutputNodeInfo>,

    // Output tables map that are marked with "INTO" used to store the tables, these can be exposed to sinks.
    pub output_tables_map: HashMap<String, OutputNodeInfo>,

    // Used Sources
    pub used_sources: Vec<String>,

    // Processors counter
    pub processor_counter: usize,
}

impl QueryContext {
    pub fn get_next_processor_id(&mut self) -> usize {
        self.processor_counter += 1;
        self.processor_counter
    }
}

#[derive(Debug, Clone)]
pub struct IndexedTableWithJoins {
    pub relation: (NameOrAlias, TableFactor),
    pub joins: Vec<(NameOrAlias, Join)>,
}
pub fn statement_to_pipeline(
    sql: &str,
    pipeline: &mut AppPipeline<SchemaSQLContext>,
    override_name: Option<String>,
) -> Result<QueryContext, PipelineError> {
    let dialect = DozerDialect {};
    let mut ctx = QueryContext::default();
    let is_top_select = true;
    let ast = Parser::parse_sql(&dialect, sql)
        .map_err(|err| PipelineError::InternalError(Box::new(err)))?;
    let query_name = NameOrAlias(format!("query_{}", ctx.get_next_processor_id()), None);

    for (idx, statement) in ast.iter().enumerate() {
        match statement {
            Statement::Query(query) => {
                query_to_pipeline(
                    &TableInfo {
                        name: query_name.clone(),
                        is_derived: false,
                        override_name: override_name.clone(),
                    },
                    query,
                    pipeline,
                    &mut ctx,
                    false,
                    idx,
                    is_top_select,
                )?;
            }
            s => {
                return Err(PipelineError::UnsupportedSqlError(
                    UnsupportedSqlError::GenericError(s.to_string()),
                ))
            }
        }
    }

    Ok(ctx)
}

fn query_to_pipeline(
    table_info: &TableInfo,
    query: &Query,
    pipeline: &mut AppPipeline<SchemaSQLContext>,
    query_ctx: &mut QueryContext,
    stateful: bool,
    pipeline_idx: usize,
    is_top_select: bool,
) -> Result<(), PipelineError> {
    // return error if there is unsupported syntax
    if !query.order_by.is_empty() {
        return Err(PipelineError::UnsupportedSqlError(
            UnsupportedSqlError::OrderByError,
        ));
    }

    if query.limit.is_some() || query.offset.is_some() {
        return Err(PipelineError::UnsupportedSqlError(
            UnsupportedSqlError::LimitOffsetError,
        ));
    }

    // Attach the first pipeline if there is with clause
    if let Some(with) = &query.with {
        if with.recursive {
            return Err(PipelineError::UnsupportedSqlError(
                UnsupportedSqlError::Recursive,
            ));
        }

        for table in &with.cte_tables {
            if table.from.is_some() {
                return Err(PipelineError::UnsupportedSqlError(
                    UnsupportedSqlError::CteFromError,
                ));
            }
            let table_name = table.alias.name.to_string();
            if query_ctx
                .pipeline_map
                .contains_key(&(pipeline_idx, table_name.clone()))
            {
                return Err(InvalidQuery(format!(
                    "WITH query name {table_name:?} specified more than once"
                )));
            }
            query_to_pipeline(
                &TableInfo {
                    name: NameOrAlias(table_name.clone(), Some(table_name)),
                    is_derived: true,
                    override_name: None,
                },
                &table.query,
                pipeline,
                query_ctx,
                true,
                pipeline_idx,
                false, //Inside a with clause, so not top select
            )?;
        }
    };

    match *query.body.clone() {
        SetExpr::Select(select) => {
            select_to_pipeline(
                table_info,
                *select,
                pipeline,
                query_ctx,
                stateful,
                pipeline_idx,
                is_top_select,
            )?;
        }
        SetExpr::Query(query) => {
            let query_name = format!("subquery_{}", query_ctx.get_next_processor_id());
            let mut ctx = QueryContext::default();
            query_to_pipeline(
                &TableInfo {
                    name: NameOrAlias(query_name, None),
                    is_derived: true,
                    override_name: None,
                },
                &query,
                pipeline,
                &mut ctx,
                stateful,
                pipeline_idx,
                false, //Inside a subquery, so not top select
            )?
        }
        SetExpr::SetOperation {
            op,
            set_quantifier,
            left,
            right,
        } => match op {
            SetOperator::Union => {
                set_to_pipeline(
                    table_info,
                    left,
                    right,
                    set_quantifier,
                    pipeline,
                    query_ctx,
                    stateful,
                    pipeline_idx,
                    is_top_select,
                )?;
            }
            _ => return Err(PipelineError::InvalidOperator(op.to_string())),
        },
        _ => {
            return Err(PipelineError::UnsupportedSqlError(
                UnsupportedSqlError::GenericError("Unsupported query body structure".to_string()),
            ))
        }
    };
    Ok(())
}

fn select_to_pipeline(
    table_info: &TableInfo,
    select: Select,
    pipeline: &mut AppPipeline<SchemaSQLContext>,
    query_ctx: &mut QueryContext,
    stateful: bool,
    pipeline_idx: usize,
    is_top_select: bool,
) -> Result<String, PipelineError> {
    // FROM clause
    if select.from.len() != 1 {
        return Err(PipelineError::UnsupportedSqlError(
            UnsupportedSqlError::FromCommaSyntax,
        ));
    }

    // let input_tables = get_input_tables(&select.from[0], pipeline, query_ctx, pipeline_idx)?;
    //
    // let (input_nodes, output_node, mut used_sources) = add_from_to_pipeline(
    //     pipeline,
    //     &input_tables,
    //     &mut query_ctx.pipeline_map,
    //     pipeline_idx,
    // )?;

    let connection_info =
        insert_from_to_pipeline(&select.from[0], pipeline, pipeline_idx, query_ctx)?;

    let input_nodes = connection_info.input_nodes;
    let output_node = connection_info.output_node;

    let gen_agg_name = format!("agg--{}", query_ctx.get_next_processor_id());

    let gen_selection_name = format!("select--{}", query_ctx.get_next_processor_id());
    let (gen_product_name, product_output_port) = output_node;

    for (source_name, processor_name, processor_port) in input_nodes.iter() {
        if let Some(table_info) = query_ctx
            .pipeline_map
            .get(&(pipeline_idx, source_name.clone()))
        {
            pipeline.connect_nodes(
                &table_info.node,
                table_info.port,
                processor_name,
                *processor_port as PortHandle,
            );
            // If not present in pipeline_map, insert into used_sources as this is coming from source
        } else {
            query_ctx.used_sources.push(source_name.clone());
        }
    }

    let aggregation = AggregationProcessorFactory::new(
        gen_agg_name.clone(),
        select.clone(),
        stateful,
        pipeline
            .flags()
            .enable_probabilistic_optimizations
            .in_aggregations,
    );

    pipeline.add_processor(Box::new(aggregation), &gen_agg_name, vec![]);

    // Where clause
    if let Some(selection) = select.selection {
        let selection = SelectionProcessorFactory::new(gen_selection_name.to_owned(), selection);

        pipeline.add_processor(Box::new(selection), &gen_selection_name, vec![]);

        pipeline.connect_nodes(
            &gen_product_name,
            product_output_port,
            &gen_selection_name,
            DEFAULT_PORT_HANDLE,
        );

        pipeline.connect_nodes(
            &gen_selection_name,
            DEFAULT_PORT_HANDLE,
            &gen_agg_name,
            DEFAULT_PORT_HANDLE,
        );
    } else {
        pipeline.connect_nodes(
            &gen_product_name,
            product_output_port,
            &gen_agg_name,
            DEFAULT_PORT_HANDLE,
        );
    }

    query_ctx.pipeline_map.insert(
        (pipeline_idx, table_info.name.0.to_string()),
        OutputNodeInfo {
            node: gen_agg_name.clone(),
            port: DEFAULT_PORT_HANDLE,
            is_derived: table_info.is_derived,
        },
    );

    let output_table_name = if let Some(into) = select.into {
        Some(into.name.to_string())
    } else {
        table_info.override_name.clone()
    };

    if is_top_select && output_table_name.is_none() {
        return Err(PipelineError::MissingIntoClause);
    }

    if let Some(table_name) = output_table_name {
        query_ctx.output_tables_map.insert(
            table_name,
            OutputNodeInfo {
                node: gen_agg_name.clone(),
                port: DEFAULT_PORT_HANDLE,
                is_derived: false,
            },
        );
    }

    Ok(gen_agg_name)
}

#[allow(clippy::too_many_arguments)]
fn set_to_pipeline(
    table_info: &TableInfo,
    left_select: Box<SetExpr>,
    right_select: Box<SetExpr>,
    set_quantifier: SetQuantifier,
    pipeline: &mut AppPipeline<SchemaSQLContext>,
    query_ctx: &mut QueryContext,
    stateful: bool,
    pipeline_idx: usize,
    is_top_select: bool,
) -> Result<String, PipelineError> {
    let gen_left_set_name = format!("set_left_{}", query_ctx.get_next_processor_id());
    let left_table_info = TableInfo {
        name: NameOrAlias(gen_left_set_name.clone(), None),
        override_name: None,
        is_derived: false,
    };
    let gen_right_set_name = format!("set_right_{}", query_ctx.get_next_processor_id());
    let right_table_info = TableInfo {
        name: NameOrAlias(gen_right_set_name.clone(), None),
        override_name: None,
        is_derived: false,
    };

    let _left_pipeline_name = match *left_select {
        SetExpr::Select(select) => select_to_pipeline(
            &left_table_info,
            *select,
            pipeline,
            query_ctx,
            stateful,
            pipeline_idx,
            is_top_select,
        )?,
        SetExpr::SetOperation {
            op: _,
            set_quantifier,
            left,
            right,
        } => set_to_pipeline(
            &left_table_info,
            left,
            right,
            set_quantifier,
            pipeline,
            query_ctx,
            stateful,
            pipeline_idx,
            is_top_select,
        )?,
        _ => {
            return Err(PipelineError::InvalidQuery(
                "Invalid UNION left Query".to_string(),
            ))
        }
    };

    let _right_pipeline_name = match *right_select {
        SetExpr::Select(select) => select_to_pipeline(
            &right_table_info,
            *select,
            pipeline,
            query_ctx,
            stateful,
            pipeline_idx,
            is_top_select,
        )?,
        SetExpr::SetOperation {
            op: _,
            set_quantifier,
            left,
            right,
        } => set_to_pipeline(
            &right_table_info,
            left,
            right,
            set_quantifier,
            pipeline,
            query_ctx,
            stateful,
            pipeline_idx,
            is_top_select,
        )?,
        _ => {
            return Err(PipelineError::InvalidQuery(
                "Invalid UNION right Query".to_string(),
            ))
        }
    };

    let mut gen_set_name = format!("set_{}", query_ctx.get_next_processor_id());

    let left_pipeline_output_node = match query_ctx
        .pipeline_map
        .get(&(pipeline_idx, gen_left_set_name))
    {
        Some(pipeline) => pipeline,
        None => {
            return Err(PipelineError::InvalidQuery(
                "Invalid UNION left Query".to_string(),
            ))
        }
    };

    let right_pipeline_output_node = match query_ctx
        .pipeline_map
        .get(&(pipeline_idx, gen_right_set_name))
    {
        Some(pipeline) => pipeline,
        None => {
            return Err(PipelineError::InvalidQuery(
                "Invalid UNION Right Query".to_string(),
            ))
        }
    };

    if table_info.override_name.is_some() {
        gen_set_name = table_info.override_name.to_owned().unwrap();
    }

    let set_proc_fac = SetProcessorFactory::new(
        gen_set_name.clone(),
        set_quantifier,
        pipeline.flags().enable_probabilistic_optimizations.in_sets,
    );

    pipeline.add_processor(Box::new(set_proc_fac), &gen_set_name, vec![]);

    pipeline.connect_nodes(
        &left_pipeline_output_node.node,
        left_pipeline_output_node.port,
        &gen_set_name,
        0 as PortHandle,
    );

    pipeline.connect_nodes(
        &right_pipeline_output_node.node,
        right_pipeline_output_node.port,
        &gen_set_name,
        1 as PortHandle,
    );

    for (_, table_name) in query_ctx.pipeline_map.keys() {
        query_ctx.output_tables_map.remove_entry(table_name);
    }

    query_ctx.pipeline_map.insert(
        (pipeline_idx, table_info.name.0.to_string()),
        OutputNodeInfo {
            node: gen_set_name.clone(),
            port: DEFAULT_PORT_HANDLE,
            is_derived: table_info.is_derived,
        },
    );

    Ok(gen_set_name)
}

/// Returns a vector of input port handles and relative table name
///
/// # Errors
///
/// This function will return an error if it's not possible to get an input name.
pub fn get_input_tables(
    from: &TableWithJoins,
    pipeline: &mut AppPipeline<SchemaSQLContext>,
    query_ctx: &mut QueryContext,
    pipeline_idx: usize,
) -> Result<IndexedTableWithJoins, PipelineError> {
    let name = get_from_source(&from.relation, pipeline, query_ctx, pipeline_idx)?;
    let mut joins = vec![];

    for join in from.joins.iter() {
        let input_name = get_from_source(&join.relation, pipeline, query_ctx, pipeline_idx)?;
        joins.push((input_name.clone(), join.clone()));
    }

    Ok(IndexedTableWithJoins {
        relation: (name, from.relation.clone()),
        joins,
    })
}

pub fn get_input_names(input_tables: &IndexedTableWithJoins) -> Vec<NameOrAlias> {
    let mut input_names = vec![];
    input_names.push(input_tables.relation.0.clone());

    for join in &input_tables.joins {
        input_names.push(join.0.clone());
    }
    input_names
}

pub fn get_entry_points(
    input_tables: &IndexedTableWithJoins,
    pipeline_map: &mut HashMap<(usize, String), OutputNodeInfo>,
    pipeline_idx: usize,
) -> Result<Vec<PipelineEntryPoint>, PipelineError> {
    let mut endpoints = vec![];

    let input_names = get_input_names(input_tables);

    for (input_port, table) in input_names.iter().enumerate() {
        let name = table.0.clone();
        if !pipeline_map.contains_key(&(pipeline_idx, name.clone())) {
            endpoints.push(PipelineEntryPoint::new(name, input_port as PortHandle));
        }
    }

    Ok(endpoints)
}

pub fn is_an_entry_point(
    name: &str,
    pipeline_map: &mut HashMap<(usize, String), OutputNodeInfo>,
    pipeline_idx: usize,
) -> bool {
    if !pipeline_map.contains_key(&(pipeline_idx, name.to_owned())) {
        return true;
    }
    false
}

pub fn get_from_source(
    relation: &TableFactor,
    pipeline: &mut AppPipeline<SchemaSQLContext>,
    query_ctx: &mut QueryContext,
    pipeline_idx: usize,
) -> Result<NameOrAlias, PipelineError> {
    match relation {
        TableFactor::Table { name, alias, .. } => {
            let input_name = name
                .0
                .iter()
                .map(ExpressionBuilder::normalize_ident)
                .collect::<Vec<String>>()
                .join(".");
            let alias_name = alias
                .as_ref()
                .map(|a| ExpressionBuilder::fullname_from_ident(&[a.name.clone()]));

            Ok(NameOrAlias(input_name, alias_name))
        }
        TableFactor::Derived {
            lateral: _,
            subquery,
            alias,
        } => {
            let name = format!("derived_{}", query_ctx.get_next_processor_id());
            let alias_name = alias.as_ref().map(|alias_ident| {
                ExpressionBuilder::fullname_from_ident(&[alias_ident.name.clone()])
            });
            let is_top_select = false; //inside FROM clause, so not top select
            let name_or = NameOrAlias(name, alias_name);
            query_to_pipeline(
                &TableInfo {
                    name: name_or.clone(),
                    is_derived: true,
                    override_name: None,
                },
                subquery,
                pipeline,
                query_ctx,
                false,
                pipeline_idx,
                is_top_select,
            )?;

            Ok(name_or)
        }
        _ => Err(PipelineError::UnsupportedSqlError(
            UnsupportedSqlError::JoinTable,
        )),
    }
}

#[cfg(test)]
mod tests {
    use dozer_core::app::AppPipeline;

    use super::statement_to_pipeline;
    #[test]
    #[should_panic]
    fn disallow_zero_outgoing_ndes() {
        let sql = "select * from film";
        statement_to_pipeline(sql, &mut AppPipeline::new_with_default_flags(), None).unwrap();
    }
    #[test]
    fn parse_sql_pipeline() {
        let sql = r#"
                SELECT
                    a.name as "Genre",
                    SUM(amount) as "Gross Revenue(in $)"
                INTO gross_revenue_stats
                FROM
                (
                    SELECT
                        c.name,
                        f.title,
                        p.amount
                    FROM film f
                    LEFT JOIN film_category fc
                        ON fc.film_id = f.film_id
                    LEFT JOIN category c
                        ON fc.category_id = c.category_id
                    LEFT JOIN inventory i
                        ON i.film_id = f.film_id
                    LEFT JOIN rental r
                        ON r.inventory_id = i.inventory_id
                    LEFT JOIN payment p
                        ON p.rental_id = r.rental_id
                    WHERE p.amount IS NOT NULL
                ) a
                GROUP BY name;

                SELECT
                f.name, f.title, p.amount
                INTO film_amounts
                FROM film f
                LEFT JOIN film_category fc;

                WITH tbl as (select id from a)
                select id
                into cte_table
                from tbl;

                WITH tbl as (select id from  a),
                tbl2 as (select id from tbl)
                select id
                into nested_cte_table
                from tbl2;

                WITH cte_table1 as (select id_dt1 from (select id_t1 from table_1) as derived_table_1),
                cte_table2 as (select id_ct1 from cte_table1)
                select id_ct2
                into nested_derived_table
                from cte_table2;

                with tbl as (select id, ticker from stocks)
                select tbl.id
                into nested_stocks_table
                from  stocks join tbl on tbl.id = stocks.id;
            "#;

        let context =
            statement_to_pipeline(sql, &mut AppPipeline::new_with_default_flags(), None).unwrap();

        // Should create as many output tables as into statements
        let mut output_keys = context.output_tables_map.keys().collect::<Vec<_>>();
        output_keys.sort();
        let mut expected_keys = vec![
            "gross_revenue_stats",
            "film_amounts",
            "cte_table",
            "nested_cte_table",
            "nested_derived_table",
            "nested_stocks_table",
        ];
        expected_keys.sort();
        assert_eq!(output_keys, expected_keys);
    }
}

#[test]
fn test_missing_into_in_simple_from_clause() {
    let sql = r#"SELECT a FROM B "#;
    let result = statement_to_pipeline(sql, &mut AppPipeline::new_with_default_flags(), None);
    //check if the result is an error
    assert!(matches!(result, Err(PipelineError::MissingIntoClause)))
}

#[test]
fn test_correct_into_clause() {
    let sql = r#"SELECT a INTO C FROM B"#;
    let result = statement_to_pipeline(sql, &mut AppPipeline::new_with_default_flags(), None);
    //check if the result is ok
    assert!(result.is_ok());
}

#[test]
fn test_missing_into_in_nested_from_clause() {
    let sql = r#"SELECT a FROM (SELECT a from b)"#;
    let result = statement_to_pipeline(sql, &mut AppPipeline::new_with_default_flags(), None);
    //check if the result is an error
    assert!(matches!(result, Err(PipelineError::MissingIntoClause)))
}

#[test]
fn test_correct_into_in_nested_from() {
    let sql = r#"SELECT a INTO c FROM (SELECT a from b)"#;
    let result = statement_to_pipeline(sql, &mut AppPipeline::new_with_default_flags(), None);
    //check if the result is ok
    assert!(result.is_ok());
}

#[test]
fn test_missing_into_in_with_clause() {
    let sql = r#"WITH tbl as (select a from B)
    select B
    from tbl;"#;
    let result = statement_to_pipeline(sql, &mut AppPipeline::new_with_default_flags(), None);
    //check if the result is an error
    assert!(matches!(result, Err(PipelineError::MissingIntoClause)))
}

#[test]
fn test_correct_into_in_with_clause() {
    let sql = r#"WITH tbl as (select a from B)
    select B
    into C
    from tbl;"#;
    let result = statement_to_pipeline(sql, &mut AppPipeline::new_with_default_flags(), None);
    //check if the result is ok
    assert!(result.is_ok());
}