getdozer / dozer / build 3967387791 (push, via GitHub): pending completion

fix: Unify typed service `query` and `on_event` record representation (#647)

25 of 25 new or added lines in 3 files covered (100.0%)
22000 of 32915 relevant lines covered (66.84%)
36081.0 hits per line

Source File: /dozer-sql/src/pipeline/aggregation/processor.rs (91.04% covered)
#![allow(clippy::too_many_arguments)]
use crate::deserialize;
use crate::pipeline::errors::PipelineError;
use crate::pipeline::expression::execution::ExpressionExecutor;
use crate::pipeline::{aggregation::aggregator::Aggregator, expression::execution::Expression};
use dozer_core::dag::channels::ProcessorChannelForwarder;
use dozer_core::dag::dag::DEFAULT_PORT_HANDLE;
use dozer_core::dag::errors::ExecutionError;
use dozer_core::dag::errors::ExecutionError::InternalError;
use dozer_core::dag::node::{PortHandle, Processor};
use dozer_core::storage::lmdb_storage::{
    LmdbEnvironmentManager, LmdbExclusiveTransaction, SharedTransaction,
};
use dozer_types::errors::types::TypeError;
use dozer_types::internal_err;
use dozer_types::types::{Field, Operation, Record, Schema};

use dozer_core::dag::epoch::Epoch;
use dozer_core::dag::record_store::RecordReader;
use dozer_core::storage::common::Database;
use dozer_core::storage::errors::StorageError::InvalidDatabase;
use dozer_core::storage::prefix_transaction::PrefixTransaction;
use std::{collections::HashMap, mem::size_of_val};

pub enum FieldRule {
    /// Represents a dimension field, generally used in the GROUP BY clause
    Dimension(
        /// Expression for this dimension
        Box<Expression>,
        /// true if this field should be included in the list of values of the
        /// output schema, otherwise false. Generally, this value is true if the field appears
        /// in the output results in addition to being in the list of the GROUP BY fields
        bool,
        /// Name of the field in the output schema
        String,
    ),
    /// Represents an aggregated field that will be calculated using the appropriate aggregator
    Measure(
        /// Argument of the Aggregator
        Box<Expression>,
        /// Aggregator implementation for this measure
        Aggregator,
        /// Name of the field in the output schema
        String,
    ),
}

const COUNTER_KEY: u8 = 1_u8;

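/// Decoded state for a single measure: the current aggregated value, the
/// aggregator's serialized state (if any), and the prefix that scopes this
/// measure's entries in the aggregators database.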
pub(crate) struct AggregationData<'a> {
    pub value: Field,
    pub state: Option<&'a [u8]>,
    pub prefix: u32,
}

impl<'a> AggregationData<'a> {
    pub fn new(value: Field, state: Option<&'a [u8]>, prefix: u32) -> Self {
        Self {
            value,
            state,
            prefix,
        }
    }
}

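/// Streaming aggregation processor. Incoming operations update per-segment
/// state held in LMDB: `db` stores the encoded measure values and segment
/// counts, `aggregators_db` holds aggregator-specific state, and `meta_db`
/// keeps the prefix counter.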
#[derive(Debug)]
pub struct AggregationProcessor {
    out_dimensions: Vec<(Box<Expression>, usize)>,
    out_measures: Vec<(Box<Expression>, Box<Aggregator>, usize)>,
    pub db: Option<Database>,
    meta_db: Option<Database>,
    aggregators_db: Option<Database>,
    input_schema: Schema,
}

enum AggregatorOperation {
    Insert,
    Delete,
    Update,
}

const AGG_VALUES_DATASET_ID: u16 = 0x0000_u16;
const AGG_COUNT_DATASET_ID: u16 = 0x0001_u16;

const AGG_DEFAULT_DIMENSION_ID: u8 = 0xFF_u8;

impl AggregationProcessor {
    pub fn new(output_field_rules: Vec<FieldRule>, input_schema: Schema) -> Self {
        let (out_measures, out_dimensions) = populate_rules(&output_field_rules).unwrap();
        Self {
            out_dimensions,
            out_measures,
            db: None,
            meta_db: None,
            aggregators_db: None,
            input_schema,
        }
    }

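    /// Opens the three LMDB databases backing the processor; invoked once from `init`.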
    fn init_store(&mut self, txn: &mut LmdbEnvironmentManager) -> Result<(), PipelineError> {
        self.db = Some(txn.open_database("aggr", false)?);
        self.aggregators_db = Some(txn.open_database("aggr_data", false)?);
        self.meta_db = Some(txn.open_database("meta", false)?);
        Ok(())
    }

    fn fill_dimensions(&self, in_rec: &Record, out_rec: &mut Record) -> Result<(), PipelineError> {
        for v in &self.out_dimensions {
            out_rec.set_value(v.1, v.0.evaluate(in_rec, &self.input_schema)?.clone());
        }
        Ok(())
    }

    fn get_record_key(&self, hash: &[u8], database_id: u16) -> Result<Vec<u8>, PipelineError> {
        let mut vec = Vec::with_capacity(hash.len().wrapping_add(size_of_val(&database_id)));
        vec.extend_from_slice(&database_id.to_be_bytes());
        vec.extend(hash);
        Ok(vec)
    }

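    /// Returns the next value of a persistent counter kept under `COUNTER_KEY`
    /// in the meta database, used to assign a unique prefix to each new
    /// aggregator state.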
    fn get_counter(&self, txn: &mut LmdbExclusiveTransaction) -> Result<u32, PipelineError> {
        let meta_db = *self
            .meta_db
            .as_ref()
            .ok_or(PipelineError::InternalStorageError(InvalidDatabase))?;
        let curr_ctr = match txn.get(meta_db, &COUNTER_KEY.to_be_bytes())? {
            Some(v) => u32::from_be_bytes(deserialize!(v)),
            None => 1_u32,
        };
        txn.put(
            meta_db,
            &COUNTER_KEY.to_be_bytes(),
            &(curr_ctr + 1).to_be_bytes(),
        )?;
        Ok(curr_ctr + 1)
    }

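    /// Decodes one measure state from `buf`. The layout is
    /// [prefix: u32 BE | value_len: u16 BE | value | state_len: u16 BE | state],
    /// with a zero `state_len` meaning no state. Returns the number of bytes
    /// consumed together with the decoded data.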
    pub(crate) fn decode_buffer(buf: &[u8]) -> Result<(usize, AggregationData), PipelineError> {
        let prefix = u32::from_be_bytes(buf[0..4].try_into().unwrap());
        let mut offset: usize = 4;

        let val_len = u16::from_be_bytes(buf[offset..offset + 2].try_into().unwrap());
        offset += 2;
        let val: Field = Field::decode(&buf[offset..offset + val_len as usize])
            .map_err(TypeError::DeserializationError)?;
        offset += val_len as usize;
        let state_len = u16::from_be_bytes(buf[offset..offset + 2].try_into().unwrap());
        offset += 2;
        let state: Option<&[u8]> = if state_len > 0 {
            Some(&buf[offset..offset + state_len as usize])
        } else {
            None
        };
        offset += state_len as usize;

        let r = AggregationData::new(val, state, prefix);
        Ok((offset, r))
    }

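    /// Inverse of `decode_buffer`: serializes a measure's value and optional
    /// aggregator state into the layout above, returning the total encoded
    /// length (an 8-byte header plus payloads) and the buffer.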
    pub(crate) fn encode_buffer(
        prefix: u32,
        value: &Field,
        state: &Option<Vec<u8>>,
    ) -> Result<(usize, Vec<u8>), PipelineError> {
        let mut r = Vec::with_capacity(512);
        r.extend(prefix.to_be_bytes());

        let sz_val = value.encode();
        r.extend((sz_val.len() as u16).to_be_bytes());
        r.extend(&sz_val);

        let len = if let Some(state) = state.as_ref() {
            r.extend((state.len() as u16).to_be_bytes());
            r.extend(state);
            state.len()
        } else {
            r.extend(0_u16.to_be_bytes());
            0_usize
        };

        Ok((8 + sz_val.len() + len, r))
    }

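    /// Core of the processor: for every measure, decodes its previous state
    /// from `cur_state`, applies the operation through the measure's
    /// `Aggregator`, fills the old and new aggregated values into
    /// `out_rec_delete` / `out_rec_insert`, and returns the re-encoded states
    /// of all measures concatenated into a single buffer.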
    fn calc_and_fill_measures(
        &self,
        txn: &mut LmdbExclusiveTransaction,
        cur_state: &Option<Vec<u8>>,
        deleted_record: Option<&Record>,
        inserted_record: Option<&Record>,
        out_rec_delete: &mut Record,
        out_rec_insert: &mut Record,
        op: AggregatorOperation,
    ) -> Result<Vec<u8>, PipelineError> {
        // array holding the list of states for all measures
        let mut next_state = Vec::<u8>::new();
        let mut offset: usize = 0;

        for measure in &self.out_measures {
            let curr_agg_data = match cur_state {
                Some(ref e) => {
                    let (len, res) = Self::decode_buffer(&e[offset..])?;
                    offset += len;
                    Some(res)
                }
                None => None,
            };

            let (prefix, next_state_slice) = match op {
                AggregatorOperation::Insert => {
                    let inserted_field = measure
                        .0
                        .evaluate(inserted_record.unwrap(), &self.input_schema)?;
                    if let Some(curr) = curr_agg_data {
                        out_rec_delete.set_value(measure.2, curr.value);
                        let mut p_tx = PrefixTransaction::new(txn, curr.prefix);
                        let r = measure.1.insert(
                            curr.state,
                            &inserted_field,
                            measure.0.get_type(&self.input_schema)?.return_type,
                            &mut p_tx,
                            self.aggregators_db.unwrap(),
                        )?;
                        (curr.prefix, r)
                    } else {
                        let prefix = self.get_counter(txn)?;
                        let mut p_tx = PrefixTransaction::new(txn, prefix);
                        let r = measure.1.insert(
                            None,
                            &inserted_field,
                            measure.0.get_type(&self.input_schema)?.return_type,
                            &mut p_tx,
                            self.aggregators_db.unwrap(),
                        )?;
                        (prefix, r)
                    }
                }
                AggregatorOperation::Delete => {
                    let deleted_field = measure
                        .0
                        .evaluate(deleted_record.unwrap(), &self.input_schema)?;
                    if let Some(curr) = curr_agg_data {
                        out_rec_delete.set_value(measure.2, curr.value);
                        let mut p_tx = PrefixTransaction::new(txn, curr.prefix);
                        let r = measure.1.delete(
                            curr.state,
                            &deleted_field,
                            measure.0.get_type(&self.input_schema)?.return_type,
                            &mut p_tx,
                            self.aggregators_db.unwrap(),
                        )?;
                        (curr.prefix, r)
                    } else {
                        let prefix = self.get_counter(txn)?;
                        let mut p_tx = PrefixTransaction::new(txn, prefix);
                        let r = measure.1.delete(
                            None,
                            &deleted_field,
                            measure.0.get_type(&self.input_schema)?.return_type,
                            &mut p_tx,
                            self.aggregators_db.unwrap(),
                        )?;
                        (prefix, r)
                    }
                }
                AggregatorOperation::Update => {
                    let deleted_field = measure
                        .0
                        .evaluate(deleted_record.unwrap(), &self.input_schema)?;
                    let updated_field = measure
                        .0
                        .evaluate(inserted_record.unwrap(), &self.input_schema)?;

                    if let Some(curr) = curr_agg_data {
                        out_rec_delete.set_value(measure.2, curr.value);
                        let mut p_tx = PrefixTransaction::new(txn, curr.prefix);
                        let r = measure.1.update(
                            curr.state,
                            &deleted_field,
                            &updated_field,
                            measure.0.get_type(&self.input_schema)?.return_type,
                            &mut p_tx,
                            self.aggregators_db.unwrap(),
                        )?;
                        (curr.prefix, r)
                    } else {
                        let prefix = self.get_counter(txn)?;
                        let mut p_tx = PrefixTransaction::new(txn, prefix);
                        let r = measure.1.update(
                            None,
                            &deleted_field,
                            &updated_field,
                            measure.0.get_type(&self.input_schema)?.return_type,
                            &mut p_tx,
                            self.aggregators_db.unwrap(),
                        )?;
                        (prefix, r)
                    }
                }
            };

            next_state.extend(
                &Self::encode_buffer(prefix, &next_state_slice.value, &next_state_slice.state)?.1,
            );
            out_rec_insert.set_value(measure.2, next_state_slice.value);
        }

        Ok(next_state)
    }

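    /// Increments or decrements the number of records in a segment and
    /// returns the previous count, letting callers detect a segment's first
    /// insert (0) and last delete (1).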
    fn update_segment_count(
        &self,
        txn: &mut LmdbExclusiveTransaction,
        db: Database,
        key: Vec<u8>,
        delta: u64,
        decr: bool,
    ) -> Result<u64, PipelineError> {
        let bytes = txn.get(db, key.as_slice())?;

        let curr_count = match bytes {
            Some(b) => u64::from_be_bytes(deserialize!(b)),
            None => 0_u64,
        };

        txn.put(
            db,
            key.as_slice(),
            (if decr {
                curr_count.wrapping_sub(delta)
            } else {
                curr_count.wrapping_add(delta)
            })
            .to_be_bytes()
            .as_slice(),
        )?;
        Ok(curr_count)
    }

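    /// Handles a Delete. Removing the last record of a segment emits a Delete
    /// of the aggregated row; otherwise the measures are recomputed and an
    /// Update is emitted.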
    fn agg_delete(
        &self,
        txn: &mut LmdbExclusiveTransaction,
        db: Database,
        old: &Record,
    ) -> Result<Operation, PipelineError> {
        let size = self.out_measures.len() + self.out_dimensions.len();
        let mut out_rec_insert = Record::nulls(None, size, None);
        let mut out_rec_delete = Record::nulls(None, size, None);

        let record_hash = if !self.out_dimensions.is_empty() {
            get_key(&self.input_schema, old, &self.out_dimensions)?
        } else {
            vec![AGG_DEFAULT_DIMENSION_ID]
        };

        let record_key = self.get_record_key(&record_hash, AGG_VALUES_DATASET_ID)?;

        let record_count_key = self.get_record_key(&record_hash, AGG_COUNT_DATASET_ID)?;
        let prev_count = self.update_segment_count(txn, db, record_count_key, 1, true)?;

        let cur_state = txn.get(db, record_key.as_slice())?.map(|b| b.to_vec());
        let new_state = self.calc_and_fill_measures(
            txn,
            &cur_state,
            Some(old),
            None,
            &mut out_rec_delete,
            &mut out_rec_insert,
            AggregatorOperation::Delete,
        )?;

        let res = if prev_count == 1 {
            self.fill_dimensions(old, &mut out_rec_delete)?;
            Operation::Delete {
                old: out_rec_delete,
            }
        } else {
            self.fill_dimensions(old, &mut out_rec_insert)?;
            self.fill_dimensions(old, &mut out_rec_delete)?;
            Operation::Update {
                new: out_rec_insert,
                old: out_rec_delete,
            }
        };

        if prev_count > 0 {
            txn.put(db, record_key.as_slice(), new_state.as_slice())?;
        } else {
            let _ = txn.del(db, record_key.as_slice(), None)?;
        }
        Ok(res)
    }

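    /// Handles an Insert. The first record of a segment emits an Insert of
    /// the new aggregated row; any later record emits an Update carrying both
    /// the previous and the recomputed row.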
    fn agg_insert(
        &self,
        txn: &mut LmdbExclusiveTransaction,
        db: Database,
        new: &Record,
    ) -> Result<Operation, PipelineError> {
        let size = self.out_measures.len() + self.out_dimensions.len();
        let mut out_rec_insert = Record::nulls(None, size, None);
        let mut out_rec_delete = Record::nulls(None, size, None);

        let record_hash = if !self.out_dimensions.is_empty() {
            get_key(&self.input_schema, new, &self.out_dimensions)?
        } else {
            vec![AGG_DEFAULT_DIMENSION_ID]
        };

        let record_key = self.get_record_key(&record_hash, AGG_VALUES_DATASET_ID)?;

        let record_count_key = self.get_record_key(&record_hash, AGG_COUNT_DATASET_ID)?;
        self.update_segment_count(txn, db, record_count_key, 1, false)?;

        let cur_state = txn.get(db, record_key.as_slice())?.map(|b| b.to_vec());
        let new_state = self.calc_and_fill_measures(
            txn,
            &cur_state,
            None,
            Some(new),
            &mut out_rec_delete,
            &mut out_rec_insert,
            AggregatorOperation::Insert,
        )?;

        let res = if cur_state.is_none() {
            self.fill_dimensions(new, &mut out_rec_insert)?;
            Operation::Insert {
                new: out_rec_insert,
            }
        } else {
            self.fill_dimensions(new, &mut out_rec_insert)?;
            self.fill_dimensions(new, &mut out_rec_delete)?;
            Operation::Update {
                new: out_rec_insert,
                old: out_rec_delete,
            }
        };

        txn.put(db, record_key.as_slice(), new_state.as_slice())?;

        Ok(res)
    }

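    /// Handles an Update whose old and new records fall into the same
    /// segment: the measures are recomputed in place and a single Update is
    /// emitted.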
    fn agg_update(
        &self,
        txn: &mut LmdbExclusiveTransaction,
        db: Database,
        old: &Record,
        new: &Record,
        record_hash: Vec<u8>,
    ) -> Result<Operation, PipelineError> {
        let size = self.out_measures.len() + self.out_dimensions.len();
        let mut out_rec_insert = Record::nulls(None, size, None);
        let mut out_rec_delete = Record::nulls(None, size, None);
        let record_key = self.get_record_key(&record_hash, AGG_VALUES_DATASET_ID)?;

        let cur_state = txn.get(db, record_key.as_slice())?.map(|b| b.to_vec());
        let new_state = self.calc_and_fill_measures(
            txn,
            &cur_state,
            Some(old),
            Some(new),
            &mut out_rec_delete,
            &mut out_rec_insert,
            AggregatorOperation::Update,
        )?;

        self.fill_dimensions(new, &mut out_rec_insert)?;
        self.fill_dimensions(old, &mut out_rec_delete)?;

        let res = Operation::Update {
            new: out_rec_insert,
            old: out_rec_delete,
        };

        txn.put(db, record_key.as_slice(), new_state.as_slice())?;

        Ok(res)
    }

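    /// Entry point used by `process`. Inserts and Deletes map directly to the
    /// handlers above; an Update whose segment key changes is split into a
    /// Delete on the old segment and an Insert on the new one, so one input
    /// operation can fan out into two output operations.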
    pub fn aggregate(
        &self,
        txn: &mut LmdbExclusiveTransaction,
        db: Database,
        op: Operation,
    ) -> Result<Vec<Operation>, PipelineError> {
        match op {
            Operation::Insert { ref new } => Ok(vec![self.agg_insert(txn, db, new)?]),
            Operation::Delete { ref old } => Ok(vec![self.agg_delete(txn, db, old)?]),
            Operation::Update { ref old, ref new } => {
                let (old_record_hash, new_record_hash) = if self.out_dimensions.is_empty() {
                    (
                        vec![AGG_DEFAULT_DIMENSION_ID],
                        vec![AGG_DEFAULT_DIMENSION_ID],
                    )
                } else {
                    (
                        get_key(&self.input_schema, old, &self.out_dimensions)?,
                        get_key(&self.input_schema, new, &self.out_dimensions)?,
                    )
                };

                if old_record_hash == new_record_hash {
                    Ok(vec![self.agg_update(txn, db, old, new, old_record_hash)?])
                } else {
                    Ok(vec![
                        self.agg_delete(txn, db, old)?,
                        self.agg_insert(txn, db, new)?,
                    ])
                }
            }
        }
    }
}

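/// Builds a segment key by evaluating each dimension expression against the
/// record and concatenating the encoded values.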
fn get_key(
    schema: &Schema,
    record: &Record,
    out_dimensions: &[(Box<Expression>, usize)],
) -> Result<Vec<u8>, PipelineError> {
    let mut tot_size = 0_usize;
    let mut buffers = Vec::<Vec<u8>>::with_capacity(out_dimensions.len());

    for dimension in out_dimensions.iter() {
        let value = dimension.0.evaluate(record, schema)?;
        let bytes = value.encode();
        tot_size += bytes.len();
        buffers.push(bytes);
    }

    let mut res_buffer = Vec::<u8>::with_capacity(tot_size);
    for i in buffers {
        res_buffer.extend(i);
    }
    Ok(res_buffer)
}

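// DAG integration: `init` opens the LMDB stores, and `process` runs the
// aggregation inside the shared transaction, forwarding every resulting
// operation to the default output port.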
impl Processor for AggregationProcessor {
    fn init(&mut self, state: &mut LmdbEnvironmentManager) -> Result<(), ExecutionError> {
        internal_err!(self.init_store(state))
    }

    fn commit(&self, _epoch: &Epoch, _tx: &SharedTransaction) -> Result<(), ExecutionError> {
        Ok(())
    }

    fn process(
        &mut self,
        _from_port: PortHandle,
        op: Operation,
        fw: &mut dyn ProcessorChannelForwarder,
        txn: &SharedTransaction,
        _reader: &HashMap<PortHandle, RecordReader>,
    ) -> Result<(), ExecutionError> {
        match self.db {
            Some(d) => {
                let ops = internal_err!(self.aggregate(&mut txn.write(), d, op))?;
                for fop in ops {
                    fw.send(fop, DEFAULT_PORT_HANDLE)?;
                }
                Ok(())
            }
            _ => Err(ExecutionError::InvalidDatabase),
        }
    }
}

type OutputRules = (
    Vec<(Box<Expression>, Box<Aggregator>, usize)>,
    Vec<(Box<Expression>, usize)>,
);

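/// Splits the configured field rules into measures and dimensions, recording
/// each rule's position so values land in the right slot of the output
/// record. Dimensions are collected only when flagged as output values.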
fn populate_rules(output_field_rules: &[FieldRule]) -> Result<OutputRules, PipelineError> {
    let mut out_measures: Vec<(Box<Expression>, Box<Aggregator>, usize)> = Vec::new();
    let mut out_dimensions: Vec<(Box<Expression>, usize)> = Vec::new();

    for rule in output_field_rules.iter().enumerate() {
        match rule.1 {
            FieldRule::Measure(pre_aggr, aggr, _name) => {
                out_measures.push((pre_aggr.clone(), Box::new(aggr.clone()), rule.0));
            }
            FieldRule::Dimension(expression, is_value, _name) => {
                if *is_value {
                    out_dimensions.push((expression.clone(), rule.0));
                }
            }
        }
    }

    Ok((out_measures, out_dimensions))
}
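
As a side note, the state-buffer framing used by `encode_buffer` and `decode_buffer` can be exercised in isolation. The following self-contained sketch is not part of dozer: `frame` and `parse` are hypothetical helpers that round-trip the same layout (a big-endian u32 prefix followed by two length-prefixed payloads) over plain byte slices instead of encoded `Field` values.

// Sketch of the measure-state frame layout; `frame` and `parse` are
// illustrative helpers, not dozer APIs.
fn frame(prefix: u32, value: &[u8], state: Option<&[u8]>) -> Vec<u8> {
    let state = state.unwrap_or(&[]);
    let mut buf = Vec::with_capacity(8 + value.len() + state.len());
    buf.extend(prefix.to_be_bytes());                 // 4-byte prefix
    buf.extend((value.len() as u16).to_be_bytes());   // 2-byte value length
    buf.extend(value);
    buf.extend((state.len() as u16).to_be_bytes());   // 2-byte state length (0 encodes None)
    buf.extend(state);
    buf
}

fn parse(buf: &[u8]) -> (u32, &[u8], Option<&[u8]>, usize) {
    let prefix = u32::from_be_bytes(buf[0..4].try_into().unwrap());
    let val_len = u16::from_be_bytes(buf[4..6].try_into().unwrap()) as usize;
    let value = &buf[6..6 + val_len];
    let mut off = 6 + val_len;
    let state_len = u16::from_be_bytes(buf[off..off + 2].try_into().unwrap()) as usize;
    off += 2;
    let state = if state_len > 0 { Some(&buf[off..off + state_len]) } else { None };
    (prefix, value, state, off + state_len)           // consumed byte count last
}

fn main() {
    let buf = frame(42, b"value", Some(b"state"));
    let (prefix, value, state, consumed) = parse(&buf);
    assert_eq!(prefix, 42);
    assert_eq!(value, b"value");
    assert_eq!(state, Some(&b"state"[..]));
    assert_eq!(consumed, buf.len());
}

Encoding `None` as a zero-length state keeps each frame self-delimiting, which is what lets `calc_and_fill_measures` concatenate one frame per measure and walk them back with a running offset.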