• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

getdozer / dozer / 3965606986

pending completion
3965606986

push

github

GitHub
chore: bump sqlparser to v0.30.0 (#686)

13 of 13 new or added lines in 3 files covered. (100.0%)

21931 of 32697 relevant lines covered (67.07%)

36311.08 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

61.0
/dozer-sql/src/pipeline/expression/builder.rs
1
use std::cmp;
2

3
use dozer_types::{
4
    ordered_float::OrderedFloat,
5
    types::{Field, Schema},
6
};
7

8
use sqlparser::ast::{
9
    BinaryOperator as SqlBinaryOperator, DataType, Expr as SqlExpr, Expr, Function, FunctionArg,
10
    FunctionArgExpr, Ident, TrimWhereField, UnaryOperator as SqlUnaryOperator, Value as SqlValue,
11
};
12

13
use crate::pipeline::errors::{JoinError, PipelineError};
14
use crate::pipeline::expression::aggregate::AggregateFunctionType;
15
use crate::pipeline::expression::builder::PipelineError::InvalidArgument;
16
use crate::pipeline::expression::builder::PipelineError::InvalidExpression;
17
use crate::pipeline::expression::builder::PipelineError::InvalidOperator;
18
use crate::pipeline::expression::builder::PipelineError::InvalidValue;
19
use crate::pipeline::expression::execution::Expression;
20
use crate::pipeline::expression::execution::Expression::ScalarFunction;
21
use crate::pipeline::expression::operator::{BinaryOperatorType, UnaryOperatorType};
22
use crate::pipeline::expression::scalar::common::ScalarFunctionType;
23
use crate::pipeline::expression::scalar::string::TrimType;
24

25
use super::cast::CastOperatorType;
26

27
/// Flag returned alongside a parsed expression by the builder's parsing methods.
///
/// When it is `true`, the unary and binary operator parsers in this file return the
/// inner expression unchanged instead of wrapping it in an operator node.
pub type Bypass = bool;
28

29
/// Processing phase an expression is being built for.
///
/// `ExpressionBuilder::parse_sql_expression` uses the phase to choose which
/// function-call parser handles `SqlExpr::Function` nodes.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum BuilderExpressionType {
    /// Function calls are handled by the pre-aggregation function parser.
    PreAggregation,
    /// Function calls are handled by the aggregation function parser.
    Aggregation,
    // PostAggregation,
    /// Function calls are handled by the general function parser.
    FullExpression,
}
35

36
/// Field-less builder whose methods translate `sqlparser` expressions into pipeline
/// [`Expression`] trees.
pub struct ExpressionBuilder;
37

38
impl ExpressionBuilder {
39
    pub fn build(
113✔
40
        &self,
113✔
41
        expression_type: &BuilderExpressionType,
113✔
42
        sql_expression: &SqlExpr,
113✔
43
        schema: &Schema,
113✔
44
    ) -> Result<Box<Expression>, PipelineError> {
113✔
45
        let (expression, _bypass) =
113✔
46
            self.parse_sql_expression(expression_type, sql_expression, schema)?;
113✔
47
        Ok(expression)
113✔
48
    }
113✔
49

50
    pub fn parse_sql_expression(
51
        &self,
52
        expression_type: &BuilderExpressionType,
53
        expression: &SqlExpr,
54
        schema: &Schema,
55
    ) -> Result<(Box<Expression>, bool), PipelineError> {
56
        match expression {
69✔
57
            SqlExpr::Trim {
58
                expr,
38✔
59
                trim_where,
38✔
60
                trim_what,
38✔
61
            } => self.parse_sql_trim_function(expression_type, expr, trim_where, trim_what, schema),
38✔
62
            SqlExpr::Identifier(ident) => self.parse_sql_column(&[ident.clone()], schema),
1,265✔
63
            SqlExpr::CompoundIdentifier(ident) => self.parse_sql_column(ident, schema),
2✔
64
            SqlExpr::Value(SqlValue::Number(n, _)) => self.parse_sql_number(n),
43✔
65
            SqlExpr::Value(SqlValue::Null) => {
66
                Ok((Box::new(Expression::Literal(Field::Null)), false))
×
67
            }
68
            SqlExpr::Value(SqlValue::SingleQuotedString(s) | SqlValue::DoubleQuotedString(s)) => {
26✔
69
                parse_sql_string(s)
26✔
70
            }
71
            SqlExpr::UnaryOp { expr, op } => {
×
72
                self.parse_sql_unary_op(expression_type, op, expr, schema)
×
73
            }
74
            SqlExpr::BinaryOp { left, op, right } => {
79✔
75
                self.parse_sql_binary_op(expression_type, left, op, right, schema)
79✔
76
            }
77
            SqlExpr::Nested(expr) => self.parse_sql_expression(expression_type, expr, schema),
12✔
78
            SqlExpr::Function(sql_function) => match expression_type {
182✔
79
                BuilderExpressionType::PreAggregation => self.parse_sql_function_pre_aggregation(
84✔
80
                    expression_type,
84✔
81
                    sql_function,
84✔
82
                    schema,
84✔
83
                    expression,
84✔
84
                ),
84✔
85
                BuilderExpressionType::Aggregation => self.parse_sql_function_aggregation(
84✔
86
                    expression_type,
84✔
87
                    sql_function,
84✔
88
                    schema,
84✔
89
                    expression,
84✔
90
                ),
84✔
91
                // ExpressionType::PostAggregation => todo!(),
92
                BuilderExpressionType::FullExpression => {
93
                    self.parse_sql_function(expression_type, sql_function, schema, expression)
14✔
94
                }
95
            },
96
            SqlExpr::Like {
97
                negated,
×
98
                expr,
×
99
                pattern,
×
100
                escape_char,
×
101
            } => self.parse_sql_like_operator(
×
102
                expression_type,
×
103
                negated,
×
104
                expr,
×
105
                pattern,
×
106
                escape_char,
×
107
                schema,
×
108
            ),
×
109
            SqlExpr::Cast { expr, data_type } => {
64✔
110
                self.parse_sql_cast_operator(expression_type, expr, data_type, schema)
64✔
111
            }
112
            _ => Err(InvalidExpression(format!("{:?}", expression))),
×
113
        }
114
    }
1,711✔
115

116
    fn parse_sql_column(
1,267✔
117
        &self,
1,267✔
118
        ident: &[Ident],
1,267✔
119
        schema: &Schema,
1,267✔
120
    ) -> Result<(Box<Expression>, bool), PipelineError> {
1,267✔
121
        Ok((
1,267✔
122
            Box::new(Expression::Column {
1,267✔
123
                index: get_field_index(ident, schema)?,
1,267✔
124
                //index: schema.get_field_index(&ident[0].value)?.0,
125
            }),
126
            false,
127
        ))
128
    }
1,267✔
129

130
    fn parse_sql_trim_function(
38✔
131
        &self,
38✔
132
        expression_type: &BuilderExpressionType,
38✔
133
        expr: &Expr,
38✔
134
        trim_where: &Option<TrimWhereField>,
38✔
135
        trim_what: &Option<Box<Expr>>,
38✔
136
        schema: &Schema,
38✔
137
    ) -> Result<(Box<Expression>, bool), PipelineError> {
38✔
138
        let arg = self.parse_sql_expression(expression_type, expr, schema)?.0;
38✔
139
        let what = match trim_what {
38✔
140
            Some(e) => Some(self.parse_sql_expression(expression_type, e, schema)?.0),
8✔
141
            _ => None,
30✔
142
        };
143
        let typ = trim_where.as_ref().map(|e| match e {
38✔
144
            TrimWhereField::Both => TrimType::Both,
2✔
145
            TrimWhereField::Leading => TrimType::Leading,
2✔
146
            TrimWhereField::Trailing => TrimType::Trailing,
2✔
147
        });
38✔
148
        Ok((Box::new(Expression::Trim { arg, what, typ }), false))
38✔
149
    }
38✔
150

151
    fn parse_sql_function(
14✔
152
        &self,
14✔
153
        expression_type: &BuilderExpressionType,
14✔
154
        sql_function: &Function,
14✔
155
        schema: &Schema,
14✔
156
        expression: &SqlExpr,
14✔
157
    ) -> Result<(Box<Expression>, bool), PipelineError> {
14✔
158
        let name = sql_function.name.to_string().to_lowercase();
14✔
159
        if let Ok(function) = ScalarFunctionType::new(&name) {
14✔
160
            let mut arg_exprs = vec![];
13✔
161
            for arg in &sql_function.args {
31✔
162
                let r = self.parse_sql_function_arg(expression_type, arg, schema);
18✔
163
                match r {
18✔
164
                    Ok(result) => {
18✔
165
                        if result.1 {
18✔
166
                            return Ok(result);
×
167
                        } else {
18✔
168
                            arg_exprs.push(*result.0);
18✔
169
                        }
18✔
170
                    }
171
                    Err(error) => {
×
172
                        return Err(error);
×
173
                    }
174
                }
175
            }
176

177
            return Ok((
13✔
178
                Box::new(ScalarFunction {
13✔
179
                    fun: function,
13✔
180
                    args: arg_exprs,
13✔
181
                }),
13✔
182
                false,
13✔
183
            ));
13✔
184
        };
1✔
185
        if AggregateFunctionType::new(&name).is_ok() {
1✔
186
            let arg = sql_function.args.first().unwrap();
1✔
187
            let r = self.parse_sql_function_arg(expression_type, arg, schema)?;
1✔
188
            return Ok((r.0, false)); // switch bypass to true, since the argument of this Aggregation must be the final result
1✔
189
        };
×
190
        Err(InvalidExpression(format!("{:?}", expression)))
×
191
    }
14✔
192

193
    fn parse_sql_function_pre_aggregation(
84✔
194
        &self,
84✔
195
        expression_type: &BuilderExpressionType,
84✔
196
        sql_function: &Function,
84✔
197
        schema: &Schema,
84✔
198
        expression: &SqlExpr,
84✔
199
    ) -> Result<(Box<Expression>, bool), PipelineError> {
84✔
200
        let name = sql_function.name.to_string().to_lowercase();
84✔
201

202
        if let Ok(function) = ScalarFunctionType::new(&name) {
84✔
203
            let mut arg_exprs = vec![];
×
204
            for arg in &sql_function.args {
×
205
                let r = self.parse_sql_function_arg(expression_type, arg, schema);
×
206
                match r {
×
207
                    Ok(result) => {
×
208
                        if result.1 {
×
209
                            return Ok(result);
×
210
                        } else {
×
211
                            arg_exprs.push(*result.0);
×
212
                        }
×
213
                    }
214
                    Err(error) => {
×
215
                        return Err(error);
×
216
                    }
217
                }
218
            }
219

220
            return Ok((
×
221
                Box::new(ScalarFunction {
×
222
                    fun: function,
×
223
                    args: arg_exprs,
×
224
                }),
×
225
                false,
×
226
            ));
×
227
        };
84✔
228
        if AggregateFunctionType::new(&name).is_ok() {
84✔
229
            let arg = sql_function.args.first().unwrap();
84✔
230
            let r = self.parse_sql_function_arg(expression_type, arg, schema)?;
84✔
231
            return Ok((r.0, true)); // switch bypass to true, since the argument of this Aggregation must be the final result
84✔
232
        };
×
233
        Err(InvalidExpression(format!("{:?}", expression)))
×
234
    }
84✔
235

236
    fn parse_sql_function_aggregation(
84✔
237
        &self,
84✔
238
        expression_type: &BuilderExpressionType,
84✔
239
        sql_function: &Function,
84✔
240
        schema: &Schema,
84✔
241
        expression: &SqlExpr,
84✔
242
    ) -> Result<(Box<Expression>, bool), PipelineError> {
84✔
243
        let name = sql_function.name.to_string().to_lowercase();
84✔
244

245
        if let Ok(function) = ScalarFunctionType::new(&name) {
84✔
246
            let mut arg_exprs = vec![];
×
247
            for arg in &sql_function.args {
×
248
                let r = self.parse_sql_function_arg(expression_type, arg, schema);
×
249
                match r {
×
250
                    Ok(result) => {
×
251
                        if result.1 {
×
252
                            return Ok(result);
×
253
                        } else {
×
254
                            arg_exprs.push(*result.0);
×
255
                        }
×
256
                    }
257
                    Err(error) => {
×
258
                        return Err(error);
×
259
                    }
260
                }
261
            }
262

263
            return Ok((
×
264
                Box::new(ScalarFunction {
×
265
                    fun: function,
×
266
                    args: arg_exprs,
×
267
                }),
×
268
                false,
×
269
            ));
×
270
        };
84✔
271

272
        if let Ok(function) = AggregateFunctionType::new(&name) {
84✔
273
            let mut arg_exprs = vec![];
84✔
274
            for arg in &sql_function.args {
168✔
275
                let r = self.parse_sql_function_arg(expression_type, arg, schema);
84✔
276
                match r {
84✔
277
                    Ok(result) => {
84✔
278
                        if result.1 {
84✔
279
                            return Ok(result);
×
280
                        } else {
84✔
281
                            arg_exprs.push(*result.0);
84✔
282
                        }
84✔
283
                    }
284
                    Err(error) => {
×
285
                        return Err(error);
×
286
                    }
287
                }
288
            }
289

290
            return Ok((
84✔
291
                Box::new(Expression::AggregateFunction {
84✔
292
                    fun: function,
84✔
293
                    args: arg_exprs,
84✔
294
                }),
84✔
295
                true, // switch bypass to true, since this Aggregation must be the final result
84✔
296
            ));
84✔
297
        };
×
298

×
299
        Err(InvalidExpression(format!(
×
300
            "Unsupported Expression: {:?}",
×
301
            expression
×
302
        )))
×
303
    }
84✔
304

305
    fn parse_sql_function_arg(
306
        &self,
307
        expression_type: &BuilderExpressionType,
308
        argument: &FunctionArg,
309
        schema: &Schema,
310
    ) -> Result<(Box<Expression>, bool), PipelineError> {
311
        match argument {
187✔
312
            FunctionArg::Named {
313
                name: _,
314
                arg: FunctionArgExpr::Expr(arg),
×
315
            } => self.parse_sql_expression(expression_type, arg, schema),
×
316
            FunctionArg::Named {
317
                name: _,
318
                arg: FunctionArgExpr::Wildcard,
319
            } => Err(InvalidArgument(format!("{:?}", argument))),
×
320
            FunctionArg::Unnamed(FunctionArgExpr::Expr(arg)) => {
187✔
321
                self.parse_sql_expression(expression_type, arg, schema)
187✔
322
            }
323
            FunctionArg::Unnamed(FunctionArgExpr::Wildcard) => {
324
                Err(InvalidArgument(format!("{:?}", argument)))
×
325
            }
326
            _ => Err(InvalidArgument(format!("{:?}", argument))),
×
327
        }
328
    }
187✔
329

330
    fn parse_sql_unary_op(
×
331
        &self,
×
332
        expression_type: &BuilderExpressionType,
×
333
        op: &SqlUnaryOperator,
×
334
        expr: &SqlExpr,
×
335
        schema: &Schema,
×
336
    ) -> Result<(Box<Expression>, Bypass), PipelineError> {
×
337
        let (arg, bypass) = self.parse_sql_expression(expression_type, expr, schema)?;
×
338
        if bypass {
×
339
            return Ok((arg, bypass));
×
340
        }
×
341

342
        let operator = match op {
×
343
            SqlUnaryOperator::Not => UnaryOperatorType::Not,
×
344
            SqlUnaryOperator::Plus => UnaryOperatorType::Plus,
×
345
            SqlUnaryOperator::Minus => UnaryOperatorType::Minus,
×
346
            _ => return Err(InvalidOperator(format!("{:?}", op))),
×
347
        };
348

349
        Ok((Box::new(Expression::UnaryOperator { operator, arg }), false))
×
350
    }
×
351

352
    fn parse_sql_binary_op(
79✔
353
        &self,
79✔
354
        expression_type: &BuilderExpressionType,
79✔
355
        left: &SqlExpr,
79✔
356
        op: &SqlBinaryOperator,
79✔
357
        right: &SqlExpr,
79✔
358
        schema: &Schema,
79✔
359
    ) -> Result<(Box<Expression>, bool), PipelineError> {
79✔
360
        let (left_op, bypass_left) = self.parse_sql_expression(expression_type, left, schema)?;
79✔
361
        if bypass_left {
79✔
362
            return Ok((left_op, bypass_left));
×
363
        }
79✔
364
        let (right_op, bypass_right) = self.parse_sql_expression(expression_type, right, schema)?;
79✔
365
        if bypass_right {
79✔
366
            return Ok((right_op, bypass_right));
×
367
        }
79✔
368

369
        let operator = match op {
79✔
370
            SqlBinaryOperator::Gt => BinaryOperatorType::Gt,
18✔
371
            SqlBinaryOperator::GtEq => BinaryOperatorType::Gte,
1✔
372
            SqlBinaryOperator::Lt => BinaryOperatorType::Lt,
12✔
373
            SqlBinaryOperator::LtEq => BinaryOperatorType::Lte,
12✔
374
            SqlBinaryOperator::Eq => BinaryOperatorType::Eq,
18✔
375
            SqlBinaryOperator::NotEq => BinaryOperatorType::Ne,
×
376

377
            SqlBinaryOperator::Plus => BinaryOperatorType::Add,
×
378
            SqlBinaryOperator::Minus => BinaryOperatorType::Sub,
×
379
            SqlBinaryOperator::Multiply => BinaryOperatorType::Mul,
×
380
            SqlBinaryOperator::Divide => BinaryOperatorType::Div,
×
381
            SqlBinaryOperator::Modulo => BinaryOperatorType::Mod,
×
382

383
            SqlBinaryOperator::And => BinaryOperatorType::And,
12✔
384
            SqlBinaryOperator::Or => BinaryOperatorType::Or,
6✔
385

386
            // BinaryOperator::BitwiseAnd => ...
387
            // BinaryOperator::BitwiseOr => ...
388
            // BinaryOperator::StringConcat => ...
389
            _ => return Err(InvalidOperator(format!("{:?}", op))),
×
390
        };
391

392
        Ok((
79✔
393
            Box::new(Expression::BinaryOperator {
79✔
394
                left: left_op,
79✔
395
                operator,
79✔
396
                right: right_op,
79✔
397
            }),
79✔
398
            false,
79✔
399
        ))
79✔
400
    }
79✔
401

402
    fn parse_sql_number(&self, n: &str) -> Result<(Box<Expression>, Bypass), PipelineError> {
43✔
403
        match n.parse::<i64>() {
43✔
404
            Ok(n) => Ok((Box::new(Expression::Literal(Field::Int(n))), false)),
43✔
405
            Err(_) => match n.parse::<f64>() {
×
406
                Ok(f) => Ok((
×
407
                    Box::new(Expression::Literal(Field::Float(OrderedFloat(f)))),
×
408
                    false,
×
409
                )),
×
410
                Err(_) => Err(InvalidValue(n.to_string())),
×
411
            },
412
        }
413
    }
43✔
414

415
    fn parse_sql_like_operator(
×
416
        &self,
×
417
        expression_type: &BuilderExpressionType,
×
418
        negated: &bool,
×
419
        expr: &Expr,
×
420
        pattern: &Expr,
×
421
        escape_char: &Option<char>,
×
422
        schema: &Schema,
×
423
    ) -> Result<(Box<Expression>, bool), PipelineError> {
×
424
        let arg = self.parse_sql_expression(expression_type, expr, schema)?;
×
425
        let pattern = self.parse_sql_expression(expression_type, pattern, schema)?;
×
426
        let like_expression = Box::new(Expression::Like {
×
427
            arg: arg.0,
×
428
            pattern: pattern.0,
×
429
            escape: *escape_char,
×
430
        });
×
431
        if *negated {
×
432
            Ok((
×
433
                Box::new(Expression::UnaryOperator {
×
434
                    operator: UnaryOperatorType::Not,
×
435
                    arg: like_expression,
×
436
                }),
×
437
                arg.1,
×
438
            ))
×
439
        } else {
440
            Ok((like_expression, arg.1))
×
441
        }
442
    }
×
443

444
    fn parse_sql_cast_operator(
64✔
445
        &self,
64✔
446
        expression_type: &BuilderExpressionType,
64✔
447
        expr: &Expr,
64✔
448
        data_type: &DataType,
64✔
449
        schema: &Schema,
64✔
450
    ) -> Result<(Box<Expression>, bool), PipelineError> {
64✔
451
        let expression = self.parse_sql_expression(expression_type, expr, schema)?;
64✔
452
        let cast_to = match data_type {
64✔
453
            DataType::Decimal(_) => CastOperatorType::Decimal,
×
454
            DataType::Binary(_) => CastOperatorType::Binary,
×
455
            DataType::Float(_) => CastOperatorType::Float,
10✔
456
            DataType::Int(_) => CastOperatorType::Int,
6✔
457
            DataType::Integer(_) => CastOperatorType::Int,
×
458
            DataType::UnsignedInt(_) => CastOperatorType::UInt,
×
459
            DataType::UnsignedInteger(_) => CastOperatorType::UInt,
×
460
            DataType::Boolean => CastOperatorType::Boolean,
12✔
461
            DataType::Date => CastOperatorType::Date,
×
462
            DataType::Timestamp(..) => CastOperatorType::Timestamp,
×
463
            DataType::Text => CastOperatorType::Text,
18✔
464
            DataType::String => CastOperatorType::String,
18✔
465
            DataType::Custom(name, ..) => {
×
466
                if name.to_string().to_lowercase() == "bson" {
×
467
                    CastOperatorType::Bson
×
468
                } else {
469
                    Err(PipelineError::InvalidFunction(format!(
×
470
                        "Unsupported Cast type {}",
×
471
                        name
×
472
                    )))?
×
473
                }
474
            }
475
            _ => Err(PipelineError::InvalidFunction(format!(
×
476
                "Unsupported Cast type {}",
×
477
                data_type
×
478
            )))?,
×
479
        };
480
        Ok((
64✔
481
            Box::new(Expression::Cast {
64✔
482
                arg: expression.0,
64✔
483
                typ: cast_to,
64✔
484
            }),
64✔
485
            expression.1,
64✔
486
        ))
64✔
487
    }
64✔
488
}
489

490
pub fn fullname_from_ident(ident: &[Ident]) -> String {
1,280✔
491
    let mut ident_tokens = vec![];
1,280✔
492
    for token in ident.iter() {
1,282✔
493
        ident_tokens.push(token.value.clone());
1,282✔
494
    }
1,282✔
495
    ident_tokens.join(".")
1,280✔
496
}
1,280✔
497

498
pub fn get_field_index(ident: &[Ident], schema: &Schema) -> Result<usize, PipelineError> {
1,270✔
499
    let full_ident = fullname_from_ident(ident);
1,270✔
500

1,270✔
501
    let mut field_index: Option<usize> = None;
1,270✔
502

503
    for (index, field) in schema.fields.iter().enumerate() {
4,762✔
504
        if compare_name(field.name.clone(), full_ident.clone()) {
4,762✔
505
            if field_index.is_some() {
1,271✔
506
                return Err(PipelineError::InvalidQuery(format!(
1✔
507
                    "Ambiguous Field {}",
1✔
508
                    full_ident
1✔
509
                )));
1✔
510
            } else {
1,270✔
511
                field_index = Some(index);
1,270✔
512
            }
1,270✔
513
        }
3,491✔
514
    }
515
    if let Some(index) = field_index {
1,269✔
516
        Ok(index)
1,269✔
517
    } else {
518
        Err(PipelineError::JoinError(JoinError::FieldError(full_ident)))
×
519
    }
×
520
}
1,270✔
521

×
522
/// Reports whether two dotted names agree on their trailing segments.
///
/// Both names are split on `.` and compared segment by segment starting from the last
/// one, for as many segments as the shorter name has. `"users.id"` therefore matches
/// both `"id"` and `"db.users.id"`, but not `"orders.id"`.
pub(crate) fn compare_name(name: String, ident: String) -> bool {
    // `rsplit` walks the segments right-to-left and `zip` stops at the shorter name.
    // Splitting always yields at least one segment, so the comparison is never vacuous.
    name.rsplit('.')
        .zip(ident.rsplit('.'))
        .all(|(left, right)| left == right)
}
4,773✔
542

543
fn parse_sql_string(s: &str) -> Result<(Box<Expression>, bool), PipelineError> {
26✔
544
    Ok((
26✔
545
        Box::new(Expression::Literal(Field::String(s.to_owned()))),
26✔
546
        false,
26✔
547
    ))
26✔
548
}
26✔
549

×
550
pub(crate) fn normalize_ident(id: &Ident) -> String {
95✔
551
    match id.quote_style {
95✔
552
        Some(_) => id.value.clone(),
×
553
        None => id.value.clone(),
95✔
554
    }
×
555
}
95✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc