
getdozer / dozer, build 3975475267 (pending completion)

Pull Request #699 (GitHub): feature: Automatically trim record history in `RecordWriter`
Merge cbe01669c into 02f99a9c0

229 of 229 new or added lines in 6 files covered (100.0%)

22375 of 33744 relevant lines covered (66.31%)

45958.9 hits per line

Source File: /dozer-sql/src/pipeline/builder.rs (73.02% covered)
use crate::pipeline::aggregation::factory::AggregationProcessorFactory;
use crate::pipeline::builder::PipelineError::InvalidQuery;
use crate::pipeline::selection::factory::SelectionProcessorFactory;
use crate::pipeline::{errors::PipelineError, product::factory::ProductProcessorFactory};
use dozer_core::dag::app::AppPipeline;
use dozer_core::dag::app::PipelineEntryPoint;
use dozer_core::dag::appsource::AppSourceId;
use dozer_core::dag::dag::DEFAULT_PORT_HANDLE;
use dozer_core::dag::node::PortHandle;
use sqlparser::ast::{Join, TableFactor, TableWithJoins};
use sqlparser::{
    ast::{Query, Select, SetExpr, Statement},
    dialect::AnsiDialect,
    parser::Parser,
};
use std::collections::{HashMap, HashSet};
use std::sync::Arc;

use super::errors::UnsupportedSqlError;
use super::expression::builder::{fullname_from_ident, normalize_ident};

/// Context carried while translating a query into a pipeline.
#[derive(Debug, Clone, Default)]
pub struct QueryContext {
    pub cte_names: HashSet<String>,
}

#[derive(Debug, Clone)]
pub struct IndexedTabelWithJoins {
    pub relation: (NameOrAlias, TableFactor),
    pub joins: Vec<(NameOrAlias, Join)>,
}

#[derive(Debug, Clone, Hash, PartialEq, Eq)]
pub struct NameOrAlias(pub String, pub Option<String>);

pub fn statement_to_pipeline(
    sql: &str,
) -> Result<(AppPipeline, (String, PortHandle)), PipelineError> {
    let dialect = AnsiDialect {};
    let mut ctx = QueryContext::default();

    let ast = Parser::parse_sql(&dialect, sql).unwrap();
    let query_name = NameOrAlias(format!("query_{}", uuid::Uuid::new_v4()), None);
    let statement = ast.get(0).expect("First statement is missing").to_owned();

    let mut pipeline = AppPipeline::new();
    let mut pipeline_map = HashMap::new();
    if let Statement::Query(query) = statement {
        query_to_pipeline(
            &query_name,
            &query,
            &mut pipeline,
            &mut pipeline_map,
            &mut ctx,
            false,
        )?;
    };
    let node = pipeline_map
        .get(&query_name.0)
        .expect("query should have been initialized")
        .to_owned();
    Ok((pipeline, node))
}
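
// Illustrative usage sketch (the SQL text and the table name `stocks` are hypothetical,
// not taken from this file): `statement_to_pipeline` returns the built `AppPipeline`
// together with the (node name, port) pair of the final aggregation node, e.g.
//
//     let (pipeline, (output_node, output_port)) =
//         statement_to_pipeline("SELECT ticker, SUM(amount) FROM stocks GROUP BY ticker")?;
//
// The returned pipeline would then be attached to an app whose sources include one
// named `stocks`, matching the entry points generated below.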

fn query_to_pipeline(
    processor_name: &NameOrAlias,
    query: &Query,
    pipeline: &mut AppPipeline,
    pipeline_map: &mut HashMap<String, (String, PortHandle)>,
    query_ctx: &mut QueryContext,
    stateful: bool,
) -> Result<(), PipelineError> {
    // Attach the first pipeline if there is a WITH clause
    if let Some(with) = &query.with {
        if with.recursive {
            return Err(PipelineError::UnsupportedSqlError(
                UnsupportedSqlError::Recursive,
            ));
        }

        for table in &with.cte_tables {
            if table.from.is_some() {
                return Err(PipelineError::UnsupportedSqlError(
                    UnsupportedSqlError::CteFromError,
                ));
            }
            let table_name = table.alias.name.to_string();
            if query_ctx.cte_names.contains(&table_name) {
                return Err(InvalidQuery(format!(
                    "WITH query name {table_name:?} specified more than once"
                )));
            }
            query_ctx.cte_names.insert(table_name.clone());
            query_to_pipeline(
                &NameOrAlias(table_name.clone(), Some(table_name)),
                &table.query,
                pipeline,
                pipeline_map,
                query_ctx,
                false,
            )?;
        }
    };

    match *query.body.clone() {
        SetExpr::Select(select) => {
            select_to_pipeline(processor_name, *select, pipeline, pipeline_map, stateful)?;
        }
        SetExpr::Query(query) => {
            let query_name = format!("subquery_{}", uuid::Uuid::new_v4());
            let mut ctx = QueryContext::default();
            query_to_pipeline(
                &NameOrAlias(query_name, None),
                &query,
                pipeline,
                pipeline_map,
                &mut ctx,
                stateful,
            )?
        }
        _ => {
            return Err(PipelineError::UnsupportedSqlError(
                UnsupportedSqlError::SelectOnlyError,
            ))
        }
    };
    Ok(())
}
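
// For example (taken from the tests below), `WITH tbl as (select id from a) select id from tbl`
// first builds a sub-pipeline for the CTE body under the name "tbl" and records it in
// `pipeline_map`, so the outer SELECT can resolve `tbl` as an existing node rather than as an
// external source. Recursive WITH clauses and duplicate CTE names are rejected above.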

fn select_to_pipeline(
    processor_name: &NameOrAlias,
    select: Select,
    pipeline: &mut AppPipeline,
    pipeline_map: &mut HashMap<String, (String, PortHandle)>,
    stateful: bool,
) -> Result<(), PipelineError> {
    // FROM clause
    if select.from.len() != 1 {
        return Err(InvalidQuery(
            "FROM clause doesn't support \"Comma Syntax\"".to_string(),
        ));
    }

    let input_tables = get_input_tables(&select.from[0], pipeline, pipeline_map)?;

    let product = ProductProcessorFactory::new(input_tables.clone());

    let input_endpoints = get_entry_points(&input_tables, pipeline_map)?;

    let gen_product_name = format!("product_{}", uuid::Uuid::new_v4());
    let gen_agg_name = format!("agg_{}", uuid::Uuid::new_v4());
    let gen_selection_name = format!("select_{}", uuid::Uuid::new_v4());
    pipeline.add_processor(Arc::new(product), &gen_product_name, input_endpoints);

    let input_names = get_input_names(&input_tables);
    for (port_index, table_name) in input_names.iter().enumerate() {
        if let Some((processor_name, processor_port)) = pipeline_map.get(&table_name.0) {
            pipeline.connect_nodes(
                processor_name,
                Some(*processor_port),
                &gen_product_name,
                Some(port_index as PortHandle),
            )?;
        }
    }

    let aggregation =
        AggregationProcessorFactory::new(select.projection.clone(), select.group_by, stateful);

    pipeline.add_processor(Arc::new(aggregation), &gen_agg_name, vec![]);

    // Where clause
    if let Some(selection) = select.selection {
        let selection = SelectionProcessorFactory::new(selection);
        // first_node_name = String::from("selection");

        pipeline.add_processor(Arc::new(selection), &gen_selection_name, vec![]);

        pipeline.connect_nodes(
            &gen_product_name,
            Some(DEFAULT_PORT_HANDLE),
            &gen_selection_name,
            Some(DEFAULT_PORT_HANDLE),
        )?;

        pipeline.connect_nodes(
            &gen_selection_name,
            Some(DEFAULT_PORT_HANDLE),
            &gen_agg_name,
            Some(DEFAULT_PORT_HANDLE),
        )?;
    } else {
        pipeline.connect_nodes(
            &gen_product_name,
            Some(DEFAULT_PORT_HANDLE),
            &gen_agg_name,
            Some(DEFAULT_PORT_HANDLE),
        )?;
    }

    pipeline_map.insert(
        processor_name.0.clone(),
        (gen_agg_name, DEFAULT_PORT_HANDLE),
    );

    Ok(())
}
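
// Rough sketch of the sub-graph built above for one SELECT (node names carry generated
// UUID suffixes; the selection node exists only when a WHERE clause is present):
//
//     entry points / upstream nodes ──> product_<uuid> ──> [select_<uuid>] ──> agg_<uuid>
//
// The aggregation node is what gets registered in `pipeline_map` under the query's name,
// so enclosing queries and CTE consumers connect to it.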

/// Returns the FROM-clause relation and its joins, each paired with the corresponding
/// table name or alias.
///
/// # Errors
///
/// This function will return an error if it's not possible to get an input name.
pub fn get_input_tables(
    from: &TableWithJoins,
    pipeline: &mut AppPipeline,
    pipeline_map: &mut HashMap<String, (String, PortHandle)>,
) -> Result<IndexedTabelWithJoins, PipelineError> {
    let mut input_tables = vec![];

    let name = get_from_source(&from.relation, pipeline, pipeline_map)?;
    input_tables.insert(0, name.clone());
    let mut joins = vec![];

    for (index, join) in from.joins.iter().enumerate() {
        let input_name = get_from_source(&join.relation, pipeline, pipeline_map)?;
        joins.push((input_name.clone(), join.clone()));
        input_tables.insert(index + 1, input_name);
    }

    Ok(IndexedTabelWithJoins {
        relation: (name, from.relation.clone()),
        joins,
    })
}

pub fn get_input_names(input_tables: &IndexedTabelWithJoins) -> Vec<NameOrAlias> {
    let mut input_names = vec![];
    input_names.push(input_tables.relation.0.clone());

    for join in &input_tables.joins {
        input_names.push(join.0.clone());
    }
    input_names
}

pub fn get_entry_points(
    input_tables: &IndexedTabelWithJoins,
    pipeline_map: &mut HashMap<String, (String, PortHandle)>,
) -> Result<Vec<PipelineEntryPoint>, PipelineError> {
    let mut endpoints = vec![];

    let input_names = get_input_names(input_tables);

    for (input_port, table) in input_names.iter().enumerate() {
        let name = table.0.clone();
        if !pipeline_map.contains_key(&name) {
            endpoints.push(PipelineEntryPoint::new(
                AppSourceId::new(name, None),
                input_port as PortHandle,
            ));
        }
    }

    Ok(endpoints)
}
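
// Note: only inputs that are not already produced by another processor (i.e. absent from
// `pipeline_map`, unlike CTE or subquery outputs) become `PipelineEntryPoint`s; those are
// expected to be satisfied by app sources with matching names.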

pub fn get_from_source(
    relation: &TableFactor,
    pipeline: &mut AppPipeline,
    pipeline_map: &mut HashMap<String, (String, PortHandle)>,
) -> Result<NameOrAlias, PipelineError> {
    match relation {
        TableFactor::Table { name, alias, .. } => {
            let input_name = name
                .0
                .iter()
                .map(normalize_ident)
                .collect::<Vec<String>>()
                .join(".");
            let alias_name = alias
                .as_ref()
                .map(|a| fullname_from_ident(&[a.name.clone()]));

            Ok(NameOrAlias(input_name, alias_name))
        }
        TableFactor::Derived {
            lateral: _,
            subquery,
            alias,
        } => {
            let name = format!("derived_{}", uuid::Uuid::new_v4());
            let alias_name = alias
                .as_ref()
                .map(|alias_ident| fullname_from_ident(&[alias_ident.name.clone()]));

            let name_or = NameOrAlias(name, alias_name);
            let mut ctx = QueryContext::default();
            query_to_pipeline(&name_or, subquery, pipeline, pipeline_map, &mut ctx, false)?;

            Ok(name_or)
        }
        _ => Err(PipelineError::UnsupportedSqlError(
            UnsupportedSqlError::JoinTable,
        )),
    }
}
308

×
309
#[cfg(test)]
×
310
mod tests {
×
311
    use super::statement_to_pipeline;
×
312

×
313
    #[test]
1✔
314
    fn sql_logic_test_1() {
1✔
315
        let statements: Vec<&str> = vec![
1✔
316
            r#"
1✔
317
            SELECT
1✔
318
            a.name as "Genre",
1✔
319
                SUM(amount) as "Gross Revenue(in $)"
1✔
320
            FROM
1✔
321
            (
1✔
322
                SELECT
1✔
323
                c.name, f.title, p.amount
1✔
324
            FROM film f
1✔
325
            LEFT JOIN film_category fc
1✔
326
            ON fc.film_id = f.film_id
1✔
327
            LEFT JOIN category c
1✔
328
            ON fc.category_id = c.category_id
1✔
329
            LEFT JOIN inventory i
1✔
330
            ON i.film_id = f.film_id
1✔
331
            LEFT JOIN rental r
1✔
332
            ON r.inventory_id = i.inventory_id
1✔
333
            LEFT JOIN payment p
1✔
334
            ON p.rental_id = r.rental_id
1✔
335
            WHERE p.amount IS NOT NULL
1✔
336
            ) a
1✔
337

1✔
338
            GROUP BY name
1✔
339
            ORDER BY sum(amount) desc
1✔
340
            LIMIT 5;
1✔
341
            "#,
1✔
342
            r#"
1✔
343
                SELECT
1✔
344
                c.name, f.title, p.amount
1✔
345
            FROM film f
1✔
346
            LEFT JOIN film_category fc
1✔
347
            "#,
1✔
348
            r#"
1✔
349
            WITH tbl as (select id from a)
1✔
350
            select id from tbl
1✔
351
            "#,
1✔
352
            r#"
1✔
353
            WITH tbl as (select id from  a),
1✔
354
            tbl2 as (select id from tbl)
1✔
355
            select id from tbl2
1✔
356
            "#,
1✔
357
            r#"
1✔
358
            WITH cte_table1 as (select id_dt1 from (select id_t1 from table_1) as derived_table_1),
1✔
359
            cte_table2 as (select id_ct1 from cte_table1)
1✔
360
            select id_ct2 from cte_table2
1✔
361
            "#,
1✔
362
            r#"
1✔
363
                with tbl as (select id, ticker from stocks)
1✔
364
                select tbl.id from  stocks join tbl on tbl.id = stocks.id;
1✔
365
            "#,
1✔
366
        ];
1✔
367
        for sql in statements {
7✔
368
            let _pipeline = statement_to_pipeline(sql).unwrap();
6✔
369
        }
6✔
370
    }
1✔
371
}