• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

getdozer / dozer / 4102355041

pending completion
4102355041

Pull #811

github

GitHub
Merge 37b55f3df into 7c772e92a
Pull Request #811: chore: integrating sql planner

427 of 427 new or added lines in 15 files covered. (100.0%)

24596 of 37831 relevant lines covered (65.02%)

37254.69 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

59.26
/dozer-sql/src/pipeline/aggregation/aggregator.rs
1
use crate::pipeline::aggregation::avg::AvgAggregator;
2
use crate::pipeline::aggregation::count::CountAggregator;
3
use crate::pipeline::aggregation::max::MaxAggregator;
4
use crate::pipeline::aggregation::min::MinAggregator;
5
use crate::pipeline::aggregation::sum::SumAggregator;
6
use crate::pipeline::errors::PipelineError;
7

8
use crate::pipeline::expression::aggregate::AggregateFunctionType;
9
use crate::pipeline::expression::execution::Expression;
10
use dozer_core::storage::common::Database;
11
use dozer_core::storage::prefix_transaction::PrefixTransaction;
12
use dozer_types::types::{Field, FieldType, Schema};
13
use std::fmt::{Display, Formatter};
×
14

15
#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Hash)]
×
16
pub enum Aggregator {
17
    Avg,
18
    Count,
19
    Max,
20
    Min,
21
    Sum,
22
}
23

×
24
pub fn get_aggregator_from_aggregation_expression(
×
25
    e: &Expression,
×
26
    schema: &Schema,
×
27
) -> Result<(Expression, Aggregator), PipelineError> {
×
28
    match e {
74✔
29
        Expression::AggregateFunction {
×
30
            fun: AggregateFunctionType::Sum,
31
            args,
9✔
32
        } => Ok((
9✔
33
            args.get(0)
9✔
34
                .ok_or_else(|| {
9✔
35
                    PipelineError::NotEnoughArguments(AggregateFunctionType::Sum.to_string())
×
36
                })?
9✔
37
                .clone(),
9✔
38
            Aggregator::Sum,
9✔
39
        )),
40
        Expression::AggregateFunction {
×
41
            fun: AggregateFunctionType::Min,
×
42
            args,
11✔
43
        } => Ok((
11✔
44
            args.get(0)
11✔
45
                .ok_or_else(|| {
11✔
46
                    PipelineError::NotEnoughArguments(AggregateFunctionType::Min.to_string())
×
47
                })?
11✔
48
                .clone(),
11✔
49
            Aggregator::Min,
11✔
50
        )),
×
51
        Expression::AggregateFunction {
×
52
            fun: AggregateFunctionType::Max,
×
53
            args,
11✔
54
        } => Ok((
11✔
55
            args.get(0)
11✔
56
                .ok_or_else(|| {
11✔
57
                    PipelineError::NotEnoughArguments(AggregateFunctionType::Max.to_string())
×
58
                })?
11✔
59
                .clone(),
11✔
60
            Aggregator::Max,
11✔
61
        )),
×
62
        Expression::AggregateFunction {
×
63
            fun: AggregateFunctionType::Avg,
64
            args,
6✔
65
        } => Ok((
6✔
66
            args.get(0)
6✔
67
                .ok_or_else(|| {
6✔
68
                    PipelineError::NotEnoughArguments(AggregateFunctionType::Avg.to_string())
×
69
                })?
6✔
70
                .clone(),
7✔
71
            Aggregator::Avg,
7✔
72
        )),
×
73
        Expression::AggregateFunction {
×
74
            fun: AggregateFunctionType::Count,
×
75
            args: _,
×
76
        } => Ok((Expression::Literal(Field::Int(0)), Aggregator::Count)),
37✔
77
        _ => Err(PipelineError::InvalidFunction(e.to_string(schema))),
×
78
    }
×
79
}
75✔
80

81
impl Display for Aggregator {
×
82
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
×
83
        match self {
×
84
            Aggregator::Avg => f.write_str("avg"),
×
85
            Aggregator::Count => f.write_str("count"),
×
86
            Aggregator::Max => f.write_str("max"),
×
87
            Aggregator::Min => f.write_str("min"),
×
88
            Aggregator::Sum => f.write_str("sum"),
×
89
        }
×
90
    }
×
91
}
×
92

×
93
pub(crate) struct AggregationResult {
×
94
    pub value: Field,
×
95
    pub state: Option<Vec<u8>>,
×
96
}
×
97

×
98
impl AggregationResult {
99
    pub fn new(value: Field, state: Option<Vec<u8>>) -> Self {
5,984✔
100
        Self { value, state }
5,984✔
101
    }
5,984✔
102
}
×
103

×
104
impl Aggregator {
×
105
    pub(crate) fn _get_type(&self) -> u32 {
×
106
        match &self {
×
107
            Aggregator::Avg => AvgAggregator::_get_type(),
×
108
            Aggregator::Count => CountAggregator::_get_type(),
×
109
            Aggregator::Max => MaxAggregator::_get_type(),
×
110
            Aggregator::Min => MinAggregator::_get_type(),
×
111
            Aggregator::Sum => SumAggregator::_get_type(),
×
112
        }
×
113
    }
×
114

×
115
    pub(crate) fn insert(
5,799✔
116
        &self,
5,799✔
117
        cur_state: Option<&[u8]>,
5,799✔
118
        new: &Field,
5,799✔
119
        return_type: FieldType,
5,799✔
120
        txn: &mut PrefixTransaction,
5,799✔
121
        agg_db: Database,
5,799✔
122
    ) -> Result<AggregationResult, PipelineError> {
5,799✔
123
        match &self {
5,799✔
124
            Aggregator::Avg => AvgAggregator::insert(cur_state, new, return_type, txn, agg_db),
22✔
125
            Aggregator::Count => CountAggregator::insert(cur_state, new, return_type, txn),
5,684✔
126
            Aggregator::Max => MaxAggregator::insert(cur_state, new, return_type, txn, agg_db),
34✔
127
            Aggregator::Min => MinAggregator::insert(cur_state, new, return_type, txn, agg_db),
34✔
128
            Aggregator::Sum => SumAggregator::insert(cur_state, new, return_type, txn),
25✔
129
        }
130
    }
5,799✔
131

132
    pub(crate) fn update(
44✔
133
        &self,
44✔
134
        cur_state: Option<&[u8]>,
44✔
135
        old: &Field,
44✔
136
        new: &Field,
44✔
137
        return_type: FieldType,
44✔
138
        txn: &mut PrefixTransaction,
44✔
139
        agg_db: Database,
44✔
140
    ) -> Result<AggregationResult, PipelineError> {
44✔
141
        match &self {
44✔
142
            Aggregator::Avg => AvgAggregator::update(cur_state, old, new, return_type, txn, agg_db),
7✔
143
            Aggregator::Count => CountAggregator::update(cur_state, old, new, return_type, txn),
8✔
144
            Aggregator::Max => MaxAggregator::update(cur_state, old, new, return_type, txn, agg_db),
11✔
145
            Aggregator::Min => MinAggregator::update(cur_state, old, new, return_type, txn, agg_db),
11✔
146
            Aggregator::Sum => SumAggregator::update(cur_state, old, new, return_type, txn),
7✔
147
        }
148
    }
44✔
149

150
    pub(crate) fn delete(
141✔
151
        &self,
141✔
152
        cur_state: Option<&[u8]>,
141✔
153
        old: &Field,
141✔
154
        return_type: FieldType,
141✔
155
        txn: &mut PrefixTransaction,
141✔
156
        agg_db: Database,
141✔
157
    ) -> Result<AggregationResult, PipelineError> {
141✔
158
        match &self {
141✔
159
            Aggregator::Avg => AvgAggregator::delete(cur_state, old, return_type, txn, agg_db),
22✔
160
            Aggregator::Count => CountAggregator::delete(cur_state, old, return_type, txn),
29✔
161
            Aggregator::Max => MaxAggregator::delete(cur_state, old, return_type, txn, agg_db),
34✔
162
            Aggregator::Min => MinAggregator::delete(cur_state, old, return_type, txn, agg_db),
34✔
163
            Aggregator::Sum => SumAggregator::delete(cur_state, old, return_type, txn),
22✔
164
        }
165
    }
141✔
166
}
167

168
#[macro_export]
169
macro_rules! deserialize {
170
    ($stmt:expr) => {
171
        $stmt.try_into().unwrap()
172
    };
173
}
174

175
#[macro_export]
176
macro_rules! deserialize_f64 {
177
    ($stmt:expr) => {
178
        match $stmt {
179
            Some(v) => f64::from_be_bytes(deserialize!(v)),
180
            None => 0_f64,
181
        }
182
    };
183
}
184

185
#[macro_export]
186
macro_rules! deserialize_i64 {
187
    ($stmt:expr) => {
188
        match $stmt {
189
            Some(v) => i64::from_be_bytes(deserialize!(v)),
190
            None => 0_i64,
191
        }
192
    };
193
}
194

195
#[macro_export]
196
macro_rules! deserialize_u64 {
197
    ($stmt:expr) => {
198
        match $stmt {
199
            Some(v) => u64::from_be_bytes(deserialize!(v)),
200
            None => 0_u64,
201
        }
202
    };
203
}
204

205
#[macro_export]
206
macro_rules! deserialize_decimal {
207
    ($stmt:expr) => {
208
        match $stmt {
209
            Some(v) => Decimal::deserialize(deserialize!(v)),
210
            None => Decimal::from(0),
211
        }
212
    };
213
}
214

215
#[macro_export]
216
macro_rules! deserialize_u8 {
217
    ($stmt:expr) => {
218
        match $stmt {
219
            Some(v) => u8::from_be_bytes(deserialize!(v)),
220
            None => 0_u8,
221
        }
222
    };
223
}
224

225
#[macro_export]
226
macro_rules! check_nan_f64 {
227
    ($stmt:expr) => {
228
        if $stmt.is_nan() {
229
            0_f64
230
        } else {
231
            $stmt
232
        }
233
    };
234
}
235

236
#[macro_export]
237
macro_rules! check_nan_decimal {
238
    ($stmt:expr) => {
239
        if $stmt.is_nan() {
240
            dozer_types::rust_decimal::Decimal::zero()
241
        } else {
242
            $stmt
243
        }
244
    };
245
}
246

247
#[macro_export]
248
macro_rules! try_unwrap {
249
    ($stmt:expr) => {
250
        $stmt.unwrap_or_else(|e| panic!("{}", e.to_string()))
251
    };
252
}
253

254
#[macro_export]
255
macro_rules! to_bytes {
256
    ($stmt:expr) => {
257
        $stmt.to_be_bytes().as_slice()
258
    };
259
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc