• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

getdozer / dozer / 3964452665

pending completion
3964452665

Pull #686

github

GitHub
Merge 2d9a2e5a9 into 56c0cf2b3
Pull Request #686: chore: bump sqlparser to v0.30.0

13 of 13 new or added lines in 3 files covered. (100.0%)

22084 of 32621 relevant lines covered (67.7%)

52706.33 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

90.61
/dozer-sql/src/pipeline/aggregation/factory.rs
1
use std::collections::HashMap;
2

3
use dozer_core::dag::{
4
    dag::DEFAULT_PORT_HANDLE,
5
    errors::ExecutionError,
6
    node::{OutputPortDef, OutputPortType, PortHandle, Processor, ProcessorFactory},
7
};
8
use dozer_types::types::{FieldDefinition, Schema};
9
use sqlparser::ast::{Expr as SqlExpr, Expr, SelectItem};
10

11
use crate::pipeline::{
12
    errors::PipelineError,
13
    expression::{
14
        aggregate::AggregateFunctionType,
15
        builder::{BuilderExpressionType, ExpressionBuilder},
16
        execution::{Expression, ExpressionExecutor},
17
    },
18
    projection::{factory::parse_sql_select_item, processor::ProjectionProcessor},
19
};
20

21
use super::{
22
    aggregator::Aggregator,
23
    processor::{AggregationProcessor, FieldRule},
24
};
25

26
#[derive(Debug)]
×
27
pub struct AggregationProcessorFactory {
28
    select: Vec<SelectItem>,
29
    groupby: Vec<SqlExpr>,
30
}
31

32
impl AggregationProcessorFactory {
33
    /// Creates a new [`AggregationProcessorFactory`].
34
    pub fn new(select: Vec<SelectItem>, groupby: Vec<SqlExpr>) -> Self {
77✔
35
        Self { select, groupby }
77✔
36
    }
77✔
37
}
38

39
impl ProcessorFactory for AggregationProcessorFactory {
40
    fn get_input_ports(&self) -> Vec<PortHandle> {
152✔
41
        vec![DEFAULT_PORT_HANDLE]
152✔
42
    }
152✔
43

44
    fn get_output_ports(&self) -> Vec<OutputPortDef> {
228✔
45
        vec![OutputPortDef::new(
228✔
46
            DEFAULT_PORT_HANDLE,
228✔
47
            OutputPortType::Stateless,
228✔
48
        )]
228✔
49
    }
228✔
50

51
    fn get_output_schema(
77✔
52
        &self,
77✔
53
        _output_port: &PortHandle,
77✔
54
        input_schemas: &HashMap<PortHandle, Schema>,
77✔
55
    ) -> Result<Schema, ExecutionError> {
77✔
56
        let input_schema = input_schemas
77✔
57
            .get(&DEFAULT_PORT_HANDLE)
77✔
58
            .ok_or(ExecutionError::InvalidPortHandle(DEFAULT_PORT_HANDLE))?;
77✔
59
        let output_field_rules =
77✔
60
            get_aggregation_rules(&self.select, &self.groupby, input_schema).unwrap();
77✔
61

77✔
62
        if is_aggregation(&self.groupby, &output_field_rules) {
77✔
63
            return build_output_schema(input_schema, output_field_rules);
20✔
64
        }
57✔
65

57✔
66
        build_projection_schema(input_schema, &self.select)
57✔
67
    }
77✔
68

69
    fn build(
76✔
70
        &self,
76✔
71
        input_schemas: HashMap<PortHandle, Schema>,
76✔
72
        _output_schemas: HashMap<PortHandle, Schema>,
76✔
73
    ) -> Result<Box<dyn Processor>, ExecutionError> {
76✔
74
        let input_schema = input_schemas
76✔
75
            .get(&DEFAULT_PORT_HANDLE)
76✔
76
            .ok_or(ExecutionError::InvalidPortHandle(DEFAULT_PORT_HANDLE))?;
76✔
77
        let output_field_rules =
76✔
78
            get_aggregation_rules(&self.select, &self.groupby, input_schema).unwrap();
76✔
79

76✔
80
        if is_aggregation(&self.groupby, &output_field_rules) {
76✔
81
            return Ok(Box::new(AggregationProcessor::new(
19✔
82
                output_field_rules,
19✔
83
                input_schema.clone(),
19✔
84
            )));
19✔
85
        }
57✔
86

57✔
87
        // Build a Projection
57✔
88
        match self
57✔
89
            .select
57✔
90
            .iter()
57✔
91
            .map(|item| parse_sql_select_item(item, input_schema))
198✔
92
            .collect::<Result<Vec<(String, Expression)>, PipelineError>>()
57✔
93
        {
94
            Ok(expressions) => Ok(Box::new(ProjectionProcessor::new(
57✔
95
                input_schema.clone(),
57✔
96
                expressions,
57✔
97
            ))),
57✔
98
            Err(error) => Err(ExecutionError::InternalStringError(error.to_string())),
×
99
        }
100
    }
76✔
101

102
    fn prepare(
×
103
        &self,
×
104
        _input_schemas: HashMap<PortHandle, Schema>,
×
105
        _output_schemas: HashMap<PortHandle, Schema>,
×
106
    ) -> Result<(), ExecutionError> {
×
107
        Ok(())
×
108
    }
×
109
}
110

111
fn is_aggregation(groupby: &[SqlExpr], output_field_rules: &[FieldRule]) -> bool {
153✔
112
    if !groupby.is_empty() {
153✔
113
        return true;
25✔
114
    }
128✔
115

128✔
116
    output_field_rules
128✔
117
        .iter()
128✔
118
        .any(|rule| matches!(rule, FieldRule::Measure(_, _, _)))
410✔
119
}
153✔
120

121
pub(crate) fn get_aggregation_rules(
197✔
122
    select: &[SelectItem],
197✔
123
    groupby: &[SqlExpr],
197✔
124
    schema: &Schema,
197✔
125
) -> Result<Vec<FieldRule>, PipelineError> {
197✔
126
    let mut select_rules = select
197✔
127
        .iter()
197✔
128
        .map(|item| parse_sql_aggregate_item(item, schema))
552✔
129
        .filter(|e| e.is_ok())
552✔
130
        .collect::<Result<Vec<FieldRule>, PipelineError>>()?;
197✔
131

132
    let mut groupby_rules = groupby
198✔
133
        .iter()
197✔
134
        .map(|expr| parse_sql_groupby_item(expr, schema))
197✔
135
        .collect::<Result<Vec<FieldRule>, PipelineError>>()?;
197✔
136

137
    select_rules.append(&mut groupby_rules);
198✔
138

198✔
139
    Ok(select_rules)
198✔
140
}
198✔
141

142
fn build_field_rule(
552✔
143
    sql_expr: &Expr,
552✔
144
    schema: &Schema,
552✔
145
    name: String,
552✔
146
) -> Result<FieldRule, PipelineError> {
552✔
147
    let builder = ExpressionBuilder {};
552✔
148
    let expression =
552✔
149
        builder.parse_sql_expression(&BuilderExpressionType::Aggregation, sql_expr, schema)?;
552✔
150

151
    match get_aggregator(expression.0.clone(), schema) {
552✔
152
        Ok(aggregator) => Ok(FieldRule::Measure(
84✔
153
            ExpressionBuilder {}
84✔
154
                .parse_sql_expression(&BuilderExpressionType::PreAggregation, sql_expr, schema)?
84✔
155
                .0,
156
            aggregator,
84✔
157
            name,
84✔
158
        )),
159
        Err(_) => Ok(FieldRule::Dimension(expression.0, true, name)),
468✔
160
    }
161
}
552✔
162

163
fn parse_sql_aggregate_item(
552✔
164
    item: &SelectItem,
552✔
165
    schema: &Schema,
552✔
166
) -> Result<FieldRule, PipelineError> {
552✔
167
    match item {
552✔
168
        SelectItem::UnnamedExpr(sql_expr) => {
515✔
169
            build_field_rule(sql_expr, schema, sql_expr.to_string())
515✔
170
        }
171
        SelectItem::ExprWithAlias { expr, alias } => {
37✔
172
            build_field_rule(expr, schema, alias.value.clone())
37✔
173
        }
174
        SelectItem::Wildcard(_) => Err(PipelineError::InvalidExpression(
×
175
            "Wildcard Operator is not supported".to_string(),
×
176
        )),
×
177
        SelectItem::QualifiedWildcard(..) => Err(PipelineError::InvalidExpression(
×
178
            "Qualified Wildcard Operator is not supported".to_string(),
×
179
        )),
×
180
    }
181
}
552✔
182

183
fn parse_sql_groupby_item(
70✔
184
    sql_expression: &SqlExpr,
70✔
185
    schema: &Schema,
70✔
186
) -> Result<FieldRule, PipelineError> {
70✔
187
    Ok(FieldRule::Dimension(
70✔
188
        ExpressionBuilder {}.build(
70✔
189
            &BuilderExpressionType::FullExpression,
70✔
190
            sql_expression,
70✔
191
            schema,
70✔
192
        )?,
70✔
193
        false,
194
        sql_expression.to_string(),
70✔
195
    ))
196
}
70✔
197

198
fn get_aggregator(
552✔
199
    expression: Box<Expression>,
552✔
200
    schema: &Schema,
552✔
201
) -> Result<Aggregator, PipelineError> {
552✔
202
    match *expression {
552✔
203
        Expression::AggregateFunction { fun, args } => {
84✔
204
            let arg_type = args[0].get_type(schema);
84✔
205
            match (&fun, arg_type) {
84✔
206
                (AggregateFunctionType::Avg, _) => Ok(Aggregator::Avg),
7✔
207
                (AggregateFunctionType::Count, _) => Ok(Aggregator::Count),
46✔
208
                (AggregateFunctionType::Max, _) => Ok(Aggregator::Max),
11✔
209
                (AggregateFunctionType::Min, _) => Ok(Aggregator::Min),
11✔
210
                (AggregateFunctionType::Sum, _) => Ok(Aggregator::Sum),
9✔
211
                _ => Err(PipelineError::InvalidExpression(format!(
×
212
                    "Not implemented Aggregation function: {:?}",
×
213
                    fun
×
214
                ))),
×
215
            }
216
        }
217
        _ => Err(PipelineError::InvalidExpression(format!(
468✔
218
            "Not an Aggregation function: {:?}",
468✔
219
            expression
468✔
220
        ))),
468✔
221
    }
222
}
552✔
223

224
fn build_output_schema(
20✔
225
    input_schema: &Schema,
20✔
226
    output_field_rules: Vec<FieldRule>,
20✔
227
) -> Result<Schema, ExecutionError> {
20✔
228
    let mut output_schema = Schema::empty();
20✔
229

230
    for e in output_field_rules.iter().enumerate() {
47✔
231
        match e.1 {
47✔
232
            FieldRule::Measure(pre_aggr, aggr, name) => {
20✔
233
                let res = pre_aggr
20✔
234
                    .get_type(input_schema)
20✔
235
                    .map_err(|e| ExecutionError::InternalError(Box::new(e)))?;
20✔
236

237
                output_schema.fields.push(FieldDefinition::new(
20✔
238
                    name.clone(),
20✔
239
                    aggr.get_return_type(res.return_type),
20✔
240
                    res.nullable,
20✔
241
                ));
20✔
242
            }
243

244
            FieldRule::Dimension(expression, is_value, name) => {
27✔
245
                if *is_value {
27✔
246
                    let res = expression
14✔
247
                        .get_type(input_schema)
14✔
248
                        .map_err(|e| ExecutionError::InternalError(Box::new(e)))?;
14✔
249

250
                    output_schema.fields.push(FieldDefinition::new(
14✔
251
                        name.clone(),
14✔
252
                        res.return_type,
14✔
253
                        res.nullable,
14✔
254
                    ));
14✔
255
                    output_schema.primary_index.push(e.0);
14✔
256
                }
13✔
257
            }
258
        }
259
    }
260
    Ok(output_schema)
20✔
261
}
20✔
262

263
fn build_projection_schema(
57✔
264
    input_schema: &Schema,
57✔
265
    select: &[SelectItem],
57✔
266
) -> Result<Schema, ExecutionError> {
57✔
267
    match select
57✔
268
        .iter()
57✔
269
        .map(|item| parse_sql_select_item(item, input_schema))
198✔
270
        .collect::<Result<Vec<(String, Expression)>, PipelineError>>()
57✔
271
    {
272
        Ok(expressions) => {
57✔
273
            let mut output_schema = Schema::empty();
57✔
274

275
            for e in expressions.iter() {
198✔
276
                let field_name = e.0.clone();
198✔
277
                let field_type =
198✔
278
                    e.1.get_type(input_schema)
198✔
279
                        .map_err(|e| ExecutionError::InternalError(Box::new(e)))?;
198✔
280

281
                output_schema.fields.push(FieldDefinition::new(
198✔
282
                    field_name,
198✔
283
                    field_type.return_type,
198✔
284
                    field_type.nullable,
198✔
285
                ));
198✔
286
            }
287

288
            Ok(output_schema)
57✔
289
        }
290
        Err(error) => Err(ExecutionError::InternalStringError(error.to_string())),
×
291
    }
292
}
57✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc