Coveralls coverage report for getdozer / dozer, build 4124646176 (pending completion)

GitHub Pull Request #811: chore: integrating sql planner
Merge c6bc261de into f4fe30c14

737 of 737 new or added lines in 23 files covered (100.0%)
23321 of 35114 relevant lines covered (66.42%)
35990.16 hits per line
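A note on the arithmetic behind the percentages above: each figure is simply covered lines divided by relevant lines. A minimal sketch, with a function name of my own choosing (it appears nowhere in the report or in the source file below):

// Illustration only: reproduces the repository-wide figure reported above.
fn coverage_pct(covered: u64, relevant: u64) -> f64 {
    covered as f64 / relevant as f64 * 100.0
}

fn main() {
    // 23321 of 35114 relevant lines covered
    println!("{:.2}%", coverage_pct(23321, 35114)); // prints "66.42%"
}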

Source file: /dozer-sql/src/pipeline/aggregation/processor.rs (66.79% of its lines covered)
#![allow(clippy::too_many_arguments)]
use crate::deserialize;
use crate::pipeline::errors::PipelineError;
use crate::pipeline::expression::execution::ExpressionExecutor;
use crate::pipeline::{aggregation::aggregator::Aggregator, expression::execution::Expression};
use dozer_core::channels::ProcessorChannelForwarder;
use dozer_core::errors::ExecutionError;
use dozer_core::errors::ExecutionError::InternalError;
use dozer_core::node::{PortHandle, Processor};
use dozer_core::storage::lmdb_storage::{
    LmdbEnvironmentManager, LmdbExclusiveTransaction, SharedTransaction,
};
use dozer_core::DEFAULT_PORT_HANDLE;
use dozer_types::errors::types::TypeError;
use dozer_types::internal_err;
use dozer_types::types::{Field, Operation, Record, Schema};

use crate::pipeline::aggregation::aggregator::get_aggregator_from_aggregation_expression;
use dozer_core::epoch::Epoch;
use dozer_core::record_store::RecordReader;
use dozer_core::storage::common::Database;
use dozer_core::storage::errors::StorageError::InvalidDatabase;
use dozer_core::storage::prefix_transaction::PrefixTransaction;
use lmdb::DatabaseFlags;
use std::{collections::HashMap, mem::size_of_val};

const COUNTER_KEY: u8 = 1_u8;

// Decoded state of a single measure: the current aggregated value, the optional
// serialized aggregator state, and the prefix that namespaces the aggregator's
// scratch storage.
pub(crate) struct AggregationData<'a> {
    pub value: Field,
    pub state: Option<&'a [u8]>,
    pub prefix: u32,
}

impl<'a> AggregationData<'a> {
    pub fn new(value: Field, state: Option<&'a [u8]>, prefix: u32) -> Self {
        Self {
            value,
            state,
            prefix,
        }
    }
}

// Stateful GROUP BY / aggregation processor: `dimensions` are the group-by
// expressions, `measures` pairs each aggregate expression with its Aggregator,
// and `projections` produces the output record.
#[derive(Debug)]
pub struct AggregationProcessor {
    dimensions: Vec<Expression>,
    measures: Vec<(Expression, Aggregator)>,
    projections: Vec<Expression>,
    pub db: Option<Database>,
    meta_db: Option<Database>,
    aggregators_db: Option<Database>,
    input_schema: Schema,
    aggregation_schema: Schema,
}

enum AggregatorOperation {
    Insert,
    Delete,
    Update,
}

const AGG_VALUES_DATASET_ID: u16 = 0x0000_u16;
const AGG_COUNT_DATASET_ID: u16 = 0x0001_u16;

const AGG_DEFAULT_DIMENSION_ID: u8 = 0xFF_u8;

impl AggregationProcessor {
    pub fn new(
        dimensions: Vec<Expression>,
        measures: Vec<Expression>,
        projections: Vec<Expression>,
        input_schema: Schema,
        aggregation_schema: Schema,
    ) -> Result<Self, PipelineError> {
        let mut aggregators: Vec<(Expression, Aggregator)> = Vec::new();
        for measure in measures {
            aggregators.push(get_aggregator_from_aggregation_expression(
                &measure,
                &input_schema,
            )?)
        }

        Ok(Self {
            dimensions,
            measures: aggregators,
            projections,
            db: None,
            meta_db: None,
            aggregators_db: None,
            input_schema,
            aggregation_schema,
        })
    }

    // Creates the three LMDB databases: "aggr" for segment values and counts,
    // "aggr_data" for per-aggregator scratch storage, and "meta" for the prefix counter.
    fn init_store(&mut self, env: &mut LmdbEnvironmentManager) -> Result<(), PipelineError> {
        self.db = Some(env.create_database(Some("aggr"), Some(DatabaseFlags::empty()))?);
        self.aggregators_db =
            Some(env.create_database(Some("aggr_data"), Some(DatabaseFlags::empty()))?);
        self.meta_db = Some(env.create_database(Some("meta"), Some(DatabaseFlags::empty()))?);
        Ok(())
    }

    // Key layout: 2-byte big-endian dataset id followed by the group-by hash.
    fn get_record_key(&self, hash: &Vec<u8>, database_id: u16) -> Result<Vec<u8>, PipelineError> {
        let mut vec = Vec::with_capacity(hash.len().wrapping_add(size_of_val(&database_id)));
        vec.extend_from_slice(&database_id.to_be_bytes());
        vec.extend(hash);
        Ok(vec)
    }

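    // --- Illustrative sketch, added by the editor; not part of the original file ---
    // The keys built by get_record_key above are just the dataset id in front of the
    // group-by hash, so the encoded measure values (dataset 0x0000) and the segment
    // record count (dataset 0x0001) for one segment sit side by side in the "aggr"
    // database. For hash [0xAB, 0xCD]:
    //   values key: [0x00, 0x00, 0xAB, 0xCD]
    //   count  key: [0x00, 0x01, 0xAB, 0xCD]
    #[allow(dead_code)]
    fn record_key_layout_sketch(hash: &[u8], dataset_id: u16) -> Vec<u8> {
        let mut key = Vec::with_capacity(2 + hash.len());
        key.extend_from_slice(&dataset_id.to_be_bytes());
        key.extend_from_slice(hash);
        key
    }
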
    // Monotonically increasing counter kept in the "meta" database. Every new segment
    // draws a fresh value, used as the prefix that isolates its aggregators' storage.
    fn get_counter(&self, txn: &mut LmdbExclusiveTransaction) -> Result<u32, PipelineError> {
        let meta_db = *self
            .meta_db
            .as_ref()
            .ok_or(PipelineError::InternalStorageError(InvalidDatabase))?;
        let curr_ctr = match txn.get(meta_db, &COUNTER_KEY.to_be_bytes())? {
            Some(v) => u32::from_be_bytes(deserialize!(v)),
            None => 1_u32,
        };
        txn.put(
            meta_db,
            &COUNTER_KEY.to_be_bytes(),
            &(curr_ctr + 1).to_be_bytes(),
        )?;
        Ok(curr_ctr + 1)
    }

    // Decodes one measure's entry from a segment state buffer: a 4-byte big-endian
    // prefix, a length-prefixed encoded Field, and a length-prefixed (possibly empty)
    // aggregator state. Returns the number of bytes consumed alongside the data.
    pub(crate) fn decode_buffer(buf: &[u8]) -> Result<(usize, AggregationData), PipelineError> {
        let prefix = u32::from_be_bytes(buf[0..4].try_into().unwrap());
        let mut offset: usize = 4;

        let val_len = u16::from_be_bytes(buf[offset..offset + 2].try_into().unwrap());
        offset += 2;
        let val: Field = Field::decode(&buf[offset..offset + val_len as usize])
            .map_err(TypeError::DeserializationError)?;
        offset += val_len as usize;
        let state_len = u16::from_be_bytes(buf[offset..offset + 2].try_into().unwrap());
        offset += 2;
        let state: Option<&[u8]> = if state_len > 0 {
            Some(&buf[offset..offset + state_len as usize])
        } else {
            None
        };
        offset += state_len as usize;

        let r = AggregationData::new(val, state, prefix);
        Ok((offset, r))
    }

    // Inverse of decode_buffer: frames a measure's prefix, value and optional state
    // into a single buffer. Returns the total encoded length alongside the buffer.
    pub(crate) fn encode_buffer(
        prefix: u32,
        value: &Field,
        state: &Option<Vec<u8>>,
    ) -> Result<(usize, Vec<u8>), PipelineError> {
        let mut r = Vec::with_capacity(512);
        r.extend(prefix.to_be_bytes());

        let sz_val = value.encode();
        r.extend((sz_val.len() as u16).to_be_bytes());
        r.extend(&sz_val);

        let len = if let Some(state) = state.as_ref() {
            r.extend((state.len() as u16).to_be_bytes());
            r.extend(state);
            state.len()
        } else {
            r.extend(0_u16.to_be_bytes());
            0_usize
        };

        Ok((8 + sz_val.len() + len, r))
    }

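    // --- Illustrative sketch, added by the editor; not part of the original file ---
    // Byte layout written by encode_buffer and read back by decode_buffer, with raw
    // bytes standing in for an encoded Field:
    //   [prefix: u32 BE][value len: u16 BE][value][state len: u16 BE][state]
    // A state length of 0 decodes as "no state". For prefix 7, value b"VAL" and
    // state b"STATE" the frame is 4 + 2 + 3 + 2 + 5 = 16 bytes long.
    #[allow(dead_code)]
    fn buffer_layout_sketch() -> Vec<u8> {
        let (prefix, value, state) = (7_u32, &b"VAL"[..], &b"STATE"[..]);
        let mut buf = Vec::new();
        buf.extend_from_slice(&prefix.to_be_bytes());
        buf.extend_from_slice(&(value.len() as u16).to_be_bytes());
        buf.extend_from_slice(value);
        buf.extend_from_slice(&(state.len() as u16).to_be_bytes());
        buf.extend_from_slice(state);
        debug_assert_eq!(buf.len(), 16);
        buf
    }
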
    // Applies the operation to each measure in turn: decodes that measure's previous
    // state from `cur_state` (if any), lets its Aggregator compute the new value, and
    // re-encodes all states into one buffer while filling the old/new output fields.
    fn calc_and_fill_measures(
        &self,
        txn: &mut LmdbExclusiveTransaction,
        cur_state: &Option<Vec<u8>>,
        deleted_record: Option<&Record>,
        inserted_record: Option<&Record>,
        out_rec_delete: &mut Vec<Field>,
        out_rec_insert: &mut Vec<Field>,
        op: AggregatorOperation,
    ) -> Result<Vec<u8>, PipelineError> {
        // array holding the list of states for all measures
        let mut next_state = Vec::<u8>::new();
        let mut offset: usize = 0;

        for measure in &self.measures {
            let curr_agg_data = match cur_state {
                Some(ref e) => {
                    let (len, res) = Self::decode_buffer(&e[offset..])?;
                    offset += len;
                    Some(res)
                }
                None => None,
            };

            let (prefix, next_state_slice) = match op {
                AggregatorOperation::Insert => {
                    let inserted_field = measure
                        .0
                        .evaluate(inserted_record.unwrap(), &self.input_schema)?;
                    if let Some(curr) = curr_agg_data {
                        out_rec_delete.push(curr.value);
                        let mut p_tx = PrefixTransaction::new(txn, curr.prefix);
                        let r = measure.1.insert(
                            curr.state,
                            &inserted_field,
                            measure.0.get_type(&self.input_schema)?.return_type,
                            &mut p_tx,
                            self.aggregators_db.unwrap(),
                        )?;
                        (curr.prefix, r)
                    } else {
                        let prefix = self.get_counter(txn)?;
                        let mut p_tx = PrefixTransaction::new(txn, prefix);
                        let r = measure.1.insert(
                            None,
                            &inserted_field,
                            measure.0.get_type(&self.input_schema)?.return_type,
                            &mut p_tx,
                            self.aggregators_db.unwrap(),
                        )?;
                        (prefix, r)
                    }
                }
                AggregatorOperation::Delete => {
                    let deleted_field = measure
                        .0
                        .evaluate(deleted_record.unwrap(), &self.input_schema)?;
                    if let Some(curr) = curr_agg_data {
                        out_rec_delete.push(curr.value);
                        let mut p_tx = PrefixTransaction::new(txn, curr.prefix);
                        let r = measure.1.delete(
                            curr.state,
                            &deleted_field,
                            measure.0.get_type(&self.input_schema)?.return_type,
                            &mut p_tx,
                            self.aggregators_db.unwrap(),
                        )?;
                        (curr.prefix, r)
                    } else {
                        let prefix = self.get_counter(txn)?;
                        let mut p_tx = PrefixTransaction::new(txn, prefix);
                        let r = measure.1.delete(
                            None,
                            &deleted_field,
                            measure.0.get_type(&self.input_schema)?.return_type,
                            &mut p_tx,
                            self.aggregators_db.unwrap(),
                        )?;
                        (prefix, r)
                    }
                }
                AggregatorOperation::Update => {
                    let deleted_field = measure
                        .0
                        .evaluate(deleted_record.unwrap(), &self.input_schema)?;
                    let updated_field = measure
                        .0
                        .evaluate(inserted_record.unwrap(), &self.input_schema)?;

                    if let Some(curr) = curr_agg_data {
                        out_rec_delete.push(curr.value);
                        let mut p_tx = PrefixTransaction::new(txn, curr.prefix);
                        let r = measure.1.update(
                            curr.state,
                            &deleted_field,
                            &updated_field,
                            measure.0.get_type(&self.input_schema)?.return_type,
                            &mut p_tx,
                            self.aggregators_db.unwrap(),
                        )?;
                        (curr.prefix, r)
                    } else {
                        let prefix = self.get_counter(txn)?;
                        let mut p_tx = PrefixTransaction::new(txn, prefix);
                        let r = measure.1.update(
                            None,
                            &deleted_field,
                            &updated_field,
                            measure.0.get_type(&self.input_schema)?.return_type,
                            &mut p_tx,
                            self.aggregators_db.unwrap(),
                        )?;
                        (prefix, r)
                    }
                }
            };

            next_state.extend(
                &Self::encode_buffer(prefix, &next_state_slice.value, &next_state_slice.state)?.1,
            );
            out_rec_insert.push(next_state_slice.value);
        }

        Ok(next_state)
    }

    // Adjusts the per-segment record count, stored as a big-endian u64, removing the
    // key once the count reaches zero. Returns the count *before* the update.
    fn update_segment_count(
        &self,
        txn: &mut LmdbExclusiveTransaction,
        db: Database,
        key: Vec<u8>,
        delta: u64,
        decr: bool,
    ) -> Result<u64, PipelineError> {
        let bytes = txn.get(db, key.as_slice())?;

        let curr_count = match bytes {
            Some(b) => u64::from_be_bytes(deserialize!(b)),
            None => 0_u64,
        };

        let new_val = if decr {
            curr_count.wrapping_sub(delta)
        } else {
            curr_count.wrapping_add(delta)
        };

        if new_val > 0 {
            txn.put(db, key.as_slice(), new_val.to_be_bytes().as_slice())?;
        } else {
            txn.del(db, key.as_slice(), None)?;
        }
        Ok(curr_count)
    }

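    // --- Illustrative sketch, added by the editor; not part of the original file ---
    // update_segment_count above keeps one big-endian u64 per segment, drops the key
    // when the count reaches zero, and hands back the *previous* count so agg_delete
    // below can tell a vanishing segment (previous count 1, emit Delete) from a
    // shrinking one (emit Update). A HashMap stands in for the LMDB database here.
    #[allow(dead_code)]
    fn segment_count_sketch(
        db: &mut HashMap<Vec<u8>, [u8; 8]>,
        key: &[u8],
        delta: u64,
        decr: bool,
    ) -> u64 {
        let curr = db.get(key).map_or(0, |b| u64::from_be_bytes(*b));
        let new_val = if decr {
            curr.wrapping_sub(delta)
        } else {
            curr.wrapping_add(delta)
        };
        if new_val > 0 {
            db.insert(key.to_vec(), new_val.to_be_bytes());
        } else {
            db.remove(key);
        }
        curr
    }
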
    // Handles a Delete against its segment: decrements the segment count, updates the
    // measures, and emits a Delete if this was the last record in the segment or an
    // Update otherwise.
    fn agg_delete(
        &self,
        txn: &mut LmdbExclusiveTransaction,
        db: Database,
        old: &mut Record,
    ) -> Result<Operation, PipelineError> {
        let mut out_rec_delete: Vec<Field> = Vec::with_capacity(self.measures.len());
        let mut out_rec_insert: Vec<Field> = Vec::with_capacity(self.measures.len());

        let record_hash = if !self.dimensions.is_empty() {
            get_key(&self.input_schema, old, &self.dimensions)?
        } else {
            vec![AGG_DEFAULT_DIMENSION_ID]
        };

        let record_key = self.get_record_key(&record_hash, AGG_VALUES_DATASET_ID)?;

        let record_count_key = self.get_record_key(&record_hash, AGG_COUNT_DATASET_ID)?;
        let prev_count = self.update_segment_count(txn, db, record_count_key, 1, true)?;

        let cur_state = txn.get(db, record_key.as_slice())?.map(|b| b.to_vec());
        let new_state = self.calc_and_fill_measures(
            txn,
            &cur_state,
            Some(old),
            None,
            &mut out_rec_delete,
            &mut out_rec_insert,
            AggregatorOperation::Delete,
        )?;

        let res = if prev_count == 1 {
            Operation::Delete {
                old: self.build_projection(old, out_rec_delete)?,
            }
        } else {
            Operation::Update {
                new: self.build_projection(old, out_rec_insert)?,
                old: self.build_projection(old, out_rec_delete)?,
            }
        };

        if prev_count == 1 {
            let _ = txn.del(db, record_key.as_slice(), None)?;
        } else {
            txn.put(db, record_key.as_slice(), new_state.as_slice())?;
        }
        Ok(res)
    }

    // Handles an Insert: increments the segment count, updates the measures, and emits
    // an Insert for a brand-new segment or an Update otherwise.
    fn agg_insert(
        &self,
        txn: &mut LmdbExclusiveTransaction,
        db: Database,
        new: &mut Record,
    ) -> Result<Operation, PipelineError> {
        let mut out_rec_delete: Vec<Field> = Vec::with_capacity(self.measures.len());
        let mut out_rec_insert: Vec<Field> = Vec::with_capacity(self.measures.len());

        let record_hash = if !self.dimensions.is_empty() {
            get_key(&self.input_schema, new, &self.dimensions)?
        } else {
            vec![AGG_DEFAULT_DIMENSION_ID]
        };

        let record_key = self.get_record_key(&record_hash, AGG_VALUES_DATASET_ID)?;

        let record_count_key = self.get_record_key(&record_hash, AGG_COUNT_DATASET_ID)?;
        self.update_segment_count(txn, db, record_count_key, 1, false)?;

        let cur_state = txn.get(db, record_key.as_slice())?.map(|b| b.to_vec());
        let new_state = self.calc_and_fill_measures(
            txn,
            &cur_state,
            None,
            Some(new),
            &mut out_rec_delete,
            &mut out_rec_insert,
            AggregatorOperation::Insert,
        )?;

        let res = if cur_state.is_none() {
            Operation::Insert {
                new: self.build_projection(new, out_rec_insert)?,
            }
        } else {
            Operation::Update {
                new: self.build_projection(new, out_rec_insert)?,
                old: self.build_projection(new, out_rec_delete)?,
            }
        };

        txn.put(db, record_key.as_slice(), new_state.as_slice())?;
        Ok(res)
    }

    // Handles an Update that stays within the same segment: recomputes the measures in
    // place and emits an Update.
    fn agg_update(
        &self,
        txn: &mut LmdbExclusiveTransaction,
        db: Database,
        old: &mut Record,
        new: &mut Record,
        record_hash: Vec<u8>,
    ) -> Result<Operation, PipelineError> {
        let mut out_rec_delete: Vec<Field> = Vec::with_capacity(self.measures.len());
        let mut out_rec_insert: Vec<Field> = Vec::with_capacity(self.measures.len());
        let record_key = self.get_record_key(&record_hash, AGG_VALUES_DATASET_ID)?;

        let cur_state = txn.get(db, record_key.as_slice())?.map(|b| b.to_vec());
        let new_state = self.calc_and_fill_measures(
            txn,
            &cur_state,
            Some(old),
            Some(new),
            &mut out_rec_delete,
            &mut out_rec_insert,
            AggregatorOperation::Update,
        )?;

        let res = Operation::Update {
            new: self.build_projection(new, out_rec_insert)?,
            old: self.build_projection(old, out_rec_delete)?,
        };

        txn.put(db, record_key.as_slice(), new_state.as_slice())?;

        Ok(res)
    }

    // Temporarily appends the aggregated values to the original record so projection
    // expressions can reference both, evaluates the projections, then restores the record.
    pub fn build_projection(
        &self,
        original: &mut Record,
        measures: Vec<Field>,
    ) -> Result<Record, PipelineError> {
        let original_len = original.values.len();
        original.values.extend(measures);
        let mut output = Vec::<Field>::with_capacity(self.projections.len());
        for exp in &self.projections {
            output.push(exp.evaluate(original, &self.aggregation_schema)?);
        }
        original.values.drain(original_len..);
        Ok(Record::new(None, output, None))
    }

    // Maps an incoming operation to one or two outgoing operations. An Update that moves
    // a record to a different segment is split into a Delete on the old segment followed
    // by an Insert on the new one.
    pub fn aggregate(
        &self,
        txn: &mut LmdbExclusiveTransaction,
        db: Database,
        mut op: Operation,
    ) -> Result<Vec<Operation>, PipelineError> {
        match op {
            Operation::Insert { ref mut new } => Ok(vec![self.agg_insert(txn, db, new)?]),
            Operation::Delete { ref mut old } => Ok(vec![self.agg_delete(txn, db, old)?]),
            Operation::Update {
                ref mut old,
                ref mut new,
            } => {
                let (old_record_hash, new_record_hash) = if self.dimensions.is_empty() {
                    (
                        vec![AGG_DEFAULT_DIMENSION_ID],
                        vec![AGG_DEFAULT_DIMENSION_ID],
                    )
                } else {
                    (
                        get_key(&self.input_schema, old, &self.dimensions)?,
                        get_key(&self.input_schema, new, &self.dimensions)?,
                    )
                };

                if old_record_hash == new_record_hash {
                    Ok(vec![self.agg_update(txn, db, old, new, old_record_hash)?])
                } else {
                    Ok(vec![
                        self.agg_delete(txn, db, old)?,
                        self.agg_insert(txn, db, new)?,
                    ])
                }
            }
        }
    }
}

// Builds the group-by key by concatenating the encoded dimension values.
fn get_key(
    schema: &Schema,
    record: &Record,
    dimensions: &[Expression],
) -> Result<Vec<u8>, PipelineError> {
    let mut tot_size = 0_usize;
    let mut buffers = Vec::<Vec<u8>>::with_capacity(dimensions.len());

    for dimension in dimensions.iter() {
        let value = dimension.evaluate(record, schema)?;
        let bytes = value.encode();
        tot_size += bytes.len();
        buffers.push(bytes);
    }

    let mut res_buffer = Vec::<u8>::with_capacity(tot_size);
    for i in buffers {
        res_buffer.extend(i);
    }
    Ok(res_buffer)
}

impl Processor for AggregationProcessor {
    fn init(&mut self, state: &mut LmdbEnvironmentManager) -> Result<(), ExecutionError> {
        internal_err!(self.init_store(state))
    }

    fn commit(&self, _epoch: &Epoch, _tx: &SharedTransaction) -> Result<(), ExecutionError> {
        Ok(())
    }

    // Runs the aggregation inside the shared transaction and forwards every resulting
    // operation on the default output port.
    fn process(
        &mut self,
        _from_port: PortHandle,
        op: Operation,
        fw: &mut dyn ProcessorChannelForwarder,
        txn: &SharedTransaction,
        _reader: &HashMap<PortHandle, Box<dyn RecordReader>>,
    ) -> Result<(), ExecutionError> {
        match self.db {
            Some(d) => {
                let ops = internal_err!(self.aggregate(&mut txn.write(), d, op))?;
                for fop in ops {
                    fw.send(fop, DEFAULT_PORT_HANDLE)?;
                }
                Ok(())
            }
            _ => Err(ExecutionError::InvalidDatabase),
        }
    }
}