• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

getdozer / dozer / 3983272635

pending completion
3983272635

push

github

GitHub
fix insert after delete problem in aggregation (#709)

52 of 52 new or added lines in 2 files covered. (100.0%)

22141 of 34542 relevant lines covered (64.1%)

46204.35 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

90.59
/dozer-sql/src/pipeline/aggregation/processor.rs
1
#![allow(clippy::too_many_arguments)]
2
use crate::deserialize;
3
use crate::pipeline::errors::PipelineError;
4
use crate::pipeline::expression::execution::ExpressionExecutor;
5
use crate::pipeline::{aggregation::aggregator::Aggregator, expression::execution::Expression};
6
use dozer_core::dag::channels::ProcessorChannelForwarder;
7
use dozer_core::dag::dag::DEFAULT_PORT_HANDLE;
8
use dozer_core::dag::errors::ExecutionError;
9
use dozer_core::dag::errors::ExecutionError::InternalError;
10
use dozer_core::dag::node::{PortHandle, Processor};
11
use dozer_core::storage::lmdb_storage::{
12
    LmdbEnvironmentManager, LmdbExclusiveTransaction, SharedTransaction,
13
};
14
use dozer_types::errors::types::TypeError;
15
use dozer_types::internal_err;
16
use dozer_types::types::{Field, Operation, Record, Schema};
17

18
use dozer_core::dag::epoch::Epoch;
19
use dozer_core::dag::record_store::RecordReader;
20
use dozer_core::storage::common::Database;
21
use dozer_core::storage::errors::StorageError::InvalidDatabase;
22
use dozer_core::storage::prefix_transaction::PrefixTransaction;
23
use std::{collections::HashMap, mem::size_of_val};
24

25
/// Rule describing how a single output field of the aggregation is produced.
pub enum FieldRule {
    /// Represents a dimension field, generally used in the GROUP BY clause
    Dimension(
        /// Expression for this dimension
        Box<Expression>,
        /// true if this field should be included in the list of values of the
        /// output schema, otherwise false. Generally, this value is true if the field appears
        /// in the output results in addition to being in the list of the GROUP BY fields
        bool,
        /// Name of the field in the output schema
        String,
    ),
    /// Represents an aggregated field that will be calculated using the appropriate aggregator
    Measure(
        /// Argument of the Aggregator
        Box<Expression>,
        /// Aggregator implementation for this measure
        Aggregator,
        /// Name of the field in the output schema
        String,
    ),
}
47

48
/// Key in the meta database under which the persistent prefix counter is stored.
const COUNTER_KEY: u8 = 1_u8;
49

50
/// One measure's decoded aggregation state, as produced by
/// `AggregationProcessor::decode_buffer`.
pub(crate) struct AggregationData<'a> {
    /// Current output value of the measure.
    pub value: Field,
    /// Aggregator-specific serialized state; `None` when the stored state length is zero.
    pub state: Option<&'a [u8]>,
    /// Prefix id scoping this measure's entries inside the shared aggregators database.
    pub prefix: u32,
}
55

56
impl<'a> AggregationData<'a> {
57
    pub fn new(value: Field, state: Option<&'a [u8]>, prefix: u32) -> Self {
11,247✔
58
        Self {
11,247✔
59
            value,
11,247✔
60
            state,
11,247✔
61
            prefix,
11,247✔
62
        }
11,247✔
63
    }
11,247✔
64
}
65

66
/// Stateful processor that maintains aggregation results (GROUP BY dimensions
/// plus aggregated measures) across a stream of insert/delete/update
/// operations, persisted through LMDB.
#[derive(Debug)]
pub struct AggregationProcessor {
    /// Dimension expressions paired with their position in the output record.
    out_dimensions: Vec<(Box<Expression>, usize)>,
    /// Measure expressions paired with their aggregator and output position.
    out_measures: Vec<(Box<Expression>, Box<Aggregator>, usize)>,
    /// Main database holding aggregation values and segment counts; `None` until `init_store`.
    pub db: Option<Database>,
    /// Metadata database (prefix counter); `None` until `init_store`.
    meta_db: Option<Database>,
    /// Database for aggregator-specific auxiliary state; `None` until `init_store`.
    aggregators_db: Option<Database>,
    /// Schema of incoming records, used to evaluate dimension/measure expressions.
    input_schema: Schema,
}
75

76
/// Kind of change being applied to an aggregation segment in
/// `calc_and_fill_measures`.
enum AggregatorOperation {
    Insert,
    Delete,
    Update,
}
81

82
/// Key-space id prefixing entries that hold serialized aggregation values.
const AGG_VALUES_DATASET_ID: u16 = 0x0000_u16;
/// Key-space id prefixing entries that hold per-segment record counts.
const AGG_COUNT_DATASET_ID: u16 = 0x0001_u16;

/// Hash placeholder used when the aggregation has no GROUP BY dimensions.
const AGG_DEFAULT_DIMENSION_ID: u8 = 0xFF_u8;
86

87
impl AggregationProcessor {
88
    pub fn new(output_field_rules: Vec<FieldRule>, input_schema: Schema) -> Self {
87✔
89
        let (out_measures, out_dimensions) = populate_rules(&output_field_rules).unwrap();
87✔
90
        Self {
87✔
91
            out_dimensions,
87✔
92
            out_measures,
87✔
93
            db: None,
87✔
94
            meta_db: None,
87✔
95
            aggregators_db: None,
87✔
96
            input_schema,
87✔
97
        }
87✔
98
    }
87✔
99

100
    fn init_store(&mut self, txn: &mut LmdbEnvironmentManager) -> Result<(), PipelineError> {
87✔
101
        self.db = Some(txn.open_database("aggr", false)?);
87✔
102
        self.aggregators_db = Some(txn.open_database("aggr_data", false)?);
87✔
103
        self.meta_db = Some(txn.open_database("meta", false)?);
87✔
104
        Ok(())
87✔
105
    }
87✔
106

107
    fn fill_dimensions(&self, in_rec: &Record, out_rec: &mut Record) -> Result<(), PipelineError> {
30,518✔
108
        for v in &self.out_dimensions {
59,026✔
109
            out_rec.set_value(v.1, v.0.evaluate(in_rec, &self.input_schema)?.clone());
28,508✔
110
        }
111
        Ok(())
30,518✔
112
    }
30,518✔
113

114
    fn get_record_key(&self, hash: &Vec<u8>, database_id: u16) -> Result<Vec<u8>, PipelineError> {
38,638✔
115
        let mut vec = Vec::with_capacity(hash.len().wrapping_add(size_of_val(&database_id)));
38,638✔
116
        vec.extend_from_slice(&database_id.to_be_bytes());
38,638✔
117
        vec.extend(hash);
38,638✔
118
        Ok(vec)
38,638✔
119
    }
38,638✔
120

121
    /// Returns a fresh unique prefix id by incrementing a counter persisted
    /// in the meta database under `COUNTER_KEY`.
    ///
    /// The counter defaults to 1 when absent, so the first id handed out is
    /// 2; only uniqueness matters, not the starting value.
    fn get_counter(&self, txn: &mut LmdbExclusiveTransaction) -> Result<u32, PipelineError> {
        let meta_db = *self
            .meta_db
            .as_ref()
            .ok_or(PipelineError::InternalStorageError(InvalidDatabase))?;
        // Read the last persisted value, defaulting on first use.
        let curr_ctr = match txn.get(meta_db, &COUNTER_KEY.to_be_bytes())? {
            Some(v) => u32::from_be_bytes(deserialize!(v)),
            None => 1_u32,
        };
        // Persist the incremented counter, then hand out the same value.
        txn.put(
            meta_db,
            &COUNTER_KEY.to_be_bytes(),
            &(curr_ctr + 1).to_be_bytes(),
        )?;
        Ok(curr_ctr + 1)
    }
137

138
    /// Decodes one measure's serialized aggregation data from the front of `buf`.
    ///
    /// Layout (all integers big-endian):
    ///   [0..4]   u32 prefix id
    ///   [4..6]   u16 length of the encoded value
    ///   [..]     encoded `Field` value
    ///   [.. +2]  u16 length of the aggregator state (0 = no state)
    ///   [..]     aggregator state bytes
    ///
    /// Returns the number of bytes consumed together with the decoded data so
    /// the caller can walk a buffer holding one entry per measure.
    pub(crate) fn decode_buffer(buf: &[u8]) -> Result<(usize, AggregationData), PipelineError> {
        let prefix = u32::from_be_bytes(buf[0..4].try_into().unwrap());
        let mut offset: usize = 4;

        let val_len = u16::from_be_bytes(buf[offset..offset + 2].try_into().unwrap());
        offset += 2;
        let val: Field = Field::decode(&buf[offset..offset + val_len as usize])
            .map_err(TypeError::DeserializationError)?;
        offset += val_len as usize;
        let state_len = u16::from_be_bytes(buf[offset..offset + 2].try_into().unwrap());
        offset += 2;
        // A zero length marks the absence of aggregator state.
        let state: Option<&[u8]> = if state_len > 0 {
            Some(&buf[offset..offset + state_len as usize])
        } else {
            None
        };
        offset += state_len as usize;

        let r = AggregationData::new(val, state, prefix);
        Ok((offset, r))
    }
159

160
    pub(crate) fn encode_buffer(
19,343✔
161
        prefix: u32,
19,343✔
162
        value: &Field,
19,343✔
163
        state: &Option<Vec<u8>>,
19,343✔
164
    ) -> Result<(usize, Vec<u8>), PipelineError> {
19,343✔
165
        let mut r = Vec::with_capacity(512);
19,343✔
166
        r.extend(prefix.to_be_bytes());
19,343✔
167

19,343✔
168
        let sz_val = value.encode();
19,343✔
169
        r.extend((sz_val.len() as u16).to_be_bytes());
19,343✔
170
        r.extend(&sz_val);
19,343✔
171

172
        let len = if let Some(state) = state.as_ref() {
19,343✔
173
            r.extend((state.len() as u16).to_be_bytes());
19,312✔
174
            r.extend(state);
19,312✔
175
            state.len()
19,312✔
176
        } else {
177
            r.extend(0_u16.to_be_bytes());
31✔
178
            0_usize
31✔
179
        };
180

181
        Ok((5 + sz_val.len() + len, r))
19,343✔
182
    }
19,343✔
183

184
    /// Recomputes every measure for one aggregation segment and fills the
    /// "before" (`out_rec_delete`) and "after" (`out_rec_insert`) output
    /// records.
    ///
    /// `cur_state` is the previously stored buffer for this segment (one
    /// `encode_buffer` entry per measure, in `out_measures` order) or `None`
    /// for a brand-new segment. `deleted_record` / `inserted_record` must be
    /// `Some` as required by `op` (Insert needs the inserted record, Delete
    /// the deleted one, Update both) — a missing record is a caller bug and
    /// panics via `unwrap`.
    ///
    /// Returns the new serialized state buffer for the segment.
    fn calc_and_fill_measures(
        &self,
        txn: &mut LmdbExclusiveTransaction,
        cur_state: &Option<Vec<u8>>,
        deleted_record: Option<&Record>,
        inserted_record: Option<&Record>,
        out_rec_delete: &mut Record,
        out_rec_insert: &mut Record,
        op: AggregatorOperation,
    ) -> Result<Vec<u8>, PipelineError> {
        // array holding the list of states for all measures
        let mut next_state = Vec::<u8>::new();
        let mut offset: usize = 0;

        for measure in &self.out_measures {
            // Walk the stored buffer measure-by-measure; entries are laid out
            // in the same order as `self.out_measures`.
            let curr_agg_data = match cur_state {
                Some(ref e) => {
                    let (len, res) = Self::decode_buffer(&e[offset..])?;
                    offset += len;
                    Some(res)
                }
                None => None,
            };

            let (prefix, next_state_slice) = match op {
                AggregatorOperation::Insert => {
                    let inserted_field = measure
                        .0
                        .evaluate(inserted_record.unwrap(), &self.input_schema)?;
                    if let Some(curr) = curr_agg_data {
                        // Existing segment: expose the old value in the
                        // "before" record and fold the new field in.
                        out_rec_delete.set_value(measure.2, curr.value);
                        let mut p_tx = PrefixTransaction::new(txn, curr.prefix);
                        let r = measure.1.insert(
                            curr.state,
                            &inserted_field,
                            measure.0.get_type(&self.input_schema)?.return_type,
                            &mut p_tx,
                            self.aggregators_db.unwrap(),
                        )?;
                        (curr.prefix, r)
                    } else {
                        // New segment: allocate a fresh prefix for its
                        // aggregator-state key space.
                        let prefix = self.get_counter(txn)?;
                        let mut p_tx = PrefixTransaction::new(txn, prefix);
                        let r = measure.1.insert(
                            None,
                            &inserted_field,
                            measure.0.get_type(&self.input_schema)?.return_type,
                            &mut p_tx,
                            self.aggregators_db.unwrap(),
                        )?;
                        (prefix, r)
                    }
                }
                AggregatorOperation::Delete => {
                    let deleted_field = measure
                        .0
                        .evaluate(deleted_record.unwrap(), &self.input_schema)?;
                    if let Some(curr) = curr_agg_data {
                        out_rec_delete.set_value(measure.2, curr.value);
                        let mut p_tx = PrefixTransaction::new(txn, curr.prefix);
                        let r = measure.1.delete(
                            curr.state,
                            &deleted_field,
                            measure.0.get_type(&self.input_schema)?.return_type,
                            &mut p_tx,
                            self.aggregators_db.unwrap(),
                        )?;
                        (curr.prefix, r)
                    } else {
                        // NOTE(review): deleting from a segment with no stored
                        // state should not normally happen (coverage shows this
                        // branch unexercised) — kept for safety.
                        let prefix = self.get_counter(txn)?;
                        let mut p_tx = PrefixTransaction::new(txn, prefix);
                        let r = measure.1.delete(
                            None,
                            &deleted_field,
                            measure.0.get_type(&self.input_schema)?.return_type,
                            &mut p_tx,
                            self.aggregators_db.unwrap(),
                        )?;
                        (prefix, r)
                    }
                }
                AggregatorOperation::Update => {
                    let deleted_field = measure
                        .0
                        .evaluate(deleted_record.unwrap(), &self.input_schema)?;
                    let updated_field = measure
                        .0
                        .evaluate(inserted_record.unwrap(), &self.input_schema)?;

                    if let Some(curr) = curr_agg_data {
                        out_rec_delete.set_value(measure.2, curr.value);
                        let mut p_tx = PrefixTransaction::new(txn, curr.prefix);
                        let r = measure.1.update(
                            curr.state,
                            &deleted_field,
                            &updated_field,
                            measure.0.get_type(&self.input_schema)?.return_type,
                            &mut p_tx,
                            self.aggregators_db.unwrap(),
                        )?;
                        (curr.prefix, r)
                    } else {
                        // NOTE(review): same as the Delete branch above —
                        // updating a segment with no stored state is unexpected.
                        let prefix = self.get_counter(txn)?;
                        let mut p_tx = PrefixTransaction::new(txn, prefix);
                        let r = measure.1.update(
                            None,
                            &deleted_field,
                            &updated_field,
                            measure.0.get_type(&self.input_schema)?.return_type,
                            &mut p_tx,
                            self.aggregators_db.unwrap(),
                        )?;
                        (prefix, r)
                    }
                }
            };

            // Append this measure's re-encoded entry and surface its new
            // value in the "after" record.
            next_state.extend(
                &Self::encode_buffer(prefix, &next_state_slice.value, &next_state_slice.state)?.1,
            );
            out_rec_insert.set_value(measure.2, next_state_slice.value);
        }

        Ok(next_state)
    }
309

310
    /// Adjusts the record count stored under `key` by `delta` (decrementing
    /// when `decr` is true) and returns the count as it was BEFORE the update.
    /// Callers rely on the pre-update value to detect the first insert
    /// (previous count 0) and the last delete (previous count 1).
    ///
    /// When the new count reaches 0 the entry is removed entirely.
    fn update_segment_count(
        &self,
        txn: &mut LmdbExclusiveTransaction,
        db: Database,
        key: Vec<u8>,
        delta: u64,
        decr: bool,
    ) -> Result<u64, PipelineError> {
        let bytes = txn.get(db, key.as_slice())?;

        // A missing entry means this segment has never been counted.
        let curr_count = match bytes {
            Some(b) => u64::from_be_bytes(deserialize!(b)),
            None => 0_u64,
        };

        // NOTE(review): the wrapping ops assume `delta` never exceeds the
        // stored count when decrementing; an underflow would wrap to a huge
        // value and be re-stored instead of deleted — confirm callers uphold
        // this (visible call sites only pass delta = 1).
        let new_val = if decr {
            curr_count.wrapping_sub(delta)
        } else {
            curr_count.wrapping_add(delta)
        };

        if new_val > 0 {
            txn.put(db, key.as_slice(), new_val.to_be_bytes().as_slice())?;
        } else {
            txn.del(db, key.as_slice(), None)?;
        }
        Ok(curr_count)
    }
338

339
    /// Handles the deletion of a record from its aggregation segment.
    ///
    /// Emits `Operation::Delete` when this was the last record of the segment
    /// (previous count 1, so the whole group disappears), otherwise
    /// `Operation::Update` with the segment's before/after values. The stored
    /// segment state is removed or rewritten accordingly.
    fn agg_delete(
        &self,
        txn: &mut LmdbExclusiveTransaction,
        db: Database,
        old: &Record,
    ) -> Result<Operation, PipelineError> {
        let size = self.out_measures.len() + self.out_dimensions.len();
        let mut out_rec_insert = Record::nulls(None, size, None);
        let mut out_rec_delete = Record::nulls(None, size, None);

        // Segment hash: the encoded GROUP BY dimensions, or a fixed
        // placeholder when there is no GROUP BY.
        let record_hash = if !self.out_dimensions.is_empty() {
            get_key(&self.input_schema, old, &self.out_dimensions)?
        } else {
            vec![AGG_DEFAULT_DIMENSION_ID]
        };

        let record_key = self.get_record_key(&record_hash, AGG_VALUES_DATASET_ID)?;

        // Decrement the segment's record count; the pre-update value tells us
        // whether this delete empties the segment.
        let record_count_key = self.get_record_key(&record_hash, AGG_COUNT_DATASET_ID)?;
        let prev_count = self.update_segment_count(txn, db, record_count_key, 1, true)?;

        let cur_state = txn.get(db, record_key.as_slice())?.map(|b| b.to_vec());
        let new_state = self.calc_and_fill_measures(
            txn,
            &cur_state,
            Some(old),
            None,
            &mut out_rec_delete,
            &mut out_rec_insert,
            AggregatorOperation::Delete,
        )?;

        let res = if prev_count == 1 {
            // Last record of the group: the group itself is deleted downstream.
            self.fill_dimensions(old, &mut out_rec_delete)?;
            Operation::Delete {
                old: out_rec_delete,
            }
        } else {
            // Group survives: emit its before/after values.
            self.fill_dimensions(old, &mut out_rec_insert)?;
            self.fill_dimensions(old, &mut out_rec_delete)?;
            Operation::Update {
                new: out_rec_insert,
                old: out_rec_delete,
            }
        };

        // Drop or rewrite the stored segment state to match.
        if prev_count == 1 {
            let _ = txn.del(db, record_key.as_slice(), None)?;
        } else {
            txn.put(db, record_key.as_slice(), new_state.as_slice())?;
        }
        Ok(res)
    }
393

394
    /// Handles the insertion of a record into its aggregation segment.
    ///
    /// Emits `Operation::Insert` when the segment had no stored state yet (a
    /// brand-new group), otherwise `Operation::Update` with the segment's
    /// before/after values. The stored segment state is (re)written either way.
    fn agg_insert(
        &self,
        txn: &mut LmdbExclusiveTransaction,
        db: Database,
        new: &Record,
    ) -> Result<Operation, PipelineError> {
        let size = self.out_measures.len() + self.out_dimensions.len();
        let mut out_rec_insert = Record::nulls(None, size, None);
        let mut out_rec_delete = Record::nulls(None, size, None);

        // Segment hash: the encoded GROUP BY dimensions, or a fixed
        // placeholder when there is no GROUP BY.
        let record_hash = if !self.out_dimensions.is_empty() {
            get_key(&self.input_schema, new, &self.out_dimensions)?
        } else {
            vec![AGG_DEFAULT_DIMENSION_ID]
        };

        let record_key = self.get_record_key(&record_hash, AGG_VALUES_DATASET_ID)?;

        // Count this record against its segment.
        let record_count_key = self.get_record_key(&record_hash, AGG_COUNT_DATASET_ID)?;
        self.update_segment_count(txn, db, record_count_key, 1, false)?;

        let cur_state = txn.get(db, record_key.as_slice())?.map(|b| b.to_vec());
        let new_state = self.calc_and_fill_measures(
            txn,
            &cur_state,
            None,
            Some(new),
            &mut out_rec_delete,
            &mut out_rec_insert,
            AggregatorOperation::Insert,
        )?;

        // `cur_state` (not the count) decides Insert vs Update: absence of
        // stored state means the group did not exist before this record.
        let res = if cur_state.is_none() {
            self.fill_dimensions(new, &mut out_rec_insert)?;
            Operation::Insert {
                new: out_rec_insert,
            }
        } else {
            self.fill_dimensions(new, &mut out_rec_insert)?;
            self.fill_dimensions(new, &mut out_rec_delete)?;
            Operation::Update {
                new: out_rec_insert,
                old: out_rec_delete,
            }
        };

        txn.put(db, record_key.as_slice(), new_state.as_slice())?;

        Ok(res)
    }
445

446
    /// Handles an update whose old and new records fall into the SAME segment
    /// (`record_hash` is precomputed by `aggregate`, which already verified
    /// the hashes match). Always emits `Operation::Update` and rewrites the
    /// segment's stored state.
    fn agg_update(
        &self,
        txn: &mut LmdbExclusiveTransaction,
        db: Database,
        old: &Record,
        new: &Record,
        record_hash: Vec<u8>,
    ) -> Result<Operation, PipelineError> {
        let size = self.out_measures.len() + self.out_dimensions.len();
        let mut out_rec_insert = Record::nulls(None, size, None);
        let mut out_rec_delete = Record::nulls(None, size, None);
        let record_key = self.get_record_key(&record_hash, AGG_VALUES_DATASET_ID)?;

        let cur_state = txn.get(db, record_key.as_slice())?.map(|b| b.to_vec());
        let new_state = self.calc_and_fill_measures(
            txn,
            &cur_state,
            Some(old),
            Some(new),
            &mut out_rec_delete,
            &mut out_rec_insert,
            AggregatorOperation::Update,
        )?;

        // Dimensions come from the matching side of the update: new record
        // for the "after" image, old record for the "before" image.
        self.fill_dimensions(new, &mut out_rec_insert)?;
        self.fill_dimensions(old, &mut out_rec_delete)?;

        let res = Operation::Update {
            new: out_rec_insert,
            old: out_rec_delete,
        };

        txn.put(db, record_key.as_slice(), new_state.as_slice())?;

        Ok(res)
    }
482

483
    /// Applies one incoming operation to the aggregation state and returns
    /// the resulting output operation(s).
    ///
    /// An `Update` whose old and new records stay in the same segment becomes
    /// a single in-place update; one that moves between segments becomes a
    /// delete from the old segment followed by an insert into the new one.
    pub fn aggregate(
        &self,
        txn: &mut LmdbExclusiveTransaction,
        db: Database,
        op: Operation,
    ) -> Result<Vec<Operation>, PipelineError> {
        match op {
            Operation::Insert { ref new } => Ok(vec![self.agg_insert(txn, db, new)?]),
            Operation::Delete { ref old } => Ok(vec![self.agg_delete(txn, db, old)?]),
            Operation::Update { ref old, ref new } => {
                // Without GROUP BY everything lives in one default segment.
                let (old_record_hash, new_record_hash) = if self.out_dimensions.is_empty() {
                    (
                        vec![AGG_DEFAULT_DIMENSION_ID],
                        vec![AGG_DEFAULT_DIMENSION_ID],
                    )
                } else {
                    (
                        get_key(&self.input_schema, old, &self.out_dimensions)?,
                        get_key(&self.input_schema, new, &self.out_dimensions)?,
                    )
                };

                if old_record_hash == new_record_hash {
                    // Same segment: a single in-place update.
                    Ok(vec![self.agg_update(txn, db, old, new, old_record_hash)?])
                } else {
                    // Segment changed: remove from the old group, add to the new.
                    Ok(vec![
                        self.agg_delete(txn, db, old)?,
                        self.agg_insert(txn, db, new)?,
                    ])
                }
            }
        }
    }
518
}
519

520
fn get_key(
18,421✔
521
    schema: &Schema,
18,421✔
522
    record: &Record,
18,421✔
523
    out_dimensions: &[(Box<Expression>, usize)],
18,421✔
524
) -> Result<Vec<u8>, PipelineError> {
18,421✔
525
    let mut tot_size = 0_usize;
18,421✔
526
    let mut buffers = Vec::<Vec<u8>>::with_capacity(out_dimensions.len());
18,421✔
527

528
    for dimension in out_dimensions.iter() {
18,421✔
529
        let value = dimension.0.evaluate(record, schema)?;
18,421✔
530
        let bytes = value.encode();
18,421✔
531
        tot_size += bytes.len();
18,421✔
532
        buffers.push(bytes);
18,421✔
533
    }
534

535
    let mut res_buffer = Vec::<u8>::with_capacity(tot_size);
18,421✔
536
    for i in buffers {
36,842✔
537
        res_buffer.extend(i);
18,421✔
538
    }
18,421✔
539
    Ok(res_buffer)
18,421✔
540
}
18,421✔
541

542
impl Processor for AggregationProcessor {
    /// Opens the processor's LMDB databases before any records flow.
    fn init(&mut self, state: &mut LmdbEnvironmentManager) -> Result<(), ExecutionError> {
        internal_err!(self.init_store(state))
    }

    /// No commit-time work: all writes go through the shared transaction as
    /// records are processed.
    fn commit(&self, _epoch: &Epoch, _tx: &SharedTransaction) -> Result<(), ExecutionError> {
        Ok(())
    }

    /// Applies one incoming operation to the aggregation state and forwards
    /// each resulting output operation on the default port.
    fn process(
        &mut self,
        _from_port: PortHandle,
        op: Operation,
        fw: &mut dyn ProcessorChannelForwarder,
        txn: &SharedTransaction,
        _reader: &HashMap<PortHandle, Box<dyn RecordReader>>,
    ) -> Result<(), ExecutionError> {
        match self.db {
            Some(d) => {
                let ops = internal_err!(self.aggregate(&mut txn.write(), d, op))?;
                for fop in ops {
                    fw.send(fop, DEFAULT_PORT_HANDLE)?;
                }
                Ok(())
            }
            // `init` has not run (or the database failed to open).
            _ => Err(ExecutionError::InvalidDatabase),
        }
    }
}
571

572
/// Result of `populate_rules`: measures as (expression, aggregator, output
/// index) and value-producing dimensions as (expression, output index).
type OutputRules = (
    Vec<(Box<Expression>, Box<Aggregator>, usize)>,
    Vec<(Box<Expression>, usize)>,
);
576

577
fn populate_rules(output_field_rules: &[FieldRule]) -> Result<OutputRules, PipelineError> {
87✔
578
    let mut out_measures: Vec<(Box<Expression>, Box<Aggregator>, usize)> = Vec::new();
87✔
579
    let mut out_dimensions: Vec<(Box<Expression>, usize)> = Vec::new();
87✔
580

581
    for rule in output_field_rules.iter().enumerate() {
240✔
582
        match rule.1 {
240✔
583
            FieldRule::Measure(pre_aggr, aggr, _name) => {
87✔
584
                out_measures.push((pre_aggr.clone(), Box::new(aggr.clone()), rule.0));
87✔
585
            }
87✔
586
            FieldRule::Dimension(expression, is_value, _name) => {
153✔
587
                if *is_value {
153✔
588
                    out_dimensions.push((expression.clone(), rule.0));
77✔
589
                }
77✔
590
            }
591
        }
592
    }
593

594
    Ok((out_measures, out_dimensions))
87✔
595
}
87✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc