• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

getdozer / dozer / 4062264521

pending completion
4062264521

Pull #775

github

GitHub
Merge 77a7da657 into a81d03c98
Pull Request #775: feat: Implement basic continue for ingestion in snowflake

55 of 55 new or added lines in 4 files covered. (100.0%)

24399 of 37509 relevant lines covered (65.05%)

35550.39 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

44.71
/dozer-sql/src/pipeline/expression/builder.rs
1
use std::fmt::Display;
2

3
use dozer_types::{
4
    ordered_float::OrderedFloat,
5
    types::{Field, FieldDefinition, Schema, SourceDefinition},
6
};
7

8
use sqlparser::ast::{
9
    BinaryOperator as SqlBinaryOperator, DataType, Expr as SqlExpr, Expr, Function, FunctionArg,
10
    FunctionArgExpr, Ident, TrimWhereField, UnaryOperator as SqlUnaryOperator, Value as SqlValue,
11
};
12

13
use crate::pipeline::errors::PipelineError;
14
use crate::pipeline::errors::PipelineError::{
15
    AmbiguousFieldIdentifier, IllegalFieldIdentifier, UnknownFieldIdentifier,
16
};
17
use crate::pipeline::expression::aggregate::AggregateFunctionType;
18
use crate::pipeline::expression::builder::PipelineError::InvalidArgument;
19
use crate::pipeline::expression::builder::PipelineError::InvalidExpression;
20
use crate::pipeline::expression::builder::PipelineError::InvalidOperator;
21
use crate::pipeline::expression::builder::PipelineError::InvalidValue;
22
use crate::pipeline::expression::execution::Expression;
23
use crate::pipeline::expression::execution::Expression::ScalarFunction;
24
use crate::pipeline::expression::operator::{BinaryOperatorType, UnaryOperatorType};
25
use crate::pipeline::expression::scalar::common::ScalarFunctionType;
26
use crate::pipeline::expression::scalar::string::TrimType;
27

28
use super::cast::CastOperatorType;
29

30
pub type Bypass = bool;
31

32
pub enum BuilderExpressionType {
33
    PreAggregation,
34
    Aggregation,
35
    // PostAggregation,
×
36
    FullExpression,
37
}
38
#[derive(Debug, Clone, Hash, PartialEq, Eq)]
1,346✔
39
pub struct NameOrAlias(pub String, pub Option<String>);
×
40

×
41
pub enum ConstraintIdentifier {
×
42
    Single(Ident),
×
43
    Compound(Vec<Ident>),
×
44
}
×
45

×
46
impl Display for ConstraintIdentifier {
×
47
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
×
48
        match self {
×
49
            ConstraintIdentifier::Single(ident) => f.write_fmt(format_args!("{ident}")),
×
50
            ConstraintIdentifier::Compound(ident) => f.write_fmt(format_args!("{ident:?}")),
×
51
        }
52
    }
×
53
}
54
pub struct ExpressionBuilder;
×
55

×
56
impl ExpressionBuilder {
×
57
    pub fn build(
187✔
58
        &self,
187✔
59
        expression_type: &BuilderExpressionType,
187✔
60
        sql_expression: &SqlExpr,
187✔
61
        schema: &Schema,
187✔
62
    ) -> Result<Box<Expression>, PipelineError> {
187✔
63
        let (expression, _bypass) =
187✔
64
            self.parse_sql_expression(expression_type, sql_expression, schema)?;
187✔
65
        Ok(expression)
187✔
66
    }
187✔
67

68
    pub fn parse_sql_expression(
×
69
        &self,
×
70
        expression_type: &BuilderExpressionType,
71
        expression: &SqlExpr,
×
72
        schema: &Schema,
×
73
    ) -> Result<(Box<Expression>, bool), PipelineError> {
×
74
        match expression {
69✔
75
            SqlExpr::Trim {
×
76
                expr,
50✔
77
                trim_where,
50✔
78
                trim_what,
50✔
79
            } => self.parse_sql_trim_function(expression_type, expr, trim_where, trim_what, schema),
50✔
80
            SqlExpr::Identifier(ident) => Ok((parse_sql_column(&[ident.clone()], schema)?, false)),
2,591✔
81
            SqlExpr::CompoundIdentifier(ident) => Ok((parse_sql_column(ident, schema)?, false)),
56✔
82
            SqlExpr::Value(SqlValue::Number(n, _)) => self.parse_sql_number(n),
43✔
83
            SqlExpr::Value(SqlValue::Null) => {
×
84
                Ok((Box::new(Expression::Literal(Field::Null)), false))
×
85
            }
×
86
            SqlExpr::Value(SqlValue::SingleQuotedString(s) | SqlValue::DoubleQuotedString(s)) => {
26✔
87
                parse_sql_string(s)
26✔
88
            }
×
89
            SqlExpr::UnaryOp { expr, op } => {
×
90
                self.parse_sql_unary_op(expression_type, op, expr, schema)
×
91
            }
×
92
            SqlExpr::BinaryOp { left, op, right } => {
79✔
93
                self.parse_sql_binary_op(expression_type, left, op, right, schema)
79✔
94
            }
×
95
            SqlExpr::Nested(expr) => self.parse_sql_expression(expression_type, expr, schema),
12✔
96
            SqlExpr::Function(sql_function) => match expression_type {
346✔
97
                BuilderExpressionType::PreAggregation => self.parse_sql_function_pre_aggregation(
165✔
98
                    expression_type,
165✔
99
                    sql_function,
165✔
100
                    schema,
165✔
101
                    expression,
165✔
102
                ),
165✔
103
                BuilderExpressionType::Aggregation => self.parse_sql_function_aggregation(
165✔
104
                    expression_type,
165✔
105
                    sql_function,
165✔
106
                    schema,
165✔
107
                    expression,
165✔
108
                ),
165✔
109
                // ExpressionType::PostAggregation => todo!(),
×
110
                BuilderExpressionType::FullExpression => {
×
111
                    self.parse_sql_function(expression_type, sql_function, schema, expression)
16✔
112
                }
×
113
            },
×
114
            SqlExpr::Like {
×
115
                negated,
×
116
                expr,
×
117
                pattern,
×
118
                escape_char,
×
119
            } => self.parse_sql_like_operator(
×
120
                expression_type,
×
121
                negated,
×
122
                expr,
×
123
                pattern,
×
124
                escape_char,
×
125
                schema,
×
126
            ),
×
127
            SqlExpr::Cast { expr, data_type } => {
64✔
128
                self.parse_sql_cast_operator(expression_type, expr, data_type, schema)
64✔
129
            }
×
130
            _ => Err(InvalidExpression(format!("{expression:?}"))),
×
131
        }
×
132
    }
3,267✔
133

×
134
    fn parse_sql_trim_function(
50✔
135
        &self,
50✔
136
        expression_type: &BuilderExpressionType,
50✔
137
        expr: &Expr,
50✔
138
        trim_where: &Option<TrimWhereField>,
50✔
139
        trim_what: &Option<Box<Expr>>,
50✔
140
        schema: &Schema,
50✔
141
    ) -> Result<(Box<Expression>, bool), PipelineError> {
50✔
142
        let arg = self.parse_sql_expression(expression_type, expr, schema)?.0;
50✔
143
        let what = match trim_what {
50✔
144
            Some(e) => Some(self.parse_sql_expression(expression_type, e, schema)?.0),
8✔
145
            _ => None,
42✔
146
        };
×
147
        let typ = trim_where.as_ref().map(|e| match e {
50✔
148
            TrimWhereField::Both => TrimType::Both,
2✔
149
            TrimWhereField::Leading => TrimType::Leading,
2✔
150
            TrimWhereField::Trailing => TrimType::Trailing,
2✔
151
        });
50✔
152
        Ok((Box::new(Expression::Trim { arg, what, typ }), false))
50✔
153
    }
50✔
154

×
155
    fn parse_sql_function(
16✔
156
        &self,
16✔
157
        expression_type: &BuilderExpressionType,
16✔
158
        sql_function: &Function,
16✔
159
        schema: &Schema,
16✔
160
        expression: &SqlExpr,
16✔
161
    ) -> Result<(Box<Expression>, bool), PipelineError> {
16✔
162
        let name = sql_function.name.to_string().to_lowercase();
16✔
163
        if let Ok(function) = ScalarFunctionType::new(&name) {
16✔
164
            let mut arg_exprs = vec![];
15✔
165
            for arg in &sql_function.args {
37✔
166
                let r = self.parse_sql_function_arg(expression_type, arg, schema);
22✔
167
                match r {
22✔
168
                    Ok(result) => {
22✔
169
                        if result.1 {
22✔
170
                            return Ok(result);
×
171
                        } else {
22✔
172
                            arg_exprs.push(*result.0);
22✔
173
                        }
22✔
174
                    }
×
175
                    Err(error) => {
×
176
                        return Err(error);
×
177
                    }
×
178
                }
×
179
            }
×
180

×
181
            return Ok((
15✔
182
                Box::new(ScalarFunction {
15✔
183
                    fun: function,
15✔
184
                    args: arg_exprs,
15✔
185
                }),
15✔
186
                false,
15✔
187
            ));
15✔
188
        };
1✔
189
        if AggregateFunctionType::new(&name).is_ok() {
1✔
190
            let arg = sql_function.args.first().unwrap();
1✔
191
            let r = self.parse_sql_function_arg(expression_type, arg, schema)?;
1✔
192
            return Ok((r.0, false)); // switch bypass to true, since the argument of this Aggregation must be the final result
1✔
193
        };
×
194
        Err(InvalidExpression(format!("{expression:?}")))
×
195
    }
16✔
196

×
197
    fn parse_sql_function_pre_aggregation(
165✔
198
        &self,
165✔
199
        expression_type: &BuilderExpressionType,
165✔
200
        sql_function: &Function,
165✔
201
        schema: &Schema,
165✔
202
        expression: &SqlExpr,
165✔
203
    ) -> Result<(Box<Expression>, bool), PipelineError> {
165✔
204
        let name = sql_function.name.to_string().to_lowercase();
165✔
205

×
206
        if let Ok(function) = ScalarFunctionType::new(&name) {
165✔
207
            let mut arg_exprs = vec![];
×
208
            for arg in &sql_function.args {
×
209
                let r = self.parse_sql_function_arg(expression_type, arg, schema);
×
210
                match r {
×
211
                    Ok(result) => {
×
212
                        if result.1 {
×
213
                            return Ok(result);
×
214
                        } else {
×
215
                            arg_exprs.push(*result.0);
×
216
                        }
×
217
                    }
218
                    Err(error) => {
×
219
                        return Err(error);
×
220
                    }
×
221
                }
×
222
            }
×
223

×
224
            return Ok((
×
225
                Box::new(ScalarFunction {
×
226
                    fun: function,
×
227
                    args: arg_exprs,
×
228
                }),
×
229
                false,
×
230
            ));
×
231
        };
165✔
232
        if AggregateFunctionType::new(&name).is_ok() {
165✔
233
            let arg = sql_function.args.first().unwrap();
165✔
234
            let r = self.parse_sql_function_arg(expression_type, arg, schema)?;
165✔
235
            return Ok((r.0, true)); // switch bypass to true, since the argument of this Aggregation must be the final result
165✔
236
        };
×
237
        Err(InvalidExpression(format!("{expression:?}")))
×
238
    }
165✔
239

×
240
    fn parse_sql_function_aggregation(
165✔
241
        &self,
165✔
242
        expression_type: &BuilderExpressionType,
165✔
243
        sql_function: &Function,
165✔
244
        schema: &Schema,
165✔
245
        expression: &SqlExpr,
165✔
246
    ) -> Result<(Box<Expression>, bool), PipelineError> {
165✔
247
        let name = sql_function.name.to_string().to_lowercase();
165✔
248

×
249
        if let Ok(function) = ScalarFunctionType::new(&name) {
165✔
250
            let mut arg_exprs = vec![];
×
251
            for arg in &sql_function.args {
×
252
                let r = self.parse_sql_function_arg(expression_type, arg, schema);
×
253
                match r {
×
254
                    Ok(result) => {
×
255
                        if result.1 {
×
256
                            return Ok(result);
×
257
                        } else {
×
258
                            arg_exprs.push(*result.0);
×
259
                        }
×
260
                    }
261
                    Err(error) => {
×
262
                        return Err(error);
×
263
                    }
×
264
                }
×
265
            }
×
266

×
267
            return Ok((
×
268
                Box::new(ScalarFunction {
×
269
                    fun: function,
×
270
                    args: arg_exprs,
×
271
                }),
×
272
                false,
×
273
            ));
×
274
        };
165✔
275

×
276
        if let Ok(function) = AggregateFunctionType::new(&name) {
165✔
277
            let mut arg_exprs = vec![];
165✔
278
            for arg in &sql_function.args {
330✔
279
                let r = self.parse_sql_function_arg(expression_type, arg, schema);
165✔
280
                match r {
165✔
281
                    Ok(result) => {
165✔
282
                        if result.1 {
165✔
283
                            return Ok(result);
×
284
                        } else {
165✔
285
                            arg_exprs.push(*result.0);
165✔
286
                        }
165✔
287
                    }
288
                    Err(error) => {
×
289
                        return Err(error);
×
290
                    }
×
291
                }
×
292
            }
×
293

×
294
            return Ok((
165✔
295
                Box::new(Expression::AggregateFunction {
165✔
296
                    fun: function,
165✔
297
                    args: arg_exprs,
165✔
298
                }),
165✔
299
                true, // switch bypass to true, since this Aggregation must be the final result
165✔
300
            ));
165✔
301
        };
×
302

×
303
        Err(InvalidExpression(format!(
×
304
            "Unsupported Expression: {expression:?}"
×
305
        )))
×
306
    }
165✔
307

×
308
    fn parse_sql_function_arg(
×
309
        &self,
×
310
        expression_type: &BuilderExpressionType,
×
311
        argument: &FunctionArg,
×
312
        schema: &Schema,
×
313
    ) -> Result<(Box<Expression>, bool), PipelineError> {
×
314
        match argument {
353✔
315
            FunctionArg::Named {
×
316
                name: _,
×
317
                arg: FunctionArgExpr::Expr(arg),
×
318
            } => self.parse_sql_expression(expression_type, arg, schema),
×
319
            FunctionArg::Named {
×
320
                name: _,
×
321
                arg: FunctionArgExpr::Wildcard,
×
322
            } => Err(InvalidArgument(format!("{argument:?}"))),
×
323
            FunctionArg::Unnamed(FunctionArgExpr::Expr(arg)) => {
353✔
324
                self.parse_sql_expression(expression_type, arg, schema)
353✔
325
            }
326
            FunctionArg::Unnamed(FunctionArgExpr::Wildcard) => {
×
327
                Err(InvalidArgument(format!("{argument:?}")))
×
328
            }
×
329
            _ => Err(InvalidArgument(format!("{argument:?}"))),
×
330
        }
×
331
    }
353✔
332

×
333
    fn parse_sql_unary_op(
×
334
        &self,
×
335
        expression_type: &BuilderExpressionType,
×
336
        op: &SqlUnaryOperator,
×
337
        expr: &SqlExpr,
×
338
        schema: &Schema,
×
339
    ) -> Result<(Box<Expression>, Bypass), PipelineError> {
×
340
        let (arg, bypass) = self.parse_sql_expression(expression_type, expr, schema)?;
×
341
        if bypass {
×
342
            return Ok((arg, bypass));
×
343
        }
×
344

×
345
        let operator = match op {
×
346
            SqlUnaryOperator::Not => UnaryOperatorType::Not,
×
347
            SqlUnaryOperator::Plus => UnaryOperatorType::Plus,
×
348
            SqlUnaryOperator::Minus => UnaryOperatorType::Minus,
×
349
            _ => return Err(InvalidOperator(format!("{op:?}"))),
×
350
        };
×
351

×
352
        Ok((Box::new(Expression::UnaryOperator { operator, arg }), false))
×
353
    }
×
354

×
355
    fn parse_sql_binary_op(
79✔
356
        &self,
79✔
357
        expression_type: &BuilderExpressionType,
79✔
358
        left: &SqlExpr,
79✔
359
        op: &SqlBinaryOperator,
79✔
360
        right: &SqlExpr,
79✔
361
        schema: &Schema,
79✔
362
    ) -> Result<(Box<Expression>, bool), PipelineError> {
79✔
363
        let (left_op, bypass_left) = self.parse_sql_expression(expression_type, left, schema)?;
79✔
364
        if bypass_left {
79✔
365
            return Ok((left_op, bypass_left));
×
366
        }
79✔
367
        let (right_op, bypass_right) = self.parse_sql_expression(expression_type, right, schema)?;
79✔
368
        if bypass_right {
79✔
369
            return Ok((right_op, bypass_right));
×
370
        }
79✔
371

×
372
        let operator = match op {
79✔
373
            SqlBinaryOperator::Gt => BinaryOperatorType::Gt,
18✔
374
            SqlBinaryOperator::GtEq => BinaryOperatorType::Gte,
1✔
375
            SqlBinaryOperator::Lt => BinaryOperatorType::Lt,
12✔
376
            SqlBinaryOperator::LtEq => BinaryOperatorType::Lte,
12✔
377
            SqlBinaryOperator::Eq => BinaryOperatorType::Eq,
18✔
378
            SqlBinaryOperator::NotEq => BinaryOperatorType::Ne,
×
379

×
380
            SqlBinaryOperator::Plus => BinaryOperatorType::Add,
×
381
            SqlBinaryOperator::Minus => BinaryOperatorType::Sub,
×
382
            SqlBinaryOperator::Multiply => BinaryOperatorType::Mul,
×
383
            SqlBinaryOperator::Divide => BinaryOperatorType::Div,
×
384
            SqlBinaryOperator::Modulo => BinaryOperatorType::Mod,
×
385

×
386
            SqlBinaryOperator::And => BinaryOperatorType::And,
12✔
387
            SqlBinaryOperator::Or => BinaryOperatorType::Or,
6✔
388

×
389
            // BinaryOperator::BitwiseAnd => ...
×
390
            // BinaryOperator::BitwiseOr => ...
×
391
            // BinaryOperator::StringConcat => ...
×
392
            _ => return Err(InvalidOperator(format!("{op:?}"))),
×
393
        };
×
394

×
395
        Ok((
79✔
396
            Box::new(Expression::BinaryOperator {
79✔
397
                left: left_op,
79✔
398
                operator,
79✔
399
                right: right_op,
79✔
400
            }),
79✔
401
            false,
79✔
402
        ))
79✔
403
    }
79✔
404

×
405
    fn parse_sql_number(&self, n: &str) -> Result<(Box<Expression>, Bypass), PipelineError> {
43✔
406
        match n.parse::<i64>() {
43✔
407
            Ok(n) => Ok((Box::new(Expression::Literal(Field::Int(n))), false)),
43✔
408
            Err(_) => match n.parse::<f64>() {
×
409
                Ok(f) => Ok((
×
410
                    Box::new(Expression::Literal(Field::Float(OrderedFloat(f)))),
×
411
                    false,
×
412
                )),
×
413
                Err(_) => Err(InvalidValue(n.to_string())),
×
414
            },
×
415
        }
×
416
    }
43✔
417

×
418
    fn parse_sql_like_operator(
×
419
        &self,
×
420
        expression_type: &BuilderExpressionType,
×
421
        negated: &bool,
×
422
        expr: &Expr,
×
423
        pattern: &Expr,
×
424
        escape_char: &Option<char>,
×
425
        schema: &Schema,
×
426
    ) -> Result<(Box<Expression>, bool), PipelineError> {
×
427
        let arg = self.parse_sql_expression(expression_type, expr, schema)?;
×
428
        let pattern = self.parse_sql_expression(expression_type, pattern, schema)?;
×
429
        let like_expression = Box::new(Expression::Like {
×
430
            arg: arg.0,
×
431
            pattern: pattern.0,
×
432
            escape: *escape_char,
×
433
        });
×
434
        if *negated {
×
435
            Ok((
×
436
                Box::new(Expression::UnaryOperator {
×
437
                    operator: UnaryOperatorType::Not,
×
438
                    arg: like_expression,
×
439
                }),
×
440
                arg.1,
×
441
            ))
×
442
        } else {
×
443
            Ok((like_expression, arg.1))
×
444
        }
×
445
    }
×
446

×
447
    fn parse_sql_cast_operator(
64✔
448
        &self,
64✔
449
        expression_type: &BuilderExpressionType,
64✔
450
        expr: &Expr,
64✔
451
        data_type: &DataType,
64✔
452
        schema: &Schema,
64✔
453
    ) -> Result<(Box<Expression>, bool), PipelineError> {
64✔
454
        let expression = self.parse_sql_expression(expression_type, expr, schema)?;
64✔
455
        let cast_to = match data_type {
64✔
456
            DataType::Decimal(_) => CastOperatorType::Decimal,
×
457
            DataType::Binary(_) => CastOperatorType::Binary,
×
458
            DataType::Float(_) => CastOperatorType::Float,
10✔
459
            DataType::Int(_) => CastOperatorType::Int,
6✔
460
            DataType::Integer(_) => CastOperatorType::Int,
×
461
            DataType::UnsignedInt(_) => CastOperatorType::UInt,
×
462
            DataType::UnsignedInteger(_) => CastOperatorType::UInt,
×
463
            DataType::Boolean => CastOperatorType::Boolean,
12✔
464
            DataType::Date => CastOperatorType::Date,
×
465
            DataType::Timestamp(..) => CastOperatorType::Timestamp,
×
466
            DataType::Text => CastOperatorType::Text,
18✔
467
            DataType::String => CastOperatorType::String,
18✔
468
            DataType::Custom(name, ..) => {
×
469
                if name.to_string().to_lowercase() == "bson" {
×
470
                    CastOperatorType::Bson
×
471
                } else {
×
472
                    Err(PipelineError::InvalidFunction(format!(
×
473
                        "Unsupported Cast type {name}"
×
474
                    )))?
×
475
                }
×
476
            }
×
477
            _ => Err(PipelineError::InvalidFunction(format!(
×
478
                "Unsupported Cast type {data_type}"
×
479
            )))?,
×
480
        };
×
481
        Ok((
64✔
482
            Box::new(Expression::Cast {
64✔
483
                arg: expression.0,
64✔
484
                typ: cast_to,
64✔
485
            }),
64✔
486
            expression.1,
64✔
487
        ))
64✔
488
    }
64✔
489
}
490

×
491
pub fn fullname_from_ident(ident: &[Ident]) -> String {
28✔
492
    let mut ident_tokens = vec![];
28✔
493
    for token in ident.iter() {
28✔
494
        ident_tokens.push(token.value.clone());
28✔
495
    }
28✔
496
    ident_tokens.join(".")
28✔
497
}
28✔
498

×
499
fn parse_sql_string(s: &str) -> Result<(Box<Expression>, bool), PipelineError> {
26✔
500
    Ok((
26✔
501
        Box::new(Expression::Literal(Field::String(s.to_owned()))),
26✔
502
        false,
26✔
503
    ))
26✔
504
}
26✔
505

×
506
pub(crate) fn normalize_ident(id: &Ident) -> String {
146✔
507
    match id.quote_style {
146✔
508
        Some(_) => id.value.clone(),
×
509
        None => id.value.clone(),
146✔
510
    }
×
511
}
146✔
512

×
513
pub fn extend_schema_source_def(schema: &Schema, name: &NameOrAlias) -> Schema {
139✔
514
    let mut output_schema = schema.clone();
139✔
515
    let mut fields = vec![];
139✔
516
    for mut field in schema.clone().fields.into_iter() {
552✔
517
        if let Some(alias) = &name.1 {
552✔
518
            field.source = SourceDefinition::Alias {
60✔
519
                name: alias.to_string(),
60✔
520
            };
60✔
521
        }
492✔
522

×
523
        fields.push(field);
552✔
524
    }
×
525
    output_schema.fields = fields;
139✔
526

139✔
527
    output_schema
139✔
528
}
139✔
529

×
530
fn parse_sql_column(ident: &[Ident], schema: &Schema) -> Result<Box<Expression>, PipelineError> {
2,647✔
531
    let (src_field, src_table_or_alias, src_connection) = match ident.len() {
2,647✔
532
        1 => (&ident[0].value, None, None),
2,591✔
533
        2 => (&ident[1].value, Some(&ident[0].value), None),
56✔
534
        3 => (
×
535
            &ident[2].value,
×
536
            Some(&ident[1].value),
×
537
            Some(&ident[0].value),
×
538
        ),
×
539
        _ => {
540
            return Err(IllegalFieldIdentifier(
×
541
                ident
×
542
                    .iter()
×
543
                    .fold(String::new(), |a, b| a + "." + b.value.as_str()),
×
544
            ))
×
545
        }
×
546
    };
×
547

×
548
    let matching_by_field: Vec<(usize, &FieldDefinition)> = schema
2,647✔
549
        .fields
2,647✔
550
        .iter()
2,647✔
551
        .enumerate()
2,647✔
552
        .filter(|(_idx, f)| &f.name == src_field)
10,773✔
553
        .collect();
2,647✔
554

2,647✔
555
    match matching_by_field.len() {
2,647✔
556
        0 => Err(UnknownFieldIdentifier(
×
557
            ident
×
558
                .iter()
×
559
                .fold(String::new(), |a, b| a + "." + b.value.as_str()),
×
560
        )),
×
561
        1 => Ok(Box::new(Expression::Column {
2,593✔
562
            index: matching_by_field[0].0,
2,593✔
563
        })),
2,593✔
564
        _ => match src_table_or_alias {
54✔
565
            None => Err(AmbiguousFieldIdentifier(
×
566
                ident
×
567
                    .iter()
×
568
                    .fold(String::new(), |a, b| a + "." + b.value.as_str()),
×
569
            )),
×
570
            Some(src_table_or_alias) => {
54✔
571
                let matching_by_table_or_alias: Vec<(usize, &FieldDefinition)> = matching_by_field
54✔
572
                    .into_iter()
54✔
573
                    .filter(|(_idx, field)| match &field.source {
108✔
574
                        SourceDefinition::Alias { name } => name == src_table_or_alias,
72✔
575
                        SourceDefinition::Table {
×
576
                            name,
36✔
577
                            connection: _,
36✔
578
                        } => name == src_table_or_alias,
36✔
579
                        _ => false,
×
580
                    })
108✔
581
                    .collect();
54✔
582

54✔
583
                match matching_by_table_or_alias.len() {
54✔
584
                    0 => Err(UnknownFieldIdentifier(
×
585
                        ident
×
586
                            .iter()
×
587
                            .fold(String::new(), |a, b| a + "." + b.value.as_str()),
×
588
                    )),
×
589
                    1 => Ok(Box::new(Expression::Column {
54✔
590
                        index: matching_by_table_or_alias[0].0,
54✔
591
                    })),
54✔
592
                    _ => match src_connection {
×
593
                        None => Err(InvalidExpression(
×
594
                            ident
×
595
                                .iter()
×
596
                                .fold(String::new(), |a, b| a + "." + b.value.as_str()),
×
597
                        )),
×
598
                        Some(src_connection) => {
×
599
                            let matching_by_connection: Vec<(usize, &FieldDefinition)> =
×
600
                                matching_by_table_or_alias
×
601
                                    .into_iter()
×
602
                                    .filter(|(_idx, field)| match &field.source {
×
603
                                        SourceDefinition::Table {
604
                                            name: _,
605
                                            connection,
×
606
                                        } => connection == src_connection,
×
607
                                        _ => false,
×
608
                                    })
×
609
                                    .collect();
×
610

×
611
                            match matching_by_connection.len() {
×
612
                                0 => Err(UnknownFieldIdentifier(
×
613
                                    ident
×
614
                                        .iter()
×
615
                                        .fold(String::new(), |a, b| a + "." + b.value.as_str()),
×
616
                                )),
×
617
                                1 => Ok(Box::new(Expression::Column {
×
618
                                    index: matching_by_connection[0].0,
×
619
                                })),
×
620
                                _ => Err(InvalidExpression(
×
621
                                    ident
×
622
                                        .iter()
×
623
                                        .fold(String::new(), |a, b| a + "." + b.value.as_str()),
×
624
                                )),
×
625
                            }
626
                        }
627
                    },
628
                }
629
            }
630
        },
631
    }
632
}
2,647✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc