• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

getdozer / dozer / 4360628700

pending completion
4360628700

push

github

GitHub
refactor: pipeline memory storage (#1135)

1152 of 1152 new or added lines in 40 files covered. (100.0%)

27472 of 39301 relevant lines covered (69.9%)

28369.11 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

88.54
/dozer-sql/src/pipeline/aggregation/tests/aggregation_tests_utils.rs
1
use dozer_core::{node::PortHandle, DEFAULT_PORT_HANDLE};
2
use dozer_types::types::{
3
    Field, FieldDefinition, FieldType, Operation, Record, Schema, SourceDefinition, DATE_FORMAT,
4
};
5
use std::collections::HashMap;
6

7
use crate::pipeline::aggregation::processor::AggregationProcessor;
8
use crate::pipeline::errors::PipelineError;
9
use crate::pipeline::planner::projection::CommonPlanner;
10
use crate::pipeline::tests::utils::get_select;
11
use dozer_types::chrono::{DateTime, NaiveDate, TimeZone, Utc};
12
use dozer_types::ordered_float::OrderedFloat;
13
use dozer_types::rust_decimal::Decimal;
14
use std::ops::Div;
15

16
pub(crate) fn init_processor(
45✔
17
    sql: &str,
45✔
18
    input_schemas: HashMap<PortHandle, Schema>,
45✔
19
) -> Result<AggregationProcessor, PipelineError> {
45✔
20
    let input_schema = input_schemas
45✔
21
        .get(&DEFAULT_PORT_HANDLE)
45✔
22
        .unwrap_or_else(|| panic!("Error getting Input Schema"));
45✔
23

45✔
24
    let mut projection_planner = CommonPlanner::new(input_schema.clone());
45✔
25
    let statement = get_select(sql).unwrap();
45✔
26

45✔
27
    projection_planner.plan(*statement).unwrap();
45✔
28

45✔
29
    let processor = AggregationProcessor::new(
45✔
30
        projection_planner.groupby,
45✔
31
        projection_planner.aggregation_output,
45✔
32
        projection_planner.projection_output,
45✔
33
        input_schema.clone(),
45✔
34
        projection_planner.post_aggregation_schema,
45✔
35
    )
45✔
36
    .unwrap_or_else(|e| panic!("{}", e.to_string()));
45✔
37

45✔
38
    Ok(processor)
45✔
39
}
45✔
40

×
41
pub(crate) fn init_input_schema(field_type: FieldType, aggregator_name: &str) -> Schema {
43✔
42
    Schema::empty()
43✔
43
        .field(
43✔
44
            FieldDefinition::new(
43✔
45
                String::from("ID"),
43✔
46
                FieldType::Int,
43✔
47
                false,
43✔
48
                SourceDefinition::Dynamic,
43✔
49
            ),
43✔
50
            false,
43✔
51
        )
43✔
52
        .field(
43✔
53
            FieldDefinition::new(
43✔
54
                String::from("Country"),
43✔
55
                FieldType::String,
43✔
56
                false,
43✔
57
                SourceDefinition::Dynamic,
43✔
58
            ),
43✔
59
            false,
43✔
60
        )
43✔
61
        .field(
43✔
62
            FieldDefinition::new(
43✔
63
                String::from("Salary"),
43✔
64
                field_type,
43✔
65
                false,
43✔
66
                SourceDefinition::Dynamic,
43✔
67
            ),
43✔
68
            false,
43✔
69
        )
43✔
70
        .field(
43✔
71
            FieldDefinition::new(
43✔
72
                format!("{aggregator_name}(Salary)"),
43✔
73
                field_type,
43✔
74
                false,
43✔
75
                SourceDefinition::Dynamic,
43✔
76
            ),
43✔
77
            false,
43✔
78
        )
43✔
79
        .clone()
43✔
80
}
43✔
81

×
82
pub(crate) fn insert_field(country: &str, insert_field: &Field) -> Operation {
111✔
83
    Operation::Insert {
111✔
84
        new: Record::new(
111✔
85
            None,
111✔
86
            vec![
111✔
87
                Field::Int(0),
111✔
88
                Field::String(country.to_string()),
111✔
89
                insert_field.clone(),
111✔
90
                insert_field.clone(),
111✔
91
            ],
111✔
92
            None,
111✔
93
        ),
111✔
94
    }
111✔
95
}
111✔
96

×
97
pub(crate) fn delete_field(country: &str, deleted_field: &Field) -> Operation {
111✔
98
    Operation::Delete {
111✔
99
        old: Record::new(
111✔
100
            None,
111✔
101
            vec![
111✔
102
                Field::Int(0),
111✔
103
                Field::String(country.to_string()),
111✔
104
                deleted_field.clone(),
111✔
105
                deleted_field.clone(),
111✔
106
            ],
111✔
107
            None,
111✔
108
        ),
111✔
109
    }
111✔
110
}
111✔
111

×
112
pub(crate) fn update_field(
67✔
113
    old_country: &str,
67✔
114
    new_country: &str,
67✔
115
    old: &Field,
67✔
116
    new: &Field,
67✔
117
) -> Operation {
67✔
118
    Operation::Update {
67✔
119
        old: Record::new(
67✔
120
            None,
67✔
121
            vec![
67✔
122
                Field::Int(0),
67✔
123
                Field::String(old_country.to_string()),
67✔
124
                old.clone(),
67✔
125
                old.clone(),
67✔
126
            ],
67✔
127
            None,
67✔
128
        ),
67✔
129
        new: Record::new(
67✔
130
            None,
67✔
131
            vec![
67✔
132
                Field::Int(0),
67✔
133
                Field::String(new_country.to_string()),
67✔
134
                new.clone(),
67✔
135
                new.clone(),
67✔
136
            ],
67✔
137
            None,
67✔
138
        ),
67✔
139
    }
67✔
140
}
67✔
141

×
142
pub(crate) fn insert_exp(country: &str, inserted_field: &Field) -> Operation {
68✔
143
    Operation::Insert {
68✔
144
        new: Record::new(
68✔
145
            None,
68✔
146
            vec![Field::String(country.to_string()), inserted_field.clone()],
68✔
147
            None,
68✔
148
        ),
68✔
149
    }
68✔
150
}
68✔
151

×
152
pub(crate) fn delete_exp(country: &str, deleted_field: &Field) -> Operation {
67✔
153
    Operation::Delete {
67✔
154
        old: Record::new(
67✔
155
            None,
67✔
156
            vec![Field::String(country.to_string()), deleted_field.clone()],
67✔
157
            None,
67✔
158
        ),
67✔
159
    }
67✔
160
}
67✔
161

×
162
pub(crate) fn update_exp(
178✔
163
    old_country: &str,
178✔
164
    new_country: &str,
178✔
165
    old: &Field,
178✔
166
    new: &Field,
178✔
167
) -> Operation {
178✔
168
    Operation::Update {
178✔
169
        old: Record::new(
178✔
170
            None,
178✔
171
            vec![Field::String(old_country.to_string()), old.clone()],
178✔
172
            None,
178✔
173
        ),
178✔
174
        new: Record::new(
178✔
175
            None,
178✔
176
            vec![Field::String(new_country.to_string()), new.clone()],
178✔
177
            None,
178✔
178
        ),
178✔
179
    }
178✔
180
}
178✔
181

×
182
pub fn get_decimal_field(val: i64) -> Field {
160✔
183
    Field::Decimal(Decimal::new(val, 0))
160✔
184
}
160✔
185

×
186
pub fn get_decimal_div_field(numerator: i64, denominator: i64) -> Field {
12✔
187
    Field::Decimal(Decimal::new(numerator, 0).div(Decimal::new(denominator, 0)))
12✔
188
}
12✔
189

×
190
pub fn get_ts_field(val: i64) -> Field {
58✔
191
    Field::Timestamp(DateTime::from(Utc.timestamp_millis_opt(val).unwrap()))
58✔
192
}
58✔
193

×
194
pub fn get_date_field(val: &str) -> Field {
58✔
195
    Field::Date(NaiveDate::parse_from_str(val, DATE_FORMAT).unwrap())
58✔
196
}
58✔
197

×
198
/// Calls `$processor.aggregate($inp)` and evaluates to its successful value.
///
/// # Panics
///
/// Panics with `"Error executing aggregate"` if `aggregate` returns an error;
/// the underlying error value is discarded.
#[macro_export]
macro_rules! output {
    ($processor:expr, $inp:expr) => {
        match $processor.aggregate($inp) {
            Ok(result) => result,
            Err(_) => panic!("Error executing aggregate"),
        }
    };
}
×
206

×
207
pub const ITALY: &str = "Italy";
208
pub const SINGAPORE: &str = "Singapore";
×
209

×
210
pub const DATE4: &str = "2015-10-04";
×
211
pub const DATE8: &str = "2015-10-08";
212
pub const DATE16: &str = "2015-10-16";
213

214
pub const FIELD_NULL: &Field = &Field::Null;
215

216
pub const FIELD_0_FLOAT: &Field = &Field::Float(OrderedFloat(0.0));
217
pub const FIELD_100_FLOAT: &Field = &Field::Float(OrderedFloat(100.0));
218
pub const FIELD_150_FLOAT: &Field = &Field::Float(OrderedFloat(150.0));
219
pub const FIELD_200_FLOAT: &Field = &Field::Float(OrderedFloat(200.0));
220
pub const FIELD_250_FLOAT: &Field = &Field::Float(OrderedFloat(250.0));
221
pub const FIELD_350_FLOAT: &Field = &Field::Float(OrderedFloat(350.0));
222
pub const FIELD_75_FLOAT: &Field = &Field::Float(OrderedFloat(75.0));
223
pub const FIELD_50_FLOAT: &Field = &Field::Float(OrderedFloat(50.0));
224
pub const FIELD_250_DIV_3_FLOAT: &Field = &Field::Float(OrderedFloat(250.0 / 3.0));
225
pub const FIELD_350_DIV_3_FLOAT: &Field = &Field::Float(OrderedFloat(350.0 / 3.0));
226

227
pub const FIELD_0_INT: &Field = &Field::Int(0);
228
pub const FIELD_1_INT: &Field = &Field::Int(1);
229
pub const FIELD_2_INT: &Field = &Field::Int(2);
230
pub const FIELD_3_INT: &Field = &Field::Int(3);
231
pub const FIELD_100_INT: &Field = &Field::Int(100);
232
pub const FIELD_150_INT: &Field = &Field::Int(150);
233
pub const FIELD_200_INT: &Field = &Field::Int(200);
234
pub const FIELD_250_INT: &Field = &Field::Int(250);
235
pub const FIELD_350_INT: &Field = &Field::Int(350);
236
pub const FIELD_50_INT: &Field = &Field::Int(50);
237

238
pub const FIELD_100_UINT: &Field = &Field::UInt(100);
239
pub const FIELD_150_UINT: &Field = &Field::UInt(150);
240
pub const FIELD_200_UINT: &Field = &Field::UInt(200);
241
pub const FIELD_250_UINT: &Field = &Field::UInt(250);
242
pub const FIELD_350_UINT: &Field = &Field::UInt(350);
243
pub const FIELD_50_UINT: &Field = &Field::UInt(50);
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc