• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

getdozer / dozer / 4295401807

pending completion
4295401807

push

github

GitHub
Bump version (#1099)

28685 of 39545 relevant lines covered (72.54%)

52105.29 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

53.47
/dozer-sql/src/pipeline/expression/execution.rs
1
use crate::argv;
2
use crate::pipeline::errors::PipelineError;
3

4
use uuid::Uuid;
5

6
use crate::pipeline::expression::datetime::{get_datetime_function_type, DateTimeFunctionType};
7
use crate::pipeline::expression::geo::common::{get_geo_function_type, GeoFunctionType};
8
use crate::pipeline::expression::operator::{BinaryOperatorType, UnaryOperatorType};
9
use crate::pipeline::expression::scalar::common::{get_scalar_function_type, ScalarFunctionType};
10
use crate::pipeline::expression::scalar::string::{evaluate_trim, validate_trim, TrimType};
11
use dozer_types::types::{Field, FieldType, Record, Schema, SourceDefinition};
12

13
use super::aggregate::AggregateFunctionType;
14
use super::cast::CastOperatorType;
15
use super::scalar::string::{evaluate_like, get_like_operator_type};
16

17
#[derive(Clone, Debug, PartialEq)]
462,921✔
18
pub enum Expression {
19
    Column {
20
        index: usize,
21
    },
22
    Literal(Field),
23
    UnaryOperator {
24
        operator: UnaryOperatorType,
25
        arg: Box<Expression>,
26
    },
27
    BinaryOperator {
28
        left: Box<Expression>,
29
        operator: BinaryOperatorType,
30
        right: Box<Expression>,
31
    },
32
    ScalarFunction {
33
        fun: ScalarFunctionType,
34
        args: Vec<Expression>,
35
    },
36
    GeoFunction {
37
        fun: GeoFunctionType,
38
        args: Vec<Expression>,
39
    },
40
    DateTimeFunction {
41
        fun: DateTimeFunctionType,
42
        arg: Box<Expression>,
43
    },
44
    AggregateFunction {
45
        fun: AggregateFunctionType,
46
        args: Vec<Expression>,
47
    },
48
    Cast {
49
        arg: Box<Expression>,
50
        typ: CastOperatorType,
51
    },
52
    Trim {
53
        arg: Box<Expression>,
54
        what: Option<Box<Expression>>,
55
        typ: Option<TrimType>,
56
    },
57
    Like {
58
        arg: Box<Expression>,
59
        pattern: Box<Expression>,
60
        escape: Option<char>,
61
    },
62
    #[cfg(feature = "python")]
63
    PythonUDF {
64
        name: String,
65
        args: Vec<Expression>,
66
        return_type: FieldType,
67
    },
68
}
69

70
impl Expression {
71
    pub fn to_string(&self, schema: &Schema) -> String {
3,026✔
72
        match &self {
3,026✔
73
            Expression::Column { index } => schema.fields[*index].name.clone(),
2,712✔
74
            Expression::Literal(value) => value
34✔
75
                .to_string()
34✔
76
                .unwrap_or_else(|| Uuid::new_v4().to_string()),
34✔
77
            Expression::UnaryOperator { operator, arg } => {
×
78
                operator.to_string() + arg.to_string(schema).as_str()
×
79
            }
80
            Expression::BinaryOperator {
81
                left,
×
82
                operator,
×
83
                right,
×
84
            } => {
×
85
                left.to_string(schema)
×
86
                    + operator.to_string().as_str()
×
87
                    + right.to_string(schema).as_str()
×
88
            }
89
            Expression::ScalarFunction { fun, args } => {
4✔
90
                fun.to_string()
4✔
91
                    + "("
4✔
92
                    + args
4✔
93
                        .iter()
4✔
94
                        .map(|e| e.to_string(schema))
9✔
95
                        .collect::<Vec<String>>()
4✔
96
                        .join(",")
4✔
97
                        .as_str()
4✔
98
                    + ")"
4✔
99
            }
100
            Expression::AggregateFunction { fun, args } => {
216✔
101
                fun.to_string()
216✔
102
                    + "("
216✔
103
                    + args
216✔
104
                        .iter()
216✔
105
                        .map(|e| e.to_string(schema))
216✔
106
                        .collect::<Vec<String>>()
216✔
107
                        .join(",")
216✔
108
                        .as_str()
216✔
109
                    + ")"
216✔
110
            }
111
            #[cfg(feature = "python")]
112
            Expression::PythonUDF { name, args, .. } => {
30✔
113
                name.to_string()
30✔
114
                    + "("
30✔
115
                    + args
30✔
116
                        .iter()
30✔
117
                        .map(|expr| expr.to_string(schema))
75✔
118
                        .collect::<Vec<String>>()
30✔
119
                        .join(",")
30✔
120
                        .as_str()
30✔
121
                    + ")"
30✔
122
            }
123
            Expression::Cast { arg, typ } => {
×
124
                "CAST(".to_string()
×
125
                    + arg.to_string(schema).as_str()
×
126
                    + " AS "
×
127
                    + typ.to_string().as_str()
×
128
                    + ")"
×
129
            }
130
            Expression::Trim { typ, what, arg } => {
30✔
131
                "TRIM(".to_string()
30✔
132
                    + if let Some(t) = typ {
30✔
133
                        t.to_string()
×
134
                    } else {
135
                        "".to_string()
30✔
136
                    }
137
                    .as_str()
30✔
138
                    + if let Some(w) = what {
30✔
139
                        w.to_string(schema) + " FROM "
×
140
                    } else {
141
                        "".to_string()
30✔
142
                    }
143
                    .as_str()
30✔
144
                    + arg.to_string(schema).as_str()
30✔
145
                    + ")"
30✔
146
            }
147

148
            Expression::Like {
149
                arg,
×
150
                pattern,
×
151
                escape: _,
×
152
            } => arg.to_string(schema) + " LIKE " + pattern.to_string(schema).as_str(),
×
153
            Expression::GeoFunction { fun, args } => {
×
154
                fun.to_string()
×
155
                    + "("
×
156
                    + args
×
157
                        .iter()
×
158
                        .map(|e| e.to_string(schema))
×
159
                        .collect::<Vec<String>>()
×
160
                        .join(",")
×
161
                        .as_str()
×
162
                    + ")"
×
163
            }
164
            Expression::DateTimeFunction { fun, arg } => {
×
165
                fun.to_string() + "(" + arg.to_string(schema).as_str() + ")"
×
166
            }
167
        }
168
    }
3,026✔
169
}
170

171
pub struct ExpressionType {
172
    pub return_type: FieldType,
173
    pub nullable: bool,
174
    pub source: SourceDefinition,
175
    pub is_primary_key: bool,
176
}
177

178
impl ExpressionType {
179
    pub fn new(
24,423✔
180
        return_type: FieldType,
24,423✔
181
        nullable: bool,
24,423✔
182
        source: SourceDefinition,
24,423✔
183
        is_primary_key: bool,
24,423✔
184
    ) -> Self {
24,423✔
185
        Self {
24,423✔
186
            return_type,
24,423✔
187
            nullable,
24,423✔
188
            source,
24,423✔
189
            is_primary_key,
24,423✔
190
        }
24,423✔
191
    }
24,423✔
192
}
193

194
// NOTE(review): this inherent impl is empty and adds no items to `Expression`;
// it looks like a leftover and can probably be removed — confirm.
impl Expression {}
195

196
pub trait ExpressionExecutor: Send + Sync {
197
    fn evaluate(&self, record: &Record, schema: &Schema) -> Result<Field, PipelineError>;
198
    fn get_type(&self, schema: &Schema) -> Result<ExpressionType, PipelineError>;
199
}
200

201
impl ExpressionExecutor for Expression {
202
    fn evaluate(&self, record: &Record, schema: &Schema) -> Result<Field, PipelineError> {
694,464✔
203
        match self {
694,464✔
204
            Expression::Literal(field) => Ok(field.clone()),
39,398✔
205
            Expression::Column { index } => Ok(record
633,794✔
206
                .get_value(*index)
633,794✔
207
                .map_err(|_e| {
633,794✔
208
                    PipelineError::InvalidInputType(format!("{} is an invalid field index", *index))
×
209
                })?
633,794✔
210
                .clone()),
633,816✔
211
            Expression::BinaryOperator {
212
                left,
21,159✔
213
                operator,
21,159✔
214
                right,
21,159✔
215
            } => operator.evaluate(schema, left, right, record),
21,159✔
216
            Expression::ScalarFunction { fun, args } => fun.evaluate(schema, args, record),
16✔
217

218
            #[cfg(feature = "python")]
219
            Expression::PythonUDF {
220
                name,
10✔
221
                args,
10✔
222
                return_type,
10✔
223
                ..
10✔
224
            } => {
10✔
225
                use crate::pipeline::expression::python_udf::evaluate_py_udf;
10✔
226
                evaluate_py_udf(schema, name, args, return_type, record)
10✔
227
            }
228
            Expression::UnaryOperator { operator, arg } => operator.evaluate(schema, arg, record),
1✔
229
            Expression::AggregateFunction { fun, args: _ } => {
×
230
                Err(PipelineError::InvalidExpression(format!(
×
231
                    "Aggregate Function {fun:?} should not be executed at this point"
×
232
                )))
×
233
            }
234
            Expression::Trim { typ, what, arg } => evaluate_trim(schema, arg, what, typ, record),
42✔
235
            Expression::Like {
236
                arg,
×
237
                pattern,
×
238
                escape,
×
239
            } => evaluate_like(schema, arg, pattern, *escape, record),
×
240
            Expression::Cast { arg, typ } => typ.evaluate(schema, arg, record),
35✔
241
            Expression::GeoFunction { fun, args } => fun.evaluate(schema, args, record),
7✔
242
            Expression::DateTimeFunction { fun, arg } => fun.evaluate(schema, arg, record),
2✔
243
        }
244
    }
694,486✔
245

246
    fn get_type(&self, schema: &Schema) -> Result<ExpressionType, PipelineError> {
24,531✔
247
        match self {
24,531✔
248
            Expression::Literal(field) => {
21,166✔
249
                let field_type = get_field_type(field);
21,166✔
250
                match field_type {
21,166✔
251
                    Some(f) => Ok(ExpressionType::new(
21,166✔
252
                        f,
21,166✔
253
                        false,
21,166✔
254
                        SourceDefinition::Dynamic,
21,166✔
255
                        false,
21,166✔
256
                    )),
21,166✔
257
                    None => Err(PipelineError::InvalidExpression(
×
258
                        "literal expression cannot be null".to_string(),
×
259
                    )),
×
260
                }
261
            }
262
            Expression::Column { index } => {
2,989✔
263
                let t = schema.fields.get(*index).unwrap();
2,989✔
264

2,989✔
265
                Ok(ExpressionType::new(
2,989✔
266
                    t.typ,
2,989✔
267
                    t.nullable,
2,989✔
268
                    t.source.clone(),
2,989✔
269
                    schema.primary_index.contains(index),
2,989✔
270
                ))
2,989✔
271
            }
272
            Expression::UnaryOperator { operator, arg } => {
×
273
                get_unary_operator_type(operator, arg, schema)
×
274
            }
275
            Expression::BinaryOperator {
276
                left,
4✔
277
                operator,
4✔
278
                right,
4✔
279
            } => get_binary_operator_type(left, operator, right, schema),
4✔
280
            Expression::ScalarFunction { fun, args } => get_scalar_function_type(fun, args, schema),
13✔
281
            Expression::AggregateFunction { fun, args } => {
247✔
282
                get_aggregate_function_type(fun, args, schema)
247✔
283
            }
284
            Expression::Trim {
285
                what: _,
286
                typ: _,
287
                arg,
37✔
288
            } => validate_trim(arg, schema),
37✔
289
            Expression::Like {
290
                arg,
×
291
                pattern,
×
292
                escape: _,
×
293
            } => get_like_operator_type(arg, pattern, schema),
×
294
            Expression::Cast { arg, typ } => typ.get_return_type(schema, arg),
36✔
295
            Expression::GeoFunction { fun, args } => get_geo_function_type(fun, args, schema),
7✔
296
            Expression::DateTimeFunction { fun, arg } => {
2✔
297
                get_datetime_function_type(fun, arg, schema)
2✔
298
            }
299
            #[cfg(feature = "python")]
300
            Expression::PythonUDF { return_type, .. } => Ok(ExpressionType::new(
30✔
301
                *return_type,
30✔
302
                false,
30✔
303
                SourceDefinition::Dynamic,
30✔
304
                false,
30✔
305
            )),
30✔
306
        }
307
    }
24,531✔
308
}
309

310
fn get_field_type(field: &Field) -> Option<FieldType> {
21,166✔
311
    match field {
21,166✔
312
        Field::Int(_) => Some(FieldType::Int),
21,147✔
313
        Field::Float(_) => Some(FieldType::Float),
×
314
        Field::Boolean(_) => Some(FieldType::Boolean),
×
315
        Field::String(_) => Some(FieldType::String),
19✔
316
        Field::Binary(_) => Some(FieldType::Binary),
×
317
        Field::Decimal(_) => Some(FieldType::Decimal),
×
318
        Field::Timestamp(_) => Some(FieldType::Timestamp),
×
319
        Field::Bson(_) => Some(FieldType::Bson),
×
320
        Field::Null => None,
×
321
        Field::UInt(_) => Some(FieldType::UInt),
×
322
        Field::Text(_) => Some(FieldType::Text),
×
323
        Field::Date(_) => Some(FieldType::Date),
×
324
        Field::Point(_) => Some(FieldType::Point),
×
325
    }
326
}
21,166✔
327

328
fn get_unary_operator_type(
×
329
    operator: &UnaryOperatorType,
×
330
    expression: &Expression,
×
331
    schema: &Schema,
×
332
) -> Result<ExpressionType, PipelineError> {
×
333
    let field_type = expression.get_type(schema)?;
×
334
    match operator {
×
335
        UnaryOperatorType::Not => match field_type.return_type {
×
336
            FieldType::Boolean => Ok(field_type),
×
337
            field_type => Err(PipelineError::InvalidExpression(format!(
×
338
                "cannot apply NOT to {field_type:?}"
×
339
            ))),
×
340
        },
341
        UnaryOperatorType::Plus => Ok(field_type),
×
342
        UnaryOperatorType::Minus => Ok(field_type),
×
343
    }
344
}
×
345

346
fn get_binary_operator_type(
4✔
347
    left: &Expression,
4✔
348
    operator: &BinaryOperatorType,
4✔
349
    right: &Expression,
4✔
350
    schema: &Schema,
4✔
351
) -> Result<ExpressionType, PipelineError> {
4✔
352
    let left_field_type = left.get_type(schema)?;
4✔
353
    let right_field_type = right.get_type(schema)?;
4✔
354
    match operator {
4✔
355
        BinaryOperatorType::Eq
356
        | BinaryOperatorType::Ne
357
        | BinaryOperatorType::Gt
358
        | BinaryOperatorType::Gte
359
        | BinaryOperatorType::Lt
360
        | BinaryOperatorType::Lte => Ok(ExpressionType::new(
×
361
            FieldType::Boolean,
×
362
            false,
×
363
            SourceDefinition::Dynamic,
×
364
            false,
×
365
        )),
×
366

367
        BinaryOperatorType::And | BinaryOperatorType::Or => {
368
            match (left_field_type.return_type, right_field_type.return_type) {
×
369
                (FieldType::Boolean, FieldType::Boolean) => Ok(ExpressionType::new(
×
370
                    FieldType::Boolean,
×
371
                    false,
×
372
                    SourceDefinition::Dynamic,
×
373
                    false,
×
374
                )),
×
375
                (left_field_type, right_field_type) => {
×
376
                    Err(PipelineError::InvalidExpression(format!(
×
377
                        "cannot apply {operator:?} to {left_field_type:?} and {right_field_type:?}"
×
378
                    )))
×
379
                }
380
            }
381
        }
382

383
        BinaryOperatorType::Add | BinaryOperatorType::Sub | BinaryOperatorType::Mul => {
384
            match (left_field_type.return_type, right_field_type.return_type) {
4✔
385
                (FieldType::Int, FieldType::Int) => Ok(ExpressionType::new(
3✔
386
                    FieldType::Int,
3✔
387
                    false,
3✔
388
                    SourceDefinition::Dynamic,
3✔
389
                    false,
3✔
390
                )),
3✔
391
                (FieldType::UInt, FieldType::Int) => Ok(ExpressionType::new(
×
392
                    FieldType::Int,
×
393
                    false,
×
394
                    SourceDefinition::Dynamic,
×
395
                    false,
×
396
                )),
×
397
                (FieldType::Int, FieldType::UInt) => Ok(ExpressionType::new(
×
398
                    FieldType::Int,
×
399
                    false,
×
400
                    SourceDefinition::Dynamic,
×
401
                    false,
×
402
                )),
×
403
                (FieldType::Timestamp, FieldType::Timestamp) => Ok(ExpressionType::new(
1✔
404
                    FieldType::Int,
1✔
405
                    false,
1✔
406
                    SourceDefinition::Dynamic,
1✔
407
                    false,
1✔
408
                )),
1✔
409
                (FieldType::Int, FieldType::Float)
×
410
                | (FieldType::Float, FieldType::Int)
×
411
                | (FieldType::Float, FieldType::Float) => Ok(ExpressionType::new(
×
412
                    FieldType::Float,
×
413
                    false,
×
414
                    SourceDefinition::Dynamic,
×
415
                    false,
×
416
                )),
×
417
                (left_field_type, right_field_type) => {
×
418
                    Err(PipelineError::InvalidExpression(format!(
×
419
                        "cannot apply {operator:?} to {left_field_type:?} and {right_field_type:?}"
×
420
                    )))
×
421
                }
422
            }
×
423
        }
×
424
        BinaryOperatorType::Div | BinaryOperatorType::Mod => {
×
425
            match (left_field_type.return_type, right_field_type.return_type) {
×
426
                (FieldType::Int, FieldType::Float)
×
427
                | (FieldType::Float, FieldType::Int)
×
428
                | (FieldType::Float, FieldType::Float) => Ok(ExpressionType::new(
×
429
                    FieldType::Float,
×
430
                    false,
×
431
                    SourceDefinition::Dynamic,
×
432
                    false,
×
433
                )),
×
434
                (left_field_type, right_field_type) => {
×
435
                    Err(PipelineError::InvalidExpression(format!(
×
436
                        "cannot apply {operator:?} to {left_field_type:?} and {right_field_type:?}"
×
437
                    )))
×
438
                }
×
439
            }
×
440
        }
×
441
    }
×
442
}
4✔
443

×
444
fn get_aggregate_function_type(
247✔
445
    function: &AggregateFunctionType,
247✔
446
    args: &[Expression],
247✔
447
    schema: &Schema,
247✔
448
) -> Result<ExpressionType, PipelineError> {
247✔
449
    match function {
247✔
450
        AggregateFunctionType::Avg => Ok(ExpressionType::new(
7✔
451
            FieldType::Float,
7✔
452
            false,
7✔
453
            SourceDefinition::Dynamic,
7✔
454
            false,
7✔
455
        )),
7✔
456
        AggregateFunctionType::Count => Ok(ExpressionType::new(
206✔
457
            FieldType::Int,
206✔
458
            false,
206✔
459
            SourceDefinition::Dynamic,
206✔
460
            false,
206✔
461
        )),
206✔
462
        AggregateFunctionType::Max => argv!(args, 0, AggregateFunctionType::Max)?.get_type(schema),
11✔
463
        AggregateFunctionType::Median => {
×
464
            argv!(args, 0, AggregateFunctionType::Median)?.get_type(schema)
×
465
        }
×
466
        AggregateFunctionType::Min => argv!(args, 0, AggregateFunctionType::Min)?.get_type(schema),
11✔
467
        AggregateFunctionType::Sum => argv!(args, 0, AggregateFunctionType::Sum)?.get_type(schema),
12✔
468
        AggregateFunctionType::Stddev => Ok(ExpressionType::new(
×
469
            FieldType::Float,
×
470
            false,
×
471
            SourceDefinition::Dynamic,
×
472
            false,
×
473
        )),
×
474
        AggregateFunctionType::Variance => Ok(ExpressionType::new(
×
475
            FieldType::Float,
×
476
            false,
×
477
            SourceDefinition::Dynamic,
×
478
            false,
×
479
        )),
×
480
    }
481
}
247✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc