• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

getdozer / dozer / 4414871180

pending completion
4414871180

push

github

GitHub
feat: add object store validation (#1140)

40 of 40 new or added lines in 2 files covered. (100.0%)

28658 of 39135 relevant lines covered (73.23%)

92989.89 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

54.63
/dozer-sql/src/pipeline/aggregation/avg.rs
1
use crate::pipeline::aggregation::aggregator::{update_map, Aggregator};
2
use crate::pipeline::errors::{FieldTypes, PipelineError};
3
use crate::pipeline::expression::aggregate::AggregateFunctionType;
4
use crate::pipeline::expression::aggregate::AggregateFunctionType::Avg;
5
use crate::pipeline::expression::execution::{Expression, ExpressionExecutor, ExpressionType};
6
use crate::{argv, calculate_err_field, calculate_err_type};
7
use dozer_core::errors::ExecutionError::InvalidType;
8
use dozer_types::ordered_float::OrderedFloat;
9
use dozer_types::rust_decimal::Decimal;
10
use dozer_types::types::{Field, FieldType, Schema, SourceDefinition};
11
use num_traits::FromPrimitive;
12
use std::collections::BTreeMap;
13

14
pub fn validate_avg(args: &[Expression], schema: &Schema) -> Result<ExpressionType, PipelineError> {
12✔
15
    let arg = &argv!(args, 0, AggregateFunctionType::Avg)?.get_type(schema)?;
13✔
16

17
    let ret_type = match arg.return_type {
14✔
18
        FieldType::Decimal => FieldType::Decimal,
4✔
19
        FieldType::Int => FieldType::Decimal,
4✔
20
        FieldType::UInt => FieldType::Decimal,
2✔
21
        FieldType::Float => FieldType::Float,
4✔
22
        r => {
×
23
            return Err(PipelineError::InvalidFunctionArgumentType(
×
24
                "AVG".to_string(),
×
25
                r,
×
26
                FieldTypes::new(vec![
×
27
                    FieldType::Decimal,
×
28
                    FieldType::UInt,
×
29
                    FieldType::Int,
×
30
                    FieldType::Float,
×
31
                ]),
×
32
                0,
×
33
            ));
×
34
        }
35
    };
36
    Ok(ExpressionType::new(
14✔
37
        ret_type,
14✔
38
        true,
14✔
39
        SourceDefinition::Dynamic,
14✔
40
        false,
14✔
41
    ))
14✔
42
}
14✔
43

44
#[derive(Debug)]
×
45
pub struct AvgAggregator {
46
    current_state: BTreeMap<Field, u64>,
47
    return_type: Option<FieldType>,
48
}
49

50
impl AvgAggregator {
51
    pub fn new() -> Self {
22✔
52
        Self {
22✔
53
            current_state: BTreeMap::new(),
22✔
54
            return_type: None,
22✔
55
        }
22✔
56
    }
22✔
57
}
58

59
impl Aggregator for AvgAggregator {
60
    fn init(&mut self, return_type: FieldType) {
22✔
61
        self.return_type = Some(return_type);
22✔
62
    }
22✔
63

64
    fn update(&mut self, old: &[Field], new: &[Field]) -> Result<Field, PipelineError> {
65
        self.delete(old)?;
7✔
66
        self.insert(new)
7✔
67
    }
7✔
68

69
    fn delete(&mut self, old: &[Field]) -> Result<Field, PipelineError> {
29✔
70
        update_map(old, 1_u64, true, &mut self.current_state);
29✔
71
        get_average(&self.current_state, self.return_type)
29✔
72
    }
29✔
73

74
    fn insert(&mut self, new: &[Field]) -> Result<Field, PipelineError> {
29✔
75
        update_map(new, 1_u64, false, &mut self.current_state);
29✔
76
        get_average(&self.current_state, self.return_type)
29✔
77
    }
29✔
78
}
79

80
fn get_average(
81
    field_map: &BTreeMap<Field, u64>,
82
    return_type: Option<FieldType>,
83
) -> Result<Field, PipelineError> {
84
    match return_type {
58✔
85
        Some(FieldType::UInt) => {
86
            if field_map.is_empty() {
×
87
                Ok(Field::Decimal(calculate_err_type!(
×
88
                    Decimal::from_f64(0_f64),
×
89
                    Avg,
×
90
                    FieldType::Decimal
×
91
                )))
×
92
            } else {
93
                let mut sum =
×
94
                    calculate_err_type!(Decimal::from_f64(0_f64), Avg, FieldType::Decimal);
×
95
                let mut count =
×
96
                    calculate_err_type!(Decimal::from_f64(0_f64), Avg, FieldType::Decimal);
×
97
                for (field, cnt) in field_map {
×
98
                    let cnt = calculate_err_field!(Decimal::from_u64(*cnt), Avg, field);
×
99
                    sum += calculate_err_field!(field.to_decimal(), Avg, field) * cnt;
×
100
                    count += cnt;
×
101
                }
102
                Ok(Field::Decimal(sum / count))
×
103
            }
104
        }
105
        Some(FieldType::Int) => {
106
            if field_map.is_empty() {
×
107
                Ok(Field::Decimal(calculate_err_type!(
×
108
                    Decimal::from_f64(0_f64),
×
109
                    Avg,
×
110
                    FieldType::Decimal
×
111
                )))
×
112
            } else {
113
                let mut sum =
×
114
                    calculate_err_type!(Decimal::from_f64(0_f64), Avg, FieldType::Decimal);
×
115
                let mut count =
×
116
                    calculate_err_type!(Decimal::from_f64(0_f64), Avg, FieldType::Decimal);
×
117
                for (field, cnt) in field_map {
×
118
                    let cnt = calculate_err_field!(Decimal::from_u64(*cnt), Avg, field);
×
119
                    sum += calculate_err_field!(field.to_decimal(), Avg, field) * cnt;
×
120
                    count += cnt;
×
121
                }
122
                Ok(Field::Decimal(sum / count))
×
123
            }
124
        }
125
        Some(FieldType::Float) => {
126
            if field_map.is_empty() {
16✔
127
                Ok(Field::Float(OrderedFloat::from(0_f64)))
7✔
128
            } else {
129
                let mut sum = 0_f64;
9✔
130
                let mut count = 0_f64;
9✔
131
                for (field, cnt) in field_map {
23✔
132
                    sum += calculate_err_field!(field.to_float(), Avg, field) * (*cnt as f64);
14✔
133
                    count += *cnt as f64;
14✔
134
                }
135
                Ok(Field::Float(OrderedFloat::from(sum / count)))
9✔
136
            }
137
        }
138
        Some(FieldType::Decimal) => {
139
            if field_map.is_empty() {
42✔
140
                Ok(Field::Decimal(calculate_err_type!(
16✔
141
                    Decimal::from_f64(0_f64),
16✔
142
                    Avg,
16✔
143
                    FieldType::Decimal
16✔
144
                )))
16✔
145
            } else {
146
                let mut sum =
26✔
147
                    calculate_err_type!(Decimal::from_f64(0_f64), Avg, FieldType::Decimal);
26✔
148
                let mut count =
26✔
149
                    calculate_err_type!(Decimal::from_f64(0_f64), Avg, FieldType::Decimal);
26✔
150
                for (field, cnt) in field_map {
67✔
151
                    let cnt = calculate_err_field!(Decimal::from_u64(*cnt), Avg, field);
41✔
152
                    sum += calculate_err_field!(field.to_decimal(), Avg, field) * cnt;
41✔
153
                    count += cnt;
41✔
154
                }
155
                Ok(Field::Decimal(sum / count))
26✔
156
            }
157
        }
158
        Some(not_supported_return_type) => Err(PipelineError::InternalExecutionError(InvalidType(
×
159
            format!("Not supported return type {not_supported_return_type} for {Avg}"),
×
160
        ))),
×
161
        None => Err(PipelineError::InternalExecutionError(InvalidType(format!(
×
162
            "Not supported None return type for {Avg}"
×
163
        )))),
×
164
    }
165
}
58✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc