• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

getdozer / dozer / 4371879582

pending completion
4371879582

push

github

GitHub
chore: Remove `Record::get_hashed_primary_key` and use primary key instead (#1191)

27570 of 38452 relevant lines covered (71.7%)

51081.14 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

97.84
/dozer-sql/src/pipeline/aggregation/processor.rs
1
#![allow(clippy::too_many_arguments)]
2

3
use crate::pipeline::errors::PipelineError;
4
use crate::pipeline::expression::execution::ExpressionExecutor;
5
use crate::pipeline::{aggregation::aggregator::Aggregator, expression::execution::Expression};
6
use dozer_core::channels::ProcessorChannelForwarder;
7
use dozer_core::errors::ExecutionError;
8
use dozer_core::errors::ExecutionError::InternalError;
9
use dozer_core::node::{PortHandle, Processor};
10
use dozer_core::storage::lmdb_storage::SharedTransaction;
11
use dozer_core::DEFAULT_PORT_HANDLE;
12
use dozer_types::types::{Field, FieldType, Operation, Record, Schema};
13
use std::hash::{Hash, Hasher};
14

15
use crate::pipeline::aggregation::aggregator::{
16
    get_aggregator_from_aggregator_type, get_aggregator_type_from_aggregation_expression,
17
    AggregatorType,
18
};
19
use ahash::AHasher;
20
use dozer_core::epoch::Epoch;
21
use hashbrown::HashMap;
22

23
const DEFAULT_SEGMENT_KEY: &str = "DOZER_DEFAULT_SEGMENT_KEY";
24

25
#[derive(Debug)]
×
26
struct AggregationState {
27
    count: usize,
28
    states: Vec<Box<dyn Aggregator>>,
29
    values: Option<Vec<Field>>,
30
}
31

32
impl AggregationState {
33
    pub fn new(types: &[AggregatorType], ret_types: &[FieldType]) -> Self {
21,195✔
34
        let mut states: Vec<Box<dyn Aggregator>> = Vec::new();
21,195✔
35
        for (idx, typ) in types.iter().enumerate() {
21,197✔
36
            let mut aggr = get_aggregator_from_aggregator_type(*typ);
21,197✔
37
            aggr.init(ret_types[idx]);
21,197✔
38
            states.push(aggr);
21,197✔
39
        }
21,197✔
40

41
        Self {
21,196✔
42
            count: 0,
21,196✔
43
            states,
21,196✔
44
            values: None,
21,196✔
45
        }
21,196✔
46
    }
21,196✔
47
}
48

49
#[derive(Debug)]
×
50
pub struct AggregationProcessor {
51
    dimensions: Vec<Expression>,
52
    measures: Vec<Vec<Expression>>,
53
    measures_types: Vec<AggregatorType>,
54
    measures_return_types: Vec<FieldType>,
55
    projections: Vec<Expression>,
56
    input_schema: Schema,
57
    aggregation_schema: Schema,
58
    states: HashMap<u64, AggregationState>,
59
    default_segment_key: u64,
60
}
61

62
/// Kind of change applied to the aggregators by `calc_and_fill_measures`.
enum AggregatorOperation {
    /// A record is added to the group.
    Insert,
    /// A record is removed from the group.
    Delete,
    /// A record of the group is replaced by another one of the same group.
    Update,
}
67

68
impl AggregationProcessor {
69
    pub fn new(
112✔
70
        dimensions: Vec<Expression>,
112✔
71
        measures: Vec<Expression>,
112✔
72
        projections: Vec<Expression>,
112✔
73
        input_schema: Schema,
112✔
74
        aggregation_schema: Schema,
112✔
75
    ) -> Result<Self, PipelineError> {
112✔
76
        let mut aggr_types = Vec::new();
112✔
77
        let mut aggr_measures = Vec::new();
112✔
78
        let mut aggr_measures_ret_types = Vec::new();
112✔
79

80
        for measure in measures {
224✔
81
            let (aggr_measure, aggr_type) =
112✔
82
                get_aggregator_type_from_aggregation_expression(&measure, &input_schema)?;
112✔
83
            aggr_measures.push(aggr_measure);
112✔
84
            aggr_types.push(aggr_type);
112✔
85
            aggr_measures_ret_types.push(measure.get_type(&input_schema)?.return_type)
112✔
86
        }
87

88
        let mut hasher = AHasher::default();
112✔
89
        DEFAULT_SEGMENT_KEY.hash(&mut hasher);
112✔
90

112✔
91
        Ok(Self {
112✔
92
            dimensions,
112✔
93
            projections,
112✔
94
            input_schema,
112✔
95
            aggregation_schema,
112✔
96
            states: HashMap::new(),
112✔
97
            measures: aggr_measures,
112✔
98
            measures_types: aggr_types,
112✔
99
            measures_return_types: aggr_measures_ret_types,
112✔
100
            default_segment_key: hasher.finish(),
112✔
101
        })
112✔
102
    }
112✔
103

104
    fn calc_and_fill_measures(
21,404✔
105
        curr_state: &mut AggregationState,
21,404✔
106
        deleted_record: Option<&Record>,
21,404✔
107
        inserted_record: Option<&Record>,
21,404✔
108
        out_rec_delete: &mut Vec<Field>,
21,404✔
109
        out_rec_insert: &mut Vec<Field>,
21,404✔
110
        op: AggregatorOperation,
21,404✔
111
        measures: &Vec<Vec<Expression>>,
21,404✔
112
        input_schema: &Schema,
21,404✔
113
    ) -> Result<Vec<Field>, PipelineError> {
21,404✔
114
        //
21,404✔
115

21,404✔
116
        let mut new_fields: Vec<Field> = Vec::with_capacity(measures.len());
21,404✔
117

118
        for (idx, measure) in measures.iter().enumerate() {
21,405✔
119
            let curr_aggr = &mut curr_state.states[idx];
21,405✔
120
            let curr_val_opt: Option<&Field> = curr_state.values.as_ref().map(|e| &e[idx]);
21,405✔
121

122
            let new_val = match op {
21,406✔
123
                AggregatorOperation::Insert => {
124
                    let mut inserted_fields = Vec::with_capacity(measure.len());
21,197✔
125
                    for m in measure {
42,394✔
126
                        inserted_fields.push(m.evaluate(inserted_record.unwrap(), input_schema)?);
21,196✔
127
                    }
128
                    if let Some(curr_val) = curr_val_opt {
21,198✔
129
                        out_rec_delete.push(curr_val.clone());
11,067✔
130
                    }
20,127✔
131
                    curr_aggr.insert(&inserted_fields)?
21,198✔
132
                }
133
                AggregatorOperation::Delete => {
134
                    let mut deleted_fields = Vec::with_capacity(measure.len());
164✔
135
                    for m in measure {
328✔
136
                        deleted_fields.push(m.evaluate(deleted_record.unwrap(), input_schema)?);
163✔
137
                    }
138
                    if let Some(curr_val) = curr_val_opt {
165✔
139
                        out_rec_delete.push(curr_val.clone());
165✔
140
                    }
165✔
141
                    curr_aggr.delete(&deleted_fields)?
165✔
142
                }
143
                AggregatorOperation::Update => {
144
                    let mut deleted_fields = Vec::with_capacity(measure.len());
44✔
145
                    for m in measure {
88✔
146
                        deleted_fields.push(m.evaluate(deleted_record.unwrap(), input_schema)?);
44✔
147
                    }
148
                    let mut inserted_fields = Vec::with_capacity(measure.len());
44✔
149
                    for m in measure {
88✔
150
                        inserted_fields.push(m.evaluate(inserted_record.unwrap(), input_schema)?);
44✔
151
                    }
152
                    if let Some(curr_val) = curr_val_opt {
44✔
153
                        out_rec_delete.push(curr_val.clone());
44✔
154
                    }
44✔
155
                    curr_aggr.update(&deleted_fields, &inserted_fields)?
44✔
156
                }
157
            };
158
            out_rec_insert.push(new_val.clone());
21,401✔
159
            new_fields.push(new_val);
21,401✔
160
        }
161
        Ok(new_fields)
21,400✔
162
    }
21,400✔
163

164
    fn agg_delete(&mut self, old: &mut Record) -> Result<Operation, PipelineError> {
165✔
165
        let mut out_rec_delete: Vec<Field> = Vec::with_capacity(self.measures.len());
165✔
166
        let mut out_rec_insert: Vec<Field> = Vec::with_capacity(self.measures.len());
165✔
167

168
        let key = if !self.dimensions.is_empty() {
165✔
169
            get_key(&self.input_schema, old, &self.dimensions)?
165✔
170
        } else {
171
            self.default_segment_key
×
172
        };
173

174
        let curr_state_opt = self.states.get_mut(&key);
165✔
175
        assert!(
165✔
176
            curr_state_opt.is_some(),
165✔
177
            "Unable to find aggregator state during DELETE operation"
×
178
        );
179
        let mut curr_state = curr_state_opt.unwrap();
165✔
180

181
        let new_values = Self::calc_and_fill_measures(
165✔
182
            curr_state,
165✔
183
            Some(old),
165✔
184
            None,
165✔
185
            &mut out_rec_delete,
165✔
186
            &mut out_rec_insert,
165✔
187
            AggregatorOperation::Delete,
165✔
188
            &self.measures,
165✔
189
            &self.input_schema,
165✔
190
        )?;
165✔
191

192
        let res = if curr_state.count == 1 {
165✔
193
            self.states.remove(&key);
97✔
194
            Operation::Delete {
97✔
195
                old: Self::build_projection(
97✔
196
                    old,
97✔
197
                    out_rec_delete,
97✔
198
                    &self.projections,
97✔
199
                    &self.aggregation_schema,
97✔
200
                )?,
97✔
201
            }
202
        } else {
203
            curr_state.count -= 1;
68✔
204
            curr_state.values = Some(new_values);
68✔
205
            Operation::Update {
68✔
206
                new: Self::build_projection(
68✔
207
                    old,
68✔
208
                    out_rec_insert,
68✔
209
                    &self.projections,
68✔
210
                    &self.aggregation_schema,
68✔
211
                )?,
68✔
212
                old: Self::build_projection(
68✔
213
                    old,
68✔
214
                    out_rec_delete,
68✔
215
                    &self.projections,
68✔
216
                    &self.aggregation_schema,
68✔
217
                )?,
68✔
218
            }
219
        };
220

221
        Ok(res)
162✔
222
    }
162✔
223

224
    fn agg_insert(&mut self, new: &mut Record) -> Result<Operation, PipelineError> {
21,199✔
225
        let mut out_rec_delete: Vec<Field> = Vec::with_capacity(self.measures.len());
21,199✔
226
        let mut out_rec_insert: Vec<Field> = Vec::with_capacity(self.measures.len());
21,199✔
227

228
        let key = if !self.dimensions.is_empty() {
21,199✔
229
            get_key(&self.input_schema, new, &self.dimensions)?
10,189✔
230
        } else {
231
            self.default_segment_key
11,010✔
232
        };
233

234
        let curr_state = self.states.entry(key).or_insert(AggregationState::new(
21,199✔
235
            &self.measures_types,
21,199✔
236
            &self.measures_return_types,
21,199✔
237
        ));
21,199✔
238

239
        let new_values = Self::calc_and_fill_measures(
21,199✔
240
            curr_state,
21,199✔
241
            None,
21,199✔
242
            Some(new),
21,199✔
243
            &mut out_rec_delete,
21,199✔
244
            &mut out_rec_insert,
21,199✔
245
            AggregatorOperation::Insert,
21,199✔
246
            &self.measures,
21,199✔
247
            &self.input_schema,
21,199✔
248
        )?;
21,199✔
249

250
        let res = if curr_state.count == 0 {
21,199✔
251
            Operation::Insert {
252
                new: Self::build_projection(
10,132✔
253
                    new,
10,132✔
254
                    out_rec_insert,
10,132✔
255
                    &self.projections,
10,132✔
256
                    &self.aggregation_schema,
10,132✔
257
                )?,
10,132✔
258
            }
259
        } else {
260
            Operation::Update {
261
                new: Self::build_projection(
11,067✔
262
                    new,
11,067✔
263
                    out_rec_insert,
11,067✔
264
                    &self.projections,
11,067✔
265
                    &self.aggregation_schema,
11,067✔
266
                )?,
11,067✔
267
                old: Self::build_projection(
11,067✔
268
                    new,
11,067✔
269
                    out_rec_delete,
11,067✔
270
                    &self.projections,
11,067✔
271
                    &self.aggregation_schema,
11,067✔
272
                )?,
11,067✔
273
            }
274
        };
275

276
        curr_state.count += 1;
21,198✔
277
        curr_state.values = Some(new_values);
21,198✔
278

21,198✔
279
        Ok(res)
21,198✔
280
    }
21,198✔
281

282
    fn agg_update(
44✔
283
        &mut self,
44✔
284
        old: &mut Record,
44✔
285
        new: &mut Record,
44✔
286
        key: u64,
44✔
287
    ) -> Result<Operation, PipelineError> {
44✔
288
        let mut out_rec_delete: Vec<Field> = Vec::with_capacity(self.measures.len());
44✔
289
        let mut out_rec_insert: Vec<Field> = Vec::with_capacity(self.measures.len());
44✔
290

44✔
291
        let curr_state_opt = self.states.get_mut(&key);
44✔
292
        assert!(
44✔
293
            curr_state_opt.is_some(),
44✔
294
            "Unable to find aggregator state during UPDATE operation"
×
295
        );
296
        let mut curr_state = curr_state_opt.unwrap();
44✔
297

298
        let new_values = Self::calc_and_fill_measures(
44✔
299
            curr_state,
44✔
300
            Some(old),
44✔
301
            Some(new),
44✔
302
            &mut out_rec_delete,
44✔
303
            &mut out_rec_insert,
44✔
304
            AggregatorOperation::Update,
44✔
305
            &self.measures,
44✔
306
            &self.input_schema,
44✔
307
        )?;
44✔
308

309
        let res = Operation::Update {
44✔
310
            new: Self::build_projection(
44✔
311
                new,
44✔
312
                out_rec_insert,
44✔
313
                &self.projections,
44✔
314
                &self.aggregation_schema,
44✔
315
            )?,
44✔
316
            old: Self::build_projection(
44✔
317
                old,
44✔
318
                out_rec_delete,
44✔
319
                &self.projections,
44✔
320
                &self.aggregation_schema,
44✔
321
            )?,
44✔
322
        };
323

324
        curr_state.values = Some(new_values);
44✔
325
        Ok(res)
44✔
326
    }
44✔
327

328
    pub fn build_projection(
32,582✔
329
        original: &mut Record,
32,582✔
330
        measures: Vec<Field>,
32,582✔
331
        projections: &Vec<Expression>,
32,582✔
332
        aggregation_schema: &Schema,
32,582✔
333
    ) -> Result<Record, PipelineError> {
32,582✔
334
        let original_len = original.values.len();
32,582✔
335
        original.values.extend(measures);
32,582✔
336
        let mut output = Vec::<Field>::with_capacity(projections.len());
32,582✔
337
        for exp in projections {
95,714✔
338
            output.push(exp.evaluate(original, aggregation_schema)?);
63,133✔
339
        }
340
        original.values.drain(original_len..);
32,581✔
341
        Ok(Record::new(None, output, None))
32,581✔
342
    }
32,581✔
343

344
    pub fn aggregate(&mut self, mut op: Operation) -> Result<Vec<Operation>, PipelineError> {
21,383✔
345
        match op {
21,383✔
346
            Operation::Insert { ref mut new } => Ok(vec![self.agg_insert(new)?]),
21,176✔
347
            Operation::Delete { ref mut old } => Ok(vec![self.agg_delete(old)?]),
140✔
348
            Operation::Update {
349
                ref mut old,
67✔
350
                ref mut new,
67✔
351
            } => {
352
                let (old_record_hash, new_record_hash) = if self.dimensions.is_empty() {
67✔
353
                    (self.default_segment_key, self.default_segment_key)
×
354
                } else {
355
                    (
356
                        get_key(&self.input_schema, old, &self.dimensions)?,
67✔
357
                        get_key(&self.input_schema, new, &self.dimensions)?,
67✔
358
                    )
359
                };
360

361
                if old_record_hash == new_record_hash {
67✔
362
                    Ok(vec![self.agg_update(old, new, old_record_hash)?])
44✔
363
                } else {
364
                    Ok(vec![self.agg_delete(old)?, self.agg_insert(new)?])
23✔
365
                }
366
            }
367
        }
368
    }
21,382✔
369
}
370

371
fn get_key(
10,484✔
372
    schema: &Schema,
10,484✔
373
    record: &Record,
10,484✔
374
    dimensions: &[Expression],
10,484✔
375
) -> Result<u64, PipelineError> {
10,484✔
376
    let mut key = Vec::<Field>::with_capacity(dimensions.len());
10,484✔
377
    for dimension in dimensions.iter() {
10,484✔
378
        key.push(dimension.evaluate(record, schema)?);
10,484✔
379
    }
380
    let mut hasher = AHasher::default();
10,486✔
381
    key.hash(&mut hasher);
10,486✔
382
    let v = hasher.finish();
10,486✔
383
    Ok(v)
10,486✔
384
}
10,486✔
385

386
impl Processor for AggregationProcessor {
387
    fn commit(&self, _epoch: &Epoch, _tx: &SharedTransaction) -> Result<(), ExecutionError> {
66✔
388
        Ok(())
66✔
389
    }
66✔
390

391
    fn process(
21,090✔
392
        &mut self,
21,090✔
393
        _from_port: PortHandle,
21,090✔
394
        op: Operation,
21,090✔
395
        fw: &mut dyn ProcessorChannelForwarder,
21,090✔
396
        _txn: &SharedTransaction,
21,090✔
397
    ) -> Result<(), ExecutionError> {
21,090✔
398
        let ops = self.aggregate(op).map_err(|e| InternalError(Box::new(e)))?;
21,090✔
399
        for fop in ops {
42,180✔
400
            fw.send(fop, DEFAULT_PORT_HANDLE)?;
21,090✔
401
        }
402
        Ok(())
21,090✔
403
    }
21,090✔
404
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc