• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

getdozer / dozer / 4060960250

pending completion
4060960250

Pull #729

github

GitHub
Merge 25e0159b7 into de98caa91
Pull Request #729: feat: Implement multi-way JOIN

1356 of 1356 new or added lines in 10 files covered. (100.0%)

24359 of 38526 relevant lines covered (63.23%)

37090.16 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

72.78
/dozer-sql/src/pipeline/builder.rs
1
use crate::pipeline::aggregation::factory::AggregationProcessorFactory;
2
use crate::pipeline::builder::PipelineError::InvalidQuery;
3
use crate::pipeline::errors::PipelineError;
4
use crate::pipeline::selection::factory::SelectionProcessorFactory;
5
use dozer_core::dag::app::AppPipeline;
6
use dozer_core::dag::app::PipelineEntryPoint;
7
use dozer_core::dag::appsource::AppSourceId;
8
use dozer_core::dag::dag::DEFAULT_PORT_HANDLE;
9
use dozer_core::dag::node::PortHandle;
10
use sqlparser::ast::{Join, TableFactor, TableWithJoins};
11
use sqlparser::{
12
    ast::{Query, Select, SetExpr, Statement},
13
    dialect::AnsiDialect,
14
    parser::Parser,
15
};
16
use std::collections::HashMap;
17
use std::sync::Arc;
18

19
use super::errors::UnsupportedSqlError;
20
use super::expression::builder::{fullname_from_ident, normalize_ident, NameOrAlias};
21
use super::product::factory::FromProcessorFactory;
22

×
23
/// Context type carried by the SQL pipeline built in this module
/// (`AppPipeline<SchemaSQLContext>`).
///
/// It currently holds no data.
#[derive(Clone, Debug, Default)]
pub struct SchemaSQLContext {}
25

26
/// The struct contains some contexts during query to pipeline.
×
27
#[derive(Debug, Clone, Default)]
106✔
28
pub struct QueryContext {
29
    pub pipeline_map: HashMap<String, (String, PortHandle)>,
30
}
31

×
32
#[derive(Debug, Clone)]
135✔
33
pub struct IndexedTabelWithJoins {
34
    pub relation: (NameOrAlias, TableFactor),
35
    pub joins: Vec<(NameOrAlias, Join)>,
36
}
37

×
38
pub fn statement_to_pipeline(
106✔
39
    sql: &str,
106✔
40
) -> Result<(AppPipeline<SchemaSQLContext>, (String, PortHandle)), PipelineError> {
106✔
41
    let dialect = AnsiDialect {};
106✔
42
    let mut ctx = QueryContext::default();
106✔
43

106✔
44
    let ast = Parser::parse_sql(&dialect, sql).unwrap();
106✔
45
    let query_name = NameOrAlias(format!("query_{}", uuid::Uuid::new_v4()), None);
106✔
46
    let statement = ast.get(0).expect("First statement is missing").to_owned();
106✔
47

106✔
48
    let mut pipeline = AppPipeline::new();
106✔
49
    if let Statement::Query(query) = statement {
106✔
50
        query_to_pipeline(&query_name, &query, &mut pipeline, &mut ctx, false)?;
106✔
51
    };
×
52
    let node = ctx
106✔
53
        .pipeline_map
106✔
54
        .get(&query_name.0)
106✔
55
        .expect("query should have been initialized")
106✔
56
        .to_owned();
106✔
57
    Ok((pipeline, node))
106✔
58
}
106✔
59

×
60
fn query_to_pipeline(
135✔
61
    processor_name: &NameOrAlias,
135✔
62
    query: &Query,
135✔
63
    pipeline: &mut AppPipeline<SchemaSQLContext>,
135✔
64
    query_ctx: &mut QueryContext,
135✔
65
    stateful: bool,
135✔
66
) -> Result<(), PipelineError> {
135✔
67
    // return error if there is unsupported syntax
135✔
68
    if !query.order_by.is_empty() {
135✔
69
        return Err(PipelineError::UnsupportedSqlError(
×
70
            UnsupportedSqlError::OrderByError,
×
71
        ));
×
72
    }
135✔
73

135✔
74
    if query.limit.is_some() || query.offset.is_some() {
135✔
75
        return Err(PipelineError::UnsupportedSqlError(
×
76
            UnsupportedSqlError::LimitOffsetError,
×
77
        ));
×
78
    }
135✔
79

×
80
    // Attach the first pipeline if there is with clause
×
81
    if let Some(with) = &query.with {
135✔
82
        if with.recursive {
13✔
83
            return Err(PipelineError::UnsupportedSqlError(
×
84
                UnsupportedSqlError::Recursive,
×
85
            ));
×
86
        }
13✔
87

×
88
        for table in &with.cte_tables {
28✔
89
            if table.from.is_some() {
15✔
90
                return Err(PipelineError::UnsupportedSqlError(
×
91
                    UnsupportedSqlError::CteFromError,
×
92
                ));
×
93
            }
15✔
94
            let table_name = table.alias.name.to_string();
15✔
95
            if query_ctx.pipeline_map.contains_key(&table_name) {
15✔
96
                return Err(InvalidQuery(format!(
×
97
                    "WITH query name {table_name:?} specified more than once"
×
98
                )));
×
99
            }
15✔
100
            query_to_pipeline(
15✔
101
                &NameOrAlias(table_name.clone(), Some(table_name)),
15✔
102
                &table.query,
15✔
103
                pipeline,
15✔
104
                query_ctx,
15✔
105
                true,
15✔
106
            )?;
15✔
107
        }
×
108
    };
122✔
109

×
110
    match *query.body.clone() {
135✔
111
        SetExpr::Select(select) => {
135✔
112
            select_to_pipeline(processor_name, *select, pipeline, query_ctx, stateful)?;
135✔
113
        }
×
114
        SetExpr::Query(query) => {
×
115
            let query_name = format!("subquery_{}", uuid::Uuid::new_v4());
×
116
            let mut ctx = QueryContext::default();
×
117
            query_to_pipeline(
×
118
                &NameOrAlias(query_name, None),
×
119
                &query,
×
120
                pipeline,
×
121
                &mut ctx,
×
122
                stateful,
×
123
            )?
×
124
        }
125
        _ => {
×
126
            return Err(PipelineError::UnsupportedSqlError(
×
127
                UnsupportedSqlError::SelectOnlyError,
×
128
            ))
×
129
        }
130
    };
×
131
    Ok(())
135✔
132
}
135✔
133

×
134
fn select_to_pipeline(
135✔
135
    processor_name: &NameOrAlias,
135✔
136
    select: Select,
135✔
137
    pipeline: &mut AppPipeline<SchemaSQLContext>,
135✔
138
    query_ctx: &mut QueryContext,
135✔
139
    stateful: bool,
135✔
140
) -> Result<(), PipelineError> {
135✔
141
    // FROM clause
135✔
142
    if select.from.len() != 1 {
135✔
143
        return Err(PipelineError::UnsupportedSqlError(
×
144
            UnsupportedSqlError::FromCommaSyntax,
×
145
        ));
×
146
    }
135✔
147

×
148
    let input_tables = get_input_tables(&select.from[0], pipeline, query_ctx)?;
135✔
149

×
150
    let product = FromProcessorFactory::new(input_tables.clone());
135✔
151

×
152
    let input_endpoints = get_entry_points(&input_tables, &mut query_ctx.pipeline_map)?;
135✔
153

×
154
    let gen_product_name = format!("product_{}", uuid::Uuid::new_v4());
135✔
155
    let gen_agg_name = format!("agg_{}", uuid::Uuid::new_v4());
135✔
156
    let gen_selection_name = format!("select_{}", uuid::Uuid::new_v4());
135✔
157
    pipeline.add_processor(Arc::new(product), &gen_product_name, input_endpoints);
135✔
158

135✔
159
    let input_names = get_input_names(&input_tables);
135✔
160
    for (port_index, table_name) in input_names.iter().enumerate() {
142✔
161
        if let Some((processor_name, processor_port)) = query_ctx.pipeline_map.get(&table_name.0) {
142✔
162
            pipeline.connect_nodes(
29✔
163
                processor_name,
29✔
164
                Some(*processor_port),
29✔
165
                &gen_product_name,
29✔
166
                Some(port_index as PortHandle),
29✔
167
            )?;
29✔
168
        }
113✔
169
    }
170

×
171
    let aggregation =
135✔
172
        AggregationProcessorFactory::new(select.projection.clone(), select.group_by, stateful);
135✔
173

135✔
174
    pipeline.add_processor(Arc::new(aggregation), &gen_agg_name, vec![]);
135✔
175

×
176
    // Where clause
×
177
    if let Some(selection) = select.selection {
135✔
178
        let selection = SelectionProcessorFactory::new(selection);
44✔
179

44✔
180
        pipeline.add_processor(Arc::new(selection), &gen_selection_name, vec![]);
44✔
181

44✔
182
        pipeline.connect_nodes(
44✔
183
            &gen_product_name,
44✔
184
            Some(DEFAULT_PORT_HANDLE),
44✔
185
            &gen_selection_name,
44✔
186
            Some(DEFAULT_PORT_HANDLE),
44✔
187
        )?;
44✔
188

×
189
        pipeline.connect_nodes(
44✔
190
            &gen_selection_name,
44✔
191
            Some(DEFAULT_PORT_HANDLE),
44✔
192
            &gen_agg_name,
44✔
193
            Some(DEFAULT_PORT_HANDLE),
44✔
194
        )?;
44✔
195
    } else {
×
196
        pipeline.connect_nodes(
91✔
197
            &gen_product_name,
91✔
198
            Some(DEFAULT_PORT_HANDLE),
91✔
199
            &gen_agg_name,
91✔
200
            Some(DEFAULT_PORT_HANDLE),
91✔
201
        )?;
91✔
202
    }
×
203

×
204
    query_ctx.pipeline_map.insert(
135✔
205
        processor_name.0.clone(),
135✔
206
        (gen_agg_name, DEFAULT_PORT_HANDLE),
135✔
207
    );
135✔
208

135✔
209
    Ok(())
135✔
210
}
135✔
211

×
212
/// Returns a vector of input port handles and relative table name
×
213
///
×
214
/// # Errors
215
///
216
/// This function will return an error if it's not possible to get an input name.
×
217
pub fn get_input_tables(
135✔
218
    from: &TableWithJoins,
135✔
219
    pipeline: &mut AppPipeline<SchemaSQLContext>,
135✔
220
    query_ctx: &mut QueryContext,
135✔
221
) -> Result<IndexedTabelWithJoins, PipelineError> {
135✔
222
    let name = get_from_source(&from.relation, pipeline, query_ctx)?;
135✔
223
    let mut joins = vec![];
135✔
224

×
225
    for join in from.joins.iter() {
135✔
226
        let input_name = get_from_source(&join.relation, pipeline, query_ctx)?;
7✔
227
        joins.push((input_name.clone(), join.clone()));
7✔
228
    }
×
229

×
230
    Ok(IndexedTabelWithJoins {
135✔
231
        relation: (name, from.relation.clone()),
135✔
232
        joins,
135✔
233
    })
135✔
234
}
135✔
235

×
236
/// Lists the resolved name of every input relation in port order: the leading
/// relation first, followed by each joined relation in `JOIN` order.
pub fn get_input_names(input_tables: &IndexedTabelWithJoins) -> Vec<NameOrAlias> {
    std::iter::once(&input_tables.relation.0)
        .chain(input_tables.joins.iter().map(|(join_name, _)| join_name))
        .cloned()
        .collect()
}
873✔
245
pub fn get_entry_points(
135✔
246
    input_tables: &IndexedTabelWithJoins,
135✔
247
    pipeline_map: &mut HashMap<String, (String, PortHandle)>,
135✔
248
) -> Result<Vec<PipelineEntryPoint>, PipelineError> {
135✔
249
    let mut endpoints = vec![];
135✔
250

135✔
251
    let input_names = get_input_names(input_tables);
135✔
252

×
253
    for (input_port, table) in input_names.iter().enumerate() {
142✔
254
        let name = table.0.clone();
142✔
255
        if !pipeline_map.contains_key(&name) {
142✔
256
            endpoints.push(PipelineEntryPoint::new(
113✔
257
                AppSourceId::new(name, None),
113✔
258
                input_port as PortHandle,
113✔
259
            ));
113✔
260
        }
113✔
261
    }
×
262

×
263
    Ok(endpoints)
135✔
264
}
135✔
265

×
266
pub fn get_from_source(
142✔
267
    relation: &TableFactor,
142✔
268
    pipeline: &mut AppPipeline<SchemaSQLContext>,
142✔
269
    query_ctx: &mut QueryContext,
142✔
270
) -> Result<NameOrAlias, PipelineError> {
142✔
271
    match relation {
142✔
272
        TableFactor::Table { name, alias, .. } => {
128✔
273
            let input_name = name
128✔
274
                .0
128✔
275
                .iter()
128✔
276
                .map(normalize_ident)
128✔
277
                .collect::<Vec<String>>()
128✔
278
                .join(".");
128✔
279
            let alias_name = alias
128✔
280
                .as_ref()
128✔
281
                .map(|a| fullname_from_ident(&[a.name.clone()]));
128✔
282

128✔
283
            Ok(NameOrAlias(input_name, alias_name))
128✔
284
        }
×
285
        TableFactor::Derived {
×
286
            lateral: _,
×
287
            subquery,
14✔
288
            alias,
14✔
289
        } => {
14✔
290
            let name = format!("derived_{}", uuid::Uuid::new_v4());
14✔
291
            let alias_name = alias
14✔
292
                .as_ref()
14✔
293
                .map(|alias_ident| fullname_from_ident(&[alias_ident.name.clone()]));
14✔
294

14✔
295
            let name_or = NameOrAlias(name, alias_name);
14✔
296
            query_to_pipeline(&name_or, subquery, pipeline, query_ctx, false)?;
14✔
297

×
298
            Ok(name_or)
14✔
299
        }
×
300
        _ => Err(PipelineError::UnsupportedSqlError(
×
301
            UnsupportedSqlError::JoinTable,
×
302
        )),
×
303
    }
×
304
}
142✔
305

×
306
#[cfg(test)]
mod tests {
    use super::statement_to_pipeline;

    /// Every statement in the list must be turned into a pipeline without error.
    #[test]
    fn parse_sql_pipeline() {
        let statements = [
            r#"
                SELECT
                a.name as "Genre",
                    SUM(amount) as "Gross Revenue(in $)"
                FROM
                (
                    SELECT
                    c.name, f.title, p.amount
                FROM film f
                LEFT JOIN film_category fc
                ON fc.film_id = f.film_id
                LEFT JOIN category c
                ON fc.category_id = c.category_id
                LEFT JOIN inventory i
                ON i.film_id = f.film_id
                LEFT JOIN rental r
                ON r.inventory_id = i.inventory_id
                LEFT JOIN payment p
                ON p.rental_id = r.rental_id
                WHERE p.amount IS NOT NULL
                ) a

                GROUP BY name;
            "#,
            r#"
                SELECT
                c.name, f.title, p.amount
            FROM film f
            LEFT JOIN film_category fc
            "#,
            r#"
            WITH tbl as (select id from a)
            select id from tbl
            "#,
            r#"
            WITH tbl as (select id from  a),
            tbl2 as (select id from tbl)
            select id from tbl2
            "#,
            r#"
            WITH cte_table1 as (select id_dt1 from (select id_t1 from table_1) as derived_table_1),
            cte_table2 as (select id_ct1 from cte_table1)
            select id_ct2 from cte_table2
            "#,
            r#"
                with tbl as (select id, ticker from stocks)
                select tbl.id from  stocks join tbl on tbl.id = stocks.id;
            "#,
        ];

        statements.iter().for_each(|sql| {
            let _pipeline = statement_to_pipeline(sql).unwrap();
        });
    }
}
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc