• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

getdozer / dozer / 4020902227

pending completion
4020902227

Pull #743

github

GitHub
Merge 57279c6b6 into a12da35a5
Pull Request #743: Chore clippy fix

165 of 165 new or added lines in 60 files covered. (100.0%)

23638 of 35485 relevant lines covered (66.61%)

38417.79 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

74.56
/dozer-sql/src/pipeline/aggregation/factory.rs
1
use std::collections::HashMap;
2

3
use dozer_core::dag::{
4
    dag::DEFAULT_PORT_HANDLE,
5
    errors::ExecutionError,
6
    node::{OutputPortDef, OutputPortType, PortHandle, Processor, ProcessorFactory},
7
};
8
use dozer_types::types::{FieldDefinition, Schema, SourceDefinition};
9
use sqlparser::ast::{Expr as SqlExpr, Expr, SelectItem};
10

11
use crate::pipeline::{
12
    builder::SchemaSQLContext,
13
    expression::builder::{extend_schema_source_def, NameOrAlias},
14
};
15
use crate::pipeline::{
16
    errors::PipelineError,
17
    expression::{
18
        aggregate::AggregateFunctionType,
19
        builder::{BuilderExpressionType, ExpressionBuilder},
20
        execution::{Expression, ExpressionExecutor},
21
    },
22
    projection::{factory::parse_sql_select_item, processor::ProjectionProcessor},
23
};
24

25
use super::{
26
    aggregator::Aggregator,
27
    processor::{AggregationProcessor, FieldRule},
×
28
};
29

30
/// Processor factory for a SQL `SELECT` that may need aggregation.
///
/// When the query aggregates (non-empty `GROUP BY` or at least one aggregate
/// measure in the select list) it builds an [`AggregationProcessor`];
/// otherwise it falls back to a [`ProjectionProcessor`] over the select list.
#[derive(Debug)]
pub struct AggregationProcessorFactory {
    /// Name or alias of the input; used to extend the input schema's source
    /// definitions when the processor is built.
    name: NameOrAlias,
    /// Items of the `SELECT` list.
    select: Vec<SelectItem>,
    /// `GROUP BY` expressions; a non-empty list always selects aggregation.
    groupby: Vec<SqlExpr>,
    /// When `true` the output port is `StatefulWithPrimaryKeyLookup`,
    /// otherwise it is `Stateless`.
    stateful: bool,
}
×
37

×
38
impl AggregationProcessorFactory {
×
39
    /// Creates a new [`AggregationProcessorFactory`].
×
40
    pub fn new(
145✔
41
        name: NameOrAlias,
145✔
42
        select: Vec<SelectItem>,
145✔
43
        groupby: Vec<SqlExpr>,
145✔
44
        stateful: bool,
145✔
45
    ) -> Self {
145✔
46
        Self {
145✔
47
            name,
145✔
48
            select,
145✔
49
            groupby,
145✔
50
            stateful,
145✔
51
        }
145✔
52
    }
145✔
53
}
×
54

×
55
impl ProcessorFactory<SchemaSQLContext> for AggregationProcessorFactory {
×
56
    fn get_input_ports(&self) -> Vec<PortHandle> {
389✔
57
        vec![DEFAULT_PORT_HANDLE]
389✔
58
    }
389✔
59

60
    fn get_output_ports(&self) -> Vec<OutputPortDef> {
519✔
61
        if self.stateful {
519✔
62
            vec![OutputPortDef::new(
×
63
                DEFAULT_PORT_HANDLE,
×
64
                OutputPortType::StatefulWithPrimaryKeyLookup {
×
65
                    retr_old_records_for_deletes: true,
×
66
                    retr_old_records_for_updates: true,
×
67
                },
×
68
            )]
×
69
        } else {
×
70
            vec![OutputPortDef::new(
519✔
71
                DEFAULT_PORT_HANDLE,
519✔
72
                OutputPortType::Stateless,
519✔
73
            )]
519✔
74
        }
×
75
    }
519✔
76

×
77
    fn get_output_schema(
260✔
78
        &self,
260✔
79
        _output_port: &PortHandle,
260✔
80
        input_schemas: &HashMap<PortHandle, (Schema, SchemaSQLContext)>,
260✔
81
    ) -> Result<(Schema, SchemaSQLContext), ExecutionError> {
260✔
82
        let (input_schema, ctx) = input_schemas
260✔
83
            .get(&DEFAULT_PORT_HANDLE)
260✔
84
            .ok_or(ExecutionError::InvalidPortHandle(DEFAULT_PORT_HANDLE))?;
260✔
85
        let output_field_rules =
260✔
86
            get_aggregation_rules(&self.select, &self.groupby, input_schema).unwrap();
260✔
87

260✔
88
        if is_aggregation(&self.groupby, &output_field_rules) {
260✔
89
            let output_schema = build_output_schema(input_schema, output_field_rules)?;
80✔
90
            return Ok((output_schema, ctx.clone()));
80✔
91
        }
180✔
92
        build_projection_schema(input_schema, ctx, &self.select)
180✔
93
    }
260✔
94

×
95
    fn build(
130✔
96
        &self,
130✔
97
        input_schemas: HashMap<PortHandle, Schema>,
130✔
98
        _output_schemas: HashMap<PortHandle, Schema>,
130✔
99
    ) -> Result<Box<dyn Processor>, ExecutionError> {
130✔
100
        let input_schema = input_schemas
130✔
101
            .get(&DEFAULT_PORT_HANDLE)
130✔
102
            .ok_or(ExecutionError::InvalidPortHandle(DEFAULT_PORT_HANDLE))?;
130✔
103
        let input_schema = extend_schema_source_def(input_schema, &self.name);
130✔
104
        let output_field_rules =
130✔
105
            get_aggregation_rules(&self.select, &self.groupby, &input_schema).unwrap();
130✔
106

130✔
107
        if is_aggregation(&self.groupby, &output_field_rules) {
130✔
108
            return Ok(Box::new(AggregationProcessor::new(
40✔
109
                output_field_rules,
40✔
110
                input_schema,
40✔
111
            )));
40✔
112
        }
90✔
113

90✔
114
        // Build a Projection
90✔
115
        match self
90✔
116
            .select
90✔
117
            .iter()
90✔
118
            .map(|item| parse_sql_select_item(item, &input_schema))
309✔
119
            .collect::<Result<Vec<(String, Expression)>, PipelineError>>()
90✔
120
        {
×
121
            Ok(expressions) => Ok(Box::new(ProjectionProcessor::new(
90✔
122
                input_schema,
90✔
123
                expressions,
90✔
124
            ))),
90✔
125
            Err(error) => Err(ExecutionError::InternalStringError(error.to_string())),
×
126
        }
×
127
    }
130✔
128

×
129
    fn prepare(
×
130
        &self,
×
131
        _input_schemas: HashMap<PortHandle, (Schema, SchemaSQLContext)>,
×
132
        _output_schemas: HashMap<PortHandle, (Schema, SchemaSQLContext)>,
×
133
    ) -> Result<(), ExecutionError> {
×
134
        Ok(())
×
135
    }
×
136
}
×
137

×
138
fn is_aggregation(groupby: &[SqlExpr], output_field_rules: &[FieldRule]) -> bool {
390✔
139
    if !groupby.is_empty() {
390✔
140
        return true;
100✔
141
    }
290✔
142

290✔
143
    output_field_rules
290✔
144
        .iter()
290✔
145
        .any(|rule| matches!(rule, FieldRule::Measure(_, _, _)))
947✔
146
}
390✔
147

×
148
pub(crate) fn get_aggregation_rules(
436✔
149
    select: &[SelectItem],
436✔
150
    groupby: &[SqlExpr],
436✔
151
    schema: &Schema,
436✔
152
) -> Result<Vec<FieldRule>, PipelineError> {
436✔
153
    let mut select_rules = select
436✔
154
        .iter()
436✔
155
        .map(|item| parse_sql_aggregate_item(item, schema))
1,241✔
156
        .filter(|e| e.is_ok())
1,241✔
157
        .collect::<Result<Vec<FieldRule>, PipelineError>>()?;
436✔
158

159
    let mut groupby_rules = groupby
436✔
160
        .iter()
436✔
161
        .map(|expr| parse_sql_groupby_item(expr, schema))
436✔
162
        .collect::<Result<Vec<FieldRule>, PipelineError>>()?;
436✔
163

164
    select_rules.append(&mut groupby_rules);
436✔
165

436✔
166
    Ok(select_rules)
436✔
167
}
436✔
168

×
169
fn build_field_rule(
1,241✔
170
    sql_expr: &Expr,
1,241✔
171
    schema: &Schema,
1,241✔
172
    name: String,
1,241✔
173
) -> Result<FieldRule, PipelineError> {
1,241✔
174
    let builder = ExpressionBuilder {};
1,241✔
175
    let expression =
1,239✔
176
        builder.parse_sql_expression(&BuilderExpressionType::Aggregation, sql_expr, schema)?;
1,241✔
177

178
    match get_aggregator(expression.0.clone(), schema) {
1,239✔
179
        Ok(aggregator) => Ok(FieldRule::Measure(
166✔
180
            ExpressionBuilder {}
166✔
181
                .parse_sql_expression(&BuilderExpressionType::PreAggregation, sql_expr, schema)?
166✔
182
                .0,
183
            aggregator,
166✔
184
            name,
166✔
185
        )),
×
186
        Err(_) => Ok(FieldRule::Dimension(expression.0, true, name)),
1,073✔
187
    }
×
188
}
1,241✔
189

×
190
fn parse_sql_aggregate_item(
1,241✔
191
    item: &SelectItem,
1,241✔
192
    schema: &Schema,
1,241✔
193
) -> Result<FieldRule, PipelineError> {
1,241✔
194
    match item {
1,241✔
195
        SelectItem::UnnamedExpr(sql_expr) => {
1,186✔
196
            build_field_rule(sql_expr, schema, sql_expr.to_string())
1,186✔
197
        }
×
198
        SelectItem::ExprWithAlias { expr, alias } => {
55✔
199
            build_field_rule(expr, schema, alias.value.clone())
55✔
200
        }
×
201
        SelectItem::Wildcard(_) => Err(PipelineError::InvalidExpression(
×
202
            "Wildcard Operator is not supported".to_string(),
×
203
        )),
×
204
        SelectItem::QualifiedWildcard(..) => Err(PipelineError::InvalidExpression(
×
205
            "Qualified Wildcard Operator is not supported".to_string(),
×
206
        )),
×
207
    }
×
208
}
1,241✔
209

×
210
fn parse_sql_groupby_item(
146✔
211
    sql_expression: &SqlExpr,
146✔
212
    schema: &Schema,
146✔
213
) -> Result<FieldRule, PipelineError> {
146✔
214
    Ok(FieldRule::Dimension(
146✔
215
        ExpressionBuilder {}.build(
146✔
216
            &BuilderExpressionType::FullExpression,
146✔
217
            sql_expression,
146✔
218
            schema,
146✔
219
        )?,
146✔
220
        false,
×
221
        sql_expression.to_string(),
146✔
222
    ))
×
223
}
146✔
224

×
225
fn get_aggregator(
1,239✔
226
    expression: Box<Expression>,
1,239✔
227
    schema: &Schema,
1,239✔
228
) -> Result<Aggregator, PipelineError> {
1,239✔
229
    match *expression {
1,239✔
230
        Expression::AggregateFunction { fun, args } => {
166✔
231
            let arg_type = args[0].get_type(schema);
166✔
232
            match (&fun, arg_type) {
166✔
233
                (AggregateFunctionType::Avg, _) => Ok(Aggregator::Avg),
7✔
234
                (AggregateFunctionType::Count, _) => Ok(Aggregator::Count),
128✔
235
                (AggregateFunctionType::Max, _) => Ok(Aggregator::Max),
11✔
236
                (AggregateFunctionType::Min, _) => Ok(Aggregator::Min),
11✔
237
                (AggregateFunctionType::Sum, _) => Ok(Aggregator::Sum),
9✔
238
                _ => Err(PipelineError::InvalidExpression(format!(
×
239
                    "Not implemented Aggregation function: {fun:?}"
×
240
                ))),
×
241
            }
×
242
        }
×
243
        _ => Err(PipelineError::InvalidExpression(format!(
1,073✔
244
            "Not an Aggregation function: {expression:?}"
1,073✔
245
        ))),
1,073✔
246
    }
×
247
}
1,239✔
248

×
249
fn build_output_schema(
80✔
250
    input_schema: &Schema,
80✔
251
    output_field_rules: Vec<FieldRule>,
80✔
252
) -> Result<Schema, ExecutionError> {
80✔
253
    let mut output_schema = Schema::empty();
80✔
254
    for e in output_field_rules.iter().enumerate() {
214✔
255
        match e.1 {
214✔
256
            FieldRule::Measure(pre_aggr, aggr, name) => {
80✔
257
                let res = pre_aggr
80✔
258
                    .get_type(input_schema)
80✔
259
                    .map_err(|e| ExecutionError::InternalError(Box::new(e)))?;
80✔
260

×
261
                output_schema.fields.push(FieldDefinition::new(
80✔
262
                    name.clone(),
80✔
263
                    aggr.get_return_type(res.return_type),
80✔
264
                    res.nullable,
80✔
265
                    res.source,
80✔
266
                ));
80✔
267
            }
×
268

×
269
            FieldRule::Dimension(expression, is_value, name) => {
134✔
270
                if *is_value {
134✔
271
                    let res = expression
67✔
272
                        .get_type(input_schema)
67✔
273
                        .map_err(|e| ExecutionError::InternalError(Box::new(e)))?;
67✔
274

×
275
                    output_schema.fields.push(FieldDefinition::new(
67✔
276
                        name.clone(),
67✔
277
                        res.return_type,
67✔
278
                        res.nullable,
67✔
279
                        res.source,
67✔
280
                    ));
67✔
281
                    output_schema.primary_index.push(e.0);
67✔
282
                }
67✔
283
            }
×
284
        }
×
285
    }
×
286
    Ok(output_schema)
80✔
287
}
80✔
288

×
289
fn build_projection_schema(
180✔
290
    input_schema: &Schema,
180✔
291
    context: &SchemaSQLContext,
180✔
292
    select: &[SelectItem],
180✔
293
) -> Result<(Schema, SchemaSQLContext), ExecutionError> {
180✔
294
    match select
180✔
295
        .iter()
180✔
296
        .map(|item| parse_sql_select_item(item, input_schema))
618✔
297
        .collect::<Result<Vec<(String, Expression)>, PipelineError>>()
180✔
298
    {
×
299
        Ok(expressions) => {
180✔
300
            let mut output_schema = input_schema.clone();
180✔
301
            let mut fields = vec![];
180✔
302
            for e in expressions.iter() {
618✔
303
                let field_name = e.0.clone();
618✔
304
                let field_type =
618✔
305
                    e.1.get_type(input_schema)
618✔
306
                        .map_err(|e| ExecutionError::InternalError(Box::new(e)))?;
618✔
307

×
308
                fields.push(FieldDefinition::new(
618✔
309
                    field_name,
618✔
310
                    field_type.return_type,
618✔
311
                    field_type.nullable,
618✔
312
                    SourceDefinition::Dynamic,
618✔
313
                ));
618✔
314
            }
×
315
            output_schema.fields = fields;
180✔
316

180✔
317
            Ok((output_schema, context.clone()))
180✔
318
        }
×
319
        Err(error) => Err(ExecutionError::InternalStringError(error.to_string())),
×
320
    }
321
}
180✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc