• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

getdozer / dozer / 4061292542

pending completion
4061292542

Pull #729

github

GitHub
Merge 069171d20 into de98caa91
Pull Request #729: feat: Implement multi-way JOIN

1356 of 1356 new or added lines in 10 files covered. (100.0%)

24817 of 38526 relevant lines covered (64.42%)

39509.54 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

72.78
/dozer-sql/src/pipeline/builder.rs
1
use crate::pipeline::aggregation::factory::AggregationProcessorFactory;
2
use crate::pipeline::builder::PipelineError::InvalidQuery;
3
use crate::pipeline::errors::PipelineError;
4
use crate::pipeline::selection::factory::SelectionProcessorFactory;
5
use dozer_core::dag::app::AppPipeline;
6
use dozer_core::dag::app::PipelineEntryPoint;
7
use dozer_core::dag::appsource::AppSourceId;
8
use dozer_core::dag::dag::DEFAULT_PORT_HANDLE;
9
use dozer_core::dag::node::PortHandle;
10
use sqlparser::ast::{Join, TableFactor, TableWithJoins};
11
use sqlparser::{
12
    ast::{Query, Select, SetExpr, Statement},
13
    dialect::AnsiDialect,
14
    parser::Parser,
15
};
16
use std::collections::HashMap;
17
use std::sync::Arc;
18

19
use super::errors::UnsupportedSqlError;
20
use super::expression::builder::{fullname_from_ident, normalize_ident, NameOrAlias};
21
use super::product::factory::FromProcessorFactory;
22

×
23
/// Context value carried by the SQL-built pipeline nodes.
///
/// It holds no data at the moment; it only serves as the context type
/// parameter of [`AppPipeline`] for pipelines produced by this module.
#[derive(Clone, Debug, Default)]
pub struct SchemaSQLContext {}
25

26
/// The struct contains some contexts during query to pipeline.
×
27
#[derive(Debug, Clone, Default)]
124✔
28
pub struct QueryContext {
29
    pub pipeline_map: HashMap<String, (String, PortHandle)>,
30
}
31

×
32
#[derive(Debug, Clone)]
153✔
33
pub struct IndexedTabelWithJoins {
34
    pub relation: (NameOrAlias, TableFactor),
35
    pub joins: Vec<(NameOrAlias, Join)>,
36
}
37

×
38
pub fn statement_to_pipeline(
124✔
39
    sql: &str,
124✔
40
) -> Result<(AppPipeline<SchemaSQLContext>, (String, PortHandle)), PipelineError> {
124✔
41
    let dialect = AnsiDialect {};
124✔
42
    let mut ctx = QueryContext::default();
124✔
43

124✔
44
    let ast = Parser::parse_sql(&dialect, sql).unwrap();
124✔
45
    let query_name = NameOrAlias(format!("query_{}", uuid::Uuid::new_v4()), None);
124✔
46
    let statement = ast.get(0).expect("First statement is missing").to_owned();
124✔
47

124✔
48
    let mut pipeline = AppPipeline::new();
124✔
49
    if let Statement::Query(query) = statement {
124✔
50
        query_to_pipeline(&query_name, &query, &mut pipeline, &mut ctx, false)?;
124✔
51
    };
×
52
    let node = ctx
124✔
53
        .pipeline_map
124✔
54
        .get(&query_name.0)
124✔
55
        .expect("query should have been initialized")
124✔
56
        .to_owned();
124✔
57
    Ok((pipeline, node))
124✔
58
}
124✔
59

×
60
fn query_to_pipeline(
153✔
61
    processor_name: &NameOrAlias,
153✔
62
    query: &Query,
153✔
63
    pipeline: &mut AppPipeline<SchemaSQLContext>,
153✔
64
    query_ctx: &mut QueryContext,
153✔
65
    stateful: bool,
153✔
66
) -> Result<(), PipelineError> {
153✔
67
    // return error if there is unsupported syntax
153✔
68
    if !query.order_by.is_empty() {
153✔
69
        return Err(PipelineError::UnsupportedSqlError(
×
70
            UnsupportedSqlError::OrderByError,
×
71
        ));
×
72
    }
153✔
73

153✔
74
    if query.limit.is_some() || query.offset.is_some() {
153✔
75
        return Err(PipelineError::UnsupportedSqlError(
×
76
            UnsupportedSqlError::LimitOffsetError,
×
77
        ));
×
78
    }
153✔
79

×
80
    // Attach the first pipeline if there is with clause
×
81
    if let Some(with) = &query.with {
153✔
82
        if with.recursive {
13✔
83
            return Err(PipelineError::UnsupportedSqlError(
×
84
                UnsupportedSqlError::Recursive,
×
85
            ));
×
86
        }
13✔
87

×
88
        for table in &with.cte_tables {
28✔
89
            if table.from.is_some() {
15✔
90
                return Err(PipelineError::UnsupportedSqlError(
×
91
                    UnsupportedSqlError::CteFromError,
×
92
                ));
×
93
            }
15✔
94
            let table_name = table.alias.name.to_string();
15✔
95
            if query_ctx.pipeline_map.contains_key(&table_name) {
15✔
96
                return Err(InvalidQuery(format!(
×
97
                    "WITH query name {table_name:?} specified more than once"
×
98
                )));
×
99
            }
15✔
100
            query_to_pipeline(
15✔
101
                &NameOrAlias(table_name.clone(), Some(table_name)),
15✔
102
                &table.query,
15✔
103
                pipeline,
15✔
104
                query_ctx,
15✔
105
                true,
15✔
106
            )?;
15✔
107
        }
×
108
    };
140✔
109

×
110
    match *query.body.clone() {
153✔
111
        SetExpr::Select(select) => {
153✔
112
            select_to_pipeline(processor_name, *select, pipeline, query_ctx, stateful)?;
153✔
113
        }
×
114
        SetExpr::Query(query) => {
×
115
            let query_name = format!("subquery_{}", uuid::Uuid::new_v4());
×
116
            let mut ctx = QueryContext::default();
×
117
            query_to_pipeline(
×
118
                &NameOrAlias(query_name, None),
×
119
                &query,
×
120
                pipeline,
×
121
                &mut ctx,
×
122
                stateful,
×
123
            )?
×
124
        }
125
        _ => {
×
126
            return Err(PipelineError::UnsupportedSqlError(
×
127
                UnsupportedSqlError::SelectOnlyError,
×
128
            ))
×
129
        }
130
    };
×
131
    Ok(())
153✔
132
}
153✔
133

×
134
fn select_to_pipeline(
153✔
135
    processor_name: &NameOrAlias,
153✔
136
    select: Select,
153✔
137
    pipeline: &mut AppPipeline<SchemaSQLContext>,
153✔
138
    query_ctx: &mut QueryContext,
153✔
139
    stateful: bool,
153✔
140
) -> Result<(), PipelineError> {
153✔
141
    // FROM clause
153✔
142
    if select.from.len() != 1 {
153✔
143
        return Err(PipelineError::UnsupportedSqlError(
×
144
            UnsupportedSqlError::FromCommaSyntax,
×
145
        ));
×
146
    }
153✔
147

×
148
    let input_tables = get_input_tables(&select.from[0], pipeline, query_ctx)?;
153✔
149

×
150
    let product = FromProcessorFactory::new(input_tables.clone());
153✔
151

×
152
    let input_endpoints = get_entry_points(&input_tables, &mut query_ctx.pipeline_map)?;
153✔
153

×
154
    let gen_product_name = format!("product_{}", uuid::Uuid::new_v4());
153✔
155
    let gen_agg_name = format!("agg_{}", uuid::Uuid::new_v4());
153✔
156
    let gen_selection_name = format!("select_{}", uuid::Uuid::new_v4());
153✔
157
    pipeline.add_processor(Arc::new(product), &gen_product_name, input_endpoints);
153✔
158

153✔
159
    let input_names = get_input_names(&input_tables);
153✔
160
    for (port_index, table_name) in input_names.iter().enumerate() {
184✔
161
        if let Some((processor_name, processor_port)) = query_ctx.pipeline_map.get(&table_name.0) {
184✔
162
            pipeline.connect_nodes(
29✔
163
                processor_name,
29✔
164
                Some(*processor_port),
29✔
165
                &gen_product_name,
29✔
166
                Some(port_index as PortHandle),
29✔
167
            )?;
29✔
168
        }
155✔
169
    }
170

×
171
    let aggregation =
153✔
172
        AggregationProcessorFactory::new(select.projection.clone(), select.group_by, stateful);
153✔
173

153✔
174
    pipeline.add_processor(Arc::new(aggregation), &gen_agg_name, vec![]);
153✔
175

×
176
    // Where clause
×
177
    if let Some(selection) = select.selection {
153✔
178
        let selection = SelectionProcessorFactory::new(selection);
44✔
179

44✔
180
        pipeline.add_processor(Arc::new(selection), &gen_selection_name, vec![]);
44✔
181

44✔
182
        pipeline.connect_nodes(
44✔
183
            &gen_product_name,
44✔
184
            Some(DEFAULT_PORT_HANDLE),
44✔
185
            &gen_selection_name,
44✔
186
            Some(DEFAULT_PORT_HANDLE),
44✔
187
        )?;
44✔
188

×
189
        pipeline.connect_nodes(
44✔
190
            &gen_selection_name,
44✔
191
            Some(DEFAULT_PORT_HANDLE),
44✔
192
            &gen_agg_name,
44✔
193
            Some(DEFAULT_PORT_HANDLE),
44✔
194
        )?;
44✔
195
    } else {
×
196
        pipeline.connect_nodes(
109✔
197
            &gen_product_name,
109✔
198
            Some(DEFAULT_PORT_HANDLE),
109✔
199
            &gen_agg_name,
109✔
200
            Some(DEFAULT_PORT_HANDLE),
109✔
201
        )?;
109✔
202
    }
×
203

×
204
    query_ctx.pipeline_map.insert(
153✔
205
        processor_name.0.clone(),
153✔
206
        (gen_agg_name, DEFAULT_PORT_HANDLE),
153✔
207
    );
153✔
208

153✔
209
    Ok(())
153✔
210
}
153✔
211

×
212
/// Returns a vector of input port handles and relative table name
×
213
///
×
214
/// # Errors
215
///
216
/// This function will return an error if it's not possible to get an input name.
×
217
pub fn get_input_tables(
153✔
218
    from: &TableWithJoins,
153✔
219
    pipeline: &mut AppPipeline<SchemaSQLContext>,
153✔
220
    query_ctx: &mut QueryContext,
153✔
221
) -> Result<IndexedTabelWithJoins, PipelineError> {
153✔
222
    let name = get_from_source(&from.relation, pipeline, query_ctx)?;
153✔
223
    let mut joins = vec![];
153✔
224

×
225
    for join in from.joins.iter() {
153✔
226
        let input_name = get_from_source(&join.relation, pipeline, query_ctx)?;
31✔
227
        joins.push((input_name.clone(), join.clone()));
31✔
228
    }
×
229

×
230
    Ok(IndexedTabelWithJoins {
153✔
231
        relation: (name, from.relation.clone()),
153✔
232
        joins,
153✔
233
    })
153✔
234
}
153✔
235

×
236
/// Lists the names of all inputs of `input_tables`: the leading relation first,
/// followed by each joined relation in join order.
///
/// Within this module, an entry's index is used as the product processor's
/// input port for that relation.
pub fn get_input_names(input_tables: &IndexedTabelWithJoins) -> Vec<NameOrAlias> {
    std::iter::once(&input_tables.relation.0)
        .chain(input_tables.joins.iter().map(|(join_name, _)| join_name))
        .cloned()
        .collect()
}
1,023✔
245
pub fn get_entry_points(
153✔
246
    input_tables: &IndexedTabelWithJoins,
153✔
247
    pipeline_map: &mut HashMap<String, (String, PortHandle)>,
153✔
248
) -> Result<Vec<PipelineEntryPoint>, PipelineError> {
153✔
249
    let mut endpoints = vec![];
153✔
250

153✔
251
    let input_names = get_input_names(input_tables);
153✔
252

×
253
    for (input_port, table) in input_names.iter().enumerate() {
184✔
254
        let name = table.0.clone();
184✔
255
        if !pipeline_map.contains_key(&name) {
184✔
256
            endpoints.push(PipelineEntryPoint::new(
155✔
257
                AppSourceId::new(name, None),
155✔
258
                input_port as PortHandle,
155✔
259
            ));
155✔
260
        }
155✔
261
    }
×
262

×
263
    Ok(endpoints)
153✔
264
}
153✔
265

×
266
pub fn get_from_source(
184✔
267
    relation: &TableFactor,
184✔
268
    pipeline: &mut AppPipeline<SchemaSQLContext>,
184✔
269
    query_ctx: &mut QueryContext,
184✔
270
) -> Result<NameOrAlias, PipelineError> {
184✔
271
    match relation {
184✔
272
        TableFactor::Table { name, alias, .. } => {
170✔
273
            let input_name = name
170✔
274
                .0
170✔
275
                .iter()
170✔
276
                .map(normalize_ident)
170✔
277
                .collect::<Vec<String>>()
170✔
278
                .join(".");
170✔
279
            let alias_name = alias
170✔
280
                .as_ref()
170✔
281
                .map(|a| fullname_from_ident(&[a.name.clone()]));
170✔
282

170✔
283
            Ok(NameOrAlias(input_name, alias_name))
170✔
284
        }
×
285
        TableFactor::Derived {
×
286
            lateral: _,
×
287
            subquery,
14✔
288
            alias,
14✔
289
        } => {
14✔
290
            let name = format!("derived_{}", uuid::Uuid::new_v4());
14✔
291
            let alias_name = alias
14✔
292
                .as_ref()
14✔
293
                .map(|alias_ident| fullname_from_ident(&[alias_ident.name.clone()]));
14✔
294

14✔
295
            let name_or = NameOrAlias(name, alias_name);
14✔
296
            query_to_pipeline(&name_or, subquery, pipeline, query_ctx, false)?;
14✔
297

×
298
            Ok(name_or)
14✔
299
        }
×
300
        _ => Err(PipelineError::UnsupportedSqlError(
×
301
            UnsupportedSqlError::JoinTable,
×
302
        )),
×
303
    }
×
304
}
184✔
305

×
306
#[cfg(test)]
mod tests {
    use super::statement_to_pipeline;

    /// Every statement below must translate into a pipeline without error.
    #[test]
    fn parse_sql_pipeline() {
        // A fixed array is enough for iteration (avoids clippy::useless_vec).
        let statements = [
            r#"
                SELECT
                a.name as "Genre",
                    SUM(amount) as "Gross Revenue(in $)"
                FROM
                (
                    SELECT
                    c.name, f.title, p.amount
                FROM film f
                LEFT JOIN film_category fc
                ON fc.film_id = f.film_id
                LEFT JOIN category c
                ON fc.category_id = c.category_id
                LEFT JOIN inventory i
                ON i.film_id = f.film_id
                LEFT JOIN rental r
                ON r.inventory_id = i.inventory_id
                LEFT JOIN payment p
                ON p.rental_id = r.rental_id
                WHERE p.amount IS NOT NULL
                ) a

                GROUP BY name;
            "#,
            r#"
                SELECT
                c.name, f.title, p.amount
            FROM film f
            LEFT JOIN film_category fc
            "#,
            r#"
            WITH tbl as (select id from a)
            select id from tbl
            "#,
            r#"
            WITH tbl as (select id from  a),
            tbl2 as (select id from tbl)
            select id from tbl2
            "#,
            r#"
            WITH cte_table1 as (select id_dt1 from (select id_t1 from table_1) as derived_table_1),
            cte_table2 as (select id_ct1 from cte_table1)
            select id_ct2 from cte_table2
            "#,
            r#"
                with tbl as (select id, ticker from stocks)
                select tbl.id from  stocks join tbl on tbl.id = stocks.id;
            "#,
        ];
        for sql in statements {
            // Name the offending statement so a failure is diagnosable.
            if let Err(err) = statement_to_pipeline(sql) {
                panic!("failed to build pipeline for SQL:\n{sql}\nerror: {err:?}");
            }
        }
    }
}
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc