• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

getdozer / dozer / 4007818786

pending completion
4007818786

Pull #733

github

GitHub
Merge baf5c38aa into 6c0ac2b2c
Pull Request #733: Bump diesel from 2.0.2 to 2.0.3

23389 of 34432 relevant lines covered (67.93%)

40326.78 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

81.82
/dozer-sql/src/pipeline/aggregation/max.rs
1
use crate::pipeline::aggregation::aggregator::AggregationResult;
2
use crate::pipeline::errors::PipelineError;
3
use crate::pipeline::errors::PipelineError::InvalidOperandType;
4
use crate::{deserialize_u8, to_bytes, try_unwrap};
5
use dozer_core::storage::common::Database;
6
use dozer_core::storage::prefix_transaction::PrefixTransaction;
7
use dozer_types::ordered_float::OrderedFloat;
8
use dozer_types::types::Field::{Date, Decimal, Float, Int, Timestamp, UInt};
9
use dozer_types::types::{Field, FieldType, DATE_FORMAT};
10

11
use crate::deserialize;
12
use dozer_types::chrono::{DateTime, FixedOffset, NaiveDate, TimeZone, Utc};
13
use std::string::ToString;
14

15
pub struct MaxAggregator {}
16
const AGGREGATOR_NAME: &str = "MAX";
17

18
impl MaxAggregator {
19
    const _AGGREGATOR_ID: u32 = 0x03;
20

21
    pub(crate) fn get_return_type(from: FieldType) -> FieldType {
×
22
        match from {
×
23
            FieldType::Date => FieldType::Date,
×
24
            FieldType::Decimal => FieldType::Decimal,
×
25
            FieldType::Float => FieldType::Float,
×
26
            FieldType::Int => FieldType::Int,
×
27
            FieldType::UInt => FieldType::UInt,
×
28
            FieldType::Timestamp => FieldType::Timestamp,
×
29
            _ => from,
×
30
        }
×
31
    }
×
32

×
33
    pub(crate) fn _get_type() -> u32 {
×
34
        MaxAggregator::_AGGREGATOR_ID
×
35
    }
×
36

37
    pub(crate) fn insert(
34✔
38
        _cur_state: Option<&[u8]>,
34✔
39
        new: &Field,
34✔
40
        return_type: FieldType,
34✔
41
        ptx: &mut PrefixTransaction,
34✔
42
        aggregators_db: Database,
34✔
43
    ) -> Result<AggregationResult, PipelineError> {
34✔
44
        match (return_type, new) {
34✔
45
            (FieldType::Date, _) => {
×
46
                // Update aggregators_db with new val and its occurrence
×
47
                let new_val = &Field::to_date(new).unwrap().to_string();
6✔
48
                Self::update_aggregator_db(new_val.as_bytes(), 1, false, ptx, aggregators_db);
6✔
49

6✔
50
                // Calculate minimum
6✔
51
                let maximum = try_unwrap!(Self::calc_date_max(ptx, aggregators_db));
6✔
52
                let min_date = NaiveDate::MIN;
6✔
53
                if maximum == min_date {
6✔
54
                    Ok(AggregationResult::new(Field::Null, None))
×
55
                } else {
×
56
                    Ok(AggregationResult::new(
6✔
57
                        Self::get_value(maximum.to_string().as_bytes(), return_type),
6✔
58
                        Some(Vec::from(maximum.to_string().as_bytes())),
6✔
59
                    ))
6✔
60
                }
×
61
            }
×
62
            (FieldType::Decimal, _) => {
×
63
                // Update aggregators_db with new val and its occurrence
×
64
                let new_val = &Field::to_decimal(new).unwrap().serialize();
6✔
65
                Self::update_aggregator_db(new_val.as_slice(), 1, false, ptx, aggregators_db);
6✔
66

6✔
67
                // Calculate minimum
6✔
68
                let maximum = try_unwrap!(Self::calc_decimal_max(ptx, aggregators_db));
6✔
69
                if maximum == dozer_types::rust_decimal::Decimal::MIN {
6✔
70
                    Ok(AggregationResult::new(Field::Null, None))
×
71
                } else {
×
72
                    Ok(AggregationResult::new(
6✔
73
                        Self::get_value(maximum.serialize().as_slice(), return_type),
6✔
74
                        Some(Vec::from(maximum.serialize())),
6✔
75
                    ))
6✔
76
                }
×
77
            }
×
78
            (FieldType::Float, _) => {
×
79
                // Update aggregators_db with new val and its occurrence
×
80
                let new_val = &OrderedFloat(Field::to_float(new).unwrap());
6✔
81
                Self::update_aggregator_db(to_bytes!(new_val), 1, false, ptx, aggregators_db);
6✔
82

6✔
83
                // Calculate average
6✔
84
                let maximum = try_unwrap!(Self::calc_f64_max(ptx, aggregators_db)).to_be_bytes();
6✔
85
                Ok(AggregationResult::new(
6✔
86
                    Self::get_value(&maximum, return_type),
6✔
87
                    Some(Vec::from(maximum)),
6✔
88
                ))
6✔
89
            }
×
90
            (FieldType::Int, _) => {
×
91
                // Update aggregators_db with new val and its occurrence
×
92
                let new_val = &Field::to_int(new).unwrap();
6✔
93
                Self::update_aggregator_db(to_bytes!(new_val), 1, false, ptx, aggregators_db);
6✔
94

6✔
95
                // Calculate average
6✔
96
                let maximum = try_unwrap!(Self::calc_i64_max(ptx, aggregators_db)).to_be_bytes();
6✔
97
                Ok(AggregationResult::new(
6✔
98
                    Self::get_value(&maximum, return_type),
6✔
99
                    Some(Vec::from(maximum)),
6✔
100
                ))
6✔
101
            }
×
102
            (FieldType::UInt, _) => {
×
103
                // Update aggregators_db with new val and its occurrence
×
104
                let new_val = &Field::to_uint(new).unwrap();
4✔
105
                Self::update_aggregator_db(to_bytes!(new_val), 1, false, ptx, aggregators_db);
4✔
106

4✔
107
                // Calculate average
4✔
108
                let maximum = try_unwrap!(Self::calc_u64_max(ptx, aggregators_db)).to_be_bytes();
4✔
109
                Ok(AggregationResult::new(
4✔
110
                    Self::get_value(&maximum, return_type),
4✔
111
                    Some(Vec::from(maximum)),
4✔
112
                ))
4✔
113
            }
×
114
            (FieldType::Timestamp, _) => {
×
115
                // Update aggregators_db with new val and its occurrence
×
116
                let new_val = &Field::to_timestamp(new)
6✔
117
                    .unwrap()
6✔
118
                    .timestamp_millis()
6✔
119
                    .to_be_bytes();
6✔
120
                Self::update_aggregator_db(new_val.as_slice(), 1, false, ptx, aggregators_db);
6✔
121

6✔
122
                // Calculate minimum
6✔
123
                let maximum = try_unwrap!(Self::calc_timestamp_max(ptx, aggregators_db));
6✔
124
                let min_datetime: DateTime<FixedOffset> =
6✔
125
                    DateTime::from(DateTime::<FixedOffset>::MIN_UTC);
6✔
126
                if maximum == min_datetime {
6✔
127
                    Ok(AggregationResult::new(Field::Null, None))
×
128
                } else {
×
129
                    Ok(AggregationResult::new(
6✔
130
                        Self::get_value(
6✔
131
                            maximum.timestamp_millis().to_be_bytes().as_slice(),
6✔
132
                            return_type,
6✔
133
                        ),
6✔
134
                        Some(Vec::from(maximum.timestamp_millis().to_be_bytes())),
6✔
135
                    ))
6✔
136
                }
×
137
            }
×
138
            _ => Err(InvalidOperandType(AGGREGATOR_NAME.to_string())),
×
139
        }
140
    }
34✔
141

×
142
    pub(crate) fn update(
11✔
143
        _cur_state: Option<&[u8]>,
11✔
144
        old: &Field,
11✔
145
        new: &Field,
11✔
146
        return_type: FieldType,
11✔
147
        ptx: &mut PrefixTransaction,
11✔
148
        aggregators_db: Database,
11✔
149
    ) -> Result<AggregationResult, PipelineError> {
11✔
150
        match (return_type, new) {
11✔
151
            (FieldType::Date, _) => {
×
152
                // Update aggregators_db with new val and its occurrence
×
153
                let new_val = &Field::to_date(new).unwrap().to_string();
2✔
154
                Self::update_aggregator_db(new_val.as_bytes(), 1, false, ptx, aggregators_db);
2✔
155
                let old_val = &Field::to_date(old).unwrap().to_string();
2✔
156
                Self::update_aggregator_db(old_val.as_bytes(), 1, true, ptx, aggregators_db);
2✔
157

2✔
158
                // Calculate minimum
2✔
159
                let maximum = try_unwrap!(Self::calc_date_max(ptx, aggregators_db));
2✔
160
                let min_date = NaiveDate::MIN;
2✔
161
                if maximum == min_date {
2✔
162
                    Ok(AggregationResult::new(Field::Null, None))
×
163
                } else {
×
164
                    Ok(AggregationResult::new(
2✔
165
                        Self::get_value(maximum.to_string().as_bytes(), return_type),
2✔
166
                        Some(Vec::from(maximum.to_string().as_bytes())),
2✔
167
                    ))
2✔
168
                }
×
169
            }
×
170
            (FieldType::Decimal, _) => {
×
171
                // Update aggregators_db with new val and its occurrence
172
                let new_val = &Field::to_decimal(new).unwrap().serialize();
2✔
173
                Self::update_aggregator_db(new_val.as_slice(), 1, false, ptx, aggregators_db);
2✔
174
                let old_val = &Field::to_decimal(old).unwrap().serialize();
2✔
175
                Self::update_aggregator_db(old_val.as_slice(), 1, true, ptx, aggregators_db);
2✔
176

2✔
177
                // Calculate minimum
2✔
178
                let maximum = try_unwrap!(Self::calc_decimal_max(ptx, aggregators_db));
2✔
179
                if maximum == dozer_types::rust_decimal::Decimal::MIN {
2✔
180
                    Ok(AggregationResult::new(Field::Null, None))
×
181
                } else {
×
182
                    Ok(AggregationResult::new(
2✔
183
                        Self::get_value(maximum.serialize().as_slice(), return_type),
2✔
184
                        Some(Vec::from(maximum.serialize())),
2✔
185
                    ))
2✔
186
                }
×
187
            }
×
188
            (FieldType::Float, _) => {
×
189
                // Update aggregators_db with new val and its occurrence
190
                let new_val = &OrderedFloat(Field::to_float(new).unwrap());
2✔
191
                Self::update_aggregator_db(to_bytes!(new_val), 1, false, ptx, aggregators_db);
2✔
192
                let old_val = &OrderedFloat(Field::to_float(old).unwrap());
2✔
193
                Self::update_aggregator_db(to_bytes!(old_val), 1, true, ptx, aggregators_db);
2✔
194

2✔
195
                // Calculate average
2✔
196
                let maximum = try_unwrap!(Self::calc_f64_max(ptx, aggregators_db)).to_be_bytes();
2✔
197
                Ok(AggregationResult::new(
2✔
198
                    Self::get_value(&maximum, return_type),
2✔
199
                    Some(Vec::from(maximum)),
2✔
200
                ))
2✔
201
            }
×
202
            (FieldType::Int, _) => {
×
203
                // Update aggregators_db with new val and its occurrence
×
204
                let new_val = &Field::to_int(new).unwrap();
2✔
205
                Self::update_aggregator_db(to_bytes!(new_val), 1, false, ptx, aggregators_db);
2✔
206
                let old_val = &Field::to_int(old).unwrap();
2✔
207
                Self::update_aggregator_db(to_bytes!(old_val), 1, true, ptx, aggregators_db);
2✔
208

2✔
209
                // Calculate average
2✔
210
                let maximum = (try_unwrap!(Self::calc_i64_max(ptx, aggregators_db))).to_be_bytes();
2✔
211
                Ok(AggregationResult::new(
2✔
212
                    Self::get_value(&maximum, return_type),
2✔
213
                    Some(Vec::from(maximum)),
2✔
214
                ))
2✔
215
            }
×
216
            (FieldType::UInt, _) => {
×
217
                // Update aggregators_db with new val and its occurrence
×
218
                let new_val = &Field::to_uint(new).unwrap();
1✔
219
                Self::update_aggregator_db(to_bytes!(new_val), 1, false, ptx, aggregators_db);
1✔
220
                let old_val = &Field::to_uint(old).unwrap();
1✔
221
                Self::update_aggregator_db(to_bytes!(old_val), 1, true, ptx, aggregators_db);
1✔
222

1✔
223
                // Calculate average
1✔
224
                let maximum = (try_unwrap!(Self::calc_u64_max(ptx, aggregators_db))).to_be_bytes();
1✔
225
                Ok(AggregationResult::new(
1✔
226
                    Self::get_value(&maximum, return_type),
1✔
227
                    Some(Vec::from(maximum)),
1✔
228
                ))
1✔
229
            }
×
230
            (FieldType::Timestamp, _) => {
×
231
                // Update aggregators_db with new val and its occurrence
×
232
                let new_val = &Field::to_timestamp(new)
2✔
233
                    .unwrap()
2✔
234
                    .timestamp_millis()
2✔
235
                    .to_be_bytes();
2✔
236
                Self::update_aggregator_db(new_val.as_slice(), 1, false, ptx, aggregators_db);
2✔
237
                let old_val = &Field::to_timestamp(old)
2✔
238
                    .unwrap()
2✔
239
                    .timestamp_millis()
2✔
240
                    .to_be_bytes();
2✔
241
                Self::update_aggregator_db(old_val.as_slice(), 1, true, ptx, aggregators_db);
2✔
242

2✔
243
                // Calculate maximum
2✔
244
                let maximum = try_unwrap!(Self::calc_timestamp_max(ptx, aggregators_db));
2✔
245
                let min_datetime: DateTime<FixedOffset> =
2✔
246
                    DateTime::from(DateTime::<FixedOffset>::MIN_UTC);
2✔
247
                if maximum == min_datetime {
2✔
248
                    Ok(AggregationResult::new(Field::Null, None))
×
249
                } else {
×
250
                    Ok(AggregationResult::new(
2✔
251
                        Self::get_value(
2✔
252
                            maximum.timestamp_millis().to_be_bytes().as_slice(),
2✔
253
                            return_type,
2✔
254
                        ),
2✔
255
                        Some(Vec::from(maximum.timestamp_millis().to_be_bytes())),
2✔
256
                    ))
2✔
257
                }
×
258
            }
259
            _ => Err(InvalidOperandType(AGGREGATOR_NAME.to_string())),
×
260
        }
×
261
    }
11✔
262

×
263
    pub(crate) fn delete(
34✔
264
        _cur_state: Option<&[u8]>,
34✔
265
        old: &Field,
34✔
266
        return_type: FieldType,
34✔
267
        ptx: &mut PrefixTransaction,
34✔
268
        aggregators_db: Database,
34✔
269
    ) -> Result<AggregationResult, PipelineError> {
34✔
270
        match (return_type, old) {
34✔
271
            (FieldType::Date, _) => {
×
272
                // Update aggregators_db with new val and its occurrence
273
                let old_val = &Field::to_date(old).unwrap().to_string();
6✔
274
                Self::update_aggregator_db(old_val.as_bytes(), 1, true, ptx, aggregators_db);
6✔
275

6✔
276
                // Calculate minimum
6✔
277
                let maximum = try_unwrap!(Self::calc_date_max(ptx, aggregators_db));
6✔
278
                let min_date = NaiveDate::MIN;
6✔
279
                if maximum == min_date {
6✔
280
                    Ok(AggregationResult::new(Field::Null, None))
3✔
281
                } else {
×
282
                    Ok(AggregationResult::new(
3✔
283
                        Self::get_value(maximum.to_string().as_bytes(), return_type),
3✔
284
                        Some(Vec::from(maximum.to_string().as_bytes())),
3✔
285
                    ))
3✔
286
                }
×
287
            }
288
            (FieldType::Decimal, _) => {
289
                // Update aggregators_db with new val and its occurrence
290
                let old_val = &Field::to_decimal(old).unwrap().serialize();
6✔
291
                Self::update_aggregator_db(old_val.as_slice(), 1, true, ptx, aggregators_db);
6✔
292

6✔
293
                // Calculate minimum
6✔
294
                let maximum = try_unwrap!(Self::calc_decimal_max(ptx, aggregators_db));
6✔
295
                if maximum == dozer_types::rust_decimal::Decimal::MIN {
6✔
296
                    Ok(AggregationResult::new(Field::Null, None))
3✔
297
                } else {
×
298
                    Ok(AggregationResult::new(
3✔
299
                        Self::get_value(maximum.serialize().as_slice(), return_type),
3✔
300
                        Some(Vec::from(maximum.serialize())),
3✔
301
                    ))
3✔
302
                }
×
303
            }
304
            (FieldType::Float, _) => {
305
                // Update aggregators_db with new val and its occurrence
306
                let old_val = &OrderedFloat(Field::to_float(old).unwrap());
6✔
307
                Self::update_aggregator_db(to_bytes!(old_val), 1, true, ptx, aggregators_db);
6✔
308

6✔
309
                // Calculate average
6✔
310
                let maximum = try_unwrap!(Self::calc_f64_max(ptx, aggregators_db));
6✔
311
                if maximum == f64::MIN {
6✔
312
                    Ok(AggregationResult::new(Field::Null, None))
3✔
313
                } else {
×
314
                    Ok(AggregationResult::new(
3✔
315
                        Self::get_value(&maximum.to_be_bytes(), return_type),
3✔
316
                        Some(Vec::from(maximum.to_be_bytes())),
3✔
317
                    ))
3✔
318
                }
×
319
            }
320
            (FieldType::Int, _) => {
321
                // Update aggregators_db with new val and its occurrence
322
                let old_val = &Field::to_int(old).unwrap();
6✔
323
                Self::update_aggregator_db(to_bytes!(old_val), 1, true, ptx, aggregators_db);
6✔
324

6✔
325
                // Calculate average
6✔
326
                let maximum = try_unwrap!(Self::calc_i64_max(ptx, aggregators_db));
6✔
327
                if maximum == i64::MIN {
6✔
328
                    Ok(AggregationResult::new(Field::Null, None))
3✔
329
                } else {
×
330
                    Ok(AggregationResult::new(
3✔
331
                        Self::get_value(&maximum.to_be_bytes(), return_type),
3✔
332
                        Some(Vec::from(maximum.to_be_bytes())),
3✔
333
                    ))
3✔
334
                }
×
335
            }
336
            (FieldType::UInt, _) => {
337
                // Update aggregators_db with new val and its occurrence
338
                let old_val = &Field::to_uint(old).unwrap();
4✔
339
                Self::update_aggregator_db(to_bytes!(old_val), 1, true, ptx, aggregators_db);
4✔
340

4✔
341
                // Calculate average
4✔
342
                let maximum = try_unwrap!(Self::calc_u64_max(ptx, aggregators_db));
4✔
343
                if maximum == u64::MIN {
4✔
344
                    Ok(AggregationResult::new(Field::Null, None))
2✔
345
                } else {
×
346
                    Ok(AggregationResult::new(
2✔
347
                        Self::get_value(&maximum.to_be_bytes(), return_type),
2✔
348
                        Some(Vec::from(maximum.to_be_bytes())),
2✔
349
                    ))
2✔
350
                }
×
351
            }
352
            (FieldType::Timestamp, _) => {
353
                // Update aggregators_db with new val and its occurrence
354
                let old_val = &Field::to_timestamp(old)
6✔
355
                    .unwrap()
6✔
356
                    .timestamp_millis()
6✔
357
                    .to_be_bytes();
6✔
358
                Self::update_aggregator_db(old_val.as_slice(), 1, true, ptx, aggregators_db);
6✔
359

6✔
360
                // Calculate maximum
6✔
361
                let maximum = try_unwrap!(Self::calc_timestamp_max(ptx, aggregators_db));
6✔
362
                let min_datetime: DateTime<FixedOffset> =
6✔
363
                    DateTime::from(DateTime::<FixedOffset>::MIN_UTC);
6✔
364
                if maximum == min_datetime {
6✔
365
                    Ok(AggregationResult::new(Field::Null, None))
3✔
366
                } else {
367
                    Ok(AggregationResult::new(
3✔
368
                        Self::get_value(
3✔
369
                            maximum.timestamp_millis().to_be_bytes().as_slice(),
3✔
370
                            return_type,
3✔
371
                        ),
3✔
372
                        Some(Vec::from(maximum.timestamp_millis().to_be_bytes())),
3✔
373
                    ))
3✔
374
                }
375
            }
376
            _ => Err(InvalidOperandType(AGGREGATOR_NAME.to_string())),
×
377
        }
378
    }
34✔
379

380
    pub(crate) fn get_value(f: &[u8], from: FieldType) -> Field {
62✔
381
        match from {
62✔
382
            FieldType::Date => Date(
11✔
383
                NaiveDate::parse_from_str(
11✔
384
                    String::from_utf8(deserialize!(f)).unwrap().as_ref(),
11✔
385
                    DATE_FORMAT,
11✔
386
                )
11✔
387
                .unwrap(),
11✔
388
            ),
11✔
389
            FieldType::Decimal => Decimal(dozer_types::rust_decimal::Decimal::deserialize(
11✔
390
                deserialize!(f),
11✔
391
            )),
11✔
392
            FieldType::Float => Float(OrderedFloat(f64::from_be_bytes(deserialize!(f)))),
11✔
393
            FieldType::Int => Int(i64::from_be_bytes(deserialize!(f))),
11✔
394
            FieldType::UInt => UInt(u64::from_be_bytes(deserialize!(f))),
7✔
395
            FieldType::Timestamp => Timestamp(DateTime::from(
11✔
396
                Utc.timestamp_millis(i64::from_be_bytes(deserialize!(f))),
11✔
397
            )),
11✔
398
            _ => Field::Null,
×
399
        }
400
    }
62✔
401

402
    fn update_aggregator_db(
90✔
403
        key: &[u8],
90✔
404
        val_delta: u8,
90✔
405
        decr: bool,
90✔
406
        ptx: &mut PrefixTransaction,
90✔
407
        aggregators_db: Database,
90✔
408
    ) {
90✔
409
        let get_prev_count = try_unwrap!(ptx.get(aggregators_db, key));
90✔
410
        let prev_count = deserialize_u8!(get_prev_count);
90✔
411
        let mut new_count = prev_count;
90✔
412
        if decr {
90✔
413
            new_count = new_count.wrapping_sub(val_delta);
45✔
414
        } else {
45✔
415
            new_count = new_count.wrapping_add(val_delta);
45✔
416
        }
45✔
417
        if new_count < 1 {
90✔
418
            try_unwrap!(ptx.del(aggregators_db, key, Option::from(to_bytes!(prev_count))));
34✔
419
        } else {
56✔
420
            try_unwrap!(ptx.put(aggregators_db, key, to_bytes!(new_count)));
56✔
421
        }
56✔
422
    }
90✔
423

424
    fn calc_f64_max(
14✔
425
        ptx: &mut PrefixTransaction,
14✔
426
        aggregators_db: Database,
14✔
427
    ) -> Result<f64, PipelineError> {
14✔
428
        let ptx_cur = ptx.open_cursor(aggregators_db)?;
14✔
429
        let mut maximum = f64::MIN;
14✔
430

14✔
431
        // get first to get the maximum
14✔
432
        if ptx_cur.last()? {
14✔
433
            let cur = try_unwrap!(ptx_cur.read()).unwrap();
11✔
434
            maximum = f64::from_be_bytes(deserialize!(cur.0));
11✔
435
        }
11✔
436
        Ok(maximum)
14✔
437
    }
14✔
438

439
    fn calc_decimal_max(
14✔
440
        ptx: &mut PrefixTransaction,
14✔
441
        aggregators_db: Database,
14✔
442
    ) -> Result<dozer_types::rust_decimal::Decimal, PipelineError> {
14✔
443
        let ptx_cur = ptx.open_cursor(aggregators_db)?;
14✔
444
        let mut maximum = dozer_types::rust_decimal::Decimal::MIN;
14✔
445

14✔
446
        // get first to get the minimum
14✔
447
        if ptx_cur.last()? {
14✔
448
            let cur = try_unwrap!(ptx_cur.read()).unwrap();
11✔
449
            maximum = dozer_types::rust_decimal::Decimal::deserialize(deserialize!(cur.0));
11✔
450
        }
11✔
451
        Ok(maximum)
14✔
452
    }
14✔
453

454
    fn calc_timestamp_max(
14✔
455
        ptx: &mut PrefixTransaction,
14✔
456
        aggregators_db: Database,
14✔
457
    ) -> Result<DateTime<FixedOffset>, PipelineError> {
14✔
458
        let ptx_cur = ptx.open_cursor(aggregators_db)?;
14✔
459
        let mut maximum = DateTime::<FixedOffset>::MIN_UTC;
14✔
460

14✔
461
        // get first to get the minimum
14✔
462
        if ptx_cur.last()? {
14✔
463
            let cur = try_unwrap!(ptx_cur.read()).unwrap();
11✔
464
            maximum = Utc.timestamp_millis(i64::from_be_bytes(deserialize!(cur.0)));
11✔
465
        }
11✔
466
        Ok(DateTime::from(maximum))
14✔
467
    }
14✔
468

469
    fn calc_date_max(
14✔
470
        ptx: &mut PrefixTransaction,
14✔
471
        aggregators_db: Database,
14✔
472
    ) -> Result<NaiveDate, PipelineError> {
14✔
473
        let ptx_cur = ptx.open_cursor(aggregators_db)?;
14✔
474
        let mut maximum = NaiveDate::MIN;
14✔
475

14✔
476
        // get first to get the minimum
14✔
477
        if ptx_cur.last()? {
14✔
478
            let cur = try_unwrap!(ptx_cur.read()).unwrap();
11✔
479
            maximum = NaiveDate::parse_from_str(
11✔
480
                String::from_utf8(deserialize!(cur.0)).unwrap().as_ref(),
11✔
481
                DATE_FORMAT,
11✔
482
            )
11✔
483
            .unwrap();
11✔
484
        }
11✔
485
        Ok(maximum)
14✔
486
    }
14✔
487

488
    fn calc_i64_max(
14✔
489
        ptx: &mut PrefixTransaction,
14✔
490
        aggregators_db: Database,
14✔
491
    ) -> Result<i64, PipelineError> {
14✔
492
        let ptx_cur = ptx.open_cursor(aggregators_db)?;
14✔
493
        let mut maximum = i64::MIN;
14✔
494

14✔
495
        // get first to get the maximum
14✔
496
        if ptx_cur.last()? {
14✔
497
            let cur = try_unwrap!(ptx_cur.read()).unwrap();
11✔
498
            maximum = i64::from_be_bytes(deserialize!(cur.0));
11✔
499
        }
11✔
500
        Ok(maximum)
14✔
501
    }
14✔
502

503
    fn calc_u64_max(
9✔
504
        ptx: &mut PrefixTransaction,
9✔
505
        aggregators_db: Database,
9✔
506
    ) -> Result<u64, PipelineError> {
9✔
507
        let ptx_cur = ptx.open_cursor(aggregators_db)?;
9✔
508
        let mut maximum = u64::MIN;
9✔
509

9✔
510
        // get first to get the maximum
9✔
511
        if ptx_cur.last()? {
9✔
512
            let cur = try_unwrap!(ptx_cur.read()).unwrap();
7✔
513
            maximum = u64::from_be_bytes(deserialize!(cur.0));
7✔
514
        }
7✔
515
        Ok(maximum)
9✔
516
    }
9✔
517
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc