• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

getdozer / dozer / 4111850539

pending completion
4111850539

Pull #817

github

GitHub
Merge 200083b05 into c160ec41f
Pull Request #817: feat: Stateful pipeline for sources without PK

63 of 63 new or added lines in 2 files covered. (100.0%)

23355 of 37656 relevant lines covered (62.02%)

31517.75 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

55.48
/dozer-sql/src/pipeline/expression/builder.rs
1
use dozer_types::{
2
    ordered_float::OrderedFloat,
3
    types::{Field, FieldDefinition, Schema, SourceDefinition},
4
};
5

6
use sqlparser::ast::{
7
    BinaryOperator as SqlBinaryOperator, DataType, Expr as SqlExpr, Expr, Function, FunctionArg,
8
    FunctionArgExpr, Ident, TrimWhereField, UnaryOperator as SqlUnaryOperator, Value as SqlValue,
9
};
10

11
use crate::pipeline::errors::PipelineError;
12
use crate::pipeline::errors::PipelineError::{
13
    AmbiguousFieldIdentifier, IllegalFieldIdentifier, UnknownFieldIdentifier,
14
};
15
use crate::pipeline::expression::aggregate::AggregateFunctionType;
16
use crate::pipeline::expression::builder::PipelineError::InvalidArgument;
17
use crate::pipeline::expression::builder::PipelineError::InvalidExpression;
18
use crate::pipeline::expression::builder::PipelineError::InvalidOperator;
19
use crate::pipeline::expression::builder::PipelineError::InvalidValue;
20
use crate::pipeline::expression::execution::Expression;
21
use crate::pipeline::expression::execution::Expression::ScalarFunction;
22
use crate::pipeline::expression::operator::{BinaryOperatorType, UnaryOperatorType};
23
use crate::pipeline::expression::scalar::common::ScalarFunctionType;
24
use crate::pipeline::expression::scalar::string::TrimType;
25

26
use super::cast::CastOperatorType;
27

28
pub type Bypass = bool;
29

30
pub enum BuilderExpressionType {
31
    PreAggregation,
32
    Aggregation,
33
    // PostAggregation,
34
    FullExpression,
35
}
36
#[derive(Debug, Clone, Hash, PartialEq, Eq)]
994✔
37
pub struct NameOrAlias(pub String, pub Option<String>);
38

39
pub struct ExpressionBuilder;
40

41
impl ExpressionBuilder {
42
    pub fn build(
117✔
43
        &self,
117✔
44
        expression_type: &BuilderExpressionType,
117✔
45
        sql_expression: &SqlExpr,
117✔
46
        schema: &Schema,
117✔
47
    ) -> Result<Box<Expression>, PipelineError> {
117✔
48
        let (expression, _bypass) =
117✔
49
            self.parse_sql_expression(expression_type, sql_expression, schema)?;
117✔
50
        Ok(expression)
117✔
51
    }
117✔
52

53
    pub fn parse_sql_expression(
54
        &self,
55
        expression_type: &BuilderExpressionType,
56
        expression: &SqlExpr,
57
        schema: &Schema,
58
    ) -> Result<(Box<Expression>, bool), PipelineError> {
59
        match expression {
49✔
60
            SqlExpr::Trim {
61
                expr,
38✔
62
                trim_where,
38✔
63
                trim_what,
38✔
64
            } => self.parse_sql_trim_function(expression_type, expr, trim_where, trim_what, schema),
38✔
65
            SqlExpr::Identifier(ident) => Ok((parse_sql_column(&[ident.clone()], schema)?, false)),
1,347✔
66
            SqlExpr::CompoundIdentifier(ident) => Ok((parse_sql_column(ident, schema)?, false)),
194✔
67
            SqlExpr::Value(SqlValue::Number(n, _)) => self.parse_sql_number(n),
29✔
68
            SqlExpr::Value(SqlValue::Null) => {
69
                Ok((Box::new(Expression::Literal(Field::Null)), false))
×
70
            }
71
            SqlExpr::Value(SqlValue::SingleQuotedString(s) | SqlValue::DoubleQuotedString(s)) => {
20✔
72
                parse_sql_string(s)
20✔
73
            }
74
            SqlExpr::UnaryOp { expr, op } => {
×
75
                self.parse_sql_unary_op(expression_type, op, expr, schema)
×
76
            }
77
            SqlExpr::BinaryOp { left, op, right } => {
53✔
78
                self.parse_sql_binary_op(expression_type, left, op, right, schema)
53✔
79
            }
80
            SqlExpr::Nested(expr) => self.parse_sql_expression(expression_type, expr, schema),
8✔
81
            SqlExpr::Function(sql_function) => match expression_type {
220✔
82
                BuilderExpressionType::PreAggregation => self.parse_sql_function_pre_aggregation(
102✔
83
                    expression_type,
102✔
84
                    sql_function,
102✔
85
                    schema,
102✔
86
                    expression,
102✔
87
                ),
102✔
88
                BuilderExpressionType::Aggregation => self.parse_sql_function_aggregation(
102✔
89
                    expression_type,
102✔
90
                    sql_function,
102✔
91
                    schema,
102✔
92
                    expression,
102✔
93
                ),
102✔
94
                // ExpressionType::PostAggregation => todo!(),
95
                BuilderExpressionType::FullExpression => {
96
                    self.parse_sql_function(expression_type, sql_function, schema, expression)
16✔
97
                }
98
            },
99
            SqlExpr::Like {
100
                negated,
×
101
                expr,
×
102
                pattern,
×
103
                escape_char,
×
104
            } => self.parse_sql_like_operator(
×
105
                expression_type,
×
106
                negated,
×
107
                expr,
×
108
                pattern,
×
109
                escape_char,
×
110
                schema,
×
111
            ),
×
112
            SqlExpr::Cast { expr, data_type } => {
64✔
113
                self.parse_sql_cast_operator(expression_type, expr, data_type, schema)
64✔
114
            }
115
            _ => Err(InvalidExpression(format!("{expression:?}"))),
×
116
        }
117
    }
1,973✔
118

119
    fn parse_sql_trim_function(
38✔
120
        &self,
38✔
121
        expression_type: &BuilderExpressionType,
38✔
122
        expr: &Expr,
38✔
123
        trim_where: &Option<TrimWhereField>,
38✔
124
        trim_what: &Option<Box<Expr>>,
38✔
125
        schema: &Schema,
38✔
126
    ) -> Result<(Box<Expression>, bool), PipelineError> {
38✔
127
        let arg = self.parse_sql_expression(expression_type, expr, schema)?.0;
38✔
128
        let what = match trim_what {
38✔
129
            Some(e) => Some(self.parse_sql_expression(expression_type, e, schema)?.0),
8✔
130
            _ => None,
30✔
131
        };
132
        let typ = trim_where.as_ref().map(|e| match e {
38✔
133
            TrimWhereField::Both => TrimType::Both,
2✔
134
            TrimWhereField::Leading => TrimType::Leading,
2✔
135
            TrimWhereField::Trailing => TrimType::Trailing,
2✔
136
        });
38✔
137
        Ok((Box::new(Expression::Trim { arg, what, typ }), false))
38✔
138
    }
38✔
139

140
    fn parse_sql_function(
16✔
141
        &self,
16✔
142
        expression_type: &BuilderExpressionType,
16✔
143
        sql_function: &Function,
16✔
144
        schema: &Schema,
16✔
145
        expression: &SqlExpr,
16✔
146
    ) -> Result<(Box<Expression>, bool), PipelineError> {
16✔
147
        let name = sql_function.name.to_string().to_lowercase();
16✔
148
        if let Ok(function) = ScalarFunctionType::new(&name) {
16✔
149
            let mut arg_exprs = vec![];
15✔
150
            for arg in &sql_function.args {
37✔
151
                let r = self.parse_sql_function_arg(expression_type, arg, schema);
22✔
152
                match r {
22✔
153
                    Ok(result) => {
22✔
154
                        if result.1 {
22✔
155
                            return Ok(result);
×
156
                        } else {
22✔
157
                            arg_exprs.push(*result.0);
22✔
158
                        }
22✔
159
                    }
160
                    Err(error) => {
×
161
                        return Err(error);
×
162
                    }
163
                }
164
            }
165

166
            return Ok((
15✔
167
                Box::new(ScalarFunction {
15✔
168
                    fun: function,
15✔
169
                    args: arg_exprs,
15✔
170
                }),
15✔
171
                false,
15✔
172
            ));
15✔
173
        };
1✔
174
        if AggregateFunctionType::new(&name).is_ok() {
1✔
175
            let arg = sql_function.args.first().unwrap();
1✔
176
            let r = self.parse_sql_function_arg(expression_type, arg, schema)?;
1✔
177
            return Ok((r.0, false)); // switch bypass to true, since the argument of this Aggregation must be the final result
1✔
178
        };
×
179
        Err(InvalidExpression(format!("{expression:?}")))
×
180
    }
16✔
181

182
    fn parse_sql_function_pre_aggregation(
102✔
183
        &self,
102✔
184
        expression_type: &BuilderExpressionType,
102✔
185
        sql_function: &Function,
102✔
186
        schema: &Schema,
102✔
187
        expression: &SqlExpr,
102✔
188
    ) -> Result<(Box<Expression>, bool), PipelineError> {
102✔
189
        let name = sql_function.name.to_string().to_lowercase();
102✔
190

191
        if let Ok(function) = ScalarFunctionType::new(&name) {
102✔
192
            let mut arg_exprs = vec![];
×
193
            for arg in &sql_function.args {
×
194
                let r = self.parse_sql_function_arg(expression_type, arg, schema);
×
195
                match r {
×
196
                    Ok(result) => {
×
197
                        if result.1 {
×
198
                            return Ok(result);
×
199
                        } else {
×
200
                            arg_exprs.push(*result.0);
×
201
                        }
×
202
                    }
203
                    Err(error) => {
×
204
                        return Err(error);
×
205
                    }
206
                }
207
            }
208

209
            return Ok((
×
210
                Box::new(ScalarFunction {
×
211
                    fun: function,
×
212
                    args: arg_exprs,
×
213
                }),
×
214
                false,
×
215
            ));
×
216
        };
102✔
217
        if AggregateFunctionType::new(&name).is_ok() {
102✔
218
            let arg = sql_function.args.first().unwrap();
102✔
219
            let r = self.parse_sql_function_arg(expression_type, arg, schema)?;
102✔
220
            return Ok((r.0, true)); // switch bypass to true, since the argument of this Aggregation must be the final result
102✔
221
        };
×
222
        Err(InvalidExpression(format!("{expression:?}")))
×
223
    }
102✔
224

225
    fn parse_sql_function_aggregation(
102✔
226
        &self,
102✔
227
        expression_type: &BuilderExpressionType,
102✔
228
        sql_function: &Function,
102✔
229
        schema: &Schema,
102✔
230
        expression: &SqlExpr,
102✔
231
    ) -> Result<(Box<Expression>, bool), PipelineError> {
102✔
232
        let name = sql_function.name.to_string().to_lowercase();
102✔
233

234
        if let Ok(function) = ScalarFunctionType::new(&name) {
102✔
235
            let mut arg_exprs = vec![];
×
236
            for arg in &sql_function.args {
×
237
                let r = self.parse_sql_function_arg(expression_type, arg, schema);
×
238
                match r {
×
239
                    Ok(result) => {
×
240
                        if result.1 {
×
241
                            return Ok(result);
×
242
                        } else {
×
243
                            arg_exprs.push(*result.0);
×
244
                        }
×
245
                    }
246
                    Err(error) => {
×
247
                        return Err(error);
×
248
                    }
249
                }
250
            }
251

252
            return Ok((
×
253
                Box::new(ScalarFunction {
×
254
                    fun: function,
×
255
                    args: arg_exprs,
×
256
                }),
×
257
                false,
×
258
            ));
×
259
        };
102✔
260

261
        if let Ok(function) = AggregateFunctionType::new(&name) {
102✔
262
            let mut arg_exprs = vec![];
102✔
263
            for arg in &sql_function.args {
204✔
264
                let r = self.parse_sql_function_arg(expression_type, arg, schema);
102✔
265
                match r {
102✔
266
                    Ok(result) => {
102✔
267
                        if result.1 {
102✔
268
                            return Ok(result);
×
269
                        } else {
102✔
270
                            arg_exprs.push(*result.0);
102✔
271
                        }
102✔
272
                    }
273
                    Err(error) => {
×
274
                        return Err(error);
×
275
                    }
276
                }
277
            }
278

279
            return Ok((
102✔
280
                Box::new(Expression::AggregateFunction {
102✔
281
                    fun: function,
102✔
282
                    args: arg_exprs,
102✔
283
                }),
102✔
284
                true, // switch bypass to true, since this Aggregation must be the final result
102✔
285
            ));
102✔
286
        };
×
287

×
288
        Err(InvalidExpression(format!(
×
289
            "Unsupported Expression: {expression:?}"
×
290
        )))
×
291
    }
102✔
292

293
    fn parse_sql_function_arg(
294
        &self,
295
        expression_type: &BuilderExpressionType,
296
        argument: &FunctionArg,
297
        schema: &Schema,
298
    ) -> Result<(Box<Expression>, bool), PipelineError> {
299
        match argument {
227✔
300
            FunctionArg::Named {
301
                name: _,
302
                arg: FunctionArgExpr::Expr(arg),
×
303
            } => self.parse_sql_expression(expression_type, arg, schema),
×
304
            FunctionArg::Named {
305
                name: _,
306
                arg: FunctionArgExpr::Wildcard,
307
            } => Err(InvalidArgument(format!("{argument:?}"))),
×
308
            FunctionArg::Unnamed(FunctionArgExpr::Expr(arg)) => {
227✔
309
                self.parse_sql_expression(expression_type, arg, schema)
227✔
310
            }
311
            FunctionArg::Unnamed(FunctionArgExpr::Wildcard) => {
312
                Err(InvalidArgument(format!("{argument:?}")))
×
313
            }
314
            _ => Err(InvalidArgument(format!("{argument:?}"))),
×
315
        }
316
    }
227✔
317

318
    fn parse_sql_unary_op(
×
319
        &self,
×
320
        expression_type: &BuilderExpressionType,
×
321
        op: &SqlUnaryOperator,
×
322
        expr: &SqlExpr,
×
323
        schema: &Schema,
×
324
    ) -> Result<(Box<Expression>, Bypass), PipelineError> {
×
325
        let (arg, bypass) = self.parse_sql_expression(expression_type, expr, schema)?;
×
326
        if bypass {
×
327
            return Ok((arg, bypass));
×
328
        }
×
329

330
        let operator = match op {
×
331
            SqlUnaryOperator::Not => UnaryOperatorType::Not,
×
332
            SqlUnaryOperator::Plus => UnaryOperatorType::Plus,
×
333
            SqlUnaryOperator::Minus => UnaryOperatorType::Minus,
×
334
            _ => return Err(InvalidOperator(format!("{op:?}"))),
×
335
        };
336

337
        Ok((Box::new(Expression::UnaryOperator { operator, arg }), false))
×
338
    }
×
339

340
    fn parse_sql_binary_op(
53✔
341
        &self,
53✔
342
        expression_type: &BuilderExpressionType,
53✔
343
        left: &SqlExpr,
53✔
344
        op: &SqlBinaryOperator,
53✔
345
        right: &SqlExpr,
53✔
346
        schema: &Schema,
53✔
347
    ) -> Result<(Box<Expression>, bool), PipelineError> {
53✔
348
        let (left_op, bypass_left) = self.parse_sql_expression(expression_type, left, schema)?;
53✔
349
        if bypass_left {
53✔
350
            return Ok((left_op, bypass_left));
×
351
        }
53✔
352
        let (right_op, bypass_right) = self.parse_sql_expression(expression_type, right, schema)?;
53✔
353
        if bypass_right {
53✔
354
            return Ok((right_op, bypass_right));
×
355
        }
53✔
356

357
        let operator = match op {
53✔
358
            SqlBinaryOperator::Gt => BinaryOperatorType::Gt,
12✔
359
            SqlBinaryOperator::GtEq => BinaryOperatorType::Gte,
1✔
360
            SqlBinaryOperator::Lt => BinaryOperatorType::Lt,
8✔
361
            SqlBinaryOperator::LtEq => BinaryOperatorType::Lte,
8✔
362
            SqlBinaryOperator::Eq => BinaryOperatorType::Eq,
12✔
363
            SqlBinaryOperator::NotEq => BinaryOperatorType::Ne,
×
364

365
            SqlBinaryOperator::Plus => BinaryOperatorType::Add,
×
366
            SqlBinaryOperator::Minus => BinaryOperatorType::Sub,
×
367
            SqlBinaryOperator::Multiply => BinaryOperatorType::Mul,
×
368
            SqlBinaryOperator::Divide => BinaryOperatorType::Div,
×
369
            SqlBinaryOperator::Modulo => BinaryOperatorType::Mod,
×
370

371
            SqlBinaryOperator::And => BinaryOperatorType::And,
8✔
372
            SqlBinaryOperator::Or => BinaryOperatorType::Or,
4✔
373

374
            // BinaryOperator::BitwiseAnd => ...
375
            // BinaryOperator::BitwiseOr => ...
376
            // BinaryOperator::StringConcat => ...
377
            _ => return Err(InvalidOperator(format!("{op:?}"))),
×
378
        };
379

380
        Ok((
53✔
381
            Box::new(Expression::BinaryOperator {
53✔
382
                left: left_op,
53✔
383
                operator,
53✔
384
                right: right_op,
53✔
385
            }),
53✔
386
            false,
53✔
387
        ))
53✔
388
    }
53✔
389

390
    fn parse_sql_number(&self, n: &str) -> Result<(Box<Expression>, Bypass), PipelineError> {
29✔
391
        match n.parse::<i64>() {
29✔
392
            Ok(n) => Ok((Box::new(Expression::Literal(Field::Int(n))), false)),
29✔
393
            Err(_) => match n.parse::<f64>() {
×
394
                Ok(f) => Ok((
×
395
                    Box::new(Expression::Literal(Field::Float(OrderedFloat(f)))),
×
396
                    false,
×
397
                )),
×
398
                Err(_) => Err(InvalidValue(n.to_string())),
×
399
            },
400
        }
401
    }
29✔
402

403
    fn parse_sql_like_operator(
×
404
        &self,
×
405
        expression_type: &BuilderExpressionType,
×
406
        negated: &bool,
×
407
        expr: &Expr,
×
408
        pattern: &Expr,
×
409
        escape_char: &Option<char>,
×
410
        schema: &Schema,
×
411
    ) -> Result<(Box<Expression>, bool), PipelineError> {
×
412
        let arg = self.parse_sql_expression(expression_type, expr, schema)?;
×
413
        let pattern = self.parse_sql_expression(expression_type, pattern, schema)?;
×
414
        let like_expression = Box::new(Expression::Like {
×
415
            arg: arg.0,
×
416
            pattern: pattern.0,
×
417
            escape: *escape_char,
×
418
        });
×
419
        if *negated {
×
420
            Ok((
×
421
                Box::new(Expression::UnaryOperator {
×
422
                    operator: UnaryOperatorType::Not,
×
423
                    arg: like_expression,
×
424
                }),
×
425
                arg.1,
×
426
            ))
×
427
        } else {
428
            Ok((like_expression, arg.1))
×
429
        }
430
    }
×
431

432
    fn parse_sql_cast_operator(
64✔
433
        &self,
64✔
434
        expression_type: &BuilderExpressionType,
64✔
435
        expr: &Expr,
64✔
436
        data_type: &DataType,
64✔
437
        schema: &Schema,
64✔
438
    ) -> Result<(Box<Expression>, bool), PipelineError> {
64✔
439
        let expression = self.parse_sql_expression(expression_type, expr, schema)?;
64✔
440
        let cast_to = match data_type {
64✔
441
            DataType::Decimal(_) => CastOperatorType::Decimal,
×
442
            DataType::Binary(_) => CastOperatorType::Binary,
×
443
            DataType::Float(_) => CastOperatorType::Float,
10✔
444
            DataType::Int(_) => CastOperatorType::Int,
6✔
445
            DataType::Integer(_) => CastOperatorType::Int,
×
446
            DataType::UnsignedInt(_) => CastOperatorType::UInt,
×
447
            DataType::UnsignedInteger(_) => CastOperatorType::UInt,
×
448
            DataType::Boolean => CastOperatorType::Boolean,
12✔
449
            DataType::Date => CastOperatorType::Date,
×
450
            DataType::Timestamp(..) => CastOperatorType::Timestamp,
×
451
            DataType::Text => CastOperatorType::Text,
18✔
452
            DataType::String => CastOperatorType::String,
18✔
453
            DataType::Custom(name, ..) => {
×
454
                if name.to_string().to_lowercase() == "bson" {
×
455
                    CastOperatorType::Bson
×
456
                } else {
457
                    Err(PipelineError::InvalidFunction(format!(
×
458
                        "Unsupported Cast type {name}"
×
459
                    )))?
×
460
                }
461
            }
462
            _ => Err(PipelineError::InvalidFunction(format!(
×
463
                "Unsupported Cast type {data_type}"
×
464
            )))?,
×
465
        };
466
        Ok((
64✔
467
            Box::new(Expression::Cast {
64✔
468
                arg: expression.0,
64✔
469
                typ: cast_to,
64✔
470
            }),
64✔
471
            expression.1,
64✔
472
        ))
64✔
473
    }
64✔
474
}
475

476
pub fn fullname_from_ident(ident: &[Ident]) -> String {
38✔
477
    let mut ident_tokens = vec![];
38✔
478
    for token in ident.iter() {
38✔
479
        ident_tokens.push(token.value.clone());
38✔
480
    }
38✔
481
    ident_tokens.join(".")
38✔
482
}
38✔
483

484
fn parse_sql_string(s: &str) -> Result<(Box<Expression>, bool), PipelineError> {
20✔
485
    Ok((
20✔
486
        Box::new(Expression::Literal(Field::String(s.to_owned()))),
20✔
487
        false,
20✔
488
    ))
20✔
489
}
20✔
490

491
pub(crate) fn normalize_ident(id: &Ident) -> String {
108✔
492
    match id.quote_style {
108✔
493
        Some(_) => id.value.clone(),
×
494
        None => id.value.clone(),
108✔
495
    }
496
}
108✔
497

498
pub fn extend_schema_source_def(schema: &Schema, name: &NameOrAlias) -> Schema {
266✔
499
    let mut output_schema = schema.clone();
266✔
500
    let mut fields = vec![];
266✔
501
    for mut field in schema.clone().fields.into_iter() {
1,176✔
502
        if let Some(alias) = &name.1 {
1,176✔
503
            field.source = SourceDefinition::Alias {
408✔
504
                name: alias.to_string(),
408✔
505
            };
408✔
506
        }
768✔
507

508
        fields.push(field);
1,176✔
509
    }
510
    output_schema.fields = fields;
266✔
511

266✔
512
    output_schema
266✔
513
}
266✔
514

515
fn parse_sql_column(ident: &[Ident], schema: &Schema) -> Result<Box<Expression>, PipelineError> {
1,541✔
516
    let (src_field, src_table_or_alias, src_connection) = match ident.len() {
1,541✔
517
        1 => (&ident[0].value, None, None),
1,347✔
518
        2 => (&ident[1].value, Some(&ident[0].value), None),
194✔
519
        3 => (
×
520
            &ident[2].value,
×
521
            Some(&ident[1].value),
×
522
            Some(&ident[0].value),
×
523
        ),
×
524
        _ => {
525
            return Err(IllegalFieldIdentifier(
×
526
                ident
×
527
                    .iter()
×
528
                    .fold(String::new(), |a, b| a + "." + b.value.as_str()),
×
529
            ))
×
530
        }
531
    };
532

533
    let matching_by_field: Vec<(usize, &FieldDefinition)> = schema
1,541✔
534
        .fields
1,541✔
535
        .iter()
1,541✔
536
        .enumerate()
1,541✔
537
        .filter(|(_idx, f)| &f.name == src_field)
7,093✔
538
        .collect();
1,541✔
539

1,541✔
540
    match matching_by_field.len() {
1,541✔
541
        0 => Err(UnknownFieldIdentifier(
×
542
            ident
×
543
                .iter()
×
544
                .fold(String::new(), |a, b| a + "." + b.value.as_str()),
×
545
        )),
×
546
        1 => Ok(Box::new(Expression::Column {
1,445✔
547
            index: matching_by_field[0].0,
1,445✔
548
        })),
1,445✔
549
        _ => match src_table_or_alias {
96✔
550
            None => Err(AmbiguousFieldIdentifier(
×
551
                ident
×
552
                    .iter()
×
553
                    .fold(String::new(), |a, b| a + "." + b.value.as_str()),
×
554
            )),
×
555
            Some(src_table_or_alias) => {
96✔
556
                let matching_by_table_or_alias: Vec<(usize, &FieldDefinition)> = matching_by_field
96✔
557
                    .into_iter()
96✔
558
                    .filter(|(_idx, field)| match &field.source {
192✔
559
                        SourceDefinition::Alias { name } => name == src_table_or_alias,
144✔
560
                        SourceDefinition::Table {
561
                            name,
48✔
562
                            connection: _,
48✔
563
                        } => name == src_table_or_alias,
48✔
564
                        _ => false,
×
565
                    })
192✔
566
                    .collect();
96✔
567

96✔
568
                match matching_by_table_or_alias.len() {
96✔
569
                    0 => Err(UnknownFieldIdentifier(
×
570
                        ident
×
571
                            .iter()
×
572
                            .fold(String::new(), |a, b| a + "." + b.value.as_str()),
×
573
                    )),
×
574
                    1 => Ok(Box::new(Expression::Column {
96✔
575
                        index: matching_by_table_or_alias[0].0,
96✔
576
                    })),
96✔
577
                    _ => match src_connection {
×
578
                        None => Err(InvalidExpression(
×
579
                            ident
×
580
                                .iter()
×
581
                                .fold(String::new(), |a, b| a + "." + b.value.as_str()),
×
582
                        )),
×
583
                        Some(src_connection) => {
×
584
                            let matching_by_connection: Vec<(usize, &FieldDefinition)> =
×
585
                                matching_by_table_or_alias
×
586
                                    .into_iter()
×
587
                                    .filter(|(_idx, field)| match &field.source {
×
588
                                        SourceDefinition::Table {
589
                                            name: _,
590
                                            connection,
×
591
                                        } => connection == src_connection,
×
592
                                        _ => false,
×
593
                                    })
×
594
                                    .collect();
×
595

×
596
                            match matching_by_connection.len() {
×
597
                                0 => Err(UnknownFieldIdentifier(
×
598
                                    ident
×
599
                                        .iter()
×
600
                                        .fold(String::new(), |a, b| a + "." + b.value.as_str()),
×
601
                                )),
×
602
                                1 => Ok(Box::new(Expression::Column {
×
603
                                    index: matching_by_connection[0].0,
×
604
                                })),
×
605
                                _ => Err(InvalidExpression(
×
606
                                    ident
×
607
                                        .iter()
×
608
                                        .fold(String::new(), |a, b| a + "." + b.value.as_str()),
×
609
                                )),
×
610
                            }
611
                        }
612
                    },
613
                }
614
            }
615
        },
616
    }
617
}
1,541✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc