• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

getdozer / dozer / 6105410942

07 Sep 2023 04:28AM UTC coverage: 77.562% (-0.1%) from 77.686%
6105410942

push

github

chloeminkyung
feat: onnx image

1141 of 1141 new or added lines in 66 files covered. (100.0%)

49957 of 64409 relevant lines covered (77.56%)

50900.25 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

96.8
/dozer-sql/src/pipeline/aggregation/processor.rs
1
#![allow(clippy::too_many_arguments)]
2

3
use crate::pipeline::errors::PipelineError;
4
use crate::pipeline::utils::record_hashtable_key::{get_record_hash, RecordKey};
5
use crate::pipeline::{aggregation::aggregator::Aggregator, expression::execution::Expression};
6
use dozer_core::channels::ProcessorChannelForwarder;
7
use dozer_core::dozer_log::storage::Object;
8
use dozer_core::executor_operation::ProcessorOperation;
9
use dozer_core::node::{PortHandle, Processor};
10
use dozer_core::processor_record::ProcessorRecordStore;
11
use dozer_core::DEFAULT_PORT_HANDLE;
12
use dozer_types::bincode;
13
use dozer_types::errors::internal::BoxedError;
14
use dozer_types::serde::{Deserialize, Serialize};
15
use dozer_types::types::{Field, FieldType, Operation, Record, Schema};
16
use std::collections::HashMap;
17

18
use crate::pipeline::aggregation::aggregator::{
19
    get_aggregator_from_aggregator_type, get_aggregator_type_from_aggregation_expression,
20
    AggregatorEnum, AggregatorType,
21
};
22
use dozer_core::epoch::Epoch;
23

24
/// Value of the single string field that forms the key of the default segment,
/// i.e. the one group used when the aggregation has no dimensions.
const DEFAULT_SEGMENT_KEY: &str = "DOZER_DEFAULT_SEGMENT_KEY";
25

26
#[derive(Debug, Serialize, Deserialize)]
×
27
#[serde(crate = "dozer_types::serde")]
28
struct AggregationState {
29
    count: usize,
30
    states: Vec<AggregatorEnum>,
31
    values: Option<Vec<Field>>,
32
}
33

34
impl AggregationState {
35
    pub fn new(types: &[AggregatorType], ret_types: &[FieldType]) -> Self {
13,025✔
36
        let mut states: Vec<AggregatorEnum> = Vec::new();
13,025✔
37
        for (idx, typ) in types.iter().enumerate() {
13,242✔
38
            let mut aggr = get_aggregator_from_aggregator_type(*typ);
13,242✔
39
            aggr.init(ret_types[idx]);
13,242✔
40
            states.push(aggr);
13,242✔
41
        }
13,242✔
42

43
        Self {
13,025✔
44
            count: 0,
13,025✔
45
            states,
13,025✔
46
            values: None,
13,025✔
47
        }
13,025✔
48
    }
13,025✔
49
}
50

51
#[derive(Debug)]
×
52
pub struct AggregationProcessor {
53
    _id: String,
54
    dimensions: Vec<Expression>,
55
    measures: Vec<Vec<Expression>>,
56
    measures_types: Vec<AggregatorType>,
57
    measures_return_types: Vec<FieldType>,
58
    projections: Vec<Expression>,
59
    having: Option<Expression>,
60
    input_schema: Schema,
61
    aggregation_schema: Schema,
62
    states: HashMap<RecordKey, AggregationState>,
63
    default_segment_key: RecordKey,
64
    having_eval_schema: Schema,
65
    accurate_keys: bool,
66
}
67

68
/// Kind of change applied to a group's aggregators by `calc_and_fill_measures`.
enum AggregatorOperation {
    /// A record joins the group.
    Insert,
    /// A record leaves the group.
    Delete,
    /// A record of the group is replaced by another one with the same key.
    Update,
}
73

74
impl AggregationProcessor {
75
    pub fn new(
300✔
76
        id: String,
300✔
77
        dimensions: Vec<Expression>,
300✔
78
        measures: Vec<Expression>,
300✔
79
        projections: Vec<Expression>,
300✔
80
        having: Option<Expression>,
300✔
81
        input_schema: Schema,
300✔
82
        aggregation_schema: Schema,
300✔
83
        enable_probabilistic_optimizations: bool,
300✔
84
        checkpoint_data: Option<Vec<u8>>,
300✔
85
    ) -> Result<Self, BoxedError> {
300✔
86
        let mut aggr_types = Vec::new();
300✔
87
        let mut aggr_measures = Vec::new();
300✔
88
        let mut aggr_measures_ret_types = Vec::new();
300✔
89

90
        for measure in measures {
621✔
91
            let (aggr_measure, aggr_type) =
321✔
92
                get_aggregator_type_from_aggregation_expression(&measure, &input_schema)?;
321✔
93
            aggr_measures.push(aggr_measure);
321✔
94
            aggr_types.push(aggr_type);
321✔
95
            aggr_measures_ret_types.push(measure.get_type(&input_schema)?.return_type)
321✔
96
        }
97

98
        let mut having_eval_schema_fields = input_schema.fields.clone();
300✔
99
        having_eval_schema_fields.extend(aggregation_schema.fields.clone());
300✔
100

300✔
101
        let accurate_keys = !enable_probabilistic_optimizations;
300✔
102

103
        let states = if let Some(data) = checkpoint_data {
300✔
104
            bincode::deserialize(&data)?
×
105
        } else {
106
            HashMap::new()
300✔
107
        };
108

109
        Ok(Self {
110
            _id: id,
300✔
111
            dimensions,
300✔
112
            projections,
300✔
113
            input_schema,
300✔
114
            aggregation_schema,
300✔
115
            states,
300✔
116
            measures: aggr_measures,
300✔
117
            having,
300✔
118
            measures_types: aggr_types,
300✔
119
            measures_return_types: aggr_measures_ret_types,
300✔
120
            default_segment_key: {
300✔
121
                let fields = vec![Field::String(DEFAULT_SEGMENT_KEY.into())];
300✔
122
                if accurate_keys {
300✔
123
                    RecordKey::Accurate(fields)
300✔
124
                } else {
125
                    RecordKey::Hash(get_record_hash(fields.iter()))
×
126
                }
127
            },
128
            having_eval_schema: Schema {
300✔
129
                fields: having_eval_schema_fields,
300✔
130
                primary_index: vec![],
300✔
131
            },
300✔
132
            accurate_keys,
300✔
133
        })
134
    }
300✔
135

136
    fn calc_and_fill_measures(
14,348✔
137
        curr_state: &mut AggregationState,
14,348✔
138
        deleted_record: Option<&Record>,
14,348✔
139
        inserted_record: Option<&Record>,
14,348✔
140
        out_rec_delete: &mut Vec<Field>,
14,348✔
141
        out_rec_insert: &mut Vec<Field>,
14,348✔
142
        op: AggregatorOperation,
14,348✔
143
        measures: &Vec<Vec<Expression>>,
14,348✔
144
        input_schema: &Schema,
14,348✔
145
    ) -> Result<Vec<Field>, PipelineError> {
14,348✔
146
        let mut new_fields: Vec<Field> = Vec::with_capacity(measures.len());
14,348✔
147

148
        for (idx, measure) in measures.iter().enumerate() {
14,670✔
149
            let curr_aggr = &mut curr_state.states[idx];
14,670✔
150
            let curr_val_opt: Option<&Field> = curr_state.values.as_ref().map(|e| &e[idx]);
14,670✔
151

152
            let new_val = match op {
14,670✔
153
                AggregatorOperation::Insert => {
154
                    let mut inserted_fields = Vec::with_capacity(measure.len());
13,242✔
155
                    for m in measure {
26,550✔
156
                        inserted_fields.push(m.evaluate(inserted_record.unwrap(), input_schema)?);
13,308✔
157
                    }
158
                    if let Some(curr_val) = curr_val_opt {
13,242✔
159
                        out_rec_delete.push(curr_val.clone());
11,013✔
160
                    }
12,238✔
161
                    curr_aggr.insert(&inserted_fields)?
13,242✔
162
                }
163
                AggregatorOperation::Delete => {
164
                    let mut deleted_fields = Vec::with_capacity(measure.len());
1,030✔
165
                    for m in measure {
2,126✔
166
                        deleted_fields.push(m.evaluate(deleted_record.unwrap(), input_schema)?);
1,096✔
167
                    }
168
                    if let Some(curr_val) = curr_val_opt {
1,030✔
169
                        out_rec_delete.push(curr_val.clone());
1,030✔
170
                    }
1,030✔
171
                    curr_aggr.delete(&deleted_fields)?
1,030✔
172
                }
173
                AggregatorOperation::Update => {
174
                    let mut deleted_fields = Vec::with_capacity(measure.len());
398✔
175
                    for m in measure {
850✔
176
                        deleted_fields.push(m.evaluate(deleted_record.unwrap(), input_schema)?);
452✔
177
                    }
178
                    let mut inserted_fields = Vec::with_capacity(measure.len());
398✔
179
                    for m in measure {
850✔
180
                        inserted_fields.push(m.evaluate(inserted_record.unwrap(), input_schema)?);
452✔
181
                    }
182
                    if let Some(curr_val) = curr_val_opt {
398✔
183
                        out_rec_delete.push(curr_val.clone());
398✔
184
                    }
398✔
185
                    curr_aggr.update(&deleted_fields, &inserted_fields)?
398✔
186
                }
187
            };
188
            out_rec_insert.push(new_val.clone());
14,670✔
189
            new_fields.push(new_val);
14,670✔
190
        }
191
        Ok(new_fields)
14,348✔
192
    }
14,348✔
193

194
    fn agg_delete(&mut self, old: &mut Record) -> Result<Vec<Operation>, PipelineError> {
953✔
195
        let mut out_rec_delete: Vec<Field> = Vec::with_capacity(self.measures.len());
953✔
196
        let mut out_rec_insert: Vec<Field> = Vec::with_capacity(self.measures.len());
953✔
197

198
        let key = if !self.dimensions.is_empty() {
953✔
199
            Some(self.get_key(old)?)
812✔
200
        } else {
201
            None
141✔
202
        };
203
        let key = key.as_ref().unwrap_or(&self.default_segment_key);
953✔
204

953✔
205
        let curr_state_opt = self.states.get_mut(key);
953✔
206
        assert!(
953✔
207
            curr_state_opt.is_some(),
953✔
208
            "Unable to find aggregator state during DELETE operation"
×
209
        );
210
        let curr_state = curr_state_opt.unwrap();
953✔
211

212
        let new_values = Self::calc_and_fill_measures(
953✔
213
            curr_state,
953✔
214
            Some(old),
953✔
215
            None,
953✔
216
            &mut out_rec_delete,
953✔
217
            &mut out_rec_insert,
953✔
218
            AggregatorOperation::Delete,
953✔
219
            &self.measures,
953✔
220
            &self.input_schema,
953✔
221
        )?;
953✔
222

223
        let (out_rec_delete_having_satisfied, out_rec_insert_having_satisfied) = match &self.having
953✔
224
        {
225
            None => (true, true),
854✔
226
            Some(having) => (
99✔
227
                Self::having_is_satisfied(
99✔
228
                    &self.having_eval_schema,
99✔
229
                    old,
99✔
230
                    having,
99✔
231
                    &mut out_rec_delete,
99✔
232
                )?,
99✔
233
                Self::having_is_satisfied(
99✔
234
                    &self.having_eval_schema,
99✔
235
                    old,
99✔
236
                    having,
99✔
237
                    &mut out_rec_insert,
99✔
238
                )?,
99✔
239
            ),
240
        };
241

242
        let res = if curr_state.count == 1 {
953✔
243
            self.states.remove(key);
532✔
244
            if out_rec_delete_having_satisfied {
532✔
245
                vec![Operation::Delete {
530✔
246
                    old: Self::build_projection(
530✔
247
                        old,
530✔
248
                        out_rec_delete,
530✔
249
                        &self.projections,
530✔
250
                        &self.aggregation_schema,
530✔
251
                    )?,
530✔
252
                }]
253
            } else {
254
                vec![]
2✔
255
            }
256
        } else {
257
            curr_state.count -= 1;
421✔
258
            curr_state.values = Some(new_values);
421✔
259

421✔
260
            Self::generate_op_for_existing_segment(
421✔
261
                out_rec_delete_having_satisfied,
421✔
262
                out_rec_insert_having_satisfied,
421✔
263
                out_rec_delete,
421✔
264
                out_rec_insert,
421✔
265
                old,
421✔
266
                &self.projections,
421✔
267
                &self.aggregation_schema,
421✔
268
            )?
421✔
269
        };
270

271
        Ok(res)
953✔
272
    }
953✔
273

274
    fn agg_insert(&mut self, new: &mut Record) -> Result<Vec<Operation>, PipelineError> {
13,025✔
275
        let mut out_rec_delete: Vec<Field> = Vec::with_capacity(self.measures.len());
13,025✔
276
        let mut out_rec_insert: Vec<Field> = Vec::with_capacity(self.measures.len());
13,025✔
277

278
        let key = if !self.dimensions.is_empty() {
13,025✔
279
            self.get_key(new)?
2,782✔
280
        } else {
281
            self.default_segment_key.clone()
10,243✔
282
        };
283

284
        let curr_state = self.states.entry(key).or_insert(AggregationState::new(
13,025✔
285
            &self.measures_types,
13,025✔
286
            &self.measures_return_types,
13,025✔
287
        ));
13,025✔
288

289
        let new_values = Self::calc_and_fill_measures(
13,025✔
290
            curr_state,
13,025✔
291
            None,
13,025✔
292
            Some(new),
13,025✔
293
            &mut out_rec_delete,
13,025✔
294
            &mut out_rec_insert,
13,025✔
295
            AggregatorOperation::Insert,
13,025✔
296
            &self.measures,
13,025✔
297
            &self.input_schema,
13,025✔
298
        )?;
13,025✔
299

300
        let (out_rec_delete_having_satisfied, out_rec_insert_having_satisfied) = match &self.having
13,025✔
301
        {
302
            None => (true, true),
12,645✔
303
            Some(having) => (
380✔
304
                Self::having_is_satisfied(
380✔
305
                    &self.having_eval_schema,
380✔
306
                    new,
380✔
307
                    having,
380✔
308
                    &mut out_rec_delete,
380✔
309
                )?,
380✔
310
                Self::having_is_satisfied(
380✔
311
                    &self.having_eval_schema,
380✔
312
                    new,
380✔
313
                    having,
380✔
314
                    &mut out_rec_insert,
380✔
315
                )?,
380✔
316
            ),
317
        };
318

319
        let res = if curr_state.count == 0 {
13,025✔
320
            if out_rec_insert_having_satisfied {
2,075✔
321
                vec![Operation::Insert {
2,005✔
322
                    new: Self::build_projection(
2,005✔
323
                        new,
2,005✔
324
                        out_rec_insert,
2,005✔
325
                        &self.projections,
2,005✔
326
                        &self.aggregation_schema,
2,005✔
327
                    )?,
2,005✔
328
                }]
329
            } else {
330
                vec![]
70✔
331
            }
332
        } else {
333
            Self::generate_op_for_existing_segment(
10,950✔
334
                out_rec_delete_having_satisfied,
10,950✔
335
                out_rec_insert_having_satisfied,
10,950✔
336
                out_rec_delete,
10,950✔
337
                out_rec_insert,
10,950✔
338
                new,
10,950✔
339
                &self.projections,
10,950✔
340
                &self.aggregation_schema,
10,950✔
341
            )?
10,950✔
342
        };
343

344
        curr_state.count += 1;
13,025✔
345
        curr_state.values = Some(new_values);
13,025✔
346

13,025✔
347
        Ok(res)
13,025✔
348
    }
13,025✔
349

350
    fn generate_op_for_existing_segment(
11,371✔
351
        out_rec_delete_having_satisfied: bool,
11,371✔
352
        out_rec_insert_having_satisfied: bool,
11,371✔
353
        out_rec_delete: Vec<Field>,
11,371✔
354
        out_rec_insert: Vec<Field>,
11,371✔
355
        rec: &mut Record,
11,371✔
356
        projections: &Vec<Expression>,
11,371✔
357
        aggregation_schema: &Schema,
11,371✔
358
    ) -> Result<Vec<Operation>, PipelineError> {
11,371✔
359
        Ok(
11,371✔
360
            match (
11,371✔
361
                out_rec_delete_having_satisfied,
11,371✔
362
                out_rec_insert_having_satisfied,
11,371✔
363
            ) {
11,371✔
364
                (false, true) => vec![Operation::Insert {
5✔
365
                    new: Self::build_projection(
5✔
366
                        rec,
5✔
367
                        out_rec_insert,
5✔
368
                        projections,
5✔
369
                        aggregation_schema,
5✔
370
                    )?,
5✔
371
                }],
372
                (true, false) => vec![Operation::Delete {
3✔
373
                    old: Self::build_projection(
3✔
374
                        rec,
3✔
375
                        out_rec_delete,
3✔
376
                        projections,
3✔
377
                        aggregation_schema,
3✔
378
                    )?,
3✔
379
                }],
380
                (true, true) => vec![Operation::Update {
11,059✔
381
                    new: Self::build_projection(
11,059✔
382
                        rec,
11,059✔
383
                        out_rec_insert,
11,059✔
384
                        projections,
11,059✔
385
                        aggregation_schema,
11,059✔
386
                    )?,
11,059✔
387
                    old: Self::build_projection(
11,059✔
388
                        rec,
11,059✔
389
                        out_rec_delete,
11,059✔
390
                        projections,
11,059✔
391
                        aggregation_schema,
11,059✔
392
                    )?,
11,059✔
393
                }],
394
                (false, false) => vec![],
304✔
395
            },
396
        )
397
    }
11,371✔
398

399
    fn having_is_satisfied(
1,048✔
400
        having_eval_schema: &Schema,
1,048✔
401
        original_record: &mut Record,
1,048✔
402
        having: &Expression,
1,048✔
403
        out_rec: &mut Vec<Field>,
1,048✔
404
    ) -> Result<bool, PipelineError> {
1,048✔
405
        //
1,048✔
406
        let original_record_len = original_record.values.len();
1,048✔
407
        Ok(match out_rec.len() {
1,048✔
408
            0 => false,
99✔
409
            _ => {
410
                original_record.values.extend(std::mem::take(out_rec));
949✔
411
                let r = having
949✔
412
                    .evaluate(original_record, having_eval_schema)?
949✔
413
                    .as_boolean()
949✔
414
                    .unwrap_or(false);
949✔
415
                out_rec.extend(
949✔
416
                    original_record
949✔
417
                        .values
949✔
418
                        .drain(original_record_len..)
949✔
419
                        .collect::<Vec<Field>>(),
949✔
420
                );
949✔
421
                r
949✔
422
            }
423
        })
424
    }
1,048✔
425

426
    fn agg_update(
370✔
427
        &mut self,
370✔
428
        old: &mut Record,
370✔
429
        new: &mut Record,
370✔
430
        key: RecordKey,
370✔
431
    ) -> Result<Vec<Operation>, PipelineError> {
370✔
432
        let mut out_rec_delete: Vec<Field> = Vec::with_capacity(self.measures.len());
370✔
433
        let mut out_rec_insert: Vec<Field> = Vec::with_capacity(self.measures.len());
370✔
434

370✔
435
        let curr_state_opt = self.states.get_mut(&key);
370✔
436
        assert!(
370✔
437
            curr_state_opt.is_some(),
370✔
438
            "Unable to find aggregator state during UPDATE operation"
×
439
        );
440
        let curr_state = curr_state_opt.unwrap();
370✔
441

442
        let new_values = Self::calc_and_fill_measures(
370✔
443
            curr_state,
370✔
444
            Some(old),
370✔
445
            Some(new),
370✔
446
            &mut out_rec_delete,
370✔
447
            &mut out_rec_insert,
370✔
448
            AggregatorOperation::Update,
370✔
449
            &self.measures,
370✔
450
            &self.input_schema,
370✔
451
        )?;
370✔
452

453
        let (out_rec_delete_having_satisfied, out_rec_insert_having_satisfied) = match &self.having
370✔
454
        {
455
            None => (true, true),
325✔
456
            Some(having) => (
45✔
457
                Self::having_is_satisfied(
45✔
458
                    &self.having_eval_schema,
45✔
459
                    old,
45✔
460
                    having,
45✔
461
                    &mut out_rec_delete,
45✔
462
                )?,
45✔
463
                Self::having_is_satisfied(
45✔
464
                    &self.having_eval_schema,
45✔
465
                    new,
45✔
466
                    having,
45✔
467
                    &mut out_rec_insert,
45✔
468
                )?,
45✔
469
            ),
470
        };
471

472
        let res = match (
370✔
473
            out_rec_delete_having_satisfied,
370✔
474
            out_rec_insert_having_satisfied,
370✔
475
        ) {
370✔
476
            (false, true) => vec![Operation::Insert {
2✔
477
                new: Self::build_projection(
2✔
478
                    new,
2✔
479
                    out_rec_insert,
2✔
480
                    &self.projections,
2✔
481
                    &self.aggregation_schema,
2✔
482
                )?,
2✔
483
            }],
484
            (true, false) => vec![Operation::Delete {
2✔
485
                old: Self::build_projection(
2✔
486
                    old,
2✔
487
                    out_rec_delete,
2✔
488
                    &self.projections,
2✔
489
                    &self.aggregation_schema,
2✔
490
                )?,
2✔
491
            }],
492
            (true, true) => vec![Operation::Update {
327✔
493
                new: Self::build_projection(
327✔
494
                    new,
327✔
495
                    out_rec_insert,
327✔
496
                    &self.projections,
327✔
497
                    &self.aggregation_schema,
327✔
498
                )?,
327✔
499
                old: Self::build_projection(
327✔
500
                    old,
327✔
501
                    out_rec_delete,
327✔
502
                    &self.projections,
327✔
503
                    &self.aggregation_schema,
327✔
504
                )?,
327✔
505
            }],
506
            (false, false) => vec![],
39✔
507
        };
508

509
        curr_state.values = Some(new_values);
370✔
510
        Ok(res)
370✔
511
    }
370✔
512

513
    pub fn build_projection(
25,319✔
514
        original: &mut Record,
25,319✔
515
        measures: Vec<Field>,
25,319✔
516
        projections: &Vec<Expression>,
25,319✔
517
        aggregation_schema: &Schema,
25,319✔
518
    ) -> Result<Record, PipelineError> {
25,319✔
519
        let original_len = original.values.len();
25,319✔
520
        original.values.extend(measures);
25,319✔
521
        let mut output = Vec::<Field>::with_capacity(projections.len());
25,319✔
522
        for exp in projections {
76,642✔
523
            output.push(exp.evaluate(original, aggregation_schema)?);
51,323✔
524
        }
525
        original.values.drain(original_len..);
25,319✔
526
        let mut output_record = Record::new(output);
25,319✔
527

25,319✔
528
        output_record.set_lifetime(original.get_lifetime());
25,319✔
529

25,319✔
530
        Ok(output_record)
25,319✔
531
    }
25,319✔
532

533
    pub fn aggregate(&mut self, mut op: Operation) -> Result<Vec<Operation>, PipelineError> {
14,254✔
534
        match op {
14,254✔
535
            Operation::Insert { ref mut new } => Ok(self.agg_insert(new)?),
12,931✔
536
            Operation::Delete { ref mut old } => Ok(self.agg_delete(old)?),
859✔
537
            Operation::Update {
538
                ref mut old,
464✔
539
                ref mut new,
464✔
540
            } => {
541
                let (old_record_hash, new_record_hash) = if self.dimensions.is_empty() {
464✔
542
                    (
99✔
543
                        self.default_segment_key.clone(),
99✔
544
                        self.default_segment_key.clone(),
99✔
545
                    )
99✔
546
                } else {
547
                    (self.get_key(old)?, self.get_key(new)?)
365✔
548
                };
549

550
                if old_record_hash == new_record_hash {
464✔
551
                    Ok(self.agg_update(old, new, old_record_hash)?)
370✔
552
                } else {
553
                    let mut r = Vec::with_capacity(2);
94✔
554
                    r.extend(self.agg_delete(old)?);
94✔
555
                    r.extend(self.agg_insert(new)?);
94✔
556
                    Ok(r)
94✔
557
                }
558
            }
559
        }
560
    }
14,254✔
561

562
    fn get_key(&self, record: &Record) -> Result<RecordKey, PipelineError> {
4,324✔
563
        let mut key = Vec::<Field>::with_capacity(self.dimensions.len());
4,324✔
564
        for dimension in self.dimensions.iter() {
4,646✔
565
            key.push(dimension.evaluate(record, &self.input_schema)?);
4,646✔
566
        }
567
        if self.accurate_keys {
4,324✔
568
            Ok(RecordKey::Accurate(key))
4,324✔
569
        } else {
570
            Ok(RecordKey::Hash(get_record_hash(key.iter())))
×
571
        }
572
    }
4,324✔
573
}
574

575
impl Processor for AggregationProcessor {
576
    fn commit(&self, _epoch: &Epoch) -> Result<(), BoxedError> {
212✔
577
        Ok(())
212✔
578
    }
212✔
579

580
    fn process(
13,675✔
581
        &mut self,
13,675✔
582
        _from_port: PortHandle,
13,675✔
583
        record_store: &ProcessorRecordStore,
13,675✔
584
        op: ProcessorOperation,
13,675✔
585
        fw: &mut dyn ProcessorChannelForwarder,
13,675✔
586
    ) -> Result<(), BoxedError> {
13,675✔
587
        let op = record_store.load_operation(&op)?;
13,675✔
588
        let ops = self.aggregate(op)?;
13,675✔
589
        for output_op in ops {
27,014✔
590
            let output_op = record_store.create_operation(&output_op)?;
13,339✔
591
            fw.send(output_op, DEFAULT_PORT_HANDLE);
13,339✔
592
        }
593
        Ok(())
13,675✔
594
    }
13,675✔
595

596
    fn serialize(
×
597
        &mut self,
×
598
        _record_store: &ProcessorRecordStore,
×
599
        mut object: Object,
×
600
    ) -> Result<(), BoxedError> {
×
601
        Ok(object.write(&bincode::serialize(&self.states)?)?)
×
602
    }
×
603
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc