• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

getdozer / dozer / 4283961027

pending completion
4283961027

push

github

GitHub
feat: Blue green cache (#1061)

645 of 645 new or added lines in 45 files covered. (100.0%)

27779 of 39307 relevant lines covered (70.67%)

52489.81 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

68.43
/dozer-sql/src/pipeline/expression/builder.rs
1
use dozer_types::{
2
    ordered_float::OrderedFloat,
3
    types::{Field, FieldDefinition, Schema, SourceDefinition},
4
};
5
use sqlparser::ast::{
6
    BinaryOperator as SqlBinaryOperator, DataType, Expr as SqlExpr, Expr, Function, FunctionArg,
7
    FunctionArgExpr, Ident, TrimWhereField, UnaryOperator as SqlUnaryOperator, Value as SqlValue,
8
};
9

10
use crate::pipeline::errors::PipelineError::{
11
    InvalidArgument, InvalidExpression, InvalidNestedAggregationFunction, InvalidOperator,
12
    InvalidValue,
13
};
14
use crate::pipeline::errors::{PipelineError, SqlError};
15
use crate::pipeline::expression::aggregate::AggregateFunctionType;
16

17
use crate::pipeline::expression::execution::Expression;
18
use crate::pipeline::expression::execution::Expression::{GeoFunction, ScalarFunction};
19
use crate::pipeline::expression::geo::common::GeoFunctionType;
20
use crate::pipeline::expression::operator::{BinaryOperatorType, UnaryOperatorType};
21
use crate::pipeline::expression::scalar::common::ScalarFunctionType;
22
use crate::pipeline::expression::scalar::string::TrimType;
23

24
use super::cast::CastOperatorType;
25

26
#[derive(Clone, PartialEq, Debug)]
9✔
27
pub struct ExpressionBuilder {
28
    // Must be an aggregation function
29
    pub aggregations: Vec<Expression>,
30
    pub offset: usize,
31
}
32

33
impl ExpressionBuilder {
34
    pub fn new(offset: usize) -> Self {
2,983✔
35
        Self {
2,983✔
36
            aggregations: Vec::new(),
2,983✔
37
            offset,
2,983✔
38
        }
2,983✔
39
    }
2,983✔
40

41
    pub fn from(offset: usize, aggregations: Vec<Expression>) -> Self {
1✔
42
        Self {
1✔
43
            aggregations,
1✔
44
            offset,
1✔
45
        }
1✔
46
    }
1✔
47

48
    pub fn build(
2,867✔
49
        &mut self,
2,867✔
50
        parse_aggregations: bool,
2,867✔
51
        sql_expression: &SqlExpr,
2,867✔
52
        schema: &Schema,
2,867✔
53
    ) -> Result<Expression, PipelineError> {
2,867✔
54
        self.parse_sql_expression(parse_aggregations, sql_expression, schema)
2,867✔
55
    }
2,867✔
56

57
    pub(crate) fn parse_sql_expression(
58
        &mut self,
59
        parse_aggregations: bool,
60
        expression: &SqlExpr,
61
        schema: &Schema,
62
    ) -> Result<Expression, PipelineError> {
63
        match expression {
158✔
64
            SqlExpr::Trim {
65
                expr,
44✔
66
                trim_where,
44✔
67
                trim_what,
44✔
68
            } => self.parse_sql_trim_function(
44✔
69
                parse_aggregations,
44✔
70
                expr,
44✔
71
                trim_where,
44✔
72
                trim_what,
44✔
73
                schema,
44✔
74
            ),
44✔
75
            SqlExpr::Identifier(ident) => Self::parse_sql_column(&[ident.clone()], schema),
2,727✔
76
            SqlExpr::CompoundIdentifier(ident) => Self::parse_sql_column(ident, schema),
335✔
77
            SqlExpr::Value(SqlValue::Number(n, _)) => Self::parse_sql_number(n),
80✔
78
            SqlExpr::Value(SqlValue::Null) => Ok(Expression::Literal(Field::Null)),
×
79
            SqlExpr::Value(SqlValue::SingleQuotedString(s) | SqlValue::DoubleQuotedString(s)) => {
78✔
80
                Self::parse_sql_string(s)
78✔
81
            }
82
            SqlExpr::UnaryOp { expr, op } => {
×
83
                self.parse_sql_unary_op(parse_aggregations, op, expr, schema)
×
84
            }
85
            SqlExpr::BinaryOp { left, op, right } => {
136✔
86
                self.parse_sql_binary_op(parse_aggregations, left, op, right, schema)
136✔
87
            }
88
            SqlExpr::Nested(expr) => self.parse_sql_expression(parse_aggregations, expr, schema),
21✔
89
            SqlExpr::Function(sql_function) => {
335✔
90
                self.parse_sql_function(parse_aggregations, sql_function, schema)
335✔
91
            }
92
            SqlExpr::Like {
93
                negated,
×
94
                expr,
×
95
                pattern,
×
96
                escape_char,
×
97
            } => self.parse_sql_like_operator(
×
98
                parse_aggregations,
×
99
                negated,
×
100
                expr,
×
101
                pattern,
×
102
                escape_char,
×
103
                schema,
×
104
            ),
×
105
            SqlExpr::Cast { expr, data_type } => {
65✔
106
                self.parse_sql_cast_operator(parse_aggregations, expr, data_type, schema)
65✔
107
            }
108
            _ => Err(InvalidExpression(format!("{expression:?}"))),
×
109
        }
110
    }
3,821✔
111

112
    fn parse_sql_column(ident: &[Ident], schema: &Schema) -> Result<Expression, PipelineError> {
3,062✔
113
        let (src_field, src_table_or_alias, src_connection) = match ident.len() {
3,062✔
114
            1 => (&ident[0].value, None, None),
2,727✔
115
            2 => (&ident[1].value, Some(&ident[0].value), None),
334✔
116
            3 => (
1✔
117
                &ident[2].value,
1✔
118
                Some(&ident[1].value),
1✔
119
                Some(&ident[0].value),
1✔
120
            ),
1✔
121
            _ => {
122
                return Err(PipelineError::SqlError(SqlError::InvalidColumn(
×
123
                    ident
×
124
                        .iter()
×
125
                        .fold(String::new(), |a, b| a + "." + b.value.as_str()),
×
126
                )));
×
127
            }
128
        };
129

130
        let matching_by_field: Vec<(usize, &FieldDefinition)> = schema
3,062✔
131
            .fields
3,062✔
132
            .iter()
3,062✔
133
            .enumerate()
3,062✔
134
            .filter(|(_idx, f)| &f.name == src_field)
13,036✔
135
            .collect();
3,062✔
136

3,062✔
137
        match matching_by_field.len() {
3,062✔
138
            1 => Ok(Expression::Column {
2,912✔
139
                index: matching_by_field[0].0,
2,912✔
140
            }),
2,912✔
141
            _ => match src_table_or_alias {
150✔
142
                None => Err(PipelineError::SqlError(SqlError::InvalidColumn(
×
143
                    ident
×
144
                        .iter()
×
145
                        .fold(String::new(), |a, b| a + "." + b.value.as_str()),
×
146
                ))),
×
147
                Some(src_table_or_alias) => {
150✔
148
                    let matching_by_table_or_alias: Vec<(usize, &FieldDefinition)> =
150✔
149
                        matching_by_field
150✔
150
                            .into_iter()
150✔
151
                            .filter(|(_idx, field)| match &field.source {
300✔
152
                                SourceDefinition::Alias { name } => name == src_table_or_alias,
240✔
153
                                SourceDefinition::Table {
154
                                    name,
60✔
155
                                    connection: _,
60✔
156
                                } => name == src_table_or_alias,
60✔
157
                                _ => false,
×
158
                            })
300✔
159
                            .collect();
150✔
160

150✔
161
                    match matching_by_table_or_alias.len() {
150✔
162
                        1 => Ok(Expression::Column {
150✔
163
                            index: matching_by_table_or_alias[0].0,
150✔
164
                        }),
150✔
165
                        _ => match src_connection {
×
166
                            None => Err(PipelineError::SqlError(SqlError::InvalidColumn(
×
167
                                ident
×
168
                                    .iter()
×
169
                                    .fold(String::new(), |a, b| a + "." + b.value.as_str()),
×
170
                            ))),
×
171
                            Some(src_connection) => {
×
172
                                let matching_by_connection: Vec<(usize, &FieldDefinition)> =
×
173
                                    matching_by_table_or_alias
×
174
                                        .into_iter()
×
175
                                        .filter(|(_idx, field)| match &field.source {
×
176
                                            SourceDefinition::Table {
177
                                                name: _,
178
                                                connection,
×
179
                                            } => connection == src_connection,
×
180
                                            _ => false,
×
181
                                        })
×
182
                                        .collect();
×
183

×
184
                                match matching_by_connection.len() {
×
185
                                    1 => Ok(Expression::Column {
×
186
                                        index: matching_by_connection[0].0,
×
187
                                    }),
×
188
                                    _ => Err(PipelineError::SqlError(SqlError::InvalidColumn(
×
189
                                        ident
×
190
                                            .iter()
×
191
                                            .fold(String::new(), |a, b| a + "." + b.value.as_str()),
×
192
                                    ))),
×
193
                                }
194
                            }
195
                        },
196
                    }
197
                }
198
            },
199
        }
200
    }
3,062✔
201

202
    fn parse_sql_trim_function(
44✔
203
        &mut self,
44✔
204
        parse_aggregations: bool,
44✔
205
        expr: &Expr,
44✔
206
        trim_where: &Option<TrimWhereField>,
44✔
207
        trim_what: &Option<Box<Expr>>,
44✔
208
        schema: &Schema,
44✔
209
    ) -> Result<Expression, PipelineError> {
44✔
210
        let arg = Box::new(self.parse_sql_expression(parse_aggregations, expr, schema)?);
44✔
211
        let what = match trim_what {
44✔
212
            Some(e) => Some(Box::new(self.parse_sql_expression(
8✔
213
                parse_aggregations,
8✔
214
                e,
8✔
215
                schema,
8✔
216
            )?)),
8✔
217
            _ => None,
36✔
218
        };
219
        let typ = trim_where.as_ref().map(|e| match e {
44✔
220
            TrimWhereField::Both => TrimType::Both,
2✔
221
            TrimWhereField::Leading => TrimType::Leading,
2✔
222
            TrimWhereField::Trailing => TrimType::Trailing,
2✔
223
        });
44✔
224
        Ok(Expression::Trim { arg, what, typ })
44✔
225
    }
44✔
226

227
    fn parse_sql_function(
335✔
228
        &mut self,
335✔
229
        parse_aggregations: bool,
335✔
230
        sql_function: &Function,
335✔
231
        schema: &Schema,
335✔
232
    ) -> Result<Expression, PipelineError> {
335✔
233
        let function_name = sql_function.name.to_string().to_lowercase();
335✔
234

335✔
235
        #[cfg(feature = "python")]
335✔
236
        if function_name.starts_with("py_") {
335✔
237
            // The function is from python udf.
238
            let udf_name = function_name.strip_prefix("py_").unwrap();
30✔
239
            return self.parse_python_udf(udf_name, sql_function, schema);
30✔
240
        }
305✔
241

305✔
242
        match (
305✔
243
            AggregateFunctionType::new(function_name.as_str()),
305✔
244
            parse_aggregations,
305✔
245
        ) {
305✔
246
            (Ok(aggr), true) => {
257✔
247
                let mut arg_expr: Vec<Expression> = Vec::new();
257✔
248
                for arg in &sql_function.args {
514✔
249
                    let aggregation = self.parse_sql_function_arg(true, arg, schema)?;
257✔
250
                    arg_expr.push(aggregation);
257✔
251
                }
252
                let measure = Expression::AggregateFunction {
257✔
253
                    fun: aggr,
257✔
254
                    args: arg_expr,
257✔
255
                };
257✔
256
                let index = match self
257✔
257
                    .aggregations
257✔
258
                    .iter()
257✔
259
                    .enumerate()
257✔
260
                    .find(|e| e.1 == &measure)
257✔
261
                {
262
                    Some((index, _existing)) => index,
2✔
263
                    _ => {
264
                        self.aggregations.push(measure);
254✔
265
                        self.aggregations.len() - 1
254✔
266
                    }
267
                };
268
                Ok(Expression::Column {
256✔
269
                    index: self.offset + index,
256✔
270
                })
256✔
271
            }
272
            (Ok(_agg), false) => Err(InvalidNestedAggregationFunction(function_name)),
×
273
            (Err(_), _) => {
274
                let mut function_args: Vec<Expression> = Vec::new();
48✔
275
                for arg in &sql_function.args {
143✔
276
                    function_args.push(self.parse_sql_function_arg(
95✔
277
                        parse_aggregations,
95✔
278
                        arg,
95✔
279
                        schema,
95✔
280
                    )?);
95✔
281
                }
282

283
                ScalarFunctionType::new(function_name.as_str()).map_or_else(
48✔
284
                    |_e| {
48✔
285
                        let f = GeoFunctionType::new(function_name.as_str())?;
14✔
286
                        Ok(GeoFunction {
14✔
287
                            fun: f,
14✔
288
                            args: function_args.clone(),
14✔
289
                        })
14✔
290
                    },
48✔
291
                    |f| {
48✔
292
                        Ok(ScalarFunction {
34✔
293
                            fun: f,
34✔
294
                            args: function_args.clone(),
34✔
295
                        })
34✔
296
                    },
48✔
297
                )
48✔
298
            }
299
        }
300
    }
334✔
301

302
    fn parse_sql_function_arg(
303
        &mut self,
304
        parse_aggregations: bool,
305
        argument: &FunctionArg,
306
        schema: &Schema,
307
    ) -> Result<Expression, PipelineError> {
308
        match argument {
427✔
309
            FunctionArg::Named {
310
                name: _,
311
                arg: FunctionArgExpr::Expr(arg),
×
312
            } => self.parse_sql_expression(parse_aggregations, arg, schema),
×
313
            FunctionArg::Named {
314
                name: _,
315
                arg: FunctionArgExpr::Wildcard,
316
            } => Err(InvalidArgument(format!("{argument:?}"))),
×
317
            FunctionArg::Unnamed(FunctionArgExpr::Expr(arg)) => {
427✔
318
                self.parse_sql_expression(parse_aggregations, arg, schema)
427✔
319
            }
320
            FunctionArg::Unnamed(FunctionArgExpr::Wildcard) => {
321
                Err(InvalidArgument(format!("{argument:?}")))
×
322
            }
323
            FunctionArg::Named {
324
                name: _,
325
                arg: FunctionArgExpr::QualifiedWildcard(_),
326
            } => Err(InvalidArgument(format!("{argument:?}"))),
×
327
            FunctionArg::Unnamed(FunctionArgExpr::QualifiedWildcard(_)) => {
328
                Err(InvalidArgument(format!("{argument:?}")))
×
329
            }
330
        }
331
    }
427✔
332

333
    fn parse_sql_unary_op(
×
334
        &mut self,
×
335
        parse_aggregations: bool,
×
336
        op: &SqlUnaryOperator,
×
337
        expr: &SqlExpr,
×
338
        schema: &Schema,
×
339
    ) -> Result<Expression, PipelineError> {
×
340
        let arg = Box::new(self.parse_sql_expression(parse_aggregations, expr, schema)?);
×
341
        let operator = match op {
×
342
            SqlUnaryOperator::Not => UnaryOperatorType::Not,
×
343
            SqlUnaryOperator::Plus => UnaryOperatorType::Plus,
×
344
            SqlUnaryOperator::Minus => UnaryOperatorType::Minus,
×
345
            _ => return Err(InvalidOperator(format!("{op:?}"))),
×
346
        };
347

348
        Ok(Expression::UnaryOperator { operator, arg })
×
349
    }
×
350

351
    fn parse_sql_binary_op(
136✔
352
        &mut self,
136✔
353
        parse_aggregations: bool,
136✔
354
        left: &SqlExpr,
136✔
355
        op: &SqlBinaryOperator,
136✔
356
        right: &SqlExpr,
136✔
357
        schema: &Schema,
136✔
358
    ) -> Result<Expression, PipelineError> {
136✔
359
        let left_op = self.parse_sql_expression(parse_aggregations, left, schema)?;
136✔
360
        let right_op = self.parse_sql_expression(parse_aggregations, right, schema)?;
136✔
361

362
        let operator = match op {
136✔
363
            SqlBinaryOperator::Gt => BinaryOperatorType::Gt,
31✔
364
            SqlBinaryOperator::GtEq => BinaryOperatorType::Gte,
1✔
365
            SqlBinaryOperator::Lt => BinaryOperatorType::Lt,
20✔
366
            SqlBinaryOperator::LtEq => BinaryOperatorType::Lte,
20✔
367
            SqlBinaryOperator::Eq => BinaryOperatorType::Eq,
30✔
368
            SqlBinaryOperator::NotEq => BinaryOperatorType::Ne,
×
369
            SqlBinaryOperator::Plus => BinaryOperatorType::Add,
4✔
370
            SqlBinaryOperator::Minus => BinaryOperatorType::Sub,
×
371
            SqlBinaryOperator::Multiply => BinaryOperatorType::Mul,
×
372
            SqlBinaryOperator::Divide => BinaryOperatorType::Div,
×
373
            SqlBinaryOperator::Modulo => BinaryOperatorType::Mod,
×
374
            SqlBinaryOperator::And => BinaryOperatorType::And,
20✔
375
            SqlBinaryOperator::Or => BinaryOperatorType::Or,
10✔
376
            _ => return Err(InvalidOperator(format!("{op:?}"))),
×
377
        };
378

379
        Ok(Expression::BinaryOperator {
136✔
380
            left: Box::new(left_op),
136✔
381
            operator,
136✔
382
            right: Box::new(right_op),
136✔
383
        })
136✔
384
    }
136✔
385

386
    fn parse_sql_number(n: &str) -> Result<Expression, PipelineError> {
80✔
387
        match n.parse::<i64>() {
80✔
388
            Ok(n) => Ok(Expression::Literal(Field::Int(n))),
80✔
389
            Err(_) => match n.parse::<f64>() {
×
390
                Ok(f) => Ok(Expression::Literal(Field::Float(OrderedFloat(f)))),
×
391
                Err(_) => Err(InvalidValue(n.to_string())),
×
392
            },
393
        }
394
    }
80✔
395

396
    fn parse_sql_like_operator(
×
397
        &mut self,
×
398
        parse_aggregations: bool,
×
399
        negated: &bool,
×
400
        expr: &Expr,
×
401
        pattern: &Expr,
×
402
        escape_char: &Option<char>,
×
403
        schema: &Schema,
×
404
    ) -> Result<Expression, PipelineError> {
×
405
        let arg = self.parse_sql_expression(parse_aggregations, expr, schema)?;
×
406
        let pattern = self.parse_sql_expression(parse_aggregations, pattern, schema)?;
×
407
        let like_expression = Expression::Like {
×
408
            arg: Box::new(arg),
×
409
            pattern: Box::new(pattern),
×
410
            escape: *escape_char,
×
411
        };
×
412
        if *negated {
×
413
            Ok(Expression::UnaryOperator {
×
414
                operator: UnaryOperatorType::Not,
×
415
                arg: Box::new(like_expression),
×
416
            })
×
417
        } else {
418
            Ok(like_expression)
×
419
        }
420
    }
×
421

422
    fn parse_sql_cast_operator(
65✔
423
        &mut self,
65✔
424
        parse_aggregations: bool,
65✔
425
        expr: &Expr,
65✔
426
        data_type: &DataType,
65✔
427
        schema: &Schema,
65✔
428
    ) -> Result<Expression, PipelineError> {
65✔
429
        let expression = self.parse_sql_expression(parse_aggregations, expr, schema)?;
65✔
430
        let cast_to = match data_type {
65✔
431
            DataType::Decimal(_) => CastOperatorType::Decimal,
×
432
            DataType::Binary(_) => CastOperatorType::Binary,
×
433
            DataType::Float(_) => CastOperatorType::Float,
10✔
434
            DataType::Int(_) => CastOperatorType::Int,
6✔
435
            DataType::Integer(_) => CastOperatorType::Int,
×
436
            DataType::UnsignedInt(_) => CastOperatorType::UInt,
×
437
            DataType::UnsignedInteger(_) => CastOperatorType::UInt,
×
438
            DataType::Boolean => CastOperatorType::Boolean,
12✔
439
            DataType::Date => CastOperatorType::Date,
×
440
            DataType::Timestamp(..) => CastOperatorType::Timestamp,
×
441
            DataType::Text => CastOperatorType::Text,
18✔
442
            DataType::String => CastOperatorType::String,
19✔
443
            DataType::Custom(name, ..) => {
×
444
                if name.to_string().to_lowercase() == "bson" {
×
445
                    CastOperatorType::Bson
×
446
                } else {
447
                    Err(PipelineError::InvalidFunction(format!(
×
448
                        "Unsupported Cast type {name}"
×
449
                    )))?
×
450
                }
451
            }
452
            _ => Err(PipelineError::InvalidFunction(format!(
×
453
                "Unsupported Cast type {data_type}"
×
454
            )))?,
×
455
        };
456
        Ok(Expression::Cast {
65✔
457
            arg: Box::new(expression),
65✔
458
            typ: cast_to,
65✔
459
        })
65✔
460
    }
65✔
461

462
    fn parse_sql_string(s: &str) -> Result<Expression, PipelineError> {
78✔
463
        Ok(Expression::Literal(Field::String(s.to_owned())))
78✔
464
    }
78✔
465

466
    pub fn fullname_from_ident(ident: &[Ident]) -> String {
110✔
467
        let mut ident_tokens = vec![];
110✔
468
        for token in ident.iter() {
110✔
469
            ident_tokens.push(token.value.clone());
110✔
470
        }
110✔
471
        ident_tokens.join(".")
110✔
472
    }
110✔
473

474
    pub(crate) fn normalize_ident(id: &Ident) -> String {
391✔
475
        match id.quote_style {
391✔
476
            Some(_) => id.value.clone(),
×
477
            None => id.value.clone(),
391✔
478
        }
479
    }
391✔
480

481
    #[cfg(feature = "python")]
    /// Converts a `py_<name>(args…, 'return_type')` call into `Expression::PythonUDF`.
    ///
    /// Every argument is parsed with aggregations disallowed. The last argument must be a
    /// string literal naming the UDF's return type, parsed via `FieldType::try_from`.
    /// That literal also stays in `args`.
    ///
    /// # Errors
    /// `InvalidQuery` when there are no arguments or the return type can't be parsed;
    /// `InvalidArgument` when the last argument is not a string literal.
    fn parse_python_udf(
        &mut self,
        name: &str,
        function: &Function,
        schema: &Schema,
    ) -> Result<Expression, PipelineError> {
        use dozer_types::types::FieldType;
        use PipelineError::InvalidQuery;

        let mut args = Vec::new();
        for argument in &function.args {
            args.push(self.parse_sql_function_arg(false, argument, schema)?);
        }

        let return_type = match args.last() {
            None => return Err(InvalidQuery("Can't get python udf return type".to_string())),
            Some(Expression::Literal(Field::String(s))) => FieldType::try_from(s.as_str())
                .map_err(|e| InvalidQuery(format!("Failed to parse Python UDF return type: {e}")))?,
            Some(_) => return Err(InvalidArgument("The last arg for python udf should be a string literal, which represents return type".to_string())),
        };

        Ok(Expression::PythonUDF {
            name: name.to_string(),
            args,
            return_type,
        })
    }
30✔
517
}
518

519
/// A name (`.0`) paired with an optional alias (`.1`).
///
/// `extend_schema_source_def` only reads the alias; the name's exact meaning
/// (table vs. other source) is presumably set by callers — TODO confirm.
#[derive(Debug, Clone, Hash, PartialEq, Eq)]
pub struct NameOrAlias(pub String, pub Option<String>);
521

522
pub fn extend_schema_source_def(schema: &Schema, name: &NameOrAlias) -> Schema {
1,169✔
523
    let mut output_schema = schema.clone();
1,169✔
524
    let mut fields = vec![];
1,169✔
525
    for mut field in schema.clone().fields.into_iter() {
4,500✔
526
        if let Some(alias) = &name.1 {
4,500✔
527
            field.source = SourceDefinition::Alias {
1,290✔
528
                name: alias.to_string(),
1,290✔
529
            };
1,290✔
530
        }
3,210✔
531

532
        fields.push(field);
4,500✔
533
    }
534
    output_schema.fields = fields;
1,169✔
535

1,169✔
536
    output_schema
1,169✔
537
}
1,169✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc