• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

getdozer / dozer / 5972853941

25 Aug 2023 06:52AM UTC coverage: 76.247% (+0.8%) from 75.446%
5972853941

push

github

web-flow
feat: make probabilistic optimizations optional and tunable in the YAML config (#1912)

Probabilistic optimization sacrifices accuracy in order to reduce memory consumption. In certain parts of the pipeline, a Bloom Filter is used ([set_processor](https://github.com/getdozer/dozer/blob/2e3ba96c3f4bdf9a691747191ab15617564d8ca2/dozer-sql/src/pipeline/product/set/set_processor.rs#L20)), while in other parts, hash tables that store only the hash of the keys instead of the full keys are used ([aggregation_processor](https://github.com/getdozer/dozer/blob/2e3ba96c3f4bdf9a691747191ab15617564d8ca2/dozer-sql/src/pipeline/aggregation/processor.rs#L59) and [join_processor](https://github.com/getdozer/dozer/blob/2e3ba96c3f4bdf9a691747191ab15617564d8ca2/dozer-sql/src/pipeline/product/join/operator.rs#L57-L58)).

This commit makes these optimizations disabled by default and offers user-configurable flags to enable each of these optimizations separately.

This is an example of how to turn on probabilistic optimizations for each processor in the Dozer configuration.

```
flags:
  enable_probabilistic_optimizations:
    in_sets: true # enable probabilistic optimizations in set operations (UNION, EXCEPT, INTERSECT); Default: false
    in_joins: true # enable probabilistic optimizations in JOIN operations; Default: false
    in_aggregations: true # enable probabilistic optimizations in aggregations (SUM, COUNT, MIN, etc.); Default: false
```

347 of 347 new or added lines in 25 files covered. (100.0%)

47165 of 61858 relevant lines covered (76.25%)

48442.96 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

98.6
/dozer-sql/src/pipeline/aggregation/processor.rs
1
#![allow(clippy::too_many_arguments)]
2

3
use crate::pipeline::errors::PipelineError;
4
use crate::pipeline::utils::record_hashtable_key::{get_record_hash, RecordKey};
5
use crate::pipeline::{aggregation::aggregator::Aggregator, expression::execution::Expression};
6
use dozer_core::channels::ProcessorChannelForwarder;
7
use dozer_core::executor_operation::ProcessorOperation;
8
use dozer_core::node::{PortHandle, Processor};
9
use dozer_core::processor_record::ProcessorRecordStore;
10
use dozer_core::DEFAULT_PORT_HANDLE;
11
use dozer_types::errors::internal::BoxedError;
12
use dozer_types::types::{Field, FieldType, Operation, Record, Schema};
13
use std::collections::HashMap;
14

15
use crate::pipeline::aggregation::aggregator::{
16
    get_aggregator_from_aggregator_type, get_aggregator_type_from_aggregation_expression,
17
    AggregatorEnum, AggregatorType,
18
};
19
use dozer_core::epoch::Epoch;
20

21
const DEFAULT_SEGMENT_KEY: &str = "DOZER_DEFAULT_SEGMENT_KEY";
22

23
/// Running aggregation state for a single segment (one GROUP BY key).
#[derive(Debug)]
struct AggregationState {
    // Number of source records currently contributing to this segment;
    // the segment is removed when the last contributor is deleted.
    count: usize,
    // One running aggregator per measure (SUM, COUNT, ...), in measure order.
    states: Vec<AggregatorEnum>,
    // Measure values produced by the previous operation on this segment,
    // used as the "old" side of emitted updates; `None` until first filled.
    values: Option<Vec<Field>>,
}
29

30
impl AggregationState {
31
    pub fn new(types: &[AggregatorType], ret_types: &[FieldType]) -> Self {
13,025✔
32
        let mut states: Vec<AggregatorEnum> = Vec::new();
13,025✔
33
        for (idx, typ) in types.iter().enumerate() {
13,242✔
34
            let mut aggr = get_aggregator_from_aggregator_type(*typ);
13,242✔
35
            aggr.init(ret_types[idx]);
13,242✔
36
            states.push(aggr);
13,242✔
37
        }
13,242✔
38

39
        Self {
13,025✔
40
            count: 0,
13,025✔
41
            states,
13,025✔
42
            values: None,
13,025✔
43
        }
13,025✔
44
    }
13,025✔
45
}
46

47
/// Streaming processor implementing SQL aggregation (GROUP BY + aggregate
/// measures + optional HAVING) over incoming insert/delete/update operations.
#[derive(Debug)]
pub struct AggregationProcessor {
    // Processor id (currently unused at runtime; the underscore silences the
    // dead-code lint).
    _id: String,
    // GROUP BY key expressions; empty means a single global segment.
    dimensions: Vec<Expression>,
    // Argument expressions for each measure: one inner Vec per aggregator.
    measures: Vec<Vec<Expression>>,
    // Aggregator kind for each measure, parallel to `measures`.
    measures_types: Vec<AggregatorType>,
    // Field type each aggregator returns, parallel to `measures`.
    measures_return_types: Vec<FieldType>,
    // Output projection expressions (the SELECT list).
    projections: Vec<Expression>,
    // Optional HAVING predicate, evaluated over input + aggregate fields.
    having: Option<Expression>,
    // Schema of the incoming records.
    input_schema: Schema,
    // Schema used when evaluating the projection expressions.
    aggregation_schema: Schema,
    // Per-segment running state, keyed by the (accurate or hashed) group key.
    states: HashMap<RecordKey, AggregationState>,
    // Synthetic key used when there are no GROUP BY dimensions.
    default_segment_key: RecordKey,
    // `input_schema` fields followed by `aggregation_schema` fields, the
    // layout HAVING expressions are evaluated against.
    having_eval_schema: Schema,
    // True: keys store the full dimension values. False: only a 64-bit hash
    // is kept (probabilistic mode; saves memory at some accuracy risk).
    accurate_keys: bool,
}
63

64
/// Which kind of source change is being folded into the measure aggregators.
enum AggregatorOperation {
    Insert,
    Delete,
    Update,
}
69

70
impl AggregationProcessor {
71
    pub fn new(
300✔
72
        id: String,
300✔
73
        dimensions: Vec<Expression>,
300✔
74
        measures: Vec<Expression>,
300✔
75
        projections: Vec<Expression>,
300✔
76
        having: Option<Expression>,
300✔
77
        input_schema: Schema,
300✔
78
        aggregation_schema: Schema,
300✔
79
        enable_probabilistic_optimizations: bool,
300✔
80
    ) -> Result<Self, PipelineError> {
300✔
81
        let mut aggr_types = Vec::new();
300✔
82
        let mut aggr_measures = Vec::new();
300✔
83
        let mut aggr_measures_ret_types = Vec::new();
300✔
84

85
        for measure in measures {
621✔
86
            let (aggr_measure, aggr_type) =
321✔
87
                get_aggregator_type_from_aggregation_expression(&measure, &input_schema)?;
321✔
88
            aggr_measures.push(aggr_measure);
321✔
89
            aggr_types.push(aggr_type);
321✔
90
            aggr_measures_ret_types.push(measure.get_type(&input_schema)?.return_type)
321✔
91
        }
92

93
        let mut having_eval_schema_fields = input_schema.fields.clone();
300✔
94
        having_eval_schema_fields.extend(aggregation_schema.fields.clone());
300✔
95

300✔
96
        let accurate_keys = !enable_probabilistic_optimizations;
300✔
97

300✔
98
        Ok(Self {
300✔
99
            _id: id,
300✔
100
            dimensions,
300✔
101
            projections,
300✔
102
            input_schema,
300✔
103
            aggregation_schema,
300✔
104
            states: HashMap::new(),
300✔
105
            measures: aggr_measures,
300✔
106
            having,
300✔
107
            measures_types: aggr_types,
300✔
108
            measures_return_types: aggr_measures_ret_types,
300✔
109
            default_segment_key: {
300✔
110
                let fields = vec![Field::String(DEFAULT_SEGMENT_KEY.into())];
300✔
111
                if accurate_keys {
300✔
112
                    RecordKey::Accurate(fields)
300✔
113
                } else {
114
                    RecordKey::Hash(get_record_hash(fields.iter()))
×
115
                }
116
            },
117
            having_eval_schema: Schema {
300✔
118
                fields: having_eval_schema_fields,
300✔
119
                primary_index: vec![],
300✔
120
            },
300✔
121
            accurate_keys,
300✔
122
        })
123
    }
300✔
124

125
    /// Folds one operation into every measure aggregator of a segment and
    /// fills the "before"/"after" output field vectors.
    ///
    /// For each measure this:
    /// 1. evaluates the measure's argument expressions against the deleted
    ///    and/or inserted record (the caller must supply the record(s) that
    ///    match `op`, otherwise the `unwrap()` below panics),
    /// 2. pushes the segment's previous value (if any) onto `out_rec_delete`,
    /// 3. feeds the aggregator (insert / delete / update), and
    /// 4. pushes the aggregator's new value onto `out_rec_insert`.
    ///
    /// Returns the new measure values so the caller can store them back into
    /// `curr_state.values`.
    fn calc_and_fill_measures(
        curr_state: &mut AggregationState,
        deleted_record: Option<&Record>,
        inserted_record: Option<&Record>,
        out_rec_delete: &mut Vec<Field>,
        out_rec_insert: &mut Vec<Field>,
        op: AggregatorOperation,
        measures: &Vec<Vec<Expression>>,
        input_schema: &Schema,
    ) -> Result<Vec<Field>, PipelineError> {
        let mut new_fields: Vec<Field> = Vec::with_capacity(measures.len());

        for (idx, measure) in measures.iter().enumerate() {
            let curr_aggr = &mut curr_state.states[idx];
            // Previous value of this measure, if the segment was seen before.
            let curr_val_opt: Option<&Field> = curr_state.values.as_ref().map(|e| &e[idx]);

            let new_val = match op {
                AggregatorOperation::Insert => {
                    // Evaluate the measure's arguments on the inserted record.
                    let mut inserted_fields = Vec::with_capacity(measure.len());
                    for m in measure {
                        inserted_fields.push(m.evaluate(inserted_record.unwrap(), input_schema)?);
                    }
                    if let Some(curr_val) = curr_val_opt {
                        out_rec_delete.push(curr_val.clone());
                    }
                    curr_aggr.insert(&inserted_fields)?
                }
                AggregatorOperation::Delete => {
                    // Evaluate the measure's arguments on the deleted record.
                    let mut deleted_fields = Vec::with_capacity(measure.len());
                    for m in measure {
                        deleted_fields.push(m.evaluate(deleted_record.unwrap(), input_schema)?);
                    }
                    if let Some(curr_val) = curr_val_opt {
                        out_rec_delete.push(curr_val.clone());
                    }
                    curr_aggr.delete(&deleted_fields)?
                }
                AggregatorOperation::Update => {
                    // Evaluate the arguments on both the old and new records.
                    let mut deleted_fields = Vec::with_capacity(measure.len());
                    for m in measure {
                        deleted_fields.push(m.evaluate(deleted_record.unwrap(), input_schema)?);
                    }
                    let mut inserted_fields = Vec::with_capacity(measure.len());
                    for m in measure {
                        inserted_fields.push(m.evaluate(inserted_record.unwrap(), input_schema)?);
                    }
                    if let Some(curr_val) = curr_val_opt {
                        out_rec_delete.push(curr_val.clone());
                    }
                    curr_aggr.update(&deleted_fields, &inserted_fields)?
                }
            };
            out_rec_insert.push(new_val.clone());
            new_fields.push(new_val);
        }
        Ok(new_fields)
    }
182

183
    fn agg_delete(&mut self, old: &mut Record) -> Result<Vec<Operation>, PipelineError> {
953✔
184
        let mut out_rec_delete: Vec<Field> = Vec::with_capacity(self.measures.len());
953✔
185
        let mut out_rec_insert: Vec<Field> = Vec::with_capacity(self.measures.len());
953✔
186

187
        let key = if !self.dimensions.is_empty() {
953✔
188
            Some(self.get_key(old)?)
812✔
189
        } else {
190
            None
141✔
191
        };
192
        let key = key.as_ref().unwrap_or(&self.default_segment_key);
953✔
193

953✔
194
        let curr_state_opt = self.states.get_mut(key);
953✔
195
        assert!(
953✔
196
            curr_state_opt.is_some(),
953✔
197
            "Unable to find aggregator state during DELETE operation"
×
198
        );
199
        let curr_state = curr_state_opt.unwrap();
953✔
200

201
        let new_values = Self::calc_and_fill_measures(
953✔
202
            curr_state,
953✔
203
            Some(old),
953✔
204
            None,
953✔
205
            &mut out_rec_delete,
953✔
206
            &mut out_rec_insert,
953✔
207
            AggregatorOperation::Delete,
953✔
208
            &self.measures,
953✔
209
            &self.input_schema,
953✔
210
        )?;
953✔
211

212
        let (out_rec_delete_having_satisfied, out_rec_insert_having_satisfied) = match &self.having
953✔
213
        {
214
            None => (true, true),
854✔
215
            Some(having) => (
99✔
216
                Self::having_is_satisfied(
99✔
217
                    &self.having_eval_schema,
99✔
218
                    old,
99✔
219
                    having,
99✔
220
                    &mut out_rec_delete,
99✔
221
                )?,
99✔
222
                Self::having_is_satisfied(
99✔
223
                    &self.having_eval_schema,
99✔
224
                    old,
99✔
225
                    having,
99✔
226
                    &mut out_rec_insert,
99✔
227
                )?,
99✔
228
            ),
229
        };
230

231
        let res = if curr_state.count == 1 {
953✔
232
            self.states.remove(key);
532✔
233
            if out_rec_delete_having_satisfied {
532✔
234
                vec![Operation::Delete {
530✔
235
                    old: Self::build_projection(
530✔
236
                        old,
530✔
237
                        out_rec_delete,
530✔
238
                        &self.projections,
530✔
239
                        &self.aggregation_schema,
530✔
240
                    )?,
530✔
241
                }]
242
            } else {
243
                vec![]
2✔
244
            }
245
        } else {
246
            curr_state.count -= 1;
421✔
247
            curr_state.values = Some(new_values);
421✔
248

421✔
249
            Self::generate_op_for_existing_segment(
421✔
250
                out_rec_delete_having_satisfied,
421✔
251
                out_rec_insert_having_satisfied,
421✔
252
                out_rec_delete,
421✔
253
                out_rec_insert,
421✔
254
                old,
421✔
255
                &self.projections,
421✔
256
                &self.aggregation_schema,
421✔
257
            )?
421✔
258
        };
259

260
        Ok(res)
953✔
261
    }
953✔
262

263
    fn agg_insert(&mut self, new: &mut Record) -> Result<Vec<Operation>, PipelineError> {
13,025✔
264
        let mut out_rec_delete: Vec<Field> = Vec::with_capacity(self.measures.len());
13,025✔
265
        let mut out_rec_insert: Vec<Field> = Vec::with_capacity(self.measures.len());
13,025✔
266

267
        let key = if !self.dimensions.is_empty() {
13,025✔
268
            self.get_key(new)?
2,782✔
269
        } else {
270
            self.default_segment_key.clone()
10,243✔
271
        };
272

273
        let curr_state = self.states.entry(key).or_insert(AggregationState::new(
13,025✔
274
            &self.measures_types,
13,025✔
275
            &self.measures_return_types,
13,025✔
276
        ));
13,025✔
277

278
        let new_values = Self::calc_and_fill_measures(
13,025✔
279
            curr_state,
13,025✔
280
            None,
13,025✔
281
            Some(new),
13,025✔
282
            &mut out_rec_delete,
13,025✔
283
            &mut out_rec_insert,
13,025✔
284
            AggregatorOperation::Insert,
13,025✔
285
            &self.measures,
13,025✔
286
            &self.input_schema,
13,025✔
287
        )?;
13,025✔
288

289
        let (out_rec_delete_having_satisfied, out_rec_insert_having_satisfied) = match &self.having
13,025✔
290
        {
291
            None => (true, true),
12,645✔
292
            Some(having) => (
380✔
293
                Self::having_is_satisfied(
380✔
294
                    &self.having_eval_schema,
380✔
295
                    new,
380✔
296
                    having,
380✔
297
                    &mut out_rec_delete,
380✔
298
                )?,
380✔
299
                Self::having_is_satisfied(
380✔
300
                    &self.having_eval_schema,
380✔
301
                    new,
380✔
302
                    having,
380✔
303
                    &mut out_rec_insert,
380✔
304
                )?,
380✔
305
            ),
306
        };
307

308
        let res = if curr_state.count == 0 {
13,025✔
309
            if out_rec_insert_having_satisfied {
2,075✔
310
                vec![Operation::Insert {
2,005✔
311
                    new: Self::build_projection(
2,005✔
312
                        new,
2,005✔
313
                        out_rec_insert,
2,005✔
314
                        &self.projections,
2,005✔
315
                        &self.aggregation_schema,
2,005✔
316
                    )?,
2,005✔
317
                }]
318
            } else {
319
                vec![]
70✔
320
            }
321
        } else {
322
            Self::generate_op_for_existing_segment(
10,950✔
323
                out_rec_delete_having_satisfied,
10,950✔
324
                out_rec_insert_having_satisfied,
10,950✔
325
                out_rec_delete,
10,950✔
326
                out_rec_insert,
10,950✔
327
                new,
10,950✔
328
                &self.projections,
10,950✔
329
                &self.aggregation_schema,
10,950✔
330
            )?
10,950✔
331
        };
332

333
        curr_state.count += 1;
13,025✔
334
        curr_state.values = Some(new_values);
13,025✔
335

13,025✔
336
        Ok(res)
13,025✔
337
    }
13,025✔
338

339
    /// Builds the output operation(s) for a segment that existed before and
    /// still exists after the current change, based on whether the HAVING
    /// predicate held on the old and new aggregate values.
    ///
    /// - old failed, new passes -> Insert (row enters the result set)
    /// - old passed, new fails  -> Delete (row leaves the result set)
    /// - both pass              -> Update (old and new projected rows)
    /// - both fail              -> nothing to emit
    fn generate_op_for_existing_segment(
        out_rec_delete_having_satisfied: bool,
        out_rec_insert_having_satisfied: bool,
        out_rec_delete: Vec<Field>,
        out_rec_insert: Vec<Field>,
        rec: &mut Record,
        projections: &Vec<Expression>,
        aggregation_schema: &Schema,
    ) -> Result<Vec<Operation>, PipelineError> {
        Ok(
            match (
                out_rec_delete_having_satisfied,
                out_rec_insert_having_satisfied,
            ) {
                (false, true) => vec![Operation::Insert {
                    new: Self::build_projection(
                        rec,
                        out_rec_insert,
                        projections,
                        aggregation_schema,
                    )?,
                }],
                (true, false) => vec![Operation::Delete {
                    old: Self::build_projection(
                        rec,
                        out_rec_delete,
                        projections,
                        aggregation_schema,
                    )?,
                }],
                (true, true) => vec![Operation::Update {
                    new: Self::build_projection(
                        rec,
                        out_rec_insert,
                        projections,
                        aggregation_schema,
                    )?,
                    old: Self::build_projection(
                        rec,
                        out_rec_delete,
                        projections,
                        aggregation_schema,
                    )?,
                }],
                (false, false) => vec![],
            },
        )
    }
387

388
    fn having_is_satisfied(
1,048✔
389
        having_eval_schema: &Schema,
1,048✔
390
        original_record: &mut Record,
1,048✔
391
        having: &Expression,
1,048✔
392
        out_rec: &mut Vec<Field>,
1,048✔
393
    ) -> Result<bool, PipelineError> {
1,048✔
394
        //
1,048✔
395
        let original_record_len = original_record.values.len();
1,048✔
396
        Ok(match out_rec.len() {
1,048✔
397
            0 => false,
99✔
398
            _ => {
399
                original_record.values.extend(std::mem::take(out_rec));
949✔
400
                let r = having
949✔
401
                    .evaluate(original_record, having_eval_schema)?
949✔
402
                    .as_boolean()
949✔
403
                    .unwrap_or(false);
949✔
404
                out_rec.extend(
949✔
405
                    original_record
949✔
406
                        .values
949✔
407
                        .drain(original_record_len..)
949✔
408
                        .collect::<Vec<Field>>(),
949✔
409
                );
949✔
410
                r
949✔
411
            }
412
        })
413
    }
1,048✔
414

415
    fn agg_update(
370✔
416
        &mut self,
370✔
417
        old: &mut Record,
370✔
418
        new: &mut Record,
370✔
419
        key: RecordKey,
370✔
420
    ) -> Result<Vec<Operation>, PipelineError> {
370✔
421
        let mut out_rec_delete: Vec<Field> = Vec::with_capacity(self.measures.len());
370✔
422
        let mut out_rec_insert: Vec<Field> = Vec::with_capacity(self.measures.len());
370✔
423

370✔
424
        let curr_state_opt = self.states.get_mut(&key);
370✔
425
        assert!(
370✔
426
            curr_state_opt.is_some(),
370✔
427
            "Unable to find aggregator state during UPDATE operation"
×
428
        );
429
        let curr_state = curr_state_opt.unwrap();
370✔
430

431
        let new_values = Self::calc_and_fill_measures(
370✔
432
            curr_state,
370✔
433
            Some(old),
370✔
434
            Some(new),
370✔
435
            &mut out_rec_delete,
370✔
436
            &mut out_rec_insert,
370✔
437
            AggregatorOperation::Update,
370✔
438
            &self.measures,
370✔
439
            &self.input_schema,
370✔
440
        )?;
370✔
441

442
        let (out_rec_delete_having_satisfied, out_rec_insert_having_satisfied) = match &self.having
370✔
443
        {
444
            None => (true, true),
325✔
445
            Some(having) => (
45✔
446
                Self::having_is_satisfied(
45✔
447
                    &self.having_eval_schema,
45✔
448
                    old,
45✔
449
                    having,
45✔
450
                    &mut out_rec_delete,
45✔
451
                )?,
45✔
452
                Self::having_is_satisfied(
45✔
453
                    &self.having_eval_schema,
45✔
454
                    new,
45✔
455
                    having,
45✔
456
                    &mut out_rec_insert,
45✔
457
                )?,
45✔
458
            ),
459
        };
460

461
        let res = match (
370✔
462
            out_rec_delete_having_satisfied,
370✔
463
            out_rec_insert_having_satisfied,
370✔
464
        ) {
370✔
465
            (false, true) => vec![Operation::Insert {
2✔
466
                new: Self::build_projection(
2✔
467
                    new,
2✔
468
                    out_rec_insert,
2✔
469
                    &self.projections,
2✔
470
                    &self.aggregation_schema,
2✔
471
                )?,
2✔
472
            }],
473
            (true, false) => vec![Operation::Delete {
2✔
474
                old: Self::build_projection(
2✔
475
                    old,
2✔
476
                    out_rec_delete,
2✔
477
                    &self.projections,
2✔
478
                    &self.aggregation_schema,
2✔
479
                )?,
2✔
480
            }],
481
            (true, true) => vec![Operation::Update {
327✔
482
                new: Self::build_projection(
327✔
483
                    new,
327✔
484
                    out_rec_insert,
327✔
485
                    &self.projections,
327✔
486
                    &self.aggregation_schema,
327✔
487
                )?,
327✔
488
                old: Self::build_projection(
327✔
489
                    old,
327✔
490
                    out_rec_delete,
327✔
491
                    &self.projections,
327✔
492
                    &self.aggregation_schema,
327✔
493
                )?,
327✔
494
            }],
495
            (false, false) => vec![],
39✔
496
        };
497

498
        curr_state.values = Some(new_values);
370✔
499
        Ok(res)
370✔
500
    }
370✔
501

502
    pub fn build_projection(
25,319✔
503
        original: &mut Record,
25,319✔
504
        measures: Vec<Field>,
25,319✔
505
        projections: &Vec<Expression>,
25,319✔
506
        aggregation_schema: &Schema,
25,319✔
507
    ) -> Result<Record, PipelineError> {
25,319✔
508
        let original_len = original.values.len();
25,319✔
509
        original.values.extend(measures);
25,319✔
510
        let mut output = Vec::<Field>::with_capacity(projections.len());
25,319✔
511
        for exp in projections {
76,642✔
512
            output.push(exp.evaluate(original, aggregation_schema)?);
51,323✔
513
        }
514
        original.values.drain(original_len..);
25,319✔
515
        let mut output_record = Record::new(output);
25,319✔
516

25,319✔
517
        output_record.set_lifetime(original.get_lifetime());
25,319✔
518

25,319✔
519
        Ok(output_record)
25,319✔
520
    }
25,319✔
521

522
    /// Entry point: folds one source operation into the aggregation state and
    /// returns the operations to emit downstream.
    ///
    /// An `Update` whose old and new records map to the same segment key is
    /// handled in place; if the key differs (the record moved between
    /// groups), it is split into a delete from the old segment followed by
    /// an insert into the new one.
    pub fn aggregate(&mut self, mut op: Operation) -> Result<Vec<Operation>, PipelineError> {
        match op {
            Operation::Insert { ref mut new } => Ok(self.agg_insert(new)?),
            Operation::Delete { ref mut old } => Ok(self.agg_delete(old)?),
            Operation::Update {
                ref mut old,
                ref mut new,
            } => {
                // No dimensions: both sides share the default segment key.
                let (old_record_hash, new_record_hash) = if self.dimensions.is_empty() {
                    (
                        self.default_segment_key.clone(),
                        self.default_segment_key.clone(),
                    )
                } else {
                    (self.get_key(old)?, self.get_key(new)?)
                };

                if old_record_hash == new_record_hash {
                    Ok(self.agg_update(old, new, old_record_hash)?)
                } else {
                    // The record changed group: delete + insert.
                    let mut r = Vec::with_capacity(2);
                    r.extend(self.agg_delete(old)?);
                    r.extend(self.agg_insert(new)?);
                    Ok(r)
                }
            }
        }
    }
550

551
    fn get_key(&self, record: &Record) -> Result<RecordKey, PipelineError> {
4,324✔
552
        let mut key = Vec::<Field>::with_capacity(self.dimensions.len());
4,324✔
553
        for dimension in self.dimensions.iter() {
4,646✔
554
            key.push(dimension.evaluate(record, &self.input_schema)?);
4,646✔
555
        }
556
        if self.accurate_keys {
4,324✔
557
            Ok(RecordKey::Accurate(key))
4,324✔
558
        } else {
559
            Ok(RecordKey::Hash(get_record_hash(key.iter())))
×
560
        }
561
    }
4,324✔
562
}
563

564
impl Processor for AggregationProcessor {
    /// No-op: this processor keeps its aggregation state in memory and has
    /// nothing extra to do at epoch commit.
    fn commit(&self, _epoch: &Epoch) -> Result<(), BoxedError> {
        Ok(())
    }

    /// Decodes the incoming operation from the record store, folds it into
    /// the aggregation state via `aggregate`, and forwards every resulting
    /// operation on the default output port.
    fn process(
        &mut self,
        _from_port: PortHandle,
        record_store: &ProcessorRecordStore,
        op: ProcessorOperation,
        fw: &mut dyn ProcessorChannelForwarder,
    ) -> Result<(), BoxedError> {
        let op = record_store.load_operation(&op)?;
        let ops = self.aggregate(op)?;
        for output_op in ops {
            // Re-encode each output against the record store before sending.
            let output_op = record_store.create_operation(&output_op)?;
            fw.send(output_op, DEFAULT_PORT_HANDLE);
        }
        Ok(())
    }
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc