• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

getdozer / dozer / 4102355041

pending completion
4102355041

Pull #811

github

GitHub
Merge 37b55f3df into 7c772e92a
Pull Request #811: chore: integrating sql planner

427 of 427 new or added lines in 15 files covered. (100.0%)

24596 of 37831 relevant lines covered (65.02%)

37254.69 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

74.83
/dozer-sql/src/pipeline/aggregation/sum.rs
1
use crate::pipeline::aggregation::aggregator::AggregationResult;
2
use crate::pipeline::errors::PipelineError;
3
use crate::pipeline::errors::PipelineError::InvalidOperandType;
4
use crate::{deserialize, deserialize_decimal, deserialize_f64, deserialize_i64, deserialize_u64};
5
use dozer_core::storage::prefix_transaction::PrefixTransaction;
6
use dozer_types::ordered_float::OrderedFloat;
7
use dozer_types::rust_decimal::Decimal;
8
use dozer_types::types::{Field, FieldType};
9
use std::ops::{Add, Sub};
10

11
pub struct SumAggregator {}
12
const AGGREGATOR_NAME: &str = "SUM";
13

14
impl SumAggregator {
15
    const _AGGREGATOR_ID: u32 = 0x01;
16

17
    pub(crate) fn _get_type() -> u32 {
×
18
        SumAggregator::_AGGREGATOR_ID
×
19
    }
×
20

×
21
    pub(crate) fn insert(
25✔
22
        cur_state: Option<&[u8]>,
25✔
23
        new: &Field,
25✔
24
        return_type: FieldType,
25✔
25
        _txn: &mut PrefixTransaction,
25✔
26
    ) -> Result<AggregationResult, PipelineError> {
25✔
27
        match return_type {
25✔
28
            FieldType::Decimal => {
×
29
                let prev = deserialize_decimal!(cur_state);
6✔
30
                let curr = &Field::to_decimal(new).unwrap();
6✔
31
                let r_bytes = (prev.add(curr)).serialize();
6✔
32
                Ok(AggregationResult::new(
6✔
33
                    Self::get_value(&r_bytes, return_type)?,
6✔
34
                    Some(Vec::from(r_bytes)),
6✔
35
                ))
×
36
            }
×
37
            FieldType::Float => {
×
38
                let prev = OrderedFloat::from(deserialize_f64!(cur_state));
6✔
39
                let curr = &OrderedFloat(Field::to_float(new).unwrap());
6✔
40
                let r_bytes = (prev + *curr).to_be_bytes();
6✔
41
                Ok(AggregationResult::new(
6✔
42
                    Self::get_value(&r_bytes, return_type)?,
6✔
43
                    Some(Vec::from(r_bytes)),
6✔
44
                ))
×
45
            }
46
            FieldType::Int => {
47
                let prev = deserialize_i64!(cur_state);
9✔
48
                let curr = &Field::to_int(new).unwrap();
9✔
49
                let r_bytes = (prev + *curr).to_be_bytes();
9✔
50
                Ok(AggregationResult::new(
9✔
51
                    Self::get_value(&r_bytes, return_type)?,
9✔
52
                    Some(Vec::from(r_bytes)),
9✔
53
                ))
×
54
            }
55
            FieldType::UInt => {
56
                let prev = deserialize_u64!(cur_state);
4✔
57
                let curr = &Field::to_uint(new).unwrap();
4✔
58
                let r_bytes = (prev + *curr).to_be_bytes();
4✔
59
                Ok(AggregationResult::new(
4✔
60
                    Self::get_value(&r_bytes, return_type)?,
4✔
61
                    Some(Vec::from(r_bytes)),
4✔
62
                ))
×
63
            }
64
            _ => Err(InvalidOperandType(AGGREGATOR_NAME.to_string())),
×
65
        }
66
    }
25✔
67

×
68
    pub(crate) fn update(
7✔
69
        cur_state: Option<&[u8]>,
7✔
70
        old: &Field,
7✔
71
        new: &Field,
7✔
72
        return_type: FieldType,
7✔
73
        _txn: &mut PrefixTransaction,
7✔
74
    ) -> Result<AggregationResult, PipelineError> {
7✔
75
        match return_type {
7✔
76
            FieldType::Decimal => {
×
77
                let prev = deserialize_decimal!(cur_state);
2✔
78
                let curr_del = &Field::to_decimal(old).unwrap();
2✔
79
                let curr_added = &Field::to_decimal(new).unwrap();
2✔
80
                let r_bytes = prev.sub(curr_del).add(curr_added).serialize();
2✔
81
                Ok(AggregationResult::new(
2✔
82
                    Self::get_value(&r_bytes, return_type)?,
2✔
83
                    Some(Vec::from(r_bytes)),
2✔
84
                ))
×
85
            }
×
86
            FieldType::Float => {
87
                let prev = OrderedFloat::from(deserialize_f64!(cur_state));
2✔
88
                let curr_del = &OrderedFloat(Field::to_float(old).unwrap());
2✔
89
                let curr_added = &OrderedFloat(Field::to_float(new).unwrap());
2✔
90
                let r_bytes = (prev - *curr_del + *curr_added).to_be_bytes();
2✔
91
                Ok(AggregationResult::new(
2✔
92
                    Self::get_value(&r_bytes, return_type)?,
2✔
93
                    Some(Vec::from(r_bytes)),
2✔
94
                ))
95
            }
96
            FieldType::Int => {
97
                let prev = deserialize_i64!(cur_state);
2✔
98
                let curr_del = &Field::to_int(old).unwrap();
2✔
99
                let curr_added = &Field::to_int(new).unwrap();
2✔
100
                let r_bytes = (prev - *curr_del + *curr_added).to_be_bytes();
2✔
101
                Ok(AggregationResult::new(
2✔
102
                    Self::get_value(&r_bytes, return_type)?,
2✔
103
                    Some(Vec::from(r_bytes)),
2✔
104
                ))
105
            }
106
            FieldType::UInt => {
107
                let prev = deserialize_u64!(cur_state);
1✔
108
                let curr_del = &Field::to_uint(old).unwrap();
1✔
109
                let curr_added = &Field::to_uint(new).unwrap();
1✔
110
                let r_bytes = (prev - *curr_del + *curr_added).to_be_bytes();
1✔
111
                Ok(AggregationResult::new(
1✔
112
                    Self::get_value(&r_bytes, return_type)?,
1✔
113
                    Some(Vec::from(r_bytes)),
1✔
114
                ))
115
            }
116
            _ => Err(InvalidOperandType(AGGREGATOR_NAME.to_string())),
×
117
        }
×
118
    }
7✔
119

×
120
    pub(crate) fn delete(
22✔
121
        cur_state: Option<&[u8]>,
22✔
122
        old: &Field,
22✔
123
        return_type: FieldType,
22✔
124
        _txn: &mut PrefixTransaction,
22✔
125
    ) -> Result<AggregationResult, PipelineError> {
22✔
126
        match return_type {
22✔
127
            FieldType::Decimal => {
128
                let prev = deserialize_decimal!(cur_state);
6✔
129
                let curr = &Field::to_decimal(old).unwrap();
6✔
130
                let r_bytes = (prev.sub(curr)).serialize();
6✔
131
                Ok(AggregationResult::new(
6✔
132
                    Self::get_value(&r_bytes, return_type)?,
6✔
133
                    Some(Vec::from(r_bytes)),
6✔
134
                ))
×
135
            }
×
136
            FieldType::Float => {
×
137
                let prev = OrderedFloat::from(deserialize_f64!(cur_state));
6✔
138
                let curr = &OrderedFloat(Field::to_float(old).unwrap());
6✔
139
                let r_bytes = (prev - *curr).to_be_bytes();
6✔
140
                Ok(AggregationResult::new(
6✔
141
                    Self::get_value(&r_bytes, return_type)?,
6✔
142
                    Some(Vec::from(r_bytes)),
6✔
143
                ))
×
144
            }
145
            FieldType::Int => {
146
                let prev = deserialize_i64!(cur_state);
6✔
147
                let curr = &Field::to_int(old).unwrap();
6✔
148
                let r_bytes = (prev - *curr).to_be_bytes();
6✔
149
                Ok(AggregationResult::new(
6✔
150
                    Self::get_value(&r_bytes, return_type)?,
6✔
151
                    Some(Vec::from(r_bytes)),
6✔
152
                ))
×
153
            }
154
            FieldType::UInt => {
155
                let prev = deserialize_u64!(cur_state);
4✔
156
                let curr = &Field::to_uint(old).unwrap();
4✔
157
                let r_bytes = (prev - *curr).to_be_bytes();
4✔
158
                Ok(AggregationResult::new(
4✔
159
                    Self::get_value(&r_bytes, return_type)?,
4✔
160
                    Some(Vec::from(r_bytes)),
4✔
161
                ))
×
162
            }
163
            _ => Err(InvalidOperandType(AGGREGATOR_NAME.to_string())),
×
164
        }
165
    }
22✔
166

×
167
    pub(crate) fn get_value(f: &[u8], from: FieldType) -> Result<Field, PipelineError> {
54✔
168
        match from {
54✔
169
            FieldType::Decimal => Ok(Field::Decimal(Decimal::deserialize(deserialize!(f)))),
14✔
170
            FieldType::Float => Ok(Field::Float(OrderedFloat(f64::from_be_bytes(
14✔
171
                deserialize!(f),
14✔
172
            )))),
14✔
173
            FieldType::Int => Ok(Field::Int(i64::from_be_bytes(deserialize!(f)))),
17✔
174
            FieldType::UInt => Ok(Field::UInt(u64::from_be_bytes(deserialize!(f)))),
9✔
175
            _ => Err(PipelineError::DataTypeMismatch),
×
176
        }
177
    }
54✔
178
}
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc