• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

getdozer / dozer / 4377467257

pending completion
4377467257

push

github

GitHub
implement `HAVING` (#1198)

395 of 395 new or added lines in 6 files covered. (100.0%)

27638 of 38584 relevant lines covered (71.63%)

27777.41 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

93.96
/dozer-sql/src/pipeline/aggregation/tests/aggregation_tests_utils.rs
1
use dozer_core::{node::PortHandle, DEFAULT_PORT_HANDLE};
2
use dozer_types::types::{
3
    Field, FieldDefinition, FieldType, Operation, Record, Schema, SourceDefinition, DATE_FORMAT,
4
};
5
use std::collections::HashMap;
6

7
use crate::pipeline::aggregation::processor::AggregationProcessor;
8
use crate::pipeline::errors::PipelineError;
9
use crate::pipeline::planner::projection::CommonPlanner;
10
use crate::pipeline::tests::utils::get_select;
11
use dozer_types::chrono::{DateTime, NaiveDate, TimeZone, Utc};
12
use dozer_types::ordered_float::OrderedFloat;
13
use dozer_types::rust_decimal::Decimal;
14
use std::ops::Div;
15

16
pub(crate) fn init_processor(
51✔
17
    sql: &str,
51✔
18
    input_schemas: HashMap<PortHandle, Schema>,
51✔
19
) -> Result<AggregationProcessor, PipelineError> {
51✔
20
    let input_schema = input_schemas
51✔
21
        .get(&DEFAULT_PORT_HANDLE)
51✔
22
        .unwrap_or_else(|| panic!("Error getting Input Schema"));
51✔
23

51✔
24
    let mut projection_planner = CommonPlanner::new(input_schema.clone());
51✔
25
    let statement = get_select(sql).unwrap();
51✔
26

51✔
27
    projection_planner.plan(*statement).unwrap();
51✔
28

51✔
29
    let processor = AggregationProcessor::new(
51✔
30
        projection_planner.groupby,
51✔
31
        projection_planner.aggregation_output,
51✔
32
        projection_planner.projection_output,
51✔
33
        projection_planner.having,
51✔
34
        input_schema.clone(),
51✔
35
        projection_planner.post_aggregation_schema,
51✔
36
    )
51✔
37
    .unwrap_or_else(|e| panic!("{}", e.to_string()));
51✔
38

51✔
39
    Ok(processor)
51✔
40
}
51✔
41

×
42
pub(crate) fn init_input_schema(field_type: FieldType, aggregator_name: &str) -> Schema {
50✔
43
    Schema::empty()
50✔
44
        .field(
50✔
45
            FieldDefinition::new(
50✔
46
                String::from("ID"),
50✔
47
                FieldType::Int,
50✔
48
                false,
50✔
49
                SourceDefinition::Dynamic,
50✔
50
            ),
50✔
51
            false,
50✔
52
        )
50✔
53
        .field(
50✔
54
            FieldDefinition::new(
50✔
55
                String::from("Country"),
50✔
56
                FieldType::String,
50✔
57
                false,
50✔
58
                SourceDefinition::Dynamic,
50✔
59
            ),
50✔
60
            false,
50✔
61
        )
50✔
62
        .field(
50✔
63
            FieldDefinition::new(
50✔
64
                String::from("Salary"),
50✔
65
                field_type,
50✔
66
                false,
50✔
67
                SourceDefinition::Dynamic,
50✔
68
            ),
50✔
69
            false,
50✔
70
        )
50✔
71
        .field(
50✔
72
            FieldDefinition::new(
50✔
73
                format!("{aggregator_name}(Salary)"),
50✔
74
                field_type,
50✔
75
                false,
50✔
76
                SourceDefinition::Dynamic,
50✔
77
            ),
50✔
78
            false,
50✔
79
        )
50✔
80
        .clone()
50✔
81
}
50✔
82

×
83
pub(crate) fn insert_field(country: &str, insert_field: &Field) -> Operation {
120✔
84
    Operation::Insert {
120✔
85
        new: Record::new(
120✔
86
            None,
120✔
87
            vec![
120✔
88
                Field::Int(0),
120✔
89
                Field::String(country.to_string()),
120✔
90
                insert_field.clone(),
120✔
91
                insert_field.clone(),
120✔
92
            ],
120✔
93
            None,
120✔
94
        ),
120✔
95
    }
120✔
96
}
120✔
97

×
98
pub(crate) fn delete_field(country: &str, deleted_field: &Field) -> Operation {
117✔
99
    Operation::Delete {
117✔
100
        old: Record::new(
117✔
101
            None,
117✔
102
            vec![
117✔
103
                Field::Int(0),
117✔
104
                Field::String(country.to_string()),
117✔
105
                deleted_field.clone(),
117✔
106
                deleted_field.clone(),
117✔
107
            ],
117✔
108
            None,
117✔
109
        ),
117✔
110
    }
117✔
111
}
117✔
112

×
113
pub(crate) fn update_field(
80✔
114
    old_country: &str,
80✔
115
    new_country: &str,
80✔
116
    old: &Field,
80✔
117
    new: &Field,
80✔
118
) -> Operation {
80✔
119
    Operation::Update {
80✔
120
        old: Record::new(
80✔
121
            None,
80✔
122
            vec![
80✔
123
                Field::Int(0),
80✔
124
                Field::String(old_country.to_string()),
80✔
125
                old.clone(),
80✔
126
                old.clone(),
80✔
127
            ],
80✔
128
            None,
80✔
129
        ),
80✔
130
        new: Record::new(
80✔
131
            None,
80✔
132
            vec![
80✔
133
                Field::Int(0),
80✔
134
                Field::String(new_country.to_string()),
80✔
135
                new.clone(),
80✔
136
                new.clone(),
80✔
137
            ],
80✔
138
            None,
80✔
139
        ),
80✔
140
    }
80✔
141
}
80✔
142

×
143
pub(crate) fn insert_exp(country: &str, inserted_field: &Field) -> Operation {
75✔
144
    Operation::Insert {
75✔
145
        new: Record::new(
75✔
146
            None,
75✔
147
            vec![Field::String(country.to_string()), inserted_field.clone()],
75✔
148
            None,
75✔
149
        ),
75✔
150
    }
75✔
151
}
75✔
152

×
153
pub(crate) fn delete_exp(country: &str, deleted_field: &Field) -> Operation {
74✔
154
    Operation::Delete {
74✔
155
        old: Record::new(
74✔
156
            None,
74✔
157
            vec![Field::String(country.to_string()), deleted_field.clone()],
74✔
158
            None,
74✔
159
        ),
74✔
160
    }
74✔
161
}
74✔
162

×
163
pub(crate) fn update_exp(
181✔
164
    old_country: &str,
181✔
165
    new_country: &str,
181✔
166
    old: &Field,
181✔
167
    new: &Field,
181✔
168
) -> Operation {
181✔
169
    Operation::Update {
181✔
170
        old: Record::new(
181✔
171
            None,
181✔
172
            vec![Field::String(old_country.to_string()), old.clone()],
181✔
173
            None,
181✔
174
        ),
181✔
175
        new: Record::new(
181✔
176
            None,
181✔
177
            vec![Field::String(new_country.to_string()), new.clone()],
181✔
178
            None,
181✔
179
        ),
181✔
180
    }
181✔
181
}
181✔
182

×
183
pub fn get_decimal_field(val: i64) -> Field {
160✔
184
    Field::Decimal(Decimal::new(val, 0))
160✔
185
}
160✔
186

×
187
pub fn get_decimal_div_field(numerator: i64, denominator: i64) -> Field {
12✔
188
    Field::Decimal(Decimal::new(numerator, 0).div(Decimal::new(denominator, 0)))
12✔
189
}
12✔
190

×
191
pub fn get_ts_field(val: i64) -> Field {
58✔
192
    Field::Timestamp(DateTime::from(Utc.timestamp_millis_opt(val).unwrap()))
58✔
193
}
58✔
194

×
195
pub fn get_date_field(val: &str) -> Field {
58✔
196
    Field::Date(NaiveDate::parse_from_str(val, DATE_FORMAT).unwrap())
58✔
197
}
58✔
198

199
#[macro_export]
200
macro_rules! output {
201
    ($processor:expr, $inp:expr) => {
202
        $processor
203
            .aggregate($inp)
204
            .unwrap_or_else(|_e| panic!("Error executing aggregate"))
205
    };
206
}
207

208
pub const ITALY: &str = "Italy";
209
pub const SINGAPORE: &str = "Singapore";
210

211
pub const DATE4: &str = "2015-10-04";
212
pub const DATE8: &str = "2015-10-08";
213
pub const DATE16: &str = "2015-10-16";
214

215
pub const FIELD_NULL: &Field = &Field::Null;
216

217
pub const FIELD_0_FLOAT: &Field = &Field::Float(OrderedFloat(0.0));
218
pub const FIELD_100_FLOAT: &Field = &Field::Float(OrderedFloat(100.0));
219
pub const FIELD_150_FLOAT: &Field = &Field::Float(OrderedFloat(150.0));
220
pub const FIELD_200_FLOAT: &Field = &Field::Float(OrderedFloat(200.0));
221
pub const FIELD_250_FLOAT: &Field = &Field::Float(OrderedFloat(250.0));
222
pub const FIELD_350_FLOAT: &Field = &Field::Float(OrderedFloat(350.0));
223
pub const FIELD_75_FLOAT: &Field = &Field::Float(OrderedFloat(75.0));
224
pub const FIELD_50_FLOAT: &Field = &Field::Float(OrderedFloat(50.0));
225
pub const FIELD_250_DIV_3_FLOAT: &Field = &Field::Float(OrderedFloat(250.0 / 3.0));
226
pub const FIELD_350_DIV_3_FLOAT: &Field = &Field::Float(OrderedFloat(350.0 / 3.0));
227

228
pub const FIELD_0_INT: &Field = &Field::Int(0);
229
pub const FIELD_1_INT: &Field = &Field::Int(1);
230
pub const FIELD_2_INT: &Field = &Field::Int(2);
231
pub const FIELD_3_INT: &Field = &Field::Int(3);
232
pub const FIELD_100_INT: &Field = &Field::Int(100);
233
pub const FIELD_150_INT: &Field = &Field::Int(150);
234
pub const FIELD_200_INT: &Field = &Field::Int(200);
235
pub const FIELD_250_INT: &Field = &Field::Int(250);
236
pub const FIELD_300_INT: &Field = &Field::Int(300);
237
pub const FIELD_350_INT: &Field = &Field::Int(350);
238
pub const FIELD_400_INT: &Field = &Field::Int(400);
239
pub const FIELD_500_INT: &Field = &Field::Int(500);
240
pub const FIELD_600_INT: &Field = &Field::Int(600);
241
pub const FIELD_50_INT: &Field = &Field::Int(50);
242

243
pub const FIELD_100_UINT: &Field = &Field::UInt(100);
244
pub const FIELD_150_UINT: &Field = &Field::UInt(150);
245
pub const FIELD_200_UINT: &Field = &Field::UInt(200);
246
pub const FIELD_250_UINT: &Field = &Field::UInt(250);
247
pub const FIELD_350_UINT: &Field = &Field::UInt(350);
248
pub const FIELD_50_UINT: &Field = &Field::UInt(50);
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc