• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

getdozer / dozer / 4102355041

pending completion
4102355041

Pull #811

github

GitHub
Merge 37b55f3df into 7c772e92a
Pull Request #811: chore: integrating sql planner

427 of 427 new or added lines in 15 files covered. (100.0%)

24596 of 37831 relevant lines covered (65.02%)

37254.69 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

69.28
/dozer-sql/src/pipeline/aggregation/processor.rs
1
#![allow(clippy::too_many_arguments)]
2
use crate::deserialize;
3
use crate::pipeline::errors::PipelineError;
4
use crate::pipeline::expression::execution::ExpressionExecutor;
5
use crate::pipeline::{aggregation::aggregator::Aggregator, expression::execution::Expression};
6
use dozer_core::dag::channels::ProcessorChannelForwarder;
7
use dozer_core::dag::errors::ExecutionError;
8
use dozer_core::dag::errors::ExecutionError::InternalError;
9
use dozer_core::dag::node::{PortHandle, Processor};
10
use dozer_core::dag::DEFAULT_PORT_HANDLE;
11
use dozer_core::storage::lmdb_storage::{
12
    LmdbEnvironmentManager, LmdbExclusiveTransaction, SharedTransaction,
13
};
14
use dozer_types::errors::types::TypeError;
15
use dozer_types::internal_err;
16
use dozer_types::types::{Field, Operation, Record, Schema};
17

18
use crate::pipeline::aggregation::aggregator::get_aggregator_from_aggregation_expression;
19
use dozer_core::dag::epoch::Epoch;
20
use dozer_core::dag::record_store::RecordReader;
21
use dozer_core::storage::common::Database;
22
use dozer_core::storage::errors::StorageError::InvalidDatabase;
23
use dozer_core::storage::prefix_transaction::PrefixTransaction;
24
use std::{collections::HashMap, mem::size_of_val};
25

26
// Key under which the prefix counter is stored in the `meta` database.
const COUNTER_KEY: u8 = 1_u8;
27

28
/// One measure's decoded aggregation entry, as deserialized by
/// `AggregationProcessor::decode_buffer` from the stored state buffer.
pub(crate) struct AggregationData<'a> {
    // Current aggregated value for this measure.
    pub value: Field,
    // Optional aggregator-specific serialized state; borrows from the raw
    // buffer it was decoded from.
    pub state: Option<&'a [u8]>,
    // Prefix id scoping this measure's auxiliary data in the aggregators DB.
    pub prefix: u32,
}
33

34
impl<'a> AggregationData<'a> {
35
    pub fn new(value: Field, state: Option<&'a [u8]>, prefix: u32) -> Self {
1,090✔
36
        Self {
1,090✔
37
            value,
1,090✔
38
            state,
1,090✔
39
            prefix,
1,090✔
40
        }
1,090✔
41
    }
1,090✔
42
}
43

44
/// Streaming aggregation processor: keeps per-segment (per group-by key)
/// aggregation state in LMDB and turns each incoming operation into the
/// insert/update/delete operations for the aggregated output.
#[derive(Debug)]
pub struct AggregationProcessor {
    // GROUP BY expressions; when empty, everything falls into one default
    // segment keyed by `AGG_DEFAULT_DIMENSION_ID`.
    dimensions: Vec<Expression>,
    // Each measure expression paired with the aggregator that folds it.
    measures: Vec<(Expression, Aggregator)>,
    // Output projection expressions, evaluated against the input record
    // extended with the computed measure values.
    projections: Vec<Expression>,
    // Main state database; `None` until `init_store` opens it.
    pub db: Option<Database>,
    // Metadata database holding the prefix counter (see `get_counter`).
    meta_db: Option<Database>,
    // Database handed to individual aggregators for their auxiliary state.
    aggregators_db: Option<Database>,
    input_schema: Schema,
    aggregation_schema: Schema,
}
55

56
/// Which kind of record change is being folded into the aggregation state
/// by `calc_and_fill_measures`.
enum AggregatorOperation {
    Insert,
    Delete,
    Update,
}
×
61

×
62
// Key-space id for the dataset storing per-segment aggregation state buffers.
const AGG_VALUES_DATASET_ID: u16 = 0x0000_u16;
// Key-space id for the dataset storing per-segment source-record counts.
const AGG_COUNT_DATASET_ID: u16 = 0x0001_u16;

// Sentinel segment key used when the aggregation has no dimensions
// (i.e. a global aggregation over all records).
const AGG_DEFAULT_DIMENSION_ID: u8 = 0xFF_u8;
66

×
67
impl AggregationProcessor {
    /// Builds a processor from the planned aggregation, resolving each
    /// measure expression into an `(Expression, Aggregator)` pair.
    ///
    /// The LMDB database handles stay `None` until `init_store` runs.
    pub fn new(
        dimensions: Vec<Expression>,
        measures: Vec<Expression>,
        projections: Vec<Expression>,
        input_schema: Schema,
        aggregation_schema: Schema,
    ) -> Result<Self, PipelineError> {
        // Pair every measure expression with the aggregator implementing it.
        let mut aggregators: Vec<(Expression, Aggregator)> = Vec::new();
        for measure in measures {
            aggregators.push(get_aggregator_from_aggregation_expression(
                &measure,
                &input_schema,
            )?)
        }

        Ok(Self {
            dimensions,
            measures: aggregators,
            projections,
            db: None,
            meta_db: None,
            aggregators_db: None,
            input_schema,
            aggregation_schema,
        })
    }

    /// Opens (or creates) the three LMDB databases backing this processor.
    fn init_store(&mut self, txn: &mut LmdbEnvironmentManager) -> Result<(), PipelineError> {
        self.db = Some(txn.open_database("aggr", false)?);
        self.aggregators_db = Some(txn.open_database("aggr_data", false)?);
        self.meta_db = Some(txn.open_database("meta", false)?);
        Ok(())
    }

    /// Builds a storage key: 2-byte big-endian dataset id followed by the
    /// segment hash. Never actually fails despite the `Result` return.
    fn get_record_key(&self, hash: &Vec<u8>, database_id: u16) -> Result<Vec<u8>, PipelineError> {
        let mut vec = Vec::with_capacity(hash.len().wrapping_add(size_of_val(&database_id)));
        vec.extend_from_slice(&database_id.to_be_bytes());
        vec.extend(hash);
        Ok(vec)
    }

    /// Allocates the next prefix id: reads the counter stored under
    /// `COUNTER_KEY` in the meta database (treating an absent entry as 1),
    /// persists `counter + 1` and returns it. The first value ever returned
    /// is therefore 2.
    fn get_counter(&self, txn: &mut LmdbExclusiveTransaction) -> Result<u32, PipelineError> {
        let meta_db = *self
            .meta_db
            .as_ref()
            .ok_or(PipelineError::InternalStorageError(InvalidDatabase))?;
        let curr_ctr = match txn.get(meta_db, &COUNTER_KEY.to_be_bytes())? {
            Some(v) => u32::from_be_bytes(deserialize!(v)),
            None => 1_u32,
        };
        txn.put(
            meta_db,
            &COUNTER_KEY.to_be_bytes(),
            &(curr_ctr + 1).to_be_bytes(),
        )?;
        Ok(curr_ctr + 1)
    }

    /// Decodes one serialized measure entry from the front of `buf`.
    ///
    /// Layout (integers big-endian):
    /// `[4-byte prefix][2-byte value len][value][2-byte state len][state]`.
    /// Returns the number of bytes consumed and the decoded data; the state
    /// slice borrows from `buf` (empty state is decoded as `None`).
    ///
    /// NOTE(review): the slicing and `try_into().unwrap()` panic on a
    /// truncated buffer instead of returning an error — safe only as long as
    /// buffers are always produced by `encode_buffer`.
    pub(crate) fn decode_buffer(buf: &[u8]) -> Result<(usize, AggregationData), PipelineError> {
        let prefix = u32::from_be_bytes(buf[0..4].try_into().unwrap());
        let mut offset: usize = 4;

        let val_len = u16::from_be_bytes(buf[offset..offset + 2].try_into().unwrap());
        offset += 2;
        let val: Field = Field::decode(&buf[offset..offset + val_len as usize])
            .map_err(TypeError::DeserializationError)?;
        offset += val_len as usize;
        let state_len = u16::from_be_bytes(buf[offset..offset + 2].try_into().unwrap());
        offset += 2;
        let state: Option<&[u8]> = if state_len > 0 {
            Some(&buf[offset..offset + state_len as usize])
        } else {
            None
        };
        offset += state_len as usize;

        let r = AggregationData::new(val, state, prefix);
        Ok((offset, r))
    }

    /// Serializes one measure entry (inverse of `decode_buffer`), returning
    /// `(reported_len, bytes)`. A missing state is written as length 0.
    ///
    /// NOTE(review): the reported length is `5 + value + state`, but the
    /// buffer actually written is `8 + value + state` bytes
    /// (4 prefix + 2 + value + 2 + state). The only caller in this file
    /// ignores the length and uses the buffer; confirm before relying on
    /// the returned size elsewhere.
    pub(crate) fn encode_buffer(
        prefix: u32,
        value: &Field,
        state: &Option<Vec<u8>>,
    ) -> Result<(usize, Vec<u8>), PipelineError> {
        let mut r = Vec::with_capacity(512);
        r.extend(prefix.to_be_bytes());

        let sz_val = value.encode();
        r.extend((sz_val.len() as u16).to_be_bytes());
        r.extend(&sz_val);

        let len = if let Some(state) = state.as_ref() {
            r.extend((state.len() as u16).to_be_bytes());
            r.extend(state);
            state.len()
        } else {
            r.extend(0_u16.to_be_bytes());
            0_usize
        };

        Ok((5 + sz_val.len() + len, r))
    }

    /// Folds a record change into every measure's aggregation state.
    ///
    /// `cur_state` is the concatenation of encoded entries — one per measure,
    /// in `self.measures` order — previously stored for this segment, or
    /// `None` for a brand-new segment. For each measure the previous value
    /// (if any) is appended to `out_rec_delete` and the new value to
    /// `out_rec_insert`; the re-encoded concatenated state is returned for
    /// the caller to persist.
    fn calc_and_fill_measures(
        &self,
        txn: &mut LmdbExclusiveTransaction,
        cur_state: &Option<Vec<u8>>,
        deleted_record: Option<&Record>,
        inserted_record: Option<&Record>,
        out_rec_delete: &mut Vec<Field>,
        out_rec_insert: &mut Vec<Field>,
        op: AggregatorOperation,
    ) -> Result<Vec<u8>, PipelineError> {
        // Concatenated encoded states for all measures (the new segment state).
        let mut next_state = Vec::<u8>::new();
        let mut offset: usize = 0;

        for measure in &self.measures {
            // Decode this measure's slice of the previous segment state, if any.
            let curr_agg_data = match cur_state {
                Some(ref e) => {
                    let (len, res) = Self::decode_buffer(&e[offset..])?;
                    offset += len;
                    Some(res)
                }
                None => None,
            };

            // Dispatch to the aggregator. In every arm: reuse the measure's
            // existing prefix when prior state exists, otherwise allocate a
            // fresh prefix from the meta counter; the aggregator's auxiliary
            // storage is scoped by that prefix via `PrefixTransaction`.
            let (prefix, next_state_slice) = match op {
                AggregatorOperation::Insert => {
                    let inserted_field = measure
                        .0
                        .evaluate(inserted_record.unwrap(), &self.input_schema)?;
                    if let Some(curr) = curr_agg_data {
                        out_rec_delete.push(curr.value);
                        let mut p_tx = PrefixTransaction::new(txn, curr.prefix);
                        let r = measure.1.insert(
                            curr.state,
                            &inserted_field,
                            measure.0.get_type(&self.input_schema)?.return_type,
                            &mut p_tx,
                            self.aggregators_db.unwrap(),
                        )?;
                        (curr.prefix, r)
                    } else {
                        let prefix = self.get_counter(txn)?;
                        let mut p_tx = PrefixTransaction::new(txn, prefix);
                        let r = measure.1.insert(
                            None,
                            &inserted_field,
                            measure.0.get_type(&self.input_schema)?.return_type,
                            &mut p_tx,
                            self.aggregators_db.unwrap(),
                        )?;
                        (prefix, r)
                    }
                }
                AggregatorOperation::Delete => {
                    let deleted_field = measure
                        .0
                        .evaluate(deleted_record.unwrap(), &self.input_schema)?;
                    if let Some(curr) = curr_agg_data {
                        out_rec_delete.push(curr.value);
                        let mut p_tx = PrefixTransaction::new(txn, curr.prefix);
                        let r = measure.1.delete(
                            curr.state,
                            &deleted_field,
                            measure.0.get_type(&self.input_schema)?.return_type,
                            &mut p_tx,
                            self.aggregators_db.unwrap(),
                        )?;
                        (curr.prefix, r)
                    } else {
                        // Delete with no prior state — presumably unreachable
                        // for well-formed input (coverage shows it never ran).
                        let prefix = self.get_counter(txn)?;
                        let mut p_tx = PrefixTransaction::new(txn, prefix);
                        let r = measure.1.delete(
                            None,
                            &deleted_field,
                            measure.0.get_type(&self.input_schema)?.return_type,
                            &mut p_tx,
                            self.aggregators_db.unwrap(),
                        )?;
                        (prefix, r)
                    }
                }
                AggregatorOperation::Update => {
                    let deleted_field = measure
                        .0
                        .evaluate(deleted_record.unwrap(), &self.input_schema)?;
                    let updated_field = measure
                        .0
                        .evaluate(inserted_record.unwrap(), &self.input_schema)?;

                    if let Some(curr) = curr_agg_data {
                        out_rec_delete.push(curr.value);
                        let mut p_tx = PrefixTransaction::new(txn, curr.prefix);
                        let r = measure.1.update(
                            curr.state,
                            &deleted_field,
                            &updated_field,
                            measure.0.get_type(&self.input_schema)?.return_type,
                            &mut p_tx,
                            self.aggregators_db.unwrap(),
                        )?;
                        (curr.prefix, r)
                    } else {
                        // Update with no prior state — presumably unreachable
                        // for well-formed input (coverage shows it never ran).
                        let prefix = self.get_counter(txn)?;
                        let mut p_tx = PrefixTransaction::new(txn, prefix);
                        let r = measure.1.update(
                            None,
                            &deleted_field,
                            &updated_field,
                            measure.0.get_type(&self.input_schema)?.return_type,
                            &mut p_tx,
                            self.aggregators_db.unwrap(),
                        )?;
                        (prefix, r)
                    }
                }
            };

            // Append this measure's re-encoded entry and expose its new value.
            next_state.extend(
                &Self::encode_buffer(prefix, &next_state_slice.value, &next_state_slice.state)?.1,
            );
            out_rec_insert.push(next_state_slice.value);
        }

        Ok(next_state)
    }

    /// Adjusts the number of source records in a segment by `delta`
    /// (decrement when `decr` is true), deleting the counter entry once it
    /// reaches zero. Returns the count *before* the update, which callers
    /// use to detect the first/last record of a segment.
    fn update_segment_count(
        &self,
        txn: &mut LmdbExclusiveTransaction,
        db: Database,
        key: Vec<u8>,
        delta: u64,
        decr: bool,
    ) -> Result<u64, PipelineError> {
        let bytes = txn.get(db, key.as_slice())?;

        // Missing entry means the segment has no records yet.
        let curr_count = match bytes {
            Some(b) => u64::from_be_bytes(deserialize!(b)),
            None => 0_u64,
        };

        let new_val = if decr {
            curr_count.wrapping_sub(delta)
        } else {
            curr_count.wrapping_add(delta)
        };

        if new_val > 0 {
            txn.put(db, key.as_slice(), new_val.to_be_bytes().as_slice())?;
        } else {
            txn.del(db, key.as_slice(), None)?;
        }
        Ok(curr_count)
    }

    /// Handles a source `Delete`: decrements the segment count, folds the
    /// deletion into every measure, and emits either an output `Delete`
    /// (when this was the last record in the segment) or an `Update` with
    /// the recomputed aggregates.
    fn agg_delete(
        &self,
        txn: &mut LmdbExclusiveTransaction,
        db: Database,
        old: &mut Record,
    ) -> Result<Operation, PipelineError> {
        let mut out_rec_delete: Vec<Field> = Vec::with_capacity(self.measures.len());
        let mut out_rec_insert: Vec<Field> = Vec::with_capacity(self.measures.len());

        // No dimensions -> everything lives in the single default segment.
        let record_hash = if !self.dimensions.is_empty() {
            get_key(&self.input_schema, old, &self.dimensions)?
        } else {
            vec![AGG_DEFAULT_DIMENSION_ID]
        };

        let record_key = self.get_record_key(&record_hash, AGG_VALUES_DATASET_ID)?;

        let record_count_key = self.get_record_key(&record_hash, AGG_COUNT_DATASET_ID)?;
        let prev_count = self.update_segment_count(txn, db, record_count_key, 1, true)?;

        let cur_state = txn.get(db, record_key.as_slice())?.map(|b| b.to_vec());
        let new_state = self.calc_and_fill_measures(
            txn,
            &cur_state,
            Some(old),
            None,
            &mut out_rec_delete,
            &mut out_rec_insert,
            AggregatorOperation::Delete,
        )?;

        // Last record of the segment -> the aggregated row disappears.
        let res = if prev_count == 1 {
            Operation::Delete {
                old: self.build_projection(old, out_rec_delete)?,
            }
        } else {
            Operation::Update {
                new: self.build_projection(old, out_rec_insert)?,
                old: self.build_projection(old, out_rec_delete)?,
            }
        };

        // Persist or drop the segment state to match the emitted operation.
        if prev_count == 1 {
            let _ = txn.del(db, record_key.as_slice(), None)?;
        } else {
            txn.put(db, record_key.as_slice(), new_state.as_slice())?;
        }
        Ok(res)
    }

    /// Handles a source `Insert`: increments the segment count, folds the
    /// new record into every measure, and emits an output `Insert` (first
    /// record of the segment) or an `Update` with the recomputed aggregates.
    fn agg_insert(
        &self,
        txn: &mut LmdbExclusiveTransaction,
        db: Database,
        new: &mut Record,
    ) -> Result<Operation, PipelineError> {
        let mut out_rec_delete: Vec<Field> = Vec::with_capacity(self.measures.len());
        let mut out_rec_insert: Vec<Field> = Vec::with_capacity(self.measures.len());

        // No dimensions -> everything lives in the single default segment.
        let record_hash = if !self.dimensions.is_empty() {
            get_key(&self.input_schema, new, &self.dimensions)?
        } else {
            vec![AGG_DEFAULT_DIMENSION_ID]
        };

        let record_key = self.get_record_key(&record_hash, AGG_VALUES_DATASET_ID)?;

        let record_count_key = self.get_record_key(&record_hash, AGG_COUNT_DATASET_ID)?;
        self.update_segment_count(txn, db, record_count_key, 1, false)?;

        let cur_state = txn.get(db, record_key.as_slice())?.map(|b| b.to_vec());
        let new_state = self.calc_and_fill_measures(
            txn,
            &cur_state,
            None,
            Some(new),
            &mut out_rec_delete,
            &mut out_rec_insert,
            AggregatorOperation::Insert,
        )?;

        // Absent prior state means this is the segment's first record.
        let res = if cur_state.is_none() {
            Operation::Insert {
                new: self.build_projection(new, out_rec_insert)?,
            }
        } else {
            Operation::Update {
                new: self.build_projection(new, out_rec_insert)?,
                old: self.build_projection(new, out_rec_delete)?,
            }
        };

        txn.put(db, record_key.as_slice(), new_state.as_slice())?;
        Ok(res)
    }

    /// Handles an update that stays within one segment (old and new record
    /// hash to the same `record_hash`): folds both records into every
    /// measure and emits a single output `Update`.
    fn agg_update(
        &self,
        txn: &mut LmdbExclusiveTransaction,
        db: Database,
        old: &mut Record,
        new: &mut Record,
        record_hash: Vec<u8>,
    ) -> Result<Operation, PipelineError> {
        let mut out_rec_delete: Vec<Field> = Vec::with_capacity(self.measures.len());
        let mut out_rec_insert: Vec<Field> = Vec::with_capacity(self.measures.len());
        let record_key = self.get_record_key(&record_hash, AGG_VALUES_DATASET_ID)?;

        let cur_state = txn.get(db, record_key.as_slice())?.map(|b| b.to_vec());
        let new_state = self.calc_and_fill_measures(
            txn,
            &cur_state,
            Some(old),
            Some(new),
            &mut out_rec_delete,
            &mut out_rec_insert,
            AggregatorOperation::Update,
        )?;

        let res = Operation::Update {
            new: self.build_projection(new, out_rec_insert)?,
            old: self.build_projection(old, out_rec_delete)?,
        };

        txn.put(db, record_key.as_slice(), new_state.as_slice())?;

        Ok(res)
    }

    /// Evaluates the output projections against `original` temporarily
    /// extended with the computed `measures` values; the extra values are
    /// drained off again before returning, so `original` is left unchanged.
    pub fn build_projection(
        &self,
        original: &mut Record,
        measures: Vec<Field>,
    ) -> Result<Record, PipelineError> {
        let original_len = original.values.len();
        original.values.extend(measures);
        let mut output = Vec::<Field>::with_capacity(self.projections.len());
        for exp in &self.projections {
            output.push(exp.evaluate(original, &self.aggregation_schema)?);
        }
        // Restore the record to its original shape.
        original.values.drain(original_len..);
        Ok(Record::new(None, output, None))
    }

    /// Entry point: maps one input operation onto the output operation(s).
    /// An `Update` whose dimension key changes moves the record between
    /// segments and is decomposed into a delete from the old segment plus
    /// an insert into the new one.
    pub fn aggregate(
        &self,
        txn: &mut LmdbExclusiveTransaction,
        db: Database,
        mut op: Operation,
    ) -> Result<Vec<Operation>, PipelineError> {
        match op {
            Operation::Insert { ref mut new } => Ok(vec![self.agg_insert(txn, db, new)?]),
            Operation::Delete { ref mut old } => Ok(vec![self.agg_delete(txn, db, old)?]),
            Operation::Update {
                ref mut old,
                ref mut new,
            } => {
                let (old_record_hash, new_record_hash) = if self.dimensions.is_empty() {
                    (
                        vec![AGG_DEFAULT_DIMENSION_ID],
                        vec![AGG_DEFAULT_DIMENSION_ID],
                    )
                } else {
                    (
                        get_key(&self.input_schema, old, &self.dimensions)?,
                        get_key(&self.input_schema, new, &self.dimensions)?,
                    )
                };

                if old_record_hash == new_record_hash {
                    Ok(vec![self.agg_update(txn, db, old, new, old_record_hash)?])
                } else {
                    Ok(vec![
                        self.agg_delete(txn, db, old)?,
                        self.agg_insert(txn, db, new)?,
                    ])
                }
            }
        }
    }
}
×
509

510
fn get_key(
5,232✔
511
    schema: &Schema,
5,232✔
512
    record: &Record,
5,232✔
513
    dimensions: &[Expression],
5,232✔
514
) -> Result<Vec<u8>, PipelineError> {
5,232✔
515
    let mut tot_size = 0_usize;
5,232✔
516
    let mut buffers = Vec::<Vec<u8>>::with_capacity(dimensions.len());
5,232✔
517

×
518
    for dimension in dimensions.iter() {
5,232✔
519
        let value = dimension.evaluate(record, schema)?;
5,232✔
520
        let bytes = value.encode();
5,232✔
521
        tot_size += bytes.len();
5,232✔
522
        buffers.push(bytes);
5,232✔
523
    }
×
524

×
525
    let mut res_buffer = Vec::<u8>::with_capacity(tot_size);
5,232✔
526
    for i in buffers {
10,464✔
527
        res_buffer.extend(i);
5,232✔
528
    }
5,232✔
529
    Ok(res_buffer)
5,232✔
530
}
5,232✔
531

×
532
impl Processor for AggregationProcessor {
×
533
    fn init(&mut self, state: &mut LmdbEnvironmentManager) -> Result<(), ExecutionError> {
75✔
534
        internal_err!(self.init_store(state))
×
535
    }
75✔
536

×
537
    fn commit(&self, _epoch: &Epoch, _tx: &SharedTransaction) -> Result<(), ExecutionError> {
34✔
538
        Ok(())
34✔
539
    }
34✔
540

×
541
    fn process(
5,666✔
542
        &mut self,
5,666✔
543
        _from_port: PortHandle,
5,666✔
544
        op: Operation,
5,666✔
545
        fw: &mut dyn ProcessorChannelForwarder,
5,666✔
546
        txn: &SharedTransaction,
5,666✔
547
        _reader: &HashMap<PortHandle, Box<dyn RecordReader>>,
5,666✔
548
    ) -> Result<(), ExecutionError> {
5,666✔
549
        match self.db {
5,666✔
550
            Some(d) => {
5,666✔
551
                let ops = internal_err!(self.aggregate(&mut txn.write(), d, op))?;
5,666✔
552
                for fop in ops {
11,332✔
553
                    fw.send(fop, DEFAULT_PORT_HANDLE)?;
5,666✔
554
                }
×
555
                Ok(())
5,666✔
556
            }
×
557
            _ => Err(ExecutionError::InvalidDatabase),
×
558
        }
×
559
    }
5,666✔
560
}
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc