• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

getdozer / dozer / 4283961027

pending completion
4283961027

push

github

GitHub
feat: Blue green cache (#1061)

645 of 645 new or added lines in 45 files covered. (100.0%)

27779 of 39307 relevant lines covered (70.67%)

52489.81 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

89.47
/dozer-sql/src/pipeline/aggregation/processor.rs
1
#![allow(clippy::too_many_arguments)]
2
use crate::deserialize;
3
use crate::pipeline::errors::PipelineError;
4
use crate::pipeline::expression::execution::ExpressionExecutor;
5
use crate::pipeline::{aggregation::aggregator::Aggregator, expression::execution::Expression};
6
use dozer_core::channels::ProcessorChannelForwarder;
7
use dozer_core::errors::ExecutionError;
8
use dozer_core::errors::ExecutionError::InternalError;
9
use dozer_core::node::{PortHandle, Processor};
10
use dozer_core::storage::lmdb_storage::{LmdbExclusiveTransaction, SharedTransaction};
11
use dozer_core::DEFAULT_PORT_HANDLE;
12
use dozer_types::errors::types::TypeError;
13
use dozer_types::types::{Field, Operation, Record, Schema};
14

15
use crate::pipeline::aggregation::aggregator::get_aggregator_from_aggregation_expression;
16
use dozer_core::epoch::Epoch;
17
use dozer_core::record_store::RecordReader;
18
use dozer_core::storage::common::Database;
19
use dozer_core::storage::prefix_transaction::PrefixTransaction;
20
use lmdb::DatabaseFlags;
21
use std::{collections::HashMap, mem::size_of_val};
22

23
// Key under which the monotonically increasing prefix counter is stored in `meta_db`.
const COUNTER_KEY: u8 = 1_u8;
24

25
/// One decoded per-measure entry of a segment's stored buffer: the measure's
/// current output value, the aggregator's opaque serialized state (if any),
/// and the prefix id that namespaces this measure's rows in `aggregators_db`.
pub(crate) struct AggregationData<'a> {
    // Current aggregated value for this measure.
    pub value: Field,
    // Serialized aggregator state; `None` when the stored state section was empty.
    pub state: Option<&'a [u8]>,
    // Prefix id used with `PrefixTransaction` to scope this measure's storage.
    pub prefix: u32,
}
30

31
impl<'a> AggregationData<'a> {
32
    pub fn new(value: Field, state: Option<&'a [u8]>, prefix: u32) -> Self {
11,278✔
33
        Self {
11,278✔
34
            value,
11,278✔
35
            state,
11,278✔
36
            prefix,
11,278✔
37
        }
11,278✔
38
    }
11,278✔
39
}
40

41
/// Stateful processor that maintains per-segment (GROUP BY) aggregates in
/// LMDB and converts incoming record operations into aggregate
/// insert/update/delete operations for downstream nodes.
#[derive(Debug)]
pub struct AggregationProcessor {
    // GROUP BY expressions; empty means a single global segment.
    dimensions: Vec<Expression>,
    // Each measure expression paired with the aggregator maintaining it.
    measures: Vec<(Expression, Aggregator)>,
    // Output projection expressions, evaluated against record + measure values.
    projections: Vec<Expression>,
    // Main database: per-segment encoded value/state buffers and record counts.
    pub db: Database,
    // Metadata database; in the code visible here it holds only the prefix counter.
    meta_db: Database,
    // Aggregator-private storage, namespaced per measure via prefix ids.
    aggregators_db: Database,
    // Schema of incoming records (dimensions/measures are evaluated against it).
    input_schema: Schema,
    // Schema used when evaluating the output projections.
    aggregation_schema: Schema,
}
52

53
// Kind of record operation being applied to the aggregators in
// `calc_and_fill_measures`.
enum AggregatorOperation {
    Insert,
    Delete,
    Update,
}
58

59
// Key-space tags distinguishing the logical datasets sharing `db`:
// encoded segment value/state buffers vs. per-segment record counts.
const AGG_VALUES_DATASET_ID: u16 = 0x0000_u16;
const AGG_COUNT_DATASET_ID: u16 = 0x0001_u16;

// Placeholder segment hash used when the aggregation has no dimensions.
const AGG_DEFAULT_DIMENSION_ID: u8 = 0xFF_u8;
63

64
impl AggregationProcessor {
65
    pub fn new(
113✔
66
        dimensions: Vec<Expression>,
113✔
67
        measures: Vec<Expression>,
113✔
68
        projections: Vec<Expression>,
113✔
69
        input_schema: Schema,
113✔
70
        aggregation_schema: Schema,
113✔
71
        txn: &mut LmdbExclusiveTransaction,
113✔
72
    ) -> Result<Self, PipelineError> {
113✔
73
        let mut aggregators: Vec<(Expression, Aggregator)> = Vec::new();
113✔
74
        for measure in measures {
226✔
75
            aggregators.push(get_aggregator_from_aggregation_expression(
113✔
76
                &measure,
113✔
77
                &input_schema,
113✔
78
            )?)
113✔
79
        }
80

81
        Ok(Self {
82
            dimensions,
113✔
83
            measures: aggregators,
113✔
84
            projections,
113✔
85
            db: txn.create_database(Some("aggr"), Some(DatabaseFlags::empty()))?,
113✔
86
            meta_db: txn.create_database(Some("meta"), Some(DatabaseFlags::empty()))?,
113✔
87
            aggregators_db: txn.create_database(Some("aggr_data"), Some(DatabaseFlags::empty()))?,
113✔
88
            input_schema,
113✔
89
            aggregation_schema,
113✔
90
        })
91
    }
113✔
92

93
    fn get_record_key(&self, hash: &Vec<u8>, database_id: u16) -> Result<Vec<u8>, PipelineError> {
42,772✔
94
        let mut vec = Vec::with_capacity(hash.len().wrapping_add(size_of_val(&database_id)));
42,772✔
95
        vec.extend_from_slice(&database_id.to_be_bytes());
42,772✔
96
        vec.extend(hash);
42,772✔
97
        Ok(vec)
42,772✔
98
    }
42,772✔
99

100
    fn get_counter(&self, txn: &mut LmdbExclusiveTransaction) -> Result<u32, PipelineError> {
10,132✔
101
        let curr_ctr = match txn.get(self.meta_db, &COUNTER_KEY.to_be_bytes())? {
10,132✔
102
            Some(v) => u32::from_be_bytes(deserialize!(v)),
10,019✔
103
            None => 1_u32,
113✔
104
        };
105
        txn.put(
10,132✔
106
            self.meta_db,
10,132✔
107
            &COUNTER_KEY.to_be_bytes(),
10,132✔
108
            &(curr_ctr + 1).to_be_bytes(),
10,132✔
109
        )?;
10,132✔
110
        Ok(curr_ctr + 1)
10,132✔
111
    }
10,132✔
112

113
    /// Decodes one per-measure entry from `buf`, as produced by
    /// `encode_buffer`. Layout (integers big-endian):
    /// `[prefix: u32][val_len: u16][val: val_len bytes][state_len: u16][state: state_len bytes]`.
    ///
    /// Returns the number of bytes consumed together with the decoded entry,
    /// so the caller can walk a concatenation of entries (one per measure).
    ///
    /// # Panics
    /// Panics (slice indexing / `try_into().unwrap()`) if `buf` is shorter
    /// than the lengths it encodes.
    pub(crate) fn decode_buffer(buf: &[u8]) -> Result<(usize, AggregationData), PipelineError> {
        let prefix = u32::from_be_bytes(buf[0..4].try_into().unwrap());
        let mut offset: usize = 4;

        // Value section: u16 length followed by the encoded `Field`.
        let val_len = u16::from_be_bytes(buf[offset..offset + 2].try_into().unwrap());
        offset += 2;
        let val: Field = Field::decode(&buf[offset..offset + val_len as usize])
            .map_err(TypeError::DeserializationError)?;
        offset += val_len as usize;
        // State section: u16 length followed by the aggregator's raw state;
        // a zero length means the aggregator kept no state.
        let state_len = u16::from_be_bytes(buf[offset..offset + 2].try_into().unwrap());
        offset += 2;
        let state: Option<&[u8]> = if state_len > 0 {
            Some(&buf[offset..offset + state_len as usize])
        } else {
            None
        };
        offset += state_len as usize;

        let r = AggregationData::new(val, state, prefix);
        Ok((offset, r))
    }
11,278✔
134

135
    pub(crate) fn encode_buffer(
21,410✔
136
        prefix: u32,
21,410✔
137
        value: &Field,
21,410✔
138
        state: &Option<Vec<u8>>,
21,410✔
139
    ) -> Result<(usize, Vec<u8>), PipelineError> {
21,410✔
140
        let mut r = Vec::with_capacity(512);
21,410✔
141
        r.extend(prefix.to_be_bytes());
21,410✔
142

21,410✔
143
        let sz_val = value.encode();
21,410✔
144
        r.extend((sz_val.len() as u16).to_be_bytes());
21,410✔
145
        r.extend(&sz_val);
21,410✔
146

147
        let len = if let Some(state) = state.as_ref() {
21,410✔
148
            r.extend((state.len() as u16).to_be_bytes());
21,379✔
149
            r.extend(state);
21,379✔
150
            state.len()
21,379✔
151
        } else {
152
            r.extend(0_u16.to_be_bytes());
31✔
153
            0_usize
31✔
154
        };
155

156
        Ok((5 + sz_val.len() + len, r))
21,410✔
157
    }
21,410✔
158

159
    /// Recomputes every measure of one segment in response to a single
    /// record operation.
    ///
    /// `cur_state` is the segment's previously stored buffer — a
    /// concatenation of `encode_buffer` entries, one per measure, in
    /// `self.measures` order — or `None` for a brand-new segment. For each
    /// measure this decodes its previous entry (if any), pushes the old
    /// value into `out_rec_delete`, delegates to the aggregator, pushes the
    /// new value into `out_rec_insert`, and re-encodes the entry into the
    /// returned buffer (the segment's next stored state).
    ///
    /// `deleted_record` must be `Some` for Delete/Update and
    /// `inserted_record` must be `Some` for Insert/Update — enforced by
    /// `unwrap` (a violation is a caller bug).
    fn calc_and_fill_measures(
        &self,
        txn: &mut LmdbExclusiveTransaction,
        cur_state: &Option<Vec<u8>>,
        deleted_record: Option<&Record>,
        inserted_record: Option<&Record>,
        out_rec_delete: &mut Vec<Field>,
        out_rec_insert: &mut Vec<Field>,
        op: AggregatorOperation,
    ) -> Result<Vec<u8>, PipelineError> {
        // array holding the list of states for all measures
        let mut next_state = Vec::<u8>::new();
        let mut offset: usize = 0;

        for measure in &self.measures {
            // Decode this measure's slice of the previous segment buffer,
            // advancing `offset` past it; `None` when the segment is new.
            let curr_agg_data = match cur_state {
                Some(ref e) => {
                    let (len, res) = Self::decode_buffer(&e[offset..])?;
                    offset += len;
                    Some(res)
                }
                None => None,
            };

            // Each arm evaluates the relevant field(s) on the input record
            // and hands them to the aggregator inside a `PrefixTransaction`
            // scoped to the measure's prefix id. A measure with no previous
            // state gets a fresh prefix id from `get_counter`.
            let (prefix, next_state_slice) = match op {
                AggregatorOperation::Insert => {
                    let inserted_field = measure
                        .0
                        .evaluate(inserted_record.unwrap(), &self.input_schema)?;
                    if let Some(curr) = curr_agg_data {
                        // Existing segment: the old value becomes part of the
                        // "before" image of the output update.
                        out_rec_delete.push(curr.value);
                        let mut p_tx = PrefixTransaction::new(txn, curr.prefix);
                        let r = measure.1.insert(
                            curr.state,
                            &inserted_field,
                            measure.0.get_type(&self.input_schema)?.return_type,
                            &mut p_tx,
                            self.aggregators_db,
                        )?;
                        (curr.prefix, r)
                    } else {
                        let prefix = self.get_counter(txn)?;
                        let mut p_tx = PrefixTransaction::new(txn, prefix);
                        let r = measure.1.insert(
                            None,
                            &inserted_field,
                            measure.0.get_type(&self.input_schema)?.return_type,
                            &mut p_tx,
                            self.aggregators_db,
                        )?;
                        (prefix, r)
                    }
                }
                AggregatorOperation::Delete => {
                    let deleted_field = measure
                        .0
                        .evaluate(deleted_record.unwrap(), &self.input_schema)?;
                    if let Some(curr) = curr_agg_data {
                        out_rec_delete.push(curr.value);
                        let mut p_tx = PrefixTransaction::new(txn, curr.prefix);
                        let r = measure.1.delete(
                            curr.state,
                            &deleted_field,
                            measure.0.get_type(&self.input_schema)?.return_type,
                            &mut p_tx,
                            self.aggregators_db,
                        )?;
                        (curr.prefix, r)
                    } else {
                        // NOTE(review): deleting from a segment with no prior
                        // state looks unreachable from the callers; kept for
                        // symmetry with the other arms.
                        let prefix = self.get_counter(txn)?;
                        let mut p_tx = PrefixTransaction::new(txn, prefix);
                        let r = measure.1.delete(
                            None,
                            &deleted_field,
                            measure.0.get_type(&self.input_schema)?.return_type,
                            &mut p_tx,
                            self.aggregators_db,
                        )?;
                        (prefix, r)
                    }
                }
                AggregatorOperation::Update => {
                    let deleted_field = measure
                        .0
                        .evaluate(deleted_record.unwrap(), &self.input_schema)?;
                    let updated_field = measure
                        .0
                        .evaluate(inserted_record.unwrap(), &self.input_schema)?;

                    if let Some(curr) = curr_agg_data {
                        out_rec_delete.push(curr.value);
                        let mut p_tx = PrefixTransaction::new(txn, curr.prefix);
                        let r = measure.1.update(
                            curr.state,
                            &deleted_field,
                            &updated_field,
                            measure.0.get_type(&self.input_schema)?.return_type,
                            &mut p_tx,
                            self.aggregators_db,
                        )?;
                        (curr.prefix, r)
                    } else {
                        let prefix = self.get_counter(txn)?;
                        let mut p_tx = PrefixTransaction::new(txn, prefix);
                        let r = measure.1.update(
                            None,
                            &deleted_field,
                            &updated_field,
                            measure.0.get_type(&self.input_schema)?.return_type,
                            &mut p_tx,
                            self.aggregators_db,
                        )?;
                        (prefix, r)
                    }
                }
            };

            // Append this measure's re-encoded entry to the segment buffer
            // and expose its new value in the "after" image.
            next_state.extend(
                &Self::encode_buffer(prefix, &next_state_slice.value, &next_state_slice.state)?.1,
            );
            out_rec_insert.push(next_state_slice.value);
        }

        Ok(next_state)
    }
21,408✔
284

285
    /// Adjusts the per-segment record count stored under `key` by `delta`
    /// (decrementing when `decr` is true) and returns the count as it was
    /// BEFORE the update — callers use this to detect the first insert
    /// (returns 0) and the removal of the last record (returns 1).
    ///
    /// When the new count reaches 0 the key is deleted instead of stored.
    /// NOTE(review): with wrapping arithmetic, decrementing below zero would
    /// wrap to a huge value and be stored rather than deleted — presumably
    /// unreachable because deletes only arrive for existing records; confirm
    /// upstream guarantees.
    fn update_segment_count(
        &self,
        txn: &mut LmdbExclusiveTransaction,
        db: Database,
        key: Vec<u8>,
        delta: u64,
        decr: bool,
    ) -> Result<u64, PipelineError> {
        let bytes = txn.get(db, key.as_slice())?;

        // Missing key means the segment has no records yet.
        let curr_count = match bytes {
            Some(b) => u64::from_be_bytes(deserialize!(b)),
            None => 0_u64,
        };

        let new_val = if decr {
            curr_count.wrapping_sub(delta)
        } else {
            curr_count.wrapping_add(delta)
        };

        if new_val > 0 {
            txn.put(db, key.as_slice(), new_val.to_be_bytes().as_slice())?;
        } else {
            txn.del(db, key.as_slice(), None)?;
        }
        Ok(curr_count)
    }
21,364✔
313

314
    fn agg_delete(
165✔
315
        &self,
165✔
316
        txn: &mut LmdbExclusiveTransaction,
165✔
317
        db: Database,
165✔
318
        old: &mut Record,
165✔
319
    ) -> Result<Operation, PipelineError> {
165✔
320
        let mut out_rec_delete: Vec<Field> = Vec::with_capacity(self.measures.len());
165✔
321
        let mut out_rec_insert: Vec<Field> = Vec::with_capacity(self.measures.len());
165✔
322

323
        let record_hash = if !self.dimensions.is_empty() {
165✔
324
            get_key(&self.input_schema, old, &self.dimensions)?
165✔
325
        } else {
326
            vec![AGG_DEFAULT_DIMENSION_ID]
×
327
        };
328

329
        let record_key = self.get_record_key(&record_hash, AGG_VALUES_DATASET_ID)?;
165✔
330

331
        let record_count_key = self.get_record_key(&record_hash, AGG_COUNT_DATASET_ID)?;
165✔
332
        let prev_count = self.update_segment_count(txn, db, record_count_key, 1, true)?;
165✔
333

334
        let cur_state = txn.get(db, record_key.as_slice())?.map(|b| b.to_vec());
165✔
335
        let new_state = self.calc_and_fill_measures(
165✔
336
            txn,
165✔
337
            &cur_state,
165✔
338
            Some(old),
165✔
339
            None,
165✔
340
            &mut out_rec_delete,
165✔
341
            &mut out_rec_insert,
165✔
342
            AggregatorOperation::Delete,
165✔
343
        )?;
165✔
344

345
        let res = if prev_count == 1 {
165✔
346
            Operation::Delete {
347
                old: self.build_projection(old, out_rec_delete)?,
98✔
348
            }
349
        } else {
350
            Operation::Update {
351
                new: self.build_projection(old, out_rec_insert)?,
67✔
352
                old: self.build_projection(old, out_rec_delete)?,
67✔
353
            }
354
        };
355

356
        if prev_count == 1 {
165✔
357
            let _ = txn.del(db, record_key.as_slice(), None)?;
98✔
358
        } else {
359
            txn.put(db, record_key.as_slice(), new_state.as_slice())?;
67✔
360
        }
361
        Ok(res)
165✔
362
    }
165✔
363

364
    fn agg_insert(
21,199✔
365
        &self,
21,199✔
366
        txn: &mut LmdbExclusiveTransaction,
21,199✔
367
        db: Database,
21,199✔
368
        new: &mut Record,
21,199✔
369
    ) -> Result<Operation, PipelineError> {
21,199✔
370
        let mut out_rec_delete: Vec<Field> = Vec::with_capacity(self.measures.len());
21,199✔
371
        let mut out_rec_insert: Vec<Field> = Vec::with_capacity(self.measures.len());
21,199✔
372

373
        let record_hash = if !self.dimensions.is_empty() {
21,199✔
374
            get_key(&self.input_schema, new, &self.dimensions)?
10,189✔
375
        } else {
376
            vec![AGG_DEFAULT_DIMENSION_ID]
11,010✔
377
        };
378

379
        let record_key = self.get_record_key(&record_hash, AGG_VALUES_DATASET_ID)?;
21,199✔
380

381
        let record_count_key = self.get_record_key(&record_hash, AGG_COUNT_DATASET_ID)?;
21,199✔
382
        self.update_segment_count(txn, db, record_count_key, 1, false)?;
21,199✔
383

384
        let cur_state = txn.get(db, record_key.as_slice())?.map(|b| b.to_vec());
21,199✔
385
        let new_state = self.calc_and_fill_measures(
21,199✔
386
            txn,
21,199✔
387
            &cur_state,
21,199✔
388
            None,
21,199✔
389
            Some(new),
21,199✔
390
            &mut out_rec_delete,
21,199✔
391
            &mut out_rec_insert,
21,199✔
392
            AggregatorOperation::Insert,
21,199✔
393
        )?;
21,199✔
394

395
        let res = if cur_state.is_none() {
21,199✔
396
            Operation::Insert {
397
                new: self.build_projection(new, out_rec_insert)?,
10,132✔
398
            }
399
        } else {
400
            Operation::Update {
401
                new: self.build_projection(new, out_rec_insert)?,
11,067✔
402
                old: self.build_projection(new, out_rec_delete)?,
11,067✔
403
            }
404
        };
405

406
        txn.put(db, record_key.as_slice(), new_state.as_slice())?;
21,199✔
407
        Ok(res)
21,199✔
408
    }
21,199✔
409

410
    fn agg_update(
44✔
411
        &self,
44✔
412
        txn: &mut LmdbExclusiveTransaction,
44✔
413
        db: Database,
44✔
414
        old: &mut Record,
44✔
415
        new: &mut Record,
44✔
416
        record_hash: Vec<u8>,
44✔
417
    ) -> Result<Operation, PipelineError> {
44✔
418
        let mut out_rec_delete: Vec<Field> = Vec::with_capacity(self.measures.len());
44✔
419
        let mut out_rec_insert: Vec<Field> = Vec::with_capacity(self.measures.len());
44✔
420
        let record_key = self.get_record_key(&record_hash, AGG_VALUES_DATASET_ID)?;
44✔
421

422
        let cur_state = txn.get(db, record_key.as_slice())?.map(|b| b.to_vec());
44✔
423
        let new_state = self.calc_and_fill_measures(
44✔
424
            txn,
44✔
425
            &cur_state,
44✔
426
            Some(old),
44✔
427
            Some(new),
44✔
428
            &mut out_rec_delete,
44✔
429
            &mut out_rec_insert,
44✔
430
            AggregatorOperation::Update,
44✔
431
        )?;
44✔
432

433
        let res = Operation::Update {
44✔
434
            new: self.build_projection(new, out_rec_insert)?,
44✔
435
            old: self.build_projection(old, out_rec_delete)?,
44✔
436
        };
437

438
        txn.put(db, record_key.as_slice(), new_state.as_slice())?;
44✔
439

440
        Ok(res)
44✔
441
    }
44✔
442

443
    pub fn build_projection(
32,584✔
444
        &self,
32,584✔
445
        original: &mut Record,
32,584✔
446
        measures: Vec<Field>,
32,584✔
447
    ) -> Result<Record, PipelineError> {
32,584✔
448
        let original_len = original.values.len();
32,584✔
449
        original.values.extend(measures);
32,584✔
450
        let mut output = Vec::<Field>::with_capacity(self.projections.len());
32,584✔
451
        for exp in &self.projections {
95,744✔
452
            output.push(exp.evaluate(original, &self.aggregation_schema)?);
63,159✔
453
        }
454
        original.values.drain(original_len..);
32,585✔
455
        Ok(Record::new(None, output, None))
32,585✔
456
    }
32,585✔
457

458
    pub fn aggregate(
21,385✔
459
        &self,
21,385✔
460
        txn: &mut LmdbExclusiveTransaction,
21,385✔
461
        db: Database,
21,385✔
462
        mut op: Operation,
21,385✔
463
    ) -> Result<Vec<Operation>, PipelineError> {
21,385✔
464
        match op {
21,385✔
465
            Operation::Insert { ref mut new } => Ok(vec![self.agg_insert(txn, db, new)?]),
21,176✔
466
            Operation::Delete { ref mut old } => Ok(vec![self.agg_delete(txn, db, old)?]),
142✔
467
            Operation::Update {
468
                ref mut old,
67✔
469
                ref mut new,
67✔
470
            } => {
471
                let (old_record_hash, new_record_hash) = if self.dimensions.is_empty() {
67✔
472
                    (
×
473
                        vec![AGG_DEFAULT_DIMENSION_ID],
×
474
                        vec![AGG_DEFAULT_DIMENSION_ID],
×
475
                    )
×
476
                } else {
477
                    (
478
                        get_key(&self.input_schema, old, &self.dimensions)?,
67✔
479
                        get_key(&self.input_schema, new, &self.dimensions)?,
67✔
480
                    )
481
                };
482

483
                if old_record_hash == new_record_hash {
67✔
484
                    Ok(vec![self.agg_update(txn, db, old, new, old_record_hash)?])
44✔
485
                } else {
486
                    Ok(vec![
23✔
487
                        self.agg_delete(txn, db, old)?,
23✔
488
                        self.agg_insert(txn, db, new)?,
23✔
489
                    ])
490
                }
491
            }
492
            Operation::SnapshottingDone { .. } => Ok(vec![op]),
×
493
        }
×
494
    }
21,385✔
495
}
496

×
497
fn get_key(
10,488✔
498
    schema: &Schema,
10,488✔
499
    record: &Record,
10,488✔
500
    dimensions: &[Expression],
10,488✔
501
) -> Result<Vec<u8>, PipelineError> {
10,488✔
502
    let mut tot_size = 0_usize;
10,488✔
503
    let mut buffers = Vec::<Vec<u8>>::with_capacity(dimensions.len());
10,488✔
504

×
505
    for dimension in dimensions.iter() {
10,488✔
506
        let value = dimension.evaluate(record, schema)?;
10,488✔
507
        let bytes = value.encode();
10,488✔
508
        tot_size += bytes.len();
10,488✔
509
        buffers.push(bytes);
10,488✔
510
    }
511

×
512
    let mut res_buffer = Vec::<u8>::with_capacity(tot_size);
10,488✔
513
    for i in buffers {
20,976✔
514
        res_buffer.extend(i);
10,488✔
515
    }
10,488✔
516
    Ok(res_buffer)
10,488✔
517
}
10,488✔
518

519
impl Processor for AggregationProcessor {
×
520
    fn commit(&self, _epoch: &Epoch, _tx: &SharedTransaction) -> Result<(), ExecutionError> {
71✔
521
        Ok(())
71✔
522
    }
71✔
523

×
524
    fn process(
21,090✔
525
        &mut self,
21,090✔
526
        _from_port: PortHandle,
21,090✔
527
        op: Operation,
21,090✔
528
        fw: &mut dyn ProcessorChannelForwarder,
21,090✔
529
        txn: &SharedTransaction,
21,090✔
530
        _reader: &HashMap<PortHandle, Box<dyn RecordReader>>,
21,090✔
531
    ) -> Result<(), ExecutionError> {
21,090✔
532
        let ops = self
21,090✔
533
            .aggregate(&mut txn.write(), self.db, op)
21,090✔
534
            .map_err(|e| InternalError(Box::new(e)))?;
21,090✔
535
        for fop in ops {
42,180✔
536
            fw.send(fop, DEFAULT_PORT_HANDLE)?;
21,090✔
537
        }
×
538
        Ok(())
21,090✔
539
    }
21,090✔
540
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc