• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

getdozer / dozer / 3965135367

pending completion
3965135367

Pull #680

github

GitHub
Merge 1add77327 into 56c0cf2b3
Pull Request #680: feat: Implement nested queries and CTE.

506 of 506 new or added lines in 18 files covered. (100.0%)

21999 of 33062 relevant lines covered (66.54%)

50489.13 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

78.07
/dozer-sql/src/pipeline/builder.rs
1
use crate::pipeline::aggregation::factory::AggregationProcessorFactory;
2
use crate::pipeline::builder::PipelineError::InvalidQuery;
3
use crate::pipeline::selection::factory::SelectionProcessorFactory;
4
use crate::pipeline::{errors::PipelineError, product::factory::ProductProcessorFactory};
5
use dozer_core::dag::app::AppPipeline;
6
use dozer_core::dag::app::PipelineEntryPoint;
7
use dozer_core::dag::appsource::AppSourceId;
8
use dozer_core::dag::dag::DEFAULT_PORT_HANDLE;
9
use dozer_core::dag::node::PortHandle;
10
use sqlparser::ast::{Join, TableFactor, TableWithJoins};
11
use sqlparser::{
12
    ast::{Query, Select, SetExpr, Statement},
13
    dialect::AnsiDialect,
14
    parser::Parser,
15
};
16
use std::collections::HashMap;
17
use std::sync::Arc;
18

19
use super::errors::UnsupportedSqlError;
20
use super::expression::builder::{fullname_from_ident, normalize_ident};
21

×
22
#[derive(Debug, Clone)]
90✔
23
pub struct IndexedTabelWithJoins {
×
24
    pub relation: (NameOrAlias, TableFactor),
×
25
    pub joins: Vec<(NameOrAlias, Join)>,
×
26
}
×
27

×
28
/// Identifies a pipeline input.
///
/// Field `0` is the name of the input (it is the key used in the pipeline map), field `1`
/// is the alias given to it in the query, if any.
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
pub struct NameOrAlias(pub String, pub Option<String>);
×
30

×
31
pub fn statement_to_pipeline(
82✔
32
    sql: &str,
82✔
33
) -> Result<(AppPipeline, (String, PortHandle)), PipelineError> {
82✔
34
    let dialect = AnsiDialect {};
82✔
35

82✔
36
    let ast = Parser::parse_sql(&dialect, sql).unwrap();
82✔
37
    let query_name = NameOrAlias(format!("query_{}", uuid::Uuid::new_v4()), None);
82✔
38
    let statement = ast.get(0).expect("First statement is missing").to_owned();
82✔
39

82✔
40
    let mut pipeline = AppPipeline::new();
82✔
41
    let mut pipeline_map = HashMap::new();
82✔
42
    if let Statement::Query(query) = statement {
82✔
43
        query_to_pipeline(&query_name, &query, &mut pipeline, &mut pipeline_map, false)?;
82✔
44
    };
×
45
    let node = pipeline_map
82✔
46
        .get(&query_name.0)
82✔
47
        .expect("query should have been initialized")
82✔
48
        .to_owned();
82✔
49
    Ok((pipeline, node))
82✔
50
}
82✔
51

×
52
fn query_to_pipeline(
×
53
    processor_name: &NameOrAlias,
×
54
    query: &Query,
×
55
    pipeline: &mut AppPipeline,
×
56
    pipeline_map: &mut HashMap<String, (String, PortHandle)>,
×
57
    stateful: bool,
×
58
) -> Result<(), PipelineError> {
×
59
    // Attach the first pipeline if there is with clause
×
60
    if let Some(with) = &query.with {
90✔
61
        if with.recursive {
4✔
62
            return Err(PipelineError::UnsupportedSqlError(
×
63
                UnsupportedSqlError::Recursive,
×
64
            ));
×
65
        }
4✔
66

×
67
        for table in &with.cte_tables {
10✔
68
            if table.from.is_some() {
6✔
69
                return Err(PipelineError::UnsupportedSqlError(
×
70
                    UnsupportedSqlError::CteFromError,
×
71
                ));
×
72
            }
6✔
73
            let table_name = table.alias.name.to_string();
6✔
74
            query_to_pipeline(
6✔
75
                &NameOrAlias(table_name.clone(), Some(table_name)),
6✔
76
                &table.query,
6✔
77
                pipeline,
6✔
78
                pipeline_map,
6✔
79
                true,
6✔
80
            )?;
6✔
81
        }
×
82
    };
86✔
83

×
84
    match *query.body.clone() {
90✔
85
        SetExpr::Select(select) => {
90✔
86
            select_to_pipeline(processor_name, *select, pipeline, pipeline_map, stateful)?;
90✔
87
        }
×
88
        SetExpr::Query(query) => {
×
89
            let query_name = format!("subquery_{}", uuid::Uuid::new_v4());
×
90

×
91
            query_to_pipeline(
×
92
                &NameOrAlias(query_name, None),
×
93
                &query,
×
94
                pipeline,
×
95
                pipeline_map,
×
96
                stateful,
×
97
            )?
×
98
        }
×
99
        _ => {
×
100
            return Err(PipelineError::UnsupportedSqlError(
×
101
                UnsupportedSqlError::SelectOnlyError,
×
102
            ))
×
103
        }
×
104
    };
×
105
    Ok(())
90✔
106
}
90✔
107

×
108
fn select_to_pipeline(
90✔
109
    processor_name: &NameOrAlias,
90✔
110
    select: Select,
90✔
111
    pipeline: &mut AppPipeline,
90✔
112
    pipeline_map: &mut HashMap<String, (String, PortHandle)>,
90✔
113
    stateful: bool,
90✔
114
) -> Result<(), PipelineError> {
90✔
115
    // FROM clause
90✔
116
    if select.from.len() != 1 {
90✔
117
        return Err(InvalidQuery(
×
118
            "FROM clause doesn't support \"Comma Syntax\"".to_string(),
×
119
        ));
×
120
    }
90✔
121

×
122
    let input_tables = get_input_tables(&select.from[0], pipeline, pipeline_map)?;
90✔
123

×
124
    let product = ProductProcessorFactory::new(input_tables.clone());
90✔
125

×
126
    let input_endpoints = get_entry_points(&input_tables, pipeline_map)?;
90✔
127

×
128
    let gen_product_name = format!("product_{}", uuid::Uuid::new_v4());
90✔
129
    let gen_agg_name = format!("agg_{}", uuid::Uuid::new_v4());
90✔
130
    let gen_selection_name = format!("select_{}", uuid::Uuid::new_v4());
90✔
131
    pipeline.add_processor(Arc::new(product), &gen_product_name, input_endpoints);
90✔
132

90✔
133
    let input_names = get_input_names(&input_tables);
90✔
134
    for (port_index, table_name) in input_names.iter().enumerate() {
97✔
135
        if let Some((processor_name, processor_port)) = pipeline_map.get(&table_name.0) {
97✔
136
            pipeline.connect_nodes(
8✔
137
                processor_name,
8✔
138
                Some(*processor_port),
8✔
139
                &gen_product_name,
8✔
140
                Some(port_index as PortHandle),
8✔
141
            )?;
8✔
142
        }
89✔
143
    }
144

145
    let aggregation =
90✔
146
        AggregationProcessorFactory::new(select.projection.clone(), select.group_by, stateful);
90✔
147

90✔
148
    pipeline.add_processor(Arc::new(aggregation), &gen_agg_name, vec![]);
90✔
149

150
    // Where clause
×
151
    if let Some(selection) = select.selection {
90✔
152
        let selection = SelectionProcessorFactory::new(selection);
44✔
153
        // first_node_name = String::from("selection");
44✔
154

44✔
155
        pipeline.add_processor(Arc::new(selection), &gen_selection_name, vec![]);
44✔
156

44✔
157
        pipeline.connect_nodes(
44✔
158
            &gen_product_name,
44✔
159
            Some(DEFAULT_PORT_HANDLE),
44✔
160
            &gen_selection_name,
44✔
161
            Some(DEFAULT_PORT_HANDLE),
44✔
162
        )?;
44✔
163

164
        pipeline.connect_nodes(
44✔
165
            &gen_selection_name,
44✔
166
            Some(DEFAULT_PORT_HANDLE),
44✔
167
            &gen_agg_name,
44✔
168
            Some(DEFAULT_PORT_HANDLE),
44✔
169
        )?;
44✔
170
    } else {
×
171
        pipeline.connect_nodes(
46✔
172
            &gen_product_name,
46✔
173
            Some(DEFAULT_PORT_HANDLE),
46✔
174
            &gen_agg_name,
46✔
175
            Some(DEFAULT_PORT_HANDLE),
46✔
176
        )?;
46✔
177
    }
×
178

×
179
    pipeline_map.insert(
90✔
180
        processor_name.0.clone(),
90✔
181
        (gen_agg_name, DEFAULT_PORT_HANDLE),
90✔
182
    );
90✔
183

90✔
184
    Ok(())
90✔
185
}
90✔
186

×
187
/// Returns a vector of input port handles and relative table name
×
188
///
×
189
/// # Errors
×
190
///
×
191
/// This function will return an error if it's not possible to get an input name.
×
192
pub fn get_input_tables(
90✔
193
    from: &TableWithJoins,
90✔
194
    pipeline: &mut AppPipeline,
90✔
195
    pipeline_map: &mut HashMap<String, (String, PortHandle)>,
90✔
196
) -> Result<IndexedTabelWithJoins, PipelineError> {
90✔
197
    let mut input_tables = vec![];
90✔
198

199
    let name = get_from_source(&from.relation, pipeline, pipeline_map)?;
90✔
200
    input_tables.insert(0, name.clone());
90✔
201
    let mut joins = vec![];
90✔
202

203
    for (index, join) in from.joins.iter().enumerate() {
90✔
204
        let input_name = get_from_source(&join.relation, pipeline, pipeline_map)?;
7✔
205
        joins.push((input_name.clone(), join.clone()));
7✔
206
        input_tables.insert(index + 1, input_name);
7✔
207
    }
208

209
    Ok(IndexedTabelWithJoins {
90✔
210
        relation: (name, from.relation.clone()),
90✔
211
        joins,
90✔
212
    })
90✔
213
}
90✔
214

215
/// Lists the names of all inputs of `input_tables`: the leading relation first, followed
/// by the joined relations in their stored order.
pub fn get_input_names(input_tables: &IndexedTabelWithJoins) -> Vec<NameOrAlias> {
    std::iter::once(&input_tables.relation.0)
        .chain(input_tables.joins.iter().map(|(joined_name, _)| joined_name))
        .cloned()
        .collect()
}
408✔
224
pub fn get_entry_points(
90✔
225
    input_tables: &IndexedTabelWithJoins,
90✔
226
    pipeline_map: &mut HashMap<String, (String, PortHandle)>,
90✔
227
) -> Result<Vec<PipelineEntryPoint>, PipelineError> {
90✔
228
    let mut endpoints = vec![];
90✔
229

90✔
230
    let input_names = get_input_names(input_tables);
90✔
231

232
    for (input_port, table) in input_names.iter().enumerate() {
97✔
233
        let name = table.0.clone();
97✔
234
        if !pipeline_map.contains_key(&name) {
97✔
235
            endpoints.push(PipelineEntryPoint::new(
89✔
236
                AppSourceId::new(name, None),
89✔
237
                input_port as PortHandle,
89✔
238
            ));
89✔
239
        }
89✔
240
    }
241

242
    Ok(endpoints)
90✔
243
}
90✔
244

245
pub fn get_from_source(
97✔
246
    relation: &TableFactor,
97✔
247
    pipeline: &mut AppPipeline,
97✔
248
    pipeline_map: &mut HashMap<String, (String, PortHandle)>,
97✔
249
) -> Result<NameOrAlias, PipelineError> {
97✔
250
    match relation {
97✔
251
        TableFactor::Table { name, alias, .. } => {
95✔
252
            let input_name = name
95✔
253
                .0
95✔
254
                .iter()
95✔
255
                .map(normalize_ident)
95✔
256
                .collect::<Vec<String>>()
95✔
257
                .join(".");
95✔
258
            let alias_name = alias
95✔
259
                .as_ref()
95✔
260
                .map(|a| fullname_from_ident(&[a.name.clone()]));
95✔
261

95✔
262
            Ok(NameOrAlias(input_name, alias_name))
95✔
263
        }
264
        TableFactor::Derived {
265
            lateral: _,
266
            subquery,
2✔
267
            alias,
2✔
268
        } => {
2✔
269
            let name = format!("derived_{}", uuid::Uuid::new_v4());
2✔
270
            let alias_name = alias
2✔
271
                .as_ref()
2✔
272
                .map(|alias_ident| fullname_from_ident(&[alias_ident.name.clone()]));
2✔
273

2✔
274
            let name_or = NameOrAlias(name, alias_name);
2✔
275
            query_to_pipeline(&name_or, subquery, pipeline, pipeline_map, true)?;
2✔
276

277
            Ok(name_or)
2✔
278
        }
279
        _ => Err(PipelineError::UnsupportedSqlError(
×
280
            UnsupportedSqlError::JoinTable,
×
281
        )),
×
282
    }
283
}
97✔
284

285
#[cfg(test)]
mod tests {
    use super::statement_to_pipeline;

    /// Smoke test: each statement (derived table, joins, single and chained CTEs) must
    /// build into a pipeline without returning an error.
    #[test]
    fn sql_logic_test_1() {
        let statements: [&str; 6] = [
            r#"
            SELECT
            a.name as "Genre",
                SUM(amount) as "Gross Revenue(in $)"
            FROM
            (
                SELECT
                c.name, f.title, p.amount
            FROM film f
            LEFT JOIN film_category fc
            ON fc.film_id = f.film_id
            LEFT JOIN category c
            ON fc.category_id = c.category_id
            LEFT JOIN inventory i
            ON i.film_id = f.film_id
            LEFT JOIN rental r
            ON r.inventory_id = i.inventory_id
            LEFT JOIN payment p
            ON p.rental_id = r.rental_id
            WHERE p.amount IS NOT NULL
            ) a

            GROUP BY name
            ORDER BY sum(amount) desc
            LIMIT 5;
            "#,
            r#"
                SELECT
                c.name, f.title, p.amount
            FROM film f
            LEFT JOIN film_category fc
            "#,
            r#"
            WITH tbl as (select id from a)
            select id from tbl
            "#,
            r#"
            WITH tbl as (select id from  a),
            tbl2 as (select id from tbl)
            select id from tbl2
            "#,
            r#"
            WITH cte_table1 as (select id_dt1 from (select id_t1 from table_1) as derived_table_1),
            cte_table2 as (select id_ct1 from cte_table1)
            select id_ct2 from cte_table2
            "#,
            r#"
                with tbl as (select id, ticker from stocks)
                select tbl.id from  stocks join tbl on tbl.id = stocks.id;
            "#,
        ];

        for sql in statements.iter().copied() {
            let _pipeline = statement_to_pipeline(sql).unwrap();
        }
    }
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc