• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

getdozer / dozer / 4102907265

pending completion
4102907265

Pull #803

github

GitHub
Merge d4339f5c1 into 7c772e92a
Pull Request #803: feat: Refactor sql to be common across sources

601 of 601 new or added lines in 24 files covered. (100.0%)

23644 of 37870 relevant lines covered (62.43%)

37088.72 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

74.94
/dozer-sql/src/pipeline/builder.rs
1
use crate::pipeline::aggregation::factory::AggregationProcessorFactory;
2
use crate::pipeline::builder::PipelineError::InvalidQuery;
3
use crate::pipeline::errors::PipelineError;
4
use crate::pipeline::selection::factory::SelectionProcessorFactory;
5
use dozer_core::dag::app::AppPipeline;
6
use dozer_core::dag::app::PipelineEntryPoint;
7
use dozer_core::dag::appsource::AppSourceId;
8
use dozer_core::dag::node::PortHandle;
9
use dozer_core::dag::DEFAULT_PORT_HANDLE;
10
use sqlparser::ast::{Join, TableFactor, TableWithJoins};
11
use sqlparser::{
12
    ast::{Query, Select, SetExpr, Statement},
13
    dialect::AnsiDialect,
14
    parser::Parser,
15
};
16
use std::collections::HashMap;
17
use std::sync::Arc;
18

19
use super::errors::UnsupportedSqlError;
20
use super::expression::builder::{fullname_from_ident, normalize_ident, NameOrAlias};
21
use super::product::factory::FromProcessorFactory;
22

23
/// Schema-resolution context carried by every node of a SQL pipeline DAG.
/// Currently empty; exists so `AppPipeline<SchemaSQLContext>` can be extended
/// with per-node schema state without changing node signatures.
#[derive(Debug, Clone, Default)]
pub struct SchemaSQLContext {}
25

26
/// Locates a table's output inside the pipeline DAG: the node and port that
/// downstream consumers (or sinks) must connect to.
#[derive(Debug, Clone)]
pub struct QueryTableInfo {
    // Name to connect in dag
    pub node: String,
    // Port to connect in dag
    pub port: PortHandle,
    // If this table is originally from a source or created in transforms
    pub is_derived: bool,
    // TODO add:indexes to the tables
}
36

37
/// Describes the table a query (or CTE / derived subquery) produces while it
/// is being lowered into the pipeline.
pub struct TableInfo {
    // Name (or alias) under which the result is registered in the pipeline map.
    pub name: NameOrAlias,
    // When set and the query has no INTO clause, the result is exposed in
    // `QueryContext::output_tables_map` under this name instead.
    pub override_name: Option<String>,
    // True when the table is produced by a transform (CTE/subquery) rather
    // than read directly from a source.
    pub is_derived: bool,
}
×
42
/// The struct contains some contexts during query to pipeline.
#[derive(Debug, Clone, Default)]
pub struct QueryContext {
    // Internal tables map, used to store the tables that are created by the queries.
    // Keyed by (statement index, table name) so identically-named tables in
    // different top-level statements do not collide.
    pub pipeline_map: HashMap<(usize, String), QueryTableInfo>,

    // Output tables map that are marked with "INTO" used to store the tables, these can be exposed to sinks.
    pub output_tables_map: HashMap<String, QueryTableInfo>,

    // Used Sources: names referenced in FROM/JOIN that were not produced by a
    // transform, i.e. they must be satisfied by external sources.
    pub used_sources: Vec<String>,
}
×
54

×
55
/// A FROM clause whose relation and joins have each been resolved to a
/// `NameOrAlias` (derived subqueries get generated names).
///
/// NOTE(review): "Tabel" is a typo for "Table"; kept as-is because the name is
/// part of the public API.
#[derive(Debug, Clone)]
pub struct IndexedTabelWithJoins {
    pub relation: (NameOrAlias, TableFactor),
    pub joins: Vec<(NameOrAlias, Join)>,
}
60

×
61
pub fn statement_to_pipeline(
70✔
62
    sql: &str,
70✔
63
    pipeline: &mut AppPipeline<SchemaSQLContext>,
70✔
64
    override_name: Option<String>,
70✔
65
) -> Result<QueryContext, PipelineError> {
70✔
66
    let dialect = AnsiDialect {};
70✔
67
    let mut ctx = QueryContext::default();
70✔
68

70✔
69
    let ast = Parser::parse_sql(&dialect, sql).unwrap();
70✔
70
    let query_name = NameOrAlias(format!("query_{}", uuid::Uuid::new_v4()), None);
70✔
71

×
72
    for (idx, statement) in ast.iter().enumerate() {
75✔
73
        match statement {
75✔
74
            Statement::Query(query) => {
75✔
75
                query_to_pipeline(
75✔
76
                    &TableInfo {
75✔
77
                        name: query_name.clone(),
75✔
78
                        is_derived: false,
75✔
79
                        override_name: override_name.clone(),
75✔
80
                    },
75✔
81
                    query,
75✔
82
                    pipeline,
75✔
83
                    &mut ctx,
75✔
84
                    false,
75✔
85
                    idx,
75✔
86
                )?;
75✔
87
            }
88
            s => {
×
89
                return Err(PipelineError::UnsupportedSqlError(
×
90
                    UnsupportedSqlError::GenericError(s.to_string()),
×
91
                ))
×
92
            }
×
93
        }
×
94
    }
×
95

×
96
    Ok(ctx)
70✔
97
}
70✔
98

×
99
/// Lowers a single SQL `Query` — including its WITH/CTE preamble — into
/// processors and edges on `pipeline`, registering every produced table in
/// `query_ctx` under `pipeline_idx`.
///
/// CTEs are lowered first (as derived, stateful tables) so the main body can
/// reference them through `query_ctx.pipeline_map`.
///
/// # Errors
///
/// Returns `UnsupportedSqlError` for ORDER BY, LIMIT/OFFSET, recursive CTEs,
/// CTEs with a FROM clause, and set expressions other than SELECT or a nested
/// query; returns `InvalidQuery` when a WITH name is declared twice.
fn query_to_pipeline(
    table_info: &TableInfo,
    query: &Query,
    pipeline: &mut AppPipeline<SchemaSQLContext>,
    query_ctx: &mut QueryContext,
    stateful: bool,
    pipeline_idx: usize,
) -> Result<(), PipelineError> {
    // return error if there is unsupported syntax
    if !query.order_by.is_empty() {
        return Err(PipelineError::UnsupportedSqlError(
            UnsupportedSqlError::OrderByError,
        ));
    }

    if query.limit.is_some() || query.offset.is_some() {
        return Err(PipelineError::UnsupportedSqlError(
            UnsupportedSqlError::LimitOffsetError,
        ));
    }

    // Attach the first pipeline if there is with clause
    if let Some(with) = &query.with {
        if with.recursive {
            return Err(PipelineError::UnsupportedSqlError(
                UnsupportedSqlError::Recursive,
            ));
        }

        for table in &with.cte_tables {
            if table.from.is_some() {
                return Err(PipelineError::UnsupportedSqlError(
                    UnsupportedSqlError::CteFromError,
                ));
            }
            let table_name = table.alias.name.to_string();
            // Duplicate WITH names within one statement are ambiguous.
            if query_ctx
                .pipeline_map
                .contains_key(&(pipeline_idx, table_name.clone()))
            {
                return Err(InvalidQuery(format!(
                    "WITH query name {table_name:?} specified more than once"
                )));
            }
            // CTEs are registered under their own alias and marked stateful so
            // downstream consumers can read them.
            query_to_pipeline(
                &TableInfo {
                    name: NameOrAlias(table_name.clone(), Some(table_name)),
                    is_derived: true,
                    override_name: None,
                },
                &table.query,
                pipeline,
                query_ctx,
                true,
                pipeline_idx,
            )?;
        }
    };

    match *query.body.clone() {
        SetExpr::Select(select) => {
            select_to_pipeline(
                table_info,
                *select,
                pipeline,
                query_ctx,
                stateful,
                pipeline_idx,
            )?;
        }
        SetExpr::Query(query) => {
            let query_name = format!("subquery_{}", uuid::Uuid::new_v4());
            // NOTE(review): this fresh QueryContext is discarded after the
            // recursive call, so the nested query's used_sources/output tables
            // never surface in the caller's context — confirm this is intended.
            let mut ctx = QueryContext::default();
            query_to_pipeline(
                &TableInfo {
                    name: NameOrAlias(query_name, None),
                    is_derived: true,
                    override_name: None,
                },
                &query,
                pipeline,
                &mut ctx,
                stateful,
                pipeline_idx,
            )?
        }
        _ => {
            return Err(PipelineError::UnsupportedSqlError(
                UnsupportedSqlError::SelectOnlyError,
            ))
        }
    };
    Ok(())
}
83✔
193

×
194
/// Lowers one SELECT into a chain of processors:
/// `product (FROM/JOIN) -> [selection (WHERE)] -> aggregation (projection/GROUP BY)`,
/// then registers the result under `table_info.name` in the pipeline map and,
/// if an INTO clause or `override_name` is present, in the output tables map.
fn select_to_pipeline(
    table_info: &TableInfo,
    select: Select,
    pipeline: &mut AppPipeline<SchemaSQLContext>,
    query_ctx: &mut QueryContext,
    stateful: bool,
    pipeline_idx: usize,
) -> Result<(), PipelineError> {
    // FROM clause
    if select.from.len() != 1 {
        return Err(PipelineError::UnsupportedSqlError(
            UnsupportedSqlError::FromCommaSyntax,
        ));
    }

    let input_tables = get_input_tables(&select.from[0], pipeline, query_ctx, pipeline_idx)?;

    let product = FromProcessorFactory::new(input_tables.clone());

    // Entry points are the inputs that must be fed by external sources
    // (i.e. not found in the pipeline map).
    let input_endpoints =
        get_entry_points(&input_tables, &mut query_ctx.pipeline_map, pipeline_idx)?;

    // Node names are UUID-suffixed so multiple SELECTs never collide in the DAG.
    let gen_product_name = format!("product_{}", uuid::Uuid::new_v4());
    let gen_agg_name = format!("agg_{}", uuid::Uuid::new_v4());
    let gen_selection_name = format!("select_{}", uuid::Uuid::new_v4());
    pipeline.add_processor(Arc::new(product), &gen_product_name, input_endpoints);

    // Wire each input: internal tables connect directly; anything else is
    // recorded as an external source. Port index = position in the FROM list.
    let input_names = get_input_names(&input_tables);
    for (port_index, table_name) in input_names.iter().enumerate() {
        if let Some(table_info) = query_ctx
            .pipeline_map
            .get(&(pipeline_idx, table_name.0.clone()))
        {
            pipeline.connect_nodes(
                &table_info.node,
                Some(table_info.port),
                &gen_product_name,
                Some(port_index as PortHandle),
                true,
            )?;
            // If not present in pipeline_map, insert into used_sources as this is coming from source
        } else {
            query_ctx.used_sources.push(table_name.0.clone());
        }
    }

    let aggregation =
        AggregationProcessorFactory::new(select.projection.clone(), select.group_by, stateful);

    pipeline.add_processor(Arc::new(aggregation), &gen_agg_name, vec![]);

    // Where clause
    if let Some(selection) = select.selection {
        let selection = SelectionProcessorFactory::new(selection);

        pipeline.add_processor(Arc::new(selection), &gen_selection_name, vec![]);

        // product -> selection -> aggregation
        pipeline.connect_nodes(
            &gen_product_name,
            Some(DEFAULT_PORT_HANDLE),
            &gen_selection_name,
            Some(DEFAULT_PORT_HANDLE),
            true,
        )?;

        pipeline.connect_nodes(
            &gen_selection_name,
            Some(DEFAULT_PORT_HANDLE),
            &gen_agg_name,
            Some(DEFAULT_PORT_HANDLE),
            true,
        )?;
    } else {
        // No WHERE clause: product -> aggregation directly.
        pipeline.connect_nodes(
            &gen_product_name,
            Some(DEFAULT_PORT_HANDLE),
            &gen_agg_name,
            Some(DEFAULT_PORT_HANDLE),
            true,
        )?;
    }

    // The aggregation node is the SELECT's output; register it so later
    // queries (or CTE consumers) can connect to it.
    query_ctx.pipeline_map.insert(
        (pipeline_idx, table_info.name.0.to_string()),
        QueryTableInfo {
            node: gen_agg_name.clone(),
            port: DEFAULT_PORT_HANDLE,
            is_derived: table_info.is_derived,
        },
    );

    // INTO takes precedence over the caller-supplied override name.
    let output_table_name = if let Some(into) = select.into {
        Some(into.name.to_string())
    } else {
        table_info.override_name.clone()
    };
    if let Some(table_name) = output_table_name {
        query_ctx.output_tables_map.insert(
            table_name,
            QueryTableInfo {
                node: gen_agg_name,
                port: DEFAULT_PORT_HANDLE,
                is_derived: false,
            },
        );
    }

    Ok(())
}
83✔
303

304
/// Returns a vector of input port handles and relative table name
×
305
///
306
/// # Errors
307
///
308
/// This function will return an error if it's not possible to get an input name.
309
pub fn get_input_tables(
83✔
310
    from: &TableWithJoins,
83✔
311
    pipeline: &mut AppPipeline<SchemaSQLContext>,
83✔
312
    query_ctx: &mut QueryContext,
83✔
313
    pipeline_idx: usize,
83✔
314
) -> Result<IndexedTabelWithJoins, PipelineError> {
83✔
315
    let name = get_from_source(&from.relation, pipeline, query_ctx, pipeline_idx)?;
83✔
316
    let mut joins = vec![];
83✔
317

×
318
    for join in from.joins.iter() {
83✔
319
        let input_name = get_from_source(&join.relation, pipeline, query_ctx, pipeline_idx)?;
27✔
320
        joins.push((input_name.clone(), join.clone()));
27✔
321
    }
×
322

×
323
    Ok(IndexedTabelWithJoins {
83✔
324
        relation: (name, from.relation.clone()),
83✔
325
        joins,
83✔
326
    })
83✔
327
}
83✔
328

×
329
/// Collects the resolved name of the main relation followed by the name of
/// each joined relation, in FROM-clause order (this order defines the product
/// processor's input ports).
pub fn get_input_names(input_tables: &IndexedTabelWithJoins) -> Vec<NameOrAlias> {
    std::iter::once(&input_tables.relation.0)
        .chain(input_tables.joins.iter().map(|(name, _)| name))
        .cloned()
        .collect()
}
529✔
338
pub fn get_entry_points(
83✔
339
    input_tables: &IndexedTabelWithJoins,
83✔
340
    pipeline_map: &mut HashMap<(usize, String), QueryTableInfo>,
83✔
341
    pipeline_idx: usize,
83✔
342
) -> Result<Vec<PipelineEntryPoint>, PipelineError> {
83✔
343
    let mut endpoints = vec![];
83✔
344

83✔
345
    let input_names = get_input_names(input_tables);
83✔
346

×
347
    for (input_port, table) in input_names.iter().enumerate() {
110✔
348
        let name = table.0.clone();
110✔
349
        if !pipeline_map.contains_key(&(pipeline_idx, name.clone())) {
110✔
350
            endpoints.push(PipelineEntryPoint::new(
102✔
351
                AppSourceId::new(name, None),
102✔
352
                input_port as PortHandle,
102✔
353
            ));
102✔
354
        }
102✔
355
    }
×
356

×
357
    Ok(endpoints)
83✔
358
}
83✔
359

×
360
/// Resolves a single relation of a FROM/JOIN clause to a [`NameOrAlias`].
///
/// Plain tables yield their dotted, normalized name plus optional alias.
/// Derived tables (subqueries) are lowered into the pipeline recursively and
/// get a generated `derived_<uuid>` name. Any other relation kind
/// (e.g. nested joins) is unsupported.
pub fn get_from_source(
    relation: &TableFactor,
    pipeline: &mut AppPipeline<SchemaSQLContext>,
    query_ctx: &mut QueryContext,
    pipeline_idx: usize,
) -> Result<NameOrAlias, PipelineError> {
    match relation {
        TableFactor::Table { name, alias, .. } => {
            // Compound identifiers (schema.table) are normalized per part and
            // re-joined with '.'.
            let input_name = name
                .0
                .iter()
                .map(normalize_ident)
                .collect::<Vec<String>>()
                .join(".");
            let alias_name = alias
                .as_ref()
                .map(|a| fullname_from_ident(&[a.name.clone()]));

            Ok(NameOrAlias(input_name, alias_name))
        }
        TableFactor::Derived {
            lateral: _,
            subquery,
            alias,
        } => {
            // Subqueries have no user-visible name; generate a unique one.
            let name = format!("derived_{}", uuid::Uuid::new_v4());
            let alias_name = alias
                .as_ref()
                .map(|alias_ident| fullname_from_ident(&[alias_ident.name.clone()]));

            let name_or = NameOrAlias(name, alias_name);
            // Lower the subquery now so the generated name is resolvable in
            // query_ctx.pipeline_map by the enclosing SELECT.
            query_to_pipeline(
                &TableInfo {
                    name: name_or.clone(),
                    is_derived: true,
                    override_name: None,
                },
                subquery,
                pipeline,
                query_ctx,
                false,
                pipeline_idx,
            )?;

            Ok(name_or)
        }
        _ => Err(PipelineError::UnsupportedSqlError(
            UnsupportedSqlError::JoinTable,
        )),
    }
}
110✔
411

412
#[cfg(test)]
mod tests {
    use dozer_core::dag::app::AppPipeline;

    use super::statement_to_pipeline;

    // End-to-end smoke test: multiple statements, nested subqueries, CTEs,
    // nested CTEs, derived tables inside CTEs, and a CTE joined with a source.
    #[test]
    fn parse_sql_pipeline() {
        let sql = r#"
                SELECT
                a.name as "Genre",
                    SUM(amount) as "Gross Revenue(in $)"
                INTO gross_revenue_stats
                FROM
                (
                    SELECT
                    c.name, f.title, p.amount
                FROM film f
                LEFT JOIN film_category fc
                ON fc.film_id = f.film_id
                LEFT JOIN category c
                ON fc.category_id = c.category_id
                LEFT JOIN inventory i
                ON i.film_id = f.film_id
                LEFT JOIN rental r
                ON r.inventory_id = i.inventory_id
                LEFT JOIN payment p
                ON p.rental_id = r.rental_id
                WHERE p.amount IS NOT NULL
                ) a
                GROUP BY name;
            
                SELECT
                f.name, f.title, p.amount
                INTO film_amounts
                FROM film f
                LEFT JOIN film_category fc;
                
                WITH tbl as (select id from a)
                select id 
                into cte_table
                from tbl;
                
                WITH tbl as (select id from  a),
                tbl2 as (select id from tbl)
                select id 
                into nested_cte_table
                from tbl2;
                
                WITH cte_table1 as (select id_dt1 from (select id_t1 from table_1) as derived_table_1),
                cte_table2 as (select id_ct1 from cte_table1)
                select id_ct2 
                into nested_derived_table
                from cte_table2;
               
                with tbl as (select id, ticker from stocks)
                select tbl.id 
                into nested_stocks_table
                from  stocks join tbl on tbl.id = stocks.id;
            "#;

        let context = statement_to_pipeline(sql, &mut AppPipeline::new(), None).unwrap();

        // Should create as many output tables as into statements
        let mut output_keys = context.output_tables_map.keys().collect::<Vec<_>>();
        output_keys.sort();
        let mut expected_keys = vec![
            "gross_revenue_stats",
            "film_amounts",
            "cte_table",
            "nested_cte_table",
            "nested_derived_table",
            "nested_stocks_table",
        ];
        expected_keys.sort();
        assert_eq!(output_keys, expected_keys);
    }
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc