• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

getdozer / dozer / 5672512448

pending completion
5672512448

push

github

web-flow
chore: Change `make_from!` in `from_arrow` to func to improve readability (#1792)

31 of 31 new or added lines in 4 files covered. (100.0%)

45630 of 59777 relevant lines covered (76.33%)

38810.48 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

83.53
/dozer-sql/src/pipeline/aggregation/processor.rs
1
#![allow(clippy::too_many_arguments)]
2

3
use crate::pipeline::errors::PipelineError;
4
use crate::pipeline::{aggregation::aggregator::Aggregator, expression::execution::Expression};
5
use dozer_core::channels::ProcessorChannelForwarder;
6
use dozer_core::executor_operation::ProcessorOperation;
7
use dozer_core::node::{PortHandle, Processor};
8
use dozer_core::processor_record::{ProcessorRecord, ProcessorRecordRef};
9
use dozer_core::DEFAULT_PORT_HANDLE;
10
use dozer_types::errors::internal::BoxedError;
11
use dozer_types::types::{Field, FieldType, Schema};
12
use std::hash::{Hash, Hasher};
13

14
use crate::pipeline::aggregation::aggregator::{
15
    get_aggregator_from_aggregator_type, get_aggregator_type_from_aggregation_expression,
16
    AggregatorEnum, AggregatorType,
17
};
18
use ahash::AHasher;
19
use dozer_core::epoch::Epoch;
20
use hashbrown::HashMap;
21

22
/// Seed string hashed in `AggregationProcessor::new` to obtain the segment key
/// used when the processor has no dimension expressions.
const DEFAULT_SEGMENT_KEY: &str = "DOZER_DEFAULT_SEGMENT_KEY";
23

24
#[derive(Debug)]
×
25
struct AggregationState {
×
26
    count: usize,
27
    states: Vec<AggregatorEnum>,
28
    values: Option<Vec<Field>>,
29
}
30

31
impl AggregationState {
32
    pub fn new(types: &[AggregatorType], ret_types: &[FieldType]) -> Self {
11,837✔
33
        let mut states: Vec<AggregatorEnum> = Vec::new();
11,837✔
34
        for (idx, typ) in types.iter().enumerate() {
11,961✔
35
            let mut aggr = get_aggregator_from_aggregator_type(*typ);
11,961✔
36
            aggr.init(ret_types[idx]);
11,961✔
37
            states.push(aggr);
11,961✔
38
        }
11,961✔
39

×
40
        Self {
11,837✔
41
            count: 0,
11,837✔
42
            states,
11,837✔
43
            values: None,
11,837✔
44
        }
11,837✔
45
    }
11,837✔
46
}
×
47

48
/// Processor that keeps one `AggregationState` per segment key and turns each
/// incoming operation into zero or more aggregated output operations.
#[derive(Debug)]
×
49
pub struct AggregationProcessor {
×
50
    /// Identifier passed to `new`; stored as given.
    _id: String,
51
    /// Expressions evaluated by `get_key` to derive a record's segment key.
    /// When empty, every record falls into `default_segment_key`.
    dimensions: Vec<Expression>,
52
    /// For each measure, the argument expressions evaluated against input records.
    measures: Vec<Vec<Expression>>,
53
    /// Aggregator type of each measure, index-aligned with `measures`.
    measures_types: Vec<AggregatorType>,
54
    /// Return type of each measure, handed to the aggregator's `init`.
    measures_return_types: Vec<FieldType>,
55
    /// Expressions evaluated in `build_projection` to produce the output record.
    projections: Vec<Expression>,
56
    /// Optional HAVING expression; when present it decides whether the old and
    /// new aggregated rows are emitted.
    having: Option<Expression>,
57
    /// Schema used when evaluating dimension and measure expressions.
    input_schema: Schema,
58
    /// Schema used when evaluating the projection expressions.
    aggregation_schema: Schema,
59
    /// Aggregation state per segment key (the `u64` produced by `get_key`, or
    /// `default_segment_key`).
    states: HashMap<u64, AggregationState>,
60
    /// Hash of `DEFAULT_SEGMENT_KEY`, used as the segment key when there are no
    /// dimensions.
    default_segment_key: u64,
61
    /// Schema used to evaluate `having`: the input schema's fields followed by
    /// the aggregation schema's fields, with an empty primary index.
    having_eval_schema: Schema,
62
}
63

64
/// Kind of change `calc_and_fill_measures` applies to a segment's aggregators.
enum AggregatorOperation {
65
    /// Feed the inserted record's measure arguments to the aggregator's `insert`.
    Insert,
66
    /// Feed the deleted record's measure arguments to the aggregator's `delete`.
    Delete,
67
    /// Feed both the deleted and the inserted record's measure arguments to the
    /// aggregator's `update`.
    Update,
68
}
69

70
impl AggregationProcessor {
71
    pub fn new(
210✔
72
        id: String,
210✔
73
        dimensions: Vec<Expression>,
210✔
74
        measures: Vec<Expression>,
210✔
75
        projections: Vec<Expression>,
210✔
76
        having: Option<Expression>,
210✔
77
        input_schema: Schema,
210✔
78
        aggregation_schema: Schema,
210✔
79
    ) -> Result<Self, PipelineError> {
210✔
80
        let mut aggr_types = Vec::new();
210✔
81
        let mut aggr_measures = Vec::new();
210✔
82
        let mut aggr_measures_ret_types = Vec::new();
210✔
83

×
84
        for measure in measures {
432✔
85
            let (aggr_measure, aggr_type) =
222✔
86
                get_aggregator_type_from_aggregation_expression(&measure, &input_schema)?;
222✔
87
            aggr_measures.push(aggr_measure);
222✔
88
            aggr_types.push(aggr_type);
222✔
89
            aggr_measures_ret_types.push(measure.get_type(&input_schema)?.return_type)
222✔
90
        }
×
91

92
        let mut hasher = AHasher::default();
210✔
93
        DEFAULT_SEGMENT_KEY.hash(&mut hasher);
210✔
94

210✔
95
        let mut having_eval_schema_fields = input_schema.fields.clone();
210✔
96
        having_eval_schema_fields.extend(aggregation_schema.fields.clone());
210✔
97

210✔
98
        Ok(Self {
210✔
99
            _id: id,
210✔
100
            dimensions,
210✔
101
            projections,
210✔
102
            input_schema,
210✔
103
            aggregation_schema,
210✔
104
            states: HashMap::new(),
210✔
105
            measures: aggr_measures,
210✔
106
            having,
210✔
107
            measures_types: aggr_types,
210✔
108
            measures_return_types: aggr_measures_ret_types,
210✔
109
            default_segment_key: hasher.finish(),
210✔
110
            having_eval_schema: Schema {
210✔
111
                fields: having_eval_schema_fields,
210✔
112
                primary_index: vec![],
210✔
113
            },
210✔
114
        })
210✔
115
    }
210✔
116

×
117
    fn calc_and_fill_measures(
12,746✔
118
        curr_state: &mut AggregationState,
12,746✔
119
        deleted_record: Option<&ProcessorRecordRef>,
12,746✔
120
        inserted_record: Option<&ProcessorRecordRef>,
12,746✔
121
        out_rec_delete: &mut Vec<Field>,
12,746✔
122
        out_rec_insert: &mut Vec<Field>,
12,746✔
123
        op: AggregatorOperation,
12,746✔
124
        measures: &Vec<Vec<Expression>>,
12,746✔
125
        input_schema: &Schema,
12,746✔
126
    ) -> Result<Vec<Field>, PipelineError> {
12,746✔
127
        let mut new_fields: Vec<Field> = Vec::with_capacity(measures.len());
12,746✔
128

×
129
        for (idx, measure) in measures.iter().enumerate() {
12,930✔
130
            let curr_aggr = &mut curr_state.states[idx];
12,930✔
131
            let curr_val_opt: Option<&Field> = curr_state.values.as_ref().map(|e| &e[idx]);
12,930✔
132

×
133
            let new_val = match op {
12,930✔
134
                AggregatorOperation::Insert => {
×
135
                    let mut inserted_fields = Vec::with_capacity(measure.len());
11,961✔
136
                    for m in measure {
23,988✔
137
                        inserted_fields
×
138
                            .push(m.evaluate(inserted_record.unwrap().get_record(), input_schema)?);
12,027✔
139
                    }
×
140
                    if let Some(curr_val) = curr_val_opt {
11,961✔
141
                        out_rec_delete.push(curr_val.clone());
10,635✔
142
                    }
11,335✔
143
                    curr_aggr.insert(&inserted_fields)?
11,961✔
144
                }
×
145
                AggregatorOperation::Delete => {
146
                    let mut deleted_fields = Vec::with_capacity(measure.len());
691✔
147
                    for m in measure {
1,448✔
148
                        deleted_fields
×
149
                            .push(m.evaluate(deleted_record.unwrap().get_record(), input_schema)?);
757✔
150
                    }
×
151
                    if let Some(curr_val) = curr_val_opt {
691✔
152
                        out_rec_delete.push(curr_val.clone());
691✔
153
                    }
691✔
154
                    curr_aggr.delete(&deleted_fields)?
691✔
155
                }
×
156
                AggregatorOperation::Update => {
157
                    let mut deleted_fields = Vec::with_capacity(measure.len());
278✔
158
                    for m in measure {
610✔
159
                        deleted_fields
×
160
                            .push(m.evaluate(deleted_record.unwrap().get_record(), input_schema)?);
332✔
161
                    }
×
162
                    let mut inserted_fields = Vec::with_capacity(measure.len());
278✔
163
                    for m in measure {
610✔
164
                        inserted_fields
×
165
                            .push(m.evaluate(inserted_record.unwrap().get_record(), input_schema)?);
332✔
166
                    }
×
167
                    if let Some(curr_val) = curr_val_opt {
278✔
168
                        out_rec_delete.push(curr_val.clone());
278✔
169
                    }
278✔
170
                    curr_aggr.update(&deleted_fields, &inserted_fields)?
278✔
171
                }
×
172
            };
173
            out_rec_insert.push(new_val.clone());
12,930✔
174
            new_fields.push(new_val);
12,930✔
175
        }
×
176
        Ok(new_fields)
12,746✔
177
    }
12,746✔
178

×
179
    fn agg_delete(
647✔
180
        &mut self,
647✔
181
        old: &mut ProcessorRecordRef,
647✔
182
    ) -> Result<Vec<ProcessorOperation>, PipelineError> {
647✔
183
        let mut out_rec_delete: Vec<Field> = Vec::with_capacity(self.measures.len());
647✔
184
        let mut out_rec_insert: Vec<Field> = Vec::with_capacity(self.measures.len());
647✔
185

×
186
        let key = if !self.dimensions.is_empty() {
647✔
187
            get_key(&self.input_schema, old, &self.dimensions)?
536✔
188
        } else {
×
189
            self.default_segment_key
111✔
190
        };
×
191

192
        let curr_state_opt = self.states.get_mut(&key);
647✔
193
        assert!(
647✔
194
            curr_state_opt.is_some(),
647✔
195
            "Unable to find aggregator state during DELETE operation"
×
196
        );
×
197
        let curr_state = curr_state_opt.unwrap();
647✔
198

×
199
        let new_values = Self::calc_and_fill_measures(
647✔
200
            curr_state,
647✔
201
            Some(old),
647✔
202
            None,
647✔
203
            &mut out_rec_delete,
647✔
204
            &mut out_rec_insert,
647✔
205
            AggregatorOperation::Delete,
647✔
206
            &self.measures,
647✔
207
            &self.input_schema,
647✔
208
        )?;
647✔
209

×
210
        let (out_rec_delete_having_satisfied, out_rec_insert_having_satisfied) = match &self.having
647✔
211
        {
×
212
            None => (true, true),
587✔
213
            Some(having) => (
60✔
214
                Self::having_is_satisfied(
60✔
215
                    &self.having_eval_schema,
60✔
216
                    old,
60✔
217
                    having,
60✔
218
                    &mut out_rec_delete,
60✔
219
                )?,
60✔
220
                Self::having_is_satisfied(
60✔
221
                    &self.having_eval_schema,
60✔
222
                    old,
60✔
223
                    having,
60✔
224
                    &mut out_rec_insert,
60✔
225
                )?,
60✔
226
            ),
×
227
        };
228

229
        let res = if curr_state.count == 1 {
647✔
230
            self.states.remove(&key);
352✔
231
            if out_rec_delete_having_satisfied {
352✔
232
                vec![ProcessorOperation::Delete {
350✔
233
                    old: Self::build_projection(
350✔
234
                        old,
350✔
235
                        out_rec_delete,
350✔
236
                        &self.projections,
350✔
237
                        &self.aggregation_schema,
350✔
238
                    )?,
350✔
239
                }]
×
240
            } else {
241
                vec![]
2✔
242
            }
×
243
        } else {
244
            curr_state.count -= 1;
295✔
245
            curr_state.values = Some(new_values);
295✔
246

295✔
247
            Self::generate_op_for_existing_segment(
295✔
248
                out_rec_delete_having_satisfied,
295✔
249
                out_rec_insert_having_satisfied,
295✔
250
                out_rec_delete,
295✔
251
                out_rec_insert,
295✔
252
                old,
295✔
253
                &self.projections,
295✔
254
                &self.aggregation_schema,
295✔
255
            )?
295✔
256
        };
×
257

258
        Ok(res)
647✔
259
    }
647✔
260

×
261
    fn agg_insert(
11,837✔
262
        &mut self,
11,837✔
263
        new: &mut ProcessorRecordRef,
11,837✔
264
    ) -> Result<Vec<ProcessorOperation>, PipelineError> {
11,837✔
265
        let mut out_rec_delete: Vec<Field> = Vec::with_capacity(self.measures.len());
11,837✔
266
        let mut out_rec_insert: Vec<Field> = Vec::with_capacity(self.measures.len());
11,837✔
267

×
268
        let key = if !self.dimensions.is_empty() {
11,837✔
269
            get_key(&self.input_schema, new, &self.dimensions)?
1,666✔
270
        } else {
×
271
            self.default_segment_key
10,171✔
272
        };
×
273

274
        let curr_state = self.states.entry(key).or_insert(AggregationState::new(
11,837✔
275
            &self.measures_types,
11,837✔
276
            &self.measures_return_types,
11,837✔
277
        ));
11,837✔
278

×
279
        let new_values = Self::calc_and_fill_measures(
11,837✔
280
            curr_state,
11,837✔
281
            None,
11,837✔
282
            Some(new),
11,837✔
283
            &mut out_rec_delete,
11,837✔
284
            &mut out_rec_insert,
11,837✔
285
            AggregatorOperation::Insert,
11,837✔
286
            &self.measures,
11,837✔
287
            &self.input_schema,
11,837✔
288
        )?;
11,837✔
289

×
290
        let (out_rec_delete_having_satisfied, out_rec_insert_having_satisfied) = match &self.having
11,837✔
291
        {
×
292
            None => (true, true),
11,613✔
293
            Some(having) => (
224✔
294
                Self::having_is_satisfied(
224✔
295
                    &self.having_eval_schema,
224✔
296
                    new,
224✔
297
                    having,
224✔
298
                    &mut out_rec_delete,
224✔
299
                )?,
224✔
300
                Self::having_is_satisfied(
224✔
301
                    &self.having_eval_schema,
224✔
302
                    new,
224✔
303
                    having,
224✔
304
                    &mut out_rec_insert,
224✔
305
                )?,
224✔
306
            ),
×
307
        };
308

309
        let res = if curr_state.count == 0 {
11,837✔
310
            if out_rec_insert_having_satisfied {
1,238✔
311
                vec![ProcessorOperation::Insert {
1,195✔
312
                    new: Self::build_projection(
1,195✔
313
                        new,
1,195✔
314
                        out_rec_insert,
1,195✔
315
                        &self.projections,
1,195✔
316
                        &self.aggregation_schema,
1,195✔
317
                    )?,
1,195✔
318
                }]
×
319
            } else {
320
                vec![]
43✔
321
            }
×
322
        } else {
323
            Self::generate_op_for_existing_segment(
10,599✔
324
                out_rec_delete_having_satisfied,
10,599✔
325
                out_rec_insert_having_satisfied,
10,599✔
326
                out_rec_delete,
10,599✔
327
                out_rec_insert,
10,599✔
328
                new,
10,599✔
329
                &self.projections,
10,599✔
330
                &self.aggregation_schema,
10,599✔
331
            )?
10,599✔
332
        };
×
333

334
        curr_state.count += 1;
11,837✔
335
        curr_state.values = Some(new_values);
11,837✔
336

11,837✔
337
        Ok(res)
11,837✔
338
    }
11,837✔
339

×
340
    fn generate_op_for_existing_segment(
10,894✔
341
        out_rec_delete_having_satisfied: bool,
10,894✔
342
        out_rec_insert_having_satisfied: bool,
10,894✔
343
        out_rec_delete: Vec<Field>,
10,894✔
344
        out_rec_insert: Vec<Field>,
10,894✔
345
        rec: &mut ProcessorRecordRef,
10,894✔
346
        projections: &Vec<Expression>,
10,894✔
347
        aggregation_schema: &Schema,
10,894✔
348
    ) -> Result<Vec<ProcessorOperation>, PipelineError> {
10,894✔
349
        Ok(
10,894✔
350
            match (
10,894✔
351
                out_rec_delete_having_satisfied,
10,894✔
352
                out_rec_insert_having_satisfied,
10,894✔
353
            ) {
10,894✔
354
                (false, true) => vec![ProcessorOperation::Insert {
5✔
355
                    new: Self::build_projection(
5✔
356
                        rec,
5✔
357
                        out_rec_insert,
5✔
358
                        projections,
5✔
359
                        aggregation_schema,
5✔
360
                    )?,
5✔
361
                }],
×
362
                (true, false) => vec![ProcessorOperation::Delete {
3✔
363
                    old: Self::build_projection(
3✔
364
                        rec,
3✔
365
                        out_rec_delete,
3✔
366
                        projections,
3✔
367
                        aggregation_schema,
3✔
368
                    )?,
3✔
369
                }],
×
370
                (true, true) => vec![ProcessorOperation::Update {
10,711✔
371
                    new: Self::build_projection(
10,711✔
372
                        rec,
10,711✔
373
                        out_rec_insert,
10,711✔
374
                        projections,
10,711✔
375
                        aggregation_schema,
10,711✔
376
                    )?,
10,711✔
377
                    old: Self::build_projection(
10,711✔
378
                        rec,
10,711✔
379
                        out_rec_delete,
10,711✔
380
                        projections,
10,711✔
381
                        aggregation_schema,
10,711✔
382
                    )?,
10,711✔
383
                }],
×
384
                (false, false) => vec![],
175✔
385
            },
×
386
        )
387
    }
10,894✔
388

×
389
    fn having_is_satisfied(
628✔
390
        having_eval_schema: &Schema,
628✔
391
        original_record: &mut ProcessorRecordRef,
628✔
392
        having: &Expression,
628✔
393
        out_rec: &mut Vec<Field>,
628✔
394
    ) -> Result<bool, PipelineError> {
628✔
395
        let mut original_record = ProcessorRecord::from_referenced_record(original_record.clone());
628✔
396
        Ok(match out_rec.len() {
628✔
397
            0 => false,
60✔
398
            _ => {
×
399
                for f in out_rec.iter() {
568✔
400
                    original_record.extend_direct_field(f.clone());
568✔
401
                }
568✔
402

×
403
                having
568✔
404
                    .evaluate(&original_record, having_eval_schema)?
568✔
405
                    .as_boolean()
568✔
406
                    .unwrap_or(false)
568✔
407
            }
×
408
        })
409
    }
628✔
410

×
411
    fn agg_update(
262✔
412
        &mut self,
262✔
413
        old: &mut ProcessorRecordRef,
262✔
414
        new: &mut ProcessorRecordRef,
262✔
415
        key: u64,
262✔
416
    ) -> Result<Vec<ProcessorOperation>, PipelineError> {
262✔
417
        let mut out_rec_delete: Vec<Field> = Vec::with_capacity(self.measures.len());
262✔
418
        let mut out_rec_insert: Vec<Field> = Vec::with_capacity(self.measures.len());
262✔
419

262✔
420
        let curr_state_opt = self.states.get_mut(&key);
262✔
421
        assert!(
262✔
422
            curr_state_opt.is_some(),
262✔
423
            "Unable to find aggregator state during UPDATE operation"
×
424
        );
×
425
        let curr_state = curr_state_opt.unwrap();
262✔
426

×
427
        let new_values = Self::calc_and_fill_measures(
262✔
428
            curr_state,
262✔
429
            Some(old),
262✔
430
            Some(new),
262✔
431
            &mut out_rec_delete,
262✔
432
            &mut out_rec_insert,
262✔
433
            AggregatorOperation::Update,
262✔
434
            &self.measures,
262✔
435
            &self.input_schema,
262✔
436
        )?;
262✔
437

×
438
        let (out_rec_delete_having_satisfied, out_rec_insert_having_satisfied) = match &self.having
262✔
439
        {
×
440
            None => (true, true),
232✔
441
            Some(having) => (
30✔
442
                Self::having_is_satisfied(
30✔
443
                    &self.having_eval_schema,
30✔
444
                    old,
30✔
445
                    having,
30✔
446
                    &mut out_rec_delete,
30✔
447
                )?,
30✔
448
                Self::having_is_satisfied(
30✔
449
                    &self.having_eval_schema,
30✔
450
                    new,
30✔
451
                    having,
30✔
452
                    &mut out_rec_insert,
30✔
453
                )?,
30✔
454
            ),
×
455
        };
456

457
        let res = match (
262✔
458
            out_rec_delete_having_satisfied,
262✔
459
            out_rec_insert_having_satisfied,
262✔
460
        ) {
262✔
461
            (false, true) => vec![ProcessorOperation::Insert {
2✔
462
                new: Self::build_projection(
2✔
463
                    new,
2✔
464
                    out_rec_insert,
2✔
465
                    &self.projections,
2✔
466
                    &self.aggregation_schema,
2✔
467
                )?,
2✔
468
            }],
×
469
            (true, false) => vec![ProcessorOperation::Delete {
2✔
470
                old: Self::build_projection(
2✔
471
                    old,
2✔
472
                    out_rec_delete,
2✔
473
                    &self.projections,
2✔
474
                    &self.aggregation_schema,
2✔
475
                )?,
2✔
476
            }],
×
477
            (true, true) => vec![ProcessorOperation::Update {
234✔
478
                new: Self::build_projection(
234✔
479
                    new,
234✔
480
                    out_rec_insert,
234✔
481
                    &self.projections,
234✔
482
                    &self.aggregation_schema,
234✔
483
                )?,
234✔
484
                old: Self::build_projection(
234✔
485
                    old,
234✔
486
                    out_rec_delete,
234✔
487
                    &self.projections,
234✔
488
                    &self.aggregation_schema,
234✔
489
                )?,
234✔
490
            }],
×
491
            (false, false) => vec![],
24✔
492
        };
×
493

494
        curr_state.values = Some(new_values);
262✔
495
        Ok(res)
262✔
496
    }
262✔
497

×
498
    pub fn build_projection(
23,447✔
499
        original: &mut ProcessorRecordRef,
23,447✔
500
        measures: Vec<Field>,
23,447✔
501
        projections: &Vec<Expression>,
23,447✔
502
        aggregation_schema: &Schema,
23,447✔
503
    ) -> Result<ProcessorRecordRef, PipelineError> {
23,447✔
504
        let mut original = ProcessorRecord::from_referenced_record(original.clone());
23,447✔
505
        for f in measures {
47,150✔
506
            original.extend_direct_field(f);
23,703✔
507
        }
23,703✔
508

×
509
        let mut output_record = ProcessorRecord::new();
23,447✔
510
        for exp in projections {
70,594✔
511
            output_record.extend_direct_field(exp.evaluate(&original, aggregation_schema)?);
47,147✔
512
        }
×
513

514
        output_record.set_lifetime(original.get_lifetime());
23,447✔
515

23,447✔
516
        Ok(ProcessorRecordRef::new(output_record))
23,447✔
517
    }
23,447✔
518

×
519
    pub fn aggregate(
12,679✔
520
        &mut self,
12,679✔
521
        mut op: ProcessorOperation,
12,679✔
522
    ) -> Result<Vec<ProcessorOperation>, PipelineError> {
12,679✔
523
        match op {
12,679✔
524
            ProcessorOperation::Insert { ref mut new } => Ok(self.agg_insert(new)?),
11,770✔
525
            ProcessorOperation::Delete { ref mut old } => Ok(self.agg_delete(old)?),
580✔
526
            ProcessorOperation::Update {
×
527
                ref mut old,
329✔
528
                ref mut new,
329✔
529
            } => {
×
530
                let (old_record_hash, new_record_hash) = if self.dimensions.is_empty() {
329✔
531
                    (self.default_segment_key, self.default_segment_key)
84✔
532
                } else {
×
533
                    (
534
                        get_key(&self.input_schema, old, &self.dimensions)?,
245✔
535
                        get_key(&self.input_schema, new, &self.dimensions)?,
245✔
536
                    )
×
537
                };
538

539
                if old_record_hash == new_record_hash {
329✔
540
                    Ok(self.agg_update(old, new, old_record_hash)?)
262✔
541
                } else {
×
542
                    let mut r = Vec::with_capacity(2);
67✔
543
                    r.extend(self.agg_delete(old)?);
67✔
544
                    r.extend(self.agg_insert(new)?);
67✔
545
                    Ok(r)
67✔
546
                }
×
547
            }
548
        }
549
    }
12,679✔
550
}
×
551

552
fn get_key(
2,692✔
553
    schema: &Schema,
2,692✔
554
    record: &ProcessorRecordRef,
2,692✔
555
    dimensions: &[Expression],
2,692✔
556
) -> Result<u64, PipelineError> {
2,692✔
557
    let mut key = Vec::<Field>::with_capacity(dimensions.len());
2,692✔
558
    for dimension in dimensions.iter() {
2,876✔
559
        key.push(dimension.evaluate(record.get_record(), schema)?);
2,876✔
560
    }
×
561
    let mut hasher = AHasher::default();
2,692✔
562
    key.hash(&mut hasher);
2,692✔
563
    let v = hasher.finish();
2,692✔
564
    Ok(v)
2,692✔
565
}
2,692✔
566

×
567
impl Processor for AggregationProcessor {
568
    fn commit(&self, _epoch: &Epoch) -> Result<(), BoxedError> {
121✔
569
        Ok(())
121✔
570
    }
121✔
571

×
572
    fn process(
12,100✔
573
        &mut self,
12,100✔
574
        _from_port: PortHandle,
12,100✔
575
        op: ProcessorOperation,
12,100✔
576
        fw: &mut dyn ProcessorChannelForwarder,
12,100✔
577
    ) -> Result<(), BoxedError> {
12,100✔
578
        let ops = self.aggregate(op)?;
12,100✔
579
        for fop in ops {
24,008✔
580
            fw.send(fop, DEFAULT_PORT_HANDLE);
11,908✔
581
        }
11,908✔
582
        Ok(())
12,100✔
583
    }
12,100✔
584
}
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc