• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

getdozer / dozer / 4102355041

pending completion
4102355041

Pull #811

github

GitHub
Merge 37b55f3df into 7c772e92a
Pull Request #811: chore: integrating sql planner

427 of 427 new or added lines in 15 files covered. (100.0%)

24596 of 37831 relevant lines covered (65.02%)

37254.69 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

78.75
/dozer-sql/src/pipeline/aggregation/max.rs
1
use crate::pipeline::aggregation::aggregator::AggregationResult;
2
use crate::pipeline::errors::PipelineError;
3
use crate::pipeline::errors::PipelineError::InvalidOperandType;
4
use crate::{deserialize_u8, to_bytes, try_unwrap};
5
use dozer_core::storage::common::Database;
6
use dozer_core::storage::prefix_transaction::PrefixTransaction;
7
use dozer_types::ordered_float::OrderedFloat;
8
use dozer_types::types::Field::{Date, Decimal, Float, Int, Timestamp, UInt};
9
use dozer_types::types::{Field, FieldType, DATE_FORMAT};
10

11
use crate::deserialize;
12
use dozer_types::chrono::{DateTime, FixedOffset, NaiveDate, TimeZone, Utc};
13
use std::string::ToString;
14

15
pub struct MaxAggregator {}
16
const AGGREGATOR_NAME: &str = "MAX";
17

18
impl MaxAggregator {
19
    const _AGGREGATOR_ID: u32 = 0x03;
20

21
    pub(crate) fn _get_type() -> u32 {
×
22
        MaxAggregator::_AGGREGATOR_ID
×
23
    }
×
24

×
25
    pub(crate) fn insert(
34✔
26
        _cur_state: Option<&[u8]>,
34✔
27
        new: &Field,
34✔
28
        return_type: FieldType,
34✔
29
        ptx: &mut PrefixTransaction,
34✔
30
        aggregators_db: Database,
34✔
31
    ) -> Result<AggregationResult, PipelineError> {
34✔
32
        match (return_type, new) {
34✔
33
            (FieldType::Date, _) => {
×
34
                // Update aggregators_db with new val and its occurrence
×
35
                let new_val = &Field::to_date(new).unwrap().to_string();
6✔
36
                Self::update_aggregator_db(new_val.as_bytes(), 1, false, ptx, aggregators_db);
6✔
37

6✔
38
                // Calculate maximum
6✔
39
                let maximum = try_unwrap!(Self::calc_date_max(ptx, aggregators_db));
6✔
40
                let min_date = NaiveDate::MIN;
6✔
41
                if maximum == min_date {
6✔
42
                    Ok(AggregationResult::new(Field::Null, None))
×
43
                } else {
×
44
                    Ok(AggregationResult::new(
6✔
45
                        Self::get_value(maximum.to_string().as_bytes(), return_type),
6✔
46
                        Some(Vec::from(maximum.to_string().as_bytes())),
6✔
47
                    ))
6✔
48
                }
×
49
            }
×
50
            (FieldType::Decimal, _) => {
×
51
                // Update aggregators_db with new val and its occurrence
×
52
                let new_val = &Field::to_decimal(new).unwrap().serialize();
6✔
53
                Self::update_aggregator_db(new_val.as_slice(), 1, false, ptx, aggregators_db);
6✔
54

6✔
55
                // Calculate maximum
6✔
56
                let maximum = try_unwrap!(Self::calc_decimal_max(ptx, aggregators_db));
6✔
57
                if maximum == dozer_types::rust_decimal::Decimal::MIN {
6✔
58
                    Ok(AggregationResult::new(Field::Null, None))
×
59
                } else {
×
60
                    Ok(AggregationResult::new(
6✔
61
                        Self::get_value(maximum.serialize().as_slice(), return_type),
6✔
62
                        Some(Vec::from(maximum.serialize())),
6✔
63
                    ))
6✔
64
                }
×
65
            }
×
66
            (FieldType::Float, _) => {
×
67
                // Update aggregators_db with new val and its occurrence
×
68
                let new_val = &OrderedFloat(Field::to_float(new).unwrap());
6✔
69
                Self::update_aggregator_db(to_bytes!(new_val), 1, false, ptx, aggregators_db);
6✔
70

6✔
71
                // Calculate maximum
6✔
72
                let maximum = try_unwrap!(Self::calc_f64_max(ptx, aggregators_db)).to_be_bytes();
6✔
73
                Ok(AggregationResult::new(
6✔
74
                    Self::get_value(&maximum, return_type),
6✔
75
                    Some(Vec::from(maximum)),
6✔
76
                ))
6✔
77
            }
78
            (FieldType::Int, _) => {
79
                // Update aggregators_db with new val and its occurrence
80
                let new_val = &Field::to_int(new).unwrap();
6✔
81
                Self::update_aggregator_db(to_bytes!(new_val), 1, false, ptx, aggregators_db);
6✔
82

6✔
83
                // Calculate maximum
6✔
84
                let maximum = try_unwrap!(Self::calc_i64_max(ptx, aggregators_db)).to_be_bytes();
6✔
85
                Ok(AggregationResult::new(
6✔
86
                    Self::get_value(&maximum, return_type),
6✔
87
                    Some(Vec::from(maximum)),
6✔
88
                ))
6✔
89
            }
90
            (FieldType::UInt, _) => {
91
                // Update aggregators_db with new val and its occurrence
92
                let new_val = &Field::to_uint(new).unwrap();
4✔
93
                Self::update_aggregator_db(to_bytes!(new_val), 1, false, ptx, aggregators_db);
4✔
94

4✔
95
                // Calculate maximum
4✔
96
                let maximum = try_unwrap!(Self::calc_u64_max(ptx, aggregators_db)).to_be_bytes();
4✔
97
                Ok(AggregationResult::new(
4✔
98
                    Self::get_value(&maximum, return_type),
4✔
99
                    Some(Vec::from(maximum)),
4✔
100
                ))
4✔
101
            }
102
            (FieldType::Timestamp, _) => {
103
                // Update aggregators_db with new val and its occurrence
104
                let new_val = &Field::to_timestamp(new)
6✔
105
                    .unwrap()
6✔
106
                    .timestamp_millis()
6✔
107
                    .to_be_bytes();
6✔
108
                Self::update_aggregator_db(new_val.as_slice(), 1, false, ptx, aggregators_db);
6✔
109

6✔
110
                // Calculate maximum
6✔
111
                let maximum = try_unwrap!(Self::calc_timestamp_max(ptx, aggregators_db));
6✔
112
                let min_datetime: DateTime<FixedOffset> =
6✔
113
                    DateTime::from(DateTime::<FixedOffset>::MIN_UTC);
6✔
114
                if maximum == min_datetime {
6✔
115
                    Ok(AggregationResult::new(Field::Null, None))
×
116
                } else {
×
117
                    Ok(AggregationResult::new(
6✔
118
                        Self::get_value(
6✔
119
                            maximum.timestamp_millis().to_be_bytes().as_slice(),
6✔
120
                            return_type,
6✔
121
                        ),
6✔
122
                        Some(Vec::from(maximum.timestamp_millis().to_be_bytes())),
6✔
123
                    ))
6✔
124
                }
×
125
            }
×
126
            _ => Err(InvalidOperandType(AGGREGATOR_NAME.to_string())),
×
127
        }
×
128
    }
34✔
129

×
130
    pub(crate) fn update(
11✔
131
        _cur_state: Option<&[u8]>,
11✔
132
        old: &Field,
11✔
133
        new: &Field,
11✔
134
        return_type: FieldType,
11✔
135
        ptx: &mut PrefixTransaction,
11✔
136
        aggregators_db: Database,
11✔
137
    ) -> Result<AggregationResult, PipelineError> {
11✔
138
        match (return_type, new) {
11✔
139
            (FieldType::Date, _) => {
140
                // Update aggregators_db with new val and its occurrence
×
141
                let new_val = &Field::to_date(new).unwrap().to_string();
2✔
142
                Self::update_aggregator_db(new_val.as_bytes(), 1, false, ptx, aggregators_db);
2✔
143
                let old_val = &Field::to_date(old).unwrap().to_string();
2✔
144
                Self::update_aggregator_db(old_val.as_bytes(), 1, true, ptx, aggregators_db);
2✔
145

2✔
146
                // Calculate maximum
2✔
147
                let maximum = try_unwrap!(Self::calc_date_max(ptx, aggregators_db));
2✔
148
                let min_date = NaiveDate::MIN;
2✔
149
                if maximum == min_date {
2✔
150
                    Ok(AggregationResult::new(Field::Null, None))
×
151
                } else {
152
                    Ok(AggregationResult::new(
2✔
153
                        Self::get_value(maximum.to_string().as_bytes(), return_type),
2✔
154
                        Some(Vec::from(maximum.to_string().as_bytes())),
2✔
155
                    ))
2✔
156
                }
×
157
            }
×
158
            (FieldType::Decimal, _) => {
×
159
                // Update aggregators_db with new val and its occurrence
×
160
                let new_val = &Field::to_decimal(new).unwrap().serialize();
2✔
161
                Self::update_aggregator_db(new_val.as_slice(), 1, false, ptx, aggregators_db);
2✔
162
                let old_val = &Field::to_decimal(old).unwrap().serialize();
2✔
163
                Self::update_aggregator_db(old_val.as_slice(), 1, true, ptx, aggregators_db);
2✔
164

2✔
165
                // Calculate maximum
2✔
166
                let maximum = try_unwrap!(Self::calc_decimal_max(ptx, aggregators_db));
2✔
167
                if maximum == dozer_types::rust_decimal::Decimal::MIN {
2✔
168
                    Ok(AggregationResult::new(Field::Null, None))
×
169
                } else {
170
                    Ok(AggregationResult::new(
2✔
171
                        Self::get_value(maximum.serialize().as_slice(), return_type),
2✔
172
                        Some(Vec::from(maximum.serialize())),
2✔
173
                    ))
2✔
174
                }
×
175
            }
×
176
            (FieldType::Float, _) => {
×
177
                // Update aggregators_db with new val and its occurrence
×
178
                let new_val = &OrderedFloat(Field::to_float(new).unwrap());
2✔
179
                Self::update_aggregator_db(to_bytes!(new_val), 1, false, ptx, aggregators_db);
2✔
180
                let old_val = &OrderedFloat(Field::to_float(old).unwrap());
2✔
181
                Self::update_aggregator_db(to_bytes!(old_val), 1, true, ptx, aggregators_db);
2✔
182

2✔
183
                // Calculate maximum
2✔
184
                let maximum = try_unwrap!(Self::calc_f64_max(ptx, aggregators_db)).to_be_bytes();
2✔
185
                Ok(AggregationResult::new(
2✔
186
                    Self::get_value(&maximum, return_type),
2✔
187
                    Some(Vec::from(maximum)),
2✔
188
                ))
2✔
189
            }
190
            (FieldType::Int, _) => {
×
191
                // Update aggregators_db with new val and its occurrence
×
192
                let new_val = &Field::to_int(new).unwrap();
2✔
193
                Self::update_aggregator_db(to_bytes!(new_val), 1, false, ptx, aggregators_db);
2✔
194
                let old_val = &Field::to_int(old).unwrap();
2✔
195
                Self::update_aggregator_db(to_bytes!(old_val), 1, true, ptx, aggregators_db);
2✔
196

2✔
197
                // Calculate maximum
2✔
198
                let maximum = (try_unwrap!(Self::calc_i64_max(ptx, aggregators_db))).to_be_bytes();
2✔
199
                Ok(AggregationResult::new(
2✔
200
                    Self::get_value(&maximum, return_type),
2✔
201
                    Some(Vec::from(maximum)),
2✔
202
                ))
2✔
203
            }
204
            (FieldType::UInt, _) => {
×
205
                // Update aggregators_db with new val and its occurrence
×
206
                let new_val = &Field::to_uint(new).unwrap();
1✔
207
                Self::update_aggregator_db(to_bytes!(new_val), 1, false, ptx, aggregators_db);
1✔
208
                let old_val = &Field::to_uint(old).unwrap();
1✔
209
                Self::update_aggregator_db(to_bytes!(old_val), 1, true, ptx, aggregators_db);
1✔
210

1✔
211
                // Calculate maximum
1✔
212
                let maximum = (try_unwrap!(Self::calc_u64_max(ptx, aggregators_db))).to_be_bytes();
1✔
213
                Ok(AggregationResult::new(
1✔
214
                    Self::get_value(&maximum, return_type),
1✔
215
                    Some(Vec::from(maximum)),
1✔
216
                ))
1✔
217
            }
218
            (FieldType::Timestamp, _) => {
×
219
                // Update aggregators_db with new val and its occurrence
×
220
                let new_val = &Field::to_timestamp(new)
2✔
221
                    .unwrap()
2✔
222
                    .timestamp_millis()
2✔
223
                    .to_be_bytes();
2✔
224
                Self::update_aggregator_db(new_val.as_slice(), 1, false, ptx, aggregators_db);
2✔
225
                let old_val = &Field::to_timestamp(old)
2✔
226
                    .unwrap()
2✔
227
                    .timestamp_millis()
2✔
228
                    .to_be_bytes();
2✔
229
                Self::update_aggregator_db(old_val.as_slice(), 1, true, ptx, aggregators_db);
2✔
230

2✔
231
                // Calculate maximum
2✔
232
                let maximum = try_unwrap!(Self::calc_timestamp_max(ptx, aggregators_db));
2✔
233
                let min_datetime: DateTime<FixedOffset> =
2✔
234
                    DateTime::from(DateTime::<FixedOffset>::MIN_UTC);
2✔
235
                if maximum == min_datetime {
2✔
236
                    Ok(AggregationResult::new(Field::Null, None))
×
237
                } else {
×
238
                    Ok(AggregationResult::new(
2✔
239
                        Self::get_value(
2✔
240
                            maximum.timestamp_millis().to_be_bytes().as_slice(),
2✔
241
                            return_type,
2✔
242
                        ),
2✔
243
                        Some(Vec::from(maximum.timestamp_millis().to_be_bytes())),
2✔
244
                    ))
2✔
245
                }
×
246
            }
×
247
            _ => Err(InvalidOperandType(AGGREGATOR_NAME.to_string())),
×
248
        }
×
249
    }
11✔
250

×
251
    pub(crate) fn delete(
34✔
252
        _cur_state: Option<&[u8]>,
34✔
253
        old: &Field,
34✔
254
        return_type: FieldType,
34✔
255
        ptx: &mut PrefixTransaction,
34✔
256
        aggregators_db: Database,
34✔
257
    ) -> Result<AggregationResult, PipelineError> {
34✔
258
        match (return_type, old) {
34✔
259
            (FieldType::Date, _) => {
×
260
                // Decrement the occurrence count of the old val in aggregators_db
261
                let old_val = &Field::to_date(old).unwrap().to_string();
6✔
262
                Self::update_aggregator_db(old_val.as_bytes(), 1, true, ptx, aggregators_db);
6✔
263

6✔
264
                // Calculate maximum
6✔
265
                let maximum = try_unwrap!(Self::calc_date_max(ptx, aggregators_db));
6✔
266
                let min_date = NaiveDate::MIN;
6✔
267
                if maximum == min_date {
6✔
268
                    Ok(AggregationResult::new(Field::Null, None))
3✔
269
                } else {
×
270
                    Ok(AggregationResult::new(
3✔
271
                        Self::get_value(maximum.to_string().as_bytes(), return_type),
3✔
272
                        Some(Vec::from(maximum.to_string().as_bytes())),
3✔
273
                    ))
3✔
274
                }
×
275
            }
×
276
            (FieldType::Decimal, _) => {
×
277
                // Decrement the occurrence count of the old val in aggregators_db
×
278
                let old_val = &Field::to_decimal(old).unwrap().serialize();
6✔
279
                Self::update_aggregator_db(old_val.as_slice(), 1, true, ptx, aggregators_db);
6✔
280

6✔
281
                // Calculate maximum
6✔
282
                let maximum = try_unwrap!(Self::calc_decimal_max(ptx, aggregators_db));
6✔
283
                if maximum == dozer_types::rust_decimal::Decimal::MIN {
6✔
284
                    Ok(AggregationResult::new(Field::Null, None))
3✔
285
                } else {
×
286
                    Ok(AggregationResult::new(
3✔
287
                        Self::get_value(maximum.serialize().as_slice(), return_type),
3✔
288
                        Some(Vec::from(maximum.serialize())),
3✔
289
                    ))
3✔
290
                }
×
291
            }
×
292
            (FieldType::Float, _) => {
×
293
                // Decrement the occurrence count of the old val in aggregators_db
×
294
                let old_val = &OrderedFloat(Field::to_float(old).unwrap());
6✔
295
                Self::update_aggregator_db(to_bytes!(old_val), 1, true, ptx, aggregators_db);
6✔
296

6✔
297
                // Calculate maximum
6✔
298
                let maximum = try_unwrap!(Self::calc_f64_max(ptx, aggregators_db));
6✔
299
                if maximum == f64::MIN {
6✔
300
                    Ok(AggregationResult::new(Field::Null, None))
3✔
301
                } else {
×
302
                    Ok(AggregationResult::new(
3✔
303
                        Self::get_value(&maximum.to_be_bytes(), return_type),
3✔
304
                        Some(Vec::from(maximum.to_be_bytes())),
3✔
305
                    ))
3✔
306
                }
×
307
            }
×
308
            (FieldType::Int, _) => {
×
309
                // Decrement the occurrence count of the old val in aggregators_db
×
310
                let old_val = &Field::to_int(old).unwrap();
6✔
311
                Self::update_aggregator_db(to_bytes!(old_val), 1, true, ptx, aggregators_db);
6✔
312

6✔
313
                // Calculate maximum
6✔
314
                let maximum = try_unwrap!(Self::calc_i64_max(ptx, aggregators_db));
6✔
315
                if maximum == i64::MIN {
6✔
316
                    Ok(AggregationResult::new(Field::Null, None))
3✔
317
                } else {
×
318
                    Ok(AggregationResult::new(
3✔
319
                        Self::get_value(&maximum.to_be_bytes(), return_type),
3✔
320
                        Some(Vec::from(maximum.to_be_bytes())),
3✔
321
                    ))
3✔
322
                }
×
323
            }
×
324
            (FieldType::UInt, _) => {
×
325
                // Decrement the occurrence count of the old val in aggregators_db
×
326
                let old_val = &Field::to_uint(old).unwrap();
4✔
327
                Self::update_aggregator_db(to_bytes!(old_val), 1, true, ptx, aggregators_db);
4✔
328

4✔
329
                // Calculate maximum
4✔
330
                let maximum = try_unwrap!(Self::calc_u64_max(ptx, aggregators_db));
4✔
331
                if maximum == u64::MIN {
4✔
332
                    Ok(AggregationResult::new(Field::Null, None))
2✔
333
                } else {
×
334
                    Ok(AggregationResult::new(
2✔
335
                        Self::get_value(&maximum.to_be_bytes(), return_type),
2✔
336
                        Some(Vec::from(maximum.to_be_bytes())),
2✔
337
                    ))
2✔
338
                }
×
339
            }
×
340
            (FieldType::Timestamp, _) => {
×
341
                // Decrement the occurrence count of the old val in aggregators_db
×
342
                let old_val = &Field::to_timestamp(old)
6✔
343
                    .unwrap()
6✔
344
                    .timestamp_millis()
6✔
345
                    .to_be_bytes();
6✔
346
                Self::update_aggregator_db(old_val.as_slice(), 1, true, ptx, aggregators_db);
6✔
347

6✔
348
                // Calculate maximum
6✔
349
                let maximum = try_unwrap!(Self::calc_timestamp_max(ptx, aggregators_db));
6✔
350
                let min_datetime: DateTime<FixedOffset> =
6✔
351
                    DateTime::from(DateTime::<FixedOffset>::MIN_UTC);
6✔
352
                if maximum == min_datetime {
6✔
353
                    Ok(AggregationResult::new(Field::Null, None))
3✔
354
                } else {
×
355
                    Ok(AggregationResult::new(
3✔
356
                        Self::get_value(
3✔
357
                            maximum.timestamp_millis().to_be_bytes().as_slice(),
3✔
358
                            return_type,
3✔
359
                        ),
3✔
360
                        Some(Vec::from(maximum.timestamp_millis().to_be_bytes())),
3✔
361
                    ))
3✔
362
                }
×
363
            }
×
364
            _ => Err(InvalidOperandType(AGGREGATOR_NAME.to_string())),
×
365
        }
×
366
    }
34✔
367

×
368
    pub(crate) fn get_value(f: &[u8], from: FieldType) -> Field {
62✔
369
        match from {
62✔
370
            FieldType::Date => Date(
11✔
371
                NaiveDate::parse_from_str(
11✔
372
                    String::from_utf8(deserialize!(f)).unwrap().as_ref(),
11✔
373
                    DATE_FORMAT,
11✔
374
                )
11✔
375
                .unwrap(),
11✔
376
            ),
11✔
377
            FieldType::Decimal => Decimal(dozer_types::rust_decimal::Decimal::deserialize(
11✔
378
                deserialize!(f),
11✔
379
            )),
11✔
380
            FieldType::Float => Float(OrderedFloat(f64::from_be_bytes(deserialize!(f)))),
11✔
381
            FieldType::Int => Int(i64::from_be_bytes(deserialize!(f))),
11✔
382
            FieldType::UInt => UInt(u64::from_be_bytes(deserialize!(f))),
7✔
383
            FieldType::Timestamp => Timestamp(DateTime::from(
11✔
384
                Utc.timestamp_millis(i64::from_be_bytes(deserialize!(f))),
11✔
385
            )),
11✔
386
            _ => Field::Null,
×
387
        }
×
388
    }
62✔
389

×
390
    fn update_aggregator_db(
90✔
391
        key: &[u8],
90✔
392
        val_delta: u8,
90✔
393
        decr: bool,
90✔
394
        ptx: &mut PrefixTransaction,
90✔
395
        aggregators_db: Database,
90✔
396
    ) {
90✔
397
        let get_prev_count = try_unwrap!(ptx.get(aggregators_db, key));
90✔
398
        let prev_count = deserialize_u8!(get_prev_count);
90✔
399
        let mut new_count = prev_count;
90✔
400
        if decr {
90✔
401
            new_count = new_count.wrapping_sub(val_delta);
45✔
402
        } else {
45✔
403
            new_count = new_count.wrapping_add(val_delta);
45✔
404
        }
45✔
405
        if new_count < 1 {
90✔
406
            try_unwrap!(ptx.del(aggregators_db, key, Option::from(to_bytes!(prev_count))));
34✔
407
        } else {
56✔
408
            try_unwrap!(ptx.put(aggregators_db, key, to_bytes!(new_count)));
56✔
409
        }
56✔
410
    }
90✔
411

×
412
    fn calc_f64_max(
14✔
413
        ptx: &mut PrefixTransaction,
14✔
414
        aggregators_db: Database,
14✔
415
    ) -> Result<f64, PipelineError> {
14✔
416
        let ptx_cur = ptx.open_cursor(aggregators_db)?;
14✔
417
        let mut maximum = f64::MIN;
14✔
418

14✔
419
        // move to the last entry; its key is read as the maximum
14✔
420
        if ptx_cur.last()? {
14✔
421
            let cur = try_unwrap!(ptx_cur.read()).unwrap();
11✔
422
            maximum = f64::from_be_bytes(deserialize!(cur.0));
11✔
423
        }
11✔
424
        Ok(maximum)
14✔
425
    }
14✔
426

×
427
    fn calc_decimal_max(
14✔
428
        ptx: &mut PrefixTransaction,
14✔
429
        aggregators_db: Database,
14✔
430
    ) -> Result<dozer_types::rust_decimal::Decimal, PipelineError> {
14✔
431
        let ptx_cur = ptx.open_cursor(aggregators_db)?;
14✔
432
        let mut maximum = dozer_types::rust_decimal::Decimal::MIN;
14✔
433

14✔
434
        // move to the last entry; its key is read as the maximum
14✔
435
        if ptx_cur.last()? {
14✔
436
            let cur = try_unwrap!(ptx_cur.read()).unwrap();
11✔
437
            maximum = dozer_types::rust_decimal::Decimal::deserialize(deserialize!(cur.0));
11✔
438
        }
11✔
439
        Ok(maximum)
14✔
440
    }
14✔
441

×
442
    fn calc_timestamp_max(
14✔
443
        ptx: &mut PrefixTransaction,
14✔
444
        aggregators_db: Database,
14✔
445
    ) -> Result<DateTime<FixedOffset>, PipelineError> {
14✔
446
        let ptx_cur = ptx.open_cursor(aggregators_db)?;
14✔
447
        let mut maximum = DateTime::<FixedOffset>::MIN_UTC;
14✔
448

14✔
449
        // move to the last entry; its key is read as the maximum
14✔
450
        if ptx_cur.last()? {
14✔
451
            let cur = try_unwrap!(ptx_cur.read()).unwrap();
11✔
452
            maximum = Utc.timestamp_millis(i64::from_be_bytes(deserialize!(cur.0)));
11✔
453
        }
11✔
454
        Ok(DateTime::from(maximum))
14✔
455
    }
14✔
456

×
457
    fn calc_date_max(
14✔
458
        ptx: &mut PrefixTransaction,
14✔
459
        aggregators_db: Database,
14✔
460
    ) -> Result<NaiveDate, PipelineError> {
14✔
461
        let ptx_cur = ptx.open_cursor(aggregators_db)?;
14✔
462
        let mut maximum = NaiveDate::MIN;
14✔
463

14✔
464
        // move to the last entry; its key is read as the maximum
14✔
465
        if ptx_cur.last()? {
14✔
466
            let cur = try_unwrap!(ptx_cur.read()).unwrap();
11✔
467
            maximum = NaiveDate::parse_from_str(
11✔
468
                String::from_utf8(deserialize!(cur.0)).unwrap().as_ref(),
11✔
469
                DATE_FORMAT,
11✔
470
            )
11✔
471
            .unwrap();
11✔
472
        }
11✔
473
        Ok(maximum)
14✔
474
    }
14✔
475

×
476
    fn calc_i64_max(
14✔
477
        ptx: &mut PrefixTransaction,
14✔
478
        aggregators_db: Database,
14✔
479
    ) -> Result<i64, PipelineError> {
14✔
480
        let ptx_cur = ptx.open_cursor(aggregators_db)?;
14✔
481
        let mut maximum = i64::MIN;
14✔
482

14✔
483
        // move to the last entry; its key is read as the maximum
14✔
484
        if ptx_cur.last()? {
14✔
485
            let cur = try_unwrap!(ptx_cur.read()).unwrap();
11✔
486
            maximum = i64::from_be_bytes(deserialize!(cur.0));
11✔
487
        }
11✔
488
        Ok(maximum)
14✔
489
    }
14✔
490

×
491
    fn calc_u64_max(
9✔
492
        ptx: &mut PrefixTransaction,
9✔
493
        aggregators_db: Database,
9✔
494
    ) -> Result<u64, PipelineError> {
9✔
495
        let ptx_cur = ptx.open_cursor(aggregators_db)?;
9✔
496
        let mut maximum = u64::MIN;
9✔
497

9✔
498
        // move to the last entry; its key is read as the maximum
9✔
499
        if ptx_cur.last()? {
9✔
500
            let cur = try_unwrap!(ptx_cur.read()).unwrap();
7✔
501
            maximum = u64::from_be_bytes(deserialize!(cur.0));
7✔
502
        }
7✔
503
        Ok(maximum)
9✔
504
    }
9✔
505
}
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc