• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

getdozer / dozer / 3965606986

pending completion
3965606986

push

github

GitHub
chore: bump sqlparser to v0.30.0 (#686)

13 of 13 new or added lines in 3 files covered. (100.0%)

21931 of 32697 relevant lines covered (67.07%)

36311.08 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

76.45
/dozer-sql/src/pipeline/aggregation/factory.rs
1
use std::collections::HashMap;
2

3
use dozer_core::dag::{
4
    dag::DEFAULT_PORT_HANDLE,
5
    errors::ExecutionError,
6
    node::{OutputPortDef, OutputPortType, PortHandle, Processor, ProcessorFactory},
7
};
8
use dozer_types::types::{FieldDefinition, Schema};
9
use sqlparser::ast::{Expr as SqlExpr, Expr, SelectItem};
10

11
use crate::pipeline::{
12
    errors::PipelineError,
13
    expression::{
14
        aggregate::AggregateFunctionType,
15
        builder::{BuilderExpressionType, ExpressionBuilder},
16
        execution::{Expression, ExpressionExecutor},
17
    },
18
    projection::{factory::parse_sql_select_item, processor::ProjectionProcessor},
19
};
20

21
use super::{
22
    aggregator::Aggregator,
23
    processor::{AggregationProcessor, FieldRule},
24
};
25

26
#[derive(Debug)]
×
27
pub struct AggregationProcessorFactory {
28
    select: Vec<SelectItem>,
29
    groupby: Vec<SqlExpr>,
30
    stateful: bool,
31
}
32

33
impl AggregationProcessorFactory {
34
    /// Creates a new [`AggregationProcessorFactory`].
×
35
    pub fn new(select: Vec<SelectItem>, groupby: Vec<SqlExpr>, stateful: bool) -> Self {
91✔
36
        Self {
91✔
37
            select,
91✔
38
            groupby,
91✔
39
            stateful,
91✔
40
        }
91✔
41
    }
91✔
42
}
×
43

44
impl ProcessorFactory for AggregationProcessorFactory {
×
45
    fn get_input_ports(&self) -> Vec<PortHandle> {
152✔
46
        vec![DEFAULT_PORT_HANDLE]
152✔
47
    }
152✔
48

×
49
    fn get_output_ports(&self) -> Vec<OutputPortDef> {
228✔
50
        if self.stateful {
228✔
51
            vec![OutputPortDef::new(
×
52
                DEFAULT_PORT_HANDLE,
×
53
                OutputPortType::StatefulWithPrimaryKeyLookup {
×
54
                    retr_old_records_for_deletes: true,
×
55
                    retr_old_records_for_updates: true,
×
56
                },
×
57
            )]
×
58
        } else {
×
59
            vec![OutputPortDef::new(
228✔
60
                DEFAULT_PORT_HANDLE,
228✔
61
                OutputPortType::Stateless,
228✔
62
            )]
228✔
63
        }
×
64
    }
228✔
65

×
66
    fn get_output_schema(
77✔
67
        &self,
77✔
68
        _output_port: &PortHandle,
77✔
69
        input_schemas: &HashMap<PortHandle, Schema>,
77✔
70
    ) -> Result<Schema, ExecutionError> {
77✔
71
        let input_schema = input_schemas
77✔
72
            .get(&DEFAULT_PORT_HANDLE)
77✔
73
            .ok_or(ExecutionError::InvalidPortHandle(DEFAULT_PORT_HANDLE))?;
77✔
74
        let output_field_rules =
77✔
75
            get_aggregation_rules(&self.select, &self.groupby, input_schema).unwrap();
77✔
76

77✔
77
        if is_aggregation(&self.groupby, &output_field_rules) {
77✔
78
            return build_output_schema(input_schema, output_field_rules);
20✔
79
        }
57✔
80

57✔
81
        build_projection_schema(input_schema, &self.select)
57✔
82
    }
77✔
83

×
84
    fn build(
76✔
85
        &self,
76✔
86
        input_schemas: HashMap<PortHandle, Schema>,
76✔
87
        _output_schemas: HashMap<PortHandle, Schema>,
76✔
88
    ) -> Result<Box<dyn Processor>, ExecutionError> {
76✔
89
        let input_schema = input_schemas
76✔
90
            .get(&DEFAULT_PORT_HANDLE)
76✔
91
            .ok_or(ExecutionError::InvalidPortHandle(DEFAULT_PORT_HANDLE))?;
76✔
92
        let output_field_rules =
76✔
93
            get_aggregation_rules(&self.select, &self.groupby, input_schema).unwrap();
76✔
94

76✔
95
        if is_aggregation(&self.groupby, &output_field_rules) {
76✔
96
            return Ok(Box::new(AggregationProcessor::new(
19✔
97
                output_field_rules,
19✔
98
                input_schema.clone(),
19✔
99
            )));
19✔
100
        }
57✔
101

57✔
102
        // Build a Projection
57✔
103
        match self
57✔
104
            .select
57✔
105
            .iter()
57✔
106
            .map(|item| parse_sql_select_item(item, input_schema))
198✔
107
            .collect::<Result<Vec<(String, Expression)>, PipelineError>>()
57✔
108
        {
×
109
            Ok(expressions) => Ok(Box::new(ProjectionProcessor::new(
57✔
110
                input_schema.clone(),
57✔
111
                expressions,
57✔
112
            ))),
57✔
113
            Err(error) => Err(ExecutionError::InternalStringError(error.to_string())),
×
114
        }
×
115
    }
76✔
116

×
117
    fn prepare(
×
118
        &self,
×
119
        _input_schemas: HashMap<PortHandle, Schema>,
×
120
        _output_schemas: HashMap<PortHandle, Schema>,
×
121
    ) -> Result<(), ExecutionError> {
×
122
        Ok(())
×
123
    }
×
124
}
×
125

×
126
fn is_aggregation(groupby: &[SqlExpr], output_field_rules: &[FieldRule]) -> bool {
153✔
127
    if !groupby.is_empty() {
153✔
128
        return true;
25✔
129
    }
128✔
130

128✔
131
    output_field_rules
128✔
132
        .iter()
128✔
133
        .any(|rule| matches!(rule, FieldRule::Measure(_, _, _)))
410✔
134
}
153✔
135

×
136
pub(crate) fn get_aggregation_rules(
198✔
137
    select: &[SelectItem],
198✔
138
    groupby: &[SqlExpr],
198✔
139
    schema: &Schema,
198✔
140
) -> Result<Vec<FieldRule>, PipelineError> {
198✔
141
    let mut select_rules = select
198✔
142
        .iter()
198✔
143
        .map(|item| parse_sql_aggregate_item(item, schema))
552✔
144
        .filter(|e| e.is_ok())
552✔
145
        .collect::<Result<Vec<FieldRule>, PipelineError>>()?;
198✔
146

×
147
    let mut groupby_rules = groupby
198✔
148
        .iter()
198✔
149
        .map(|expr| parse_sql_groupby_item(expr, schema))
198✔
150
        .collect::<Result<Vec<FieldRule>, PipelineError>>()?;
198✔
151

×
152
    select_rules.append(&mut groupby_rules);
198✔
153

198✔
154
    Ok(select_rules)
198✔
155
}
198✔
156

×
157
fn build_field_rule(
552✔
158
    sql_expr: &Expr,
552✔
159
    schema: &Schema,
552✔
160
    name: String,
552✔
161
) -> Result<FieldRule, PipelineError> {
552✔
162
    let builder = ExpressionBuilder {};
552✔
163
    let expression =
552✔
164
        builder.parse_sql_expression(&BuilderExpressionType::Aggregation, sql_expr, schema)?;
552✔
165

×
166
    match get_aggregator(expression.0.clone(), schema) {
552✔
167
        Ok(aggregator) => Ok(FieldRule::Measure(
84✔
168
            ExpressionBuilder {}
84✔
169
                .parse_sql_expression(&BuilderExpressionType::PreAggregation, sql_expr, schema)?
84✔
170
                .0,
171
            aggregator,
84✔
172
            name,
84✔
173
        )),
174
        Err(_) => Ok(FieldRule::Dimension(expression.0, true, name)),
468✔
175
    }
×
176
}
552✔
177

×
178
fn parse_sql_aggregate_item(
552✔
179
    item: &SelectItem,
552✔
180
    schema: &Schema,
552✔
181
) -> Result<FieldRule, PipelineError> {
552✔
182
    match item {
552✔
183
        SelectItem::UnnamedExpr(sql_expr) => {
515✔
184
            build_field_rule(sql_expr, schema, sql_expr.to_string())
515✔
185
        }
×
186
        SelectItem::ExprWithAlias { expr, alias } => {
37✔
187
            build_field_rule(expr, schema, alias.value.clone())
37✔
188
        }
×
189
        SelectItem::Wildcard(_) => Err(PipelineError::InvalidExpression(
×
190
            "Wildcard Operator is not supported".to_string(),
×
191
        )),
×
192
        SelectItem::QualifiedWildcard(..) => Err(PipelineError::InvalidExpression(
×
193
            "Qualified Wildcard Operator is not supported".to_string(),
×
194
        )),
×
195
    }
196
}
552✔
197

198
fn parse_sql_groupby_item(
70✔
199
    sql_expression: &SqlExpr,
70✔
200
    schema: &Schema,
70✔
201
) -> Result<FieldRule, PipelineError> {
70✔
202
    Ok(FieldRule::Dimension(
70✔
203
        ExpressionBuilder {}.build(
70✔
204
            &BuilderExpressionType::FullExpression,
70✔
205
            sql_expression,
70✔
206
            schema,
70✔
207
        )?,
70✔
208
        false,
×
209
        sql_expression.to_string(),
70✔
210
    ))
×
211
}
70✔
212

×
213
fn get_aggregator(
552✔
214
    expression: Box<Expression>,
552✔
215
    schema: &Schema,
552✔
216
) -> Result<Aggregator, PipelineError> {
552✔
217
    match *expression {
552✔
218
        Expression::AggregateFunction { fun, args } => {
84✔
219
            let arg_type = args[0].get_type(schema);
84✔
220
            match (&fun, arg_type) {
84✔
221
                (AggregateFunctionType::Avg, _) => Ok(Aggregator::Avg),
7✔
222
                (AggregateFunctionType::Count, _) => Ok(Aggregator::Count),
46✔
223
                (AggregateFunctionType::Max, _) => Ok(Aggregator::Max),
11✔
224
                (AggregateFunctionType::Min, _) => Ok(Aggregator::Min),
11✔
225
                (AggregateFunctionType::Sum, _) => Ok(Aggregator::Sum),
9✔
226
                _ => Err(PipelineError::InvalidExpression(format!(
×
227
                    "Not implemented Aggregation function: {:?}",
×
228
                    fun
×
229
                ))),
×
230
            }
×
231
        }
×
232
        _ => Err(PipelineError::InvalidExpression(format!(
468✔
233
            "Not an Aggregation function: {:?}",
468✔
234
            expression
468✔
235
        ))),
468✔
236
    }
237
}
552✔
238

×
239
fn build_output_schema(
20✔
240
    input_schema: &Schema,
20✔
241
    output_field_rules: Vec<FieldRule>,
20✔
242
) -> Result<Schema, ExecutionError> {
20✔
243
    let mut output_schema = Schema::empty();
20✔
244

×
245
    for e in output_field_rules.iter().enumerate() {
47✔
246
        match e.1 {
47✔
247
            FieldRule::Measure(pre_aggr, aggr, name) => {
20✔
248
                let res = pre_aggr
20✔
249
                    .get_type(input_schema)
20✔
250
                    .map_err(|e| ExecutionError::InternalError(Box::new(e)))?;
20✔
251

×
252
                output_schema.fields.push(FieldDefinition::new(
20✔
253
                    name.clone(),
20✔
254
                    aggr.get_return_type(res.return_type),
20✔
255
                    res.nullable,
20✔
256
                ));
20✔
257
            }
258

259
            FieldRule::Dimension(expression, is_value, name) => {
27✔
260
                if *is_value {
27✔
261
                    let res = expression
14✔
262
                        .get_type(input_schema)
14✔
263
                        .map_err(|e| ExecutionError::InternalError(Box::new(e)))?;
14✔
264

×
265
                    output_schema.fields.push(FieldDefinition::new(
14✔
266
                        name.clone(),
14✔
267
                        res.return_type,
14✔
268
                        res.nullable,
14✔
269
                    ));
14✔
270
                    output_schema.primary_index.push(e.0);
14✔
271
                }
13✔
272
            }
×
273
        }
×
274
    }
275
    Ok(output_schema)
20✔
276
}
20✔
277

×
278
fn build_projection_schema(
57✔
279
    input_schema: &Schema,
57✔
280
    select: &[SelectItem],
57✔
281
) -> Result<Schema, ExecutionError> {
57✔
282
    match select
57✔
283
        .iter()
57✔
284
        .map(|item| parse_sql_select_item(item, input_schema))
198✔
285
        .collect::<Result<Vec<(String, Expression)>, PipelineError>>()
57✔
286
    {
287
        Ok(expressions) => {
57✔
288
            let mut output_schema = Schema::empty();
57✔
289

290
            for e in expressions.iter() {
198✔
291
                let field_name = e.0.clone();
198✔
292
                let field_type =
198✔
293
                    e.1.get_type(input_schema)
198✔
294
                        .map_err(|e| ExecutionError::InternalError(Box::new(e)))?;
198✔
295

296
                output_schema.fields.push(FieldDefinition::new(
198✔
297
                    field_name,
198✔
298
                    field_type.return_type,
198✔
299
                    field_type.nullable,
198✔
300
                ));
198✔
301
            }
302

303
            Ok(output_schema)
57✔
304
        }
305
        Err(error) => Err(ExecutionError::InternalStringError(error.to_string())),
×
306
    }
307
}
57✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc