• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

getdozer / dozer / 4007818786

pending completion
4007818786

Pull #733

github

GitHub
Merge baf5c38aa into 6c0ac2b2c
Pull Request #733: Bump diesel from 2.0.2 to 2.0.3

23389 of 34432 relevant lines covered (67.93%)

40326.78 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

83.5
/dozer-sql/src/pipeline/builder.rs
1
use crate::pipeline::aggregation::factory::AggregationProcessorFactory;
2
use crate::pipeline::builder::PipelineError::InvalidQuery;
3
use crate::pipeline::selection::factory::SelectionProcessorFactory;
4
use crate::pipeline::{errors::PipelineError, product::factory::ProductProcessorFactory};
5
use dozer_core::dag::app::AppPipeline;
6
use dozer_core::dag::app::PipelineEntryPoint;
7
use dozer_core::dag::appsource::AppSourceId;
8
use dozer_core::dag::dag::DEFAULT_PORT_HANDLE;
9
use dozer_core::dag::node::PortHandle;
10
use sqlparser::ast::{Join, TableFactor, TableWithJoins};
11
use sqlparser::{
12
    ast::{Query, Select, SetExpr, Statement},
13
    dialect::AnsiDialect,
14
    parser::Parser,
15
};
16
use std::collections::HashMap;
17
use std::sync::Arc;
18

19
use super::errors::UnsupportedSqlError;
20
use super::expression::builder::{fullname_from_ident, normalize_ident, NameOrAlias};
21

22
/// Context type supplied as the type parameter of `AppPipeline` throughout
/// this module. It carries no data.
#[derive(Debug, Clone, Default)]
pub struct SchemaSQLContext {}
24

25
/// The struct contains some contexts during query to pipeline.
26
#[derive(Debug, Clone, Default)]
115✔
27
pub struct QueryContext {
28
    pub pipeline_map: HashMap<String, (String, PortHandle)>,
29
}
30

31
#[derive(Debug, Clone)]
144✔
32
pub struct IndexedTabelWithJoins {
33
    pub relation: (NameOrAlias, TableFactor),
34
    pub joins: Vec<(NameOrAlias, Join)>,
35
}
36

37
pub fn statement_to_pipeline(
115✔
38
    sql: &str,
115✔
39
) -> Result<(AppPipeline<SchemaSQLContext>, (String, PortHandle)), PipelineError> {
115✔
40
    let dialect = AnsiDialect {};
115✔
41
    let mut ctx = QueryContext::default();
115✔
42

115✔
43
    let ast = Parser::parse_sql(&dialect, sql).unwrap();
115✔
44
    let query_name = NameOrAlias(format!("query_{}", uuid::Uuid::new_v4()), None);
115✔
45
    let statement = ast.get(0).expect("First statement is missing").to_owned();
115✔
46

115✔
47
    let mut pipeline = AppPipeline::new();
115✔
48
    if let Statement::Query(query) = statement {
115✔
49
        query_to_pipeline(&query_name, &query, &mut pipeline, &mut ctx, false)?;
115✔
50
    };
×
51
    let node = ctx
115✔
52
        .pipeline_map
115✔
53
        .get(&query_name.0)
115✔
54
        .expect("query should have been initialized")
115✔
55
        .to_owned();
115✔
56
    Ok((pipeline, node))
115✔
57
}
115✔
58

×
59
fn query_to_pipeline(
144✔
60
    processor_name: &NameOrAlias,
144✔
61
    query: &Query,
144✔
62
    pipeline: &mut AppPipeline<SchemaSQLContext>,
144✔
63
    query_ctx: &mut QueryContext,
144✔
64
    stateful: bool,
144✔
65
) -> Result<(), PipelineError> {
144✔
66
    // return error if there is unsupported syntax
144✔
67
    if !query.order_by.is_empty() {
144✔
68
        return Err(PipelineError::UnsupportedSqlError(
×
69
            UnsupportedSqlError::OrderByError,
×
70
        ));
×
71
    }
144✔
72

144✔
73
    if query.limit.is_some() || query.offset.is_some() {
144✔
74
        return Err(PipelineError::UnsupportedSqlError(
×
75
            UnsupportedSqlError::LimitOffsetError,
×
76
        ));
×
77
    }
144✔
78

×
79
    // Attach the first pipeline if there is with clause
×
80
    if let Some(with) = &query.with {
144✔
81
        if with.recursive {
13✔
82
            return Err(PipelineError::UnsupportedSqlError(
×
83
                UnsupportedSqlError::Recursive,
×
84
            ));
×
85
        }
13✔
86

×
87
        for table in &with.cte_tables {
28✔
88
            if table.from.is_some() {
15✔
89
                return Err(PipelineError::UnsupportedSqlError(
×
90
                    UnsupportedSqlError::CteFromError,
×
91
                ));
×
92
            }
15✔
93
            let table_name = table.alias.name.to_string();
15✔
94
            if query_ctx.pipeline_map.contains_key(&table_name) {
15✔
95
                return Err(InvalidQuery(format!(
×
96
                    "WITH query name {table_name:?} specified more than once"
×
97
                )));
×
98
            }
15✔
99
            query_to_pipeline(
15✔
100
                &NameOrAlias(table_name.clone(), Some(table_name)),
15✔
101
                &table.query,
15✔
102
                pipeline,
15✔
103
                query_ctx,
15✔
104
                false,
15✔
105
            )?;
15✔
106
        }
107
    };
131✔
108

109
    match *query.body.clone() {
144✔
110
        SetExpr::Select(select) => {
144✔
111
            select_to_pipeline(processor_name, *select, pipeline, query_ctx, stateful)?;
144✔
112
        }
113
        SetExpr::Query(query) => {
×
114
            let query_name = format!("subquery_{}", uuid::Uuid::new_v4());
×
115
            let mut ctx = QueryContext::default();
×
116
            query_to_pipeline(
×
117
                &NameOrAlias(query_name, None),
×
118
                &query,
×
119
                pipeline,
×
120
                &mut ctx,
×
121
                stateful,
×
122
            )?
×
123
        }
×
124
        _ => {
125
            return Err(PipelineError::UnsupportedSqlError(
×
126
                UnsupportedSqlError::SelectOnlyError,
×
127
            ))
×
128
        }
×
129
    };
130
    Ok(())
144✔
131
}
144✔
132

×
133
fn select_to_pipeline(
144✔
134
    processor_name: &NameOrAlias,
144✔
135
    select: Select,
144✔
136
    pipeline: &mut AppPipeline<SchemaSQLContext>,
144✔
137
    query_ctx: &mut QueryContext,
144✔
138
    stateful: bool,
144✔
139
) -> Result<(), PipelineError> {
144✔
140
    // FROM clause
144✔
141
    if select.from.len() != 1 {
144✔
142
        return Err(PipelineError::UnsupportedSqlError(
×
143
            UnsupportedSqlError::FromCommaSyntax,
×
144
        ));
×
145
    }
144✔
146

147
    let input_tables = get_input_tables(&select.from[0], pipeline, query_ctx)?;
144✔
148

149
    let product = ProductProcessorFactory::new(input_tables.clone());
144✔
150

151
    let input_endpoints = get_entry_points(&input_tables, &mut query_ctx.pipeline_map)?;
144✔
152

153
    let gen_product_name = format!("product_{}", uuid::Uuid::new_v4());
144✔
154
    let gen_agg_name = format!("agg_{}", uuid::Uuid::new_v4());
144✔
155
    let gen_selection_name = format!("select_{}", uuid::Uuid::new_v4());
144✔
156
    pipeline.add_processor(Arc::new(product), &gen_product_name, input_endpoints);
144✔
157

144✔
158
    let input_names = get_input_names(&input_tables);
144✔
159
    for (port_index, table_name) in input_names.iter().enumerate() {
160✔
160
        if let Some((processor_name, processor_port)) = query_ctx.pipeline_map.get(&table_name.0) {
160✔
161
            pipeline.connect_nodes(
29✔
162
                processor_name,
29✔
163
                Some(*processor_port),
29✔
164
                &gen_product_name,
29✔
165
                Some(port_index as PortHandle),
29✔
166
            )?;
29✔
167
        }
131✔
168
    }
169

170
    let aggregation = AggregationProcessorFactory::new(
144✔
171
        input_tables.relation.0,
144✔
172
        select.projection.clone(),
144✔
173
        select.group_by,
144✔
174
        stateful,
144✔
175
    );
144✔
176

144✔
177
    pipeline.add_processor(Arc::new(aggregation), &gen_agg_name, vec![]);
144✔
178

179
    // Where clause
180
    if let Some(selection) = select.selection {
144✔
181
        let selection = SelectionProcessorFactory::new(selection);
44✔
182

44✔
183
        pipeline.add_processor(Arc::new(selection), &gen_selection_name, vec![]);
44✔
184

44✔
185
        pipeline.connect_nodes(
44✔
186
            &gen_product_name,
44✔
187
            Some(DEFAULT_PORT_HANDLE),
44✔
188
            &gen_selection_name,
44✔
189
            Some(DEFAULT_PORT_HANDLE),
44✔
190
        )?;
44✔
191

192
        pipeline.connect_nodes(
44✔
193
            &gen_selection_name,
44✔
194
            Some(DEFAULT_PORT_HANDLE),
44✔
195
            &gen_agg_name,
44✔
196
            Some(DEFAULT_PORT_HANDLE),
44✔
197
        )?;
44✔
198
    } else {
199
        pipeline.connect_nodes(
100✔
200
            &gen_product_name,
100✔
201
            Some(DEFAULT_PORT_HANDLE),
100✔
202
            &gen_agg_name,
100✔
203
            Some(DEFAULT_PORT_HANDLE),
100✔
204
        )?;
100✔
205
    }
206

207
    query_ctx.pipeline_map.insert(
144✔
208
        processor_name.0.clone(),
144✔
209
        (gen_agg_name, DEFAULT_PORT_HANDLE),
144✔
210
    );
144✔
211

144✔
212
    Ok(())
144✔
213
}
144✔
214

215
/// Returns a vector of input port handles and relative table name
216
///
217
/// # Errors
218
///
×
219
/// This function will return an error if it's not possible to get an input name.
×
220
pub fn get_input_tables(
144✔
221
    from: &TableWithJoins,
144✔
222
    pipeline: &mut AppPipeline<SchemaSQLContext>,
144✔
223
    query_ctx: &mut QueryContext,
144✔
224
) -> Result<IndexedTabelWithJoins, PipelineError> {
144✔
225
    let mut input_tables = vec![];
144✔
226

×
227
    let name = get_from_source(&from.relation, pipeline, query_ctx)?;
144✔
228
    input_tables.insert(0, name.clone());
144✔
229
    let mut joins = vec![];
144✔
230

×
231
    for (index, join) in from.joins.iter().enumerate() {
144✔
232
        let input_name = get_from_source(&join.relation, pipeline, query_ctx)?;
16✔
233
        joins.push((input_name.clone(), join.clone()));
16✔
234
        input_tables.insert(index + 1, input_name);
16✔
235
    }
×
236

×
237
    Ok(IndexedTabelWithJoins {
144✔
238
        relation: (name, from.relation.clone()),
144✔
239
        joins,
144✔
240
    })
144✔
241
}
144✔
242

×
243
/// Lists the names of all inputs of `input_tables`: the leading relation
/// first, followed by each joined relation in order.
pub fn get_input_names(input_tables: &IndexedTabelWithJoins) -> Vec<NameOrAlias> {
    std::iter::once(input_tables.relation.0.clone())
        .chain(input_tables.joins.iter().map(|(name, _)| name.clone()))
        .collect()
}
945✔
252
pub fn get_entry_points(
144✔
253
    input_tables: &IndexedTabelWithJoins,
144✔
254
    pipeline_map: &mut HashMap<String, (String, PortHandle)>,
144✔
255
) -> Result<Vec<PipelineEntryPoint>, PipelineError> {
144✔
256
    let mut endpoints = vec![];
144✔
257

144✔
258
    let input_names = get_input_names(input_tables);
144✔
259

260
    for (input_port, table) in input_names.iter().enumerate() {
160✔
261
        let name = table.0.clone();
160✔
262
        if !pipeline_map.contains_key(&name) {
160✔
263
            endpoints.push(PipelineEntryPoint::new(
131✔
264
                AppSourceId::new(name, None),
131✔
265
                input_port as PortHandle,
131✔
266
            ));
131✔
267
        }
131✔
268
    }
269

270
    Ok(endpoints)
144✔
271
}
144✔
272

273
pub fn get_from_source(
160✔
274
    relation: &TableFactor,
160✔
275
    pipeline: &mut AppPipeline<SchemaSQLContext>,
160✔
276
    query_ctx: &mut QueryContext,
160✔
277
) -> Result<NameOrAlias, PipelineError> {
160✔
278
    match relation {
160✔
279
        TableFactor::Table { name, alias, .. } => {
146✔
280
            let input_name = name
146✔
281
                .0
146✔
282
                .iter()
146✔
283
                .map(normalize_ident)
146✔
284
                .collect::<Vec<String>>()
146✔
285
                .join(".");
146✔
286
            let alias_name = alias
146✔
287
                .as_ref()
146✔
288
                .map(|a| fullname_from_ident(&[a.name.clone()]));
146✔
289

146✔
290
            Ok(NameOrAlias(input_name, alias_name))
146✔
291
        }
292
        TableFactor::Derived {
293
            lateral: _,
294
            subquery,
14✔
295
            alias,
14✔
296
        } => {
14✔
297
            let name = format!("derived_{}", uuid::Uuid::new_v4());
14✔
298
            let alias_name = alias
14✔
299
                .as_ref()
14✔
300
                .map(|alias_ident| fullname_from_ident(&[alias_ident.name.clone()]));
14✔
301

14✔
302
            let name_or = NameOrAlias(name, alias_name);
14✔
303
            query_to_pipeline(&name_or, subquery, pipeline, query_ctx, false)?;
14✔
304

305
            Ok(name_or)
14✔
306
        }
307
        _ => Err(PipelineError::UnsupportedSqlError(
×
308
            UnsupportedSqlError::JoinTable,
×
309
        )),
×
310
    }
311
}
160✔
312

313
#[cfg(test)]
mod tests {
    use super::statement_to_pipeline;

    /// Each statement below must be turned into a pipeline without error.
    #[test]
    fn parse_sql_pipeline() {
        let statements: &[&str] = &[
            r#"
                SELECT
                a.name as "Genre",
                    SUM(amount) as "Gross Revenue(in $)"
                FROM
                (
                    SELECT
                    c.name, f.title, p.amount
                FROM film f
                LEFT JOIN film_category fc
                ON fc.film_id = f.film_id
                LEFT JOIN category c
                ON fc.category_id = c.category_id
                LEFT JOIN inventory i
                ON i.film_id = f.film_id
                LEFT JOIN rental r
                ON r.inventory_id = i.inventory_id
                LEFT JOIN payment p
                ON p.rental_id = r.rental_id
                WHERE p.amount IS NOT NULL
                ) a

                GROUP BY name;
            "#,
            r#"
                SELECT
                c.name, f.title, p.amount
            FROM film f
            LEFT JOIN film_category fc
            "#,
            r#"
            WITH tbl as (select id from a)
            select id from tbl
            "#,
            r#"
            WITH tbl as (select id from  a),
            tbl2 as (select id from tbl)
            select id from tbl2
            "#,
            r#"
            WITH cte_table1 as (select id_dt1 from (select id_t1 from table_1) as derived_table_1),
            cte_table2 as (select id_ct1 from cte_table1)
            select id_ct2 from cte_table2
            "#,
            r#"
                with tbl as (select id, ticker from stocks)
                select tbl.id from  stocks join tbl on tbl.id = stocks.id;
            "#,
        ];

        for sql in statements.iter().copied() {
            let _pipeline = statement_to_pipeline(sql).unwrap();
        }
    }
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc