• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

getdozer / dozer / 5600398145

pending completion
5600398145

push

github

web-flow
fix: Add missing flags and comments in list and logs commands (#1767)

42554 of 55342 relevant lines covered (76.89%)

43080.49 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

69.92
/dozer-sql/src/pipeline/aggregation/sum.rs
1
use crate::pipeline::aggregation::aggregator::Aggregator;
2
use crate::pipeline::errors::{FieldTypes, PipelineError};
3
use crate::pipeline::expression::aggregate::AggregateFunctionType;
4
use crate::pipeline::expression::aggregate::AggregateFunctionType::Sum;
5
use crate::pipeline::expression::execution::{Expression, ExpressionExecutor, ExpressionType};
6
use crate::{argv, calculate_err_field};
7
use dozer_types::ordered_float::OrderedFloat;
8
use dozer_types::rust_decimal::Decimal;
9
use dozer_types::types::{DozerDuration, Field, FieldType, Schema, SourceDefinition, TimeUnit};
10
use num_traits::FromPrimitive;
11

12
pub fn validate_sum(args: &[Expression], schema: &Schema) -> Result<ExpressionType, PipelineError> {
108✔
13
    let arg = &argv!(args, 0, AggregateFunctionType::Sum)?.get_type(schema)?;
108✔
14

15
    let ret_type = match arg.return_type {
108✔
16
        FieldType::UInt => FieldType::UInt,
2✔
17
        FieldType::U128 => FieldType::U128,
×
18
        FieldType::Int => FieldType::Int,
38✔
19
        FieldType::I128 => FieldType::I128,
×
20
        FieldType::Float => FieldType::Float,
4✔
21
        FieldType::Decimal => FieldType::Decimal,
60✔
22
        FieldType::Duration => FieldType::Duration,
4✔
23
        FieldType::Boolean
24
        | FieldType::String
25
        | FieldType::Text
26
        | FieldType::Date
27
        | FieldType::Timestamp
28
        | FieldType::Binary
29
        | FieldType::Json
30
        | FieldType::Point => {
31
            return Err(PipelineError::InvalidFunctionArgumentType(
×
32
                Sum.to_string(),
×
33
                arg.return_type,
×
34
                FieldTypes::new(vec![
×
35
                    FieldType::UInt,
×
36
                    FieldType::U128,
×
37
                    FieldType::Int,
×
38
                    FieldType::I128,
×
39
                    FieldType::Float,
×
40
                    FieldType::Decimal,
×
41
                    FieldType::Duration,
×
42
                ]),
×
43
                0,
×
44
            ));
×
45
        }
46
    };
47
    Ok(ExpressionType::new(
108✔
48
        ret_type,
108✔
49
        true,
108✔
50
        SourceDefinition::Dynamic,
108✔
51
        false,
108✔
52
    ))
108✔
53
}
108✔
54

55
#[derive(Debug)]
×
56
pub struct SumAggregator {
57
    current_state: SumState,
58
    return_type: Option<FieldType>,
59
}
60

61
#[derive(Debug)]
×
62
pub struct SumState {
63
    pub(crate) int_state: i64,
64
    pub(crate) i128_state: i128,
65
    pub(crate) uint_state: u64,
66
    pub(crate) u128_state: u128,
67
    pub(crate) float_state: f64,
68
    pub(crate) decimal_state: Decimal,
69
    pub(crate) duration_state: std::time::Duration,
70
}
71

72
impl SumAggregator {
73
    pub fn new() -> Self {
271✔
74
        Self {
271✔
75
            current_state: SumState {
271✔
76
                int_state: 0_i64,
271✔
77
                i128_state: 0_i128,
271✔
78
                uint_state: 0_u64,
271✔
79
                u128_state: 0_u128,
271✔
80
                float_state: 0_f64,
271✔
81
                decimal_state: Decimal::from_f64(0_f64).unwrap(),
271✔
82
                duration_state: std::time::Duration::new(0, 0),
271✔
83
            },
271✔
84
            return_type: None,
271✔
85
        }
271✔
86
    }
271✔
87
}
88

89
impl Aggregator for SumAggregator {
90
    fn init(&mut self, return_type: FieldType) {
271✔
91
        self.return_type = Some(return_type);
271✔
92
    }
271✔
93

94
    fn update(&mut self, old: &[Field], new: &[Field]) -> Result<Field, PipelineError> {
95
        self.delete(old)?;
39✔
96
        self.insert(new)
39✔
97
    }
39✔
98

99
    fn delete(&mut self, old: &[Field]) -> Result<Field, PipelineError> {
115✔
100
        get_sum(old, &mut self.current_state, self.return_type, true)
115✔
101
    }
115✔
102

103
    fn insert(&mut self, new: &[Field]) -> Result<Field, PipelineError> {
310✔
104
        get_sum(new, &mut self.current_state, self.return_type, false)
310✔
105
    }
310✔
106
}
107

108
pub fn get_sum(
835✔
109
    fields: &[Field],
835✔
110
    current_state: &mut SumState,
835✔
111
    return_type: Option<FieldType>,
835✔
112
    decr: bool,
835✔
113
) -> Result<Field, PipelineError> {
835✔
114
    match return_type {
835✔
115
        Some(typ) => match typ {
835✔
116
            FieldType::UInt => {
117
                if decr {
6✔
118
                    for field in fields {
10✔
119
                        let val = calculate_err_field!(field.to_uint(), Sum, field);
5✔
120
                        current_state.uint_state -= val;
5✔
121
                    }
122
                } else {
123
                    for field in fields {
6✔
124
                        let val = calculate_err_field!(field.to_uint(), Sum, field);
5✔
125
                        current_state.uint_state += val;
5✔
126
                    }
127
                }
128
                Ok(Field::UInt(current_state.uint_state))
10✔
129
            }
130
            FieldType::U128 => {
131
                if decr {
×
132
                    for field in fields {
×
133
                        let val = calculate_err_field!(field.to_u128(), Sum, field);
×
134
                        current_state.u128_state -= val;
×
135
                    }
136
                } else {
137
                    for field in fields {
×
138
                        let val = calculate_err_field!(field.to_u128(), Sum, field);
×
139
                        current_state.u128_state += val;
×
140
                    }
141
                }
142
                Ok(Field::U128(current_state.u128_state))
×
143
            }
144
            FieldType::Int => {
145
                if decr {
103✔
146
                    for field in fields {
52✔
147
                        let val = calculate_err_field!(field.to_int(), Sum, field);
26✔
148
                        current_state.int_state -= val;
26✔
149
                    }
150
                } else {
151
                    for field in fields {
154✔
152
                        let val = calculate_err_field!(field.to_int(), Sum, field);
77✔
153
                        current_state.int_state += val;
77✔
154
                    }
155
                }
156
                Ok(Field::Int(current_state.int_state))
103✔
157
            }
158
            FieldType::I128 => {
159
                if decr {
×
160
                    for field in fields {
×
161
                        let val = calculate_err_field!(field.to_i128(), Sum, field);
×
162
                        current_state.i128_state -= val;
×
163
                    }
164
                } else {
165
                    for field in fields {
×
166
                        let val = calculate_err_field!(field.to_i128(), Sum, field);
×
167
                        current_state.i128_state += val;
×
168
                    }
169
                }
170
                Ok(Field::I128(current_state.i128_state))
×
171
            }
172
            FieldType::Float => {
173
                if decr {
32✔
174
                    for field in fields {
32✔
175
                        let val = calculate_err_field!(field.to_float(), Sum, field);
16✔
176
                        current_state.float_state -= val;
16✔
177
                    }
178
                } else {
179
                    for field in fields {
32✔
180
                        let val = calculate_err_field!(field.to_float(), Sum, field);
16✔
181
                        current_state.float_state += val;
16✔
182
                    }
183
                }
184
                Ok(Field::Float(OrderedFloat::from(current_state.float_state)))
32✔
185
            }
186
            FieldType::Decimal => {
187
                if decr {
662✔
188
                    for field in fields {
418✔
189
                        let val = calculate_err_field!(field.to_decimal(), Sum, field);
209✔
190
                        current_state.decimal_state -= val;
209✔
191
                    }
192
                } else {
193
                    for field in fields {
906✔
194
                        let val = calculate_err_field!(field.to_decimal(), Sum, field);
453✔
195
                        current_state.decimal_state += val;
453✔
196
                    }
197
                }
198
                Ok(Field::Decimal(current_state.decimal_state))
662✔
199
            }
200
            FieldType::Duration => {
201
                if decr {
32✔
202
                    for field in fields {
32✔
203
                        let val = calculate_err_field!(field.to_duration()?, Sum, field);
16✔
204
                        current_state.duration_state -= val.0;
16✔
205
                    }
206
                } else {
207
                    for field in fields {
32✔
208
                        let val = calculate_err_field!(field.to_duration()?, Sum, field);
16✔
209
                        current_state.duration_state += val.0;
16✔
210
                    }
211
                }
212
                Ok(Field::Duration(DozerDuration(
32✔
213
                    current_state.duration_state,
32✔
214
                    TimeUnit::Nanoseconds,
32✔
215
                )))
32✔
216
            }
217
            FieldType::Boolean
218
            | FieldType::String
219
            | FieldType::Text
220
            | FieldType::Date
221
            | FieldType::Timestamp
222
            | FieldType::Binary
223
            | FieldType::Json
224
            | FieldType::Point => Err(PipelineError::InvalidReturnType(format!(
×
225
                "Not supported return type {typ} for {Sum}"
×
226
            ))),
×
227
        },
228
        None => Err(PipelineError::InvalidReturnType(format!(
×
229
            "Not supported None return type for {Sum}"
×
230
        ))),
×
231
    }
232
}
839✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc