• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

getdozer / dozer / 6015696039

29 Aug 2023 05:40PM UTC coverage: 75.532% (-0.8%) from 76.332%
6015696039

push

github

web-flow
Bump clap from 4.3.11 to 4.4.1 (#1941)

Bumps [clap](https://github.com/clap-rs/clap) from 4.3.11 to 4.4.1.
- [Release notes](https://github.com/clap-rs/clap/releases)
- [Changelog](https://github.com/clap-rs/clap/blob/master/CHANGELOG.md)
- [Commits](https://github.com/clap-rs/clap/compare/v4.3.11...v4.4.1)

---
updated-dependencies:
- dependency-name: clap
  dependency-type: direct:production
  update-type: version-update:semver-minor
...

Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>

49076 of 64974 relevant lines covered (75.53%)

65447.31 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

63.39
/dozer-sql/src/pipeline/expression/builder.rs
1
use dozer_types::{
2
    ordered_float::OrderedFloat,
3
    types::{Field, FieldDefinition, Schema, SourceDefinition},
4
};
5
use sqlparser::ast::{
6
    BinaryOperator as SqlBinaryOperator, DataType, DateTimeField, Expr as SqlExpr, Expr, Function,
7
    FunctionArg, FunctionArgExpr, Ident, Interval, TrimWhereField,
8
    UnaryOperator as SqlUnaryOperator, Value as SqlValue,
9
};
10

11
use crate::pipeline::errors::PipelineError::{
12
    InvalidArgument, InvalidExpression, InvalidFunction, InvalidNestedAggregationFunction,
13
    InvalidOperator, InvalidValue,
14
};
15
use crate::pipeline::errors::{PipelineError, SqlError};
16
use crate::pipeline::expression::aggregate::AggregateFunctionType;
17
use crate::pipeline::expression::conditional::ConditionalExpressionType;
18
use crate::pipeline::expression::datetime::DateTimeFunctionType;
19

20
use crate::pipeline::expression::execution::Expression;
21
use crate::pipeline::expression::execution::Expression::{
22
    ConditionalExpression, GeoFunction, Now, ScalarFunction,
23
};
24
use crate::pipeline::expression::geo::common::GeoFunctionType;
25
use crate::pipeline::expression::json_functions::JsonFunctionType;
26
use crate::pipeline::expression::operator::{BinaryOperatorType, UnaryOperatorType};
27
use crate::pipeline::expression::scalar::common::ScalarFunctionType;
28
use crate::pipeline::expression::scalar::string::TrimType;
29

30
use super::cast::CastOperatorType;
31

32
#[derive(Clone, PartialEq, Debug)]
9✔
33
pub struct ExpressionBuilder {
34
    // Must be an aggregation function
35
    pub aggregations: Vec<Expression>,
36
    pub offset: usize,
37
}
38

39
impl ExpressionBuilder {
40
    pub fn new(offset: usize) -> Self {
6,227✔
41
        Self {
6,227✔
42
            aggregations: Vec::new(),
6,227✔
43
            offset,
6,227✔
44
        }
6,227✔
45
    }
6,227✔
46

47
    pub fn from(offset: usize, aggregations: Vec<Expression>) -> Self {
62✔
48
        Self {
62✔
49
            aggregations,
62✔
50
            offset,
62✔
51
        }
62✔
52
    }
62✔
53

×
54
    pub fn build(
3,968✔
55
        &mut self,
3,968✔
56
        parse_aggregations: bool,
3,968✔
57
        sql_expression: &SqlExpr,
3,968✔
58
        schema: &Schema,
3,968✔
59
    ) -> Result<Expression, PipelineError> {
3,968✔
60
        self.parse_sql_expression(parse_aggregations, sql_expression, schema)
3,968✔
61
    }
3,968✔
62

×
63
    pub(crate) fn parse_sql_expression(
×
64
        &mut self,
×
65
        parse_aggregations: bool,
×
66
        expression: &SqlExpr,
×
67
        schema: &Schema,
×
68
    ) -> Result<Expression, PipelineError> {
×
69
        match expression {
746✔
70
            SqlExpr::Trim {
×
71
                expr,
42✔
72
                trim_where,
42✔
73
                trim_what,
42✔
74
            } => self.parse_sql_trim_function(
42✔
75
                parse_aggregations,
42✔
76
                expr,
42✔
77
                trim_where,
42✔
78
                trim_what,
42✔
79
                schema,
42✔
80
            ),
42✔
81
            SqlExpr::Identifier(ident) => Self::parse_sql_column(&[ident.clone()], schema),
5,668✔
82
            SqlExpr::CompoundIdentifier(ident) => Self::parse_sql_column(ident, schema),
901✔
83
            SqlExpr::Value(SqlValue::Number(n, _)) => Self::parse_sql_number(n),
448✔
84
            SqlExpr::Value(SqlValue::Null) => Ok(Expression::Literal(Field::Null)),
×
85
            SqlExpr::Value(SqlValue::SingleQuotedString(s) | SqlValue::DoubleQuotedString(s)) => {
298✔
86
                Self::parse_sql_string(s)
298✔
87
            }
×
88
            SqlExpr::UnaryOp { expr, op } => {
×
89
                self.parse_sql_unary_op(parse_aggregations, op, expr, schema)
×
90
            }
×
91
            SqlExpr::BinaryOp { left, op, right } => {
440✔
92
                self.parse_sql_binary_op(parse_aggregations, left, op, right, schema)
440✔
93
            }
×
94
            SqlExpr::Nested(expr) => self.parse_sql_expression(parse_aggregations, expr, schema),
43✔
95
            SqlExpr::Function(sql_function) => {
2,757✔
96
                self.parse_sql_function(parse_aggregations, sql_function, schema)
2,757✔
97
            }
×
98
            SqlExpr::Like {
99
                negated,
14✔
100
                expr,
14✔
101
                pattern,
14✔
102
                escape_char,
14✔
103
            } => self.parse_sql_like_operator(
14✔
104
                parse_aggregations,
14✔
105
                negated,
14✔
106
                expr,
14✔
107
                pattern,
14✔
108
                escape_char,
14✔
109
                schema,
14✔
110
            ),
14✔
111
            SqlExpr::InList {
112
                expr,
16✔
113
                list,
16✔
114
                negated,
16✔
115
            } => self.parse_sql_in_list_operator(parse_aggregations, expr, list, *negated, schema),
16✔
116

×
117
            SqlExpr::Cast { expr, data_type } => {
122✔
118
                self.parse_sql_cast_operator(parse_aggregations, expr, data_type, schema)
122✔
119
            }
×
120
            SqlExpr::Extract { field, expr } => {
64✔
121
                self.parse_sql_extract_operator(parse_aggregations, field, expr, schema)
64✔
122
            }
×
123
            SqlExpr::Interval(Interval {
×
124
                value,
6✔
125
                leading_field,
6✔
126
                leading_precision: _,
6✔
127
                last_field: _,
6✔
128
                fractional_seconds_precision: _,
6✔
129
            }) => {
6✔
130
                self.parse_sql_interval_expression(parse_aggregations, value, leading_field, schema)
6✔
131
            }
×
132
            SqlExpr::Case {
×
133
                operand,
6✔
134
                conditions,
6✔
135
                results,
6✔
136
                else_result,
6✔
137
            } => self.parse_sql_case_expression(
6✔
138
                parse_aggregations,
6✔
139
                operand,
6✔
140
                conditions,
6✔
141
                results,
6✔
142
                else_result,
6✔
143
                schema,
6✔
144
            ),
6✔
145
            _ => Err(InvalidExpression(format!("{expression:?}"))),
×
146
        }
×
147
    }
10,825✔
148

×
149
    fn parse_sql_column(ident: &[Ident], schema: &Schema) -> Result<Expression, PipelineError> {
6,569✔
150
        let (src_field, src_table_or_alias, src_connection) = match ident.len() {
6,569✔
151
            1 => (&ident[0].value, None, None),
5,668✔
152
            2 => (&ident[1].value, Some(&ident[0].value), None),
900✔
153
            3 => (
1✔
154
                &ident[2].value,
1✔
155
                Some(&ident[1].value),
1✔
156
                Some(&ident[0].value),
1✔
157
            ),
1✔
158
            _ => {
×
159
                return Err(PipelineError::SqlError(SqlError::InvalidColumn(
×
160
                    ident
×
161
                        .iter()
×
162
                        .map(|e| e.value.as_str())
×
163
                        .collect::<Vec<&str>>()
×
164
                        .join("."),
×
165
                )));
×
166
            }
×
167
        };
×
168

×
169
        let matching_by_field: Vec<(usize, &FieldDefinition)> = schema
6,569✔
170
            .fields
6,569✔
171
            .iter()
6,569✔
172
            .enumerate()
6,569✔
173
            .filter(|(_idx, f)| &f.name == src_field)
28,934✔
174
            .collect();
6,569✔
175

6,569✔
176
        match matching_by_field.len() {
6,569✔
177
            1 => Ok(Expression::Column {
6,289✔
178
                index: matching_by_field[0].0,
6,289✔
179
            }),
6,289✔
180
            _ => match src_table_or_alias {
280✔
181
                None => Err(PipelineError::SqlError(SqlError::InvalidColumn(
×
182
                    ident
×
183
                        .iter()
×
184
                        .map(|e| e.value.as_str())
×
185
                        .collect::<Vec<&str>>()
×
186
                        .join("."),
×
187
                ))),
×
188
                Some(src_table_or_alias) => {
280✔
189
                    let matching_by_table_or_alias: Vec<(usize, &FieldDefinition)> =
280✔
190
                        matching_by_field
280✔
191
                            .into_iter()
280✔
192
                            .filter(|(_idx, field)| match &field.source {
560✔
193
                                SourceDefinition::Alias { name } => name == src_table_or_alias,
476✔
194
                                SourceDefinition::Table {
195
                                    name,
84✔
196
                                    connection: _,
84✔
197
                                } => name == src_table_or_alias,
84✔
198
                                _ => false,
×
199
                            })
560✔
200
                            .collect();
280✔
201

280✔
202
                    match matching_by_table_or_alias.len() {
280✔
203
                        1 => Ok(Expression::Column {
280✔
204
                            index: matching_by_table_or_alias[0].0,
280✔
205
                        }),
280✔
206
                        _ => match src_connection {
×
207
                            None => Err(PipelineError::SqlError(SqlError::InvalidColumn(
×
208
                                ident
×
209
                                    .iter()
×
210
                                    .fold(String::new(), |a, b| a + "." + b.value.as_str()),
×
211
                            ))),
×
212
                            Some(src_connection) => {
×
213
                                let matching_by_connection: Vec<(usize, &FieldDefinition)> =
×
214
                                    matching_by_table_or_alias
×
215
                                        .into_iter()
×
216
                                        .filter(|(_idx, field)| match &field.source {
×
217
                                            SourceDefinition::Table {
×
218
                                                name: _,
×
219
                                                connection,
×
220
                                            } => connection == src_connection,
×
221
                                            _ => false,
×
222
                                        })
×
223
                                        .collect();
×
224

×
225
                                match matching_by_connection.len() {
×
226
                                    1 => Ok(Expression::Column {
×
227
                                        index: matching_by_connection[0].0,
×
228
                                    }),
×
229
                                    _ => Err(PipelineError::SqlError(SqlError::InvalidColumn(
×
230
                                        ident
×
231
                                            .iter()
×
232
                                            .map(|e| e.value.as_str())
×
233
                                            .collect::<Vec<&str>>()
×
234
                                            .join("."),
×
235
                                    ))),
×
236
                                }
×
237
                            }
×
238
                        },
×
239
                    }
×
240
                }
×
241
            },
×
242
        }
×
243
    }
6,569✔
244

245
    fn parse_sql_trim_function(
42✔
246
        &mut self,
42✔
247
        parse_aggregations: bool,
42✔
248
        expr: &Expr,
42✔
249
        trim_where: &Option<TrimWhereField>,
42✔
250
        trim_what: &Option<Box<Expr>>,
42✔
251
        schema: &Schema,
42✔
252
    ) -> Result<Expression, PipelineError> {
42✔
253
        let arg = Box::new(self.parse_sql_expression(parse_aggregations, expr, schema)?);
42✔
254
        let what = match trim_what {
42✔
255
            Some(e) => Some(Box::new(self.parse_sql_expression(
8✔
256
                parse_aggregations,
8✔
257
                e,
8✔
258
                schema,
8✔
259
            )?)),
8✔
260
            _ => None,
34✔
261
        };
×
262
        let typ = trim_where.as_ref().map(|e| match e {
42✔
263
            TrimWhereField::Both => TrimType::Both,
2✔
264
            TrimWhereField::Leading => TrimType::Leading,
2✔
265
            TrimWhereField::Trailing => TrimType::Trailing,
2✔
266
        });
42✔
267
        Ok(Expression::Trim { arg, what, typ })
42✔
268
    }
42✔
269

×
270
    fn aggr_function_check(
2,729✔
271
        &mut self,
2,729✔
272
        function_name: String,
2,729✔
273
        parse_aggregations: bool,
2,729✔
274
        sql_function: &Function,
2,729✔
275
        schema: &Schema,
2,729✔
276
    ) -> Result<Expression, PipelineError> {
2,729✔
277
        match (
2,729✔
278
            AggregateFunctionType::new(function_name.as_str()),
2,729✔
279
            parse_aggregations,
2,729✔
280
        ) {
2,729✔
281
            (Ok(aggr), true) => {
631✔
282
                let mut arg_expr: Vec<Expression> = Vec::new();
631✔
283
                for arg in &sql_function.args {
1,288✔
284
                    let aggregation = self.parse_sql_function_arg(true, arg, schema)?;
657✔
285
                    arg_expr.push(aggregation);
657✔
286
                }
×
287
                let measure = Expression::AggregateFunction {
631✔
288
                    fun: aggr,
631✔
289
                    args: arg_expr,
631✔
290
                };
631✔
291
                let index = match self
631✔
292
                    .aggregations
631✔
293
                    .iter()
631✔
294
                    .enumerate()
631✔
295
                    .find(|e| e.1 == &measure)
631✔
296
                {
×
297
                    Some((index, _existing)) => index,
65✔
298
                    _ => {
×
299
                        self.aggregations.push(measure);
566✔
300
                        self.aggregations.len() - 1
566✔
301
                    }
×
302
                };
×
303
                Ok(Expression::Column {
631✔
304
                    index: self.offset + index,
631✔
305
                })
631✔
306
            }
×
307
            (Ok(_agg), false) => Err(InvalidNestedAggregationFunction(function_name)),
×
308
            (Err(_), _) => Err(InvalidNestedAggregationFunction(function_name)),
2,098✔
309
        }
×
310
    }
2,729✔
311

×
312
    fn scalar_function_check(
2,098✔
313
        &mut self,
2,098✔
314
        function_name: String,
2,098✔
315
        parse_aggregations: bool,
2,098✔
316
        sql_function: &Function,
2,098✔
317
        schema: &Schema,
2,098✔
318
    ) -> Result<Expression, PipelineError> {
2,098✔
319
        let mut function_args: Vec<Expression> = Vec::new();
2,098✔
320
        for arg in &sql_function.args {
4,287✔
321
            function_args.push(self.parse_sql_function_arg(parse_aggregations, arg, schema)?);
2,189✔
322
        }
×
323

×
324
        match ScalarFunctionType::new(function_name.as_str()) {
2,098✔
325
            Ok(sft) => Ok(ScalarFunction {
2,042✔
326
                fun: sft,
2,042✔
327
                args: function_args.clone(),
2,042✔
328
            }),
2,042✔
329
            Err(_d) => Err(InvalidFunction(function_name)),
56✔
330
        }
331
    }
2,098✔
332

×
333
    fn geo_expr_check(
56✔
334
        &mut self,
56✔
335
        function_name: String,
56✔
336
        parse_aggregations: bool,
56✔
337
        sql_function: &Function,
56✔
338
        schema: &Schema,
56✔
339
    ) -> Result<Expression, PipelineError> {
56✔
340
        let mut function_args: Vec<Expression> = Vec::new();
56✔
341
        for arg in &sql_function.args {
166✔
342
            function_args.push(self.parse_sql_function_arg(parse_aggregations, arg, schema)?);
110✔
343
        }
×
344

×
345
        match GeoFunctionType::new(function_name.as_str()) {
56✔
346
            Ok(gft) => Ok(GeoFunction {
14✔
347
                fun: gft,
14✔
348
                args: function_args.clone(),
14✔
349
            }),
14✔
350
            Err(_e) => Err(InvalidFunction(function_name)),
42✔
351
        }
×
352
    }
56✔
353

×
354
    fn datetime_expr_check(&mut self, function_name: String) -> Result<Expression, PipelineError> {
32✔
355
        match DateTimeFunctionType::new(function_name.as_str()) {
32✔
356
            Ok(dtf) => Ok(Now { fun: dtf }),
2✔
357
            Err(_e) => Err(InvalidFunction(function_name)),
30✔
358
        }
359
    }
32✔
360

×
361
    fn json_func_check(
30✔
362
        &mut self,
30✔
363
        function_name: String,
30✔
364
        parse_aggregations: bool,
30✔
365
        sql_function: &Function,
30✔
366
        schema: &Schema,
30✔
367
    ) -> Result<Expression, PipelineError> {
30✔
368
        let mut function_args: Vec<Expression> = Vec::new();
30✔
369
        for arg in &sql_function.args {
88✔
370
            function_args.push(self.parse_sql_function_arg(parse_aggregations, arg, schema)?);
58✔
371
        }
×
372

×
373
        match JsonFunctionType::new(function_name.as_str()) {
30✔
374
            Ok(jft) => Ok(Expression::Json {
30✔
375
                fun: jft,
30✔
376
                args: function_args,
30✔
377
            }),
30✔
378
            Err(_e) => Err(InvalidFunction(function_name)),
×
379
        }
×
380
    }
30✔
381

×
382
    fn conditional_expr_check(
42✔
383
        &mut self,
42✔
384
        function_name: String,
42✔
385
        parse_aggregations: bool,
42✔
386
        sql_function: &Function,
42✔
387
        schema: &Schema,
42✔
388
    ) -> Result<Expression, PipelineError> {
42✔
389
        let mut function_args: Vec<Expression> = Vec::new();
42✔
390
        for arg in &sql_function.args {
118✔
391
            function_args.push(self.parse_sql_function_arg(parse_aggregations, arg, schema)?);
76✔
392
        }
393

×
394
        match ConditionalExpressionType::new(function_name.as_str()) {
42✔
395
            Ok(cet) => Ok(ConditionalExpression {
10✔
396
                fun: cet,
10✔
397
                args: function_args.clone(),
10✔
398
            }),
10✔
399
            Err(_err) => Err(InvalidFunction(function_name)),
32✔
400
        }
×
401
    }
42✔
402

×
403
    fn parse_sql_function(
2,757✔
404
        &mut self,
2,757✔
405
        parse_aggregations: bool,
2,757✔
406
        sql_function: &Function,
2,757✔
407
        schema: &Schema,
2,757✔
408
    ) -> Result<Expression, PipelineError> {
2,757✔
409
        let function_name = sql_function.name.to_string().to_lowercase();
2,757✔
410

2,757✔
411
        #[cfg(feature = "python")]
2,757✔
412
        if function_name.starts_with("py_") {
2,757✔
413
            // The function is from python udf.
×
414
            let udf_name = function_name.strip_prefix("py_").unwrap();
28✔
415
            return self.parse_python_udf(udf_name, sql_function, schema);
28✔
416
        }
2,729✔
417

2,729✔
418
        let aggr_check = self.aggr_function_check(
2,729✔
419
            function_name.clone(),
2,729✔
420
            parse_aggregations,
2,729✔
421
            sql_function,
2,729✔
422
            schema,
2,729✔
423
        );
2,729✔
424
        if aggr_check.is_ok() {
2,729✔
425
            return aggr_check;
631✔
426
        }
2,098✔
427

2,098✔
428
        let scalar_check = self.scalar_function_check(
2,098✔
429
            function_name.clone(),
2,098✔
430
            parse_aggregations,
2,098✔
431
            sql_function,
2,098✔
432
            schema,
2,098✔
433
        );
2,098✔
434
        if scalar_check.is_ok() {
2,098✔
435
            return scalar_check;
2,042✔
436
        }
56✔
437

56✔
438
        let geo_check = self.geo_expr_check(
56✔
439
            function_name.clone(),
56✔
440
            parse_aggregations,
56✔
441
            sql_function,
56✔
442
            schema,
56✔
443
        );
56✔
444
        if geo_check.is_ok() {
56✔
445
            return geo_check;
14✔
446
        }
42✔
447

42✔
448
        let conditional_check = self.conditional_expr_check(
42✔
449
            function_name.clone(),
42✔
450
            parse_aggregations,
42✔
451
            sql_function,
42✔
452
            schema,
42✔
453
        );
42✔
454
        if conditional_check.is_ok() {
42✔
455
            return conditional_check;
10✔
456
        }
32✔
457

32✔
458
        let datetime_check = self.datetime_expr_check(function_name.clone());
32✔
459
        if datetime_check.is_ok() {
32✔
460
            return datetime_check;
2✔
461
        }
30✔
462

30✔
463
        self.json_func_check(function_name, parse_aggregations, sql_function, schema)
30✔
464
    }
2,757✔
465

×
466
    fn parse_sql_function_arg(
×
467
        &mut self,
468
        parse_aggregations: bool,
×
469
        argument: &FunctionArg,
×
470
        schema: &Schema,
×
471
    ) -> Result<Expression, PipelineError> {
×
472
        match argument {
3,132✔
473
            FunctionArg::Named {
×
474
                name: _,
×
475
                arg: FunctionArgExpr::Expr(arg),
×
476
            } => self.parse_sql_expression(parse_aggregations, arg, schema),
×
477
            FunctionArg::Named {
×
478
                name: _,
×
479
                arg: FunctionArgExpr::Wildcard,
×
480
            } => Ok(Expression::Literal(Field::Null)),
×
481
            FunctionArg::Unnamed(FunctionArgExpr::Expr(arg)) => {
3,131✔
482
                self.parse_sql_expression(parse_aggregations, arg, schema)
3,131✔
483
            }
×
484
            FunctionArg::Unnamed(FunctionArgExpr::Wildcard) => Ok(Expression::Literal(Field::Null)),
1✔
485
            FunctionArg::Named {
×
486
                name: _,
×
487
                arg: FunctionArgExpr::QualifiedWildcard(_),
×
488
            } => Err(InvalidArgument(format!("{argument:?}"))),
×
489
            FunctionArg::Unnamed(FunctionArgExpr::QualifiedWildcard(_)) => {
×
490
                Err(InvalidArgument(format!("{argument:?}")))
×
491
            }
×
492
        }
×
493
    }
3,132✔
494

×
495
    fn parse_sql_case_expression(
6✔
496
        &mut self,
6✔
497
        parse_aggregations: bool,
6✔
498
        operand: &Option<Box<Expr>>,
6✔
499
        conditions: &[Expr],
6✔
500
        results: &[Expr],
6✔
501
        else_result: &Option<Box<Expr>>,
6✔
502
        schema: &Schema,
6✔
503
    ) -> Result<Expression, PipelineError> {
6✔
504
        let op = match operand {
6✔
505
            Some(o) => Some(Box::new(self.parse_sql_expression(
×
506
                parse_aggregations,
×
507
                o,
×
508
                schema,
×
509
            )?)),
×
510
            None => None,
6✔
511
        };
×
512
        let conds = conditions
6✔
513
            .iter()
6✔
514
            .map(|cond| self.parse_sql_expression(parse_aggregations, cond, schema))
12✔
515
            .collect::<Result<Vec<_>, PipelineError>>()?;
6✔
516
        let res = results
6✔
517
            .iter()
6✔
518
            .map(|r| self.parse_sql_expression(parse_aggregations, r, schema))
12✔
519
            .collect::<Result<Vec<_>, PipelineError>>()?;
6✔
520
        let else_res = match else_result {
6✔
521
            Some(r) => Some(Box::new(self.parse_sql_expression(
4✔
522
                parse_aggregations,
4✔
523
                r,
4✔
524
                schema,
4✔
525
            )?)),
4✔
526
            None => None,
2✔
527
        };
×
528

×
529
        Ok(Expression::Case {
6✔
530
            operand: op,
6✔
531
            conditions: conds,
6✔
532
            results: res,
6✔
533
            else_result: else_res,
6✔
534
        })
6✔
535
    }
6✔
536

×
537
    fn parse_sql_interval_expression(
6✔
538
        &mut self,
6✔
539
        parse_aggregations: bool,
6✔
540
        value: &Expr,
6✔
541
        leading_field: &Option<DateTimeField>,
6✔
542
        schema: &Schema,
6✔
543
    ) -> Result<Expression, PipelineError> {
6✔
544
        let right = self.parse_sql_expression(parse_aggregations, value, schema)?;
6✔
545
        if leading_field.is_some() {
6✔
546
            Ok(Expression::DateTimeFunction {
6✔
547
                fun: DateTimeFunctionType::Interval {
6✔
548
                    field: leading_field.unwrap(),
6✔
549
                },
6✔
550
                arg: Box::new(right),
6✔
551
            })
6✔
552
        } else {
553
            Err(InvalidExpression(format!("INTERVAL for {leading_field:?}")))
×
554
        }
555
    }
6✔
556

557
    fn parse_sql_unary_op(
×
558
        &mut self,
×
559
        parse_aggregations: bool,
×
560
        op: &SqlUnaryOperator,
×
561
        expr: &SqlExpr,
×
562
        schema: &Schema,
×
563
    ) -> Result<Expression, PipelineError> {
×
564
        let arg = Box::new(self.parse_sql_expression(parse_aggregations, expr, schema)?);
×
565
        let operator = match op {
×
566
            SqlUnaryOperator::Not => UnaryOperatorType::Not,
×
567
            SqlUnaryOperator::Plus => UnaryOperatorType::Plus,
×
568
            SqlUnaryOperator::Minus => UnaryOperatorType::Minus,
×
569
            _ => return Err(InvalidOperator(format!("{op:?}"))),
×
570
        };
×
571

572
        Ok(Expression::UnaryOperator { operator, arg })
×
573
    }
×
574

×
575
    fn parse_sql_binary_op(
440✔
576
        &mut self,
440✔
577
        parse_aggregations: bool,
440✔
578
        left: &SqlExpr,
440✔
579
        op: &SqlBinaryOperator,
440✔
580
        right: &SqlExpr,
440✔
581
        schema: &Schema,
440✔
582
    ) -> Result<Expression, PipelineError> {
440✔
583
        let left_op = self.parse_sql_expression(parse_aggregations, left, schema)?;
440✔
584
        let right_op = self.parse_sql_expression(parse_aggregations, right, schema)?;
440✔
585

×
586
        let operator = match op {
440✔
587
            SqlBinaryOperator::Gt => BinaryOperatorType::Gt,
100✔
588
            SqlBinaryOperator::GtEq => BinaryOperatorType::Gte,
52✔
589
            SqlBinaryOperator::Lt => BinaryOperatorType::Lt,
55✔
590
            SqlBinaryOperator::LtEq => BinaryOperatorType::Lte,
37✔
591
            SqlBinaryOperator::Eq => BinaryOperatorType::Eq,
82✔
592
            SqlBinaryOperator::NotEq => BinaryOperatorType::Ne,
2✔
593
            SqlBinaryOperator::Plus => BinaryOperatorType::Add,
22✔
594
            SqlBinaryOperator::Minus => BinaryOperatorType::Sub,
4✔
595
            SqlBinaryOperator::Multiply => BinaryOperatorType::Mul,
×
596
            SqlBinaryOperator::Divide => BinaryOperatorType::Div,
14✔
597
            SqlBinaryOperator::Modulo => BinaryOperatorType::Mod,
×
598
            SqlBinaryOperator::And => BinaryOperatorType::And,
58✔
599
            SqlBinaryOperator::Or => BinaryOperatorType::Or,
14✔
600
            _ => return Err(InvalidOperator(format!("{op:?}"))),
×
601
        };
×
602

×
603
        Ok(Expression::BinaryOperator {
440✔
604
            left: Box::new(left_op),
440✔
605
            operator,
440✔
606
            right: Box::new(right_op),
440✔
607
        })
440✔
608
    }
440✔
609

×
610
    #[cfg(not(feature = "bigdecimal"))]
×
611
    fn parse_sql_number(n: &str) -> Result<Expression, PipelineError> {
448✔
612
        match n.parse::<i64>() {
448✔
613
            Ok(n) => Ok(Expression::Literal(Field::Int(n))),
448✔
614
            Err(_) => match n.parse::<f64>() {
×
615
                Ok(f) => Ok(Expression::Literal(Field::Float(OrderedFloat(f)))),
×
616
                Err(_) => Err(InvalidValue(n.to_string())),
×
617
            },
618
        }
619
    }
448✔
620

×
621
    /// Converts a numeric SQL literal (parsed as `BigDecimal`) into a literal expression.
    ///
    /// Integers that fit in `i64` become `Field::Int`. Everything else, including
    /// fractional values and integers outside the `i64` range, is converted to `Field::Float`.
    /// This matches the float fallback of the non-`bigdecimal` `parse_sql_number`.
    ///
    /// # Errors
    /// Returns `InvalidValue` when `to_f64` cannot convert the value.
    #[cfg(feature = "bigdecimal")]
    fn parse_sql_number(n: &bigdecimal::BigDecimal) -> Result<Expression, PipelineError> {
        use bigdecimal::ToPrimitive;
        if n.is_integer() {
            // `to_i64` is `None` for integers outside the i64 range. Such literals must fall
            // through to the float conversion rather than panic on `unwrap()`.
            if let Some(int) = n.to_i64() {
                return Ok(Expression::Literal(Field::Int(int)));
            }
        }
        match n.to_f64() {
            Some(f) => Ok(Expression::Literal(Field::Float(OrderedFloat(f)))),
            None => Err(InvalidValue(n.to_string())),
        }
    }
×
633

×
634
    fn parse_sql_like_operator(
14✔
635
        &mut self,
14✔
636
        parse_aggregations: bool,
14✔
637
        negated: &bool,
14✔
638
        expr: &Expr,
14✔
639
        pattern: &Expr,
14✔
640
        escape_char: &Option<char>,
14✔
641
        schema: &Schema,
14✔
642
    ) -> Result<Expression, PipelineError> {
14✔
643
        let arg = self.parse_sql_expression(parse_aggregations, expr, schema)?;
14✔
644
        let pattern = self.parse_sql_expression(parse_aggregations, pattern, schema)?;
14✔
645
        let like_expression = Expression::Like {
14✔
646
            arg: Box::new(arg),
14✔
647
            pattern: Box::new(pattern),
14✔
648
            escape: *escape_char,
14✔
649
        };
14✔
650
        if *negated {
14✔
651
            Ok(Expression::UnaryOperator {
×
652
                operator: UnaryOperatorType::Not,
×
653
                arg: Box::new(like_expression),
×
654
            })
×
655
        } else {
×
656
            Ok(like_expression)
14✔
657
        }
×
658
    }
14✔
659

×
660
    fn parse_sql_extract_operator(
64✔
661
        &mut self,
64✔
662
        parse_aggregations: bool,
64✔
663
        field: &sqlparser::ast::DateTimeField,
64✔
664
        expr: &Expr,
64✔
665
        schema: &Schema,
64✔
666
    ) -> Result<Expression, PipelineError> {
64✔
667
        let right = self.parse_sql_expression(parse_aggregations, expr, schema)?;
64✔
668
        Ok(Expression::DateTimeFunction {
64✔
669
            fun: DateTimeFunctionType::Extract { field: *field },
64✔
670
            arg: Box::new(right),
64✔
671
        })
64✔
672
    }
64✔
673

×
674
    fn parse_sql_cast_operator(
122✔
675
        &mut self,
122✔
676
        parse_aggregations: bool,
122✔
677
        expr: &Expr,
122✔
678
        data_type: &DataType,
122✔
679
        schema: &Schema,
122✔
680
    ) -> Result<Expression, PipelineError> {
122✔
681
        let expression = self.parse_sql_expression(parse_aggregations, expr, schema)?;
122✔
682
        let cast_to = match data_type {
122✔
683
            DataType::Decimal(_) => CastOperatorType::Decimal,
×
684
            DataType::Binary(_) => CastOperatorType::Binary,
×
685
            DataType::Float(_) => CastOperatorType::Float,
18✔
686
            DataType::Int(_) => CastOperatorType::Int,
15✔
687
            DataType::Integer(_) => CastOperatorType::Int,
×
688
            DataType::UnsignedInt(_) => CastOperatorType::UInt,
×
689
            DataType::UnsignedInteger(_) => CastOperatorType::UInt,
×
690
            DataType::Boolean => CastOperatorType::Boolean,
14✔
691
            DataType::Date => CastOperatorType::Date,
×
692
            DataType::Timestamp(..) => CastOperatorType::Timestamp,
×
693
            DataType::Text => CastOperatorType::Text,
18✔
694
            DataType::String => CastOperatorType::String,
21✔
695
            DataType::JSON => CastOperatorType::Json,
16✔
696
            DataType::Custom(name, ..) => {
20✔
697
                if name.to_string().to_lowercase() == "uint" {
20✔
698
                    CastOperatorType::UInt
8✔
699
                } else if name.to_string().to_lowercase() == "u128" {
12✔
700
                    CastOperatorType::U128
6✔
701
                } else if name.to_string().to_lowercase() == "i128" {
6✔
702
                    CastOperatorType::I128
6✔
703
                } else {
704
                    Err(PipelineError::InvalidFunction(format!(
×
705
                        "Unsupported Cast type {name}"
×
706
                    )))?
×
707
                }
×
708
            }
×
709
            _ => Err(PipelineError::InvalidFunction(format!(
×
710
                "Unsupported Cast type {data_type}"
×
711
            )))?,
×
712
        };
×
713
        Ok(Expression::Cast {
122✔
714
            arg: Box::new(expression),
122✔
715
            typ: cast_to,
122✔
716
        })
122✔
717
    }
122✔
718

719
    fn parse_sql_string(s: &str) -> Result<Expression, PipelineError> {
298✔
720
        Ok(Expression::Literal(Field::String(s.to_owned())))
298✔
721
    }
298✔
722

723
    pub fn fullname_from_ident(ident: &[Ident]) -> String {
374✔
724
        let mut ident_tokens = vec![];
374✔
725
        for token in ident.iter() {
374✔
726
            ident_tokens.push(token.value.clone());
374✔
727
        }
374✔
728
        ident_tokens.join(".")
374✔
729
    }
374✔
730

×
731
    pub(crate) fn normalize_ident(id: &Ident) -> String {
1,902✔
732
        match id.quote_style {
1,902✔
733
            Some(_) => id.value.clone(),
×
734
            None => id.value.clone(),
1,902✔
735
        }
×
736
    }
1,902✔
737

×
738
    /// Builds an `Expression::PythonUDF` call for the Python function `name`.
    ///
    /// Each argument of `function` is parsed in order with
    /// `parse_aggregations = false`. The return type comes from
    /// `function.return_type` (syntax `function_name<return_type>(arguments)`)
    /// and is converted with `FieldType::try_from`.
    ///
    /// # Errors
    ///
    /// Propagates the first argument-parsing error. Returns
    /// `PipelineError::InvalidQuery` if the return type is absent or cannot be
    /// converted to a `FieldType`.
    #[cfg(feature = "python")]
    fn parse_python_udf(
        &mut self,
        name: &str,
        function: &Function,
        schema: &Schema,
    ) -> Result<Expression, PipelineError> {
        use dozer_types::types::FieldType;
        use PipelineError::InvalidQuery;

        // Arguments are parsed before the return type is checked, so an
        // argument error takes precedence over a return-type error.
        let mut args = Vec::new();
        for argument in function.args.iter() {
            args.push(self.parse_sql_function_arg(false, argument, schema)?);
        }

        let return_ident = match function.return_type.as_ref() {
            Some(ident) => ident,
            None => {
                return Err(InvalidQuery("Python UDF must have a return type. The syntax is: function_name<return_type>(arguments)".to_string()));
            }
        };
        let return_type = match FieldType::try_from(return_ident.value.as_str()) {
            Ok(field_type) => field_type,
            Err(e) => {
                return Err(InvalidQuery(format!(
                    "Failed to parse Python UDF return type: {e}"
                )));
            }
        };

        Ok(Expression::PythonUDF {
            name: name.to_string(),
            args,
            return_type,
        })
    }
28✔
773

×
774
    fn parse_sql_in_list_operator(
16✔
775
        &mut self,
16✔
776
        parse_aggregations: bool,
16✔
777
        expr: &Expr,
16✔
778
        list: &[Expr],
16✔
779
        negated: bool,
16✔
780
        schema: &Schema,
16✔
781
    ) -> Result<Expression, PipelineError> {
16✔
782
        let expr = self.parse_sql_expression(parse_aggregations, expr, schema)?;
16✔
783
        let list = list
16✔
784
            .iter()
16✔
785
            .map(|expr| self.parse_sql_expression(parse_aggregations, expr, schema))
168✔
786
            .collect::<Result<Vec<_>, PipelineError>>()?;
16✔
787
        let in_list_expression = Expression::InList {
16✔
788
            expr: Box::new(expr),
16✔
789
            list,
16✔
790
            negated,
16✔
791
        };
16✔
792

16✔
793
        Ok(in_list_expression)
16✔
794
    }
16✔
795
}
×
796

×
797
/// A name paired with an optional alias.
///
/// `.0` is the name (presumably the table/source name; confirm at call sites).
/// `.1` is the alias, if one was given. When `.1` is `Some`,
/// `extend_schema_source_def` re-labels every schema field's source with it.
#[derive(Clone, Debug, Eq, Hash, PartialEq)]
pub struct NameOrAlias(pub String, pub Option<String>);
×
799

×
800
pub fn extend_schema_source_def(schema: &Schema, name: &NameOrAlias) -> Schema {
1,317✔
801
    let mut output_schema = schema.clone();
1,317✔
802
    let mut fields = vec![];
1,317✔
803
    for mut field in schema.clone().fields.into_iter() {
6,457✔
804
        if let Some(alias) = &name.1 {
6,457✔
805
            field.source = SourceDefinition::Alias {
3,416✔
806
                name: alias.to_string(),
3,416✔
807
            };
3,416✔
808
        }
3,419✔
809

810
        fields.push(field);
6,457✔
811
    }
×
812
    output_schema.fields = fields;
1,317✔
813

1,317✔
814
    output_schema
1,317✔
815
}
1,317✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc