
getdozer / dozer / build 4102355041 (pending completion)

Pull Request #811 (GitHub): chore: integrating sql planner
Merge 37b55f3df into 7c772e92a

427 of 427 new or added lines in 15 files covered. (100.0%)

24596 of 37831 relevant lines covered (65.02%)

37254.69 hits per line

Source File

/dozer-sql/src/pipeline/builder.rs (82.72% of lines covered)

use crate::pipeline::aggregation::factory::AggregationProcessorFactory;
use crate::pipeline::builder::PipelineError::InvalidQuery;
use crate::pipeline::errors::PipelineError;
use crate::pipeline::selection::factory::SelectionProcessorFactory;
use dozer_core::dag::app::AppPipeline;
use dozer_core::dag::app::PipelineEntryPoint;
use dozer_core::dag::appsource::AppSourceId;
use dozer_core::dag::node::PortHandle;
use dozer_core::dag::DEFAULT_PORT_HANDLE;
use sqlparser::ast::{Join, TableFactor, TableWithJoins};
use sqlparser::{
    ast::{Query, Select, SetExpr, Statement},
    dialect::AnsiDialect,
    parser::Parser,
};
use std::collections::HashMap;
use std::sync::Arc;

use super::errors::UnsupportedSqlError;
use super::expression::builder::{fullname_from_ident, normalize_ident, NameOrAlias};
use super::product::factory::FromProcessorFactory;

#[derive(Debug, Clone, Default)]
pub struct SchemaSQLContext {}

/// Context accumulated while translating a SQL query into a pipeline.
#[derive(Debug, Clone, Default)]
pub struct QueryContext {
    /// Maps a query/CTE/table name to the processor (and output port) that produces it.
    pub pipeline_map: HashMap<String, (String, PortHandle)>,
}

/// A FROM clause with a resolved [`NameOrAlias`] for the base relation and for each join.
#[derive(Debug, Clone)]
pub struct IndexedTableWithJoins {
    pub relation: (NameOrAlias, TableFactor),
    pub joins: Vec<(NameOrAlias, Join)>,
}

/// Parses `sql` and builds an [`AppPipeline`] for the first statement, returning the
/// pipeline together with the name and output port of its final processor.
pub fn statement_to_pipeline(
    sql: &str,
) -> Result<(AppPipeline<SchemaSQLContext>, (String, PortHandle)), PipelineError> {
    let dialect = AnsiDialect {};
    let mut ctx = QueryContext::default();

    // NOTE: a parse failure currently panics instead of being mapped to a PipelineError.
    let ast = Parser::parse_sql(&dialect, sql).unwrap();
    let query_name = NameOrAlias(format!("query_{}", uuid::Uuid::new_v4()), None);
    let statement = ast.get(0).expect("First statement is missing").to_owned();

    let mut pipeline = AppPipeline::new();
    if let Statement::Query(query) = statement {
        query_to_pipeline(&query_name, &query, &mut pipeline, &mut ctx, false)?;
    };
    let node = ctx
        .pipeline_map
        .get(&query_name.0)
        .expect("query should have been initialized")
        .to_owned();
    Ok((pipeline, node))
}

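// Illustrative usage sketch (not part of the original file; the SQL and variable names
// are hypothetical, only the signature above is taken from the source):
//
//     let (app_pipeline, (output_node, output_port)) =
//         statement_to_pipeline("SELECT id FROM users")?;
//     // `output_node` / `output_port` identify the processor a sink can be connected to.
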
/// Translates a single [`Query`] (including any WITH clause) into processors that are
/// appended to `pipeline`, registering the result under `processor_name` in `query_ctx`.
fn query_to_pipeline(
    processor_name: &NameOrAlias,
    query: &Query,
    pipeline: &mut AppPipeline<SchemaSQLContext>,
    query_ctx: &mut QueryContext,
    stateful: bool,
) -> Result<(), PipelineError> {
    // Return an error if there is unsupported syntax.
    if !query.order_by.is_empty() {
        return Err(PipelineError::UnsupportedSqlError(
            UnsupportedSqlError::OrderByError,
        ));
    }

    if query.limit.is_some() || query.offset.is_some() {
        return Err(PipelineError::UnsupportedSqlError(
            UnsupportedSqlError::LimitOffsetError,
        ));
    }

    // Build a pipeline for each CTE first if there is a WITH clause.
    if let Some(with) = &query.with {
        if with.recursive {
            return Err(PipelineError::UnsupportedSqlError(
                UnsupportedSqlError::Recursive,
            ));
        }

        for table in &with.cte_tables {
            if table.from.is_some() {
                return Err(PipelineError::UnsupportedSqlError(
                    UnsupportedSqlError::CteFromError,
                ));
            }
            let table_name = table.alias.name.to_string();
            if query_ctx.pipeline_map.contains_key(&table_name) {
                return Err(InvalidQuery(format!(
                    "WITH query name {table_name:?} specified more than once"
                )));
            }
            query_to_pipeline(
                &NameOrAlias(table_name.clone(), Some(table_name)),
                &table.query,
                pipeline,
                query_ctx,
                true,
            )?;
        }
    };

    match *query.body.clone() {
        SetExpr::Select(select) => {
            select_to_pipeline(processor_name, *select, pipeline, query_ctx, stateful)?;
        }
        SetExpr::Query(query) => {
            let query_name = format!("subquery_{}", uuid::Uuid::new_v4());
            let mut ctx = QueryContext::default();
            query_to_pipeline(
                &NameOrAlias(query_name, None),
                &query,
                pipeline,
                &mut ctx,
                stateful,
            )?
        }
        _ => {
            return Err(PipelineError::UnsupportedSqlError(
                UnsupportedSqlError::SelectOnlyError,
            ))
        }
    };
    Ok(())
}

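// Illustrative note (not part of the original file): for a query such as
//
//     WITH tbl AS (SELECT id FROM a) SELECT id FROM tbl
//
// the CTE is translated first and registered in `query_ctx.pipeline_map` under "tbl"
// (as `NameOrAlias("tbl", Some("tbl"))`, with `stateful = true`); the outer SELECT then
// resolves "tbl" through that map instead of creating a new source entry point.
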
/// Translates a SELECT into a product (FROM/JOIN) processor, an optional selection
/// (WHERE) processor and an aggregation processor, wiring them together and recording
/// the aggregation node under `processor_name` in `query_ctx`.
fn select_to_pipeline(
    processor_name: &NameOrAlias,
    select: Select,
    pipeline: &mut AppPipeline<SchemaSQLContext>,
    query_ctx: &mut QueryContext,
    stateful: bool,
) -> Result<(), PipelineError> {
    // FROM clause
    if select.from.len() != 1 {
        return Err(PipelineError::UnsupportedSqlError(
            UnsupportedSqlError::FromCommaSyntax,
        ));
    }

    let input_tables = get_input_tables(&select.from[0], pipeline, query_ctx)?;

    let product = FromProcessorFactory::new(input_tables.clone());

    let input_endpoints = get_entry_points(&input_tables, &mut query_ctx.pipeline_map)?;

    let gen_product_name = format!("product_{}", uuid::Uuid::new_v4());
    let gen_agg_name = format!("agg_{}", uuid::Uuid::new_v4());
    let gen_selection_name = format!("select_{}", uuid::Uuid::new_v4());
    pipeline.add_processor(Arc::new(product), &gen_product_name, input_endpoints);

    // Connect inputs that are already produced by existing processors (e.g. CTEs).
    let input_names = get_input_names(&input_tables);
    for (port_index, table_name) in input_names.iter().enumerate() {
        if let Some((processor_name, processor_port)) = query_ctx.pipeline_map.get(&table_name.0) {
            pipeline.connect_nodes(
                processor_name,
                Some(*processor_port),
                &gen_product_name,
                Some(port_index as PortHandle),
            )?;
        }
    }

    let aggregation = AggregationProcessorFactory::new(select.clone(), stateful);

    pipeline.add_processor(Arc::new(aggregation), &gen_agg_name, vec![]);

    // WHERE clause
    if let Some(selection) = select.selection {
        let selection = SelectionProcessorFactory::new(selection);

        pipeline.add_processor(Arc::new(selection), &gen_selection_name, vec![]);

        pipeline.connect_nodes(
            &gen_product_name,
            Some(DEFAULT_PORT_HANDLE),
            &gen_selection_name,
            Some(DEFAULT_PORT_HANDLE),
        )?;

        pipeline.connect_nodes(
            &gen_selection_name,
            Some(DEFAULT_PORT_HANDLE),
            &gen_agg_name,
            Some(DEFAULT_PORT_HANDLE),
        )?;
    } else {
        pipeline.connect_nodes(
            &gen_product_name,
            Some(DEFAULT_PORT_HANDLE),
            &gen_agg_name,
            Some(DEFAULT_PORT_HANDLE),
        )?;
    }

    query_ctx.pipeline_map.insert(
        processor_name.0.clone(),
        (gen_agg_name, DEFAULT_PORT_HANDLE),
    );

    Ok(())
}

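// Resulting topology for one SELECT (illustrative sketch, not part of the original file):
//
//     sources / upstream nodes --> product_<uuid> --> select_<uuid> (only if WHERE) --> agg_<uuid>
//
// The aggregation node is what gets registered in `query_ctx.pipeline_map`, so outer
// queries (or the caller of `statement_to_pipeline`) connect to it.
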
/// Resolves the FROM clause into an [`IndexedTableWithJoins`]: a [`NameOrAlias`] for the
/// base relation and for every joined relation (building sub-pipelines for derived tables).
///
/// # Errors
///
/// This function will return an error if an input name cannot be resolved.
pub fn get_input_tables(
    from: &TableWithJoins,
    pipeline: &mut AppPipeline<SchemaSQLContext>,
    query_ctx: &mut QueryContext,
) -> Result<IndexedTableWithJoins, PipelineError> {
    let name = get_from_source(&from.relation, pipeline, query_ctx)?;
    let mut joins = vec![];

    for join in from.joins.iter() {
        let input_name = get_from_source(&join.relation, pipeline, query_ctx)?;
        joins.push((input_name.clone(), join.clone()));
    }

    Ok(IndexedTableWithJoins {
        relation: (name, from.relation.clone()),
        joins,
    })
}

/// Returns the name of the base relation followed by the name of every joined relation.
pub fn get_input_names(input_tables: &IndexedTableWithJoins) -> Vec<NameOrAlias> {
    let mut input_names = vec![];
    input_names.push(input_tables.relation.0.clone());

    for join in &input_tables.joins {
        input_names.push(join.0.clone());
    }
    input_names
}

/// Builds a [`PipelineEntryPoint`] (i.e. an external source) for every input table that
/// is not already produced by a processor registered in `pipeline_map`.
pub fn get_entry_points(
    input_tables: &IndexedTableWithJoins,
    pipeline_map: &mut HashMap<String, (String, PortHandle)>,
) -> Result<Vec<PipelineEntryPoint>, PipelineError> {
    let mut endpoints = vec![];

    let input_names = get_input_names(input_tables);

    for (input_port, table) in input_names.iter().enumerate() {
        let name = table.0.clone();
        if !pipeline_map.contains_key(&name) {
            endpoints.push(PipelineEntryPoint::new(
                AppSourceId::new(name, None),
                input_port as PortHandle,
            ));
        }
    }

    Ok(endpoints)
}

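// Illustrative note (not part of the original file): for
//
//     WITH tbl AS (SELECT id, ticker FROM stocks)
//     SELECT tbl.id FROM stocks JOIN tbl ON tbl.id = stocks.id
//
// "stocks" is not in `pipeline_map`, so it becomes a `PipelineEntryPoint` (an external
// source), while "tbl" already maps to the CTE's aggregation node and is instead wired
// up with `connect_nodes` inside `select_to_pipeline`.
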
/// Resolves a single [`TableFactor`] to a [`NameOrAlias`]; a derived table (subquery in
/// FROM) gets a generated name and its own sub-pipeline.
pub fn get_from_source(
    relation: &TableFactor,
    pipeline: &mut AppPipeline<SchemaSQLContext>,
    query_ctx: &mut QueryContext,
) -> Result<NameOrAlias, PipelineError> {
    match relation {
        TableFactor::Table { name, alias, .. } => {
            let input_name = name
                .0
                .iter()
                .map(normalize_ident)
                .collect::<Vec<String>>()
                .join(".");
            let alias_name = alias
                .as_ref()
                .map(|a| fullname_from_ident(&[a.name.clone()]));

            Ok(NameOrAlias(input_name, alias_name))
        }
        TableFactor::Derived {
            lateral: _,
            subquery,
            alias,
        } => {
            let name = format!("derived_{}", uuid::Uuid::new_v4());
            let alias_name = alias
                .as_ref()
                .map(|alias_ident| fullname_from_ident(&[alias_ident.name.clone()]));

            let name_or = NameOrAlias(name, alias_name);
            query_to_pipeline(&name_or, subquery, pipeline, query_ctx, false)?;

            Ok(name_or)
        }
        _ => Err(PipelineError::UnsupportedSqlError(
            UnsupportedSqlError::JoinTable,
        )),
    }
}

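// Illustrative mapping (not part of the original file; the exact alias string depends on
// `fullname_from_ident`, which is an assumption here): `FROM film f` resolves to roughly
// `NameOrAlias("film", Some("f"))`, while `FROM (SELECT ...) a` resolves to
// `NameOrAlias("derived_<uuid>", Some("a"))` and the subquery is translated into its own
// processors via `query_to_pipeline`.
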
#[cfg(test)]
mod tests {
    use super::statement_to_pipeline;

    #[test]
    fn parse_sql_pipeline() {
        let statements: Vec<&str> = vec![
            r#"
                SELECT
                    a.name as "Genre",
                    SUM(amount) as "Gross Revenue(in $)"
                FROM
                (
                    SELECT
                        c.name,
                        f.title,
                        p.amount
                    FROM film f
                    LEFT JOIN film_category fc
                        ON fc.film_id = f.film_id
                    LEFT JOIN category c
                        ON fc.category_id = c.category_id
                    LEFT JOIN inventory i
                        ON i.film_id = f.film_id
                    LEFT JOIN rental r
                        ON r.inventory_id = i.inventory_id
                    LEFT JOIN payment p
                        ON p.rental_id = r.rental_id
                    WHERE p.amount IS NOT NULL
                ) a

                GROUP BY name;
            "#,
            r#"
                SELECT
                    c.name,
                    f.title,
                    p.amount
                FROM film f
                LEFT JOIN film_category fc
                    "#,
            r#"
                WITH tbl as (select id from a)
                select id from tbl
                    "#,
            r#"
                WITH tbl as (select id from  a),
                tbl2 as (select id from tbl)
                select id from tbl2
                    "#,
            r#"
                WITH cte_table1 as (select id_dt1 from (select id_t1 from table_1) as derived_table_1),
                cte_table2 as (select id_ct1 from cte_table1)
                select id_ct2 from cte_table2
            "#,
            r#"
                with tbl as (select id, ticker from stocks)
                select tbl.id from  stocks join tbl on tbl.id = stocks.id;
            "#,
        ];
        for sql in statements {
            let _pipeline = statement_to_pipeline(sql).unwrap();
        }
    }
}