• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

getdozer / dozer / 4763034755

pending completion
4763034755

Pull #1460

github

GitHub
Merge 2e63b376c into c58df4a0b
Pull Request #1460: Update init.rs

1 of 1 new or added line in 1 file covered. (100.0%)

34417 of 43846 relevant lines covered (78.5%)

12365.38 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

69.92
/dozer-sql/src/pipeline/aggregation/sum.rs
1
use crate::pipeline::aggregation::aggregator::Aggregator;
2
use crate::pipeline::errors::{FieldTypes, PipelineError};
3
use crate::pipeline::expression::aggregate::AggregateFunctionType;
4
use crate::pipeline::expression::aggregate::AggregateFunctionType::Sum;
5
use crate::pipeline::expression::execution::{Expression, ExpressionExecutor, ExpressionType};
6
use crate::{argv, calculate_err_field};
7
use dozer_core::errors::ExecutionError::InvalidType;
8
use dozer_types::ordered_float::OrderedFloat;
9
use dozer_types::rust_decimal::Decimal;
10
use dozer_types::types::{DozerDuration, Field, FieldType, Schema, SourceDefinition, TimeUnit};
11
use num_traits::FromPrimitive;
12

13
pub fn validate_sum(args: &[Expression], schema: &Schema) -> Result<ExpressionType, PipelineError> {
123✔
14
    let arg = &argv!(args, 0, AggregateFunctionType::Sum)?.get_type(schema)?;
123✔
15

16
    let ret_type = match arg.return_type {
123✔
17
        FieldType::UInt => FieldType::UInt,
2✔
18
        FieldType::U128 => FieldType::U128,
×
19
        FieldType::Int => FieldType::Int,
41✔
20
        FieldType::I128 => FieldType::I128,
×
21
        FieldType::Float => FieldType::Float,
4✔
22
        FieldType::Decimal => FieldType::Decimal,
72✔
23
        FieldType::Duration => FieldType::Duration,
4✔
24
        FieldType::Boolean
25
        | FieldType::String
26
        | FieldType::Text
27
        | FieldType::Date
28
        | FieldType::Timestamp
29
        | FieldType::Binary
30
        | FieldType::Bson
31
        | FieldType::Point => {
32
            return Err(PipelineError::InvalidFunctionArgumentType(
×
33
                Sum.to_string(),
×
34
                arg.return_type,
×
35
                FieldTypes::new(vec![
×
36
                    FieldType::UInt,
×
37
                    FieldType::U128,
×
38
                    FieldType::Int,
×
39
                    FieldType::I128,
×
40
                    FieldType::Float,
×
41
                    FieldType::Decimal,
×
42
                    FieldType::Duration,
×
43
                ]),
×
44
                0,
×
45
            ));
×
46
        }
47
    };
48
    Ok(ExpressionType::new(
123✔
49
        ret_type,
123✔
50
        true,
123✔
51
        SourceDefinition::Dynamic,
123✔
52
        false,
123✔
53
    ))
123✔
54
}
123✔
55

56
#[derive(Debug)]
×
57
pub struct SumAggregator {
58
    current_state: SumState,
59
    return_type: Option<FieldType>,
60
}
61

62
#[derive(Debug)]
×
63
pub struct SumState {
64
    pub(crate) int_state: i64,
65
    pub(crate) i128_state: i128,
66
    pub(crate) uint_state: u64,
67
    pub(crate) u128_state: u128,
68
    pub(crate) float_state: f64,
69
    pub(crate) decimal_state: Decimal,
70
    pub(crate) duration_state: std::time::Duration,
71
}
72

73
impl SumAggregator {
74
    pub fn new() -> Self {
326✔
75
        Self {
326✔
76
            current_state: SumState {
326✔
77
                int_state: 0_i64,
326✔
78
                i128_state: 0_i128,
326✔
79
                uint_state: 0_u64,
326✔
80
                u128_state: 0_u128,
326✔
81
                float_state: 0_f64,
326✔
82
                decimal_state: Decimal::from_f64(0_f64).unwrap(),
326✔
83
                duration_state: std::time::Duration::new(0, 0),
326✔
84
            },
326✔
85
            return_type: None,
326✔
86
        }
326✔
87
    }
326✔
88
}
89

90
impl Aggregator for SumAggregator {
91
    fn init(&mut self, return_type: FieldType) {
324✔
92
        self.return_type = Some(return_type);
324✔
93
    }
324✔
94

95
    fn update(&mut self, old: &[Field], new: &[Field]) -> Result<Field, PipelineError> {
96
        self.delete(old)?;
44✔
97
        self.insert(new)
44✔
98
    }
44✔
99

100
    fn delete(&mut self, old: &[Field]) -> Result<Field, PipelineError> {
129✔
101
        get_sum(old, &mut self.current_state, self.return_type, true)
129✔
102
    }
129✔
103

104
    fn insert(&mut self, new: &[Field]) -> Result<Field, PipelineError> {
367✔
105
        get_sum(new, &mut self.current_state, self.return_type, false)
367✔
106
    }
367✔
107
}
108

109
pub fn get_sum(
993✔
110
    fields: &[Field],
993✔
111
    current_state: &mut SumState,
993✔
112
    return_type: Option<FieldType>,
993✔
113
    decr: bool,
993✔
114
) -> Result<Field, PipelineError> {
993✔
115
    match return_type {
993✔
116
        Some(typ) => match typ {
993✔
117
            FieldType::UInt => {
118
                if decr {
9✔
119
                    for field in fields {
10✔
120
                        let val = calculate_err_field!(field.to_uint(), Sum, field);
5✔
121
                        current_state.uint_state -= val;
5✔
122
                    }
123
                } else {
124
                    for field in fields {
9✔
125
                        let val = calculate_err_field!(field.to_uint(), Sum, field);
5✔
126
                        current_state.uint_state += val;
5✔
127
                    }
128
                }
129
                Ok(Field::UInt(current_state.uint_state))
10✔
130
            }
131
            FieldType::U128 => {
132
                if decr {
×
133
                    for field in fields {
×
134
                        let val = calculate_err_field!(field.to_u128(), Sum, field);
×
135
                        current_state.u128_state -= val;
×
136
                    }
137
                } else {
138
                    for field in fields {
×
139
                        let val = calculate_err_field!(field.to_u128(), Sum, field);
×
140
                        current_state.u128_state += val;
×
141
                    }
142
                }
143
                Ok(Field::U128(current_state.u128_state))
×
144
            }
145
            FieldType::Int => {
146
                if decr {
113✔
147
                    for field in fields {
52✔
148
                        let val = calculate_err_field!(field.to_int(), Sum, field);
26✔
149
                        current_state.int_state -= val;
26✔
150
                    }
151
                } else {
152
                    for field in fields {
174✔
153
                        let val = calculate_err_field!(field.to_int(), Sum, field);
87✔
154
                        current_state.int_state += val;
87✔
155
                    }
156
                }
157
                Ok(Field::Int(current_state.int_state))
113✔
158
            }
159
            FieldType::I128 => {
160
                if decr {
×
161
                    for field in fields {
×
162
                        let val = calculate_err_field!(field.to_i128(), Sum, field);
×
163
                        current_state.i128_state -= val;
×
164
                    }
165
                } else {
166
                    for field in fields {
×
167
                        let val = calculate_err_field!(field.to_i128(), Sum, field);
×
168
                        current_state.i128_state += val;
×
169
                    }
170
                }
171
                Ok(Field::I128(current_state.i128_state))
×
172
            }
173
            FieldType::Float => {
174
                if decr {
31✔
175
                    for field in fields {
32✔
176
                        let val = calculate_err_field!(field.to_float(), Sum, field);
16✔
177
                        current_state.float_state -= val;
16✔
178
                    }
179
                } else {
180
                    for field in fields {
31✔
181
                        let val = calculate_err_field!(field.to_float(), Sum, field);
16✔
182
                        current_state.float_state += val;
16✔
183
                    }
184
                }
185
                Ok(Field::Float(OrderedFloat::from(current_state.float_state)))
32✔
186
            }
187
            FieldType::Decimal => {
188
                if decr {
808✔
189
                    for field in fields {
506✔
190
                        let val = calculate_err_field!(field.to_decimal(), Sum, field);
254✔
191
                        current_state.decimal_state -= val;
254✔
192
                    }
193
                } else {
194
                    for field in fields {
1,115✔
195
                        let val = calculate_err_field!(field.to_decimal(), Sum, field);
559✔
196
                        current_state.decimal_state += val;
559✔
197
                    }
198
                }
199
                Ok(Field::Decimal(current_state.decimal_state))
813✔
200
            }
201
            FieldType::Duration => {
202
                if decr {
32✔
203
                    for field in fields {
32✔
204
                        let val = calculate_err_field!(field.to_duration()?, Sum, field);
16✔
205
                        current_state.duration_state -= val.0;
16✔
206
                    }
207
                } else {
208
                    for field in fields {
32✔
209
                        let val = calculate_err_field!(field.to_duration()?, Sum, field);
16✔
210
                        current_state.duration_state += val.0;
16✔
211
                    }
212
                }
213
                Ok(Field::Duration(DozerDuration(
32✔
214
                    current_state.duration_state,
32✔
215
                    TimeUnit::Nanoseconds,
32✔
216
                )))
32✔
217
            }
218
            FieldType::Boolean
219
            | FieldType::String
220
            | FieldType::Text
221
            | FieldType::Date
222
            | FieldType::Timestamp
223
            | FieldType::Binary
224
            | FieldType::Bson
225
            | FieldType::Point => Err(PipelineError::InternalExecutionError(InvalidType(format!(
×
226
                "Not supported return type {typ} for {Sum}"
×
227
            )))),
×
228
        },
229
        None => Err(PipelineError::InternalExecutionError(InvalidType(format!(
×
230
            "Not supported None return type for {Sum}"
×
231
        )))),
×
232
    }
233
}
1,000✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc