• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

getdozer / dozer / 4007818786

pending completion
4007818786

Pull #733

github

GitHub
Merge baf5c38aa into 6c0ac2b2c
Pull Request #733: Bump diesel from 2.0.2 to 2.0.3

23389 of 34432 relevant lines covered (67.93%)

40326.78 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

67.06
/dozer-sql/src/pipeline/aggregation/sum.rs
1
use crate::pipeline::aggregation::aggregator::AggregationResult;
2
use crate::pipeline::errors::PipelineError;
3
use crate::pipeline::errors::PipelineError::InvalidOperandType;
4
use crate::{deserialize, deserialize_decimal, deserialize_f64, deserialize_i64, deserialize_u64};
5
use dozer_core::storage::prefix_transaction::PrefixTransaction;
6
use dozer_types::ordered_float::OrderedFloat;
7
use dozer_types::rust_decimal::Decimal;
8
use dozer_types::types::{Field, FieldType};
9
use std::ops::{Add, Sub};
10

11
/// SUM aggregator.
///
/// The struct carries no fields; its associated functions receive the
/// serialized running sum as an argument and return the updated value
/// together with the new serialized state.
pub struct SumAggregator {}
12
/// Aggregator name embedded in the `InvalidOperandType` errors raised by this module.
const AGGREGATOR_NAME: &str = "SUM";
13

14
impl SumAggregator {
15
    /// Numeric identifier of this aggregator; returned by `_get_type`.
    const _AGGREGATOR_ID: u32 = 0x01;
16

17
    /// Returns the type produced by SUM for an operand of type `from`.
    ///
    /// The result type always equals the operand type: `Decimal`, `Float`,
    /// `Int` and `UInt` map to themselves, and every other type is likewise
    /// handed back unchanged.
    pub(crate) fn get_return_type(from: FieldType) -> FieldType {
        from
    }
1✔
26

×
27
    pub(crate) fn _get_type() -> u32 {
×
28
        SumAggregator::_AGGREGATOR_ID
×
29
    }
×
30

31
    pub(crate) fn insert(
23✔
32
        cur_state: Option<&[u8]>,
23✔
33
        new: &Field,
23✔
34
        return_type: FieldType,
23✔
35
        _txn: &mut PrefixTransaction,
23✔
36
    ) -> Result<AggregationResult, PipelineError> {
23✔
37
        match return_type {
23✔
38
            FieldType::Decimal => {
×
39
                let prev = deserialize_decimal!(cur_state);
6✔
40
                let curr = &Field::to_decimal(new).unwrap();
6✔
41
                let r_bytes = (prev.add(curr)).serialize();
6✔
42
                Ok(AggregationResult::new(
6✔
43
                    Self::get_value(&r_bytes, return_type)?,
6✔
44
                    Some(Vec::from(r_bytes)),
6✔
45
                ))
×
46
            }
×
47
            FieldType::Float => {
×
48
                let prev = OrderedFloat::from(deserialize_f64!(cur_state));
6✔
49
                let curr = &OrderedFloat(Field::to_float(new).unwrap());
6✔
50
                let r_bytes = (prev + *curr).to_be_bytes();
6✔
51
                Ok(AggregationResult::new(
6✔
52
                    Self::get_value(&r_bytes, return_type)?,
6✔
53
                    Some(Vec::from(r_bytes)),
6✔
54
                ))
×
55
            }
×
56
            FieldType::Int => {
×
57
                let prev = deserialize_i64!(cur_state);
7✔
58
                let curr = &Field::to_int(new).unwrap();
7✔
59
                let r_bytes = (prev + *curr).to_be_bytes();
7✔
60
                Ok(AggregationResult::new(
7✔
61
                    Self::get_value(&r_bytes, return_type)?,
7✔
62
                    Some(Vec::from(r_bytes)),
7✔
63
                ))
×
64
            }
×
65
            FieldType::UInt => {
×
66
                let prev = deserialize_u64!(cur_state);
4✔
67
                let curr = &Field::to_uint(new).unwrap();
4✔
68
                let r_bytes = (prev + *curr).to_be_bytes();
4✔
69
                Ok(AggregationResult::new(
4✔
70
                    Self::get_value(&r_bytes, return_type)?,
4✔
71
                    Some(Vec::from(r_bytes)),
4✔
72
                ))
×
73
            }
×
74
            _ => Err(InvalidOperandType(AGGREGATOR_NAME.to_string())),
×
75
        }
×
76
    }
23✔
77

78
    pub(crate) fn update(
7✔
79
        cur_state: Option<&[u8]>,
7✔
80
        old: &Field,
7✔
81
        new: &Field,
7✔
82
        return_type: FieldType,
7✔
83
        _txn: &mut PrefixTransaction,
7✔
84
    ) -> Result<AggregationResult, PipelineError> {
7✔
85
        match return_type {
7✔
86
            FieldType::Decimal => {
×
87
                let prev = deserialize_decimal!(cur_state);
2✔
88
                let curr_del = &Field::to_decimal(old).unwrap();
2✔
89
                let curr_added = &Field::to_decimal(new).unwrap();
2✔
90
                let r_bytes = prev.sub(curr_del).add(curr_added).serialize();
2✔
91
                Ok(AggregationResult::new(
2✔
92
                    Self::get_value(&r_bytes, return_type)?,
2✔
93
                    Some(Vec::from(r_bytes)),
2✔
94
                ))
×
95
            }
×
96
            FieldType::Float => {
×
97
                let prev = OrderedFloat::from(deserialize_f64!(cur_state));
2✔
98
                let curr_del = &OrderedFloat(Field::to_float(old).unwrap());
2✔
99
                let curr_added = &OrderedFloat(Field::to_float(new).unwrap());
2✔
100
                let r_bytes = (prev - *curr_del + *curr_added).to_be_bytes();
2✔
101
                Ok(AggregationResult::new(
2✔
102
                    Self::get_value(&r_bytes, return_type)?,
2✔
103
                    Some(Vec::from(r_bytes)),
2✔
104
                ))
×
105
            }
×
106
            FieldType::Int => {
×
107
                let prev = deserialize_i64!(cur_state);
2✔
108
                let curr_del = &Field::to_int(old).unwrap();
2✔
109
                let curr_added = &Field::to_int(new).unwrap();
2✔
110
                let r_bytes = (prev - *curr_del + *curr_added).to_be_bytes();
2✔
111
                Ok(AggregationResult::new(
2✔
112
                    Self::get_value(&r_bytes, return_type)?,
2✔
113
                    Some(Vec::from(r_bytes)),
2✔
114
                ))
×
115
            }
×
116
            FieldType::UInt => {
×
117
                let prev = deserialize_u64!(cur_state);
1✔
118
                let curr_del = &Field::to_uint(old).unwrap();
1✔
119
                let curr_added = &Field::to_uint(new).unwrap();
1✔
120
                let r_bytes = (prev - *curr_del + *curr_added).to_be_bytes();
1✔
121
                Ok(AggregationResult::new(
1✔
122
                    Self::get_value(&r_bytes, return_type)?,
1✔
123
                    Some(Vec::from(r_bytes)),
1✔
124
                ))
×
125
            }
×
126
            _ => Err(InvalidOperandType(AGGREGATOR_NAME.to_string())),
×
127
        }
×
128
    }
7✔
129

130
    pub(crate) fn delete(
22✔
131
        cur_state: Option<&[u8]>,
22✔
132
        old: &Field,
22✔
133
        return_type: FieldType,
22✔
134
        _txn: &mut PrefixTransaction,
22✔
135
    ) -> Result<AggregationResult, PipelineError> {
22✔
136
        match return_type {
22✔
137
            FieldType::Decimal => {
×
138
                let prev = deserialize_decimal!(cur_state);
6✔
139
                let curr = &Field::to_decimal(old).unwrap();
6✔
140
                let r_bytes = (prev.sub(curr)).serialize();
6✔
141
                Ok(AggregationResult::new(
6✔
142
                    Self::get_value(&r_bytes, return_type)?,
6✔
143
                    Some(Vec::from(r_bytes)),
6✔
144
                ))
×
145
            }
×
146
            FieldType::Float => {
×
147
                let prev = OrderedFloat::from(deserialize_f64!(cur_state));
6✔
148
                let curr = &OrderedFloat(Field::to_float(old).unwrap());
6✔
149
                let r_bytes = (prev - *curr).to_be_bytes();
6✔
150
                Ok(AggregationResult::new(
6✔
151
                    Self::get_value(&r_bytes, return_type)?,
6✔
152
                    Some(Vec::from(r_bytes)),
6✔
153
                ))
×
154
            }
×
155
            FieldType::Int => {
×
156
                let prev = deserialize_i64!(cur_state);
6✔
157
                let curr = &Field::to_int(old).unwrap();
6✔
158
                let r_bytes = (prev - *curr).to_be_bytes();
6✔
159
                Ok(AggregationResult::new(
6✔
160
                    Self::get_value(&r_bytes, return_type)?,
6✔
161
                    Some(Vec::from(r_bytes)),
6✔
162
                ))
×
163
            }
×
164
            FieldType::UInt => {
×
165
                let prev = deserialize_u64!(cur_state);
4✔
166
                let curr = &Field::to_uint(old).unwrap();
4✔
167
                let r_bytes = (prev - *curr).to_be_bytes();
4✔
168
                Ok(AggregationResult::new(
4✔
169
                    Self::get_value(&r_bytes, return_type)?,
4✔
170
                    Some(Vec::from(r_bytes)),
4✔
171
                ))
×
172
            }
×
173
            _ => Err(InvalidOperandType(AGGREGATOR_NAME.to_string())),
×
174
        }
×
175
    }
22✔
176

177
    pub(crate) fn get_value(f: &[u8], from: FieldType) -> Result<Field, PipelineError> {
52✔
178
        match from {
52✔
179
            FieldType::Decimal => Ok(Field::Decimal(Decimal::deserialize(deserialize!(f)))),
14✔
180
            FieldType::Float => Ok(Field::Float(OrderedFloat(f64::from_be_bytes(
14✔
181
                deserialize!(f),
14✔
182
            )))),
14✔
183
            FieldType::Int => Ok(Field::Int(i64::from_be_bytes(deserialize!(f)))),
15✔
184
            FieldType::UInt => Ok(Field::UInt(u64::from_be_bytes(deserialize!(f)))),
9✔
185
            _ => Err(PipelineError::DataTypeMismatch),
×
186
        }
×
187
    }
52✔
188
}
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc