• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

getdozer / dozer / 4007818786

pending completion
4007818786

Pull #733

github

GitHub
Merge baf5c38aa into 6c0ac2b2c
Pull Request #733: Bump diesel from 2.0.2 to 2.0.3

23389 of 34432 relevant lines covered (67.93%)

40326.78 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

76.65
/dozer-sql/src/pipeline/aggregation/min.rs
1
use crate::pipeline::aggregation::aggregator::AggregationResult;
2
use crate::pipeline::errors::PipelineError;
3
use crate::pipeline::errors::PipelineError::InvalidOperandType;
4
use crate::{deserialize_u8, to_bytes, try_unwrap};
5
use dozer_core::storage::common::Database;
6
use dozer_core::storage::prefix_transaction::PrefixTransaction;
7
use dozer_types::ordered_float::OrderedFloat;
8
use dozer_types::types::Field::{Date, Decimal, Float, Int, Timestamp, UInt};
9
use dozer_types::types::{Field, FieldType, DATE_FORMAT};
10

11
use crate::deserialize;
12
use dozer_types::chrono::{DateTime, FixedOffset, NaiveDate, TimeZone, Utc};
13
use std::string::ToString;
14

15
/// Aggregator for the SQL `MIN` function.
///
/// The struct carries no fields; all functionality is exposed through
/// associated functions that operate on a `PrefixTransaction` and a
/// `Database` handle passed in by the caller.
pub struct MinAggregator {}
16
/// Aggregator name placed in `InvalidOperandType` errors when the requested
/// return type is not one this aggregator handles.
const AGGREGATOR_NAME: &str = "MIN";
17

18
impl MinAggregator {
19
    /// Numeric identifier of this aggregator, as returned by `_get_type`.
    const _AGGREGATOR_ID: u32 = 0x04;
20

21
    /// Returns the field type produced by `MIN` for an input of type `from`.
    ///
    /// The minimum of a set of values has the same type as the values
    /// themselves, so the input type is handed back unchanged for every
    /// `FieldType`.
    pub(crate) fn get_return_type(from: FieldType) -> FieldType {
        from
    }
×
32

×
33
    pub(crate) fn _get_type() -> u32 {
×
34
        MinAggregator::_AGGREGATOR_ID
×
35
    }
×
36

37
    pub(crate) fn insert(
34✔
38
        _cur_state: Option<&[u8]>,
34✔
39
        new: &Field,
34✔
40
        return_type: FieldType,
34✔
41
        ptx: &mut PrefixTransaction,
34✔
42
        aggregators_db: Database,
34✔
43
    ) -> Result<AggregationResult, PipelineError> {
34✔
44
        match (return_type, new) {
34✔
45
            (FieldType::Date, _) => {
×
46
                // Update aggregators_db with new val and its occurrence
×
47
                let new_val = &Field::to_date(new).unwrap().to_string();
6✔
48
                Self::update_aggregator_db(new_val.as_bytes(), 1, false, ptx, aggregators_db);
6✔
49

6✔
50
                // Calculate minimum
6✔
51
                let minimum = try_unwrap!(Self::calc_date_min(ptx, aggregators_db));
6✔
52
                let max_date = NaiveDate::MAX;
6✔
53
                if minimum == max_date {
6✔
54
                    Ok(AggregationResult::new(Field::Null, None))
×
55
                } else {
×
56
                    Ok(AggregationResult::new(
6✔
57
                        Self::get_value(minimum.to_string().as_bytes(), return_type),
6✔
58
                        Some(Vec::from(minimum.to_string().as_bytes())),
6✔
59
                    ))
6✔
60
                }
×
61
            }
×
62
            (FieldType::Decimal, _) => {
×
63
                // Update aggregators_db with new val and its occurrence
×
64
                let new_val = &Field::to_decimal(new).unwrap().serialize();
6✔
65
                Self::update_aggregator_db(new_val.as_slice(), 1, false, ptx, aggregators_db);
6✔
66

6✔
67
                // Calculate minimum
6✔
68
                let minimum = try_unwrap!(Self::calc_decimal_min(ptx, aggregators_db));
6✔
69
                if minimum == dozer_types::rust_decimal::Decimal::MAX {
6✔
70
                    Ok(AggregationResult::new(Field::Null, None))
×
71
                } else {
×
72
                    Ok(AggregationResult::new(
6✔
73
                        Self::get_value(minimum.serialize().as_slice(), return_type),
6✔
74
                        Some(Vec::from(minimum.serialize())),
6✔
75
                    ))
6✔
76
                }
×
77
            }
×
78
            (FieldType::Float, _) => {
×
79
                // Update aggregators_db with new val and its occurrence
×
80
                let new_val = &OrderedFloat(Field::to_float(new).unwrap());
6✔
81
                Self::update_aggregator_db(to_bytes!(new_val), 1, false, ptx, aggregators_db);
6✔
82

6✔
83
                // Calculate average
6✔
84
                let minimum = try_unwrap!(Self::calc_f64_min(ptx, aggregators_db));
6✔
85
                if minimum == f64::MAX {
6✔
86
                    Ok(AggregationResult::new(
×
87
                        Field::Null,
×
88
                        Some(Vec::from(minimum.to_be_bytes())),
×
89
                    ))
×
90
                } else {
×
91
                    Ok(AggregationResult::new(
6✔
92
                        Self::get_value(&minimum.to_be_bytes(), return_type),
6✔
93
                        Some(Vec::from(minimum.to_be_bytes())),
6✔
94
                    ))
6✔
95
                }
×
96
            }
×
97
            (FieldType::Int, _) => {
×
98
                // Update aggregators_db with new val and its occurrence
×
99
                let new_val = &Field::to_int(new).unwrap();
6✔
100
                Self::update_aggregator_db(to_bytes!(new_val), 1, false, ptx, aggregators_db);
6✔
101

6✔
102
                // Calculate minimum
6✔
103
                let minimum = try_unwrap!(Self::calc_i64_min(ptx, aggregators_db));
6✔
104
                if minimum == i64::MAX {
6✔
105
                    Ok(AggregationResult::new(
×
106
                        Field::Null,
×
107
                        Some(Vec::from(minimum.to_be_bytes())),
×
108
                    ))
×
109
                } else {
×
110
                    Ok(AggregationResult::new(
6✔
111
                        Self::get_value(&minimum.to_be_bytes(), return_type),
6✔
112
                        Some(Vec::from(minimum.to_be_bytes())),
6✔
113
                    ))
6✔
114
                }
×
115
            }
×
116
            (FieldType::UInt, _) => {
×
117
                // Update aggregators_db with new val and its occurrence
×
118
                let new_val = &Field::to_uint(new).unwrap();
4✔
119
                Self::update_aggregator_db(to_bytes!(new_val), 1, false, ptx, aggregators_db);
4✔
120

4✔
121
                // Calculate minimum
4✔
122
                let minimum = try_unwrap!(Self::calc_u64_min(ptx, aggregators_db));
4✔
123
                if minimum == u64::MAX {
4✔
124
                    Ok(AggregationResult::new(
×
125
                        Field::Null,
×
126
                        Some(Vec::from(minimum.to_be_bytes())),
×
127
                    ))
×
128
                } else {
×
129
                    Ok(AggregationResult::new(
4✔
130
                        Self::get_value(&minimum.to_be_bytes(), return_type),
4✔
131
                        Some(Vec::from(minimum.to_be_bytes())),
4✔
132
                    ))
4✔
133
                }
×
134
            }
×
135
            (FieldType::Timestamp, _) => {
×
136
                // Update aggregators_db with new val and its occurrence
×
137
                let new_val = &Field::to_timestamp(new)
6✔
138
                    .unwrap()
6✔
139
                    .timestamp_millis()
6✔
140
                    .to_be_bytes();
6✔
141
                Self::update_aggregator_db(new_val.as_slice(), 1, false, ptx, aggregators_db);
6✔
142

6✔
143
                // Calculate minimum
6✔
144
                let minimum = try_unwrap!(Self::calc_timestamp_min(ptx, aggregators_db));
6✔
145
                let max_datetime: DateTime<FixedOffset> =
6✔
146
                    DateTime::from(DateTime::<FixedOffset>::MAX_UTC);
6✔
147
                if minimum == max_datetime {
6✔
148
                    Ok(AggregationResult::new(Field::Null, None))
×
149
                } else {
×
150
                    Ok(AggregationResult::new(
6✔
151
                        Self::get_value(
6✔
152
                            minimum.timestamp_millis().to_be_bytes().as_slice(),
6✔
153
                            return_type,
6✔
154
                        ),
6✔
155
                        Some(Vec::from(minimum.timestamp_millis().to_be_bytes())),
6✔
156
                    ))
6✔
157
                }
×
158
            }
×
159
            _ => Err(InvalidOperandType(AGGREGATOR_NAME.to_string())),
×
160
        }
161
    }
34✔
162

×
163
    pub(crate) fn update(
11✔
164
        _cur_state: Option<&[u8]>,
11✔
165
        old: &Field,
11✔
166
        new: &Field,
11✔
167
        return_type: FieldType,
11✔
168
        ptx: &mut PrefixTransaction,
11✔
169
        aggregators_db: Database,
11✔
170
    ) -> Result<AggregationResult, PipelineError> {
11✔
171
        match (return_type, new) {
11✔
172
            (FieldType::Date, _) => {
×
173
                // Update aggregators_db with new val and its occurrence
×
174
                let new_val = &Field::to_date(new).unwrap().to_string();
2✔
175
                Self::update_aggregator_db(new_val.as_bytes(), 1, false, ptx, aggregators_db);
2✔
176
                let old_val = &Field::to_date(old).unwrap().to_string();
2✔
177
                Self::update_aggregator_db(old_val.as_bytes(), 1, true, ptx, aggregators_db);
2✔
178

2✔
179
                // Calculate minimum
2✔
180
                let minimum = try_unwrap!(Self::calc_date_min(ptx, aggregators_db));
2✔
181
                let max_date = NaiveDate::MAX;
2✔
182
                if minimum == max_date {
2✔
183
                    Ok(AggregationResult::new(Field::Null, None))
×
184
                } else {
×
185
                    Ok(AggregationResult::new(
2✔
186
                        Self::get_value(minimum.to_string().as_bytes(), return_type),
2✔
187
                        Some(Vec::from(minimum.to_string().as_bytes())),
2✔
188
                    ))
2✔
189
                }
×
190
            }
×
191
            (FieldType::Decimal, _) => {
×
192
                // Update aggregators_db with new val and its occurrence
193
                let new_val = &Field::to_decimal(new).unwrap().serialize();
2✔
194
                Self::update_aggregator_db(new_val.as_slice(), 1, false, ptx, aggregators_db);
2✔
195
                let old_val = &Field::to_decimal(old).unwrap().serialize();
2✔
196
                Self::update_aggregator_db(old_val.as_slice(), 1, true, ptx, aggregators_db);
2✔
197

2✔
198
                // Calculate minimum
2✔
199
                let minimum = try_unwrap!(Self::calc_decimal_min(ptx, aggregators_db));
2✔
200
                if minimum == dozer_types::rust_decimal::Decimal::MAX {
2✔
201
                    Ok(AggregationResult::new(Field::Null, None))
×
202
                } else {
×
203
                    Ok(AggregationResult::new(
2✔
204
                        Self::get_value(minimum.serialize().as_slice(), return_type),
2✔
205
                        Some(Vec::from(minimum.serialize())),
2✔
206
                    ))
2✔
207
                }
×
208
            }
×
209
            (FieldType::Float, _) => {
×
210
                // Update aggregators_db with new val and its occurrence
211
                let new_val = &OrderedFloat(Field::to_float(new).unwrap());
2✔
212
                Self::update_aggregator_db(to_bytes!(new_val), 1, false, ptx, aggregators_db);
2✔
213
                let old_val = &OrderedFloat(Field::to_float(old).unwrap());
2✔
214
                Self::update_aggregator_db(to_bytes!(old_val), 1, true, ptx, aggregators_db);
2✔
215

2✔
216
                // Calculate minimum
2✔
217
                let minimum = try_unwrap!(Self::calc_f64_min(ptx, aggregators_db));
2✔
218
                if minimum == f64::MAX {
2✔
219
                    Ok(AggregationResult::new(
×
220
                        Field::Null,
×
221
                        Some(Vec::from(minimum.to_be_bytes())),
×
222
                    ))
×
223
                } else {
×
224
                    Ok(AggregationResult::new(
2✔
225
                        Self::get_value(&minimum.to_be_bytes(), return_type),
2✔
226
                        Some(Vec::from(minimum.to_be_bytes())),
2✔
227
                    ))
2✔
228
                }
×
229
            }
×
230
            (FieldType::Int, _) => {
×
231
                // Update aggregators_db with new val and its occurrence
232
                let new_val = &Field::to_int(new).unwrap();
2✔
233
                Self::update_aggregator_db(to_bytes!(new_val), 1, false, ptx, aggregators_db);
2✔
234
                let old_val = &Field::to_int(old).unwrap();
2✔
235
                Self::update_aggregator_db(to_bytes!(old_val), 1, true, ptx, aggregators_db);
2✔
236

2✔
237
                // Calculate minimum
2✔
238
                let minimum = try_unwrap!(Self::calc_i64_min(ptx, aggregators_db));
2✔
239
                if minimum == i64::MAX {
2✔
240
                    Ok(AggregationResult::new(
×
241
                        Field::Null,
×
242
                        Some(Vec::from(minimum.to_be_bytes())),
×
243
                    ))
×
244
                } else {
×
245
                    Ok(AggregationResult::new(
2✔
246
                        Self::get_value(&minimum.to_be_bytes(), return_type),
2✔
247
                        Some(Vec::from(minimum.to_be_bytes())),
2✔
248
                    ))
2✔
249
                }
×
250
            }
×
251
            (FieldType::UInt, _) => {
×
252
                // Update aggregators_db with new val and its occurrence
253
                let new_val = &Field::to_uint(new).unwrap();
1✔
254
                Self::update_aggregator_db(to_bytes!(new_val), 1, false, ptx, aggregators_db);
1✔
255
                let old_val = &Field::to_uint(old).unwrap();
1✔
256
                Self::update_aggregator_db(to_bytes!(old_val), 1, true, ptx, aggregators_db);
1✔
257

1✔
258
                // Calculate minimum
1✔
259
                let minimum = try_unwrap!(Self::calc_u64_min(ptx, aggregators_db));
1✔
260
                if minimum == u64::MAX {
1✔
261
                    Ok(AggregationResult::new(
×
262
                        Field::Null,
×
263
                        Some(Vec::from(minimum.to_be_bytes())),
×
264
                    ))
×
265
                } else {
×
266
                    Ok(AggregationResult::new(
1✔
267
                        Self::get_value(&minimum.to_be_bytes(), return_type),
1✔
268
                        Some(Vec::from(minimum.to_be_bytes())),
1✔
269
                    ))
1✔
270
                }
×
271
            }
×
272
            (FieldType::Timestamp, _) => {
×
273
                // Update aggregators_db with new val and its occurrence
274
                let new_val = &Field::to_timestamp(new)
2✔
275
                    .unwrap()
2✔
276
                    .timestamp_millis()
2✔
277
                    .to_be_bytes();
2✔
278
                Self::update_aggregator_db(new_val.as_slice(), 1, false, ptx, aggregators_db);
2✔
279
                let old_val = &Field::to_timestamp(old)
2✔
280
                    .unwrap()
2✔
281
                    .timestamp_millis()
2✔
282
                    .to_be_bytes();
2✔
283
                Self::update_aggregator_db(old_val.as_slice(), 1, true, ptx, aggregators_db);
2✔
284

2✔
285
                // Calculate minimum
2✔
286
                let minimum = try_unwrap!(Self::calc_timestamp_min(ptx, aggregators_db));
2✔
287
                let max_datetime: DateTime<FixedOffset> =
2✔
288
                    DateTime::from(DateTime::<FixedOffset>::MAX_UTC);
2✔
289
                if minimum == max_datetime {
2✔
290
                    Ok(AggregationResult::new(Field::Null, None))
×
291
                } else {
×
292
                    Ok(AggregationResult::new(
2✔
293
                        Self::get_value(
2✔
294
                            minimum.timestamp_millis().to_be_bytes().as_slice(),
2✔
295
                            return_type,
2✔
296
                        ),
2✔
297
                        Some(Vec::from(minimum.timestamp_millis().to_be_bytes())),
2✔
298
                    ))
2✔
299
                }
×
300
            }
301
            _ => Err(InvalidOperandType(AGGREGATOR_NAME.to_string())),
×
302
        }
×
303
    }
11✔
304

×
305
    pub(crate) fn delete(
34✔
306
        _cur_state: Option<&[u8]>,
34✔
307
        old: &Field,
34✔
308
        return_type: FieldType,
34✔
309
        ptx: &mut PrefixTransaction,
34✔
310
        aggregators_db: Database,
34✔
311
    ) -> Result<AggregationResult, PipelineError> {
34✔
312
        match (return_type, old) {
34✔
313
            (FieldType::Date, _) => {
×
314
                // Update aggregators_db with new val and its occurrence
315
                let old_val = &Field::to_date(old).unwrap().to_string();
6✔
316
                Self::update_aggregator_db(old_val.as_bytes(), 1, true, ptx, aggregators_db);
6✔
317

6✔
318
                // Calculate minimum
6✔
319
                let minimum = try_unwrap!(Self::calc_date_min(ptx, aggregators_db));
6✔
320
                let max_date = NaiveDate::MAX;
6✔
321
                if minimum == max_date {
6✔
322
                    Ok(AggregationResult::new(Field::Null, None))
3✔
323
                } else {
×
324
                    Ok(AggregationResult::new(
3✔
325
                        Self::get_value(minimum.to_string().as_bytes(), return_type),
3✔
326
                        Some(Vec::from(minimum.to_string().as_bytes())),
3✔
327
                    ))
3✔
328
                }
×
329
            }
330
            (FieldType::Decimal, _) => {
331
                // Update aggregators_db with new val and its occurrence
332
                let old_val = &Field::to_decimal(old).unwrap().serialize();
6✔
333
                Self::update_aggregator_db(old_val.as_slice(), 1, true, ptx, aggregators_db);
6✔
334

6✔
335
                // Calculate minimum
6✔
336
                let minimum = try_unwrap!(Self::calc_decimal_min(ptx, aggregators_db));
6✔
337
                if minimum == dozer_types::rust_decimal::Decimal::MAX {
6✔
338
                    Ok(AggregationResult::new(Field::Null, None))
2✔
339
                } else {
×
340
                    Ok(AggregationResult::new(
4✔
341
                        Self::get_value(minimum.serialize().as_slice(), return_type),
4✔
342
                        Some(Vec::from(minimum.serialize())),
4✔
343
                    ))
4✔
344
                }
×
345
            }
346
            (FieldType::Float, _) => {
347
                // Update aggregators_db with new val and its occurrence
348
                let old_val = &OrderedFloat(Field::to_float(old).unwrap());
6✔
349
                Self::update_aggregator_db(to_bytes!(old_val), 1, true, ptx, aggregators_db);
6✔
350

6✔
351
                // Calculate minimum
6✔
352
                let minimum = try_unwrap!(Self::calc_f64_min(ptx, aggregators_db));
6✔
353
                if minimum == f64::MAX {
6✔
354
                    Ok(AggregationResult::new(Field::Null, None))
3✔
355
                } else {
×
356
                    Ok(AggregationResult::new(
3✔
357
                        Self::get_value(&minimum.to_be_bytes(), return_type),
3✔
358
                        Some(Vec::from(minimum.to_be_bytes())),
3✔
359
                    ))
3✔
360
                }
×
361
            }
362
            (FieldType::Int, _) => {
363
                // Update aggregators_db with new val and its occurrence
364
                let old_val = &Field::to_int(old).unwrap();
6✔
365
                Self::update_aggregator_db(to_bytes!(old_val), 1, true, ptx, aggregators_db);
6✔
366

6✔
367
                // Calculate minimum
6✔
368
                let minimum = try_unwrap!(Self::calc_i64_min(ptx, aggregators_db));
6✔
369
                if minimum == i64::MAX {
6✔
370
                    Ok(AggregationResult::new(Field::Null, None))
2✔
371
                } else {
×
372
                    Ok(AggregationResult::new(
4✔
373
                        Self::get_value(&minimum.to_be_bytes(), return_type),
4✔
374
                        Some(Vec::from(minimum.to_be_bytes())),
4✔
375
                    ))
4✔
376
                }
×
377
            }
378
            (FieldType::UInt, _) => {
379
                // Update aggregators_db with new val and its occurrence
380
                let old_val = &Field::to_uint(old).unwrap();
4✔
381
                Self::update_aggregator_db(to_bytes!(old_val), 1, true, ptx, aggregators_db);
4✔
382

4✔
383
                // Calculate minimum
4✔
384
                let minimum = try_unwrap!(Self::calc_u64_min(ptx, aggregators_db));
4✔
385
                if minimum == u64::MAX {
4✔
386
                    Ok(AggregationResult::new(Field::Null, None))
1✔
387
                } else {
×
388
                    Ok(AggregationResult::new(
3✔
389
                        Self::get_value(&minimum.to_be_bytes(), return_type),
3✔
390
                        Some(Vec::from(minimum.to_be_bytes())),
3✔
391
                    ))
3✔
392
                }
×
393
            }
394
            (FieldType::Timestamp, _) => {
395
                // Update aggregators_db with new val and its occurrence
396
                let old_val = &Field::to_timestamp(old)
6✔
397
                    .unwrap()
6✔
398
                    .timestamp_millis()
6✔
399
                    .to_be_bytes();
6✔
400
                Self::update_aggregator_db(old_val.as_slice(), 1, true, ptx, aggregators_db);
6✔
401

6✔
402
                // Calculate minimum
6✔
403
                let minimum = try_unwrap!(Self::calc_timestamp_min(ptx, aggregators_db));
6✔
404
                let max_datetime: DateTime<FixedOffset> =
6✔
405
                    DateTime::from(DateTime::<FixedOffset>::MAX_UTC);
6✔
406
                if minimum == max_datetime {
6✔
407
                    Ok(AggregationResult::new(Field::Null, None))
3✔
408
                } else {
409
                    Ok(AggregationResult::new(
3✔
410
                        Self::get_value(
3✔
411
                            minimum.timestamp_millis().to_be_bytes().as_slice(),
3✔
412
                            return_type,
3✔
413
                        ),
3✔
414
                        Some(Vec::from(minimum.timestamp_millis().to_be_bytes())),
3✔
415
                    ))
3✔
416
                }
417
            }
418
            _ => Err(InvalidOperandType(AGGREGATOR_NAME.to_string())),
×
419
        }
420
    }
34✔
421

422
    pub(crate) fn get_value(f: &[u8], from: FieldType) -> Field {
65✔
423
        match from {
65✔
424
            FieldType::Date => Date(
11✔
425
                NaiveDate::parse_from_str(
11✔
426
                    String::from_utf8(deserialize!(f)).unwrap().as_ref(),
11✔
427
                    DATE_FORMAT,
11✔
428
                )
11✔
429
                .unwrap(),
11✔
430
            ),
11✔
431
            FieldType::Decimal => Decimal(dozer_types::rust_decimal::Decimal::deserialize(
12✔
432
                deserialize!(f),
12✔
433
            )),
12✔
434
            FieldType::Float => Float(OrderedFloat(f64::from_be_bytes(deserialize!(f)))),
11✔
435
            FieldType::Int => Int(i64::from_be_bytes(deserialize!(f))),
12✔
436
            FieldType::UInt => UInt(u64::from_be_bytes(deserialize!(f))),
8✔
437
            FieldType::Timestamp => Timestamp(DateTime::from(
11✔
438
                Utc.timestamp_millis(i64::from_be_bytes(deserialize!(f))),
11✔
439
            )),
11✔
440
            _ => Field::Null,
×
441
        }
442
    }
65✔
443

444
    fn update_aggregator_db(
90✔
445
        key: &[u8],
90✔
446
        val_delta: u8,
90✔
447
        decr: bool,
90✔
448
        ptx: &mut PrefixTransaction,
90✔
449
        aggregators_db: Database,
90✔
450
    ) {
90✔
451
        let get_prev_count = try_unwrap!(ptx.get(aggregators_db, key));
90✔
452
        let prev_count = deserialize_u8!(get_prev_count);
90✔
453
        let mut new_count = prev_count;
90✔
454
        if decr {
90✔
455
            new_count = new_count.wrapping_sub(val_delta);
45✔
456
        } else {
45✔
457
            new_count = new_count.wrapping_add(val_delta);
45✔
458
        }
45✔
459
        if new_count < 1 {
90✔
460
            try_unwrap!(ptx.del(aggregators_db, key, Option::from(to_bytes!(prev_count))));
28✔
461
        } else {
62✔
462
            try_unwrap!(ptx.put(aggregators_db, key, to_bytes!(new_count)));
62✔
463
        }
62✔
464
    }
90✔
465

466
    fn calc_f64_min(
14✔
467
        ptx: &mut PrefixTransaction,
14✔
468
        aggregators_db: Database,
14✔
469
    ) -> Result<f64, PipelineError> {
14✔
470
        let ptx_cur = ptx.open_cursor(aggregators_db)?;
14✔
471
        let mut minimum = f64::MAX;
14✔
472

14✔
473
        // get first to get the minimum
14✔
474
        if ptx_cur.first()? {
14✔
475
            let cur = try_unwrap!(ptx_cur.read()).unwrap();
11✔
476
            minimum = f64::from_be_bytes(deserialize!(cur.0));
11✔
477
        }
11✔
478
        Ok(minimum)
14✔
479
    }
14✔
480

481
    fn calc_decimal_min(
14✔
482
        ptx: &mut PrefixTransaction,
14✔
483
        aggregators_db: Database,
14✔
484
    ) -> Result<dozer_types::rust_decimal::Decimal, PipelineError> {
14✔
485
        let ptx_cur = ptx.open_cursor(aggregators_db)?;
14✔
486
        let mut minimum = dozer_types::rust_decimal::Decimal::MAX;
14✔
487

14✔
488
        // get first to get the minimum
14✔
489
        if ptx_cur.first()? {
14✔
490
            let cur = try_unwrap!(ptx_cur.read()).unwrap();
12✔
491
            minimum = dozer_types::rust_decimal::Decimal::deserialize(deserialize!(cur.0));
12✔
492
        }
12✔
493
        Ok(minimum)
14✔
494
    }
14✔
495

496
    fn calc_timestamp_min(
14✔
497
        ptx: &mut PrefixTransaction,
14✔
498
        aggregators_db: Database,
14✔
499
    ) -> Result<DateTime<FixedOffset>, PipelineError> {
14✔
500
        let ptx_cur = ptx.open_cursor(aggregators_db)?;
14✔
501
        let mut minimum = DateTime::<FixedOffset>::MAX_UTC;
14✔
502

14✔
503
        // get first to get the minimum
14✔
504
        if ptx_cur.first()? {
14✔
505
            let cur = try_unwrap!(ptx_cur.read()).unwrap();
11✔
506
            minimum = Utc.timestamp_millis(i64::from_be_bytes(deserialize!(cur.0)));
11✔
507
        }
11✔
508
        Ok(DateTime::from(minimum))
14✔
509
    }
14✔
510

511
    fn calc_date_min(
14✔
512
        ptx: &mut PrefixTransaction,
14✔
513
        aggregators_db: Database,
14✔
514
    ) -> Result<NaiveDate, PipelineError> {
14✔
515
        let ptx_cur = ptx.open_cursor(aggregators_db)?;
14✔
516
        let mut minimum = NaiveDate::MAX;
14✔
517

14✔
518
        // get first to get the minimum
14✔
519
        if ptx_cur.first()? {
14✔
520
            let cur = try_unwrap!(ptx_cur.read()).unwrap();
11✔
521
            minimum = NaiveDate::parse_from_str(
11✔
522
                String::from_utf8(deserialize!(cur.0)).unwrap().as_ref(),
11✔
523
                DATE_FORMAT,
11✔
524
            )
11✔
525
            .unwrap();
11✔
526
        }
11✔
527
        Ok(minimum)
14✔
528
    }
14✔
529

530
    fn calc_i64_min(
14✔
531
        ptx: &mut PrefixTransaction,
14✔
532
        aggregators_db: Database,
14✔
533
    ) -> Result<i64, PipelineError> {
14✔
534
        let ptx_cur = ptx.open_cursor(aggregators_db)?;
14✔
535
        let mut minimum = i64::MAX;
14✔
536

14✔
537
        // get first to get the minimum
14✔
538
        if ptx_cur.first()? {
14✔
539
            let cur = try_unwrap!(ptx_cur.read()).unwrap();
12✔
540
            minimum = i64::from_be_bytes(deserialize!(cur.0));
12✔
541
        }
12✔
542
        Ok(minimum)
14✔
543
    }
14✔
544

545
    fn calc_u64_min(
9✔
546
        ptx: &mut PrefixTransaction,
9✔
547
        aggregators_db: Database,
9✔
548
    ) -> Result<u64, PipelineError> {
9✔
549
        let ptx_cur = ptx.open_cursor(aggregators_db)?;
9✔
550
        let mut minimum = u64::MAX;
9✔
551

9✔
552
        // get first to get the minimum
9✔
553
        if ptx_cur.first()? {
9✔
554
            let cur = try_unwrap!(ptx_cur.read()).unwrap();
8✔
555
            minimum = u64::from_be_bytes(deserialize!(cur.0));
8✔
556
        }
8✔
557
        Ok(minimum)
9✔
558
    }
9✔
559
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc