• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

getdozer / dozer / 4007818786

pending completion
4007818786

Pull #733

github

GitHub
Merge baf5c38aa into 6c0ac2b2c
Pull Request #733: Bump diesel from 2.0.2 to 2.0.3

23389 of 34432 relevant lines covered (67.93%)

40326.78 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

80.3
/dozer-sql/src/pipeline/aggregation/factory.rs
1
use std::collections::HashMap;
2

3
use dozer_core::dag::{
4
    dag::DEFAULT_PORT_HANDLE,
5
    errors::ExecutionError,
6
    node::{OutputPortDef, OutputPortType, PortHandle, Processor, ProcessorFactory},
7
};
8
use dozer_types::types::{FieldDefinition, Schema, SourceDefinition};
9
use sqlparser::ast::{Expr as SqlExpr, Expr, SelectItem};
10

11
use crate::pipeline::{
12
    builder::SchemaSQLContext,
13
    expression::builder::{extend_schema_source_def, NameOrAlias},
14
};
15
use crate::pipeline::{
16
    errors::PipelineError,
17
    expression::{
18
        aggregate::AggregateFunctionType,
19
        builder::{BuilderExpressionType, ExpressionBuilder},
20
        execution::{Expression, ExpressionExecutor},
21
    },
22
    projection::{factory::parse_sql_select_item, processor::ProjectionProcessor},
23
};
24

25
use super::{
26
    aggregator::Aggregator,
27
    processor::{AggregationProcessor, FieldRule},
×
28
};
29

30
#[derive(Debug)]
×
31
pub struct AggregationProcessorFactory {
32
    name: NameOrAlias,
33
    select: Vec<SelectItem>,
34
    groupby: Vec<SqlExpr>,
35
    stateful: bool,
36
}
×
37

×
38
impl AggregationProcessorFactory {
×
39
    /// Creates a new [`AggregationProcessorFactory`].
×
40
    pub fn new(
145✔
41
        name: NameOrAlias,
145✔
42
        select: Vec<SelectItem>,
145✔
43
        groupby: Vec<SqlExpr>,
145✔
44
        stateful: bool,
145✔
45
    ) -> Self {
145✔
46
        Self {
145✔
47
            name,
145✔
48
            select,
145✔
49
            groupby,
145✔
50
            stateful,
145✔
51
        }
145✔
52
    }
145✔
53
}
×
54

×
55
impl ProcessorFactory<SchemaSQLContext> for AggregationProcessorFactory {
×
56
    fn get_input_ports(&self) -> Vec<PortHandle> {
389✔
57
        vec![DEFAULT_PORT_HANDLE]
389✔
58
    }
389✔
59

60
    fn get_output_ports(&self) -> Vec<OutputPortDef> {
519✔
61
        if self.stateful {
519✔
62
            vec![OutputPortDef::new(
×
63
                DEFAULT_PORT_HANDLE,
×
64
                OutputPortType::StatefulWithPrimaryKeyLookup {
×
65
                    retr_old_records_for_deletes: true,
×
66
                    retr_old_records_for_updates: true,
×
67
                },
×
68
            )]
×
69
        } else {
70
            vec![OutputPortDef::new(
519✔
71
                DEFAULT_PORT_HANDLE,
519✔
72
                OutputPortType::Stateless,
519✔
73
            )]
519✔
74
        }
75
    }
519✔
76

77
    fn get_output_schema(
260✔
78
        &self,
260✔
79
        _output_port: &PortHandle,
260✔
80
        input_schemas: &HashMap<PortHandle, (Schema, SchemaSQLContext)>,
260✔
81
    ) -> Result<(Schema, SchemaSQLContext), ExecutionError> {
260✔
82
        let (input_schema, ctx) = input_schemas
260✔
83
            .get(&DEFAULT_PORT_HANDLE)
260✔
84
            .ok_or(ExecutionError::InvalidPortHandle(DEFAULT_PORT_HANDLE))?;
260✔
85
        let output_field_rules =
260✔
86
            get_aggregation_rules(&self.select, &self.groupby, input_schema).unwrap();
260✔
87

260✔
88
        if is_aggregation(&self.groupby, &output_field_rules) {
260✔
89
            let output_schema = build_output_schema(input_schema, output_field_rules)?;
80✔
90
            return Ok((output_schema, ctx.clone()));
80✔
91
        }
180✔
92
        build_projection_schema(input_schema, ctx, &self.select)
180✔
93
    }
260✔
94

×
95
    fn build(
130✔
96
        &self,
130✔
97
        input_schemas: HashMap<PortHandle, Schema>,
130✔
98
        _output_schemas: HashMap<PortHandle, Schema>,
130✔
99
    ) -> Result<Box<dyn Processor>, ExecutionError> {
130✔
100
        let input_schema = input_schemas
130✔
101
            .get(&DEFAULT_PORT_HANDLE)
130✔
102
            .ok_or(ExecutionError::InvalidPortHandle(DEFAULT_PORT_HANDLE))?;
130✔
103
        let input_schema = extend_schema_source_def(input_schema, &self.name);
130✔
104
        let output_field_rules =
130✔
105
            get_aggregation_rules(&self.select, &self.groupby, &input_schema).unwrap();
130✔
106

130✔
107
        if is_aggregation(&self.groupby, &output_field_rules) {
130✔
108
            return Ok(Box::new(AggregationProcessor::new(
40✔
109
                output_field_rules,
40✔
110
                input_schema,
40✔
111
            )));
40✔
112
        }
90✔
113

90✔
114
        // Build a Projection
90✔
115
        match self
90✔
116
            .select
90✔
117
            .iter()
90✔
118
            .map(|item| parse_sql_select_item(item, &input_schema))
309✔
119
            .collect::<Result<Vec<(String, Expression)>, PipelineError>>()
90✔
120
        {
×
121
            Ok(expressions) => Ok(Box::new(ProjectionProcessor::new(
90✔
122
                input_schema,
90✔
123
                expressions,
90✔
124
            ))),
90✔
125
            Err(error) => Err(ExecutionError::InternalStringError(error.to_string())),
×
126
        }
×
127
    }
130✔
128

×
129
    fn prepare(
×
130
        &self,
×
131
        _input_schemas: HashMap<PortHandle, (Schema, SchemaSQLContext)>,
×
132
        _output_schemas: HashMap<PortHandle, (Schema, SchemaSQLContext)>,
×
133
    ) -> Result<(), ExecutionError> {
×
134
        Ok(())
×
135
    }
×
136
}
137

138
fn is_aggregation(groupby: &[SqlExpr], output_field_rules: &[FieldRule]) -> bool {
390✔
139
    if !groupby.is_empty() {
390✔
140
        return true;
100✔
141
    }
290✔
142

290✔
143
    output_field_rules
290✔
144
        .iter()
290✔
145
        .any(|rule| matches!(rule, FieldRule::Measure(_, _, _)))
947✔
146
}
390✔
147

×
148
pub(crate) fn get_aggregation_rules(
435✔
149
    select: &[SelectItem],
435✔
150
    groupby: &[SqlExpr],
435✔
151
    schema: &Schema,
435✔
152
) -> Result<Vec<FieldRule>, PipelineError> {
435✔
153
    let mut select_rules = select
435✔
154
        .iter()
435✔
155
        .map(|item| parse_sql_aggregate_item(item, schema))
1,241✔
156
        .filter(|e| e.is_ok())
1,241✔
157
        .collect::<Result<Vec<FieldRule>, PipelineError>>()?;
435✔
158

159
    let mut groupby_rules = groupby
436✔
160
        .iter()
435✔
161
        .map(|expr| parse_sql_groupby_item(expr, schema))
435✔
162
        .collect::<Result<Vec<FieldRule>, PipelineError>>()?;
435✔
163

164
    select_rules.append(&mut groupby_rules);
436✔
165

436✔
166
    Ok(select_rules)
436✔
167
}
436✔
168

169
fn build_field_rule(
1,241✔
170
    sql_expr: &Expr,
1,241✔
171
    schema: &Schema,
1,241✔
172
    name: String,
1,241✔
173
) -> Result<FieldRule, PipelineError> {
1,241✔
174
    let builder = ExpressionBuilder {};
1,241✔
175
    let expression =
1,239✔
176
        builder.parse_sql_expression(&BuilderExpressionType::Aggregation, sql_expr, schema)?;
1,241✔
177

178
    match get_aggregator(expression.0.clone(), schema) {
1,239✔
179
        Ok(aggregator) => Ok(FieldRule::Measure(
166✔
180
            ExpressionBuilder {}
166✔
181
                .parse_sql_expression(&BuilderExpressionType::PreAggregation, sql_expr, schema)?
166✔
182
                .0,
183
            aggregator,
166✔
184
            name,
166✔
185
        )),
186
        Err(_) => Ok(FieldRule::Dimension(expression.0, true, name)),
1,073✔
187
    }
188
}
1,241✔
189

190
fn parse_sql_aggregate_item(
1,241✔
191
    item: &SelectItem,
1,241✔
192
    schema: &Schema,
1,241✔
193
) -> Result<FieldRule, PipelineError> {
1,241✔
194
    match item {
1,241✔
195
        SelectItem::UnnamedExpr(sql_expr) => {
1,186✔
196
            build_field_rule(sql_expr, schema, sql_expr.to_string())
1,186✔
197
        }
198
        SelectItem::ExprWithAlias { expr, alias } => {
55✔
199
            build_field_rule(expr, schema, alias.value.clone())
55✔
200
        }
201
        SelectItem::Wildcard(_) => Err(PipelineError::InvalidExpression(
×
202
            "Wildcard Operator is not supported".to_string(),
×
203
        )),
×
204
        SelectItem::QualifiedWildcard(..) => Err(PipelineError::InvalidExpression(
×
205
            "Qualified Wildcard Operator is not supported".to_string(),
×
206
        )),
×
207
    }
×
208
}
1,241✔
209

×
210
fn parse_sql_groupby_item(
145✔
211
    sql_expression: &SqlExpr,
145✔
212
    schema: &Schema,
145✔
213
) -> Result<FieldRule, PipelineError> {
145✔
214
    Ok(FieldRule::Dimension(
145✔
215
        ExpressionBuilder {}.build(
145✔
216
            &BuilderExpressionType::FullExpression,
145✔
217
            sql_expression,
145✔
218
            schema,
145✔
219
        )?,
145✔
220
        false,
×
221
        sql_expression.to_string(),
145✔
222
    ))
×
223
}
145✔
224

×
225
fn get_aggregator(
1,239✔
226
    expression: Box<Expression>,
1,239✔
227
    schema: &Schema,
1,239✔
228
) -> Result<Aggregator, PipelineError> {
1,239✔
229
    match *expression {
1,239✔
230
        Expression::AggregateFunction { fun, args } => {
166✔
231
            let arg_type = args[0].get_type(schema);
166✔
232
            match (&fun, arg_type) {
166✔
233
                (AggregateFunctionType::Avg, _) => Ok(Aggregator::Avg),
7✔
234
                (AggregateFunctionType::Count, _) => Ok(Aggregator::Count),
128✔
235
                (AggregateFunctionType::Max, _) => Ok(Aggregator::Max),
11✔
236
                (AggregateFunctionType::Min, _) => Ok(Aggregator::Min),
11✔
237
                (AggregateFunctionType::Sum, _) => Ok(Aggregator::Sum),
9✔
238
                _ => Err(PipelineError::InvalidExpression(format!(
×
239
                    "Not implemented Aggregation function: {:?}",
×
240
                    fun
×
241
                ))),
×
242
            }
×
243
        }
244
        _ => Err(PipelineError::InvalidExpression(format!(
1,073✔
245
            "Not an Aggregation function: {:?}",
1,073✔
246
            expression
1,073✔
247
        ))),
1,073✔
248
    }
249
}
1,239✔
250

251
fn build_output_schema(
80✔
252
    input_schema: &Schema,
80✔
253
    output_field_rules: Vec<FieldRule>,
80✔
254
) -> Result<Schema, ExecutionError> {
80✔
255
    let mut output_schema = Schema::empty();
80✔
256
    for e in output_field_rules.iter().enumerate() {
214✔
257
        match e.1 {
214✔
258
            FieldRule::Measure(pre_aggr, aggr, name) => {
80✔
259
                let res = pre_aggr
80✔
260
                    .get_type(input_schema)
80✔
261
                    .map_err(|e| ExecutionError::InternalError(Box::new(e)))?;
80✔
262

263
                output_schema.fields.push(FieldDefinition::new(
80✔
264
                    name.clone(),
80✔
265
                    aggr.get_return_type(res.return_type),
80✔
266
                    res.nullable,
80✔
267
                    res.source,
80✔
268
                ));
80✔
269
            }
270

×
271
            FieldRule::Dimension(expression, is_value, name) => {
134✔
272
                if *is_value {
134✔
273
                    let res = expression
67✔
274
                        .get_type(input_schema)
67✔
275
                        .map_err(|e| ExecutionError::InternalError(Box::new(e)))?;
67✔
276

277
                    output_schema.fields.push(FieldDefinition::new(
67✔
278
                        name.clone(),
67✔
279
                        res.return_type,
67✔
280
                        res.nullable,
67✔
281
                        res.source,
67✔
282
                    ));
67✔
283
                    output_schema.primary_index.push(e.0);
67✔
284
                }
67✔
285
            }
×
286
        }
×
287
    }
×
288
    Ok(output_schema)
80✔
289
}
80✔
290

×
291
fn build_projection_schema(
180✔
292
    input_schema: &Schema,
180✔
293
    context: &SchemaSQLContext,
180✔
294
    select: &[SelectItem],
180✔
295
) -> Result<(Schema, SchemaSQLContext), ExecutionError> {
180✔
296
    match select
180✔
297
        .iter()
180✔
298
        .map(|item| parse_sql_select_item(item, input_schema))
618✔
299
        .collect::<Result<Vec<(String, Expression)>, PipelineError>>()
180✔
300
    {
×
301
        Ok(expressions) => {
180✔
302
            let mut output_schema = input_schema.clone();
180✔
303
            let mut fields = vec![];
180✔
304
            for e in expressions.iter() {
618✔
305
                let field_name = e.0.clone();
618✔
306
                let field_type =
618✔
307
                    e.1.get_type(input_schema)
618✔
308
                        .map_err(|e| ExecutionError::InternalError(Box::new(e)))?;
618✔
309

310
                fields.push(FieldDefinition::new(
618✔
311
                    field_name,
618✔
312
                    field_type.return_type,
618✔
313
                    field_type.nullable,
618✔
314
                    SourceDefinition::Dynamic,
618✔
315
                ));
618✔
316
            }
317
            output_schema.fields = fields;
180✔
318

180✔
319
            Ok((output_schema, context.clone()))
180✔
320
        }
321
        Err(error) => Err(ExecutionError::InternalStringError(error.to_string())),
×
322
    }
323
}
180✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc