• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

getdozer / dozer / 4116183752

pending completion
4116183752

push

github

GitHub
refactor: Make `LmdbRoCache` and `LmdbRwCache` `Send` and `Sync` (#821)

790 of 790 new or added lines in 44 files covered. (100.0%)

23005 of 33842 relevant lines covered (67.98%)

56312.85 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

75.31
/dozer-sql/src/pipeline/aggregation/processor.rs
1
#![allow(clippy::too_many_arguments)]
2
use crate::deserialize;
3
use crate::pipeline::errors::PipelineError;
4
use crate::pipeline::expression::execution::ExpressionExecutor;
5
use crate::pipeline::{aggregation::aggregator::Aggregator, expression::execution::Expression};
6
use dozer_core::channels::ProcessorChannelForwarder;
7
use dozer_core::errors::ExecutionError;
8
use dozer_core::errors::ExecutionError::InternalError;
9
use dozer_core::node::{PortHandle, Processor};
10
use dozer_core::storage::lmdb_storage::{
11
    LmdbEnvironmentManager, LmdbExclusiveTransaction, SharedTransaction,
12
};
13
use dozer_core::DEFAULT_PORT_HANDLE;
14
use dozer_types::errors::types::TypeError;
15
use dozer_types::internal_err;
16
use dozer_types::types::{Field, Operation, Record, Schema};
17

18
use dozer_core::epoch::Epoch;
19
use dozer_core::record_store::RecordReader;
20
use dozer_core::storage::common::Database;
21
use dozer_core::storage::errors::StorageError::InvalidDatabase;
22
use dozer_core::storage::prefix_transaction::PrefixTransaction;
23
use lmdb::DatabaseFlags;
24
use std::{collections::HashMap, mem::size_of_val};
25

26
pub enum FieldRule {
    /// Represents a dimension field, generally used in the GROUP BY clause
    Dimension(
        /// Expression for this dimension
        Box<Expression>,
        /// true if this field should be included in the list of values of the
        /// output schema, otherwise false. Generally, this value is true if the field appears
        /// in the output results in addition to being in the list of the GROUP BY fields
        bool,
        /// Name of the output field. A name is always required (the type is
        /// `String`, not `Option<String>`); renaming is done by supplying a
        /// name different from the source field's.
        String,
    ),
    /// Represents an aggregated field that will be calculated using the appropriate aggregator
    Measure(
        /// Argument of the Aggregator
        Box<Expression>,
        /// Aggregator implementation for this measure
        Aggregator,
        /// Name of the output field. A name is always required; renaming is
        /// done by supplying a name different from the source field's.
        String,
    ),
}
48

49
/// Key under which the prefix counter is persisted in the `meta` database.
const COUNTER_KEY: u8 = 1_u8;

/// One measure's decoded slice of a stored segment state
/// (see `AggregationProcessor::decode_buffer` for the wire layout).
pub(crate) struct AggregationData<'a> {
    /// Current aggregated value for this measure.
    pub value: Field,
    /// Aggregator-specific serialized state; `None` when the stored state
    /// length was zero.
    pub state: Option<&'a [u8]>,
    /// Prefix id scoping this measure's auxiliary data inside
    /// `aggregators_db` via `PrefixTransaction`.
    pub prefix: u32,
}
56

57
impl<'a> AggregationData<'a> {
    /// Bundles a decoded value, optional aggregator state and prefix id.
    pub fn new(value: Field, state: Option<&'a [u8]>, prefix: u32) -> Self {
        Self {
            value,
            state,
            prefix,
        }
    }
}
66

×
67
/// Pipeline processor that maintains aggregated values (GROUP BY dimensions
/// plus aggregated measures) backed by LMDB databases.
#[derive(Debug)]
pub struct AggregationProcessor {
    /// Dimension expressions paired with their index in the output record.
    out_dimensions: Vec<(Box<Expression>, usize)>,
    /// Measure expressions paired with their aggregator and output index.
    out_measures: Vec<(Box<Expression>, Box<Aggregator>, usize)>,
    /// Main database holding per-segment states and counts; `None` until `init`.
    pub db: Option<Database>,
    /// Metadata database holding the prefix counter; `None` until `init`.
    meta_db: Option<Database>,
    /// Database used by aggregators for auxiliary state; `None` until `init`.
    aggregators_db: Option<Database>,
    /// Schema of the incoming records, used to evaluate expressions.
    input_schema: Schema,
}
76

77
/// Kind of change being applied to an aggregation segment; selects which
/// aggregator entry point (`insert`/`delete`/`update`) is invoked.
enum AggregatorOperation {
    Insert,
    Delete,
    Update,
}
82

83
/// Dataset id prefixed to keys that store aggregated segment states.
const AGG_VALUES_DATASET_ID: u16 = 0x0000_u16;
/// Dataset id prefixed to keys that store per-segment record counts.
const AGG_COUNT_DATASET_ID: u16 = 0x0001_u16;

/// Sentinel segment hash used when the aggregation has no GROUP BY dimensions.
const AGG_DEFAULT_DIMENSION_ID: u8 = 0xFF_u8;
87

88
impl AggregationProcessor {
×
89
    /// Builds a processor from the output field rules and the input schema.
    ///
    /// The three databases stay `None` until `init` is called.
    /// `populate_rules` only ever returns `Ok` in this file, hence the `unwrap`.
    pub fn new(output_field_rules: Vec<FieldRule>, input_schema: Schema) -> Self {
        let (out_measures, out_dimensions) = populate_rules(&output_field_rules).unwrap();
        Self {
            out_dimensions,
            out_measures,
            db: None,
            meta_db: None,
            aggregators_db: None,
            input_schema,
        }
    }
100

×
101
    /// Creates the three LMDB databases used by this processor:
    /// `aggr` (segment states and counts), `aggr_data` (aggregator-internal
    /// state) and `meta` (the prefix counter).
    fn init_store(&mut self, env: &mut LmdbEnvironmentManager) -> Result<(), PipelineError> {
        self.db = Some(env.create_database(Some("aggr"), Some(DatabaseFlags::empty()))?);
        self.aggregators_db =
            Some(env.create_database(Some("aggr_data"), Some(DatabaseFlags::empty()))?);
        self.meta_db = Some(env.create_database(Some("meta"), Some(DatabaseFlags::empty()))?);
        Ok(())
    }
108

×
109
    fn fill_dimensions(&self, in_rec: &Record, out_rec: &mut Record) -> Result<(), PipelineError> {
4,699✔
110
        for v in &self.out_dimensions {
8,594✔
111
            out_rec.set_value(v.1, v.0.evaluate(in_rec, &self.input_schema)?.clone());
3,896✔
112
        }
×
113
        Ok(())
4,698✔
114
    }
4,698✔
115

×
116
    fn get_record_key(&self, hash: &Vec<u8>, database_id: u16) -> Result<Vec<u8>, PipelineError> {
8,014✔
117
        let mut vec = Vec::with_capacity(hash.len().wrapping_add(size_of_val(&database_id)));
8,014✔
118
        vec.extend_from_slice(&database_id.to_be_bytes());
8,014✔
119
        vec.extend(hash);
8,014✔
120
        Ok(vec)
8,014✔
121
    }
8,014✔
122

×
123
    /// Increments the prefix counter stored in `meta_db` and returns the new
    /// value. The very first call (no stored counter) persists and returns 2.
    ///
    /// Errors with `InvalidDatabase` if `init_store` was never called.
    fn get_counter(&self, txn: &mut LmdbExclusiveTransaction) -> Result<u32, PipelineError> {
        let meta_db = *self
            .meta_db
            .as_ref()
            .ok_or(PipelineError::InternalStorageError(InvalidDatabase))?;
        let curr_ctr = match txn.get(meta_db, &COUNTER_KEY.to_be_bytes())? {
            Some(v) => u32::from_be_bytes(deserialize!(v)),
            None => 1_u32,
        };
        // NOTE(review): `curr_ctr + 1` panics in debug / wraps in release at
        // u32::MAX — presumably the prefix counter can never grow that large;
        // confirm.
        txn.put(
            meta_db,
            &COUNTER_KEY.to_be_bytes(),
            &(curr_ctr + 1).to_be_bytes(),
        )?;
        Ok(curr_ctr + 1)
    }
139

×
140
    /// Decodes one measure entry from the front of `buf`, returning the number
    /// of bytes consumed together with the decoded `AggregationData`.
    ///
    /// Buffer layout (all integers big-endian), as produced by `encode_buffer`:
    ///   [0..4]       u32 prefix id
    ///   [4..6]       u16 value length `val_len`
    ///   [6..6+v]     encoded `Field` value (`v = val_len`)
    ///   [..+2]       u16 state length `state_len`
    ///   [..+s]       aggregator state bytes (`s = state_len`; `None` when 0)
    ///
    /// Panics (via slice indexing / `try_into().unwrap()`) if `buf` is shorter
    /// than the embedded lengths claim — callers must pass well-formed buffers.
    pub(crate) fn decode_buffer(buf: &[u8]) -> Result<(usize, AggregationData), PipelineError> {
        let prefix = u32::from_be_bytes(buf[0..4].try_into().unwrap());
        let mut offset: usize = 4;

        let val_len = u16::from_be_bytes(buf[offset..offset + 2].try_into().unwrap());
        offset += 2;
        let val: Field = Field::decode(&buf[offset..offset + val_len as usize])
            .map_err(TypeError::DeserializationError)?;
        offset += val_len as usize;
        let state_len = u16::from_be_bytes(buf[offset..offset + 2].try_into().unwrap());
        offset += 2;
        let state: Option<&[u8]> = if state_len > 0 {
            Some(&buf[offset..offset + state_len as usize])
        } else {
            None
        };
        offset += state_len as usize;

        let r = AggregationData::new(val, state, prefix);
        Ok((offset, r))
    }
161

×
162
    pub(crate) fn encode_buffer(
4,031✔
163
        prefix: u32,
4,031✔
164
        value: &Field,
4,031✔
165
        state: &Option<Vec<u8>>,
4,031✔
166
    ) -> Result<(usize, Vec<u8>), PipelineError> {
4,031✔
167
        let mut r = Vec::with_capacity(512);
4,031✔
168
        r.extend(prefix.to_be_bytes());
4,031✔
169

4,031✔
170
        let sz_val = value.encode();
4,031✔
171
        r.extend((sz_val.len() as u16).to_be_bytes());
4,031✔
172
        r.extend(&sz_val);
4,031✔
173

×
174
        let len = if let Some(state) = state.as_ref() {
4,031✔
175
            r.extend((state.len() as u16).to_be_bytes());
4,000✔
176
            r.extend(state);
4,000✔
177
            state.len()
4,000✔
178
        } else {
×
179
            r.extend(0_u16.to_be_bytes());
31✔
180
            0_usize
31✔
181
        };
×
182

×
183
        Ok((5 + sz_val.len() + len, r))
4,031✔
184
    }
4,031✔
185

×
186
    /// Applies `op` to every measure of one segment and returns the new
    /// encoded segment state (the concatenation of one `encode_buffer` entry
    /// per measure, in `out_measures` order).
    ///
    /// * `cur_state` — previously stored segment state, decoded measure by
    ///   measure with `decode_buffer`; `None` for a brand-new segment.
    /// * `deleted_record` / `inserted_record` — old/new input records; which
    ///   must be `Some` depends on `op` (`unwrap` is used accordingly:
    ///   Insert → inserted, Delete → deleted, Update → both).
    /// * `out_rec_delete` — receives each measure's pre-operation value
    ///   (only when prior state existed).
    /// * `out_rec_insert` — receives each measure's post-operation value.
    ///
    /// Each measure's aggregator runs inside a `PrefixTransaction` scoped by
    /// a prefix id: the stored one when prior state exists, otherwise a fresh
    /// id from `get_counter`.
    fn calc_and_fill_measures(
        &self,
        txn: &mut LmdbExclusiveTransaction,
        cur_state: &Option<Vec<u8>>,
        deleted_record: Option<&Record>,
        inserted_record: Option<&Record>,
        out_rec_delete: &mut Record,
        out_rec_insert: &mut Record,
        op: AggregatorOperation,
    ) -> Result<Vec<u8>, PipelineError> {
        // array holding the list of states for all measures
        let mut next_state = Vec::<u8>::new();
        let mut offset: usize = 0;

        for measure in &self.out_measures {
            // Decode this measure's slice of the stored segment state, if any.
            let curr_agg_data = match cur_state {
                Some(ref e) => {
                    let (len, res) = Self::decode_buffer(&e[offset..])?;
                    offset += len;
                    Some(res)
                }
                None => None,
            };

            let (prefix, next_state_slice) = match op {
                AggregatorOperation::Insert => {
                    let inserted_field = measure
                        .0
                        .evaluate(inserted_record.unwrap(), &self.input_schema)?;
                    if let Some(curr) = curr_agg_data {
                        // Existing state: expose the old value, reuse the prefix.
                        out_rec_delete.set_value(measure.2, curr.value);
                        let mut p_tx = PrefixTransaction::new(txn, curr.prefix);
                        let r = measure.1.insert(
                            curr.state,
                            &inserted_field,
                            measure.0.get_type(&self.input_schema)?.return_type,
                            &mut p_tx,
                            self.aggregators_db.unwrap(),
                        )?;
                        (curr.prefix, r)
                    } else {
                        // New segment: allocate a fresh prefix for this measure.
                        let prefix = self.get_counter(txn)?;
                        let mut p_tx = PrefixTransaction::new(txn, prefix);
                        let r = measure.1.insert(
                            None,
                            &inserted_field,
                            measure.0.get_type(&self.input_schema)?.return_type,
                            &mut p_tx,
                            self.aggregators_db.unwrap(),
                        )?;
                        (prefix, r)
                    }
                }
                AggregatorOperation::Delete => {
                    let deleted_field = measure
                        .0
                        .evaluate(deleted_record.unwrap(), &self.input_schema)?;
                    if let Some(curr) = curr_agg_data {
                        out_rec_delete.set_value(measure.2, curr.value);
                        let mut p_tx = PrefixTransaction::new(txn, curr.prefix);
                        let r = measure.1.delete(
                            curr.state,
                            &deleted_field,
                            measure.0.get_type(&self.input_schema)?.return_type,
                            &mut p_tx,
                            self.aggregators_db.unwrap(),
                        )?;
                        (curr.prefix, r)
                    } else {
                        // Delete with no prior state (not hit in practice per
                        // coverage) — handled symmetrically for safety.
                        let prefix = self.get_counter(txn)?;
                        let mut p_tx = PrefixTransaction::new(txn, prefix);
                        let r = measure.1.delete(
                            None,
                            &deleted_field,
                            measure.0.get_type(&self.input_schema)?.return_type,
                            &mut p_tx,
                            self.aggregators_db.unwrap(),
                        )?;
                        (prefix, r)
                    }
                }
                AggregatorOperation::Update => {
                    let deleted_field = measure
                        .0
                        .evaluate(deleted_record.unwrap(), &self.input_schema)?;
                    let updated_field = measure
                        .0
                        .evaluate(inserted_record.unwrap(), &self.input_schema)?;

                    if let Some(curr) = curr_agg_data {
                        out_rec_delete.set_value(measure.2, curr.value);
                        let mut p_tx = PrefixTransaction::new(txn, curr.prefix);
                        let r = measure.1.update(
                            curr.state,
                            &deleted_field,
                            &updated_field,
                            measure.0.get_type(&self.input_schema)?.return_type,
                            &mut p_tx,
                            self.aggregators_db.unwrap(),
                        )?;
                        (curr.prefix, r)
                    } else {
                        // Update with no prior state (not hit in practice per
                        // coverage) — handled symmetrically for safety.
                        let prefix = self.get_counter(txn)?;
                        let mut p_tx = PrefixTransaction::new(txn, prefix);
                        let r = measure.1.update(
                            None,
                            &deleted_field,
                            &updated_field,
                            measure.0.get_type(&self.input_schema)?.return_type,
                            &mut p_tx,
                            self.aggregators_db.unwrap(),
                        )?;
                        (prefix, r)
                    }
                }
            };

            // Append this measure's re-encoded state and publish its new value.
            next_state.extend(
                &Self::encode_buffer(prefix, &next_state_slice.value, &next_state_slice.state)?.1,
            );
            out_rec_insert.set_value(measure.2, next_state_slice.value);
        }

        Ok(next_state)
    }
311

×
312
    /// Adjusts the record count stored under `key` by `delta` (decrement when
    /// `decr` is true) and returns the count as it was *before* the update.
    /// When the new count is zero the key is deleted instead of rewritten.
    ///
    /// NOTE(review): `wrapping_sub` means decrementing an absent/zero count
    /// wraps to a huge value that would then be re-stored — presumably a
    /// delete is never applied to a segment with no records; confirm upstream.
    fn update_segment_count(
        &self,
        txn: &mut LmdbExclusiveTransaction,
        db: Database,
        key: Vec<u8>,
        delta: u64,
        decr: bool,
    ) -> Result<u64, PipelineError> {
        let bytes = txn.get(db, key.as_slice())?;

        let curr_count = match bytes {
            Some(b) => u64::from_be_bytes(deserialize!(b)),
            None => 0_u64,
        };

        let new_val = if decr {
            curr_count.wrapping_sub(delta)
        } else {
            curr_count.wrapping_add(delta)
        };

        if new_val > 0 {
            txn.put(db, key.as_slice(), new_val.to_be_bytes().as_slice())?;
        } else {
            // Count reached zero: remove the key entirely.
            txn.del(db, key.as_slice(), None)?;
        }
        Ok(curr_count)
    }
340

×
341
    /// Handles a source `Delete`: decrements the segment count, recomputes
    /// every measure, and returns a downstream `Delete` when the last record
    /// of the segment goes away, or an `Update` when the segment survives.
    fn agg_delete(
        &self,
        txn: &mut LmdbExclusiveTransaction,
        db: Database,
        old: &Record,
    ) -> Result<Operation, PipelineError> {
        let size = self.out_measures.len() + self.out_dimensions.len();
        let mut out_rec_insert = Record::nulls(None, size, None);
        let mut out_rec_delete = Record::nulls(None, size, None);

        // Segment hash: encoded dimension values, or a fixed sentinel when
        // there is no GROUP BY.
        let record_hash = if !self.out_dimensions.is_empty() {
            get_key(&self.input_schema, old, &self.out_dimensions)?
        } else {
            vec![AGG_DEFAULT_DIMENSION_ID]
        };

        let record_key = self.get_record_key(&record_hash, AGG_VALUES_DATASET_ID)?;

        let record_count_key = self.get_record_key(&record_hash, AGG_COUNT_DATASET_ID)?;
        let prev_count = self.update_segment_count(txn, db, record_count_key, 1, true)?;

        let cur_state = txn.get(db, record_key.as_slice())?.map(|b| b.to_vec());
        let new_state = self.calc_and_fill_measures(
            txn,
            &cur_state,
            Some(old),
            None,
            &mut out_rec_delete,
            &mut out_rec_insert,
            AggregatorOperation::Delete,
        )?;

        let res = if prev_count == 1 {
            // Last record in the segment: the aggregate row disappears.
            self.fill_dimensions(old, &mut out_rec_delete)?;
            Operation::Delete {
                old: out_rec_delete,
            }
        } else {
            self.fill_dimensions(old, &mut out_rec_insert)?;
            self.fill_dimensions(old, &mut out_rec_delete)?;
            Operation::Update {
                new: out_rec_insert,
                old: out_rec_delete,
            }
        };

        if prev_count == 1 {
            // Segment emptied: drop its stored state.
            let _ = txn.del(db, record_key.as_slice(), None)?;
        } else {
            txn.put(db, record_key.as_slice(), new_state.as_slice())?;
        }
        Ok(res)
    }
395

×
396
    /// Handles a source `Insert`: increments the segment count, recomputes
    /// every measure, and returns a downstream `Insert` for a brand-new
    /// segment or an `Update` when the segment already existed.
    fn agg_insert(
        &self,
        txn: &mut LmdbExclusiveTransaction,
        db: Database,
        new: &Record,
    ) -> Result<Operation, PipelineError> {
        let size = self.out_measures.len() + self.out_dimensions.len();
        let mut out_rec_insert = Record::nulls(None, size, None);
        let mut out_rec_delete = Record::nulls(None, size, None);

        // Segment hash: encoded dimension values, or a fixed sentinel when
        // there is no GROUP BY.
        let record_hash = if !self.out_dimensions.is_empty() {
            get_key(&self.input_schema, new, &self.out_dimensions)?
        } else {
            vec![AGG_DEFAULT_DIMENSION_ID]
        };

        let record_key = self.get_record_key(&record_hash, AGG_VALUES_DATASET_ID)?;

        let record_count_key = self.get_record_key(&record_hash, AGG_COUNT_DATASET_ID)?;
        self.update_segment_count(txn, db, record_count_key, 1, false)?;

        let cur_state = txn.get(db, record_key.as_slice())?.map(|b| b.to_vec());
        let new_state = self.calc_and_fill_measures(
            txn,
            &cur_state,
            None,
            Some(new),
            &mut out_rec_delete,
            &mut out_rec_insert,
            AggregatorOperation::Insert,
        )?;

        let res = if cur_state.is_none() {
            // First record of this segment: emit a fresh aggregate row.
            self.fill_dimensions(new, &mut out_rec_insert)?;
            Operation::Insert {
                new: out_rec_insert,
            }
        } else {
            self.fill_dimensions(new, &mut out_rec_insert)?;
            self.fill_dimensions(new, &mut out_rec_delete)?;
            Operation::Update {
                new: out_rec_insert,
                old: out_rec_delete,
            }
        };

        txn.put(db, record_key.as_slice(), new_state.as_slice())?;

        Ok(res)
    }
447

×
448
    /// Handles an in-segment `Update` (the dimension hash did not change):
    /// recomputes every measure against both the old and new record and
    /// returns a downstream `Update`.
    fn agg_update(
        &self,
        txn: &mut LmdbExclusiveTransaction,
        db: Database,
        old: &Record,
        new: &Record,
        record_hash: Vec<u8>,
    ) -> Result<Operation, PipelineError> {
        let size = self.out_measures.len() + self.out_dimensions.len();
        let mut out_rec_insert = Record::nulls(None, size, None);
        let mut out_rec_delete = Record::nulls(None, size, None);
        let record_key = self.get_record_key(&record_hash, AGG_VALUES_DATASET_ID)?;

        let cur_state = txn.get(db, record_key.as_slice())?.map(|b| b.to_vec());
        let new_state = self.calc_and_fill_measures(
            txn,
            &cur_state,
            Some(old),
            Some(new),
            &mut out_rec_delete,
            &mut out_rec_insert,
            AggregatorOperation::Update,
        )?;

        // Dimensions come from the new record on the insert side and from the
        // old record on the delete side of the emitted Update.
        self.fill_dimensions(new, &mut out_rec_insert)?;
        self.fill_dimensions(old, &mut out_rec_delete)?;

        let res = Operation::Update {
            new: out_rec_insert,
            old: out_rec_delete,
        };

        txn.put(db, record_key.as_slice(), new_state.as_slice())?;

        Ok(res)
    }
484

×
485
    /// Translates one incoming operation into the aggregate operation(s) to
    /// forward downstream.
    ///
    /// An `Update` that moves a record between segments (its dimension hash
    /// changes) is decomposed into a delete from the old segment plus an
    /// insert into the new one; otherwise it is handled in place.
    pub fn aggregate(
        &self,
        txn: &mut LmdbExclusiveTransaction,
        db: Database,
        op: Operation,
    ) -> Result<Vec<Operation>, PipelineError> {
        match op {
            Operation::Insert { ref new } => Ok(vec![self.agg_insert(txn, db, new)?]),
            Operation::Delete { ref old } => Ok(vec![self.agg_delete(txn, db, old)?]),
            Operation::Update { ref old, ref new } => {
                let (old_record_hash, new_record_hash) = if self.out_dimensions.is_empty() {
                    // No GROUP BY: every record falls in the single sentinel segment.
                    (
                        vec![AGG_DEFAULT_DIMENSION_ID],
                        vec![AGG_DEFAULT_DIMENSION_ID],
                    )
                } else {
                    (
                        get_key(&self.input_schema, old, &self.out_dimensions)?,
                        get_key(&self.input_schema, new, &self.out_dimensions)?,
                    )
                };

                if old_record_hash == new_record_hash {
                    Ok(vec![self.agg_update(txn, db, old, new, old_record_hash)?])
                } else {
                    Ok(vec![
                        self.agg_delete(txn, db, old)?,
                        self.agg_insert(txn, db, new)?,
                    ])
                }
            }
        }
    }
520
}
×
521

×
522
fn get_key(
3,715✔
523
    schema: &Schema,
3,715✔
524
    record: &Record,
3,715✔
525
    out_dimensions: &[(Box<Expression>, usize)],
3,715✔
526
) -> Result<Vec<u8>, PipelineError> {
3,715✔
527
    let mut tot_size = 0_usize;
3,715✔
528
    let mut buffers = Vec::<Vec<u8>>::with_capacity(out_dimensions.len());
3,715✔
529

×
530
    for dimension in out_dimensions.iter() {
3,715✔
531
        let value = dimension.0.evaluate(record, schema)?;
3,715✔
532
        let bytes = value.encode();
3,715✔
533
        tot_size += bytes.len();
3,715✔
534
        buffers.push(bytes);
3,715✔
535
    }
×
536

×
537
    let mut res_buffer = Vec::<u8>::with_capacity(tot_size);
3,715✔
538
    for i in buffers {
7,429✔
539
        res_buffer.extend(i);
3,714✔
540
    }
3,714✔
541
    Ok(res_buffer)
3,715✔
542
}
3,715✔
543

×
544
impl Processor for AggregationProcessor {
    /// Creates the LMDB databases this processor needs.
    fn init(&mut self, state: &mut LmdbEnvironmentManager) -> Result<(), ExecutionError> {
        internal_err!(self.init_store(state))
    }

    /// No per-commit work: all state is written through the shared
    /// transaction during `process`.
    fn commit(&self, _epoch: &Epoch, _tx: &SharedTransaction) -> Result<(), ExecutionError> {
        Ok(())
    }

    /// Applies `op` to the aggregation state and forwards the resulting
    /// aggregate operations downstream on the default port.
    /// Fails with `InvalidDatabase` when `init` was never called.
    fn process(
        &mut self,
        _from_port: PortHandle,
        op: Operation,
        fw: &mut dyn ProcessorChannelForwarder,
        txn: &SharedTransaction,
        _reader: &HashMap<PortHandle, Box<dyn RecordReader>>,
    ) -> Result<(), ExecutionError> {
        match self.db {
            Some(d) => {
                let ops = internal_err!(self.aggregate(&mut txn.write(), d, op))?;
                for fop in ops {
                    fw.send(fop, DEFAULT_PORT_HANDLE)?;
                }
                Ok(())
            }
            _ => Err(ExecutionError::InvalidDatabase),
        }
    }
}
573

574
/// `(measures, dimensions)` as produced by `populate_rules`: each entry
/// carries the expression and its index in the output record; measures also
/// carry their aggregator.
type OutputRules = (
    Vec<(Box<Expression>, Box<Aggregator>, usize)>,
    Vec<(Box<Expression>, usize)>,
);
×
578

×
579
fn populate_rules(output_field_rules: &[FieldRule]) -> Result<OutputRules, PipelineError> {
65✔
580
    let mut out_measures: Vec<(Box<Expression>, Box<Aggregator>, usize)> = Vec::new();
65✔
581
    let mut out_dimensions: Vec<(Box<Expression>, usize)> = Vec::new();
65✔
582

×
583
    for rule in output_field_rules.iter().enumerate() {
186✔
584
        match rule.1 {
186✔
585
            FieldRule::Measure(pre_aggr, aggr, _name) => {
65✔
586
                out_measures.push((pre_aggr.clone(), Box::new(aggr.clone()), rule.0));
65✔
587
            }
65✔
588
            FieldRule::Dimension(expression, is_value, _name) => {
121✔
589
                if *is_value {
121✔
590
                    out_dimensions.push((expression.clone(), rule.0));
61✔
591
                }
61✔
592
            }
593
        }
594
    }
×
595

×
596
    Ok((out_measures, out_dimensions))
65✔
597
}
65✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc