• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

getdozer / dozer / 4102355041

pending completion
4102355041

Pull #811

github

GitHub
Merge 37b55f3df into 7c772e92a
Pull Request #811: chore: integrating sql planner

427 of 427 new or added lines in 15 files covered. (100.0%)

24596 of 37831 relevant lines covered (65.02%)

37254.69 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

72.32
/dozer-sql/src/pipeline/aggregation/min.rs
1
use crate::pipeline::aggregation::aggregator::AggregationResult;
2
use crate::pipeline::errors::PipelineError;
3
use crate::pipeline::errors::PipelineError::InvalidOperandType;
4
use crate::{deserialize_u8, to_bytes, try_unwrap};
5
use dozer_core::storage::common::Database;
6
use dozer_core::storage::prefix_transaction::PrefixTransaction;
7
use dozer_types::ordered_float::OrderedFloat;
8
use dozer_types::types::Field::{Date, Decimal, Float, Int, Timestamp, UInt};
9
use dozer_types::types::{Field, FieldType, DATE_FORMAT};
10

11
use crate::deserialize;
12
use dozer_types::chrono::{DateTime, FixedOffset, NaiveDate, TimeZone, Utc};
13
use std::string::ToString;
14

15
/// Field-less marker type for the `MIN` aggregation; its behaviour is provided
/// entirely by associated functions that take no `self`.
pub struct MinAggregator {}
16
/// Aggregator name carried by `InvalidOperandType` when the requested return
/// type is not supported by this aggregator.
const AGGREGATOR_NAME: &str = "MIN";
17

18
impl MinAggregator {
19
    const _AGGREGATOR_ID: u32 = 0x04;
20

21
    pub(crate) fn _get_type() -> u32 {
×
22
        MinAggregator::_AGGREGATOR_ID
×
23
    }
×
24

×
25
    pub(crate) fn insert(
34✔
26
        _cur_state: Option<&[u8]>,
34✔
27
        new: &Field,
34✔
28
        return_type: FieldType,
34✔
29
        ptx: &mut PrefixTransaction,
34✔
30
        aggregators_db: Database,
34✔
31
    ) -> Result<AggregationResult, PipelineError> {
34✔
32
        match (return_type, new) {
34✔
33
            (FieldType::Date, _) => {
×
34
                // Update aggregators_db with new val and its occurrence
×
35
                let new_val = &Field::to_date(new).unwrap().to_string();
6✔
36
                Self::update_aggregator_db(new_val.as_bytes(), 1, false, ptx, aggregators_db);
6✔
37

6✔
38
                // Calculate minimum
6✔
39
                let minimum = try_unwrap!(Self::calc_date_min(ptx, aggregators_db));
6✔
40
                let max_date = NaiveDate::MAX;
6✔
41
                if minimum == max_date {
6✔
42
                    Ok(AggregationResult::new(Field::Null, None))
×
43
                } else {
×
44
                    Ok(AggregationResult::new(
6✔
45
                        Self::get_value(minimum.to_string().as_bytes(), return_type),
6✔
46
                        Some(Vec::from(minimum.to_string().as_bytes())),
6✔
47
                    ))
6✔
48
                }
×
49
            }
×
50
            (FieldType::Decimal, _) => {
×
51
                // Update aggregators_db with new val and its occurrence
×
52
                let new_val = &Field::to_decimal(new).unwrap().serialize();
6✔
53
                Self::update_aggregator_db(new_val.as_slice(), 1, false, ptx, aggregators_db);
6✔
54

6✔
55
                // Calculate minimum
6✔
56
                let minimum = try_unwrap!(Self::calc_decimal_min(ptx, aggregators_db));
6✔
57
                if minimum == dozer_types::rust_decimal::Decimal::MAX {
6✔
58
                    Ok(AggregationResult::new(Field::Null, None))
×
59
                } else {
×
60
                    Ok(AggregationResult::new(
6✔
61
                        Self::get_value(minimum.serialize().as_slice(), return_type),
6✔
62
                        Some(Vec::from(minimum.serialize())),
6✔
63
                    ))
6✔
64
                }
×
65
            }
×
66
            (FieldType::Float, _) => {
×
67
                // Update aggregators_db with new val and its occurrence
×
68
                let new_val = &OrderedFloat(Field::to_float(new).unwrap());
6✔
69
                Self::update_aggregator_db(to_bytes!(new_val), 1, false, ptx, aggregators_db);
6✔
70

6✔
71
                // Calculate average
6✔
72
                let minimum = try_unwrap!(Self::calc_f64_min(ptx, aggregators_db));
6✔
73
                if minimum == f64::MAX {
6✔
74
                    Ok(AggregationResult::new(
×
75
                        Field::Null,
×
76
                        Some(Vec::from(minimum.to_be_bytes())),
×
77
                    ))
×
78
                } else {
79
                    Ok(AggregationResult::new(
6✔
80
                        Self::get_value(&minimum.to_be_bytes(), return_type),
6✔
81
                        Some(Vec::from(minimum.to_be_bytes())),
6✔
82
                    ))
6✔
83
                }
×
84
            }
×
85
            (FieldType::Int, _) => {
×
86
                // Update aggregators_db with new val and its occurrence
×
87
                let new_val = &Field::to_int(new).unwrap();
6✔
88
                Self::update_aggregator_db(to_bytes!(new_val), 1, false, ptx, aggregators_db);
6✔
89

6✔
90
                // Calculate minimum
6✔
91
                let minimum = try_unwrap!(Self::calc_i64_min(ptx, aggregators_db));
6✔
92
                if minimum == i64::MAX {
6✔
93
                    Ok(AggregationResult::new(
×
94
                        Field::Null,
×
95
                        Some(Vec::from(minimum.to_be_bytes())),
×
96
                    ))
×
97
                } else {
98
                    Ok(AggregationResult::new(
6✔
99
                        Self::get_value(&minimum.to_be_bytes(), return_type),
6✔
100
                        Some(Vec::from(minimum.to_be_bytes())),
6✔
101
                    ))
6✔
102
                }
×
103
            }
×
104
            (FieldType::UInt, _) => {
×
105
                // Update aggregators_db with new val and its occurrence
×
106
                let new_val = &Field::to_uint(new).unwrap();
4✔
107
                Self::update_aggregator_db(to_bytes!(new_val), 1, false, ptx, aggregators_db);
4✔
108

4✔
109
                // Calculate minimum
4✔
110
                let minimum = try_unwrap!(Self::calc_u64_min(ptx, aggregators_db));
4✔
111
                if minimum == u64::MAX {
4✔
112
                    Ok(AggregationResult::new(
×
113
                        Field::Null,
×
114
                        Some(Vec::from(minimum.to_be_bytes())),
×
115
                    ))
×
116
                } else {
117
                    Ok(AggregationResult::new(
4✔
118
                        Self::get_value(&minimum.to_be_bytes(), return_type),
4✔
119
                        Some(Vec::from(minimum.to_be_bytes())),
4✔
120
                    ))
4✔
121
                }
×
122
            }
×
123
            (FieldType::Timestamp, _) => {
×
124
                // Update aggregators_db with new val and its occurrence
×
125
                let new_val = &Field::to_timestamp(new)
6✔
126
                    .unwrap()
6✔
127
                    .timestamp_millis()
6✔
128
                    .to_be_bytes();
6✔
129
                Self::update_aggregator_db(new_val.as_slice(), 1, false, ptx, aggregators_db);
6✔
130

6✔
131
                // Calculate minimum
6✔
132
                let minimum = try_unwrap!(Self::calc_timestamp_min(ptx, aggregators_db));
6✔
133
                let max_datetime: DateTime<FixedOffset> =
6✔
134
                    DateTime::from(DateTime::<FixedOffset>::MAX_UTC);
6✔
135
                if minimum == max_datetime {
6✔
136
                    Ok(AggregationResult::new(Field::Null, None))
×
137
                } else {
×
138
                    Ok(AggregationResult::new(
6✔
139
                        Self::get_value(
6✔
140
                            minimum.timestamp_millis().to_be_bytes().as_slice(),
6✔
141
                            return_type,
6✔
142
                        ),
6✔
143
                        Some(Vec::from(minimum.timestamp_millis().to_be_bytes())),
6✔
144
                    ))
6✔
145
                }
×
146
            }
×
147
            _ => Err(InvalidOperandType(AGGREGATOR_NAME.to_string())),
×
148
        }
×
149
    }
34✔
150

×
151
    pub(crate) fn update(
11✔
152
        _cur_state: Option<&[u8]>,
11✔
153
        old: &Field,
11✔
154
        new: &Field,
11✔
155
        return_type: FieldType,
11✔
156
        ptx: &mut PrefixTransaction,
11✔
157
        aggregators_db: Database,
11✔
158
    ) -> Result<AggregationResult, PipelineError> {
11✔
159
        match (return_type, new) {
11✔
160
            (FieldType::Date, _) => {
161
                // Update aggregators_db with new val and its occurrence
×
162
                let new_val = &Field::to_date(new).unwrap().to_string();
2✔
163
                Self::update_aggregator_db(new_val.as_bytes(), 1, false, ptx, aggregators_db);
2✔
164
                let old_val = &Field::to_date(old).unwrap().to_string();
2✔
165
                Self::update_aggregator_db(old_val.as_bytes(), 1, true, ptx, aggregators_db);
2✔
166

2✔
167
                // Calculate minimum
2✔
168
                let minimum = try_unwrap!(Self::calc_date_min(ptx, aggregators_db));
2✔
169
                let max_date = NaiveDate::MAX;
2✔
170
                if minimum == max_date {
2✔
171
                    Ok(AggregationResult::new(Field::Null, None))
×
172
                } else {
173
                    Ok(AggregationResult::new(
2✔
174
                        Self::get_value(minimum.to_string().as_bytes(), return_type),
2✔
175
                        Some(Vec::from(minimum.to_string().as_bytes())),
2✔
176
                    ))
2✔
177
                }
×
178
            }
×
179
            (FieldType::Decimal, _) => {
×
180
                // Update aggregators_db with new val and its occurrence
×
181
                let new_val = &Field::to_decimal(new).unwrap().serialize();
2✔
182
                Self::update_aggregator_db(new_val.as_slice(), 1, false, ptx, aggregators_db);
2✔
183
                let old_val = &Field::to_decimal(old).unwrap().serialize();
2✔
184
                Self::update_aggregator_db(old_val.as_slice(), 1, true, ptx, aggregators_db);
2✔
185

2✔
186
                // Calculate minimum
2✔
187
                let minimum = try_unwrap!(Self::calc_decimal_min(ptx, aggregators_db));
2✔
188
                if minimum == dozer_types::rust_decimal::Decimal::MAX {
2✔
189
                    Ok(AggregationResult::new(Field::Null, None))
×
190
                } else {
191
                    Ok(AggregationResult::new(
2✔
192
                        Self::get_value(minimum.serialize().as_slice(), return_type),
2✔
193
                        Some(Vec::from(minimum.serialize())),
2✔
194
                    ))
2✔
195
                }
×
196
            }
×
197
            (FieldType::Float, _) => {
×
198
                // Update aggregators_db with new val and its occurrence
×
199
                let new_val = &OrderedFloat(Field::to_float(new).unwrap());
2✔
200
                Self::update_aggregator_db(to_bytes!(new_val), 1, false, ptx, aggregators_db);
2✔
201
                let old_val = &OrderedFloat(Field::to_float(old).unwrap());
2✔
202
                Self::update_aggregator_db(to_bytes!(old_val), 1, true, ptx, aggregators_db);
2✔
203

2✔
204
                // Calculate minimum
2✔
205
                let minimum = try_unwrap!(Self::calc_f64_min(ptx, aggregators_db));
2✔
206
                if minimum == f64::MAX {
2✔
207
                    Ok(AggregationResult::new(
×
208
                        Field::Null,
×
209
                        Some(Vec::from(minimum.to_be_bytes())),
×
210
                    ))
×
211
                } else {
×
212
                    Ok(AggregationResult::new(
2✔
213
                        Self::get_value(&minimum.to_be_bytes(), return_type),
2✔
214
                        Some(Vec::from(minimum.to_be_bytes())),
2✔
215
                    ))
2✔
216
                }
×
217
            }
×
218
            (FieldType::Int, _) => {
×
219
                // Update aggregators_db with new val and its occurrence
×
220
                let new_val = &Field::to_int(new).unwrap();
2✔
221
                Self::update_aggregator_db(to_bytes!(new_val), 1, false, ptx, aggregators_db);
2✔
222
                let old_val = &Field::to_int(old).unwrap();
2✔
223
                Self::update_aggregator_db(to_bytes!(old_val), 1, true, ptx, aggregators_db);
2✔
224

2✔
225
                // Calculate minimum
2✔
226
                let minimum = try_unwrap!(Self::calc_i64_min(ptx, aggregators_db));
2✔
227
                if minimum == i64::MAX {
2✔
228
                    Ok(AggregationResult::new(
×
229
                        Field::Null,
×
230
                        Some(Vec::from(minimum.to_be_bytes())),
×
231
                    ))
×
232
                } else {
×
233
                    Ok(AggregationResult::new(
2✔
234
                        Self::get_value(&minimum.to_be_bytes(), return_type),
2✔
235
                        Some(Vec::from(minimum.to_be_bytes())),
2✔
236
                    ))
2✔
237
                }
×
238
            }
×
239
            (FieldType::UInt, _) => {
×
240
                // Update aggregators_db with new val and its occurrence
×
241
                let new_val = &Field::to_uint(new).unwrap();
1✔
242
                Self::update_aggregator_db(to_bytes!(new_val), 1, false, ptx, aggregators_db);
1✔
243
                let old_val = &Field::to_uint(old).unwrap();
1✔
244
                Self::update_aggregator_db(to_bytes!(old_val), 1, true, ptx, aggregators_db);
1✔
245

1✔
246
                // Calculate minimum
1✔
247
                let minimum = try_unwrap!(Self::calc_u64_min(ptx, aggregators_db));
1✔
248
                if minimum == u64::MAX {
1✔
249
                    Ok(AggregationResult::new(
×
250
                        Field::Null,
×
251
                        Some(Vec::from(minimum.to_be_bytes())),
×
252
                    ))
×
253
                } else {
×
254
                    Ok(AggregationResult::new(
1✔
255
                        Self::get_value(&minimum.to_be_bytes(), return_type),
1✔
256
                        Some(Vec::from(minimum.to_be_bytes())),
1✔
257
                    ))
1✔
258
                }
×
259
            }
×
260
            (FieldType::Timestamp, _) => {
×
261
                // Update aggregators_db with new val and its occurrence
×
262
                let new_val = &Field::to_timestamp(new)
2✔
263
                    .unwrap()
2✔
264
                    .timestamp_millis()
2✔
265
                    .to_be_bytes();
2✔
266
                Self::update_aggregator_db(new_val.as_slice(), 1, false, ptx, aggregators_db);
2✔
267
                let old_val = &Field::to_timestamp(old)
2✔
268
                    .unwrap()
2✔
269
                    .timestamp_millis()
2✔
270
                    .to_be_bytes();
2✔
271
                Self::update_aggregator_db(old_val.as_slice(), 1, true, ptx, aggregators_db);
2✔
272

2✔
273
                // Calculate minimum
2✔
274
                let minimum = try_unwrap!(Self::calc_timestamp_min(ptx, aggregators_db));
2✔
275
                let max_datetime: DateTime<FixedOffset> =
2✔
276
                    DateTime::from(DateTime::<FixedOffset>::MAX_UTC);
2✔
277
                if minimum == max_datetime {
2✔
278
                    Ok(AggregationResult::new(Field::Null, None))
×
279
                } else {
×
280
                    Ok(AggregationResult::new(
2✔
281
                        Self::get_value(
2✔
282
                            minimum.timestamp_millis().to_be_bytes().as_slice(),
2✔
283
                            return_type,
2✔
284
                        ),
2✔
285
                        Some(Vec::from(minimum.timestamp_millis().to_be_bytes())),
2✔
286
                    ))
2✔
287
                }
×
288
            }
×
289
            _ => Err(InvalidOperandType(AGGREGATOR_NAME.to_string())),
×
290
        }
×
291
    }
11✔
292

×
293
    pub(crate) fn delete(
34✔
294
        _cur_state: Option<&[u8]>,
34✔
295
        old: &Field,
34✔
296
        return_type: FieldType,
34✔
297
        ptx: &mut PrefixTransaction,
34✔
298
        aggregators_db: Database,
34✔
299
    ) -> Result<AggregationResult, PipelineError> {
34✔
300
        match (return_type, old) {
34✔
301
            (FieldType::Date, _) => {
×
302
                // Update aggregators_db with new val and its occurrence
303
                let old_val = &Field::to_date(old).unwrap().to_string();
6✔
304
                Self::update_aggregator_db(old_val.as_bytes(), 1, true, ptx, aggregators_db);
6✔
305

6✔
306
                // Calculate minimum
6✔
307
                let minimum = try_unwrap!(Self::calc_date_min(ptx, aggregators_db));
6✔
308
                let max_date = NaiveDate::MAX;
6✔
309
                if minimum == max_date {
6✔
310
                    Ok(AggregationResult::new(Field::Null, None))
3✔
311
                } else {
×
312
                    Ok(AggregationResult::new(
3✔
313
                        Self::get_value(minimum.to_string().as_bytes(), return_type),
3✔
314
                        Some(Vec::from(minimum.to_string().as_bytes())),
3✔
315
                    ))
3✔
316
                }
×
317
            }
×
318
            (FieldType::Decimal, _) => {
×
319
                // Update aggregators_db with new val and its occurrence
×
320
                let old_val = &Field::to_decimal(old).unwrap().serialize();
6✔
321
                Self::update_aggregator_db(old_val.as_slice(), 1, true, ptx, aggregators_db);
6✔
322

6✔
323
                // Calculate minimum
6✔
324
                let minimum = try_unwrap!(Self::calc_decimal_min(ptx, aggregators_db));
6✔
325
                if minimum == dozer_types::rust_decimal::Decimal::MAX {
6✔
326
                    Ok(AggregationResult::new(Field::Null, None))
2✔
327
                } else {
×
328
                    Ok(AggregationResult::new(
4✔
329
                        Self::get_value(minimum.serialize().as_slice(), return_type),
4✔
330
                        Some(Vec::from(minimum.serialize())),
4✔
331
                    ))
4✔
332
                }
×
333
            }
×
334
            (FieldType::Float, _) => {
×
335
                // Update aggregators_db with new val and its occurrence
×
336
                let old_val = &OrderedFloat(Field::to_float(old).unwrap());
6✔
337
                Self::update_aggregator_db(to_bytes!(old_val), 1, true, ptx, aggregators_db);
6✔
338

6✔
339
                // Calculate minimum
6✔
340
                let minimum = try_unwrap!(Self::calc_f64_min(ptx, aggregators_db));
6✔
341
                if minimum == f64::MAX {
6✔
342
                    Ok(AggregationResult::new(Field::Null, None))
3✔
343
                } else {
×
344
                    Ok(AggregationResult::new(
3✔
345
                        Self::get_value(&minimum.to_be_bytes(), return_type),
3✔
346
                        Some(Vec::from(minimum.to_be_bytes())),
3✔
347
                    ))
3✔
348
                }
×
349
            }
×
350
            (FieldType::Int, _) => {
×
351
                // Update aggregators_db with new val and its occurrence
×
352
                let old_val = &Field::to_int(old).unwrap();
6✔
353
                Self::update_aggregator_db(to_bytes!(old_val), 1, true, ptx, aggregators_db);
6✔
354

6✔
355
                // Calculate minimum
6✔
356
                let minimum = try_unwrap!(Self::calc_i64_min(ptx, aggregators_db));
6✔
357
                if minimum == i64::MAX {
6✔
358
                    Ok(AggregationResult::new(Field::Null, None))
2✔
359
                } else {
×
360
                    Ok(AggregationResult::new(
4✔
361
                        Self::get_value(&minimum.to_be_bytes(), return_type),
4✔
362
                        Some(Vec::from(minimum.to_be_bytes())),
4✔
363
                    ))
4✔
364
                }
×
365
            }
×
366
            (FieldType::UInt, _) => {
×
367
                // Update aggregators_db with new val and its occurrence
×
368
                let old_val = &Field::to_uint(old).unwrap();
4✔
369
                Self::update_aggregator_db(to_bytes!(old_val), 1, true, ptx, aggregators_db);
4✔
370

4✔
371
                // Calculate minimum
4✔
372
                let minimum = try_unwrap!(Self::calc_u64_min(ptx, aggregators_db));
4✔
373
                if minimum == u64::MAX {
4✔
374
                    Ok(AggregationResult::new(Field::Null, None))
1✔
375
                } else {
×
376
                    Ok(AggregationResult::new(
3✔
377
                        Self::get_value(&minimum.to_be_bytes(), return_type),
3✔
378
                        Some(Vec::from(minimum.to_be_bytes())),
3✔
379
                    ))
3✔
380
                }
×
381
            }
×
382
            (FieldType::Timestamp, _) => {
×
383
                // Update aggregators_db with new val and its occurrence
×
384
                let old_val = &Field::to_timestamp(old)
6✔
385
                    .unwrap()
6✔
386
                    .timestamp_millis()
6✔
387
                    .to_be_bytes();
6✔
388
                Self::update_aggregator_db(old_val.as_slice(), 1, true, ptx, aggregators_db);
6✔
389

6✔
390
                // Calculate minimum
6✔
391
                let minimum = try_unwrap!(Self::calc_timestamp_min(ptx, aggregators_db));
6✔
392
                let max_datetime: DateTime<FixedOffset> =
6✔
393
                    DateTime::from(DateTime::<FixedOffset>::MAX_UTC);
6✔
394
                if minimum == max_datetime {
6✔
395
                    Ok(AggregationResult::new(Field::Null, None))
3✔
396
                } else {
×
397
                    Ok(AggregationResult::new(
3✔
398
                        Self::get_value(
3✔
399
                            minimum.timestamp_millis().to_be_bytes().as_slice(),
3✔
400
                            return_type,
3✔
401
                        ),
3✔
402
                        Some(Vec::from(minimum.timestamp_millis().to_be_bytes())),
3✔
403
                    ))
3✔
404
                }
×
405
            }
×
406
            _ => Err(InvalidOperandType(AGGREGATOR_NAME.to_string())),
×
407
        }
×
408
    }
34✔
409

×
410
    pub(crate) fn get_value(f: &[u8], from: FieldType) -> Field {
65✔
411
        match from {
65✔
412
            FieldType::Date => Date(
11✔
413
                NaiveDate::parse_from_str(
11✔
414
                    String::from_utf8(deserialize!(f)).unwrap().as_ref(),
11✔
415
                    DATE_FORMAT,
11✔
416
                )
11✔
417
                .unwrap(),
11✔
418
            ),
11✔
419
            FieldType::Decimal => Decimal(dozer_types::rust_decimal::Decimal::deserialize(
12✔
420
                deserialize!(f),
12✔
421
            )),
12✔
422
            FieldType::Float => Float(OrderedFloat(f64::from_be_bytes(deserialize!(f)))),
11✔
423
            FieldType::Int => Int(i64::from_be_bytes(deserialize!(f))),
12✔
424
            FieldType::UInt => UInt(u64::from_be_bytes(deserialize!(f))),
8✔
425
            FieldType::Timestamp => Timestamp(DateTime::from(
11✔
426
                Utc.timestamp_millis(i64::from_be_bytes(deserialize!(f))),
11✔
427
            )),
11✔
428
            _ => Field::Null,
×
429
        }
×
430
    }
65✔
431

×
432
    fn update_aggregator_db(
90✔
433
        key: &[u8],
90✔
434
        val_delta: u8,
90✔
435
        decr: bool,
90✔
436
        ptx: &mut PrefixTransaction,
90✔
437
        aggregators_db: Database,
90✔
438
    ) {
90✔
439
        let get_prev_count = try_unwrap!(ptx.get(aggregators_db, key));
90✔
440
        let prev_count = deserialize_u8!(get_prev_count);
90✔
441
        let mut new_count = prev_count;
90✔
442
        if decr {
90✔
443
            new_count = new_count.wrapping_sub(val_delta);
45✔
444
        } else {
45✔
445
            new_count = new_count.wrapping_add(val_delta);
45✔
446
        }
45✔
447
        if new_count < 1 {
90✔
448
            try_unwrap!(ptx.del(aggregators_db, key, Option::from(to_bytes!(prev_count))));
28✔
449
        } else {
62✔
450
            try_unwrap!(ptx.put(aggregators_db, key, to_bytes!(new_count)));
62✔
451
        }
62✔
452
    }
90✔
453

×
454
    fn calc_f64_min(
14✔
455
        ptx: &mut PrefixTransaction,
14✔
456
        aggregators_db: Database,
14✔
457
    ) -> Result<f64, PipelineError> {
14✔
458
        let ptx_cur = ptx.open_cursor(aggregators_db)?;
14✔
459
        let mut minimum = f64::MAX;
14✔
460

14✔
461
        // get first to get the minimum
14✔
462
        if ptx_cur.first()? {
14✔
463
            let cur = try_unwrap!(ptx_cur.read()).unwrap();
11✔
464
            minimum = f64::from_be_bytes(deserialize!(cur.0));
11✔
465
        }
11✔
466
        Ok(minimum)
14✔
467
    }
14✔
468

×
469
    fn calc_decimal_min(
14✔
470
        ptx: &mut PrefixTransaction,
14✔
471
        aggregators_db: Database,
14✔
472
    ) -> Result<dozer_types::rust_decimal::Decimal, PipelineError> {
14✔
473
        let ptx_cur = ptx.open_cursor(aggregators_db)?;
14✔
474
        let mut minimum = dozer_types::rust_decimal::Decimal::MAX;
14✔
475

14✔
476
        // get first to get the minimum
14✔
477
        if ptx_cur.first()? {
14✔
478
            let cur = try_unwrap!(ptx_cur.read()).unwrap();
12✔
479
            minimum = dozer_types::rust_decimal::Decimal::deserialize(deserialize!(cur.0));
12✔
480
        }
12✔
481
        Ok(minimum)
14✔
482
    }
14✔
483

×
484
    fn calc_timestamp_min(
14✔
485
        ptx: &mut PrefixTransaction,
14✔
486
        aggregators_db: Database,
14✔
487
    ) -> Result<DateTime<FixedOffset>, PipelineError> {
14✔
488
        let ptx_cur = ptx.open_cursor(aggregators_db)?;
14✔
489
        let mut minimum = DateTime::<FixedOffset>::MAX_UTC;
14✔
490

14✔
491
        // get first to get the minimum
14✔
492
        if ptx_cur.first()? {
14✔
493
            let cur = try_unwrap!(ptx_cur.read()).unwrap();
11✔
494
            minimum = Utc.timestamp_millis(i64::from_be_bytes(deserialize!(cur.0)));
11✔
495
        }
11✔
496
        Ok(DateTime::from(minimum))
14✔
497
    }
14✔
498

×
499
    fn calc_date_min(
14✔
500
        ptx: &mut PrefixTransaction,
14✔
501
        aggregators_db: Database,
14✔
502
    ) -> Result<NaiveDate, PipelineError> {
14✔
503
        let ptx_cur = ptx.open_cursor(aggregators_db)?;
14✔
504
        let mut minimum = NaiveDate::MAX;
14✔
505

14✔
506
        // get first to get the minimum
14✔
507
        if ptx_cur.first()? {
14✔
508
            let cur = try_unwrap!(ptx_cur.read()).unwrap();
11✔
509
            minimum = NaiveDate::parse_from_str(
11✔
510
                String::from_utf8(deserialize!(cur.0)).unwrap().as_ref(),
11✔
511
                DATE_FORMAT,
11✔
512
            )
11✔
513
            .unwrap();
11✔
514
        }
11✔
515
        Ok(minimum)
14✔
516
    }
14✔
517

×
518
    fn calc_i64_min(
14✔
519
        ptx: &mut PrefixTransaction,
14✔
520
        aggregators_db: Database,
14✔
521
    ) -> Result<i64, PipelineError> {
14✔
522
        let ptx_cur = ptx.open_cursor(aggregators_db)?;
14✔
523
        let mut minimum = i64::MAX;
14✔
524

14✔
525
        // get first to get the minimum
14✔
526
        if ptx_cur.first()? {
14✔
527
            let cur = try_unwrap!(ptx_cur.read()).unwrap();
12✔
528
            minimum = i64::from_be_bytes(deserialize!(cur.0));
12✔
529
        }
12✔
530
        Ok(minimum)
14✔
531
    }
14✔
532

×
533
    fn calc_u64_min(
9✔
534
        ptx: &mut PrefixTransaction,
9✔
535
        aggregators_db: Database,
9✔
536
    ) -> Result<u64, PipelineError> {
9✔
537
        let ptx_cur = ptx.open_cursor(aggregators_db)?;
9✔
538
        let mut minimum = u64::MAX;
9✔
539

9✔
540
        // get first to get the minimum
9✔
541
        if ptx_cur.first()? {
9✔
542
            let cur = try_unwrap!(ptx_cur.read()).unwrap();
8✔
543
            minimum = u64::from_be_bytes(deserialize!(cur.0));
8✔
544
        }
8✔
545
        Ok(minimum)
9✔
546
    }
9✔
547
}
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc