• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

getdozer / dozer / 5829059745

pending completion
5829059745

Pull #1844

github

supergi01
added comments for downloader.rs
Pull Request #1844: feat/live-reload, download and start react server

735 of 735 new or added lines in 11 files covered. (100.0%)

45536 of 61287 relevant lines covered (74.3%)

51206.91 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

65.06
/dozer-sql/src/pipeline/builder.rs
1
use crate::pipeline::aggregation::factory::AggregationProcessorFactory;
2
use crate::pipeline::builder::PipelineError::InvalidQuery;
3
use crate::pipeline::errors::PipelineError;
4
use crate::pipeline::expression::builder::{ExpressionBuilder, NameOrAlias};
5
use crate::pipeline::selection::factory::SelectionProcessorFactory;
6
use dozer_core::app::AppPipeline;
7
use dozer_core::app::PipelineEntryPoint;
8
use dozer_core::node::PortHandle;
9
use dozer_core::DEFAULT_PORT_HANDLE;
10
use sqlparser::ast::{Join, SetOperator, SetQuantifier, TableFactor, TableWithJoins};
11

12
use sqlparser::{
13
    ast::{Query, Select, SetExpr, Statement},
14
    dialect::DozerDialect,
15
    parser::Parser,
16
};
17
use std::collections::HashMap;
18

19
use super::errors::UnsupportedSqlError;
20
use super::pipeline_builder::from_builder::insert_from_to_pipeline;
21

22
use super::product::set::set_factory::SetProcessorFactory;
23

24
/// Schema context attached to pipelines built by this module.
/// Currently carries no state; it exists to parameterize `AppPipeline`.
#[derive(Debug, Clone, Default)]
pub struct SchemaSQLContext {}
26

27
/// Describes the DAG node (and port) from which a table's output can be consumed.
#[derive(Debug, Clone)]
pub struct OutputNodeInfo {
    // Name to connect in dag
    pub node: String,
    // Port to connect in dag
    pub port: PortHandle,
    // If this table is originally from a source or created in transforms
    pub is_derived: bool,
    // TODO add:indexes to the tables
}
37

38
/// Naming information for a table produced while building a query pipeline.
pub struct TableInfo {
    // Primary name (and optional alias) the table is registered under.
    pub name: NameOrAlias,
    // Externally supplied name that, when present, overrides the generated
    // output table name (used when no `INTO` clause is given).
    pub override_name: Option<String>,
    // Whether this table was created by a transform rather than read from a source.
    pub is_derived: bool,
}
43
/// The struct contains some contexts during query to pipeline.
#[derive(Debug, Clone, Default)]
pub struct QueryContext {
    // Internal tables map, used to store the tables that are created by the
    // queries, keyed by (pipeline index, table name).
    pub pipeline_map: HashMap<(usize, String), OutputNodeInfo>,

    // Output tables map that are marked with "INTO" used to store the tables, these can be exposed to sinks.
    pub output_tables_map: HashMap<String, OutputNodeInfo>,

    // Names of sources referenced by the queries that are not themselves
    // produced inside the pipeline.
    pub used_sources: Vec<String>,

    // Monotonically increasing counter used to generate unique processor names.
    pub processor_counter: usize,
}
58

59
impl QueryContext {
60
    pub fn get_next_processor_id(&mut self) -> usize {
3,185✔
61
        self.processor_counter += 1;
3,185✔
62
        self.processor_counter
3,185✔
63
    }
3,185✔
64
}
65

66
/// A FROM-clause relation together with its joined relations, each paired
/// with the name (or alias) it is referred to by in the pipeline.
#[derive(Debug, Clone)]
pub struct IndexedTableWithJoins {
    pub relation: (NameOrAlias, TableFactor),
    pub joins: Vec<(NameOrAlias, Join)>,
}
71
/// Parses `sql` and appends the resulting transformations to `pipeline`.
///
/// Every top-level statement must be a query; any other statement kind is
/// rejected with `UnsupportedSqlError`. Returns the accumulated
/// [`QueryContext`], or an error if parsing fails or no query exposes an
/// output table (no `INTO` clause and no `override_name`).
pub fn statement_to_pipeline(
    sql: &str,
    pipeline: &mut AppPipeline<SchemaSQLContext>,
    override_name: Option<String>,
) -> Result<QueryContext, PipelineError> {
    let dialect = DozerDialect {};
    let mut ctx = QueryContext::default();

    let ast = Parser::parse_sql(&dialect, sql)
        .map_err(|err| PipelineError::InternalError(Box::new(err)))?;
    // One generated query name is shared by all statements; each statement
    // still gets its own pipeline index (`idx`) below.
    let query_name = NameOrAlias(format!("query_{}", ctx.get_next_processor_id()), None);

    for (idx, statement) in ast.iter().enumerate() {
        match statement {
            Statement::Query(query) => {
                query_to_pipeline(
                    &TableInfo {
                        name: query_name.clone(),
                        is_derived: false,
                        override_name: override_name.clone(),
                    },
                    query,
                    pipeline,
                    &mut ctx,
                    false,
                    idx,
                )?;
            }
            s => {
                return Err(PipelineError::UnsupportedSqlError(
                    UnsupportedSqlError::GenericError(s.to_string()),
                ))
            }
        }
    }
    // At least one statement must expose an output table, otherwise there is
    // nothing for a sink to connect to.
    if ctx.output_tables_map.is_empty() {
        return Err(PipelineError::NoIntoProvided);
    }

    Ok(ctx)
}
315✔
112

×
113
/// Recursively translates a SQL `Query` into pipeline processors.
///
/// Rejects unsupported syntax (ORDER BY, LIMIT/OFFSET, recursive CTEs and
/// CTEs with a FROM clause), registers every CTE as a derived table first,
/// then dispatches on the query body: a plain SELECT, a nested query, or a
/// UNION set operation.
fn query_to_pipeline(
    table_info: &TableInfo,
    query: &Query,
    pipeline: &mut AppPipeline<SchemaSQLContext>,
    query_ctx: &mut QueryContext,
    stateful: bool,
    pipeline_idx: usize,
) -> Result<(), PipelineError> {
    // return error if there is unsupported syntax
    if !query.order_by.is_empty() {
        return Err(PipelineError::UnsupportedSqlError(
            UnsupportedSqlError::OrderByError,
        ));
    }

    if query.limit.is_some() || query.offset.is_some() {
        return Err(PipelineError::UnsupportedSqlError(
            UnsupportedSqlError::LimitOffsetError,
        ));
    }

    // Attach the first pipeline if there is with clause
    if let Some(with) = &query.with {
        if with.recursive {
            return Err(PipelineError::UnsupportedSqlError(
                UnsupportedSqlError::Recursive,
            ));
        }

        for table in &with.cte_tables {
            if table.from.is_some() {
                return Err(PipelineError::UnsupportedSqlError(
                    UnsupportedSqlError::CteFromError,
                ));
            }
            let table_name = table.alias.name.to_string();
            // Each CTE name may only be defined once per pipeline index.
            if query_ctx
                .pipeline_map
                .contains_key(&(pipeline_idx, table_name.clone()))
            {
                return Err(InvalidQuery(format!(
                    "WITH query name {table_name:?} specified more than once"
                )));
            }
            // CTEs are built as derived, stateful sub-pipelines registered
            // under their alias so later FROM clauses can reference them.
            query_to_pipeline(
                &TableInfo {
                    name: NameOrAlias(table_name.clone(), Some(table_name)),
                    is_derived: true,
                    override_name: None,
                },
                &table.query,
                pipeline,
                query_ctx,
                true,
                pipeline_idx,
            )?;
        }
    };

    match *query.body.clone() {
        SetExpr::Select(select) => {
            select_to_pipeline(
                table_info,
                *select,
                pipeline,
                query_ctx,
                stateful,
                pipeline_idx,
            )?;
        }
        SetExpr::Query(query) => {
            let query_name = format!("subquery_{}", query_ctx.get_next_processor_id());
            // NOTE(review): the nested query is built against a fresh
            // QueryContext whose contents (tables, used sources, processor
            // counter) are discarded afterwards — confirm this is
            // intentional; processor names generated from the fresh counter
            // could collide with names produced from the outer context.
            let mut ctx = QueryContext::default();
            query_to_pipeline(
                &TableInfo {
                    name: NameOrAlias(query_name, None),
                    is_derived: true,
                    override_name: None,
                },
                &query,
                pipeline,
                &mut ctx,
                stateful,
                pipeline_idx,
            )?
        }
        SetExpr::SetOperation {
            op,
            set_quantifier,
            left,
            right,
        } => match op {
            SetOperator::Union => {
                set_to_pipeline(
                    table_info,
                    left,
                    right,
                    set_quantifier,
                    pipeline,
                    query_ctx,
                    stateful,
                    pipeline_idx,
                )?;
            }
            // UNION is the only supported set operation.
            _ => return Err(PipelineError::InvalidOperator(op.to_string())),
        },
        _ => {
            return Err(PipelineError::UnsupportedSqlError(
                UnsupportedSqlError::GenericError("Unsupported query body structure".to_string()),
            ))
        }
    };
    Ok(())
}
380✔
227

×
228
/// Builds the processors for a single SELECT: FROM/JOIN product, optional
/// WHERE selection, and an aggregation node.
///
/// Registers the resulting table in `query_ctx.pipeline_map` (keyed by the
/// table name in `table_info`) and, when the SELECT has an `INTO` clause or
/// `table_info.override_name` is set, also in `output_tables_map`.
/// Returns the generated aggregation processor name.
fn select_to_pipeline(
    table_info: &TableInfo,
    select: Select,
    pipeline: &mut AppPipeline<SchemaSQLContext>,
    query_ctx: &mut QueryContext,
    stateful: bool,
    pipeline_idx: usize,
) -> Result<String, PipelineError> {
    // FROM clause
    if select.from.len() != 1 {
        return Err(PipelineError::UnsupportedSqlError(
            UnsupportedSqlError::FromCommaSyntax,
        ));
    }

    let connection_info =
        insert_from_to_pipeline(&select.from[0], pipeline, pipeline_idx, query_ctx)?;

    let input_nodes = connection_info.input_nodes;
    let output_node = connection_info.output_node;

    let gen_agg_name = format!("agg_{}", query_ctx.get_next_processor_id());

    let gen_selection_name = format!("select_{}", query_ctx.get_next_processor_id());
    let (gen_product_name, product_output_port) = output_node;

    // Wire every input of the FROM product: tables already built in this
    // pipeline connect directly; anything else must come from a source.
    for (source_name, processor_name, processor_port) in input_nodes.iter() {
        if let Some(table_info) = query_ctx
            .pipeline_map
            .get(&(pipeline_idx, source_name.clone()))
        {
            pipeline.connect_nodes(
                &table_info.node,
                table_info.port,
                processor_name,
                *processor_port as PortHandle,
            );
            // If not present in pipeline_map, insert into used_sources as this is coming from source
        } else {
            query_ctx.used_sources.push(source_name.clone());
        }
    }

    let aggregation =
        AggregationProcessorFactory::new(gen_agg_name.clone(), select.clone(), stateful);

    pipeline.add_processor(Box::new(aggregation), &gen_agg_name, vec![]);

    // Where clause
    if let Some(selection) = select.selection {
        // product -> selection -> aggregation
        let selection = SelectionProcessorFactory::new(gen_selection_name.to_owned(), selection);

        pipeline.add_processor(Box::new(selection), &gen_selection_name, vec![]);

        pipeline.connect_nodes(
            &gen_product_name,
            product_output_port,
            &gen_selection_name,
            DEFAULT_PORT_HANDLE,
        );

        pipeline.connect_nodes(
            &gen_selection_name,
            DEFAULT_PORT_HANDLE,
            &gen_agg_name,
            DEFAULT_PORT_HANDLE,
        );
    } else {
        // product -> aggregation (no WHERE clause)
        pipeline.connect_nodes(
            &gen_product_name,
            product_output_port,
            &gen_agg_name,
            DEFAULT_PORT_HANDLE,
        );
    }

    query_ctx.pipeline_map.insert(
        (pipeline_idx, table_info.name.0.to_string()),
        OutputNodeInfo {
            node: gen_agg_name.clone(),
            port: DEFAULT_PORT_HANDLE,
            is_derived: table_info.is_derived,
        },
    );

    // An explicit INTO clause takes precedence over the caller's override name.
    let output_table_name = if let Some(into) = select.into {
        Some(into.name.to_string())
    } else {
        table_info.override_name.clone()
    };
    if let Some(table_name) = output_table_name {
        query_ctx.output_tables_map.insert(
            table_name,
            OutputNodeInfo {
                node: gen_agg_name.clone(),
                port: DEFAULT_PORT_HANDLE,
                is_derived: false,
            },
        );
    }

    Ok(gen_agg_name)
}
396✔
340

×
341
/// Builds a UNION between two set-expression operands.
///
/// Each operand is built into its own sub-pipeline (recursing for nested set
/// operations), then both are connected to a single set processor on ports 0
/// and 1. Returns the generated set processor name (or the override name
/// from `table_info` when present).
#[allow(clippy::too_many_arguments)]
fn set_to_pipeline(
    table_info: &TableInfo,
    left_select: Box<SetExpr>,
    right_select: Box<SetExpr>,
    set_quantifier: SetQuantifier,
    pipeline: &mut AppPipeline<SchemaSQLContext>,
    query_ctx: &mut QueryContext,
    stateful: bool,
    pipeline_idx: usize,
) -> Result<String, PipelineError> {
    // Generated names under which each operand registers itself in pipeline_map.
    let gen_left_set_name = format!("set_left_{}", query_ctx.get_next_processor_id());
    let left_table_info = TableInfo {
        name: NameOrAlias(gen_left_set_name.clone(), None),
        override_name: None,
        is_derived: false,
    };
    let gen_right_set_name = format!("set_right_{}", query_ctx.get_next_processor_id());
    let right_table_info = TableInfo {
        name: NameOrAlias(gen_right_set_name.clone(), None),
        override_name: None,
        is_derived: false,
    };

    let _left_pipeline_name = match *left_select {
        SetExpr::Select(select) => select_to_pipeline(
            &left_table_info,
            *select,
            pipeline,
            query_ctx,
            stateful,
            pipeline_idx,
        )?,
        // Nested set operation on the left side: recurse.
        SetExpr::SetOperation {
            op: _,
            set_quantifier,
            left,
            right,
        } => set_to_pipeline(
            &left_table_info,
            left,
            right,
            set_quantifier,
            pipeline,
            query_ctx,
            stateful,
            pipeline_idx,
        )?,
        _ => {
            return Err(PipelineError::InvalidQuery(
                "Invalid UNION left Query".to_string(),
            ))
        }
    };

    let _right_pipeline_name = match *right_select {
        SetExpr::Select(select) => select_to_pipeline(
            &right_table_info,
            *select,
            pipeline,
            query_ctx,
            stateful,
            pipeline_idx,
        )?,
        // Nested set operation on the right side: recurse.
        SetExpr::SetOperation {
            op: _,
            set_quantifier,
            left,
            right,
        } => set_to_pipeline(
            &right_table_info,
            left,
            right,
            set_quantifier,
            pipeline,
            query_ctx,
            stateful,
            pipeline_idx,
        )?,
        _ => {
            return Err(PipelineError::InvalidQuery(
                "Invalid UNION right Query".to_string(),
            ))
        }
    };

    let mut gen_set_name = format!("set_{}", query_ctx.get_next_processor_id());

    // Both operands must have registered themselves above.
    let left_pipeline_output_node = match query_ctx
        .pipeline_map
        .get(&(pipeline_idx, gen_left_set_name))
    {
        Some(pipeline) => pipeline,
        None => {
            return Err(PipelineError::InvalidQuery(
                "Invalid UNION left Query".to_string(),
            ))
        }
    };

    let right_pipeline_output_node = match query_ctx
        .pipeline_map
        .get(&(pipeline_idx, gen_right_set_name))
    {
        Some(pipeline) => pipeline,
        None => {
            return Err(PipelineError::InvalidQuery(
                "Invalid UNION Right Query".to_string(),
            ))
        }
    };

    if table_info.override_name.is_some() {
        gen_set_name = table_info.override_name.to_owned().unwrap();
    }

    let set_proc_fac = SetProcessorFactory::new(gen_set_name.clone(), set_quantifier);

    pipeline.add_processor(Box::new(set_proc_fac), &gen_set_name, vec![]);

    // Left operand feeds port 0, right operand feeds port 1.
    pipeline.connect_nodes(
        &left_pipeline_output_node.node,
        left_pipeline_output_node.port,
        &gen_set_name,
        0 as PortHandle,
    );

    pipeline.connect_nodes(
        &right_pipeline_output_node.node,
        right_pipeline_output_node.port,
        &gen_set_name,
        1 as PortHandle,
    );

    // Drop any output-table entries that correspond to intermediate tables;
    // only the union result should remain exposed.
    for (_, table_name) in query_ctx.pipeline_map.keys() {
        query_ctx.output_tables_map.remove_entry(table_name);
    }

    query_ctx.pipeline_map.insert(
        (pipeline_idx, table_info.name.0.to_string()),
        OutputNodeInfo {
            node: gen_set_name.clone(),
            port: DEFAULT_PORT_HANDLE,
            is_derived: table_info.is_derived,
        },
    );

    Ok(gen_set_name)
}
16✔
490

491
/// Returns a vector of input port handles and relative table name
492
///
493
/// # Errors
×
494
///
×
495
/// This function will return an error if it's not possible to get an input name.
×
496
pub fn get_input_tables(
×
497
    from: &TableWithJoins,
×
498
    pipeline: &mut AppPipeline<SchemaSQLContext>,
×
499
    query_ctx: &mut QueryContext,
×
500
    pipeline_idx: usize,
×
501
) -> Result<IndexedTableWithJoins, PipelineError> {
×
502
    let name = get_from_source(&from.relation, pipeline, query_ctx, pipeline_idx)?;
×
503
    let mut joins = vec![];
×
504

×
505
    for join in from.joins.iter() {
×
506
        let input_name = get_from_source(&join.relation, pipeline, query_ctx, pipeline_idx)?;
×
507
        joins.push((input_name.clone(), join.clone()));
×
508
    }
×
509

×
510
    Ok(IndexedTableWithJoins {
×
511
        relation: (name, from.relation.clone()),
×
512
        joins,
×
513
    })
×
514
}
×
515

×
516
pub fn get_input_names(input_tables: &IndexedTableWithJoins) -> Vec<NameOrAlias> {
×
517
    let mut input_names = vec![];
×
518
    input_names.push(input_tables.relation.0.clone());
×
519

×
520
    for join in &input_tables.joins {
×
521
        input_names.push(join.0.clone());
×
522
    }
×
523
    input_names
×
524
}
×
525

×
526
pub fn get_entry_points(
×
527
    input_tables: &IndexedTableWithJoins,
×
528
    pipeline_map: &mut HashMap<(usize, String), OutputNodeInfo>,
×
529
    pipeline_idx: usize,
×
530
) -> Result<Vec<PipelineEntryPoint>, PipelineError> {
×
531
    let mut endpoints = vec![];
×
532

×
533
    let input_names = get_input_names(input_tables);
×
534

×
535
    for (input_port, table) in input_names.iter().enumerate() {
×
536
        let name = table.0.clone();
×
537
        if !pipeline_map.contains_key(&(pipeline_idx, name.clone())) {
×
538
            endpoints.push(PipelineEntryPoint::new(name, input_port as PortHandle));
×
539
        }
×
540
    }
×
541

542
    Ok(endpoints)
×
543
}
×
544

×
545
pub fn is_an_entry_point(
×
546
    name: &str,
×
547
    pipeline_map: &mut HashMap<(usize, String), OutputNodeInfo>,
×
548
    pipeline_idx: usize,
×
549
) -> bool {
×
550
    if !pipeline_map.contains_key(&(pipeline_idx, name.to_owned())) {
×
551
        return true;
×
552
    }
×
553
    false
×
554
}
×
555

×
556
/// Resolves a FROM-clause relation to the name (or alias) it is registered
/// under in the pipeline.
///
/// Plain tables resolve to their normalized, dot-joined identifier; derived
/// tables (subqueries) are first built recursively into the pipeline under a
/// generated `derived_{n}` name. Any other table factor is unsupported.
pub fn get_from_source(
    relation: &TableFactor,
    pipeline: &mut AppPipeline<SchemaSQLContext>,
    query_ctx: &mut QueryContext,
    pipeline_idx: usize,
) -> Result<NameOrAlias, PipelineError> {
    match relation {
        TableFactor::Table { name, alias, .. } => {
            // Normalize each identifier segment and rejoin with '.' to
            // rebuild the fully qualified table name.
            let input_name = name
                .0
                .iter()
                .map(ExpressionBuilder::normalize_ident)
                .collect::<Vec<String>>()
                .join(".");
            let alias_name = alias
                .as_ref()
                .map(|a| ExpressionBuilder::fullname_from_ident(&[a.name.clone()]));

            Ok(NameOrAlias(input_name, alias_name))
        }
        TableFactor::Derived {
            lateral: _,
            subquery,
            alias,
        } => {
            let name = format!("derived_{}", query_ctx.get_next_processor_id());
            let alias_name = alias.as_ref().map(|alias_ident| {
                ExpressionBuilder::fullname_from_ident(&[alias_ident.name.clone()])
            });

            let name_or = NameOrAlias(name, alias_name);
            // Build the subquery into the pipeline as a derived table before
            // returning its generated name.
            query_to_pipeline(
                &TableInfo {
                    name: name_or.clone(),
                    is_derived: true,
                    override_name: None,
                },
                subquery,
                pipeline,
                query_ctx,
                false,
                pipeline_idx,
            )?;

            Ok(name_or)
        }
        _ => Err(PipelineError::UnsupportedSqlError(
            UnsupportedSqlError::JoinTable,
        )),
    }
}
575✔
607

608
#[cfg(test)]
mod tests {
    use dozer_core::app::AppPipeline;

    use super::statement_to_pipeline;

    #[test]
    #[should_panic]
    // Renamed from `disallow_zero_outgoing_ndes` (typo).
    fn disallow_zero_outgoing_nodes() {
        // A bare SELECT with neither INTO nor an override name must be rejected.
        let sql = "select * from film";
        statement_to_pipeline(sql, &mut AppPipeline::new(), None).unwrap();
    }

    #[test]
    fn parse_sql_pipeline() {
        let sql = r#"
                SELECT
                    a.name as "Genre",
                    SUM(amount) as "Gross Revenue(in $)"
                INTO gross_revenue_stats
                FROM
                (
                    SELECT
                        c.name,
                        f.title,
                        p.amount
                    FROM film f
                    LEFT JOIN film_category fc
                        ON fc.film_id = f.film_id
                    LEFT JOIN category c
                        ON fc.category_id = c.category_id
                    LEFT JOIN inventory i
                        ON i.film_id = f.film_id
                    LEFT JOIN rental r
                        ON r.inventory_id = i.inventory_id
                    LEFT JOIN payment p
                        ON p.rental_id = r.rental_id
                    WHERE p.amount IS NOT NULL
                ) a
                GROUP BY name;

                SELECT
                f.name, f.title, p.amount
                INTO film_amounts
                FROM film f
                LEFT JOIN film_category fc;

                WITH tbl as (select id from a)
                select id
                into cte_table
                from tbl;

                WITH tbl as (select id from  a),
                tbl2 as (select id from tbl)
                select id
                into nested_cte_table
                from tbl2;

                WITH cte_table1 as (select id_dt1 from (select id_t1 from table_1) as derived_table_1),
                cte_table2 as (select id_ct1 from cte_table1)
                select id_ct2
                into nested_derived_table
                from cte_table2;

                with tbl as (select id, ticker from stocks)
                select tbl.id
                into nested_stocks_table
                from  stocks join tbl on tbl.id = stocks.id;
            "#;

        let context = statement_to_pipeline(sql, &mut AppPipeline::new(), None).unwrap();

        // Should create as many output tables as into statements
        let mut output_keys = context.output_tables_map.keys().collect::<Vec<_>>();
        output_keys.sort();
        let mut expected_keys = vec![
            "gross_revenue_stats",
            "film_amounts",
            "cte_table",
            "nested_cte_table",
            "nested_derived_table",
            "nested_stocks_table",
        ];
        expected_keys.sort();
        assert_eq!(output_keys, expected_keys);
    }
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc