• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

getdozer / dozer / 4377467257

pending completion
4377467257

push

github

GitHub
implement `HAVING` (#1198)

395 of 395 new or added lines in 6 files covered. (100.0%)

27638 of 38584 relevant lines covered (71.63%)

27777.41 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

72.55
/dozer-sql/src/pipeline/planner/projection.rs
1
#![allow(dead_code)]
2

3
use crate::pipeline::errors::PipelineError;
4
use crate::pipeline::expression::builder::ExpressionBuilder;
5
use crate::pipeline::expression::execution::{Expression, ExpressionExecutor};
6
use dozer_types::types::{FieldDefinition, Schema};
7
use sqlparser::ast::{Expr, Ident, Select, SelectItem};
8

9
/// Selects how a planning step should treat primary-key information.
///
/// NOTE(review): nothing in this file reads this enum, so the exact meaning of
/// each variant is inferred from its name only — confirm against the callers.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum PrimaryKeyAction {
    /// Presumably: keep the primary key of the input as is.
    Retain,
    /// Presumably: discard any primary key.
    Drop,
    /// Presumably: always set a primary key.
    Force,
}
15

16
pub struct CommonPlanner {
17
    input_schema: Schema,
18
    pub post_aggregation_schema: Schema,
19
    pub post_projection_schema: Schema,
20
    // Vector of aggregations to be appended to the original record
21
    pub aggregation_output: Vec<Expression>,
22
    pub having: Option<Expression>,
23
    pub groupby: Vec<Expression>,
24
    pub projection_output: Vec<Expression>,
25
}
26

27
impl CommonPlanner {
28
    fn append_to_schema(
2,936✔
29
        expr: &Expression,
2,936✔
30
        alias: Option<String>,
2,936✔
31
        input_schema: &Schema,
2,936✔
32
        output_schema: &mut Schema,
2,936✔
33
    ) -> Result<(), PipelineError> {
2,936✔
34
        let expr_type = expr.get_type(input_schema)?;
2,936✔
35
        output_schema.fields.push(FieldDefinition::new(
2,936✔
36
            alias.unwrap_or_else(|| expr.to_string(input_schema)),
2,937✔
37
            expr_type.return_type,
2,936✔
38
            expr_type.nullable,
2,936✔
39
            expr_type.source,
2,936✔
40
        ));
2,936✔
41

2,936✔
42
        Ok(())
2,936✔
43
    }
2,936✔
44

45
    fn add_select_item(&mut self, item: SelectItem) -> Result<(), PipelineError> {
2,685✔
46
        let expr_items: Vec<(Expr, Option<String>)> = match item {
2,685✔
47
            SelectItem::UnnamedExpr(expr) => vec![(expr, None)],
2,543✔
48
            SelectItem::ExprWithAlias { expr, alias } => vec![(expr, Some(alias.value))],
142✔
49
            SelectItem::QualifiedWildcard(_, _) => panic!("not supported yet"),
×
50
            SelectItem::Wildcard(_) => self
×
51
                .input_schema
×
52
                .fields
×
53
                .iter()
×
54
                .map(|col| (Expr::Identifier(Ident::new(col.to_owned().name)), None))
×
55
                .collect(),
×
56
        };
57

58
        for (expr, alias) in expr_items {
5,368✔
59
            let mut builder = ExpressionBuilder::new(
2,684✔
60
                self.input_schema.fields.len() + self.aggregation_output.len(),
2,684✔
61
            );
2,684✔
62
            let projection_expression = builder.build(true, &expr, &self.input_schema)?;
2,684✔
63

64
            for new_aggr in builder.aggregations {
2,934✔
65
                Self::append_to_schema(
250✔
66
                    &new_aggr,
250✔
67
                    alias.clone(),
250✔
68
                    &self.input_schema,
250✔
69
                    &mut self.post_aggregation_schema,
250✔
70
                )?;
250✔
71
                self.aggregation_output.push(new_aggr);
250✔
72
            }
73

74
            self.projection_output.push(projection_expression.clone());
2,684✔
75
            Self::append_to_schema(
2,684✔
76
                &projection_expression,
2,684✔
77
                alias,
2,684✔
78
                &self.post_aggregation_schema,
2,684✔
79
                &mut self.post_projection_schema,
2,684✔
80
            )?;
2,684✔
81
        }
82

83
        Ok(())
2,684✔
84
    }
2,684✔
85

86
    fn add_join_item(&mut self, item: SelectItem) -> Result<(), PipelineError> {
×
87
        let expr_items: Vec<(Expr, Option<String>)> = match item {
×
88
            SelectItem::UnnamedExpr(expr) => vec![(expr, None)],
×
89
            SelectItem::ExprWithAlias { expr, alias } => vec![(expr, Some(alias.value))],
×
90
            SelectItem::QualifiedWildcard(_, _) => panic!("not supported yet"),
×
91
            SelectItem::Wildcard(_) => panic!("not supported yet"),
×
92
        };
93

94
        for (expr, alias) in expr_items {
×
95
            let mut builder = ExpressionBuilder::new(
×
96
                self.input_schema.fields.len() + self.aggregation_output.len(),
×
97
            );
×
98
            let projection_expression = builder.build(true, &expr, &self.input_schema)?;
×
99

100
            for new_aggr in builder.aggregations {
×
101
                Self::append_to_schema(
×
102
                    &new_aggr,
×
103
                    alias.clone(),
×
104
                    &self.input_schema,
×
105
                    &mut self.post_aggregation_schema,
×
106
                )?;
×
107
                self.aggregation_output.push(new_aggr);
×
108
            }
109

110
            self.projection_output.push(projection_expression.clone());
×
111
            Self::append_to_schema(
×
112
                &projection_expression,
×
113
                alias,
×
114
                &self.post_aggregation_schema,
×
115
                &mut self.post_projection_schema,
×
116
            )?;
×
117
        }
118

119
        Ok(())
×
120
    }
×
121

122
    fn add_having_item(&mut self, expr: Expr) -> Result<(), PipelineError> {
6✔
123
        let mut builder = ExpressionBuilder::from(
6✔
124
            self.input_schema.fields.len(),
6✔
125
            self.aggregation_output.clone(),
6✔
126
        );
6✔
127
        let having_expression = builder.build(true, &expr, &self.input_schema)?;
6✔
128

129
        let mut post_aggregation_schema = self.input_schema.clone();
6✔
130
        let mut aggregation_output = Vec::new();
6✔
131

132
        for new_aggr in builder.aggregations {
13✔
133
            Self::append_to_schema(
7✔
134
                &new_aggr,
7✔
135
                None,
7✔
136
                &self.input_schema,
7✔
137
                &mut post_aggregation_schema,
7✔
138
            )?;
7✔
139
            aggregation_output.push(new_aggr);
7✔
140
        }
141
        self.aggregation_output = aggregation_output;
6✔
142
        self.post_aggregation_schema = post_aggregation_schema;
6✔
143

6✔
144
        self.having = Some(having_expression);
6✔
145

6✔
146
        Ok(())
6✔
147
    }
6✔
148

149
    fn add_groupby_items(&mut self, expr_items: Vec<Expr>) -> Result<(), PipelineError> {
215✔
150
        let mut indexes = vec![];
215✔
151
        let mut set_pk = true;
215✔
152
        for expr in expr_items {
434✔
153
            let mut builder = ExpressionBuilder::new(
218✔
154
                self.input_schema.fields.len() + self.aggregation_output.len(),
218✔
155
            );
218✔
156
            let groupby_expression = builder.build(false, &expr, &self.input_schema)?;
218✔
157
            self.groupby.push(groupby_expression.clone());
218✔
158

×
159
            if let Some(e) = self
218✔
160
                .projection_output
218✔
161
                .iter()
218✔
162
                .enumerate()
218✔
163
                .find(|e| e.1 == &groupby_expression)
224✔
164
            {
217✔
165
                indexes.push(e.0);
217✔
166
            } else {
217✔
167
                set_pk = false
2✔
168
            }
169
        }
×
170

×
171
        if set_pk {
216✔
172
            indexes.sort();
212✔
173
            self.post_projection_schema.primary_index = indexes;
212✔
174
        }
212✔
175

176
        Ok(())
216✔
177
    }
216✔
178

×
179
    pub fn plan(&mut self, select: Select) -> Result<(), PipelineError> {
1,068✔
180
        for expr in select.projection {
3,749✔
181
            self.add_select_item(expr)?;
2,682✔
182
        }
×
183
        if !select.group_by.is_empty() {
1,067✔
184
            self.add_groupby_items(select.group_by)?;
217✔
185
        }
850✔
186

×
187
        if let Some(having) = select.having {
1,067✔
188
            self.add_having_item(having)?;
7✔
189
        }
1,060✔
190

191
        Ok(())
1,066✔
192
    }
1,067✔
193

194
    pub fn new(input_schema: Schema) -> Self {
1,067✔
195
        Self {
1,067✔
196
            input_schema: input_schema.clone(),
1,067✔
197
            post_aggregation_schema: input_schema,
1,067✔
198
            post_projection_schema: Schema::empty(),
1,067✔
199
            aggregation_output: Vec::new(),
1,067✔
200
            having: None,
1,067✔
201
            groupby: Vec::new(),
1,067✔
202
            projection_output: Vec::new(),
1,067✔
203
        }
1,067✔
204
    }
1,067✔
205
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc