• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

getdozer / dozer / 4007818786

pending completion
4007818786

Pull #733

github

GitHub
Merge baf5c38aa into 6c0ac2b2c
Pull Request #733: Bump diesel from 2.0.2 to 2.0.3

23389 of 34432 relevant lines covered (67.93%)

40326.78 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

17.82
/dozer-sql/src/pipeline/expression/execution.rs
1
use crate::argv;
2
use crate::pipeline::errors::PipelineError;
3

4
use crate::pipeline::expression::operator::{BinaryOperatorType, UnaryOperatorType};
5
use crate::pipeline::expression::scalar::common::{get_scalar_function_type, ScalarFunctionType};
6
use crate::pipeline::expression::scalar::string::{evaluate_trim, validate_trim, TrimType};
7
use dozer_types::types::{Field, FieldType, Record, Schema, SourceDefinition};
8

9
use super::aggregate::AggregateFunctionType;
10
use super::cast::CastOperatorType;
11
use super::scalar::string::{evaluate_like, get_like_operator_type};
12

13
#[derive(Clone, Debug, PartialEq)]
148,511✔
14
pub enum Expression {
15
    Column {
16
        index: usize,
17
    },
18
    Literal(Field),
19
    UnaryOperator {
20
        operator: UnaryOperatorType,
21
        arg: Box<Expression>,
22
    },
23
    BinaryOperator {
24
        left: Box<Expression>,
25
        operator: BinaryOperatorType,
26
        right: Box<Expression>,
27
    },
28
    ScalarFunction {
29
        fun: ScalarFunctionType,
30
        args: Vec<Expression>,
31
    },
32
    AggregateFunction {
33
        fun: AggregateFunctionType,
34
        args: Vec<Expression>,
35
    },
36
    Cast {
37
        arg: Box<Expression>,
38
        typ: CastOperatorType,
39
    },
40
    Trim {
41
        arg: Box<Expression>,
42
        what: Option<Box<Expression>>,
43
        typ: Option<TrimType>,
44
    },
45
    Like {
46
        arg: Box<Expression>,
47
        pattern: Box<Expression>,
48
        escape: Option<char>,
49
    },
50
}
51

52
pub struct ExpressionType {
53
    pub return_type: FieldType,
54
    pub nullable: bool,
55
    pub source: SourceDefinition,
56
}
57

58
impl ExpressionType {
×
59
    pub fn new(return_type: FieldType, nullable: bool, source: SourceDefinition) -> Self {
17,996✔
60
        Self {
17,996✔
61
            return_type,
17,996✔
62
            nullable,
17,996✔
63
            source,
17,996✔
64
        }
17,996✔
65
    }
17,996✔
66
}
67

68
// Currently empty: `Expression` defines no inherent methods in this file; its
// behaviour is provided by the `ExpressionExecutor` implementation.
impl Expression {}
69

70
pub trait ExpressionExecutor: Send + Sync {
71
    fn evaluate(&self, record: &Record, schema: &Schema) -> Result<Field, PipelineError>;
72
    fn get_type(&self, schema: &Schema) -> Result<ExpressionType, PipelineError>;
73
}
74

×
75
impl ExpressionExecutor for Expression {
×
76
    fn evaluate(&self, record: &Record, schema: &Schema) -> Result<Field, PipelineError> {
186,499✔
77
        match self {
186,499✔
78
            Expression::Literal(field) => Ok(field.clone()),
14,936✔
79
            Expression::Column { index } => Ok(record
154,800✔
80
                .get_value(*index)
154,800✔
81
                .map_err(|_e| {
154,800✔
82
                    PipelineError::InvalidInputType(format!("{} is an invalid field index", *index))
×
83
                })?
154,800✔
84
                .clone()),
154,821✔
85
            Expression::BinaryOperator {
×
86
                left,
16,694✔
87
                operator,
16,694✔
88
                right,
16,694✔
89
            } => operator.evaluate(schema, left, right, record),
16,694✔
90
            Expression::ScalarFunction { fun, args } => fun.evaluate(schema, args, record),
8✔
91
            Expression::UnaryOperator { operator, arg } => operator.evaluate(schema, arg, record),
1✔
92
            Expression::AggregateFunction { fun, args: _ } => {
×
93
                Err(PipelineError::InvalidExpression(format!(
×
94
                    "Aggregate Function {:?} should not be executed at this point",
×
95
                    fun
×
96
                )))
×
97
            }
98
            Expression::Trim { typ, what, arg } => evaluate_trim(schema, arg, what, typ, record),
28✔
99
            Expression::Like {
×
100
                arg,
×
101
                pattern,
×
102
                escape,
×
103
            } => evaluate_like(schema, arg, pattern, *escape, record),
×
104
            Expression::Cast { arg, typ } => typ.evaluate(schema, arg, record),
32✔
105
        }
106
    }
186,520✔
107

108
    fn get_type(&self, schema: &Schema) -> Result<ExpressionType, PipelineError> {
18,050✔
109
        match self {
18,050✔
110
            Expression::Literal(field) => {
×
111
                let r = get_field_type(field).ok_or_else(|| {
×
112
                    PipelineError::InvalidExpression(
×
113
                        "literal expression cannot be null".to_string(),
×
114
                    )
×
115
                })?;
×
116
                Ok(ExpressionType::new(r, false, SourceDefinition::Dynamic))
×
117
            }
118
            Expression::Column { index } => {
17,991✔
119
                let t = schema.fields.get(*index).unwrap();
17,991✔
120

17,991✔
121
                Ok(ExpressionType::new(t.typ, t.nullable, t.source.clone()))
17,991✔
122
            }
123
            Expression::UnaryOperator { operator, arg } => {
×
124
                get_unary_operator_type(operator, arg, schema)
×
125
            }
126
            Expression::BinaryOperator {
127
                left,
×
128
                operator,
×
129
                right,
×
130
            } => get_binary_operator_type(left, operator, right, schema),
×
131
            Expression::ScalarFunction { fun, args } => get_scalar_function_type(fun, args, schema),
8✔
132
            Expression::AggregateFunction { fun, args } => {
×
133
                get_aggregate_function_type(fun, args, schema)
×
134
            }
135
            Expression::Trim {
136
                what: _,
137
                typ: _,
138
                arg,
19✔
139
            } => validate_trim(arg, schema),
19✔
140
            Expression::Like {
141
                arg,
×
142
                pattern,
×
143
                escape: _,
×
144
            } => get_like_operator_type(arg, pattern, schema),
×
145
            Expression::Cast { arg, typ } => typ.get_return_type(schema, arg),
32✔
146
        }
×
147
    }
18,050✔
148
}
×
149

×
150
fn get_field_type(field: &Field) -> Option<FieldType> {
×
151
    match field {
×
152
        Field::Int(_) => Some(FieldType::Int),
×
153
        Field::Float(_) => Some(FieldType::Float),
×
154
        Field::Boolean(_) => Some(FieldType::Boolean),
×
155
        Field::String(_) => Some(FieldType::String),
×
156
        Field::Binary(_) => Some(FieldType::Binary),
×
157
        Field::Decimal(_) => Some(FieldType::Decimal),
×
158
        Field::Timestamp(_) => Some(FieldType::Timestamp),
×
159
        Field::Bson(_) => Some(FieldType::Bson),
×
160
        Field::Null => None,
×
161
        Field::UInt(_) => Some(FieldType::UInt),
×
162
        Field::Text(_) => Some(FieldType::Text),
×
163
        Field::Date(_) => Some(FieldType::Date),
×
164
    }
×
165
}
×
166

×
167
fn get_unary_operator_type(
×
168
    operator: &UnaryOperatorType,
×
169
    expression: &Expression,
×
170
    schema: &Schema,
×
171
) -> Result<ExpressionType, PipelineError> {
×
172
    let field_type = expression.get_type(schema)?;
×
173
    match operator {
×
174
        UnaryOperatorType::Not => match field_type.return_type {
×
175
            FieldType::Boolean => Ok(field_type),
×
176
            field_type => Err(PipelineError::InvalidExpression(format!(
×
177
                "cannot apply NOT to {:?}",
×
178
                field_type
×
179
            ))),
×
180
        },
×
181
        UnaryOperatorType::Plus => Ok(field_type),
×
182
        UnaryOperatorType::Minus => Ok(field_type),
×
183
    }
×
184
}
×
185

×
186
fn get_binary_operator_type(
×
187
    left: &Expression,
×
188
    operator: &BinaryOperatorType,
×
189
    right: &Expression,
×
190
    schema: &Schema,
×
191
) -> Result<ExpressionType, PipelineError> {
×
192
    let left_field_type = left.get_type(schema)?;
×
193
    let right_field_type = right.get_type(schema)?;
×
194
    match operator {
×
195
        BinaryOperatorType::Eq
196
        | BinaryOperatorType::Ne
197
        | BinaryOperatorType::Gt
198
        | BinaryOperatorType::Gte
199
        | BinaryOperatorType::Lt
×
200
        | BinaryOperatorType::Lte => Ok(ExpressionType::new(
×
201
            FieldType::Boolean,
×
202
            false,
×
203
            SourceDefinition::Dynamic,
×
204
        )),
×
205

206
        BinaryOperatorType::And | BinaryOperatorType::Or => {
×
207
            match (left_field_type.return_type, right_field_type.return_type) {
×
208
                (FieldType::Boolean, FieldType::Boolean) => Ok(ExpressionType::new(
×
209
                    FieldType::Boolean,
×
210
                    false,
×
211
                    SourceDefinition::Dynamic,
×
212
                )),
×
213
                (left_field_type, right_field_type) => {
×
214
                    Err(PipelineError::InvalidExpression(format!(
×
215
                        "cannot apply {:?} to {:?} and {:?}",
×
216
                        operator, left_field_type, right_field_type
×
217
                    )))
×
218
                }
219
            }
220
        }
221

×
222
        BinaryOperatorType::Add | BinaryOperatorType::Sub | BinaryOperatorType::Mul => {
223
            match (left_field_type.return_type, right_field_type.return_type) {
×
224
                (FieldType::Int, FieldType::Int) => Ok(ExpressionType::new(
×
225
                    FieldType::Int,
×
226
                    false,
×
227
                    SourceDefinition::Dynamic,
×
228
                )),
×
229
                (FieldType::Int, FieldType::Float)
230
                | (FieldType::Float, FieldType::Int)
231
                | (FieldType::Float, FieldType::Float) => Ok(ExpressionType::new(
×
232
                    FieldType::Float,
×
233
                    false,
×
234
                    SourceDefinition::Dynamic,
×
235
                )),
×
236
                (left_field_type, right_field_type) => {
×
237
                    Err(PipelineError::InvalidExpression(format!(
×
238
                        "cannot apply {:?} to {:?} and {:?}",
×
239
                        operator, left_field_type, right_field_type
×
240
                    )))
×
241
                }
×
242
            }
×
243
        }
244
        BinaryOperatorType::Div | BinaryOperatorType::Mod => {
245
            match (left_field_type.return_type, right_field_type.return_type) {
×
246
                (FieldType::Int, FieldType::Float)
247
                | (FieldType::Float, FieldType::Int)
×
248
                | (FieldType::Float, FieldType::Float) => Ok(ExpressionType::new(
×
249
                    FieldType::Float,
×
250
                    false,
×
251
                    SourceDefinition::Dynamic,
×
252
                )),
×
253
                (left_field_type, right_field_type) => {
×
254
                    Err(PipelineError::InvalidExpression(format!(
×
255
                        "cannot apply {:?} to {:?} and {:?}",
×
256
                        operator, left_field_type, right_field_type
×
257
                    )))
×
258
                }
259
            }
×
260
        }
261
    }
×
262
}
×
263

×
264
fn get_aggregate_function_type(
×
265
    function: &AggregateFunctionType,
×
266
    args: &[Expression],
×
267
    schema: &Schema,
×
268
) -> Result<ExpressionType, PipelineError> {
×
269
    match function {
×
270
        AggregateFunctionType::Avg => Ok(ExpressionType::new(
×
271
            FieldType::Float,
×
272
            false,
×
273
            SourceDefinition::Dynamic,
×
274
        )),
×
275
        AggregateFunctionType::Count => Ok(ExpressionType::new(
×
276
            FieldType::Int,
×
277
            false,
×
278
            SourceDefinition::Dynamic,
×
279
        )),
×
280
        AggregateFunctionType::Max => argv!(args, 0, AggregateFunctionType::Max)?.get_type(schema),
×
281
        AggregateFunctionType::Median => {
282
            argv!(args, 0, AggregateFunctionType::Median)?.get_type(schema)
×
283
        }
284
        AggregateFunctionType::Min => argv!(args, 0, AggregateFunctionType::Min)?.get_type(schema),
×
285
        AggregateFunctionType::Sum => argv!(args, 0, AggregateFunctionType::Sum)?.get_type(schema),
×
286
        AggregateFunctionType::Stddev => Ok(ExpressionType::new(
×
287
            FieldType::Float,
×
288
            false,
×
289
            SourceDefinition::Dynamic,
×
290
        )),
×
291
        AggregateFunctionType::Variance => Ok(ExpressionType::new(
×
292
            FieldType::Float,
×
293
            false,
×
294
            SourceDefinition::Dynamic,
×
295
        )),
×
296
    }
297
}
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc