• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

getdozer / dozer / 4012757265

pending completion
4012757265

Pull #737

github

GitHub
Merge 41e5235a4 into c7b362bed
Pull Request #737: feat: select * wildcard

55 of 55 new or added lines in 4 files covered. (100.0%)

23308 of 35040 relevant lines covered (66.52%)

37782.56 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

51.3
/dozer-sql/src/pipeline/expression/builder.rs
1
use std::fmt::Display;
2

3
use dozer_types::{
4
    ordered_float::OrderedFloat,
5
    types::{Field, FieldDefinition, Schema, SourceDefinition},
6
};
7

8
use sqlparser::ast::{
9
    BinaryOperator as SqlBinaryOperator, DataType, Expr as SqlExpr, Expr, Function, FunctionArg,
10
    FunctionArgExpr, Ident, TrimWhereField, UnaryOperator as SqlUnaryOperator, Value as SqlValue,
11
};
12

13
use crate::pipeline::errors::PipelineError;
14
use crate::pipeline::expression::aggregate::AggregateFunctionType;
15
use crate::pipeline::expression::builder::PipelineError::InvalidArgument;
16
use crate::pipeline::expression::builder::PipelineError::InvalidExpression;
17
use crate::pipeline::expression::builder::PipelineError::InvalidOperator;
18
use crate::pipeline::expression::builder::PipelineError::InvalidValue;
19
use crate::pipeline::expression::execution::Expression;
20
use crate::pipeline::expression::execution::Expression::ScalarFunction;
21
use crate::pipeline::expression::operator::{BinaryOperatorType, UnaryOperatorType};
22
use crate::pipeline::expression::scalar::common::ScalarFunctionType;
23
use crate::pipeline::expression::scalar::string::TrimType;
24

25
use super::cast::CastOperatorType;
26

27
pub type Bypass = bool;
28

29
pub enum BuilderExpressionType {
30
    PreAggregation,
31
    Aggregation,
32
    // PostAggregation,
33
    FullExpression,
34
}
35
#[derive(Debug, Clone, Hash, PartialEq, Eq)]
1,346✔
36
pub struct NameOrAlias(pub String, pub Option<String>);
37

38
pub enum ConstraintIdentifier {
39
    Single(Ident),
×
40
    Compound(Vec<Ident>),
×
41
}
×
42

×
43
impl Display for ConstraintIdentifier {
×
44
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
×
45
        match self {
×
46
            ConstraintIdentifier::Single(ident) => f.write_fmt(format_args!("{}", ident)),
×
47
            ConstraintIdentifier::Compound(ident) => f.write_fmt(format_args!("{:?}", ident)),
×
48
        }
×
49
    }
×
50
}
51
pub struct ExpressionBuilder;
52

53
impl ExpressionBuilder {
54
    pub fn build(
189✔
55
        &self,
189✔
56
        expression_type: &BuilderExpressionType,
189✔
57
        sql_expression: &SqlExpr,
189✔
58
        schema: &Schema,
189✔
59
    ) -> Result<Box<Expression>, PipelineError> {
189✔
60
        let (expression, _bypass) =
189✔
61
            self.parse_sql_expression(expression_type, sql_expression, schema)?;
189✔
62
        Ok(expression)
189✔
63
    }
189✔
64

×
65
    pub fn parse_sql_expression(
66
        &self,
×
67
        expression_type: &BuilderExpressionType,
68
        expression: &SqlExpr,
×
69
        schema: &Schema,
×
70
    ) -> Result<(Box<Expression>, bool), PipelineError> {
71
        match expression {
69✔
72
            SqlExpr::Trim {
×
73
                expr,
50✔
74
                trim_where,
50✔
75
                trim_what,
50✔
76
            } => self.parse_sql_trim_function(expression_type, expr, trim_where, trim_what, schema),
50✔
77
            SqlExpr::Identifier(ident) => {
2,593✔
78
                let idx = get_field_index(&ConstraintIdentifier::Single(ident.clone()), schema);
2,593✔
79

×
80
                let idx = idx?.map_or(
2,593✔
81
                    Err(PipelineError::InvalidExpression(ident.clone().value.to_string())),
2,593✔
82
                    Ok,
2,593✔
83
                )?;
2,593✔
84
                Ok((Box::new(Expression::Column { index: idx }), false))
2,593✔
85
            }
×
86
            SqlExpr::CompoundIdentifier(ident) => {
56✔
87
                let idx = get_field_index(&ConstraintIdentifier::Compound(ident.clone()), schema)?
56✔
88
                    .map_or(
56✔
89
                        Err(PipelineError::InvalidExpression(format!("{:?}", ident))),
56✔
90
                        Ok,
56✔
91
                    )?;
56✔
92
                Ok((Box::new(Expression::Column { index: idx }), false))
54✔
93
            }
×
94
            SqlExpr::Value(SqlValue::Number(n, _)) => self.parse_sql_number(n),
43✔
95
            SqlExpr::Value(SqlValue::Null) => {
96
                Ok((Box::new(Expression::Literal(Field::Null)), false))
×
97
            }
×
98
            SqlExpr::Value(SqlValue::SingleQuotedString(s) | SqlValue::DoubleQuotedString(s)) => {
26✔
99
                parse_sql_string(s)
26✔
100
            }
×
101
            SqlExpr::UnaryOp { expr, op } => {
×
102
                self.parse_sql_unary_op(expression_type, op, expr, schema)
×
103
            }
×
104
            SqlExpr::BinaryOp { left, op, right } => {
79✔
105
                self.parse_sql_binary_op(expression_type, left, op, right, schema)
79✔
106
            }
×
107
            SqlExpr::Nested(expr) => self.parse_sql_expression(expression_type, expr, schema),
12✔
108
            SqlExpr::Function(sql_function) => match expression_type {
348✔
109
                BuilderExpressionType::PreAggregation => self.parse_sql_function_pre_aggregation(
166✔
110
                    expression_type,
166✔
111
                    sql_function,
166✔
112
                    schema,
166✔
113
                    expression,
166✔
114
                ),
166✔
115
                BuilderExpressionType::Aggregation => self.parse_sql_function_aggregation(
166✔
116
                    expression_type,
166✔
117
                    sql_function,
166✔
118
                    schema,
166✔
119
                    expression,
166✔
120
                ),
166✔
121
                // BuilderExpressionType::PostAggregation => todo!(),
×
122
                BuilderExpressionType::FullExpression => {
×
123
                    self.parse_sql_function(expression_type, sql_function, schema, expression)
16✔
124
                }
125
            },
126
            SqlExpr::Like {
127
                negated,
×
128
                expr,
×
129
                pattern,
×
130
                escape_char,
×
131
            } => self.parse_sql_like_operator(
×
132
                expression_type,
×
133
                negated,
×
134
                expr,
×
135
                pattern,
×
136
                escape_char,
×
137
                schema,
×
138
            ),
×
139
            SqlExpr::Cast { expr, data_type } => {
64✔
140
                self.parse_sql_cast_operator(expression_type, expr, data_type, schema)
64✔
141
            }
×
142
            _ => Err(InvalidExpression(format!("{:?}", expression))),
×
143
        }
×
144
    }
3,271✔
145

×
146
    fn parse_sql_trim_function(
50✔
147
        &self,
50✔
148
        expression_type: &BuilderExpressionType,
50✔
149
        expr: &Expr,
50✔
150
        trim_where: &Option<TrimWhereField>,
50✔
151
        trim_what: &Option<Box<Expr>>,
50✔
152
        schema: &Schema,
50✔
153
    ) -> Result<(Box<Expression>, bool), PipelineError> {
50✔
154
        let arg = self.parse_sql_expression(expression_type, expr, schema)?.0;
50✔
155
        let what = match trim_what {
50✔
156
            Some(e) => Some(self.parse_sql_expression(expression_type, e, schema)?.0),
8✔
157
            _ => None,
42✔
158
        };
×
159
        let typ = trim_where.as_ref().map(|e| match e {
50✔
160
            TrimWhereField::Both => TrimType::Both,
2✔
161
            TrimWhereField::Leading => TrimType::Leading,
2✔
162
            TrimWhereField::Trailing => TrimType::Trailing,
2✔
163
        });
50✔
164
        Ok((Box::new(Expression::Trim { arg, what, typ }), false))
50✔
165
    }
50✔
166

×
167
    fn parse_sql_function(
16✔
168
        &self,
16✔
169
        expression_type: &BuilderExpressionType,
16✔
170
        sql_function: &Function,
16✔
171
        schema: &Schema,
16✔
172
        expression: &SqlExpr,
16✔
173
    ) -> Result<(Box<Expression>, bool), PipelineError> {
16✔
174
        let name = sql_function.name.to_string().to_lowercase();
16✔
175
        if let Ok(function) = ScalarFunctionType::new(&name) {
16✔
176
            let mut arg_exprs = vec![];
15✔
177
            for arg in &sql_function.args {
37✔
178
                let r = self.parse_sql_function_arg(expression_type, arg, schema);
22✔
179
                match r {
22✔
180
                    Ok(result) => {
22✔
181
                        if result.1 {
22✔
182
                            return Ok(result);
×
183
                        } else {
22✔
184
                            arg_exprs.push(*result.0);
22✔
185
                        }
22✔
186
                    }
×
187
                    Err(error) => {
×
188
                        return Err(error);
×
189
                    }
×
190
                }
×
191
            }
×
192

193
            return Ok((
15✔
194
                Box::new(ScalarFunction {
15✔
195
                    fun: function,
15✔
196
                    args: arg_exprs,
15✔
197
                }),
15✔
198
                false,
15✔
199
            ));
15✔
200
        };
1✔
201
        if AggregateFunctionType::new(&name).is_ok() {
1✔
202
            let arg = sql_function.args.first().unwrap();
1✔
203
            let r = self.parse_sql_function_arg(expression_type, arg, schema)?;
1✔
204
            return Ok((r.0, false)); // switch bypass to true, since the argument of this Aggregation must be the final result
1✔
205
        };
×
206
        Err(InvalidExpression(format!("{:?}", expression)))
×
207
    }
16✔
208

×
209
    fn parse_sql_function_pre_aggregation(
166✔
210
        &self,
166✔
211
        expression_type: &BuilderExpressionType,
166✔
212
        sql_function: &Function,
166✔
213
        schema: &Schema,
166✔
214
        expression: &SqlExpr,
166✔
215
    ) -> Result<(Box<Expression>, bool), PipelineError> {
166✔
216
        let name = sql_function.name.to_string().to_lowercase();
166✔
217

218
        if let Ok(function) = ScalarFunctionType::new(&name) {
166✔
219
            let mut arg_exprs = vec![];
×
220
            for arg in &sql_function.args {
×
221
                let r = self.parse_sql_function_arg(expression_type, arg, schema);
×
222
                match r {
×
223
                    Ok(result) => {
×
224
                        if result.1 {
×
225
                            return Ok(result);
×
226
                        } else {
×
227
                            arg_exprs.push(*result.0);
×
228
                        }
×
229
                    }
×
230
                    Err(error) => {
×
231
                        return Err(error);
×
232
                    }
×
233
                }
×
234
            }
×
235

236
            return Ok((
×
237
                Box::new(ScalarFunction {
×
238
                    fun: function,
×
239
                    args: arg_exprs,
×
240
                }),
×
241
                false,
×
242
            ));
×
243
        };
166✔
244
        if AggregateFunctionType::new(&name).is_ok() {
166✔
245
            let arg = sql_function.args.first().unwrap();
166✔
246
            let r = self.parse_sql_function_arg(expression_type, arg, schema)?;
166✔
247
            return Ok((r.0, true)); // switch bypass to true, since the argument of this Aggregation must be the final result
166✔
248
        };
×
249
        Err(InvalidExpression(format!("{:?}", expression)))
×
250
    }
166✔
251

×
252
    fn parse_sql_function_aggregation(
166✔
253
        &self,
166✔
254
        expression_type: &BuilderExpressionType,
166✔
255
        sql_function: &Function,
166✔
256
        schema: &Schema,
166✔
257
        expression: &SqlExpr,
166✔
258
    ) -> Result<(Box<Expression>, bool), PipelineError> {
166✔
259
        let name = sql_function.name.to_string().to_lowercase();
166✔
260

261
        if let Ok(function) = ScalarFunctionType::new(&name) {
166✔
262
            let mut arg_exprs = vec![];
×
263
            for arg in &sql_function.args {
×
264
                let r = self.parse_sql_function_arg(expression_type, arg, schema);
×
265
                match r {
×
266
                    Ok(result) => {
×
267
                        if result.1 {
×
268
                            return Ok(result);
×
269
                        } else {
×
270
                            arg_exprs.push(*result.0);
×
271
                        }
×
272
                    }
×
273
                    Err(error) => {
×
274
                        return Err(error);
×
275
                    }
×
276
                }
×
277
            }
×
278

×
279
            return Ok((
×
280
                Box::new(ScalarFunction {
×
281
                    fun: function,
×
282
                    args: arg_exprs,
×
283
                }),
×
284
                false,
×
285
            ));
×
286
        };
166✔
287

288
        if let Ok(function) = AggregateFunctionType::new(&name) {
166✔
289
            let mut arg_exprs = vec![];
166✔
290
            for arg in &sql_function.args {
332✔
291
                let r = self.parse_sql_function_arg(expression_type, arg, schema);
166✔
292
                match r {
166✔
293
                    Ok(result) => {
166✔
294
                        if result.1 {
166✔
295
                            return Ok(result);
×
296
                        } else {
166✔
297
                            arg_exprs.push(*result.0);
166✔
298
                        }
166✔
299
                    }
×
300
                    Err(error) => {
×
301
                        return Err(error);
×
302
                    }
×
303
                }
×
304
            }
305

306
            return Ok((
166✔
307
                Box::new(Expression::AggregateFunction {
166✔
308
                    fun: function,
166✔
309
                    args: arg_exprs,
166✔
310
                }),
166✔
311
                true, // switch bypass to true, since this Aggregation must be the final result
166✔
312
            ));
166✔
313
        };
×
314

×
315
        Err(InvalidExpression(format!(
×
316
            "Unsupported Expression: {:?}",
×
317
            expression
×
318
        )))
×
319
    }
166✔
320

×
321
    pub(crate) fn parse_sql_function_arg(
×
322
        &self,
323
        expression_type: &BuilderExpressionType,
324
        argument: &FunctionArg,
×
325
        schema: &Schema,
326
    ) -> Result<(Box<Expression>, bool), PipelineError> {
×
327
        match argument {
355✔
328
            FunctionArg::Named {
×
329
                name: _,
330
                arg: FunctionArgExpr::Expr(arg),
×
331
            } => self.parse_sql_expression(expression_type, arg, schema),
×
332
            FunctionArg::Named {
×
333
                name: _,
×
334
                arg: FunctionArgExpr::Wildcard,
×
335
            } => Err(InvalidArgument(format!("{:?}", argument))),
×
336
            FunctionArg::Unnamed(FunctionArgExpr::Expr(arg)) => {
355✔
337
                self.parse_sql_expression(expression_type, arg, schema)
355✔
338
            }
×
339
            FunctionArg::Unnamed(FunctionArgExpr::Wildcard) => {
×
340
                self.parse_sql_function_arg_expression(&FunctionArgExpr::Wildcard, schema)
×
341
            }
342
            _ => Err(InvalidArgument(format!("{:?}", argument))),
×
343
        }
×
344
    }
355✔
345

×
346
    fn parse_sql_function_arg_expression(
×
347
        &self,
×
348
        argument: &FunctionArgExpr,
×
349
        schema: &Schema,
×
350
    ) -> Result<(Box<Expression>, bool), PipelineError> {
×
351
        match argument {
×
352
            FunctionArgExpr::Wildcard => {
×
353
                let idents: Vec<Ident> = schema.fields.iter()
×
354
                    .map(|field| Ident::new(field.clone().name)).collect();
×
355
                self.parse_sql_expression(
×
356
                    &BuilderExpressionType::FullExpression,
×
357
                    &SqlExpr::CompoundIdentifier(idents),
×
358
                    schema
×
359
                )
×
360
            },
×
361
            _ => Err(InvalidArgument(format!("{:?}", argument))),
×
362
        }
×
363
    }
×
364

×
365
    fn parse_sql_unary_op(
×
366
        &self,
×
367
        expression_type: &BuilderExpressionType,
×
368
        op: &SqlUnaryOperator,
×
369
        expr: &SqlExpr,
×
370
        schema: &Schema,
×
371
    ) -> Result<(Box<Expression>, Bypass), PipelineError> {
×
372
        let (arg, bypass) = self.parse_sql_expression(expression_type, expr, schema)?;
×
373
        if bypass {
×
374
            return Ok((arg, bypass));
×
375
        }
×
376

377
        let operator = match op {
×
378
            SqlUnaryOperator::Not => UnaryOperatorType::Not,
×
379
            SqlUnaryOperator::Plus => UnaryOperatorType::Plus,
×
380
            SqlUnaryOperator::Minus => UnaryOperatorType::Minus,
×
381
            _ => return Err(InvalidOperator(format!("{:?}", op))),
×
382
        };
383

×
384
        Ok((Box::new(Expression::UnaryOperator { operator, arg }), false))
×
385
    }
×
386

387
    fn parse_sql_binary_op(
79✔
388
        &self,
79✔
389
        expression_type: &BuilderExpressionType,
79✔
390
        left: &SqlExpr,
79✔
391
        op: &SqlBinaryOperator,
79✔
392
        right: &SqlExpr,
79✔
393
        schema: &Schema,
79✔
394
    ) -> Result<(Box<Expression>, bool), PipelineError> {
79✔
395
        let (left_op, bypass_left) = self.parse_sql_expression(expression_type, left, schema)?;
79✔
396
        if bypass_left {
79✔
397
            return Ok((left_op, bypass_left));
×
398
        }
79✔
399
        let (right_op, bypass_right) = self.parse_sql_expression(expression_type, right, schema)?;
79✔
400
        if bypass_right {
79✔
401
            return Ok((right_op, bypass_right));
×
402
        }
79✔
403

×
404
        let operator = match op {
79✔
405
            SqlBinaryOperator::Gt => BinaryOperatorType::Gt,
18✔
406
            SqlBinaryOperator::GtEq => BinaryOperatorType::Gte,
1✔
407
            SqlBinaryOperator::Lt => BinaryOperatorType::Lt,
12✔
408
            SqlBinaryOperator::LtEq => BinaryOperatorType::Lte,
12✔
409
            SqlBinaryOperator::Eq => BinaryOperatorType::Eq,
18✔
410
            SqlBinaryOperator::NotEq => BinaryOperatorType::Ne,
×
411

412
            SqlBinaryOperator::Plus => BinaryOperatorType::Add,
×
413
            SqlBinaryOperator::Minus => BinaryOperatorType::Sub,
×
414
            SqlBinaryOperator::Multiply => BinaryOperatorType::Mul,
×
415
            SqlBinaryOperator::Divide => BinaryOperatorType::Div,
×
416
            SqlBinaryOperator::Modulo => BinaryOperatorType::Mod,
×
417

×
418
            SqlBinaryOperator::And => BinaryOperatorType::And,
12✔
419
            SqlBinaryOperator::Or => BinaryOperatorType::Or,
6✔
420

×
421
            // BinaryOperator::BitwiseAnd => ...
×
422
            // BinaryOperator::BitwiseOr => ...
×
423
            // BinaryOperator::StringConcat => ...
×
424
            _ => return Err(InvalidOperator(format!("{:?}", op))),
×
425
        };
×
426

×
427
        Ok((
79✔
428
            Box::new(Expression::BinaryOperator {
79✔
429
                left: left_op,
79✔
430
                operator,
79✔
431
                right: right_op,
79✔
432
            }),
79✔
433
            false,
79✔
434
        ))
79✔
435
    }
79✔
436

×
437
    fn parse_sql_number(&self, n: &str) -> Result<(Box<Expression>, Bypass), PipelineError> {
43✔
438
        match n.parse::<i64>() {
43✔
439
            Ok(n) => Ok((Box::new(Expression::Literal(Field::Int(n))), false)),
43✔
440
            Err(_) => match n.parse::<f64>() {
×
441
                Ok(f) => Ok((
×
442
                    Box::new(Expression::Literal(Field::Float(OrderedFloat(f)))),
×
443
                    false,
×
444
                )),
×
445
                Err(_) => Err(InvalidValue(n.to_string())),
×
446
            },
×
447
        }
×
448
    }
43✔
449

×
450
    fn parse_sql_like_operator(
×
451
        &self,
×
452
        expression_type: &BuilderExpressionType,
×
453
        negated: &bool,
×
454
        expr: &Expr,
×
455
        pattern: &Expr,
×
456
        escape_char: &Option<char>,
×
457
        schema: &Schema,
×
458
    ) -> Result<(Box<Expression>, bool), PipelineError> {
×
459
        let arg = self.parse_sql_expression(expression_type, expr, schema)?;
×
460
        let pattern = self.parse_sql_expression(expression_type, pattern, schema)?;
×
461
        let like_expression = Box::new(Expression::Like {
×
462
            arg: arg.0,
×
463
            pattern: pattern.0,
×
464
            escape: *escape_char,
×
465
        });
×
466
        if *negated {
×
467
            Ok((
×
468
                Box::new(Expression::UnaryOperator {
×
469
                    operator: UnaryOperatorType::Not,
×
470
                    arg: like_expression,
×
471
                }),
×
472
                arg.1,
×
473
            ))
×
474
        } else {
475
            Ok((like_expression, arg.1))
×
476
        }
×
477
    }
×
478

×
479
    fn parse_sql_cast_operator(
64✔
480
        &self,
64✔
481
        expression_type: &BuilderExpressionType,
64✔
482
        expr: &Expr,
64✔
483
        data_type: &DataType,
64✔
484
        schema: &Schema,
64✔
485
    ) -> Result<(Box<Expression>, bool), PipelineError> {
64✔
486
        let expression = self.parse_sql_expression(expression_type, expr, schema)?;
64✔
487
        let cast_to = match data_type {
64✔
488
            DataType::Decimal(_) => CastOperatorType::Decimal,
×
489
            DataType::Binary(_) => CastOperatorType::Binary,
×
490
            DataType::Float(_) => CastOperatorType::Float,
10✔
491
            DataType::Int(_) => CastOperatorType::Int,
6✔
492
            DataType::Integer(_) => CastOperatorType::Int,
×
493
            DataType::UnsignedInt(_) => CastOperatorType::UInt,
×
494
            DataType::UnsignedInteger(_) => CastOperatorType::UInt,
×
495
            DataType::Boolean => CastOperatorType::Boolean,
12✔
496
            DataType::Date => CastOperatorType::Date,
×
497
            DataType::Timestamp(..) => CastOperatorType::Timestamp,
×
498
            DataType::Text => CastOperatorType::Text,
18✔
499
            DataType::String => CastOperatorType::String,
18✔
500
            DataType::Custom(name, ..) => {
×
501
                if name.to_string().to_lowercase() == "bson" {
×
502
                    CastOperatorType::Bson
×
503
                } else {
×
504
                    Err(PipelineError::InvalidFunction(format!(
×
505
                        "Unsupported Cast type {}",
×
506
                        name
×
507
                    )))?
×
508
                }
×
509
            }
×
510
            _ => Err(PipelineError::InvalidFunction(format!(
×
511
                "Unsupported Cast type {}",
×
512
                data_type
×
513
            )))?,
×
514
        };
515
        Ok((
64✔
516
            Box::new(Expression::Cast {
64✔
517
                arg: expression.0,
64✔
518
                typ: cast_to,
64✔
519
            }),
64✔
520
            expression.1,
64✔
521
        ))
64✔
522
    }
64✔
523
}
×
524

×
525
pub fn fullname_from_ident(ident: &[Ident]) -> String {
28✔
526
    let mut ident_tokens = vec![];
28✔
527
    for token in ident.iter() {
28✔
528
        ident_tokens.push(token.value.clone());
28✔
529
    }
28✔
530
    ident_tokens.join(".")
28✔
531
}
28✔
532

×
533
fn parse_sql_string(s: &str) -> Result<(Box<Expression>, bool), PipelineError> {
26✔
534
    Ok((
26✔
535
        Box::new(Expression::Literal(Field::String(s.to_owned()))),
26✔
536
        false,
26✔
537
    ))
26✔
538
}
26✔
539

540
pub(crate) fn normalize_ident(id: &Ident) -> String {
146✔
541
    match id.quote_style {
146✔
542
        Some(_) => id.value.clone(),
×
543
        None => id.value.clone(),
146✔
544
    }
×
545
}
146✔
546

×
547
pub fn extend_schema_source_def(schema: &Schema, name: &NameOrAlias) -> Schema {
269✔
548
    let mut output_schema = schema.clone();
269✔
549
    let mut fields = vec![];
269✔
550
    for mut field in schema.clone().fields.into_iter() {
1,104✔
551
        if let Some(alias) = &name.1 {
1,104✔
552
            field.source = SourceDefinition::Alias {
120✔
553
                name: alias.to_string(),
120✔
554
            };
120✔
555
        }
984✔
556

557
        fields.push(field);
1,104✔
558
    }
559
    output_schema.fields = fields;
269✔
560

269✔
561
    output_schema
269✔
562
}
269✔
563

564
pub fn get_field_index(
2,685✔
565
    ident: &ConstraintIdentifier,
2,685✔
566
    schema: &Schema,
2,685✔
567
) -> Result<Option<usize>, PipelineError> {
2,685✔
568
    let get_field_idx = |ident: &Ident, schema: &Schema| -> Option<(usize, FieldDefinition)> {
2,685✔
569
        schema
2,685✔
570
            .fields
2,685✔
571
            .iter()
2,685✔
572
            .enumerate()
2,685✔
573
            .find(|(_, f)| f.name == ident.value)
5,369✔
574
            .map(|(idx, fd)| (idx, fd.clone()))
2,685✔
575
    };
2,685✔
576

577
    let tables_matches = |table_ident: &Ident, fd: FieldDefinition| -> bool {
2,685✔
578
        match fd.source {
92✔
579
            dozer_types::types::SourceDefinition::Table {
580
                connection: _,
581
                name,
30✔
582
            } => name == table_ident.value,
30✔
583
            dozer_types::types::SourceDefinition::Alias { name } => name == table_ident.value,
60✔
584
            dozer_types::types::SourceDefinition::Dynamic => false,
2✔
585
        }
586
    };
92✔
587

588
    match ident {
2,685✔
589
        ConstraintIdentifier::Single(ident) => {
2,593✔
590
            let field_idx = get_field_idx(ident, schema);
2,593✔
591
            field_idx.map_or(
2,593✔
592
                Err(PipelineError::InvalidExpression(ident.value.to_string())),
2,593✔
593
                |t| Ok(Some(t.0)),
2,593✔
594
            )
2,593✔
595
        }
596
        ConstraintIdentifier::Compound(comp_ident) => {
92✔
597
            if comp_ident.len() > 2 {
92✔
598
                return Err(PipelineError::NameSpaceTooLong(
×
599
                    comp_ident
×
600
                        .iter()
×
601
                        .map(|a| a.value.clone())
×
602
                        .collect::<Vec<String>>()
×
603
                        .join("."),
×
604
                ));
×
605
            }
92✔
606
            let table_name = comp_ident.first().expect("table_name is expected");
92✔
607
            let field_name = comp_ident.last().expect("field_name is expected");
92✔
608

92✔
609
            let field_idx = get_field_idx(field_name, schema);
92✔
610
            if let Some((idx, fd)) = field_idx {
92✔
611
                if tables_matches(table_name, fd) {
92✔
612
                    return Ok(Some(idx));
72✔
613
                }
20✔
614
            }
×
615
            Ok(None)
20✔
616
        }
617
    }
618
}
2,685✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc