• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

getdozer / dozer / 5996155062

28 Aug 2023 05:54AM UTC coverage: 76.026% (+0.5%) from 75.575%
5996155062

push

github

web-flow
chore: setup MySQL and MariaDB services in the CI (#1920)

The MySQL connector should be tested against MariaDB as well, because it has been failing recently on MariaDB.

The services are now started with `docker run` instead of the nicer declarative service specification, because the declarative form has no option for passing container command-line arguments, and those arguments are needed to enable CDC on MariaDB.

47318 of 62239 relevant lines covered (76.03%)

49704.23 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

73.74
/dozer-sql/src/pipeline/expression/builder.rs
1
use dozer_types::{
2
    ordered_float::OrderedFloat,
3
    types::{Field, FieldDefinition, Schema, SourceDefinition},
4
};
5
use sqlparser::ast::{
6
    BinaryOperator as SqlBinaryOperator, DataType, DateTimeField, Expr as SqlExpr, Expr, Function,
7
    FunctionArg, FunctionArgExpr, Ident, Interval, TrimWhereField,
8
    UnaryOperator as SqlUnaryOperator, Value as SqlValue,
9
};
10

11
use crate::pipeline::errors::PipelineError::{
12
    InvalidArgument, InvalidExpression, InvalidFunction, InvalidNestedAggregationFunction,
13
    InvalidOperator, InvalidValue,
14
};
15
use crate::pipeline::errors::{PipelineError, SqlError};
16
use crate::pipeline::expression::aggregate::AggregateFunctionType;
17
use crate::pipeline::expression::conditional::ConditionalExpressionType;
18
use crate::pipeline::expression::datetime::DateTimeFunctionType;
19

20
use crate::pipeline::expression::execution::Expression;
21
use crate::pipeline::expression::execution::Expression::{
22
    ConditionalExpression, GeoFunction, Now, ScalarFunction,
23
};
24
use crate::pipeline::expression::geo::common::GeoFunctionType;
25
use crate::pipeline::expression::json_functions::JsonFunctionType;
26
use crate::pipeline::expression::operator::{BinaryOperatorType, UnaryOperatorType};
27
use crate::pipeline::expression::scalar::common::ScalarFunctionType;
28
use crate::pipeline::expression::scalar::string::TrimType;
29

30
use super::cast::CastOperatorType;
31

32
#[derive(Clone, PartialEq, Debug)]
9✔
33
pub struct ExpressionBuilder {
34
    // Must be an aggregation function
35
    pub aggregations: Vec<Expression>,
36
    pub offset: usize,
37
}
38

39
impl ExpressionBuilder {
40
    pub fn new(offset: usize) -> Self {
6,227✔
41
        Self {
6,227✔
42
            aggregations: Vec::new(),
6,227✔
43
            offset,
6,227✔
44
        }
6,227✔
45
    }
6,227✔
46

47
    pub fn from(offset: usize, aggregations: Vec<Expression>) -> Self {
62✔
48
        Self {
62✔
49
            aggregations,
62✔
50
            offset,
62✔
51
        }
62✔
52
    }
62✔
53

54
    pub fn build(
3,968✔
55
        &mut self,
3,968✔
56
        parse_aggregations: bool,
3,968✔
57
        sql_expression: &SqlExpr,
3,968✔
58
        schema: &Schema,
3,968✔
59
    ) -> Result<Expression, PipelineError> {
3,968✔
60
        self.parse_sql_expression(parse_aggregations, sql_expression, schema)
3,968✔
61
    }
3,968✔
62

63
    pub(crate) fn parse_sql_expression(
64
        &mut self,
65
        parse_aggregations: bool,
66
        expression: &SqlExpr,
67
        schema: &Schema,
68
    ) -> Result<Expression, PipelineError> {
69
        match expression {
746✔
70
            SqlExpr::Trim {
71
                expr,
42✔
72
                trim_where,
42✔
73
                trim_what,
42✔
74
            } => self.parse_sql_trim_function(
42✔
75
                parse_aggregations,
42✔
76
                expr,
42✔
77
                trim_where,
42✔
78
                trim_what,
42✔
79
                schema,
42✔
80
            ),
42✔
81
            SqlExpr::Identifier(ident) => Self::parse_sql_column(&[ident.clone()], schema),
5,668✔
82
            SqlExpr::CompoundIdentifier(ident) => Self::parse_sql_column(ident, schema),
901✔
83
            SqlExpr::Value(SqlValue::Number(n, _)) => Self::parse_sql_number(n),
448✔
84
            SqlExpr::Value(SqlValue::Null) => Ok(Expression::Literal(Field::Null)),
×
85
            SqlExpr::Value(SqlValue::SingleQuotedString(s) | SqlValue::DoubleQuotedString(s)) => {
298✔
86
                Self::parse_sql_string(s)
298✔
87
            }
88
            SqlExpr::UnaryOp { expr, op } => {
×
89
                self.parse_sql_unary_op(parse_aggregations, op, expr, schema)
×
90
            }
91
            SqlExpr::BinaryOp { left, op, right } => {
440✔
92
                self.parse_sql_binary_op(parse_aggregations, left, op, right, schema)
440✔
93
            }
94
            SqlExpr::Nested(expr) => self.parse_sql_expression(parse_aggregations, expr, schema),
43✔
95
            SqlExpr::Function(sql_function) => {
2,757✔
96
                self.parse_sql_function(parse_aggregations, sql_function, schema)
2,757✔
97
            }
98
            SqlExpr::Like {
99
                negated,
14✔
100
                expr,
14✔
101
                pattern,
14✔
102
                escape_char,
14✔
103
            } => self.parse_sql_like_operator(
14✔
104
                parse_aggregations,
14✔
105
                negated,
14✔
106
                expr,
14✔
107
                pattern,
14✔
108
                escape_char,
14✔
109
                schema,
14✔
110
            ),
14✔
111
            SqlExpr::InList {
112
                expr,
16✔
113
                list,
16✔
114
                negated,
16✔
115
            } => self.parse_sql_in_list_operator(parse_aggregations, expr, list, *negated, schema),
16✔
116

117
            SqlExpr::Cast { expr, data_type } => {
122✔
118
                self.parse_sql_cast_operator(parse_aggregations, expr, data_type, schema)
122✔
119
            }
120
            SqlExpr::Extract { field, expr } => {
64✔
121
                self.parse_sql_extract_operator(parse_aggregations, field, expr, schema)
64✔
122
            }
123
            SqlExpr::Interval(Interval {
124
                value,
6✔
125
                leading_field,
6✔
126
                leading_precision: _,
6✔
127
                last_field: _,
6✔
128
                fractional_seconds_precision: _,
6✔
129
            }) => {
6✔
130
                self.parse_sql_interval_expression(parse_aggregations, value, leading_field, schema)
6✔
131
            }
132
            SqlExpr::Case {
133
                operand,
6✔
134
                conditions,
6✔
135
                results,
6✔
136
                else_result,
6✔
137
            } => self.parse_sql_case_expression(
6✔
138
                parse_aggregations,
6✔
139
                operand,
6✔
140
                conditions,
6✔
141
                results,
6✔
142
                else_result,
6✔
143
                schema,
6✔
144
            ),
6✔
145
            _ => Err(InvalidExpression(format!("{expression:?}"))),
×
146
        }
147
    }
10,825✔
148

149
    fn parse_sql_column(ident: &[Ident], schema: &Schema) -> Result<Expression, PipelineError> {
6,569✔
150
        let (src_field, src_table_or_alias, src_connection) = match ident.len() {
6,569✔
151
            1 => (&ident[0].value, None, None),
5,668✔
152
            2 => (&ident[1].value, Some(&ident[0].value), None),
900✔
153
            3 => (
1✔
154
                &ident[2].value,
1✔
155
                Some(&ident[1].value),
1✔
156
                Some(&ident[0].value),
1✔
157
            ),
1✔
158
            _ => {
159
                return Err(PipelineError::SqlError(SqlError::InvalidColumn(
×
160
                    ident
×
161
                        .iter()
×
162
                        .map(|e| e.value.as_str())
×
163
                        .collect::<Vec<&str>>()
×
164
                        .join("."),
×
165
                )));
×
166
            }
167
        };
168

169
        let matching_by_field: Vec<(usize, &FieldDefinition)> = schema
6,569✔
170
            .fields
6,569✔
171
            .iter()
6,569✔
172
            .enumerate()
6,569✔
173
            .filter(|(_idx, f)| &f.name == src_field)
28,934✔
174
            .collect();
6,569✔
175

6,569✔
176
        match matching_by_field.len() {
6,569✔
177
            1 => Ok(Expression::Column {
6,289✔
178
                index: matching_by_field[0].0,
6,289✔
179
            }),
6,289✔
180
            _ => match src_table_or_alias {
280✔
181
                None => Err(PipelineError::SqlError(SqlError::InvalidColumn(
×
182
                    ident
×
183
                        .iter()
×
184
                        .map(|e| e.value.as_str())
×
185
                        .collect::<Vec<&str>>()
×
186
                        .join("."),
×
187
                ))),
×
188
                Some(src_table_or_alias) => {
280✔
189
                    let matching_by_table_or_alias: Vec<(usize, &FieldDefinition)> =
280✔
190
                        matching_by_field
280✔
191
                            .into_iter()
280✔
192
                            .filter(|(_idx, field)| match &field.source {
560✔
193
                                SourceDefinition::Alias { name } => name == src_table_or_alias,
476✔
194
                                SourceDefinition::Table {
195
                                    name,
84✔
196
                                    connection: _,
84✔
197
                                } => name == src_table_or_alias,
84✔
198
                                _ => false,
×
199
                            })
560✔
200
                            .collect();
280✔
201

280✔
202
                    match matching_by_table_or_alias.len() {
280✔
203
                        1 => Ok(Expression::Column {
280✔
204
                            index: matching_by_table_or_alias[0].0,
280✔
205
                        }),
280✔
206
                        _ => match src_connection {
×
207
                            None => Err(PipelineError::SqlError(SqlError::InvalidColumn(
×
208
                                ident
×
209
                                    .iter()
×
210
                                    .fold(String::new(), |a, b| a + "." + b.value.as_str()),
×
211
                            ))),
×
212
                            Some(src_connection) => {
×
213
                                let matching_by_connection: Vec<(usize, &FieldDefinition)> =
×
214
                                    matching_by_table_or_alias
×
215
                                        .into_iter()
×
216
                                        .filter(|(_idx, field)| match &field.source {
×
217
                                            SourceDefinition::Table {
218
                                                name: _,
219
                                                connection,
×
220
                                            } => connection == src_connection,
×
221
                                            _ => false,
×
222
                                        })
×
223
                                        .collect();
×
224

×
225
                                match matching_by_connection.len() {
×
226
                                    1 => Ok(Expression::Column {
×
227
                                        index: matching_by_connection[0].0,
×
228
                                    }),
×
229
                                    _ => Err(PipelineError::SqlError(SqlError::InvalidColumn(
×
230
                                        ident
×
231
                                            .iter()
×
232
                                            .map(|e| e.value.as_str())
×
233
                                            .collect::<Vec<&str>>()
×
234
                                            .join("."),
×
235
                                    ))),
×
236
                                }
237
                            }
238
                        },
239
                    }
240
                }
241
            },
242
        }
243
    }
6,569✔
244

245
    fn parse_sql_trim_function(
42✔
246
        &mut self,
42✔
247
        parse_aggregations: bool,
42✔
248
        expr: &Expr,
42✔
249
        trim_where: &Option<TrimWhereField>,
42✔
250
        trim_what: &Option<Box<Expr>>,
42✔
251
        schema: &Schema,
42✔
252
    ) -> Result<Expression, PipelineError> {
42✔
253
        let arg = Box::new(self.parse_sql_expression(parse_aggregations, expr, schema)?);
42✔
254
        let what = match trim_what {
42✔
255
            Some(e) => Some(Box::new(self.parse_sql_expression(
8✔
256
                parse_aggregations,
8✔
257
                e,
8✔
258
                schema,
8✔
259
            )?)),
8✔
260
            _ => None,
34✔
261
        };
262
        let typ = trim_where.as_ref().map(|e| match e {
42✔
263
            TrimWhereField::Both => TrimType::Both,
2✔
264
            TrimWhereField::Leading => TrimType::Leading,
2✔
265
            TrimWhereField::Trailing => TrimType::Trailing,
2✔
266
        });
42✔
267
        Ok(Expression::Trim { arg, what, typ })
42✔
268
    }
42✔
269

270
    fn aggr_function_check(
2,729✔
271
        &mut self,
2,729✔
272
        function_name: String,
2,729✔
273
        parse_aggregations: bool,
2,729✔
274
        sql_function: &Function,
2,729✔
275
        schema: &Schema,
2,729✔
276
    ) -> Result<Expression, PipelineError> {
2,729✔
277
        match (
2,729✔
278
            AggregateFunctionType::new(function_name.as_str()),
2,729✔
279
            parse_aggregations,
2,729✔
280
        ) {
2,729✔
281
            (Ok(aggr), true) => {
631✔
282
                let mut arg_expr: Vec<Expression> = Vec::new();
631✔
283
                for arg in &sql_function.args {
1,288✔
284
                    let aggregation = self.parse_sql_function_arg(true, arg, schema)?;
657✔
285
                    arg_expr.push(aggregation);
657✔
286
                }
287
                let measure = Expression::AggregateFunction {
631✔
288
                    fun: aggr,
631✔
289
                    args: arg_expr,
631✔
290
                };
631✔
291
                let index = match self
631✔
292
                    .aggregations
631✔
293
                    .iter()
631✔
294
                    .enumerate()
631✔
295
                    .find(|e| e.1 == &measure)
631✔
296
                {
297
                    Some((index, _existing)) => index,
65✔
298
                    _ => {
299
                        self.aggregations.push(measure);
566✔
300
                        self.aggregations.len() - 1
566✔
301
                    }
302
                };
303
                Ok(Expression::Column {
631✔
304
                    index: self.offset + index,
631✔
305
                })
631✔
306
            }
307
            (Ok(_agg), false) => Err(InvalidNestedAggregationFunction(function_name)),
×
308
            (Err(_), _) => Err(InvalidNestedAggregationFunction(function_name)),
2,098✔
309
        }
310
    }
2,729✔
311

312
    fn scalar_function_check(
2,098✔
313
        &mut self,
2,098✔
314
        function_name: String,
2,098✔
315
        parse_aggregations: bool,
2,098✔
316
        sql_function: &Function,
2,098✔
317
        schema: &Schema,
2,098✔
318
    ) -> Result<Expression, PipelineError> {
2,098✔
319
        let mut function_args: Vec<Expression> = Vec::new();
2,098✔
320
        for arg in &sql_function.args {
4,287✔
321
            function_args.push(self.parse_sql_function_arg(parse_aggregations, arg, schema)?);
2,189✔
322
        }
323

324
        match ScalarFunctionType::new(function_name.as_str()) {
2,098✔
325
            Ok(sft) => Ok(ScalarFunction {
2,042✔
326
                fun: sft,
2,042✔
327
                args: function_args.clone(),
2,042✔
328
            }),
2,042✔
329
            Err(_d) => Err(InvalidFunction(function_name)),
56✔
330
        }
331
    }
2,098✔
332

333
    fn geo_expr_check(
56✔
334
        &mut self,
56✔
335
        function_name: String,
56✔
336
        parse_aggregations: bool,
56✔
337
        sql_function: &Function,
56✔
338
        schema: &Schema,
56✔
339
    ) -> Result<Expression, PipelineError> {
56✔
340
        let mut function_args: Vec<Expression> = Vec::new();
56✔
341
        for arg in &sql_function.args {
166✔
342
            function_args.push(self.parse_sql_function_arg(parse_aggregations, arg, schema)?);
110✔
343
        }
344

345
        match GeoFunctionType::new(function_name.as_str()) {
56✔
346
            Ok(gft) => Ok(GeoFunction {
14✔
347
                fun: gft,
14✔
348
                args: function_args.clone(),
14✔
349
            }),
14✔
350
            Err(_e) => Err(InvalidFunction(function_name)),
42✔
351
        }
352
    }
56✔
353

354
    fn datetime_expr_check(&mut self, function_name: String) -> Result<Expression, PipelineError> {
32✔
355
        match DateTimeFunctionType::new(function_name.as_str()) {
32✔
356
            Ok(dtf) => Ok(Now { fun: dtf }),
2✔
357
            Err(_e) => Err(InvalidFunction(function_name)),
30✔
358
        }
359
    }
32✔
360

361
    fn json_func_check(
30✔
362
        &mut self,
30✔
363
        function_name: String,
30✔
364
        parse_aggregations: bool,
30✔
365
        sql_function: &Function,
30✔
366
        schema: &Schema,
30✔
367
    ) -> Result<Expression, PipelineError> {
30✔
368
        let mut function_args: Vec<Expression> = Vec::new();
30✔
369
        for arg in &sql_function.args {
88✔
370
            function_args.push(self.parse_sql_function_arg(parse_aggregations, arg, schema)?);
58✔
371
        }
372

373
        match JsonFunctionType::new(function_name.as_str()) {
30✔
374
            Ok(jft) => Ok(Expression::Json {
30✔
375
                fun: jft,
30✔
376
                args: function_args,
30✔
377
            }),
30✔
378
            Err(_e) => Err(InvalidFunction(function_name)),
×
379
        }
380
    }
30✔
381

382
    fn conditional_expr_check(
42✔
383
        &mut self,
42✔
384
        function_name: String,
42✔
385
        parse_aggregations: bool,
42✔
386
        sql_function: &Function,
42✔
387
        schema: &Schema,
42✔
388
    ) -> Result<Expression, PipelineError> {
42✔
389
        let mut function_args: Vec<Expression> = Vec::new();
42✔
390
        for arg in &sql_function.args {
118✔
391
            function_args.push(self.parse_sql_function_arg(parse_aggregations, arg, schema)?);
76✔
392
        }
393

394
        match ConditionalExpressionType::new(function_name.as_str()) {
42✔
395
            Ok(cet) => Ok(ConditionalExpression {
10✔
396
                fun: cet,
10✔
397
                args: function_args.clone(),
10✔
398
            }),
10✔
399
            Err(_err) => Err(InvalidFunction(function_name)),
32✔
400
        }
401
    }
42✔
402

403
    fn parse_sql_function(
2,757✔
404
        &mut self,
2,757✔
405
        parse_aggregations: bool,
2,757✔
406
        sql_function: &Function,
2,757✔
407
        schema: &Schema,
2,757✔
408
    ) -> Result<Expression, PipelineError> {
2,757✔
409
        let function_name = sql_function.name.to_string().to_lowercase();
2,757✔
410

2,757✔
411
        #[cfg(feature = "python")]
2,757✔
412
        if function_name.starts_with("py_") {
2,757✔
413
            // The function is from python udf.
414
            let udf_name = function_name.strip_prefix("py_").unwrap();
28✔
415
            return self.parse_python_udf(udf_name, sql_function, schema);
28✔
416
        }
2,729✔
417

2,729✔
418
        let aggr_check = self.aggr_function_check(
2,729✔
419
            function_name.clone(),
2,729✔
420
            parse_aggregations,
2,729✔
421
            sql_function,
2,729✔
422
            schema,
2,729✔
423
        );
2,729✔
424
        if aggr_check.is_ok() {
2,729✔
425
            return aggr_check;
631✔
426
        }
2,098✔
427

2,098✔
428
        let scalar_check = self.scalar_function_check(
2,098✔
429
            function_name.clone(),
2,098✔
430
            parse_aggregations,
2,098✔
431
            sql_function,
2,098✔
432
            schema,
2,098✔
433
        );
2,098✔
434
        if scalar_check.is_ok() {
2,098✔
435
            return scalar_check;
2,042✔
436
        }
56✔
437

56✔
438
        let geo_check = self.geo_expr_check(
56✔
439
            function_name.clone(),
56✔
440
            parse_aggregations,
56✔
441
            sql_function,
56✔
442
            schema,
56✔
443
        );
56✔
444
        if geo_check.is_ok() {
56✔
445
            return geo_check;
14✔
446
        }
42✔
447

42✔
448
        let conditional_check = self.conditional_expr_check(
42✔
449
            function_name.clone(),
42✔
450
            parse_aggregations,
42✔
451
            sql_function,
42✔
452
            schema,
42✔
453
        );
42✔
454
        if conditional_check.is_ok() {
42✔
455
            return conditional_check;
10✔
456
        }
32✔
457

32✔
458
        let datetime_check = self.datetime_expr_check(function_name.clone());
32✔
459
        if datetime_check.is_ok() {
32✔
460
            return datetime_check;
2✔
461
        }
30✔
462

30✔
463
        self.json_func_check(function_name, parse_aggregations, sql_function, schema)
30✔
464
    }
2,757✔
465

×
466
    fn parse_sql_function_arg(
×
467
        &mut self,
×
468
        parse_aggregations: bool,
×
469
        argument: &FunctionArg,
×
470
        schema: &Schema,
×
471
    ) -> Result<Expression, PipelineError> {
×
472
        match argument {
3,132✔
473
            FunctionArg::Named {
474
                name: _,
475
                arg: FunctionArgExpr::Expr(arg),
×
476
            } => self.parse_sql_expression(parse_aggregations, arg, schema),
×
477
            FunctionArg::Named {
478
                name: _,
479
                arg: FunctionArgExpr::Wildcard,
×
480
            } => Ok(Expression::Literal(Field::Null)),
×
481
            FunctionArg::Unnamed(FunctionArgExpr::Expr(arg)) => {
3,131✔
482
                self.parse_sql_expression(parse_aggregations, arg, schema)
3,131✔
483
            }
×
484
            FunctionArg::Unnamed(FunctionArgExpr::Wildcard) => Ok(Expression::Literal(Field::Null)),
1✔
485
            FunctionArg::Named {
486
                name: _,
487
                arg: FunctionArgExpr::QualifiedWildcard(_),
×
488
            } => Err(InvalidArgument(format!("{argument:?}"))),
×
489
            FunctionArg::Unnamed(FunctionArgExpr::QualifiedWildcard(_)) => {
×
490
                Err(InvalidArgument(format!("{argument:?}")))
×
491
            }
×
492
        }
493
    }
3,132✔
494

495
    fn parse_sql_case_expression(
6✔
496
        &mut self,
6✔
497
        parse_aggregations: bool,
6✔
498
        operand: &Option<Box<Expr>>,
6✔
499
        conditions: &[Expr],
6✔
500
        results: &[Expr],
6✔
501
        else_result: &Option<Box<Expr>>,
6✔
502
        schema: &Schema,
6✔
503
    ) -> Result<Expression, PipelineError> {
6✔
504
        let op = match operand {
6✔
505
            Some(o) => Some(Box::new(self.parse_sql_expression(
×
506
                parse_aggregations,
×
507
                o,
×
508
                schema,
×
509
            )?)),
×
510
            None => None,
6✔
511
        };
×
512
        let conds = conditions
6✔
513
            .iter()
6✔
514
            .map(|cond| self.parse_sql_expression(parse_aggregations, cond, schema))
12✔
515
            .collect::<Result<Vec<_>, PipelineError>>()?;
6✔
516
        let res = results
6✔
517
            .iter()
6✔
518
            .map(|r| self.parse_sql_expression(parse_aggregations, r, schema))
12✔
519
            .collect::<Result<Vec<_>, PipelineError>>()?;
6✔
520
        let else_res = match else_result {
6✔
521
            Some(r) => Some(Box::new(self.parse_sql_expression(
4✔
522
                parse_aggregations,
4✔
523
                r,
4✔
524
                schema,
4✔
525
            )?)),
4✔
526
            None => None,
2✔
527
        };
×
528

×
529
        Ok(Expression::Case {
6✔
530
            operand: op,
6✔
531
            conditions: conds,
6✔
532
            results: res,
6✔
533
            else_result: else_res,
6✔
534
        })
6✔
535
    }
6✔
536

×
537
    fn parse_sql_interval_expression(
6✔
538
        &mut self,
6✔
539
        parse_aggregations: bool,
6✔
540
        value: &Expr,
6✔
541
        leading_field: &Option<DateTimeField>,
6✔
542
        schema: &Schema,
6✔
543
    ) -> Result<Expression, PipelineError> {
6✔
544
        let right = self.parse_sql_expression(parse_aggregations, value, schema)?;
6✔
545
        if leading_field.is_some() {
6✔
546
            Ok(Expression::DateTimeFunction {
6✔
547
                fun: DateTimeFunctionType::Interval {
6✔
548
                    field: leading_field.unwrap(),
6✔
549
                },
6✔
550
                arg: Box::new(right),
6✔
551
            })
6✔
552
        } else {
×
553
            Err(InvalidExpression(format!("INTERVAL for {leading_field:?}")))
×
554
        }
×
555
    }
6✔
556

×
557
    fn parse_sql_unary_op(
×
558
        &mut self,
×
559
        parse_aggregations: bool,
×
560
        op: &SqlUnaryOperator,
×
561
        expr: &SqlExpr,
×
562
        schema: &Schema,
×
563
    ) -> Result<Expression, PipelineError> {
×
564
        let arg = Box::new(self.parse_sql_expression(parse_aggregations, expr, schema)?);
×
565
        let operator = match op {
×
566
            SqlUnaryOperator::Not => UnaryOperatorType::Not,
×
567
            SqlUnaryOperator::Plus => UnaryOperatorType::Plus,
×
568
            SqlUnaryOperator::Minus => UnaryOperatorType::Minus,
×
569
            _ => return Err(InvalidOperator(format!("{op:?}"))),
×
570
        };
×
571

×
572
        Ok(Expression::UnaryOperator { operator, arg })
×
573
    }
×
574

×
575
    fn parse_sql_binary_op(
440✔
576
        &mut self,
440✔
577
        parse_aggregations: bool,
440✔
578
        left: &SqlExpr,
440✔
579
        op: &SqlBinaryOperator,
440✔
580
        right: &SqlExpr,
440✔
581
        schema: &Schema,
440✔
582
    ) -> Result<Expression, PipelineError> {
440✔
583
        let left_op = self.parse_sql_expression(parse_aggregations, left, schema)?;
440✔
584
        let right_op = self.parse_sql_expression(parse_aggregations, right, schema)?;
440✔
585

×
586
        let operator = match op {
440✔
587
            SqlBinaryOperator::Gt => BinaryOperatorType::Gt,
100✔
588
            SqlBinaryOperator::GtEq => BinaryOperatorType::Gte,
52✔
589
            SqlBinaryOperator::Lt => BinaryOperatorType::Lt,
55✔
590
            SqlBinaryOperator::LtEq => BinaryOperatorType::Lte,
37✔
591
            SqlBinaryOperator::Eq => BinaryOperatorType::Eq,
82✔
592
            SqlBinaryOperator::NotEq => BinaryOperatorType::Ne,
2✔
593
            SqlBinaryOperator::Plus => BinaryOperatorType::Add,
22✔
594
            SqlBinaryOperator::Minus => BinaryOperatorType::Sub,
4✔
595
            SqlBinaryOperator::Multiply => BinaryOperatorType::Mul,
×
596
            SqlBinaryOperator::Divide => BinaryOperatorType::Div,
14✔
597
            SqlBinaryOperator::Modulo => BinaryOperatorType::Mod,
×
598
            SqlBinaryOperator::And => BinaryOperatorType::And,
58✔
599
            SqlBinaryOperator::Or => BinaryOperatorType::Or,
14✔
600
            _ => return Err(InvalidOperator(format!("{op:?}"))),
×
601
        };
×
602

×
603
        Ok(Expression::BinaryOperator {
440✔
604
            left: Box::new(left_op),
440✔
605
            operator,
440✔
606
            right: Box::new(right_op),
440✔
607
        })
440✔
608
    }
440✔
609

610
    #[cfg(not(feature = "bigdecimal"))]
×
611
    fn parse_sql_number(n: &str) -> Result<Expression, PipelineError> {
448✔
612
        match n.parse::<i64>() {
448✔
613
            Ok(n) => Ok(Expression::Literal(Field::Int(n))),
448✔
614
            Err(_) => match n.parse::<f64>() {
×
615
                Ok(f) => Ok(Expression::Literal(Field::Float(OrderedFloat(f)))),
×
616
                Err(_) => Err(InvalidValue(n.to_string())),
×
617
            },
618
        }
×
619
    }
448✔
620

×
621
    #[cfg(feature = "bigdecimal")]
×
622
    fn parse_sql_number(n: &bigdecimal::BigDecimal) -> Result<Expression, PipelineError> {
×
623
        use bigdecimal::ToPrimitive;
×
624
        if n.is_integer() {
625
            Ok(Expression::Literal(Field::Int(n.to_i64().unwrap())))
626
        } else {
×
627
            match n.to_f64() {
628
                Some(f) => Ok(Expression::Literal(Field::Float(OrderedFloat(f)))),
629
                None => Err(InvalidValue(n.to_string())),
630
            }
631
        }
632
    }
633

634
    fn parse_sql_like_operator(
14✔
635
        &mut self,
14✔
636
        parse_aggregations: bool,
14✔
637
        negated: &bool,
14✔
638
        expr: &Expr,
14✔
639
        pattern: &Expr,
14✔
640
        escape_char: &Option<char>,
14✔
641
        schema: &Schema,
14✔
642
    ) -> Result<Expression, PipelineError> {
14✔
643
        let arg = self.parse_sql_expression(parse_aggregations, expr, schema)?;
14✔
644
        let pattern = self.parse_sql_expression(parse_aggregations, pattern, schema)?;
14✔
645
        let like_expression = Expression::Like {
14✔
646
            arg: Box::new(arg),
14✔
647
            pattern: Box::new(pattern),
14✔
648
            escape: *escape_char,
14✔
649
        };
14✔
650
        if *negated {
14✔
651
            Ok(Expression::UnaryOperator {
×
652
                operator: UnaryOperatorType::Not,
×
653
                arg: Box::new(like_expression),
×
654
            })
×
655
        } else {
×
656
            Ok(like_expression)
14✔
657
        }
×
658
    }
14✔
659

×
660
    fn parse_sql_extract_operator(
64✔
661
        &mut self,
64✔
662
        parse_aggregations: bool,
64✔
663
        field: &sqlparser::ast::DateTimeField,
64✔
664
        expr: &Expr,
64✔
665
        schema: &Schema,
64✔
666
    ) -> Result<Expression, PipelineError> {
64✔
667
        let right = self.parse_sql_expression(parse_aggregations, expr, schema)?;
64✔
668
        Ok(Expression::DateTimeFunction {
64✔
669
            fun: DateTimeFunctionType::Extract { field: *field },
64✔
670
            arg: Box::new(right),
64✔
671
        })
64✔
672
    }
64✔
673

×
674
    fn parse_sql_cast_operator(
122✔
675
        &mut self,
122✔
676
        parse_aggregations: bool,
122✔
677
        expr: &Expr,
122✔
678
        data_type: &DataType,
122✔
679
        schema: &Schema,
122✔
680
    ) -> Result<Expression, PipelineError> {
122✔
681
        let expression = self.parse_sql_expression(parse_aggregations, expr, schema)?;
122✔
682
        let cast_to = match data_type {
122✔
683
            DataType::Decimal(_) => CastOperatorType::Decimal,
×
684
            DataType::Binary(_) => CastOperatorType::Binary,
×
685
            DataType::Float(_) => CastOperatorType::Float,
18✔
686
            DataType::Int(_) => CastOperatorType::Int,
15✔
687
            DataType::Integer(_) => CastOperatorType::Int,
×
688
            DataType::UnsignedInt(_) => CastOperatorType::UInt,
×
689
            DataType::UnsignedInteger(_) => CastOperatorType::UInt,
×
690
            DataType::Boolean => CastOperatorType::Boolean,
14✔
691
            DataType::Date => CastOperatorType::Date,
×
692
            DataType::Timestamp(..) => CastOperatorType::Timestamp,
×
693
            DataType::Text => CastOperatorType::Text,
18✔
694
            DataType::String => CastOperatorType::String,
21✔
695
            DataType::JSON => CastOperatorType::Json,
16✔
696
            DataType::Custom(name, ..) => {
20✔
697
                if name.to_string().to_lowercase() == "uint" {
20✔
698
                    CastOperatorType::UInt
8✔
699
                } else if name.to_string().to_lowercase() == "u128" {
12✔
700
                    CastOperatorType::U128
6✔
701
                } else if name.to_string().to_lowercase() == "i128" {
6✔
702
                    CastOperatorType::I128
6✔
703
                } else {
×
704
                    Err(PipelineError::InvalidFunction(format!(
×
705
                        "Unsupported Cast type {name}"
×
706
                    )))?
×
707
                }
×
708
            }
×
709
            _ => Err(PipelineError::InvalidFunction(format!(
×
710
                "Unsupported Cast type {data_type}"
×
711
            )))?,
×
712
        };
×
713
        Ok(Expression::Cast {
122✔
714
            arg: Box::new(expression),
122✔
715
            typ: cast_to,
122✔
716
        })
122✔
717
    }
122✔
718

×
719
    fn parse_sql_string(s: &str) -> Result<Expression, PipelineError> {
298✔
720
        Ok(Expression::Literal(Field::String(s.to_owned())))
298✔
721
    }
298✔
722

×
723
    pub fn fullname_from_ident(ident: &[Ident]) -> String {
374✔
724
        let mut ident_tokens = vec![];
374✔
725
        for token in ident.iter() {
374✔
726
            ident_tokens.push(token.value.clone());
374✔
727
        }
374✔
728
        ident_tokens.join(".")
374✔
729
    }
374✔
730

×
731
    pub(crate) fn normalize_ident(id: &Ident) -> String {
1,902✔
732
        match id.quote_style {
1,902✔
733
            Some(_) => id.value.clone(),
×
734
            None => id.value.clone(),
1,902✔
735
        }
×
736
    }
1,902✔
737

738
    /// Builds an [`Expression::PythonUDF`] for the function called `name`.
    ///
    /// Every argument of `function` is parsed (with aggregation parsing
    /// disabled) before the declared return type is resolved into a `FieldType`.
    ///
    /// # Errors
    ///
    /// Propagates argument parsing errors, and returns
    /// `PipelineError::InvalidQuery` when the return type is missing or cannot
    /// be converted into a `FieldType`.
    #[cfg(feature = "python")]
    fn parse_python_udf(
        &mut self,
        name: &str,
        function: &Function,
        schema: &Schema,
    ) -> Result<Expression, PipelineError> {
        use dozer_types::types::FieldType;
        use PipelineError::InvalidQuery;

        // Arguments are parsed first so that their errors surface before any
        // return type problem.
        let mut args = Vec::with_capacity(function.args.len());
        for argument in function.args.iter() {
            args.push(self.parse_sql_function_arg(false, argument, schema)?);
        }

        // The return type is mandatory for Python UDFs.
        let ident = match function.return_type.as_ref() {
            Some(ident) => ident,
            None => {
                return Err(InvalidQuery("Python UDF must have a return type. The syntax is: function_name<return_type>(arguments)".to_string()));
            }
        };

        let return_type = match FieldType::try_from(ident.value.as_str()) {
            Ok(field_type) => field_type,
            Err(e) => {
                return Err(InvalidQuery(format!(
                    "Failed to parse Python UDF return type: {e}"
                )));
            }
        };

        Ok(Expression::PythonUDF {
            name: name.to_string(),
            args,
            return_type,
        })
    }
28✔
773

774
    fn parse_sql_in_list_operator(
16✔
775
        &mut self,
16✔
776
        parse_aggregations: bool,
16✔
777
        expr: &Expr,
16✔
778
        list: &[Expr],
16✔
779
        negated: bool,
16✔
780
        schema: &Schema,
16✔
781
    ) -> Result<Expression, PipelineError> {
16✔
782
        let expr = self.parse_sql_expression(parse_aggregations, expr, schema)?;
16✔
783
        let list = list
16✔
784
            .iter()
16✔
785
            .map(|expr| self.parse_sql_expression(parse_aggregations, expr, schema))
168✔
786
            .collect::<Result<Vec<_>, PipelineError>>()?;
16✔
787
        let in_list_expression = Expression::InList {
16✔
788
            expr: Box::new(expr),
16✔
789
            list,
16✔
790
            negated,
16✔
791
        };
16✔
792

16✔
793
        Ok(in_list_expression)
16✔
794
    }
16✔
795
}
796

797
#[derive(Debug, Clone, Hash, PartialEq, Eq)]
826✔
798
/// A pair of a `String` and an optional `String`.
///
/// The second field is the optional alias: `extend_schema_source_def` uses it,
/// when present, as the `SourceDefinition::Alias` name of every schema field.
/// The first field is presumably the original (unaliased) name — TODO confirm
/// against callers.
pub struct NameOrAlias(pub String, pub Option<String>);
799

800
pub fn extend_schema_source_def(schema: &Schema, name: &NameOrAlias) -> Schema {
1,317✔
801
    let mut output_schema = schema.clone();
1,317✔
802
    let mut fields = vec![];
1,317✔
803
    for mut field in schema.clone().fields.into_iter() {
6,457✔
804
        if let Some(alias) = &name.1 {
6,457✔
805
            field.source = SourceDefinition::Alias {
3,416✔
806
                name: alias.to_string(),
3,416✔
807
            };
3,416✔
808
        }
3,419✔
809

810
        fields.push(field);
6,457✔
811
    }
812
    output_schema.fields = fields;
1,317✔
813

1,317✔
814
    output_schema
1,317✔
815
}
1,317✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc