• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

getdozer / dozer / 4102355041

pending completion
4102355041

Pull #811

github

GitHub
Merge 37b55f3df into 7c772e92a
Pull Request #811: chore: integrating sql planner

427 of 427 new or added lines in 15 files covered. (100.0%)

24596 of 37831 relevant lines covered (65.02%)

37254.69 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

57.31
/dozer-sql/src/pipeline/expression/builder_new.rs
1
#![allow(dead_code)]
2
use dozer_types::{
3
    ordered_float::OrderedFloat,
4
    types::{Field, Schema},
5
};
6

7
use dozer_types::types::{FieldDefinition, SourceDefinition};
8
use sqlparser::ast::{
9
    BinaryOperator as SqlBinaryOperator, DataType, Expr as SqlExpr, Expr, Function, FunctionArg,
10
    FunctionArgExpr, Ident, TrimWhereField, UnaryOperator as SqlUnaryOperator, Value as SqlValue,
11
};
12

13
use crate::pipeline::errors::PipelineError;
14
use crate::pipeline::errors::PipelineError::{
15
    InvalidArgument, InvalidExpression, InvalidNestedAggregationFunction, InvalidOperator,
16
    InvalidValue,
17
};
18
use crate::pipeline::expression::aggregate::AggregateFunctionType;
19

20
use crate::pipeline::expression::execution::Expression;
21
use crate::pipeline::expression::execution::Expression::ScalarFunction;
22
use crate::pipeline::expression::operator::{BinaryOperatorType, UnaryOperatorType};
23
use crate::pipeline::expression::scalar::common::ScalarFunctionType;
24
use crate::pipeline::expression::scalar::string::TrimType;
25

26
use super::cast::CastOperatorType;
27
/// Field-less marker type; the SQL-to-`Expression` conversion routines are
/// associated functions on it (none of them take `self`).
pub struct ExpressionBuilder;
28

29
#[derive(Clone, PartialEq, Debug)]
9✔
30
pub struct ExpressionContext {
31
    // Must be an aggregation function
32
    pub aggregations: Vec<Expression>,
33
    pub offset: usize,
34
}
35

36
impl ExpressionContext {
37
    pub fn new(offset: usize) -> Self {
1,122✔
38
        Self {
1,122✔
39
            aggregations: Vec::new(),
1,122✔
40
            offset,
1,122✔
41
        }
1,122✔
42
    }
1,122✔
43

44
    pub fn from(offset: usize, aggregations: Vec<Expression>) -> Self {
1✔
45
        Self {
1✔
46
            aggregations,
1✔
47
            offset,
1✔
48
        }
1✔
49
    }
1✔
50
}
51

52
impl ExpressionBuilder {
53
    pub fn build(
1,123✔
54
        context: &mut ExpressionContext,
1,123✔
55
        parse_aggregations: bool,
1,123✔
56
        sql_expression: &SqlExpr,
1,123✔
57
        schema: &Schema,
1,123✔
58
    ) -> Result<Box<Expression>, PipelineError> {
1,123✔
59
        Self::parse_sql_expression(context, parse_aggregations, sql_expression, schema)
1,123✔
60
    }
1,123✔
61

62
    fn parse_sql_expression(
63
        context: &mut ExpressionContext,
64
        parse_aggregations: bool,
65
        expression: &SqlExpr,
66
        schema: &Schema,
67
    ) -> Result<Box<Expression>, PipelineError> {
68
        match expression {
13✔
69
            SqlExpr::Trim {
70
                expr,
18✔
71
                trim_where,
18✔
72
                trim_what,
18✔
73
            } => Self::parse_sql_trim_function(
18✔
74
                context,
18✔
75
                parse_aggregations,
18✔
76
                expr,
18✔
77
                trim_where,
18✔
78
                trim_what,
18✔
79
                schema,
18✔
80
            ),
18✔
81
            SqlExpr::Identifier(ident) => Self::parse_sql_column(&[ident.clone()], schema),
985✔
82
            SqlExpr::CompoundIdentifier(ident) => Self::parse_sql_column(ident, schema),
149✔
83
            SqlExpr::Value(SqlValue::Number(n, _)) => Self::parse_sql_number(n),
9✔
84
            SqlExpr::Value(SqlValue::Null) => Ok(Box::new(Expression::Literal(Field::Null))),
×
85
            SqlExpr::Value(SqlValue::SingleQuotedString(s) | SqlValue::DoubleQuotedString(s)) => {
4✔
86
                Self::parse_sql_string(s)
4✔
87
            }
88
            SqlExpr::UnaryOp { expr, op } => {
×
89
                Self::parse_sql_unary_op(context, parse_aggregations, op, expr, schema)
×
90
            }
91
            SqlExpr::BinaryOp { left, op, right } => {
5✔
92
                Self::parse_sql_binary_op(context, parse_aggregations, left, op, right, schema)
5✔
93
            }
94
            SqlExpr::Nested(expr) => {
1✔
95
                Self::parse_sql_expression(context, parse_aggregations, expr, schema)
1✔
96
            }
97
            SqlExpr::Function(sql_function) => Self::parse_sql_function(
163✔
98
                context,
163✔
99
                parse_aggregations,
163✔
100
                sql_function,
163✔
101
                schema,
163✔
102
                expression,
163✔
103
            ),
163✔
104
            SqlExpr::Like {
105
                negated,
×
106
                expr,
×
107
                pattern,
×
108
                escape_char,
×
109
            } => Self::parse_sql_like_operator(
×
110
                context,
×
111
                parse_aggregations,
×
112
                negated,
×
113
                expr,
×
114
                pattern,
×
115
                escape_char,
×
116
                schema,
×
117
            ),
×
118
            SqlExpr::Cast { expr, data_type } => {
1✔
119
                Self::parse_sql_cast_operator(context, parse_aggregations, expr, data_type, schema)
1✔
120
            }
121
            _ => Err(InvalidExpression(format!("{expression:?}"))),
×
122
        }
123
    }
1,335✔
124

125
    fn parse_sql_column(
1,133✔
126
        ident: &[Ident],
1,133✔
127
        schema: &Schema,
1,133✔
128
    ) -> Result<Box<Expression>, PipelineError> {
1,133✔
129
        let (src_field, src_table_or_alias, src_connection) = match ident.len() {
1,133✔
130
            1 => (&ident[0].value, None, None),
984✔
131
            2 => (&ident[1].value, Some(&ident[0].value), None),
148✔
132
            3 => (
1✔
133
                &ident[2].value,
1✔
134
                Some(&ident[1].value),
1✔
135
                Some(&ident[0].value),
1✔
136
            ),
1✔
137
            _ => {
138
                return Err(InvalidExpression(
×
139
                    ident
×
140
                        .iter()
×
141
                        .fold(String::new(), |a, b| a + "." + b.value.as_str()),
×
142
                ))
×
143
            }
144
        };
145

146
        let matching_by_field: Vec<(usize, &FieldDefinition)> = schema
1,133✔
147
            .fields
1,133✔
148
            .iter()
1,133✔
149
            .enumerate()
1,133✔
150
            .filter(|(_idx, f)| &f.name == src_field)
5,504✔
151
            .collect();
1,133✔
152

1,133✔
153
        match matching_by_field.len() {
1,133✔
154
            1 => Ok(Box::new(Expression::Column {
1,061✔
155
                index: matching_by_field[0].0,
1,061✔
156
            })),
1,061✔
157
            _ => match src_table_or_alias {
72✔
158
                None => Err(InvalidExpression(
×
159
                    ident
×
160
                        .iter()
×
161
                        .fold(String::new(), |a, b| a + "." + b.value.as_str()),
×
162
                )),
×
163
                Some(src_table_or_alias) => {
72✔
164
                    let matching_by_table_or_alias: Vec<(usize, &FieldDefinition)> =
72✔
165
                        matching_by_field
72✔
166
                            .into_iter()
72✔
167
                            .filter(|(_idx, field)| match &field.source {
144✔
168
                                SourceDefinition::Alias { name } => name == src_table_or_alias,
108✔
169
                                SourceDefinition::Table {
170
                                    name,
36✔
171
                                    connection: _,
36✔
172
                                } => name == src_table_or_alias,
36✔
173
                                _ => false,
×
174
                            })
144✔
175
                            .collect();
72✔
176

72✔
177
                    match matching_by_table_or_alias.len() {
72✔
178
                        1 => Ok(Box::new(Expression::Column {
72✔
179
                            index: matching_by_table_or_alias[0].0,
72✔
180
                        })),
72✔
181
                        _ => match src_connection {
×
182
                            None => Err(InvalidExpression(
×
183
                                ident
×
184
                                    .iter()
×
185
                                    .fold(String::new(), |a, b| a + "." + b.value.as_str()),
×
186
                            )),
×
187
                            Some(src_connection) => {
×
188
                                let matching_by_connection: Vec<(usize, &FieldDefinition)> =
×
189
                                    matching_by_table_or_alias
×
190
                                        .into_iter()
×
191
                                        .filter(|(_idx, field)| match &field.source {
×
192
                                            SourceDefinition::Table {
193
                                                name: _,
194
                                                connection,
×
195
                                            } => connection == src_connection,
×
196
                                            _ => false,
×
197
                                        })
×
198
                                        .collect();
×
199

×
200
                                match matching_by_connection.len() {
×
201
                                    1 => Ok(Box::new(Expression::Column {
×
202
                                        index: matching_by_connection[0].0,
×
203
                                    })),
×
204
                                    _ => Err(InvalidExpression(
×
205
                                        ident
×
206
                                            .iter()
×
207
                                            .fold(String::new(), |a, b| a + "." + b.value.as_str()),
×
208
                                    )),
×
209
                                }
210
                            }
211
                        },
212
                    }
213
                }
214
            },
215
        }
216
    }
1,133✔
217

218
    fn parse_sql_trim_function(
18✔
219
        context: &mut ExpressionContext,
18✔
220
        parse_aggregations: bool,
18✔
221
        expr: &Expr,
18✔
222
        trim_where: &Option<TrimWhereField>,
18✔
223
        trim_what: &Option<Box<Expr>>,
18✔
224
        schema: &Schema,
18✔
225
    ) -> Result<Box<Expression>, PipelineError> {
18✔
226
        let arg = Self::parse_sql_expression(context, parse_aggregations, expr, schema)?;
18✔
227
        let what = match trim_what {
18✔
228
            Some(e) => Some(Self::parse_sql_expression(
×
229
                context,
×
230
                parse_aggregations,
×
231
                e,
×
232
                schema,
×
233
            )?),
×
234
            _ => None,
18✔
235
        };
236
        let typ = trim_where.as_ref().map(|e| match e {
18✔
237
            TrimWhereField::Both => TrimType::Both,
×
238
            TrimWhereField::Leading => TrimType::Leading,
×
239
            TrimWhereField::Trailing => TrimType::Trailing,
×
240
        });
18✔
241
        Ok(Box::new(Expression::Trim { arg, what, typ }))
18✔
242
    }
18✔
243

244
    fn parse_sql_function(
163✔
245
        context: &mut ExpressionContext,
163✔
246
        parse_aggregations: bool,
163✔
247
        sql_function: &Function,
163✔
248
        schema: &Schema,
163✔
249
        _expression: &SqlExpr,
163✔
250
    ) -> Result<Box<Expression>, PipelineError> {
163✔
251
        let function_name = sql_function.name.to_string().to_lowercase();
163✔
252

163✔
253
        match (
163✔
254
            AggregateFunctionType::new(function_name.as_str()),
163✔
255
            parse_aggregations,
163✔
256
        ) {
163✔
257
            (Ok(aggr), true) => {
143✔
258
                let mut arg_expr: Vec<Expression> = Vec::new();
143✔
259
                for arg in &sql_function.args {
285✔
260
                    arg_expr.push(*Self::parse_sql_function_arg(context, false, arg, schema)?);
143✔
261
                }
262
                let measure = Expression::AggregateFunction {
142✔
263
                    fun: aggr,
142✔
264
                    args: arg_expr,
142✔
265
                };
142✔
266
                let index = match context
142✔
267
                    .aggregations
142✔
268
                    .iter()
142✔
269
                    .enumerate()
142✔
270
                    .find(|e| e.1 == &measure)
142✔
271
                {
272
                    Some((index, _existing)) => index,
2✔
273
                    _ => {
274
                        context.aggregations.push(measure);
140✔
275
                        context.aggregations.len() - 1
140✔
276
                    }
277
                };
278
                Ok(Box::new(Expression::Column {
142✔
279
                    index: context.offset + index,
142✔
280
                }))
142✔
281
            }
282
            (Ok(_agg), false) => Err(InvalidNestedAggregationFunction(function_name)),
1✔
283
            (Err(_), _) => {
284
                let mut function_args: Vec<Expression> = Vec::new();
19✔
285
                for arg in &sql_function.args {
58✔
286
                    function_args.push(*Self::parse_sql_function_arg(
39✔
287
                        context,
39✔
288
                        parse_aggregations,
39✔
289
                        arg,
39✔
290
                        schema,
39✔
291
                    )?);
39✔
292
                }
293
                let function_type = ScalarFunctionType::new(function_name.as_str())?;
19✔
294

295
                Ok(Box::new(ScalarFunction {
19✔
296
                    fun: function_type,
19✔
297
                    args: function_args,
19✔
298
                }))
19✔
299
            }
300
        }
301
    }
163✔
302

303
    fn parse_sql_function_arg(
304
        context: &mut ExpressionContext,
305
        parse_aggregations: bool,
306
        argument: &FunctionArg,
307
        schema: &Schema,
308
    ) -> Result<Box<Expression>, PipelineError> {
309
        match argument {
181✔
310
            FunctionArg::Named {
311
                name: _,
312
                arg: FunctionArgExpr::Expr(arg),
×
313
            } => Self::parse_sql_expression(context, parse_aggregations, arg, schema),
×
314
            FunctionArg::Named {
315
                name: _,
316
                arg: FunctionArgExpr::Wildcard,
317
            } => Err(InvalidArgument(format!("{argument:?}"))),
×
318
            FunctionArg::Unnamed(FunctionArgExpr::Expr(arg)) => {
182✔
319
                Self::parse_sql_expression(context, parse_aggregations, arg, schema)
182✔
320
            }
321
            FunctionArg::Unnamed(FunctionArgExpr::Wildcard) => {
322
                Err(InvalidArgument(format!("{argument:?}")))
×
323
            }
324
            FunctionArg::Named {
325
                name: _,
326
                arg: FunctionArgExpr::QualifiedWildcard(_),
327
            } => Err(InvalidArgument(format!("{argument:?}"))),
×
328
            FunctionArg::Unnamed(FunctionArgExpr::QualifiedWildcard(_)) => {
329
                Err(InvalidArgument(format!("{argument:?}")))
×
330
            }
331
        }
332
    }
182✔
333

334
    fn parse_sql_unary_op(
×
335
        context: &mut ExpressionContext,
×
336
        parse_aggregations: bool,
×
337
        op: &SqlUnaryOperator,
×
338
        expr: &SqlExpr,
×
339
        schema: &Schema,
×
340
    ) -> Result<Box<Expression>, PipelineError> {
×
341
        let arg = Self::parse_sql_expression(context, parse_aggregations, expr, schema)?;
×
342
        let operator = match op {
×
343
            SqlUnaryOperator::Not => UnaryOperatorType::Not,
×
344
            SqlUnaryOperator::Plus => UnaryOperatorType::Plus,
×
345
            SqlUnaryOperator::Minus => UnaryOperatorType::Minus,
×
346
            _ => return Err(InvalidOperator(format!("{op:?}"))),
×
347
        };
348

349
        Ok(Box::new(Expression::UnaryOperator { operator, arg }))
×
350
    }
×
351

352
    fn parse_sql_binary_op(
5✔
353
        context: &mut ExpressionContext,
5✔
354
        parse_aggregations: bool,
5✔
355
        left: &SqlExpr,
5✔
356
        op: &SqlBinaryOperator,
5✔
357
        right: &SqlExpr,
5✔
358
        schema: &Schema,
5✔
359
    ) -> Result<Box<Expression>, PipelineError> {
5✔
360
        let left_op = Self::parse_sql_expression(context, parse_aggregations, left, schema)?;
5✔
361
        let right_op = Self::parse_sql_expression(context, parse_aggregations, right, schema)?;
5✔
362

363
        let operator = match op {
5✔
364
            SqlBinaryOperator::Gt => BinaryOperatorType::Gt,
1✔
365
            SqlBinaryOperator::GtEq => BinaryOperatorType::Gte,
×
366
            SqlBinaryOperator::Lt => BinaryOperatorType::Lt,
×
367
            SqlBinaryOperator::LtEq => BinaryOperatorType::Lte,
×
368
            SqlBinaryOperator::Eq => BinaryOperatorType::Eq,
×
369
            SqlBinaryOperator::NotEq => BinaryOperatorType::Ne,
×
370
            SqlBinaryOperator::Plus => BinaryOperatorType::Add,
4✔
371
            SqlBinaryOperator::Minus => BinaryOperatorType::Sub,
×
372
            SqlBinaryOperator::Multiply => BinaryOperatorType::Mul,
×
373
            SqlBinaryOperator::Divide => BinaryOperatorType::Div,
×
374
            SqlBinaryOperator::Modulo => BinaryOperatorType::Mod,
×
375
            SqlBinaryOperator::And => BinaryOperatorType::And,
×
376
            SqlBinaryOperator::Or => BinaryOperatorType::Or,
×
377
            _ => return Err(InvalidOperator(format!("{op:?}"))),
×
378
        };
379

380
        Ok(Box::new(Expression::BinaryOperator {
5✔
381
            left: left_op,
5✔
382
            operator,
5✔
383
            right: right_op,
5✔
384
        }))
5✔
385
    }
5✔
386

387
    fn parse_sql_number(n: &str) -> Result<Box<Expression>, PipelineError> {
9✔
388
        match n.parse::<i64>() {
9✔
389
            Ok(n) => Ok(Box::new(Expression::Literal(Field::Int(n)))),
9✔
390
            Err(_) => match n.parse::<f64>() {
×
391
                Ok(f) => Ok(Box::new(Expression::Literal(Field::Float(OrderedFloat(f))))),
×
392
                Err(_) => Err(InvalidValue(n.to_string())),
×
393
            },
394
        }
395
    }
9✔
396

397
    fn parse_sql_like_operator(
×
398
        context: &mut ExpressionContext,
×
399
        parse_aggregations: bool,
×
400
        negated: &bool,
×
401
        expr: &Expr,
×
402
        pattern: &Expr,
×
403
        escape_char: &Option<char>,
×
404
        schema: &Schema,
×
405
    ) -> Result<Box<Expression>, PipelineError> {
×
406
        let arg = Self::parse_sql_expression(context, parse_aggregations, expr, schema)?;
×
407
        let pattern = Self::parse_sql_expression(context, parse_aggregations, pattern, schema)?;
×
408
        let like_expression = Box::new(Expression::Like {
×
409
            arg,
×
410
            pattern,
×
411
            escape: *escape_char,
×
412
        });
×
413
        if *negated {
×
414
            Ok(Box::new(Expression::UnaryOperator {
×
415
                operator: UnaryOperatorType::Not,
×
416
                arg: like_expression,
×
417
            }))
×
418
        } else {
419
            Ok(like_expression)
×
420
        }
421
    }
×
422

423
    fn parse_sql_cast_operator(
1✔
424
        context: &mut ExpressionContext,
1✔
425
        parse_aggregations: bool,
1✔
426
        expr: &Expr,
1✔
427
        data_type: &DataType,
1✔
428
        schema: &Schema,
1✔
429
    ) -> Result<Box<Expression>, PipelineError> {
1✔
430
        let expression = Self::parse_sql_expression(context, parse_aggregations, expr, schema)?;
1✔
431
        let cast_to = match data_type {
1✔
432
            DataType::Decimal(_) => CastOperatorType::Decimal,
×
433
            DataType::Binary(_) => CastOperatorType::Binary,
×
434
            DataType::Float(_) => CastOperatorType::Float,
×
435
            DataType::Int(_) => CastOperatorType::Int,
×
436
            DataType::Integer(_) => CastOperatorType::Int,
×
437
            DataType::UnsignedInt(_) => CastOperatorType::UInt,
×
438
            DataType::UnsignedInteger(_) => CastOperatorType::UInt,
×
439
            DataType::Boolean => CastOperatorType::Boolean,
×
440
            DataType::Date => CastOperatorType::Date,
×
441
            DataType::Timestamp(..) => CastOperatorType::Timestamp,
×
442
            DataType::Text => CastOperatorType::Text,
×
443
            DataType::String => CastOperatorType::String,
1✔
444
            DataType::Custom(name, ..) => {
×
445
                if name.to_string().to_lowercase() == "bson" {
×
446
                    CastOperatorType::Bson
×
447
                } else {
448
                    Err(PipelineError::InvalidFunction(format!(
×
449
                        "Unsupported Cast type {name}"
×
450
                    )))?
×
451
                }
452
            }
453
            _ => Err(PipelineError::InvalidFunction(format!(
×
454
                "Unsupported Cast type {data_type}"
×
455
            )))?,
×
456
        };
457
        Ok(Box::new(Expression::Cast {
1✔
458
            arg: expression,
1✔
459
            typ: cast_to,
1✔
460
        }))
1✔
461
    }
1✔
462

463
    fn parse_sql_string(s: &str) -> Result<Box<Expression>, PipelineError> {
4✔
464
        Ok(Box::new(Expression::Literal(Field::String(s.to_owned()))))
4✔
465
    }
4✔
466

467
    pub fn fullname_from_ident(ident: &[Ident]) -> String {
×
468
        let mut ident_tokens = vec![];
×
469
        for token in ident.iter() {
×
470
            ident_tokens.push(token.value.clone());
×
471
        }
×
472
        ident_tokens.join(".")
×
473
    }
×
474
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc