• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

getdozer / dozer / 4007820649

pending completion
4007820649

Pull #734

github

GitHub
Merge b71e66da1 into 6c0ac2b2c
Pull Request #734: Bump ahash from 0.8.2 to 0.8.3

23507 of 35166 relevant lines covered (66.85%)

40241.5 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

97.73
/dozer-sql/src/pipeline/aggregation/tests/aggregation_tests_utils.rs
1
use dozer_core::{
2
    dag::{
3
        dag::DEFAULT_PORT_HANDLE,
4
        node::{PortHandle, Processor},
5
    },
6
    storage::lmdb_storage::{LmdbEnvironmentManager, SharedTransaction},
7
};
8
use dozer_types::types::{
9
    Field, FieldDefinition, FieldType, Operation, Record, Schema, SourceDefinition, DATE_FORMAT,
10
};
11
use std::collections::HashMap;
12

13
use crate::pipeline::{
14
    aggregation::{factory::get_aggregation_rules, processor::AggregationProcessor},
15
    errors::PipelineError,
16
    tests::utils::get_select,
17
};
18

19
use dozer_types::chrono::{DateTime, NaiveDate, TimeZone, Utc};
20
use dozer_types::ordered_float::OrderedFloat;
21
use dozer_types::rust_decimal::Decimal;
22
use std::ops::Div;
23
use std::path::Path;
24

25
pub(crate) fn init_processor(
46✔
26
    sql: &str,
46✔
27
    input_schemas: HashMap<PortHandle, Schema>,
46✔
28
) -> Result<(AggregationProcessor, SharedTransaction), PipelineError> {
46✔
29
    let select = get_select(sql)?;
46✔
30

31
    let input_schema = input_schemas
46✔
32
        .get(&DEFAULT_PORT_HANDLE)
46✔
33
        .unwrap_or_else(|| panic!("Error getting Input Schema"));
46✔
34

35
    let output_field_rules = get_aggregation_rules(
46✔
36
        &select.projection.clone(),
46✔
37
        &select.group_by.clone(),
46✔
38
        input_schema,
46✔
39
    )?;
46✔
40

41
    let mut processor = AggregationProcessor::new(output_field_rules, input_schema.clone());
46✔
42

46✔
43
    let mut storage = LmdbEnvironmentManager::create(Path::new("/tmp"), "aggregation_test")
46✔
44
        .unwrap_or_else(|e| panic!("{}", e.to_string()));
46✔
45

46✔
46
    processor
46✔
47
        .init(&mut storage)
46✔
48
        .unwrap_or_else(|e| panic!("{}", e.to_string()));
46✔
49

46✔
50
    let tx = storage.create_txn().unwrap();
46✔
51

46✔
52
    Ok((processor, tx))
46✔
53
}
46✔
54

55
pub(crate) fn init_input_schema(field_type: FieldType, aggregator_name: &str) -> Schema {
46✔
56
    Schema::empty()
46✔
57
        .field(
46✔
58
            FieldDefinition::new(
46✔
59
                String::from("ID"),
46✔
60
                FieldType::Int,
46✔
61
                false,
46✔
62
                SourceDefinition::Dynamic,
46✔
63
            ),
46✔
64
            false,
46✔
65
        )
46✔
66
        .field(
46✔
67
            FieldDefinition::new(
46✔
68
                String::from("Country"),
46✔
69
                FieldType::String,
46✔
70
                false,
46✔
71
                SourceDefinition::Dynamic,
46✔
72
            ),
46✔
73
            false,
46✔
74
        )
46✔
75
        .field(
46✔
76
            FieldDefinition::new(
46✔
77
                String::from("Salary"),
46✔
78
                field_type,
46✔
79
                false,
46✔
80
                SourceDefinition::Dynamic,
46✔
81
            ),
46✔
82
            false,
46✔
83
        )
46✔
84
        .field(
46✔
85
            FieldDefinition::new(
46✔
86
                format!("{}(Salary)", aggregator_name),
46✔
87
                field_type,
46✔
88
                false,
46✔
89
                SourceDefinition::Dynamic,
46✔
90
            ),
46✔
91
            false,
46✔
92
        )
46✔
93
        .clone()
46✔
94
}
46✔
95

×
96
pub(crate) fn insert_field(country: &str, insert_field: &Field) -> Operation {
113✔
97
    Operation::Insert {
113✔
98
        new: Record::new(
113✔
99
            None,
113✔
100
            vec![
113✔
101
                Field::Int(0),
113✔
102
                Field::String(country.to_string()),
113✔
103
                insert_field.clone(),
113✔
104
                insert_field.clone(),
113✔
105
            ],
113✔
106
            None,
113✔
107
        ),
113✔
108
    }
113✔
109
}
113✔
110

×
111
pub(crate) fn delete_field(country: &str, deleted_field: &Field) -> Operation {
112✔
112
    Operation::Delete {
112✔
113
        old: Record::new(
112✔
114
            None,
112✔
115
            vec![
112✔
116
                Field::Int(0),
112✔
117
                Field::String(country.to_string()),
112✔
118
                deleted_field.clone(),
112✔
119
                deleted_field.clone(),
112✔
120
            ],
112✔
121
            None,
112✔
122
        ),
112✔
123
    }
112✔
124
}
112✔
125

×
126
pub(crate) fn update_field(
67✔
127
    old_country: &str,
67✔
128
    new_country: &str,
67✔
129
    old: &Field,
67✔
130
    new: &Field,
67✔
131
) -> Operation {
67✔
132
    Operation::Update {
67✔
133
        old: Record::new(
67✔
134
            None,
67✔
135
            vec![
67✔
136
                Field::Int(0),
67✔
137
                Field::String(old_country.to_string()),
67✔
138
                old.clone(),
67✔
139
                old.clone(),
67✔
140
            ],
67✔
141
            None,
67✔
142
        ),
67✔
143
        new: Record::new(
67✔
144
            None,
67✔
145
            vec![
67✔
146
                Field::Int(0),
67✔
147
                Field::String(new_country.to_string()),
67✔
148
                new.clone(),
67✔
149
                new.clone(),
67✔
150
            ],
67✔
151
            None,
67✔
152
        ),
67✔
153
    }
67✔
154
}
67✔
155

156
pub(crate) fn insert_exp(country: &str, inserted_field: &Field) -> Operation {
69✔
157
    Operation::Insert {
69✔
158
        new: Record::new(
69✔
159
            None,
69✔
160
            vec![Field::String(country.to_string()), inserted_field.clone()],
69✔
161
            None,
69✔
162
        ),
69✔
163
    }
69✔
164
}
69✔
165

×
166
pub(crate) fn delete_exp(country: &str, deleted_field: &Field) -> Operation {
68✔
167
    Operation::Delete {
68✔
168
        old: Record::new(
68✔
169
            None,
68✔
170
            vec![Field::String(country.to_string()), deleted_field.clone()],
68✔
171
            None,
68✔
172
        ),
68✔
173
    }
68✔
174
}
68✔
175

176
pub(crate) fn update_exp(
178✔
177
    old_country: &str,
178✔
178
    new_country: &str,
178✔
179
    old: &Field,
178✔
180
    new: &Field,
178✔
181
) -> Operation {
178✔
182
    Operation::Update {
178✔
183
        old: Record::new(
178✔
184
            None,
178✔
185
            vec![Field::String(old_country.to_string()), old.clone()],
178✔
186
            None,
178✔
187
        ),
178✔
188
        new: Record::new(
178✔
189
            None,
178✔
190
            vec![Field::String(new_country.to_string()), new.clone()],
178✔
191
            None,
178✔
192
        ),
178✔
193
    }
178✔
194
}
178✔
195

196
pub fn get_decimal_field(val: i64) -> Field {
172✔
197
    Field::Decimal(Decimal::new(val, 0))
172✔
198
}
172✔
199

200
pub fn get_decimal_div_field(numerator: i64, denominator: i64) -> Field {
12✔
201
    Field::Decimal(Decimal::new(numerator, 0).div(Decimal::new(denominator, 0)))
12✔
202
}
12✔
203

204
pub fn get_ts_field(val: i64) -> Field {
70✔
205
    Field::Timestamp(DateTime::from(Utc.timestamp_millis(val)))
70✔
206
}
70✔
207

208
pub fn get_date_field(val: &str) -> Field {
56✔
209
    Field::Date(NaiveDate::parse_from_str(val, DATE_FORMAT).unwrap())
56✔
210
}
56✔
211

212
#[macro_export]
213
macro_rules! output {
214
    ($processor:expr, $inp:expr, $tx:expr) => {
215
        $processor
216
            .aggregate(&mut $tx.write(), $processor.db.unwrap(), $inp)
217
            .unwrap_or_else(|_e| panic!("Error executing aggregate"))
218
    };
219
}
220

221
pub const ITALY: &str = "Italy";
222
pub const SINGAPORE: &str = "Singapore";
223

224
pub const DATE4: &str = "2015-10-04";
225
pub const DATE8: &str = "2015-10-08";
226
pub const DATE16: &str = "2015-10-16";
227

228
pub const FIELD_NULL: &Field = &Field::Null;
229

230
pub const FIELD_0_FLOAT: &Field = &Field::Float(OrderedFloat(0.0));
231
pub const FIELD_100_FLOAT: &Field = &Field::Float(OrderedFloat(100.0));
232
pub const FIELD_150_FLOAT: &Field = &Field::Float(OrderedFloat(150.0));
233
pub const FIELD_200_FLOAT: &Field = &Field::Float(OrderedFloat(200.0));
234
pub const FIELD_250_FLOAT: &Field = &Field::Float(OrderedFloat(250.0));
235
pub const FIELD_350_FLOAT: &Field = &Field::Float(OrderedFloat(350.0));
236
pub const FIELD_75_FLOAT: &Field = &Field::Float(OrderedFloat(75.0));
237
pub const FIELD_50_FLOAT: &Field = &Field::Float(OrderedFloat(50.0));
238
pub const FIELD_250_DIV_3_FLOAT: &Field = &Field::Float(OrderedFloat(250.0 / 3.0));
239
pub const FIELD_350_DIV_3_FLOAT: &Field = &Field::Float(OrderedFloat(350.0 / 3.0));
240

241
pub const FIELD_0_INT: &Field = &Field::Int(0);
242
pub const FIELD_1_INT: &Field = &Field::Int(1);
243
pub const FIELD_2_INT: &Field = &Field::Int(2);
244
pub const FIELD_3_INT: &Field = &Field::Int(3);
245
pub const FIELD_100_INT: &Field = &Field::Int(100);
246
pub const FIELD_150_INT: &Field = &Field::Int(150);
247
pub const FIELD_200_INT: &Field = &Field::Int(200);
248
pub const FIELD_250_INT: &Field = &Field::Int(250);
249
pub const FIELD_350_INT: &Field = &Field::Int(350);
250
pub const FIELD_50_INT: &Field = &Field::Int(50);
251

252
pub const FIELD_100_UINT: &Field = &Field::UInt(100);
253
pub const FIELD_150_UINT: &Field = &Field::UInt(150);
254
pub const FIELD_200_UINT: &Field = &Field::UInt(200);
255
pub const FIELD_250_UINT: &Field = &Field::UInt(250);
256
pub const FIELD_350_UINT: &Field = &Field::UInt(350);
257
pub const FIELD_50_UINT: &Field = &Field::UInt(50);
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc