• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

getdozer / dozer / 4007818786

pending completion
4007818786

Pull #733

github

GitHub
Merge baf5c38aa into 6c0ac2b2c
Pull Request #733: Bump diesel from 2.0.2 to 2.0.3

23389 of 34432 relevant lines covered (67.93%)

40326.78 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

75.0
/dozer-sql/src/pipeline/aggregation/avg.rs
1
use crate::deserialize;
2
use crate::pipeline::aggregation::aggregator::AggregationResult;
3
use crate::pipeline::errors::PipelineError;
4
use crate::pipeline::errors::PipelineError::InvalidOperandType;
5
use crate::{check_nan_f64, deserialize_u8, to_bytes, try_unwrap};
6
use dozer_core::storage::common::Database;
7
use dozer_core::storage::prefix_transaction::PrefixTransaction;
8
use dozer_types::ordered_float::OrderedFloat;
9
use dozer_types::types::Field::{Decimal, Float};
10
use dozer_types::types::{Field, FieldType};
11
use num_traits::Zero;
12
use std::ops::Div;
13
use std::string::ToString;
14

15
pub struct AvgAggregator {}
16
const AGGREGATOR_NAME: &str = "AVG";
17

18
impl AvgAggregator {
19
    const _AGGREGATOR_ID: u32 = 0x03;
20

21
    pub(crate) fn get_return_type(from: FieldType) -> FieldType {
×
22
        match from {
×
23
            FieldType::Decimal => FieldType::Decimal,
×
24
            FieldType::Float => FieldType::Float,
×
25
            FieldType::Int => FieldType::Decimal,
×
26
            FieldType::UInt => FieldType::Decimal,
×
27
            _ => from,
×
28
        }
×
29
    }
×
30

×
31
    pub(crate) fn _get_type() -> u32 {
×
32
        AvgAggregator::_AGGREGATOR_ID
×
33
    }
×
34

×
35
    pub(crate) fn insert(
21✔
36
        _cur_state: Option<&[u8]>,
21✔
37
        new: &Field,
21✔
38
        return_type: FieldType,
21✔
39
        ptx: &mut PrefixTransaction,
21✔
40
        aggregators_db: Database,
21✔
41
    ) -> Result<AggregationResult, PipelineError> {
21✔
42
        match (return_type, new) {
21✔
43
            (FieldType::Decimal, _) => {
×
44
                // Update aggregators_db with new val and its occurrence
×
45
                let new_val = Field::to_decimal(new).unwrap().serialize();
6✔
46
                Self::update_aggregator_db(new_val.as_slice(), 1, false, ptx, aggregators_db);
6✔
47

6✔
48
                // Calculate average
6✔
49
                let avg = try_unwrap!(Self::calc_decimal_average(ptx, aggregators_db)).serialize();
6✔
50
                Ok(AggregationResult::new(
6✔
51
                    Self::get_value(avg.as_slice(), return_type),
6✔
52
                    Some(Vec::from(avg)),
6✔
53
                ))
6✔
54
            }
×
55
            (FieldType::Float, _) => {
×
56
                // Update aggregators_db with new val and its occurrence
×
57
                let new_val = &OrderedFloat(Field::to_float(new).unwrap());
6✔
58
                Self::update_aggregator_db(to_bytes!(new_val), 1, false, ptx, aggregators_db);
6✔
59

6✔
60
                // Calculate average
6✔
61
                let avg = try_unwrap!(Self::calc_f64_average(ptx, aggregators_db)).to_be_bytes();
6✔
62
                Ok(AggregationResult::new(
6✔
63
                    Self::get_value(&avg, return_type),
6✔
64
                    Some(Vec::from(avg)),
6✔
65
                ))
6✔
66
            }
×
67
            (FieldType::Int, _) => {
×
68
                // Update aggregators_db with new val and its occurrence
×
69
                let new_val = &Field::to_int(new).unwrap();
6✔
70
                Self::update_aggregator_db(to_bytes!(new_val), 1, false, ptx, aggregators_db);
6✔
71

6✔
72
                // Calculate average
6✔
73
                let avg = try_unwrap!(Self::calc_i64_average(ptx, aggregators_db)).serialize();
6✔
74
                Ok(AggregationResult::new(
6✔
75
                    Self::get_value(&avg, return_type),
6✔
76
                    Some(Vec::from(avg)),
6✔
77
                ))
6✔
78
            }
×
79
            (FieldType::UInt, _) => {
×
80
                // Update aggregators_db with new val and its occurrence
×
81
                let new_val = &Field::to_uint(new).unwrap();
3✔
82
                Self::update_aggregator_db(to_bytes!(new_val), 1, false, ptx, aggregators_db);
3✔
83

3✔
84
                // Calculate average
3✔
85
                let avg = try_unwrap!(Self::calc_u64_average(ptx, aggregators_db)).serialize();
3✔
86
                Ok(AggregationResult::new(
3✔
87
                    Self::get_value(&avg, return_type),
3✔
88
                    Some(Vec::from(avg)),
3✔
89
                ))
3✔
90
            }
×
91
            _ => Err(InvalidOperandType(AGGREGATOR_NAME.to_string())),
×
92
        }
×
93
    }
21✔
94

×
95
    pub(crate) fn update(
7✔
96
        _cur_state: Option<&[u8]>,
7✔
97
        old: &Field,
7✔
98
        new: &Field,
7✔
99
        return_type: FieldType,
7✔
100
        ptx: &mut PrefixTransaction,
7✔
101
        aggregators_db: Database,
7✔
102
    ) -> Result<AggregationResult, PipelineError> {
7✔
103
        match (return_type, new) {
7✔
104
            (FieldType::Decimal, _) => {
×
105
                // Update aggregators_db with new val and its occurrence
×
106
                let new_val = &Field::to_decimal(new).unwrap().serialize();
2✔
107
                Self::update_aggregator_db(new_val.as_slice(), 1, false, ptx, aggregators_db);
2✔
108

2✔
109
                // Update aggregators_db with new val and its occurrence
2✔
110
                let old_val = &Field::to_decimal(old).unwrap().serialize();
2✔
111
                Self::update_aggregator_db(old_val.as_slice(), 1, true, ptx, aggregators_db);
2✔
112

2✔
113
                // Calculate average
2✔
114
                let avg = try_unwrap!(Self::calc_decimal_average(ptx, aggregators_db)).serialize();
2✔
115
                Ok(AggregationResult::new(
2✔
116
                    Self::get_value(avg.as_slice(), return_type),
2✔
117
                    Some(Vec::from(avg)),
2✔
118
                ))
2✔
119
            }
×
120
            (FieldType::Float, _) => {
×
121
                // Update aggregators_db with new val and its occurrence
×
122
                let new_val = &OrderedFloat(Field::to_float(new).unwrap());
2✔
123
                Self::update_aggregator_db(to_bytes!(new_val), 1, false, ptx, aggregators_db);
2✔
124
                let old_val = &OrderedFloat(Field::to_float(old).unwrap());
2✔
125
                Self::update_aggregator_db(to_bytes!(old_val), 1, true, ptx, aggregators_db);
2✔
126

2✔
127
                // Calculate average
2✔
128
                let avg = try_unwrap!(Self::calc_f64_average(ptx, aggregators_db)).to_be_bytes();
2✔
129
                Ok(AggregationResult::new(
2✔
130
                    Self::get_value(&avg, return_type),
2✔
131
                    Some(Vec::from(avg)),
2✔
132
                ))
2✔
133
            }
×
134
            (FieldType::Int, _) => {
×
135
                // Update aggregators_db with new val and its occurrence
×
136
                let new_val = &Field::to_int(new).unwrap();
2✔
137
                Self::update_aggregator_db(to_bytes!(new_val), 1, false, ptx, aggregators_db);
2✔
138
                let old_val = &Field::to_int(old).unwrap();
2✔
139
                Self::update_aggregator_db(to_bytes!(old_val), 1, true, ptx, aggregators_db);
2✔
140

2✔
141
                // Calculate average
2✔
142
                let avg = try_unwrap!(Self::calc_i64_average(ptx, aggregators_db)).serialize();
2✔
143
                Ok(AggregationResult::new(
2✔
144
                    Self::get_value(&avg, return_type),
2✔
145
                    Some(Vec::from(avg)),
2✔
146
                ))
2✔
147
            }
×
148
            (FieldType::UInt, _) => {
×
149
                // Update aggregators_db with new val and its occurrence
×
150
                let new_val = &Field::to_uint(new).unwrap();
1✔
151
                Self::update_aggregator_db(to_bytes!(new_val), 1, false, ptx, aggregators_db);
1✔
152
                let old_val = &Field::to_uint(old).unwrap();
1✔
153
                Self::update_aggregator_db(to_bytes!(old_val), 1, true, ptx, aggregators_db);
1✔
154

1✔
155
                // Calculate average
1✔
156
                let avg = try_unwrap!(Self::calc_u64_average(ptx, aggregators_db)).serialize();
1✔
157
                Ok(AggregationResult::new(
1✔
158
                    Self::get_value(&avg, return_type),
1✔
159
                    Some(Vec::from(avg)),
1✔
160
                ))
1✔
161
            }
×
162
            _ => Err(InvalidOperandType(AGGREGATOR_NAME.to_string())),
×
163
        }
×
164
    }
7✔
165

×
166
    pub(crate) fn delete(
22✔
167
        _cur_state: Option<&[u8]>,
22✔
168
        old: &Field,
22✔
169
        return_type: FieldType,
22✔
170
        ptx: &mut PrefixTransaction,
22✔
171
        aggregators_db: Database,
22✔
172
    ) -> Result<AggregationResult, PipelineError> {
22✔
173
        match (return_type, old) {
22✔
174
            (FieldType::Decimal, _) => {
×
175
                // Update aggregators_db with new val and its occurrence
×
176
                let old_val = &Field::to_decimal(old).unwrap().serialize();
6✔
177
                Self::update_aggregator_db(old_val.as_slice(), 1, true, ptx, aggregators_db);
6✔
178

6✔
179
                // Calculate average
6✔
180
                let avg = try_unwrap!(Self::calc_decimal_average(ptx, aggregators_db)).serialize();
6✔
181
                Ok(AggregationResult::new(
6✔
182
                    Self::get_value(avg.as_slice(), return_type),
6✔
183
                    Some(Vec::from(avg)),
6✔
184
                ))
6✔
185
            }
×
186
            (FieldType::Float, _) => {
×
187
                // Update aggregators_db with new val and its occurrence
×
188
                let old_val = &OrderedFloat(Field::to_float(old).unwrap());
6✔
189
                Self::update_aggregator_db(to_bytes!(old_val), 1, true, ptx, aggregators_db);
6✔
190

6✔
191
                // Calculate average
6✔
192
                let avg = try_unwrap!(Self::calc_f64_average(ptx, aggregators_db)).to_be_bytes();
6✔
193
                Ok(AggregationResult::new(
6✔
194
                    Self::get_value(&avg, return_type),
6✔
195
                    Some(Vec::from(avg)),
6✔
196
                ))
6✔
197
            }
×
198
            (FieldType::Int, _) => {
×
199
                // Update aggregators_db with new val and its occurrence
×
200
                let old_val = &Field::to_int(old).unwrap();
6✔
201
                Self::update_aggregator_db(to_bytes!(old_val), 1, true, ptx, aggregators_db);
6✔
202

6✔
203
                // Calculate average
6✔
204
                let avg = try_unwrap!(Self::calc_i64_average(ptx, aggregators_db)).serialize();
6✔
205
                Ok(AggregationResult::new(
6✔
206
                    Self::get_value(&avg, return_type),
6✔
207
                    Some(Vec::from(avg)),
6✔
208
                ))
6✔
209
            }
×
210
            (FieldType::UInt, _) => {
×
211
                // Update aggregators_db with new val and its occurrence
×
212
                let old_val = &Field::to_uint(old).unwrap();
4✔
213
                Self::update_aggregator_db(to_bytes!(old_val), 1, true, ptx, aggregators_db);
4✔
214

4✔
215
                // Calculate average
4✔
216
                let avg = try_unwrap!(Self::calc_u64_average(ptx, aggregators_db)).serialize();
4✔
217
                Ok(AggregationResult::new(
4✔
218
                    Self::get_value(&avg, return_type),
4✔
219
                    Some(Vec::from(avg)),
4✔
220
                ))
4✔
221
            }
×
222
            _ => Err(InvalidOperandType(AGGREGATOR_NAME.to_string())),
×
223
        }
×
224
    }
22✔
225

×
226
    pub(crate) fn get_value(f: &[u8], from: FieldType) -> Field {
51✔
227
        match from {
51✔
228
            FieldType::Decimal => Decimal(dozer_types::rust_decimal::Decimal::deserialize(
14✔
229
                deserialize!(f),
14✔
230
            )),
14✔
231
            FieldType::Float => Float(OrderedFloat(f64::from_be_bytes(deserialize!(f)))),
14✔
232
            FieldType::Int => Decimal(dozer_types::rust_decimal::Decimal::deserialize(
14✔
233
                deserialize!(f),
14✔
234
            )),
14✔
235
            FieldType::UInt => Decimal(dozer_types::rust_decimal::Decimal::deserialize(
9✔
236
                deserialize!(f),
9✔
237
            )),
9✔
238
            _ => Field::Null,
×
239
        }
×
240
    }
51✔
241

×
242
    fn update_aggregator_db(
58✔
243
        key: &[u8],
58✔
244
        val_delta: u8,
58✔
245
        decr: bool,
58✔
246
        ptx: &mut PrefixTransaction,
58✔
247
        aggregators_db: Database,
58✔
248
    ) {
58✔
249
        let get_prev_count = try_unwrap!(ptx.get(aggregators_db, key));
58✔
250
        let prev_count = deserialize_u8!(get_prev_count);
58✔
251
        let mut new_count = prev_count;
58✔
252
        if decr {
58✔
253
            new_count = new_count.wrapping_sub(val_delta);
29✔
254
        } else {
29✔
255
            new_count = new_count.wrapping_add(val_delta);
29✔
256
        }
29✔
257
        if new_count < 1 {
58✔
258
            try_unwrap!(ptx.del(aggregators_db, key, Option::from(to_bytes!(prev_count))));
22✔
259
        } else {
36✔
260
            try_unwrap!(ptx.put(aggregators_db, key, to_bytes!(new_count)));
36✔
261
        }
36✔
262
    }
58✔
263

×
264
    fn calc_f64_average(
14✔
265
        ptx: &mut PrefixTransaction,
14✔
266
        aggregators_db: Database,
14✔
267
    ) -> Result<f64, PipelineError> {
14✔
268
        let ptx_cur = ptx.open_cursor(aggregators_db)?;
14✔
269
        let mut total_count = 0_u8;
14✔
270
        let mut total_sum = 0_f64;
14✔
271
        let mut exist = ptx_cur.first()?;
14✔
272

×
273
        // Loop through aggregators_db to calculate average
×
274
        while exist {
30✔
275
            let cur = try_unwrap!(ptx_cur.read()).unwrap();
16✔
276
            let val = f64::from_be_bytes(deserialize!(cur.0));
16✔
277
            let get_count = ptx.get(aggregators_db, cur.0);
16✔
278
            if get_count.is_ok() {
16✔
279
                let count = deserialize_u8!(try_unwrap!(get_count));
16✔
280
                total_count += count;
16✔
281
                total_sum += val * f64::from(count);
16✔
282
            }
×
283
            exist = ptx_cur.next()?;
16✔
284
        }
×
285
        Ok(check_nan_f64!(total_sum / f64::from(total_count)))
14✔
286
    }
14✔
287

288
    fn calc_decimal_average(
14✔
289
        ptx: &mut PrefixTransaction,
14✔
290
        aggregators_db: Database,
14✔
291
    ) -> Result<dozer_types::rust_decimal::Decimal, PipelineError> {
14✔
292
        let ptx_cur = ptx.open_cursor(aggregators_db)?;
14✔
293
        let mut total_count = 0_u8;
14✔
294
        let mut total_sum = dozer_types::rust_decimal::Decimal::zero();
14✔
295
        let mut exist = ptx_cur.first()?;
14✔
296

×
297
        // Loop through aggregators_db to calculate average
×
298
        while exist {
30✔
299
            let cur = try_unwrap!(ptx_cur.read()).unwrap();
16✔
300
            let val = dozer_types::rust_decimal::Decimal::deserialize(deserialize!(cur.0));
16✔
301
            let get_count = ptx.get(aggregators_db, cur.0);
16✔
302
            if get_count.is_ok() {
16✔
303
                let count = deserialize_u8!(try_unwrap!(get_count));
16✔
304
                total_count += count;
16✔
305
                total_sum += val * dozer_types::rust_decimal::Decimal::from(count);
16✔
306
            }
×
307
            exist = ptx_cur.next()?;
16✔
308
        }
×
309
        if total_count.is_zero() {
14✔
310
            Ok(dozer_types::rust_decimal::Decimal::zero())
3✔
311
        } else {
312
            Ok(total_sum.div(dozer_types::rust_decimal::Decimal::from(total_count)))
11✔
313
        }
×
314
    }
14✔
315

×
316
    fn calc_i64_average(
14✔
317
        ptx: &mut PrefixTransaction,
14✔
318
        aggregators_db: Database,
14✔
319
    ) -> Result<dozer_types::rust_decimal::Decimal, PipelineError> {
14✔
320
        let ptx_cur = ptx.open_cursor(aggregators_db)?;
14✔
321
        let mut total_count = 0_u8;
14✔
322
        let mut total_sum = 0_i64;
14✔
323
        let mut exist = ptx_cur.first()?;
14✔
324

×
325
        // Loop through aggregators_db to calculate average
×
326
        while exist {
30✔
327
            let cur = try_unwrap!(ptx_cur.read()).unwrap();
16✔
328
            let val = i64::from_be_bytes(deserialize!(cur.0));
16✔
329
            let get_count = ptx.get(aggregators_db, cur.0);
16✔
330
            if get_count.is_ok() {
16✔
331
                let count = deserialize_u8!(try_unwrap!(get_count));
16✔
332
                total_count += count;
16✔
333
                total_sum += val * i64::from(count);
16✔
334
            }
×
335
            exist = ptx_cur.next()?;
16✔
336
        }
×
337
        let total_count_sum = dozer_types::rust_decimal::Decimal::from(total_sum);
14✔
338
        if total_count.is_zero() {
14✔
339
            Ok(dozer_types::rust_decimal::Decimal::zero())
3✔
340
        } else {
×
341
            Ok(total_count_sum.div(dozer_types::rust_decimal::Decimal::from(total_count)))
11✔
342
        }
×
343
    }
14✔
344

×
345
    fn calc_u64_average(
9✔
346
        ptx: &mut PrefixTransaction,
9✔
347
        aggregators_db: Database,
9✔
348
    ) -> Result<dozer_types::rust_decimal::Decimal, PipelineError> {
9✔
349
        let ptx_cur = ptx.open_cursor(aggregators_db)?;
9✔
350
        let mut total_count = 0_u8;
9✔
351
        let mut total_sum = 0_u64;
9✔
352
        let mut exist = ptx_cur.first()?;
9✔
353

×
354
        // Loop through aggregators_db to calculate average
×
355
        while exist {
20✔
356
            let cur = try_unwrap!(ptx_cur.read()).unwrap();
11✔
357
            let val = u64::from_be_bytes(deserialize!(cur.0));
11✔
358
            let get_count = ptx.get(aggregators_db, cur.0);
11✔
359
            if get_count.is_ok() {
11✔
360
                let count = deserialize_u8!(try_unwrap!(get_count));
11✔
361
                total_count += count;
11✔
362
                total_sum += val * u64::from(count);
11✔
363
            }
×
364
            exist = ptx_cur.next()?;
11✔
365
        }
×
366
        let total_count_sum = dozer_types::rust_decimal::Decimal::from(total_sum);
9✔
367
        if total_count.is_zero() {
9✔
368
            Ok(dozer_types::rust_decimal::Decimal::zero())
2✔
369
        } else {
×
370
            Ok(total_count_sum.div(dozer_types::rust_decimal::Decimal::from(total_count)))
7✔
371
        }
×
372
    }
9✔
373
}
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc