• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

getdozer / dozer / 5881075993

16 Aug 2023 03:58PM UTC coverage: 77.415% (-0.2%) from 77.649%
5881075993

push

github

web-flow
feat: replace jaeger with xray (#1862)

feat: replace jaeger with xray

23 of 23 new or added lines in 2 files covered. (100.0%)

46085 of 59530 relevant lines covered (77.41%)

55729.22 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

68.57
/dozer-sql/src/pipeline/builder.rs
1
use crate::pipeline::aggregation::factory::AggregationProcessorFactory;
2
use crate::pipeline::builder::PipelineError::InvalidQuery;
3
use crate::pipeline::errors::PipelineError;
4
use crate::pipeline::expression::builder::{ExpressionBuilder, NameOrAlias};
5
use crate::pipeline::selection::factory::SelectionProcessorFactory;
6
use dozer_core::app::AppPipeline;
7
use dozer_core::app::PipelineEntryPoint;
8
use dozer_core::node::PortHandle;
9
use dozer_core::DEFAULT_PORT_HANDLE;
10
use sqlparser::ast::{Join, SetOperator, SetQuantifier, TableFactor, TableWithJoins};
11

12
use sqlparser::{
13
    ast::{Query, Select, SetExpr, Statement},
14
    dialect::DozerDialect,
15
    parser::Parser,
16
};
17
use std::collections::HashMap;
18

19
use super::errors::UnsupportedSqlError;
20
use super::pipeline_builder::from_builder::insert_from_to_pipeline;
21

22
use super::product::set::set_factory::SetProcessorFactory;
23

24
/// Schema context type parameter used by `AppPipeline` throughout this builder.
#[derive(Debug, Clone, Default)]
pub struct SchemaSQLContext {}
26

27
/// Identifies a DAG node (and port) that produces a table's output.
#[derive(Debug, Clone)]
pub struct OutputNodeInfo {
    /// Name to connect in dag
    pub node: String,
    /// Port to connect in dag
    pub port: PortHandle,
    /// If this table is originally from a source or created in transforms
    pub is_derived: bool,
    // TODO add:indexes to the tables
}
37

38
/// Describes the table a query (or sub-query) produces while it is being
/// translated into pipeline processors.
pub struct TableInfo {
    /// Resolved name (and optional alias) the table is registered under.
    pub name: NameOrAlias,
    /// Externally supplied output name; used when the SELECT has no `INTO`.
    pub override_name: Option<String>,
    /// True when the table is created by a transform rather than a source.
    pub is_derived: bool,
}
43
/// The struct contains some contexts during query to pipeline.
#[derive(Debug, Clone, Default)]
pub struct QueryContext {
    /// Internal tables map, used to store the tables that are created by the queries.
    /// Keyed by `(pipeline_idx, table_name)`.
    pub pipeline_map: HashMap<(usize, String), OutputNodeInfo>,

    /// Output tables map that are marked with "INTO" used to store the tables, these can be exposed to sinks.
    pub output_tables_map: HashMap<String, OutputNodeInfo>,

    /// Source table names referenced by the queries that are not produced by
    /// any processor in the pipeline (i.e. they come from connectors).
    pub used_sources: Vec<String>,

    /// Monotonic counter used to generate unique processor names.
    pub processor_counter: usize,
}
58

59
impl QueryContext {
60
    pub fn get_next_processor_id(&mut self) -> usize {
3,223✔
61
        self.processor_counter += 1;
3,223✔
62
        self.processor_counter
3,223✔
63
    }
3,223✔
64
}
65

66
/// A `FROM` clause whose main relation and join relations have each been
/// resolved to a [`NameOrAlias`] handle.
#[derive(Debug, Clone)]
pub struct IndexedTableWithJoins {
    /// The main relation with its resolved name.
    pub relation: (NameOrAlias, TableFactor),
    /// Each join together with its resolved name, in declaration order.
    pub joins: Vec<(NameOrAlias, Join)>,
}
71
pub fn statement_to_pipeline(
321✔
72
    sql: &str,
321✔
73
    pipeline: &mut AppPipeline<SchemaSQLContext>,
321✔
74
    override_name: Option<String>,
321✔
75
) -> Result<QueryContext, PipelineError> {
321✔
76
    let dialect = DozerDialect {};
321✔
77
    let mut ctx = QueryContext::default();
321✔
78
    let is_top_select = true;
321✔
79
    let ast = Parser::parse_sql(&dialect, sql)
321✔
80
        .map_err(|err| PipelineError::InternalError(Box::new(err)))?;
321✔
81
    let query_name = NameOrAlias(format!("query_{}", ctx.get_next_processor_id()), None);
321✔
82

83
    for (idx, statement) in ast.iter().enumerate() {
326✔
84
        match statement {
326✔
85
            Statement::Query(query) => {
326✔
86
                query_to_pipeline(
326✔
87
                    &TableInfo {
326✔
88
                        name: query_name.clone(),
326✔
89
                        is_derived: false,
326✔
90
                        override_name: override_name.clone(),
326✔
91
                    },
326✔
92
                    query,
326✔
93
                    pipeline,
326✔
94
                    &mut ctx,
326✔
95
                    false,
326✔
96
                    idx,
326✔
97
                    is_top_select,
326✔
98
                )?;
326✔
99
            }
×
100
            s => {
×
101
                return Err(PipelineError::UnsupportedSqlError(
×
102
                    UnsupportedSqlError::GenericError(s.to_string()),
×
103
                ))
×
104
            }
105
        }
106
    }
×
107

×
108
    Ok(ctx)
317✔
109
}
321✔
110

×
111
/// Translates one SQL `Query` AST node into processors on `pipeline`.
///
/// Order of work:
/// 1. Reject unsupported syntax (`ORDER BY`, `LIMIT`/`OFFSET`).
/// 2. Build every CTE in the `WITH` clause (recursively), registering each
///    under its alias in `query_ctx.pipeline_map`.
/// 3. Dispatch on the query body: plain `SELECT`, nested query, or a
///    `UNION` set operation.
///
/// `is_top_select` is propagated so that only the outermost SELECT is
/// required to have an `INTO` clause (or an override name).
fn query_to_pipeline(
    table_info: &TableInfo,
    query: &Query,
    pipeline: &mut AppPipeline<SchemaSQLContext>,
    query_ctx: &mut QueryContext,
    stateful: bool,
    pipeline_idx: usize,
    is_top_select: bool,
) -> Result<(), PipelineError> {
    // return error if there is unsupported syntax
    if !query.order_by.is_empty() {
        return Err(PipelineError::UnsupportedSqlError(
            UnsupportedSqlError::OrderByError,
        ));
    }

    if query.limit.is_some() || query.offset.is_some() {
        return Err(PipelineError::UnsupportedSqlError(
            UnsupportedSqlError::LimitOffsetError,
        ));
    }

    // Attach the first pipeline if there is with clause
    if let Some(with) = &query.with {
        // Recursive CTEs cannot be expressed as a finite DAG of processors.
        if with.recursive {
            return Err(PipelineError::UnsupportedSqlError(
                UnsupportedSqlError::Recursive,
            ));
        }

        for table in &with.cte_tables {
            if table.from.is_some() {
                return Err(PipelineError::UnsupportedSqlError(
                    UnsupportedSqlError::CteFromError,
                ));
            }
            let table_name = table.alias.name.to_string();
            // Duplicate CTE aliases within the same statement are ambiguous.
            if query_ctx
                .pipeline_map
                .contains_key(&(pipeline_idx, table_name.clone()))
            {
                return Err(InvalidQuery(format!(
                    "WITH query name {table_name:?} specified more than once"
                )));
            }
            // Build the CTE body; it registers itself under the alias so
            // later references resolve through `pipeline_map`.
            query_to_pipeline(
                &TableInfo {
                    name: NameOrAlias(table_name.clone(), Some(table_name)),
                    is_derived: true,
                    override_name: None,
                },
                &table.query,
                pipeline,
                query_ctx,
                true,
                pipeline_idx,
                false, //Inside a with clause, so not top select
            )?;
        }
    };

    match *query.body.clone() {
        SetExpr::Select(select) => {
            select_to_pipeline(
                table_info,
                *select,
                pipeline,
                query_ctx,
                stateful,
                pipeline_idx,
                is_top_select,
            )?;
        }
        SetExpr::Query(query) => {
            let query_name = format!("subquery_{}", query_ctx.get_next_processor_id());
            // NOTE(review): the nested query is built into a fresh, local
            // context that is dropped afterwards — its registered tables and
            // used sources never reach the caller's `query_ctx`. Confirm this
            // is intentional for parenthesized query bodies.
            let mut ctx = QueryContext::default();
            query_to_pipeline(
                &TableInfo {
                    name: NameOrAlias(query_name, None),
                    is_derived: true,
                    override_name: None,
                },
                &query,
                pipeline,
                &mut ctx,
                stateful,
                pipeline_idx,
                false, //Inside a subquery, so not top select
            )?
        }
        SetExpr::SetOperation {
            op,
            set_quantifier,
            left,
            right,
        } => match op {
            // Only UNION is supported among set operations.
            SetOperator::Union => {
                set_to_pipeline(
                    table_info,
                    left,
                    right,
                    set_quantifier,
                    pipeline,
                    query_ctx,
                    stateful,
                    pipeline_idx,
                    is_top_select,
                )?;
            }
            _ => return Err(PipelineError::InvalidOperator(op.to_string())),
        },
        _ => {
            return Err(PipelineError::UnsupportedSqlError(
                UnsupportedSqlError::GenericError("Unsupported query body structure".to_string()),
            ))
        }
    };
    Ok(())
}
230

×
231
/// Builds the processor chain for one `SELECT`:
/// FROM/joins → optional WHERE (selection) → aggregation.
///
/// Registers the aggregation node under `table_info.name` in
/// `query_ctx.pipeline_map`, and — when an `INTO` clause or an override name
/// is present — under that name in `output_tables_map` for sinks. Returns
/// the generated aggregation node name (this SELECT's output node).
///
/// # Errors
///
/// * `FromCommaSyntax` if the FROM clause has more than one element.
/// * `MissingIntoClause` if this is the top-level SELECT and no output table
///   name is available.
fn select_to_pipeline(
    table_info: &TableInfo,
    select: Select,
    pipeline: &mut AppPipeline<SchemaSQLContext>,
    query_ctx: &mut QueryContext,
    stateful: bool,
    pipeline_idx: usize,
    is_top_select: bool,
) -> Result<String, PipelineError> {
    // FROM clause
    if select.from.len() != 1 {
        return Err(PipelineError::UnsupportedSqlError(
            UnsupportedSqlError::FromCommaSyntax,
        ));
    }

    // let input_tables = get_input_tables(&select.from[0], pipeline, query_ctx, pipeline_idx)?;
    //
    // let (input_nodes, output_node, mut used_sources) = add_from_to_pipeline(
    //     pipeline,
    //     &input_tables,
    //     &mut query_ctx.pipeline_map,
    //     pipeline_idx,
    // )?;

    // Build the FROM clause (sources, joins, derived tables) into processors.
    let connection_info =
        insert_from_to_pipeline(&select.from[0], pipeline, pipeline_idx, query_ctx)?;

    let input_nodes = connection_info.input_nodes;
    let output_node = connection_info.output_node;

    // Reserve deterministic names for the processors added below; the order
    // of these two calls fixes the generated id sequence.
    let gen_agg_name = format!("agg_{}", query_ctx.get_next_processor_id());

    let gen_selection_name = format!("select_{}", query_ctx.get_next_processor_id());
    let (gen_product_name, product_output_port) = output_node;

    // Wire each input: tables already produced inside this pipeline are
    // connected directly; anything else must come from an external source.
    for (source_name, processor_name, processor_port) in input_nodes.iter() {
        // NOTE: this `table_info` shadows the function parameter inside the arm.
        if let Some(table_info) = query_ctx
            .pipeline_map
            .get(&(pipeline_idx, source_name.clone()))
        {
            pipeline.connect_nodes(
                &table_info.node,
                table_info.port,
                processor_name,
                *processor_port as PortHandle,
            );
            // If not present in pipeline_map, insert into used_sources as this is coming from source
        } else {
            query_ctx.used_sources.push(source_name.clone());
        }
    }

    // The aggregation processor is added unconditionally; it consumes the
    // whole SELECT (projection and aggregates).
    let aggregation =
        AggregationProcessorFactory::new(gen_agg_name.clone(), select.clone(), stateful);

    pipeline.add_processor(Box::new(aggregation), &gen_agg_name, vec![]);

    // Where clause
    if let Some(selection) = select.selection {
        // product -> selection -> aggregation
        let selection = SelectionProcessorFactory::new(gen_selection_name.to_owned(), selection);

        pipeline.add_processor(Box::new(selection), &gen_selection_name, vec![]);

        pipeline.connect_nodes(
            &gen_product_name,
            product_output_port,
            &gen_selection_name,
            DEFAULT_PORT_HANDLE,
        );

        pipeline.connect_nodes(
            &gen_selection_name,
            DEFAULT_PORT_HANDLE,
            &gen_agg_name,
            DEFAULT_PORT_HANDLE,
        );
    } else {
        // No WHERE clause: product -> aggregation directly.
        pipeline.connect_nodes(
            &gen_product_name,
            product_output_port,
            &gen_agg_name,
            DEFAULT_PORT_HANDLE,
        );
    }

    // Make this SELECT's result addressable by later queries in the statement.
    query_ctx.pipeline_map.insert(
        (pipeline_idx, table_info.name.0.to_string()),
        OutputNodeInfo {
            node: gen_agg_name.clone(),
            port: DEFAULT_PORT_HANDLE,
            is_derived: table_info.is_derived,
        },
    );

    // `INTO` wins over the externally supplied override name.
    let output_table_name = if let Some(into) = select.into {
        Some(into.name.to_string())
    } else {
        table_info.override_name.clone()
    };

    // The outermost SELECT must name its output table one way or the other.
    if is_top_select && output_table_name.is_none() {
        return Err(PipelineError::MissingIntoClause);
    }

    if let Some(table_name) = output_table_name {
        query_ctx.output_tables_map.insert(
            table_name,
            OutputNodeInfo {
                node: gen_agg_name.clone(),
                port: DEFAULT_PORT_HANDLE,
                is_derived: false,
            },
        );
    }

    Ok(gen_agg_name)
}
349

×
350
/// Builds both legs of a `UNION` and joins them with a set processor.
///
/// Each leg is built recursively (a leg may itself be a nested set
/// operation) under a generated `set_left_*` / `set_right_*` name, then a
/// `SetProcessorFactory` node is added with the left leg wired to port 0 and
/// the right leg to port 1. Returns the set processor's node name.
#[allow(clippy::too_many_arguments)]
fn set_to_pipeline(
    table_info: &TableInfo,
    left_select: Box<SetExpr>,
    right_select: Box<SetExpr>,
    set_quantifier: SetQuantifier,
    pipeline: &mut AppPipeline<SchemaSQLContext>,
    query_ctx: &mut QueryContext,
    stateful: bool,
    pipeline_idx: usize,
    is_top_select: bool,
) -> Result<String, PipelineError> {
    // Generated names double as pipeline_map keys for looking the legs up
    // again after they have been built.
    let gen_left_set_name = format!("set_left_{}", query_ctx.get_next_processor_id());
    let left_table_info = TableInfo {
        name: NameOrAlias(gen_left_set_name.clone(), None),
        override_name: None,
        is_derived: false,
    };
    let gen_right_set_name = format!("set_right_{}", query_ctx.get_next_processor_id());
    let right_table_info = TableInfo {
        name: NameOrAlias(gen_right_set_name.clone(), None),
        override_name: None,
        is_derived: false,
    };

    // Build the left leg: either a plain SELECT or a nested set operation.
    let _left_pipeline_name = match *left_select {
        SetExpr::Select(select) => select_to_pipeline(
            &left_table_info,
            *select,
            pipeline,
            query_ctx,
            stateful,
            pipeline_idx,
            is_top_select,
        )?,
        SetExpr::SetOperation {
            op: _,
            set_quantifier,
            left,
            right,
        } => set_to_pipeline(
            &left_table_info,
            left,
            right,
            set_quantifier,
            pipeline,
            query_ctx,
            stateful,
            pipeline_idx,
            is_top_select,
        )?,
        _ => {
            return Err(PipelineError::InvalidQuery(
                "Invalid UNION left Query".to_string(),
            ))
        }
    };

    // Build the right leg the same way.
    let _right_pipeline_name = match *right_select {
        SetExpr::Select(select) => select_to_pipeline(
            &right_table_info,
            *select,
            pipeline,
            query_ctx,
            stateful,
            pipeline_idx,
            is_top_select,
        )?,
        SetExpr::SetOperation {
            op: _,
            set_quantifier,
            left,
            right,
        } => set_to_pipeline(
            &right_table_info,
            left,
            right,
            set_quantifier,
            pipeline,
            query_ctx,
            stateful,
            pipeline_idx,
            is_top_select,
        )?,
        _ => {
            return Err(PipelineError::InvalidQuery(
                "Invalid UNION right Query".to_string(),
            ))
        }
    };

    let mut gen_set_name = format!("set_{}", query_ctx.get_next_processor_id());

    // Each leg must have registered its output node while being built above.
    let left_pipeline_output_node = match query_ctx
        .pipeline_map
        .get(&(pipeline_idx, gen_left_set_name))
    {
        Some(pipeline) => pipeline,
        None => {
            return Err(PipelineError::InvalidQuery(
                "Invalid UNION left Query".to_string(),
            ))
        }
    };

    let right_pipeline_output_node = match query_ctx
        .pipeline_map
        .get(&(pipeline_idx, gen_right_set_name))
    {
        Some(pipeline) => pipeline,
        None => {
            return Err(PipelineError::InvalidQuery(
                "Invalid UNION Right Query".to_string(),
            ))
        }
    };

    // An externally supplied name replaces the generated set processor name.
    if table_info.override_name.is_some() {
        gen_set_name = table_info.override_name.to_owned().unwrap();
    }

    let set_proc_fac = SetProcessorFactory::new(gen_set_name.clone(), set_quantifier);

    pipeline.add_processor(Box::new(set_proc_fac), &gen_set_name, vec![]);

    // Left leg feeds port 0, right leg feeds port 1 of the set processor.
    pipeline.connect_nodes(
        &left_pipeline_output_node.node,
        left_pipeline_output_node.port,
        &gen_set_name,
        0 as PortHandle,
    );

    pipeline.connect_nodes(
        &right_pipeline_output_node.node,
        right_pipeline_output_node.port,
        &gen_set_name,
        1 as PortHandle,
    );

    // Drop "INTO" outputs registered while building the legs so only the
    // union's own output remains visible to sinks.
    // NOTE(review): this purges an output entry for *every* table name in
    // pipeline_map, not only the two legs built above — confirm intended.
    for (_, table_name) in query_ctx.pipeline_map.keys() {
        query_ctx.output_tables_map.remove_entry(table_name);
    }

    query_ctx.pipeline_map.insert(
        (pipeline_idx, table_info.name.0.to_string()),
        OutputNodeInfo {
            node: gen_set_name.clone(),
            port: DEFAULT_PORT_HANDLE,
            is_derived: table_info.is_derived,
        },
    );

    Ok(gen_set_name)
}
504

505
/// Returns a vector of input port handles and relative table name
×
506
///
×
507
/// # Errors
×
508
///
509
/// This function will return an error if it's not possible to get an input name.
510
pub fn get_input_tables(
×
511
    from: &TableWithJoins,
×
512
    pipeline: &mut AppPipeline<SchemaSQLContext>,
×
513
    query_ctx: &mut QueryContext,
×
514
    pipeline_idx: usize,
×
515
) -> Result<IndexedTableWithJoins, PipelineError> {
×
516
    let name = get_from_source(&from.relation, pipeline, query_ctx, pipeline_idx)?;
×
517
    let mut joins = vec![];
×
518

×
519
    for join in from.joins.iter() {
×
520
        let input_name = get_from_source(&join.relation, pipeline, query_ctx, pipeline_idx)?;
×
521
        joins.push((input_name.clone(), join.clone()));
×
522
    }
×
523

×
524
    Ok(IndexedTableWithJoins {
×
525
        relation: (name, from.relation.clone()),
×
526
        joins,
×
527
    })
×
528
}
×
529

×
530
pub fn get_input_names(input_tables: &IndexedTableWithJoins) -> Vec<NameOrAlias> {
×
531
    let mut input_names = vec![];
×
532
    input_names.push(input_tables.relation.0.clone());
×
533

×
534
    for join in &input_tables.joins {
×
535
        input_names.push(join.0.clone());
×
536
    }
×
537
    input_names
×
538
}
×
539

×
540
pub fn get_entry_points(
×
541
    input_tables: &IndexedTableWithJoins,
×
542
    pipeline_map: &mut HashMap<(usize, String), OutputNodeInfo>,
×
543
    pipeline_idx: usize,
×
544
) -> Result<Vec<PipelineEntryPoint>, PipelineError> {
×
545
    let mut endpoints = vec![];
×
546

×
547
    let input_names = get_input_names(input_tables);
×
548

×
549
    for (input_port, table) in input_names.iter().enumerate() {
×
550
        let name = table.0.clone();
×
551
        if !pipeline_map.contains_key(&(pipeline_idx, name.clone())) {
×
552
            endpoints.push(PipelineEntryPoint::new(name, input_port as PortHandle));
×
553
        }
×
554
    }
×
555

556
    Ok(endpoints)
×
557
}
×
558

×
559
pub fn is_an_entry_point(
×
560
    name: &str,
×
561
    pipeline_map: &mut HashMap<(usize, String), OutputNodeInfo>,
×
562
    pipeline_idx: usize,
×
563
) -> bool {
×
564
    if !pipeline_map.contains_key(&(pipeline_idx, name.to_owned())) {
×
565
        return true;
×
566
    }
×
567
    false
×
568
}
×
569

×
570
pub fn get_from_source(
585✔
571
    relation: &TableFactor,
585✔
572
    pipeline: &mut AppPipeline<SchemaSQLContext>,
585✔
573
    query_ctx: &mut QueryContext,
585✔
574
    pipeline_idx: usize,
585✔
575
) -> Result<NameOrAlias, PipelineError> {
585✔
576
    match relation {
585✔
577
        TableFactor::Table { name, alias, .. } => {
561✔
578
            let input_name = name
561✔
579
                .0
561✔
580
                .iter()
561✔
581
                .map(ExpressionBuilder::normalize_ident)
561✔
582
                .collect::<Vec<String>>()
561✔
583
                .join(".");
561✔
584
            let alias_name = alias
561✔
585
                .as_ref()
561✔
586
                .map(|a| ExpressionBuilder::fullname_from_ident(&[a.name.clone()]));
561✔
587

561✔
588
            Ok(NameOrAlias(input_name, alias_name))
561✔
589
        }
×
590
        TableFactor::Derived {
×
591
            lateral: _,
×
592
            subquery,
24✔
593
            alias,
24✔
594
        } => {
24✔
595
            let name = format!("derived_{}", query_ctx.get_next_processor_id());
24✔
596
            let alias_name = alias.as_ref().map(|alias_ident| {
24✔
597
                ExpressionBuilder::fullname_from_ident(&[alias_ident.name.clone()])
18✔
598
            });
24✔
599
            let is_top_select = false; //inside FROM clause, so not top select
24✔
600
            let name_or = NameOrAlias(name, alias_name);
24✔
601
            query_to_pipeline(
24✔
602
                &TableInfo {
24✔
603
                    name: name_or.clone(),
24✔
604
                    is_derived: true,
24✔
605
                    override_name: None,
24✔
606
                },
24✔
607
                subquery,
24✔
608
                pipeline,
24✔
609
                query_ctx,
24✔
610
                false,
24✔
611
                pipeline_idx,
24✔
612
                is_top_select,
24✔
613
            )?;
24✔
614

615
            Ok(name_or)
24✔
616
        }
617
        _ => Err(PipelineError::UnsupportedSqlError(
×
618
            UnsupportedSqlError::JoinTable,
×
619
        )),
×
620
    }
621
}
585✔
622

623
#[cfg(test)]
mod tests {
    use dozer_core::app::AppPipeline;

    use super::statement_to_pipeline;

    /// A top-level SELECT with no INTO clause must fail to build
    /// (the `unwrap` panics on the resulting error).
    #[test]
    #[should_panic]
    fn disallow_zero_outgoing_nodes() {
        // Fixed typo in the test name: "ndes" -> "nodes".
        let sql = "select * from film";
        statement_to_pipeline(sql, &mut AppPipeline::new(), None).unwrap();
    }

    /// Builds a multi-statement script (derived tables, CTEs, joins, UNION-free)
    /// and checks one output table is registered per INTO clause.
    #[test]
    fn parse_sql_pipeline() {
        let sql = r#"
                SELECT
                    a.name as "Genre",
                    SUM(amount) as "Gross Revenue(in $)"
                INTO gross_revenue_stats
                FROM
                (
                    SELECT
                        c.name,
                        f.title,
                        p.amount
                    FROM film f
                    LEFT JOIN film_category fc
                        ON fc.film_id = f.film_id
                    LEFT JOIN category c
                        ON fc.category_id = c.category_id
                    LEFT JOIN inventory i
                        ON i.film_id = f.film_id
                    LEFT JOIN rental r
                        ON r.inventory_id = i.inventory_id
                    LEFT JOIN payment p
                        ON p.rental_id = r.rental_id
                    WHERE p.amount IS NOT NULL
                ) a
                GROUP BY name;

                SELECT
                f.name, f.title, p.amount
                INTO film_amounts
                FROM film f
                LEFT JOIN film_category fc;

                WITH tbl as (select id from a)
                select id
                into cte_table
                from tbl;

                WITH tbl as (select id from  a),
                tbl2 as (select id from tbl)
                select id
                into nested_cte_table
                from tbl2;

                WITH cte_table1 as (select id_dt1 from (select id_t1 from table_1) as derived_table_1),
                cte_table2 as (select id_ct1 from cte_table1)
                select id_ct2
                into nested_derived_table
                from cte_table2;

                with tbl as (select id, ticker from stocks)
                select tbl.id
                into nested_stocks_table
                from  stocks join tbl on tbl.id = stocks.id;
            "#;

        let context = statement_to_pipeline(sql, &mut AppPipeline::new(), None).unwrap();

        // Should create as many output tables as into statements
        let mut output_keys = context.output_tables_map.keys().collect::<Vec<_>>();
        output_keys.sort();
        let mut expected_keys = vec![
            "gross_revenue_stats",
            "film_amounts",
            "cte_table",
            "nested_cte_table",
            "nested_derived_table",
            "nested_stocks_table",
        ];
        expected_keys.sort();
        assert_eq!(output_keys, expected_keys);
    }
}
708

709
#[test]
fn test_missing_into_in_simple_from_clause() {
    // A top-level SELECT without an INTO clause must be rejected.
    let outcome = statement_to_pipeline(r#"SELECT a FROM B "#, &mut AppPipeline::new(), None);
    assert!(matches!(outcome, Err(PipelineError::MissingIntoClause)))
}
716

717
#[test]
fn test_correct_into_clause() {
    // With an INTO clause present, the pipeline builds successfully.
    let outcome = statement_to_pipeline(r#"SELECT a INTO C FROM B"#, &mut AppPipeline::new(), None);
    assert!(outcome.is_ok());
}
724

725
#[test]
fn test_missing_into_in_nested_from_clause() {
    // INTO is required even when the FROM clause wraps a subquery.
    let outcome = statement_to_pipeline(
        r#"SELECT a FROM (SELECT a from b)"#,
        &mut AppPipeline::new(),
        None,
    );
    assert!(matches!(outcome, Err(PipelineError::MissingIntoClause)))
}
732

733
#[test]
fn test_correct_into_in_nested_from() {
    // INTO on the outer SELECT satisfies the requirement for nested FROMs.
    let outcome = statement_to_pipeline(
        r#"SELECT a INTO c FROM (SELECT a from b)"#,
        &mut AppPipeline::new(),
        None,
    );
    assert!(outcome.is_ok());
}
740

741
#[test]
fn test_missing_into_in_with_clause() {
    // A CTE does not exempt the top-level SELECT from needing INTO.
    let statement = r#"WITH tbl as (select a from B)
    select B
    from tbl;"#;
    let outcome = statement_to_pipeline(statement, &mut AppPipeline::new(), None);
    assert!(matches!(outcome, Err(PipelineError::MissingIntoClause)))
}
750

751
#[test]
fn test_correct_into_in_with_clause() {
    // INTO on the final SELECT of a WITH statement builds successfully.
    let statement = r#"WITH tbl as (select a from B)
    select B
    into C
    from tbl;"#;
    let outcome = statement_to_pipeline(statement, &mut AppPipeline::new(), None);
    assert!(outcome.is_ok());
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc