• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

getdozer / dozer / 3978628498

pending completion
3978628498

Pull #705

github

GitHub
Merge 8775fcda7 into e2f9ad287
Pull Request #705: chore: support for generic schema context in `Sink`, `Processor` and `Source` factories

572 of 572 new or added lines in 35 files covered. (100.0%)

22294 of 34850 relevant lines covered (63.97%)

40332.28 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

73.09
/dozer-sql/src/pipeline/aggregation/factory.rs
1
use std::collections::HashMap;
2

3
use dozer_core::dag::{
4
    dag::DEFAULT_PORT_HANDLE,
5
    errors::ExecutionError,
6
    node::{OutputPortDef, OutputPortType, PortHandle, Processor, ProcessorFactory},
7
};
8
use dozer_types::types::{FieldDefinition, Schema};
9
use sqlparser::ast::{Expr as SqlExpr, Expr, SelectItem};
10

11
use crate::pipeline::builder::SchemaSQLContext;
12
use crate::pipeline::{
13
    errors::PipelineError,
14
    expression::{
15
        aggregate::AggregateFunctionType,
16
        builder::{BuilderExpressionType, ExpressionBuilder},
17
        execution::{Expression, ExpressionExecutor},
18
    },
19
    projection::{factory::parse_sql_select_item, processor::ProjectionProcessor},
20
};
21

22
use super::{
23
    aggregator::Aggregator,
24
    processor::{AggregationProcessor, FieldRule},
25
};
26

×
27
#[derive(Debug)]
×
28
pub struct AggregationProcessorFactory {
29
    select: Vec<SelectItem>,
30
    groupby: Vec<SqlExpr>,
31
    stateful: bool,
32
}
33

34
impl AggregationProcessorFactory {
×
35
    /// Creates a new [`AggregationProcessorFactory`].
×
36
    pub fn new(select: Vec<SelectItem>, groupby: Vec<SqlExpr>, stateful: bool) -> Self {
91✔
37
        Self {
91✔
38
            select,
91✔
39
            groupby,
91✔
40
            stateful,
91✔
41
        }
91✔
42
    }
91✔
43
}
44

×
45
impl ProcessorFactory<SchemaSQLContext> for AggregationProcessorFactory {
×
46
    fn get_input_ports(&self) -> Vec<PortHandle> {
152✔
47
        vec![DEFAULT_PORT_HANDLE]
152✔
48
    }
152✔
49

×
50
    fn get_output_ports(&self) -> Vec<OutputPortDef> {
228✔
51
        if self.stateful {
228✔
52
            vec![OutputPortDef::new(
×
53
                DEFAULT_PORT_HANDLE,
×
54
                OutputPortType::StatefulWithPrimaryKeyLookup {
×
55
                    retr_old_records_for_deletes: true,
×
56
                    retr_old_records_for_updates: true,
×
57
                },
×
58
            )]
×
59
        } else {
×
60
            vec![OutputPortDef::new(
228✔
61
                DEFAULT_PORT_HANDLE,
228✔
62
                OutputPortType::Stateless,
228✔
63
            )]
228✔
64
        }
×
65
    }
228✔
66

×
67
    fn get_output_schema(
77✔
68
        &self,
77✔
69
        _output_port: &PortHandle,
77✔
70
        input_schemas: &HashMap<PortHandle, (Schema, SchemaSQLContext)>,
77✔
71
    ) -> Result<(Schema, SchemaSQLContext), ExecutionError> {
77✔
72
        let (input_schema, _ctx) = input_schemas
77✔
73
            .get(&DEFAULT_PORT_HANDLE)
77✔
74
            .ok_or(ExecutionError::InvalidPortHandle(DEFAULT_PORT_HANDLE))?;
77✔
75
        let output_field_rules =
77✔
76
            get_aggregation_rules(&self.select, &self.groupby, input_schema).unwrap();
77✔
77

77✔
78
        if is_aggregation(&self.groupby, &output_field_rules) {
77✔
79
            return Ok((
×
80
                build_output_schema(input_schema, output_field_rules)?,
20✔
81
                SchemaSQLContext {},
20✔
82
            ));
×
83
        }
57✔
84

57✔
85
        Ok((
57✔
86
            build_projection_schema(input_schema, &self.select)?,
57✔
87
            SchemaSQLContext {},
57✔
88
        ))
×
89
    }
77✔
90

×
91
    fn build(
76✔
92
        &self,
76✔
93
        input_schemas: HashMap<PortHandle, Schema>,
76✔
94
        _output_schemas: HashMap<PortHandle, Schema>,
76✔
95
    ) -> Result<Box<dyn Processor>, ExecutionError> {
76✔
96
        let input_schema = input_schemas
76✔
97
            .get(&DEFAULT_PORT_HANDLE)
76✔
98
            .ok_or(ExecutionError::InvalidPortHandle(DEFAULT_PORT_HANDLE))?;
76✔
99
        let output_field_rules =
76✔
100
            get_aggregation_rules(&self.select, &self.groupby, input_schema).unwrap();
76✔
101

76✔
102
        if is_aggregation(&self.groupby, &output_field_rules) {
76✔
103
            return Ok(Box::new(AggregationProcessor::new(
19✔
104
                output_field_rules,
19✔
105
                input_schema.clone(),
19✔
106
            )));
19✔
107
        }
57✔
108

57✔
109
        // Build a Projection
57✔
110
        match self
57✔
111
            .select
57✔
112
            .iter()
57✔
113
            .map(|item| parse_sql_select_item(item, input_schema))
198✔
114
            .collect::<Result<Vec<(String, Expression)>, PipelineError>>()
57✔
115
        {
×
116
            Ok(expressions) => Ok(Box::new(ProjectionProcessor::new(
57✔
117
                input_schema.clone(),
57✔
118
                expressions,
57✔
119
            ))),
57✔
120
            Err(error) => Err(ExecutionError::InternalStringError(error.to_string())),
×
121
        }
×
122
    }
76✔
123

×
124
    fn prepare(
×
125
        &self,
×
126
        _input_schemas: HashMap<PortHandle, (Schema, SchemaSQLContext)>,
×
127
        _output_schemas: HashMap<PortHandle, (Schema, SchemaSQLContext)>,
×
128
    ) -> Result<(), ExecutionError> {
×
129
        Ok(())
×
130
    }
×
131
}
×
132

×
133
fn is_aggregation(groupby: &[SqlExpr], output_field_rules: &[FieldRule]) -> bool {
153✔
134
    if !groupby.is_empty() {
153✔
135
        return true;
25✔
136
    }
128✔
137

128✔
138
    output_field_rules
128✔
139
        .iter()
128✔
140
        .any(|rule| matches!(rule, FieldRule::Measure(_, _, _)))
410✔
141
}
153✔
142

×
143
pub(crate) fn get_aggregation_rules(
198✔
144
    select: &[SelectItem],
198✔
145
    groupby: &[SqlExpr],
198✔
146
    schema: &Schema,
198✔
147
) -> Result<Vec<FieldRule>, PipelineError> {
198✔
148
    let mut select_rules = select
198✔
149
        .iter()
198✔
150
        .map(|item| parse_sql_aggregate_item(item, schema))
552✔
151
        .filter(|e| e.is_ok())
552✔
152
        .collect::<Result<Vec<FieldRule>, PipelineError>>()?;
198✔
153

×
154
    let mut groupby_rules = groupby
198✔
155
        .iter()
198✔
156
        .map(|expr| parse_sql_groupby_item(expr, schema))
198✔
157
        .collect::<Result<Vec<FieldRule>, PipelineError>>()?;
198✔
158

×
159
    select_rules.append(&mut groupby_rules);
198✔
160

198✔
161
    Ok(select_rules)
198✔
162
}
198✔
163

×
164
fn build_field_rule(
552✔
165
    sql_expr: &Expr,
552✔
166
    schema: &Schema,
552✔
167
    name: String,
552✔
168
) -> Result<FieldRule, PipelineError> {
552✔
169
    let builder = ExpressionBuilder {};
552✔
170
    let expression =
552✔
171
        builder.parse_sql_expression(&BuilderExpressionType::Aggregation, sql_expr, schema)?;
552✔
172

×
173
    match get_aggregator(expression.0.clone(), schema) {
552✔
174
        Ok(aggregator) => Ok(FieldRule::Measure(
84✔
175
            ExpressionBuilder {}
84✔
176
                .parse_sql_expression(&BuilderExpressionType::PreAggregation, sql_expr, schema)?
84✔
177
                .0,
×
178
            aggregator,
84✔
179
            name,
84✔
180
        )),
×
181
        Err(_) => Ok(FieldRule::Dimension(expression.0, true, name)),
468✔
182
    }
×
183
}
552✔
184

×
185
fn parse_sql_aggregate_item(
552✔
186
    item: &SelectItem,
552✔
187
    schema: &Schema,
552✔
188
) -> Result<FieldRule, PipelineError> {
552✔
189
    match item {
552✔
190
        SelectItem::UnnamedExpr(sql_expr) => {
515✔
191
            build_field_rule(sql_expr, schema, sql_expr.to_string())
515✔
192
        }
×
193
        SelectItem::ExprWithAlias { expr, alias } => {
37✔
194
            build_field_rule(expr, schema, alias.value.clone())
37✔
195
        }
196
        SelectItem::Wildcard(_) => Err(PipelineError::InvalidExpression(
×
197
            "Wildcard Operator is not supported".to_string(),
×
198
        )),
×
199
        SelectItem::QualifiedWildcard(..) => Err(PipelineError::InvalidExpression(
×
200
            "Qualified Wildcard Operator is not supported".to_string(),
×
201
        )),
×
202
    }
×
203
}
552✔
204

×
205
fn parse_sql_groupby_item(
70✔
206
    sql_expression: &SqlExpr,
70✔
207
    schema: &Schema,
70✔
208
) -> Result<FieldRule, PipelineError> {
70✔
209
    Ok(FieldRule::Dimension(
70✔
210
        ExpressionBuilder {}.build(
70✔
211
            &BuilderExpressionType::FullExpression,
70✔
212
            sql_expression,
70✔
213
            schema,
70✔
214
        )?,
70✔
215
        false,
×
216
        sql_expression.to_string(),
70✔
217
    ))
×
218
}
70✔
219

×
220
fn get_aggregator(
552✔
221
    expression: Box<Expression>,
552✔
222
    schema: &Schema,
552✔
223
) -> Result<Aggregator, PipelineError> {
552✔
224
    match *expression {
552✔
225
        Expression::AggregateFunction { fun, args } => {
84✔
226
            let arg_type = args[0].get_type(schema);
84✔
227
            match (&fun, arg_type) {
84✔
228
                (AggregateFunctionType::Avg, _) => Ok(Aggregator::Avg),
7✔
229
                (AggregateFunctionType::Count, _) => Ok(Aggregator::Count),
46✔
230
                (AggregateFunctionType::Max, _) => Ok(Aggregator::Max),
11✔
231
                (AggregateFunctionType::Min, _) => Ok(Aggregator::Min),
11✔
232
                (AggregateFunctionType::Sum, _) => Ok(Aggregator::Sum),
9✔
233
                _ => Err(PipelineError::InvalidExpression(format!(
×
234
                    "Not implemented Aggregation function: {:?}",
×
235
                    fun
×
236
                ))),
×
237
            }
×
238
        }
×
239
        _ => Err(PipelineError::InvalidExpression(format!(
468✔
240
            "Not an Aggregation function: {:?}",
468✔
241
            expression
468✔
242
        ))),
468✔
243
    }
×
244
}
552✔
245

×
246
fn build_output_schema(
20✔
247
    input_schema: &Schema,
20✔
248
    output_field_rules: Vec<FieldRule>,
20✔
249
) -> Result<Schema, ExecutionError> {
20✔
250
    let mut output_schema = Schema::empty();
20✔
251

×
252
    for e in output_field_rules.iter().enumerate() {
47✔
253
        match e.1 {
47✔
254
            FieldRule::Measure(pre_aggr, aggr, name) => {
20✔
255
                let res = pre_aggr
20✔
256
                    .get_type(input_schema)
20✔
257
                    .map_err(|e| ExecutionError::InternalError(Box::new(e)))?;
20✔
258

259
                output_schema.fields.push(FieldDefinition::new(
20✔
260
                    name.clone(),
20✔
261
                    aggr.get_return_type(res.return_type),
20✔
262
                    res.nullable,
20✔
263
                ));
20✔
264
            }
×
265

×
266
            FieldRule::Dimension(expression, is_value, name) => {
27✔
267
                if *is_value {
27✔
268
                    let res = expression
14✔
269
                        .get_type(input_schema)
14✔
270
                        .map_err(|e| ExecutionError::InternalError(Box::new(e)))?;
14✔
271

×
272
                    output_schema.fields.push(FieldDefinition::new(
14✔
273
                        name.clone(),
14✔
274
                        res.return_type,
14✔
275
                        res.nullable,
14✔
276
                    ));
14✔
277
                    output_schema.primary_index.push(e.0);
14✔
278
                }
13✔
279
            }
×
280
        }
×
281
    }
×
282
    Ok(output_schema)
20✔
283
}
20✔
284

×
285
fn build_projection_schema(
57✔
286
    input_schema: &Schema,
57✔
287
    select: &[SelectItem],
57✔
288
) -> Result<Schema, ExecutionError> {
57✔
289
    match select
57✔
290
        .iter()
57✔
291
        .map(|item| parse_sql_select_item(item, input_schema))
198✔
292
        .collect::<Result<Vec<(String, Expression)>, PipelineError>>()
57✔
293
    {
×
294
        Ok(expressions) => {
57✔
295
            let mut output_schema = Schema::empty();
57✔
296

×
297
            for e in expressions.iter() {
198✔
298
                let field_name = e.0.clone();
198✔
299
                let field_type =
198✔
300
                    e.1.get_type(input_schema)
198✔
301
                        .map_err(|e| ExecutionError::InternalError(Box::new(e)))?;
198✔
302

303
                output_schema.fields.push(FieldDefinition::new(
198✔
304
                    field_name,
198✔
305
                    field_type.return_type,
198✔
306
                    field_type.nullable,
198✔
307
                ));
198✔
308
            }
309

310
            Ok(output_schema)
57✔
311
        }
312
        Err(error) => Err(ExecutionError::InternalStringError(error.to_string())),
×
313
    }
314
}
57✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc