• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

getdozer / dozer / 4102355041

pending completion
4102355041

Pull #811

github

GitHub
Merge 37b55f3df into 7c772e92a
Pull Request #811: chore: integrating sql planner

427 of 427 new or added lines in 15 files covered. (100.0%)

24596 of 37831 relevant lines covered (65.02%)

37254.69 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

50.89
/dozer-sql/src/pipeline/expression/execution.rs
1
use crate::argv;
2
use crate::pipeline::errors::PipelineError;
3

4
use uuid::Uuid;
5

6
use crate::pipeline::expression::operator::{BinaryOperatorType, UnaryOperatorType};
7
use crate::pipeline::expression::scalar::common::{get_scalar_function_type, ScalarFunctionType};
8
use crate::pipeline::expression::scalar::string::{evaluate_trim, validate_trim, TrimType};
9
use dozer_types::types::{Field, FieldType, Record, Schema, SourceDefinition};
10

11
use super::aggregate::AggregateFunctionType;
12
use super::cast::CastOperatorType;
13
use super::scalar::string::{evaluate_like, get_like_operator_type};
14

15
#[derive(Clone, Debug, PartialEq)]
170,874✔
16
pub enum Expression {
17
    Column {
18
        index: usize,
19
    },
20
    Literal(Field),
21
    UnaryOperator {
22
        operator: UnaryOperatorType,
23
        arg: Box<Expression>,
24
    },
25
    BinaryOperator {
26
        left: Box<Expression>,
27
        operator: BinaryOperatorType,
28
        right: Box<Expression>,
29
    },
30
    ScalarFunction {
31
        fun: ScalarFunctionType,
32
        args: Vec<Expression>,
33
    },
34
    AggregateFunction {
35
        fun: AggregateFunctionType,
36
        args: Vec<Expression>,
37
    },
38
    Cast {
39
        arg: Box<Expression>,
40
        typ: CastOperatorType,
41
    },
42
    Trim {
43
        arg: Box<Expression>,
44
        what: Option<Box<Expression>>,
45
        typ: Option<TrimType>,
46
    },
47
    Like {
48
        arg: Box<Expression>,
49
        pattern: Box<Expression>,
50
        escape: Option<char>,
51
    },
52
}
53

54
impl Expression {
55
    pub fn to_string(&self, schema: &Schema) -> String {
1,198✔
56
        match &self {
1,198✔
57
            Expression::Column { index } => schema.fields[*index].name.clone(),
1,058✔
58
            Expression::Literal(value) => value
4✔
59
                .to_string()
4✔
60
                .unwrap_or_else(|| Uuid::new_v4().to_string()),
4✔
61
            Expression::UnaryOperator { operator, arg } => {
×
62
                operator.to_string() + arg.to_string(schema).as_str()
×
63
            }
64
            Expression::BinaryOperator {
65
                left,
×
66
                operator,
×
67
                right,
×
68
            } => {
×
69
                left.to_string(schema)
×
70
                    + operator.to_string().as_str()
×
71
                    + right.to_string(schema).as_str()
×
72
            }
73
            Expression::ScalarFunction { fun, args } => {
4✔
74
                fun.to_string()
4✔
75
                    + "("
4✔
76
                    + args
4✔
77
                        .iter()
4✔
78
                        .map(|e| e.to_string(schema))
9✔
79
                        .collect::<Vec<String>>()
4✔
80
                        .join(",")
4✔
81
                        .as_str()
4✔
82
                    + ")"
4✔
83
            }
84
            Expression::AggregateFunction { fun, args } => {
114✔
85
                fun.to_string()
114✔
86
                    + "("
114✔
87
                    + args
114✔
88
                        .iter()
114✔
89
                        .map(|e| e.to_string(schema))
114✔
90
                        .collect::<Vec<String>>()
114✔
91
                        .join(",")
114✔
92
                        .as_str()
114✔
93
                    + ")"
114✔
94
            }
95
            Expression::Cast { arg, typ } => {
×
96
                "CAST(".to_string()
×
97
                    + arg.to_string(schema).as_str()
×
98
                    + " AS "
×
99
                    + typ.to_string().as_str()
×
100
                    + ")"
×
101
            }
102
            Expression::Trim { typ, what, arg } => {
18✔
103
                "TRIM(".to_string()
18✔
104
                    + if let Some(t) = typ {
18✔
105
                        t.to_string()
×
106
                    } else {
107
                        "".to_string()
18✔
108
                    }
109
                    .as_str()
18✔
110
                    + if let Some(w) = what {
18✔
111
                        w.to_string(schema) + " FROM "
×
112
                    } else {
113
                        "".to_string()
18✔
114
                    }
115
                    .as_str()
18✔
116
                    + arg.to_string(schema).as_str()
18✔
117
                    + ")"
18✔
118
            }
119

120
            Expression::Like {
121
                arg,
×
122
                pattern,
×
123
                escape: _,
×
124
            } => arg.to_string(schema) + " LIKE " + pattern.to_string(schema).as_str(),
×
125
        }
126
    }
1,198✔
127
}
128

129
pub struct ExpressionType {
130
    pub return_type: FieldType,
131
    pub nullable: bool,
132
    pub source: SourceDefinition,
133
    pub is_primary_key: bool,
134
}
135

136
impl ExpressionType {
137
    pub fn new(
7,250✔
138
        return_type: FieldType,
7,250✔
139
        nullable: bool,
7,250✔
140
        source: SourceDefinition,
7,250✔
141
        is_primary_key: bool,
7,250✔
142
    ) -> Self {
7,250✔
143
        Self {
7,250✔
144
            return_type,
7,250✔
145
            nullable,
7,250✔
146
            source,
7,250✔
147
            is_primary_key,
7,250✔
148
        }
7,250✔
149
    }
7,250✔
150
}
151

152
// NOTE(review): this inherent impl block is empty and defines no items; it looks like
// a leftover placeholder — confirm before removing.
impl Expression {}
153

154
pub trait ExpressionExecutor: Send + Sync {
155
    fn evaluate(&self, record: &Record, schema: &Schema) -> Result<Field, PipelineError>;
156
    fn get_type(&self, schema: &Schema) -> Result<ExpressionType, PipelineError>;
157
}
158

159
impl ExpressionExecutor for Expression {
160
    fn evaluate(&self, record: &Record, schema: &Schema) -> Result<Field, PipelineError> {
260,011✔
161
        match self {
260,011✔
162
            Expression::Literal(field) => Ok(field.clone()),
10,912✔
163
            Expression::Column { index } => Ok(record
242,087✔
164
                .get_value(*index)
242,087✔
165
                .map_err(|_e| {
242,087✔
166
                    PipelineError::InvalidInputType(format!("{} is an invalid field index", *index))
×
167
                })?
242,087✔
168
                .clone()),
242,089✔
169
            Expression::BinaryOperator {
170
                left,
6,932✔
171
                operator,
6,932✔
172
                right,
6,932✔
173
            } => operator.evaluate(schema, left, right, record),
6,932✔
174
            Expression::ScalarFunction { fun, args } => fun.evaluate(schema, args, record),
16✔
175
            Expression::UnaryOperator { operator, arg } => operator.evaluate(schema, arg, record),
1✔
176
            Expression::AggregateFunction { fun, args: _ } => {
×
177
                Err(PipelineError::InvalidExpression(format!(
×
178
                    "Aggregate Function {fun:?} should not be executed at this point"
×
179
                )))
×
180
            }
181
            Expression::Trim { typ, what, arg } => evaluate_trim(schema, arg, what, typ, record),
28✔
182
            Expression::Like {
183
                arg,
×
184
                pattern,
×
185
                escape,
×
186
            } => evaluate_like(schema, arg, pattern, *escape, record),
×
187
            Expression::Cast { arg, typ } => typ.evaluate(schema, arg, record),
35✔
188
        }
189
    }
260,013✔
190

191
    fn get_type(&self, schema: &Schema) -> Result<ExpressionType, PipelineError> {
7,349✔
192
        match self {
7,349✔
193
            Expression::Literal(field) => {
5,735✔
194
                let field_type = get_field_type(field);
5,735✔
195
                match field_type {
5,735✔
196
                    Some(f) => Ok(ExpressionType::new(
5,735✔
197
                        f,
5,735✔
198
                        false,
5,735✔
199
                        SourceDefinition::Dynamic,
5,735✔
200
                        false,
5,735✔
201
                    )),
5,735✔
202
                    None => Err(PipelineError::InvalidExpression(
×
203
                        "literal expression cannot be null".to_string(),
×
204
                    )),
×
205
                }
206
            }
×
207
            Expression::Column { index } => {
1,404✔
208
                let t = schema.fields.get(*index).unwrap();
1,404✔
209

1,404✔
210
                Ok(ExpressionType::new(
1,404✔
211
                    t.typ,
1,404✔
212
                    t.nullable,
1,404✔
213
                    t.source.clone(),
1,404✔
214
                    schema.primary_index.contains(index),
1,404✔
215
                ))
1,404✔
216
            }
×
217
            Expression::UnaryOperator { operator, arg } => {
×
218
                get_unary_operator_type(operator, arg, schema)
×
219
            }
220
            Expression::BinaryOperator {
×
221
                left,
3✔
222
                operator,
3✔
223
                right,
3✔
224
            } => get_binary_operator_type(left, operator, right, schema),
3✔
225
            Expression::ScalarFunction { fun, args } => get_scalar_function_type(fun, args, schema),
13✔
226
            Expression::AggregateFunction { fun, args } => {
133✔
227
                get_aggregate_function_type(fun, args, schema)
133✔
228
            }
229
            Expression::Trim {
230
                what: _,
231
                typ: _,
×
232
                arg,
25✔
233
            } => validate_trim(arg, schema),
25✔
234
            Expression::Like {
×
235
                arg,
×
236
                pattern,
×
237
                escape: _,
×
238
            } => get_like_operator_type(arg, pattern, schema),
×
239
            Expression::Cast { arg, typ } => typ.get_return_type(schema, arg),
36✔
240
        }
×
241
    }
7,349✔
242
}
243

×
244
fn get_field_type(field: &Field) -> Option<FieldType> {
5,735✔
245
    match field {
5,735✔
246
        Field::Int(_) => Some(FieldType::Int),
5,721✔
247
        Field::Float(_) => Some(FieldType::Float),
×
248
        Field::Boolean(_) => Some(FieldType::Boolean),
×
249
        Field::String(_) => Some(FieldType::String),
14✔
250
        Field::Binary(_) => Some(FieldType::Binary),
×
251
        Field::Decimal(_) => Some(FieldType::Decimal),
×
252
        Field::Timestamp(_) => Some(FieldType::Timestamp),
×
253
        Field::Bson(_) => Some(FieldType::Bson),
×
254
        Field::Null => None,
×
255
        Field::UInt(_) => Some(FieldType::UInt),
×
256
        Field::Text(_) => Some(FieldType::Text),
×
257
        Field::Date(_) => Some(FieldType::Date),
×
258
    }
×
259
}
5,735✔
260

×
261
fn get_unary_operator_type(
×
262
    operator: &UnaryOperatorType,
×
263
    expression: &Expression,
×
264
    schema: &Schema,
×
265
) -> Result<ExpressionType, PipelineError> {
×
266
    let field_type = expression.get_type(schema)?;
×
267
    match operator {
×
268
        UnaryOperatorType::Not => match field_type.return_type {
×
269
            FieldType::Boolean => Ok(field_type),
×
270
            field_type => Err(PipelineError::InvalidExpression(format!(
×
271
                "cannot apply NOT to {field_type:?}"
×
272
            ))),
×
273
        },
×
274
        UnaryOperatorType::Plus => Ok(field_type),
×
275
        UnaryOperatorType::Minus => Ok(field_type),
×
276
    }
×
277
}
×
278

×
279
fn get_binary_operator_type(
3✔
280
    left: &Expression,
3✔
281
    operator: &BinaryOperatorType,
3✔
282
    right: &Expression,
3✔
283
    schema: &Schema,
3✔
284
) -> Result<ExpressionType, PipelineError> {
3✔
285
    let left_field_type = left.get_type(schema)?;
3✔
286
    let right_field_type = right.get_type(schema)?;
3✔
287
    match operator {
3✔
288
        BinaryOperatorType::Eq
289
        | BinaryOperatorType::Ne
290
        | BinaryOperatorType::Gt
291
        | BinaryOperatorType::Gte
292
        | BinaryOperatorType::Lt
×
293
        | BinaryOperatorType::Lte => Ok(ExpressionType::new(
×
294
            FieldType::Boolean,
×
295
            false,
×
296
            SourceDefinition::Dynamic,
×
297
            false,
×
298
        )),
×
299

300
        BinaryOperatorType::And | BinaryOperatorType::Or => {
×
301
            match (left_field_type.return_type, right_field_type.return_type) {
×
302
                (FieldType::Boolean, FieldType::Boolean) => Ok(ExpressionType::new(
×
303
                    FieldType::Boolean,
×
304
                    false,
×
305
                    SourceDefinition::Dynamic,
×
306
                    false,
×
307
                )),
×
308
                (left_field_type, right_field_type) => {
×
309
                    Err(PipelineError::InvalidExpression(format!(
×
310
                        "cannot apply {operator:?} to {left_field_type:?} and {right_field_type:?}"
×
311
                    )))
×
312
                }
313
            }
314
        }
315

316
        BinaryOperatorType::Add | BinaryOperatorType::Sub | BinaryOperatorType::Mul => {
×
317
            match (left_field_type.return_type, right_field_type.return_type) {
3✔
318
                (FieldType::Int, FieldType::Int) => Ok(ExpressionType::new(
3✔
319
                    FieldType::Int,
3✔
320
                    false,
3✔
321
                    SourceDefinition::Dynamic,
3✔
322
                    false,
3✔
323
                )),
3✔
324
                (FieldType::Int, FieldType::Float)
325
                | (FieldType::Float, FieldType::Int)
×
326
                | (FieldType::Float, FieldType::Float) => Ok(ExpressionType::new(
×
327
                    FieldType::Float,
×
328
                    false,
×
329
                    SourceDefinition::Dynamic,
×
330
                    false,
×
331
                )),
×
332
                (left_field_type, right_field_type) => {
×
333
                    Err(PipelineError::InvalidExpression(format!(
×
334
                        "cannot apply {operator:?} to {left_field_type:?} and {right_field_type:?}"
×
335
                    )))
×
336
                }
337
            }
338
        }
339
        BinaryOperatorType::Div | BinaryOperatorType::Mod => {
×
340
            match (left_field_type.return_type, right_field_type.return_type) {
×
341
                (FieldType::Int, FieldType::Float)
342
                | (FieldType::Float, FieldType::Int)
×
343
                | (FieldType::Float, FieldType::Float) => Ok(ExpressionType::new(
×
344
                    FieldType::Float,
×
345
                    false,
×
346
                    SourceDefinition::Dynamic,
×
347
                    false,
×
348
                )),
×
349
                (left_field_type, right_field_type) => {
×
350
                    Err(PipelineError::InvalidExpression(format!(
×
351
                        "cannot apply {operator:?} to {left_field_type:?} and {right_field_type:?}"
×
352
                    )))
×
353
                }
354
            }
355
        }
356
    }
×
357
}
3✔
358

×
359
fn get_aggregate_function_type(
133✔
360
    function: &AggregateFunctionType,
133✔
361
    args: &[Expression],
133✔
362
    schema: &Schema,
133✔
363
) -> Result<ExpressionType, PipelineError> {
133✔
364
    match function {
133✔
365
        AggregateFunctionType::Avg => Ok(ExpressionType::new(
7✔
366
            FieldType::Float,
7✔
367
            false,
7✔
368
            SourceDefinition::Dynamic,
7✔
369
            false,
7✔
370
        )),
7✔
371
        AggregateFunctionType::Count => Ok(ExpressionType::new(
92✔
372
            FieldType::Int,
92✔
373
            false,
92✔
374
            SourceDefinition::Dynamic,
92✔
375
            false,
92✔
376
        )),
92✔
377
        AggregateFunctionType::Max => argv!(args, 0, AggregateFunctionType::Max)?.get_type(schema),
11✔
378
        AggregateFunctionType::Median => {
×
379
            argv!(args, 0, AggregateFunctionType::Median)?.get_type(schema)
×
380
        }
×
381
        AggregateFunctionType::Min => argv!(args, 0, AggregateFunctionType::Min)?.get_type(schema),
11✔
382
        AggregateFunctionType::Sum => argv!(args, 0, AggregateFunctionType::Sum)?.get_type(schema),
12✔
383
        AggregateFunctionType::Stddev => Ok(ExpressionType::new(
×
384
            FieldType::Float,
×
385
            false,
×
386
            SourceDefinition::Dynamic,
×
387
            false,
×
388
        )),
×
389
        AggregateFunctionType::Variance => Ok(ExpressionType::new(
×
390
            FieldType::Float,
×
391
            false,
×
392
            SourceDefinition::Dynamic,
×
393
            false,
×
394
        )),
×
395
    }
×
396
}
133✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc