• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

getdozer / dozer / 4102355041

pending completion
4102355041

Pull #811

github

GitHub
Merge 37b55f3df into 7c772e92a
Pull Request #811: chore: integrating sql planner

427 of 427 new or added lines in 15 files covered. (100.0%)

24596 of 37831 relevant lines covered (65.02%)

37254.69 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

79.04
/dozer-sql/src/pipeline/aggregation/avg.rs
1
use crate::deserialize;
2
use crate::pipeline::aggregation::aggregator::AggregationResult;
3
use crate::pipeline::errors::PipelineError;
4
use crate::pipeline::errors::PipelineError::InvalidOperandType;
5
use crate::{check_nan_f64, deserialize_u8, to_bytes, try_unwrap};
6
use dozer_core::storage::common::Database;
7
use dozer_core::storage::prefix_transaction::PrefixTransaction;
8
use dozer_types::ordered_float::OrderedFloat;
9
use dozer_types::types::Field::{Decimal, Float};
10
use dozer_types::types::{Field, FieldType};
11
use num_traits::Zero;
12
use std::ops::Div;
13
use std::string::ToString;
14

15
/// Field-less type whose associated functions implement the `AVG` aggregation.
///
/// It carries no data of its own; every function below works on the
/// `PrefixTransaction` / `Database` pair passed in by the caller.
pub struct AvgAggregator {}
16
/// Aggregator name carried by the `InvalidOperandType` error that `insert`,
/// `update` and `delete` return for unsupported field types.
const AGGREGATOR_NAME: &str = "AVG";
17

18
impl AvgAggregator {
19
    /// Numeric identifier of this aggregator; exposed through `_get_type`.
    const _AGGREGATOR_ID: u32 = 0x03;
20

21
    pub(crate) fn _get_type() -> u32 {
×
22
        AvgAggregator::_AGGREGATOR_ID
×
23
    }
×
24

×
25
    pub(crate) fn insert(
22✔
26
        _cur_state: Option<&[u8]>,
22✔
27
        new: &Field,
22✔
28
        return_type: FieldType,
22✔
29
        ptx: &mut PrefixTransaction,
22✔
30
        aggregators_db: Database,
22✔
31
    ) -> Result<AggregationResult, PipelineError> {
22✔
32
        match (return_type, new) {
22✔
33
            (FieldType::Decimal, _) => {
×
34
                // Update aggregators_db with new val and its occurrence
35
                let new_val = Field::to_decimal(new).unwrap().serialize();
6✔
36
                Self::update_aggregator_db(new_val.as_slice(), 1, false, ptx, aggregators_db);
6✔
37

6✔
38
                // Calculate average
6✔
39
                let avg = try_unwrap!(Self::calc_decimal_average(ptx, aggregators_db)).serialize();
6✔
40
                Ok(AggregationResult::new(
6✔
41
                    Self::get_value(avg.as_slice(), return_type),
6✔
42
                    Some(Vec::from(avg)),
6✔
43
                ))
6✔
44
            }
45
            (FieldType::Float, _) => {
×
46
                // Update aggregators_db with new val and its occurrence
×
47
                let new_val = &OrderedFloat(Field::to_float(new).unwrap());
6✔
48
                Self::update_aggregator_db(to_bytes!(new_val), 1, false, ptx, aggregators_db);
6✔
49

6✔
50
                // Calculate average
6✔
51
                let avg = try_unwrap!(Self::calc_f64_average(ptx, aggregators_db)).to_be_bytes();
6✔
52
                Ok(AggregationResult::new(
6✔
53
                    Self::get_value(&avg, return_type),
6✔
54
                    Some(Vec::from(avg)),
6✔
55
                ))
6✔
56
            }
57
            (FieldType::Int, _) => {
×
58
                // Update aggregators_db with new val and its occurrence
×
59
                let new_val = &Field::to_int(new).unwrap();
6✔
60
                Self::update_aggregator_db(to_bytes!(new_val), 1, false, ptx, aggregators_db);
6✔
61

6✔
62
                // Calculate average
6✔
63
                let avg = try_unwrap!(Self::calc_i64_average(ptx, aggregators_db)).serialize();
6✔
64
                Ok(AggregationResult::new(
6✔
65
                    Self::get_value(&avg, return_type),
6✔
66
                    Some(Vec::from(avg)),
6✔
67
                ))
6✔
68
            }
69
            (FieldType::UInt, _) => {
×
70
                // Update aggregators_db with new val and its occurrence
×
71
                let new_val = &Field::to_uint(new).unwrap();
4✔
72
                Self::update_aggregator_db(to_bytes!(new_val), 1, false, ptx, aggregators_db);
4✔
73

4✔
74
                // Calculate average
4✔
75
                let avg = try_unwrap!(Self::calc_u64_average(ptx, aggregators_db)).serialize();
4✔
76
                Ok(AggregationResult::new(
4✔
77
                    Self::get_value(&avg, return_type),
4✔
78
                    Some(Vec::from(avg)),
4✔
79
                ))
4✔
80
            }
81
            _ => Err(InvalidOperandType(AGGREGATOR_NAME.to_string())),
×
82
        }
×
83
    }
22✔
84

×
85
    pub(crate) fn update(
7✔
86
        _cur_state: Option<&[u8]>,
7✔
87
        old: &Field,
7✔
88
        new: &Field,
7✔
89
        return_type: FieldType,
7✔
90
        ptx: &mut PrefixTransaction,
7✔
91
        aggregators_db: Database,
7✔
92
    ) -> Result<AggregationResult, PipelineError> {
7✔
93
        match (return_type, new) {
7✔
94
            (FieldType::Decimal, _) => {
95
                // Update aggregators_db with new val and its occurrence
×
96
                let new_val = &Field::to_decimal(new).unwrap().serialize();
2✔
97
                Self::update_aggregator_db(new_val.as_slice(), 1, false, ptx, aggregators_db);
2✔
98

2✔
99
                // Update aggregators_db with new val and its occurrence
2✔
100
                let old_val = &Field::to_decimal(old).unwrap().serialize();
2✔
101
                Self::update_aggregator_db(old_val.as_slice(), 1, true, ptx, aggregators_db);
2✔
102

2✔
103
                // Calculate average
2✔
104
                let avg = try_unwrap!(Self::calc_decimal_average(ptx, aggregators_db)).serialize();
2✔
105
                Ok(AggregationResult::new(
2✔
106
                    Self::get_value(avg.as_slice(), return_type),
2✔
107
                    Some(Vec::from(avg)),
2✔
108
                ))
2✔
109
            }
×
110
            (FieldType::Float, _) => {
×
111
                // Update aggregators_db with new val and its occurrence
×
112
                let new_val = &OrderedFloat(Field::to_float(new).unwrap());
2✔
113
                Self::update_aggregator_db(to_bytes!(new_val), 1, false, ptx, aggregators_db);
2✔
114
                let old_val = &OrderedFloat(Field::to_float(old).unwrap());
2✔
115
                Self::update_aggregator_db(to_bytes!(old_val), 1, true, ptx, aggregators_db);
2✔
116

2✔
117
                // Calculate average
2✔
118
                let avg = try_unwrap!(Self::calc_f64_average(ptx, aggregators_db)).to_be_bytes();
2✔
119
                Ok(AggregationResult::new(
2✔
120
                    Self::get_value(&avg, return_type),
2✔
121
                    Some(Vec::from(avg)),
2✔
122
                ))
2✔
123
            }
×
124
            (FieldType::Int, _) => {
×
125
                // Update aggregators_db with new val and its occurrence
×
126
                let new_val = &Field::to_int(new).unwrap();
2✔
127
                Self::update_aggregator_db(to_bytes!(new_val), 1, false, ptx, aggregators_db);
2✔
128
                let old_val = &Field::to_int(old).unwrap();
2✔
129
                Self::update_aggregator_db(to_bytes!(old_val), 1, true, ptx, aggregators_db);
2✔
130

2✔
131
                // Calculate average
2✔
132
                let avg = try_unwrap!(Self::calc_i64_average(ptx, aggregators_db)).serialize();
2✔
133
                Ok(AggregationResult::new(
2✔
134
                    Self::get_value(&avg, return_type),
2✔
135
                    Some(Vec::from(avg)),
2✔
136
                ))
2✔
137
            }
×
138
            (FieldType::UInt, _) => {
×
139
                // Update aggregators_db with new val and its occurrence
×
140
                let new_val = &Field::to_uint(new).unwrap();
1✔
141
                Self::update_aggregator_db(to_bytes!(new_val), 1, false, ptx, aggregators_db);
1✔
142
                let old_val = &Field::to_uint(old).unwrap();
1✔
143
                Self::update_aggregator_db(to_bytes!(old_val), 1, true, ptx, aggregators_db);
1✔
144

1✔
145
                // Calculate average
1✔
146
                let avg = try_unwrap!(Self::calc_u64_average(ptx, aggregators_db)).serialize();
1✔
147
                Ok(AggregationResult::new(
1✔
148
                    Self::get_value(&avg, return_type),
1✔
149
                    Some(Vec::from(avg)),
1✔
150
                ))
1✔
151
            }
×
152
            _ => Err(InvalidOperandType(AGGREGATOR_NAME.to_string())),
×
153
        }
×
154
    }
7✔
155

×
156
    pub(crate) fn delete(
22✔
157
        _cur_state: Option<&[u8]>,
22✔
158
        old: &Field,
22✔
159
        return_type: FieldType,
22✔
160
        ptx: &mut PrefixTransaction,
22✔
161
        aggregators_db: Database,
22✔
162
    ) -> Result<AggregationResult, PipelineError> {
22✔
163
        match (return_type, old) {
22✔
164
            (FieldType::Decimal, _) => {
×
165
                // Update aggregators_db with new val and its occurrence
166
                let old_val = &Field::to_decimal(old).unwrap().serialize();
6✔
167
                Self::update_aggregator_db(old_val.as_slice(), 1, true, ptx, aggregators_db);
6✔
168

6✔
169
                // Calculate average
6✔
170
                let avg = try_unwrap!(Self::calc_decimal_average(ptx, aggregators_db)).serialize();
6✔
171
                Ok(AggregationResult::new(
6✔
172
                    Self::get_value(avg.as_slice(), return_type),
6✔
173
                    Some(Vec::from(avg)),
6✔
174
                ))
6✔
175
            }
176
            (FieldType::Float, _) => {
×
177
                // Update aggregators_db with new val and its occurrence
×
178
                let old_val = &OrderedFloat(Field::to_float(old).unwrap());
6✔
179
                Self::update_aggregator_db(to_bytes!(old_val), 1, true, ptx, aggregators_db);
6✔
180

6✔
181
                // Calculate average
6✔
182
                let avg = try_unwrap!(Self::calc_f64_average(ptx, aggregators_db)).to_be_bytes();
6✔
183
                Ok(AggregationResult::new(
6✔
184
                    Self::get_value(&avg, return_type),
6✔
185
                    Some(Vec::from(avg)),
6✔
186
                ))
6✔
187
            }
188
            (FieldType::Int, _) => {
×
189
                // Update aggregators_db with new val and its occurrence
×
190
                let old_val = &Field::to_int(old).unwrap();
6✔
191
                Self::update_aggregator_db(to_bytes!(old_val), 1, true, ptx, aggregators_db);
6✔
192

6✔
193
                // Calculate average
6✔
194
                let avg = try_unwrap!(Self::calc_i64_average(ptx, aggregators_db)).serialize();
6✔
195
                Ok(AggregationResult::new(
6✔
196
                    Self::get_value(&avg, return_type),
6✔
197
                    Some(Vec::from(avg)),
6✔
198
                ))
6✔
199
            }
200
            (FieldType::UInt, _) => {
×
201
                // Update aggregators_db with new val and its occurrence
×
202
                let old_val = &Field::to_uint(old).unwrap();
4✔
203
                Self::update_aggregator_db(to_bytes!(old_val), 1, true, ptx, aggregators_db);
4✔
204

4✔
205
                // Calculate average
4✔
206
                let avg = try_unwrap!(Self::calc_u64_average(ptx, aggregators_db)).serialize();
4✔
207
                Ok(AggregationResult::new(
4✔
208
                    Self::get_value(&avg, return_type),
4✔
209
                    Some(Vec::from(avg)),
4✔
210
                ))
4✔
211
            }
212
            _ => Err(InvalidOperandType(AGGREGATOR_NAME.to_string())),
×
213
        }
×
214
    }
22✔
215

×
216
    pub(crate) fn get_value(f: &[u8], from: FieldType) -> Field {
51✔
217
        match from {
51✔
218
            FieldType::Decimal => Decimal(dozer_types::rust_decimal::Decimal::deserialize(
14✔
219
                deserialize!(f),
14✔
220
            )),
14✔
221
            FieldType::Float => Float(OrderedFloat(f64::from_be_bytes(deserialize!(f)))),
14✔
222
            FieldType::Int => Decimal(dozer_types::rust_decimal::Decimal::deserialize(
14✔
223
                deserialize!(f),
14✔
224
            )),
14✔
225
            FieldType::UInt => Decimal(dozer_types::rust_decimal::Decimal::deserialize(
9✔
226
                deserialize!(f),
9✔
227
            )),
9✔
228
            _ => Field::Null,
×
229
        }
×
230
    }
51✔
231

×
232
    fn update_aggregator_db(
58✔
233
        key: &[u8],
58✔
234
        val_delta: u8,
58✔
235
        decr: bool,
58✔
236
        ptx: &mut PrefixTransaction,
58✔
237
        aggregators_db: Database,
58✔
238
    ) {
58✔
239
        let get_prev_count = try_unwrap!(ptx.get(aggregators_db, key));
58✔
240
        let prev_count = deserialize_u8!(get_prev_count);
58✔
241
        let mut new_count = prev_count;
58✔
242
        if decr {
58✔
243
            new_count = new_count.wrapping_sub(val_delta);
29✔
244
        } else {
29✔
245
            new_count = new_count.wrapping_add(val_delta);
29✔
246
        }
29✔
247
        if new_count < 1 {
58✔
248
            try_unwrap!(ptx.del(aggregators_db, key, Option::from(to_bytes!(prev_count))));
22✔
249
        } else {
36✔
250
            try_unwrap!(ptx.put(aggregators_db, key, to_bytes!(new_count)));
36✔
251
        }
36✔
252
    }
58✔
253

×
254
    fn calc_f64_average(
14✔
255
        ptx: &mut PrefixTransaction,
14✔
256
        aggregators_db: Database,
14✔
257
    ) -> Result<f64, PipelineError> {
14✔
258
        let ptx_cur = ptx.open_cursor(aggregators_db)?;
14✔
259
        let mut total_count = 0_u8;
14✔
260
        let mut total_sum = 0_f64;
14✔
261
        let mut exist = ptx_cur.first()?;
14✔
262

×
263
        // Loop through aggregators_db to calculate average
264
        while exist {
30✔
265
            let cur = try_unwrap!(ptx_cur.read()).unwrap();
16✔
266
            let val = f64::from_be_bytes(deserialize!(cur.0));
16✔
267
            let get_count = ptx.get(aggregators_db, cur.0);
16✔
268
            if get_count.is_ok() {
16✔
269
                let count = deserialize_u8!(try_unwrap!(get_count));
16✔
270
                total_count += count;
16✔
271
                total_sum += val * f64::from(count);
16✔
272
            }
×
273
            exist = ptx_cur.next()?;
16✔
274
        }
×
275
        Ok(check_nan_f64!(total_sum / f64::from(total_count)))
14✔
276
    }
14✔
277

×
278
    fn calc_decimal_average(
14✔
279
        ptx: &mut PrefixTransaction,
14✔
280
        aggregators_db: Database,
14✔
281
    ) -> Result<dozer_types::rust_decimal::Decimal, PipelineError> {
14✔
282
        let ptx_cur = ptx.open_cursor(aggregators_db)?;
14✔
283
        let mut total_count = 0_u8;
14✔
284
        let mut total_sum = dozer_types::rust_decimal::Decimal::zero();
14✔
285
        let mut exist = ptx_cur.first()?;
14✔
286

×
287
        // Loop through aggregators_db to calculate average
288
        while exist {
30✔
289
            let cur = try_unwrap!(ptx_cur.read()).unwrap();
16✔
290
            let val = dozer_types::rust_decimal::Decimal::deserialize(deserialize!(cur.0));
16✔
291
            let get_count = ptx.get(aggregators_db, cur.0);
16✔
292
            if get_count.is_ok() {
16✔
293
                let count = deserialize_u8!(try_unwrap!(get_count));
16✔
294
                total_count += count;
16✔
295
                total_sum += val * dozer_types::rust_decimal::Decimal::from(count);
16✔
296
            }
×
297
            exist = ptx_cur.next()?;
16✔
298
        }
×
299
        if total_count.is_zero() {
14✔
300
            Ok(dozer_types::rust_decimal::Decimal::zero())
3✔
301
        } else {
×
302
            Ok(total_sum.div(dozer_types::rust_decimal::Decimal::from(total_count)))
11✔
303
        }
×
304
    }
14✔
305

×
306
    fn calc_i64_average(
14✔
307
        ptx: &mut PrefixTransaction,
14✔
308
        aggregators_db: Database,
14✔
309
    ) -> Result<dozer_types::rust_decimal::Decimal, PipelineError> {
14✔
310
        let ptx_cur = ptx.open_cursor(aggregators_db)?;
14✔
311
        let mut total_count = 0_u8;
14✔
312
        let mut total_sum = 0_i64;
14✔
313
        let mut exist = ptx_cur.first()?;
14✔
314

×
315
        // Loop through aggregators_db to calculate average
316
        while exist {
30✔
317
            let cur = try_unwrap!(ptx_cur.read()).unwrap();
16✔
318
            let val = i64::from_be_bytes(deserialize!(cur.0));
16✔
319
            let get_count = ptx.get(aggregators_db, cur.0);
16✔
320
            if get_count.is_ok() {
16✔
321
                let count = deserialize_u8!(try_unwrap!(get_count));
16✔
322
                total_count += count;
16✔
323
                total_sum += val * i64::from(count);
16✔
324
            }
×
325
            exist = ptx_cur.next()?;
16✔
326
        }
×
327
        let total_count_sum = dozer_types::rust_decimal::Decimal::from(total_sum);
14✔
328
        if total_count.is_zero() {
14✔
329
            Ok(dozer_types::rust_decimal::Decimal::zero())
3✔
330
        } else {
×
331
            Ok(total_count_sum.div(dozer_types::rust_decimal::Decimal::from(total_count)))
11✔
332
        }
×
333
    }
14✔
334

×
335
    fn calc_u64_average(
9✔
336
        ptx: &mut PrefixTransaction,
9✔
337
        aggregators_db: Database,
9✔
338
    ) -> Result<dozer_types::rust_decimal::Decimal, PipelineError> {
9✔
339
        let ptx_cur = ptx.open_cursor(aggregators_db)?;
9✔
340
        let mut total_count = 0_u8;
9✔
341
        let mut total_sum = 0_u64;
9✔
342
        let mut exist = ptx_cur.first()?;
9✔
343

×
344
        // Loop through aggregators_db to calculate average
345
        while exist {
20✔
346
            let cur = try_unwrap!(ptx_cur.read()).unwrap();
11✔
347
            let val = u64::from_be_bytes(deserialize!(cur.0));
11✔
348
            let get_count = ptx.get(aggregators_db, cur.0);
11✔
349
            if get_count.is_ok() {
11✔
350
                let count = deserialize_u8!(try_unwrap!(get_count));
11✔
351
                total_count += count;
11✔
352
                total_sum += val * u64::from(count);
11✔
353
            }
×
354
            exist = ptx_cur.next()?;
11✔
355
        }
×
356
        let total_count_sum = dozer_types::rust_decimal::Decimal::from(total_sum);
9✔
357
        if total_count.is_zero() {
9✔
358
            Ok(dozer_types::rust_decimal::Decimal::zero())
2✔
359
        } else {
×
360
            Ok(total_count_sum.div(dozer_types::rust_decimal::Decimal::from(total_count)))
7✔
361
        }
×
362
    }
9✔
363
}
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc