• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

getdozer / dozer / 6105410942

07 Sep 2023 04:28AM UTC coverage: 77.562% (-0.1%) from 77.686%
6105410942

push

github

chloeminkyung
feat: onnx image

1141 of 1141 new or added lines in 66 files covered. (100.0%)

49957 of 64409 relevant lines covered (77.56%)

50900.25 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

76.14
/dozer-sql/src/pipeline/aggregation/max_value.rs
1
use crate::pipeline::aggregation::aggregator::{update_val_map, Aggregator};
2
use crate::pipeline::errors::PipelineError::InvalidReturnType;
3
use crate::pipeline::errors::{FieldTypes, PipelineError};
4
use crate::pipeline::expression::aggregate::AggregateFunctionType::MaxValue;
5
use crate::pipeline::expression::execution::{Expression, ExpressionType};
6
use crate::{argv, calculate_err};
7
use dozer_types::serde::{Deserialize, Serialize};
8
use dozer_types::types::{Field, FieldType, Schema, SourceDefinition};
9
use std::collections::BTreeMap;
10

11
pub fn validate_max_value(
26✔
12
    args: &[Expression],
26✔
13
    schema: &Schema,
26✔
14
) -> Result<ExpressionType, PipelineError> {
26✔
15
    let base_arg = &argv!(args, 0, MaxValue)?.get_type(schema)?;
26✔
16
    let arg = &argv!(args, 1, MaxValue)?.get_type(schema)?;
26✔
17

18
    match base_arg.return_type {
26✔
19
        FieldType::UInt => FieldType::UInt,
2✔
20
        FieldType::U128 => FieldType::U128,
×
21
        FieldType::Int => FieldType::Int,
4✔
22
        FieldType::I128 => FieldType::I128,
×
23
        FieldType::Float => FieldType::Float,
4✔
24
        FieldType::Decimal => FieldType::Decimal,
4✔
25
        FieldType::Timestamp => FieldType::Timestamp,
4✔
26
        FieldType::Date => FieldType::Date,
4✔
27
        FieldType::Duration => FieldType::Duration,
4✔
28
        FieldType::Boolean
29
        | FieldType::String
30
        | FieldType::Text
31
        | FieldType::Binary
32
        | FieldType::Json
33
        | FieldType::Point => {
34
            return Err(PipelineError::InvalidFunctionArgumentType(
×
35
                MaxValue.to_string(),
×
36
                arg.return_type,
×
37
                FieldTypes::new(vec![
×
38
                    FieldType::Decimal,
×
39
                    FieldType::UInt,
×
40
                    FieldType::U128,
×
41
                    FieldType::Int,
×
42
                    FieldType::I128,
×
43
                    FieldType::Float,
×
44
                    FieldType::Timestamp,
×
45
                    FieldType::Date,
×
46
                    FieldType::Duration,
×
47
                ]),
×
48
                0,
×
49
            ));
×
50
        }
51
    };
52

53
    Ok(ExpressionType::new(
26✔
54
        arg.return_type,
26✔
55
        true,
26✔
56
        SourceDefinition::Dynamic,
26✔
57
        false,
26✔
58
    ))
26✔
59
}
26✔
60

61
#[derive(Debug, Serialize, Deserialize)]
×
62
#[serde(crate = "dozer_types::serde")]
63
pub struct MaxValueAggregator {
64
    current_state: BTreeMap<Field, u64>,
65
    return_state: BTreeMap<Field, Vec<Field>>,
66
    return_type: Option<FieldType>,
67
}
68

69
impl MaxValueAggregator {
70
    pub fn new() -> Self {
33✔
71
        Self {
33✔
72
            current_state: BTreeMap::new(),
33✔
73
            return_state: BTreeMap::new(),
33✔
74
            return_type: None,
33✔
75
        }
33✔
76
    }
33✔
77
}
78

79
impl Aggregator for MaxValueAggregator {
80
    fn init(&mut self, return_type: FieldType) {
33✔
81
        self.return_type = Some(return_type);
33✔
82
    }
33✔
83

84
    fn update(&mut self, old: &[Field], new: &[Field]) -> Result<Field, PipelineError> {
85
        self.delete(old)?;
27✔
86
        self.insert(new)
27✔
87
    }
27✔
88

89
    fn delete(&mut self, old: &[Field]) -> Result<Field, PipelineError> {
90
        update_val_map(
60✔
91
            old,
60✔
92
            1_u64,
60✔
93
            true,
60✔
94
            &mut self.current_state,
60✔
95
            &mut self.return_state,
60✔
96
        )?;
60✔
97
        get_max_value(&self.current_state, &self.return_state, self.return_type)
60✔
98
    }
60✔
99

100
    fn insert(&mut self, new: &[Field]) -> Result<Field, PipelineError> {
101
        update_val_map(
60✔
102
            new,
60✔
103
            1_u64,
60✔
104
            false,
60✔
105
            &mut self.current_state,
60✔
106
            &mut self.return_state,
60✔
107
        )?;
60✔
108
        get_max_value(&self.current_state, &self.return_state, self.return_type)
60✔
109
    }
60✔
110
}
111

112
fn get_max_value(
120✔
113
    field_map: &BTreeMap<Field, u64>,
120✔
114
    return_map: &BTreeMap<Field, Vec<Field>>,
120✔
115
    return_type: Option<FieldType>,
120✔
116
) -> Result<Field, PipelineError> {
120✔
117
    if field_map.is_empty() {
120✔
118
        Ok(Field::Null)
33✔
119
    } else {
120
        let val = calculate_err!(field_map.keys().max(), MaxValue).clone();
87✔
121

87✔
122
        match return_map.get(&val) {
87✔
123
            Some(v) => match v.get(0) {
87✔
124
                Some(v) => {
87✔
125
                    let value = v.clone();
87✔
126
                    Ok(value)
87✔
127
                }
128
                None => Err(InvalidReturnType(format!("{:?}", return_type))),
×
129
            },
130
            None => Err(InvalidReturnType(format!("{:?}", return_type))),
×
131
        }
132
    }
133
}
120✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc