• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

getdozer / dozer / 4113913291

pending completion
4113913291

Pull #821

github

GitHub
Merge a8cca3f0b into 8f74ec17e
Pull Request #821: refactor: Make `LmdbRoCache` and `LmdbRwCache` `Send` and `Sync`

869 of 869 new or added lines in 45 files covered. (100.0%)

23486 of 37503 relevant lines covered (62.62%)

36806.72 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

92.02
/dozer-sql/src/pipeline/aggregation/tests/aggregation_tests_utils.rs
1
use dozer_core::{
2
    node::{PortHandle, Processor},
3
    storage::lmdb_storage::{LmdbEnvironmentManager, SharedTransaction},
4
    DEFAULT_PORT_HANDLE,
5
};
6
use dozer_types::types::{
7
    Field, FieldDefinition, FieldType, Operation, Record, Schema, SourceDefinition, DATE_FORMAT,
8
};
9
use std::collections::HashMap;
10

11
use crate::pipeline::{
12
    aggregation::{factory::get_aggregation_rules, processor::AggregationProcessor},
13
    errors::PipelineError,
14
    tests::utils::get_select,
15
};
16

17
use dozer_types::chrono::{DateTime, NaiveDate, TimeZone, Utc};
18
use dozer_types::ordered_float::OrderedFloat;
19
use dozer_types::rust_decimal::Decimal;
20
use std::ops::Div;
21
use std::path::Path;
22

23
pub(crate) fn init_processor(
46✔
24
    sql: &str,
46✔
25
    input_schemas: HashMap<PortHandle, Schema>,
46✔
26
) -> Result<(AggregationProcessor, SharedTransaction), PipelineError> {
46✔
27
    let select = get_select(sql)?;
46✔
28

×
29
    let input_schema = input_schemas
46✔
30
        .get(&DEFAULT_PORT_HANDLE)
46✔
31
        .unwrap_or_else(|| panic!("Error getting Input Schema"));
46✔
32

×
33
    let output_field_rules = get_aggregation_rules(
46✔
34
        &select.projection.clone(),
46✔
35
        &select.group_by.clone(),
46✔
36
        input_schema,
46✔
37
    )?;
46✔
38

×
39
    let mut processor = AggregationProcessor::new(output_field_rules, input_schema.clone());
46✔
40

46✔
41
    let mut storage =
46✔
42
        LmdbEnvironmentManager::create(Path::new("/tmp"), "aggregation_test", Default::default())
46✔
43
            .unwrap_or_else(|e| panic!("{}", e.to_string()));
46✔
44

46✔
45
    processor
46✔
46
        .init(&mut storage)
46✔
47
        .unwrap_or_else(|e| panic!("{}", e.to_string()));
46✔
48

46✔
49
    let tx = storage.create_txn().unwrap();
46✔
50

46✔
51
    Ok((processor, tx))
46✔
52
}
46✔
53

×
54
pub(crate) fn init_input_schema(field_type: FieldType, aggregator_name: &str) -> Schema {
46✔
55
    Schema::empty()
46✔
56
        .field(
46✔
57
            FieldDefinition::new(
46✔
58
                String::from("ID"),
46✔
59
                FieldType::Int,
46✔
60
                false,
46✔
61
                SourceDefinition::Dynamic,
46✔
62
            ),
46✔
63
            false,
46✔
64
        )
46✔
65
        .field(
46✔
66
            FieldDefinition::new(
46✔
67
                String::from("Country"),
46✔
68
                FieldType::String,
46✔
69
                false,
46✔
70
                SourceDefinition::Dynamic,
46✔
71
            ),
46✔
72
            false,
46✔
73
        )
46✔
74
        .field(
46✔
75
            FieldDefinition::new(
46✔
76
                String::from("Salary"),
46✔
77
                field_type,
46✔
78
                false,
46✔
79
                SourceDefinition::Dynamic,
46✔
80
            ),
46✔
81
            false,
46✔
82
        )
46✔
83
        .field(
46✔
84
            FieldDefinition::new(
46✔
85
                format!("{aggregator_name}(Salary)"),
46✔
86
                field_type,
46✔
87
                false,
46✔
88
                SourceDefinition::Dynamic,
46✔
89
            ),
46✔
90
            false,
46✔
91
        )
46✔
92
        .clone()
46✔
93
}
46✔
94

×
95
pub(crate) fn insert_field(country: &str, insert_field: &Field) -> Operation {
113✔
96
    Operation::Insert {
113✔
97
        new: Record::new(
113✔
98
            None,
113✔
99
            vec![
113✔
100
                Field::Int(0),
113✔
101
                Field::String(country.to_string()),
113✔
102
                insert_field.clone(),
113✔
103
                insert_field.clone(),
113✔
104
            ],
113✔
105
            None,
113✔
106
        ),
113✔
107
    }
113✔
108
}
113✔
109

×
110
pub(crate) fn delete_field(country: &str, deleted_field: &Field) -> Operation {
112✔
111
    Operation::Delete {
112✔
112
        old: Record::new(
112✔
113
            None,
112✔
114
            vec![
112✔
115
                Field::Int(0),
112✔
116
                Field::String(country.to_string()),
112✔
117
                deleted_field.clone(),
112✔
118
                deleted_field.clone(),
112✔
119
            ],
112✔
120
            None,
112✔
121
        ),
112✔
122
    }
112✔
123
}
112✔
124

×
125
pub(crate) fn update_field(
67✔
126
    old_country: &str,
67✔
127
    new_country: &str,
67✔
128
    old: &Field,
67✔
129
    new: &Field,
67✔
130
) -> Operation {
67✔
131
    Operation::Update {
67✔
132
        old: Record::new(
67✔
133
            None,
67✔
134
            vec![
67✔
135
                Field::Int(0),
67✔
136
                Field::String(old_country.to_string()),
67✔
137
                old.clone(),
67✔
138
                old.clone(),
67✔
139
            ],
67✔
140
            None,
67✔
141
        ),
67✔
142
        new: Record::new(
67✔
143
            None,
67✔
144
            vec![
67✔
145
                Field::Int(0),
67✔
146
                Field::String(new_country.to_string()),
67✔
147
                new.clone(),
67✔
148
                new.clone(),
67✔
149
            ],
67✔
150
            None,
67✔
151
        ),
67✔
152
    }
67✔
153
}
67✔
154

×
155
pub(crate) fn insert_exp(country: &str, inserted_field: &Field) -> Operation {
69✔
156
    Operation::Insert {
69✔
157
        new: Record::new(
69✔
158
            None,
69✔
159
            vec![Field::String(country.to_string()), inserted_field.clone()],
69✔
160
            None,
69✔
161
        ),
69✔
162
    }
69✔
163
}
69✔
164

×
165
pub(crate) fn delete_exp(country: &str, deleted_field: &Field) -> Operation {
68✔
166
    Operation::Delete {
68✔
167
        old: Record::new(
68✔
168
            None,
68✔
169
            vec![Field::String(country.to_string()), deleted_field.clone()],
68✔
170
            None,
68✔
171
        ),
68✔
172
    }
68✔
173
}
68✔
174

×
175
pub(crate) fn update_exp(
178✔
176
    old_country: &str,
178✔
177
    new_country: &str,
178✔
178
    old: &Field,
178✔
179
    new: &Field,
178✔
180
) -> Operation {
178✔
181
    Operation::Update {
178✔
182
        old: Record::new(
178✔
183
            None,
178✔
184
            vec![Field::String(old_country.to_string()), old.clone()],
178✔
185
            None,
178✔
186
        ),
178✔
187
        new: Record::new(
178✔
188
            None,
178✔
189
            vec![Field::String(new_country.to_string()), new.clone()],
178✔
190
            None,
178✔
191
        ),
178✔
192
    }
178✔
193
}
178✔
194

×
195
pub fn get_decimal_field(val: i64) -> Field {
172✔
196
    Field::Decimal(Decimal::new(val, 0))
172✔
197
}
172✔
198

×
199
pub fn get_decimal_div_field(numerator: i64, denominator: i64) -> Field {
12✔
200
    Field::Decimal(Decimal::new(numerator, 0).div(Decimal::new(denominator, 0)))
12✔
201
}
12✔
202

×
203
pub fn get_ts_field(val: i64) -> Field {
70✔
204
    Field::Timestamp(DateTime::from(Utc.timestamp_millis(val)))
70✔
205
}
70✔
206

×
207
pub fn get_date_field(val: &str) -> Field {
56✔
208
    Field::Date(NaiveDate::parse_from_str(val, DATE_FORMAT).unwrap())
56✔
209
}
56✔
210

×
211
#[macro_export]
212
macro_rules! output {
213
    ($processor:expr, $inp:expr, $tx:expr) => {
214
        $processor
215
            .aggregate(&mut $tx.write(), $processor.db.unwrap(), $inp)
216
            .unwrap_or_else(|_e| panic!("Error executing aggregate"))
217
    };
218
}
219

220
pub const ITALY: &str = "Italy";
221
pub const SINGAPORE: &str = "Singapore";
222

223
pub const DATE4: &str = "2015-10-04";
224
pub const DATE8: &str = "2015-10-08";
225
pub const DATE16: &str = "2015-10-16";
226

227
pub const FIELD_NULL: &Field = &Field::Null;
228

229
pub const FIELD_0_FLOAT: &Field = &Field::Float(OrderedFloat(0.0));
230
pub const FIELD_100_FLOAT: &Field = &Field::Float(OrderedFloat(100.0));
231
pub const FIELD_150_FLOAT: &Field = &Field::Float(OrderedFloat(150.0));
232
pub const FIELD_200_FLOAT: &Field = &Field::Float(OrderedFloat(200.0));
233
pub const FIELD_250_FLOAT: &Field = &Field::Float(OrderedFloat(250.0));
234
pub const FIELD_350_FLOAT: &Field = &Field::Float(OrderedFloat(350.0));
235
pub const FIELD_75_FLOAT: &Field = &Field::Float(OrderedFloat(75.0));
236
pub const FIELD_50_FLOAT: &Field = &Field::Float(OrderedFloat(50.0));
237
pub const FIELD_250_DIV_3_FLOAT: &Field = &Field::Float(OrderedFloat(250.0 / 3.0));
238
pub const FIELD_350_DIV_3_FLOAT: &Field = &Field::Float(OrderedFloat(350.0 / 3.0));
239

240
pub const FIELD_0_INT: &Field = &Field::Int(0);
241
pub const FIELD_1_INT: &Field = &Field::Int(1);
242
pub const FIELD_2_INT: &Field = &Field::Int(2);
243
pub const FIELD_3_INT: &Field = &Field::Int(3);
244
pub const FIELD_100_INT: &Field = &Field::Int(100);
245
pub const FIELD_150_INT: &Field = &Field::Int(150);
246
pub const FIELD_200_INT: &Field = &Field::Int(200);
247
pub const FIELD_250_INT: &Field = &Field::Int(250);
248
pub const FIELD_350_INT: &Field = &Field::Int(350);
249
pub const FIELD_50_INT: &Field = &Field::Int(50);
250

251
pub const FIELD_100_UINT: &Field = &Field::UInt(100);
252
pub const FIELD_150_UINT: &Field = &Field::UInt(150);
253
pub const FIELD_200_UINT: &Field = &Field::UInt(200);
254
pub const FIELD_250_UINT: &Field = &Field::UInt(250);
255
pub const FIELD_350_UINT: &Field = &Field::UInt(350);
256
pub const FIELD_50_UINT: &Field = &Field::UInt(50);
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc