• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

getdozer / dozer / 4056569677

pending completion
4056569677

Pull #764

github

GitHub
Merge 808a2a5d5 into 3173d7b16
Pull Request #764: refactor: integrate projection planner

572 of 572 new or added lines in 4 files covered. (100.0%)

24859 of 37726 relevant lines covered (65.89%)

38100.9 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

59.28
/dozer-sql/src/pipeline/aggregation/processor_new.rs
1
#![allow(clippy::too_many_arguments)]
2
use crate::deserialize;
3
use crate::pipeline::errors::PipelineError;
4
use crate::pipeline::expression::execution::ExpressionExecutor;
5
use crate::pipeline::{aggregation::aggregator::Aggregator, expression::execution::Expression};
6
use dozer_core::dag::channels::ProcessorChannelForwarder;
7
use dozer_core::dag::dag::DEFAULT_PORT_HANDLE;
8
use dozer_core::dag::errors::ExecutionError;
9
use dozer_core::dag::errors::ExecutionError::InternalError;
10
use dozer_core::dag::node::{PortHandle, Processor};
11
use dozer_core::storage::lmdb_storage::{
12
    LmdbEnvironmentManager, LmdbExclusiveTransaction, SharedTransaction,
13
};
14
use dozer_types::errors::types::TypeError;
15
use dozer_types::internal_err;
16
use dozer_types::types::{Field, Operation, Record, Schema};
17

18
use crate::pipeline::aggregation::aggregator::get_aggregator_from_aggregation_expression;
19
use dozer_core::dag::epoch::Epoch;
20
use dozer_core::dag::record_store::RecordReader;
21
use dozer_core::storage::common::Database;
22
use dozer_core::storage::errors::StorageError::InvalidDatabase;
23
use dozer_core::storage::prefix_transaction::PrefixTransaction;
24
use std::{collections::HashMap, mem::size_of_val};
25

26
// Meta-database key under which the prefix-id counter is persisted (see `get_counter`).
const COUNTER_KEY: u8 = 1_u8;
27

28
/// One measure's decoded aggregation state, as stored in the values database
/// and produced by `AggregationProcessor::decode_buffer`.
pub(crate) struct AggregationData<'a> {
    // Current aggregated value of the measure.
    pub value: Field,
    // Aggregator-specific serialized state, borrowed from the stored buffer;
    // `None` when the stored state length was zero.
    pub state: Option<&'a [u8]>,
    // Prefix id scoping this measure's entries in the aggregators database.
    pub prefix: u32,
}
33

34
impl<'a> AggregationData<'a> {
35
    pub fn new(value: Field, state: Option<&'a [u8]>, prefix: u32) -> Self {
1✔
36
        Self {
1✔
37
            value,
1✔
38
            state,
1✔
39
            prefix,
1✔
40
        }
1✔
41
    }
1✔
42
}
43

44
/// Stateful streaming processor that maintains GROUP BY aggregations in LMDB.
#[derive(Debug)]
pub struct AggregationProcessor {
    // GROUP BY expressions; empty means a single global aggregation segment.
    dimensions: Vec<Expression>,
    // Each measure expression paired with the aggregator evaluating it.
    measures: Vec<(Expression, Aggregator)>,
    // Output projection expressions, evaluated over input fields + measure values.
    projections: Vec<Expression>,
    // Main database with per-segment encoded aggregation state; `None` until `init`.
    pub db: Option<Database>,
    // Metadata database (stores the prefix counter under `COUNTER_KEY`).
    meta_db: Option<Database>,
    // Database the aggregators use for their own prefix-scoped state.
    aggregators_db: Option<Database>,
    input_schema: Schema,
    aggregation_schema: Schema,
}
55

56
/// Kind of mutation being applied to a segment's aggregation state.
enum AggregatorOperation {
    Insert,
    Delete,
    Update,
}
61

62
// Dataset-id prefixes partitioning keys inside the main database (see `get_record_key`).
const AGG_VALUES_DATASET_ID: u16 = 0x0000_u16; // encoded aggregation state per segment
const AGG_COUNT_DATASET_ID: u16 = 0x0001_u16; // number of source records per segment

// Placeholder dimension key used when the aggregation has no GROUP BY dimensions.
const AGG_DEFAULT_DIMENSION_ID: u8 = 0xFF_u8;
66

67
impl AggregationProcessor {
68
    pub fn new(
1✔
69
        dimensions: Vec<Expression>,
1✔
70
        measures: Vec<Expression>,
1✔
71
        projections: Vec<Expression>,
1✔
72
        input_schema: Schema,
1✔
73
        aggregation_schema: Schema,
1✔
74
    ) -> Result<Self, PipelineError> {
1✔
75
        //
1✔
76
        let mut aggregators: Vec<(Expression, Aggregator)> = Vec::new();
1✔
77
        for measure in measures {
2✔
78
            aggregators.push(get_aggregator_from_aggregation_expression(
1✔
79
                &measure,
1✔
80
                &input_schema,
1✔
81
            )?)
1✔
82
        }
83

84
        Ok(Self {
1✔
85
            dimensions,
1✔
86
            measures: aggregators,
1✔
87
            projections,
1✔
88
            db: None,
1✔
89
            meta_db: None,
1✔
90
            aggregators_db: None,
1✔
91
            input_schema,
1✔
92
            aggregation_schema,
1✔
93
        })
1✔
94
    }
1✔
95

96
    fn init_store(&mut self, txn: &mut LmdbEnvironmentManager) -> Result<(), PipelineError> {
1✔
97
        self.db = Some(txn.open_database("aggr", false)?);
1✔
98
        self.aggregators_db = Some(txn.open_database("aggr_data", false)?);
1✔
99
        self.meta_db = Some(txn.open_database("meta", false)?);
1✔
100
        Ok(())
1✔
101
    }
1✔
102

103
    fn get_record_key(&self, hash: &Vec<u8>, database_id: u16) -> Result<Vec<u8>, PipelineError> {
4✔
104
        let mut vec = Vec::with_capacity(hash.len().wrapping_add(size_of_val(&database_id)));
4✔
105
        vec.extend_from_slice(&database_id.to_be_bytes());
4✔
106
        vec.extend(hash);
4✔
107
        Ok(vec)
4✔
108
    }
4✔
109

110
    fn get_counter(&self, txn: &mut LmdbExclusiveTransaction) -> Result<u32, PipelineError> {
1✔
111
        let meta_db = *self
1✔
112
            .meta_db
1✔
113
            .as_ref()
1✔
114
            .ok_or(PipelineError::InternalStorageError(InvalidDatabase))?;
1✔
115
        let curr_ctr = match txn.get(meta_db, &COUNTER_KEY.to_be_bytes())? {
1✔
116
            Some(v) => u32::from_be_bytes(deserialize!(v)),
×
117
            None => 1_u32,
1✔
118
        };
119
        txn.put(
1✔
120
            meta_db,
1✔
121
            &COUNTER_KEY.to_be_bytes(),
1✔
122
            &(curr_ctr + 1).to_be_bytes(),
1✔
123
        )?;
1✔
124
        Ok(curr_ctr + 1)
1✔
125
    }
1✔
126

127
    pub(crate) fn decode_buffer(buf: &[u8]) -> Result<(usize, AggregationData), PipelineError> {
1✔
128
        let prefix = u32::from_be_bytes(buf[0..4].try_into().unwrap());
1✔
129
        let mut offset: usize = 4;
1✔
130

1✔
131
        let val_len = u16::from_be_bytes(buf[offset..offset + 2].try_into().unwrap());
1✔
132
        offset += 2;
1✔
133
        let val: Field = Field::decode(&buf[offset..offset + val_len as usize])
1✔
134
            .map_err(TypeError::DeserializationError)?;
1✔
135
        offset += val_len as usize;
1✔
136
        let state_len = u16::from_be_bytes(buf[offset..offset + 2].try_into().unwrap());
1✔
137
        offset += 2;
1✔
138
        let state: Option<&[u8]> = if state_len > 0 {
1✔
139
            Some(&buf[offset..offset + state_len as usize])
1✔
140
        } else {
141
            None
×
142
        };
143
        offset += state_len as usize;
1✔
144

1✔
145
        let r = AggregationData::new(val, state, prefix);
1✔
146
        Ok((offset, r))
1✔
147
    }
1✔
148

149
    pub(crate) fn encode_buffer(
2✔
150
        prefix: u32,
2✔
151
        value: &Field,
2✔
152
        state: &Option<Vec<u8>>,
2✔
153
    ) -> Result<(usize, Vec<u8>), PipelineError> {
2✔
154
        let mut r = Vec::with_capacity(512);
2✔
155
        r.extend(prefix.to_be_bytes());
2✔
156

2✔
157
        let sz_val = value.encode();
2✔
158
        r.extend((sz_val.len() as u16).to_be_bytes());
2✔
159
        r.extend(&sz_val);
2✔
160

161
        let len = if let Some(state) = state.as_ref() {
2✔
162
            r.extend((state.len() as u16).to_be_bytes());
2✔
163
            r.extend(state);
2✔
164
            state.len()
2✔
165
        } else {
166
            r.extend(0_u16.to_be_bytes());
×
167
            0_usize
×
168
        };
169

170
        Ok((5 + sz_val.len() + len, r))
2✔
171
    }
2✔
172

173
    fn calc_and_fill_measures(
2✔
174
        &self,
2✔
175
        txn: &mut LmdbExclusiveTransaction,
2✔
176
        cur_state: &Option<Vec<u8>>,
2✔
177
        deleted_record: Option<&Record>,
2✔
178
        inserted_record: Option<&Record>,
2✔
179
        out_rec_delete: &mut Vec<Field>,
2✔
180
        out_rec_insert: &mut Vec<Field>,
2✔
181
        op: AggregatorOperation,
2✔
182
    ) -> Result<Vec<u8>, PipelineError> {
2✔
183
        // array holding the list of states for all measures
2✔
184
        let mut next_state = Vec::<u8>::new();
2✔
185
        let mut offset: usize = 0;
2✔
186

187
        for measure in &self.measures {
4✔
188
            let curr_agg_data = match cur_state {
2✔
189
                Some(ref e) => {
1✔
190
                    let (len, res) = Self::decode_buffer(&e[offset..])?;
1✔
191
                    offset += len;
1✔
192
                    Some(res)
1✔
193
                }
194
                None => None,
1✔
195
            };
196

197
            let (prefix, next_state_slice) = match op {
2✔
198
                AggregatorOperation::Insert => {
199
                    let inserted_field = measure
2✔
200
                        .0
2✔
201
                        .evaluate(inserted_record.unwrap(), &self.input_schema)?;
2✔
202
                    if let Some(curr) = curr_agg_data {
2✔
203
                        out_rec_delete.push(curr.value);
1✔
204
                        let mut p_tx = PrefixTransaction::new(txn, curr.prefix);
1✔
205
                        let r = measure.1.insert(
1✔
206
                            curr.state,
1✔
207
                            &inserted_field,
1✔
208
                            measure.0.get_type(&self.input_schema)?.return_type,
1✔
209
                            &mut p_tx,
1✔
210
                            self.aggregators_db.unwrap(),
1✔
211
                        )?;
×
212
                        (curr.prefix, r)
1✔
213
                    } else {
214
                        let prefix = self.get_counter(txn)?;
1✔
215
                        let mut p_tx = PrefixTransaction::new(txn, prefix);
1✔
216
                        let r = measure.1.insert(
1✔
217
                            None,
1✔
218
                            &inserted_field,
1✔
219
                            measure.0.get_type(&self.input_schema)?.return_type,
1✔
220
                            &mut p_tx,
1✔
221
                            self.aggregators_db.unwrap(),
1✔
222
                        )?;
×
223
                        (prefix, r)
1✔
224
                    }
225
                }
226
                AggregatorOperation::Delete => {
227
                    let deleted_field = measure
×
228
                        .0
×
229
                        .evaluate(deleted_record.unwrap(), &self.input_schema)?;
×
230
                    if let Some(curr) = curr_agg_data {
×
231
                        out_rec_delete.push(curr.value);
×
232
                        let mut p_tx = PrefixTransaction::new(txn, curr.prefix);
×
233
                        let r = measure.1.delete(
×
234
                            curr.state,
×
235
                            &deleted_field,
×
236
                            measure.0.get_type(&self.input_schema)?.return_type,
×
237
                            &mut p_tx,
×
238
                            self.aggregators_db.unwrap(),
×
239
                        )?;
×
240
                        (curr.prefix, r)
×
241
                    } else {
242
                        let prefix = self.get_counter(txn)?;
×
243
                        let mut p_tx = PrefixTransaction::new(txn, prefix);
×
244
                        let r = measure.1.delete(
×
245
                            None,
×
246
                            &deleted_field,
×
247
                            measure.0.get_type(&self.input_schema)?.return_type,
×
248
                            &mut p_tx,
×
249
                            self.aggregators_db.unwrap(),
×
250
                        )?;
×
251
                        (prefix, r)
×
252
                    }
253
                }
254
                AggregatorOperation::Update => {
255
                    let deleted_field = measure
×
256
                        .0
×
257
                        .evaluate(deleted_record.unwrap(), &self.input_schema)?;
×
258
                    let updated_field = measure
×
259
                        .0
×
260
                        .evaluate(inserted_record.unwrap(), &self.input_schema)?;
×
261

262
                    if let Some(curr) = curr_agg_data {
×
263
                        out_rec_delete.push(curr.value);
×
264
                        let mut p_tx = PrefixTransaction::new(txn, curr.prefix);
×
265
                        let r = measure.1.update(
×
266
                            curr.state,
×
267
                            &deleted_field,
×
268
                            &updated_field,
×
269
                            measure.0.get_type(&self.input_schema)?.return_type,
×
270
                            &mut p_tx,
×
271
                            self.aggregators_db.unwrap(),
×
272
                        )?;
×
273
                        (curr.prefix, r)
×
274
                    } else {
275
                        let prefix = self.get_counter(txn)?;
×
276
                        let mut p_tx = PrefixTransaction::new(txn, prefix);
×
277
                        let r = measure.1.update(
×
278
                            None,
×
279
                            &deleted_field,
×
280
                            &updated_field,
×
281
                            measure.0.get_type(&self.input_schema)?.return_type,
×
282
                            &mut p_tx,
×
283
                            self.aggregators_db.unwrap(),
×
284
                        )?;
×
285
                        (prefix, r)
×
286
                    }
287
                }
288
            };
289

290
            next_state.extend(
291
                &Self::encode_buffer(prefix, &next_state_slice.value, &next_state_slice.state)?.1,
2✔
292
            );
293
            out_rec_insert.push(next_state_slice.value);
2✔
294
        }
295

296
        Ok(next_state)
2✔
297
    }
2✔
298

299
    fn update_segment_count(
2✔
300
        &self,
2✔
301
        txn: &mut LmdbExclusiveTransaction,
2✔
302
        db: Database,
2✔
303
        key: Vec<u8>,
2✔
304
        delta: u64,
2✔
305
        decr: bool,
2✔
306
    ) -> Result<u64, PipelineError> {
2✔
307
        let bytes = txn.get(db, key.as_slice())?;
2✔
308

309
        let curr_count = match bytes {
2✔
310
            Some(b) => u64::from_be_bytes(deserialize!(b)),
1✔
311
            None => 0_u64,
1✔
312
        };
313

314
        let new_val = if decr {
2✔
315
            curr_count.wrapping_sub(delta)
×
316
        } else {
317
            curr_count.wrapping_add(delta)
2✔
318
        };
319

320
        if new_val > 0 {
2✔
321
            txn.put(db, key.as_slice(), new_val.to_be_bytes().as_slice())?;
2✔
322
        } else {
323
            txn.del(db, key.as_slice(), None)?;
×
324
        }
325
        Ok(curr_count)
2✔
326
    }
2✔
327

328
    fn agg_delete(
×
329
        &self,
×
330
        txn: &mut LmdbExclusiveTransaction,
×
331
        db: Database,
×
332
        old: &mut Record,
×
333
    ) -> Result<Operation, PipelineError> {
×
334
        let mut out_rec_delete: Vec<Field> = Vec::with_capacity(self.measures.len());
×
335
        let mut out_rec_insert: Vec<Field> = Vec::with_capacity(self.measures.len());
×
336

337
        let record_hash = if !self.dimensions.is_empty() {
×
338
            get_key(&self.input_schema, old, &self.dimensions)?
×
339
        } else {
340
            vec![AGG_DEFAULT_DIMENSION_ID]
×
341
        };
342

343
        let record_key = self.get_record_key(&record_hash, AGG_VALUES_DATASET_ID)?;
×
344

345
        let record_count_key = self.get_record_key(&record_hash, AGG_COUNT_DATASET_ID)?;
×
346
        let prev_count = self.update_segment_count(txn, db, record_count_key, 1, true)?;
×
347

348
        let cur_state = txn.get(db, record_key.as_slice())?.map(|b| b.to_vec());
×
349
        let new_state = self.calc_and_fill_measures(
×
350
            txn,
×
351
            &cur_state,
×
352
            Some(old),
×
353
            None,
×
354
            &mut out_rec_delete,
×
355
            &mut out_rec_insert,
×
356
            AggregatorOperation::Delete,
×
357
        )?;
×
358

359
        let res = if prev_count == 1 {
×
360
            Operation::Delete {
361
                old: self.build_projection(old, out_rec_delete)?,
×
362
            }
363
        } else {
364
            Operation::Update {
365
                new: self.build_projection(old, out_rec_insert)?,
×
366
                old: self.build_projection(old, out_rec_delete)?,
×
367
            }
368
        };
369

370
        if prev_count == 1 {
×
371
            let _ = txn.del(db, record_key.as_slice(), None)?;
×
372
        } else {
373
            txn.put(db, record_key.as_slice(), new_state.as_slice())?;
×
374
        }
375
        Ok(res)
×
376
    }
×
377

378
    fn agg_insert(
2✔
379
        &self,
2✔
380
        txn: &mut LmdbExclusiveTransaction,
2✔
381
        db: Database,
2✔
382
        new: &mut Record,
2✔
383
    ) -> Result<Operation, PipelineError> {
2✔
384
        let mut out_rec_delete: Vec<Field> = Vec::with_capacity(self.measures.len());
2✔
385
        let mut out_rec_insert: Vec<Field> = Vec::with_capacity(self.measures.len());
2✔
386

387
        let record_hash = if !self.dimensions.is_empty() {
2✔
388
            get_key(&self.input_schema, new, &self.dimensions)?
2✔
389
        } else {
390
            vec![AGG_DEFAULT_DIMENSION_ID]
×
391
        };
392

393
        let record_key = self.get_record_key(&record_hash, AGG_VALUES_DATASET_ID)?;
2✔
394

395
        let record_count_key = self.get_record_key(&record_hash, AGG_COUNT_DATASET_ID)?;
2✔
396
        self.update_segment_count(txn, db, record_count_key, 1, false)?;
2✔
397

398
        let cur_state = txn.get(db, record_key.as_slice())?.map(|b| b.to_vec());
2✔
399
        let new_state = self.calc_and_fill_measures(
2✔
400
            txn,
2✔
401
            &cur_state,
2✔
402
            None,
2✔
403
            Some(new),
2✔
404
            &mut out_rec_delete,
2✔
405
            &mut out_rec_insert,
2✔
406
            AggregatorOperation::Insert,
2✔
407
        )?;
2✔
408

409
        let res = if cur_state.is_none() {
2✔
410
            Operation::Insert {
411
                new: self.build_projection(new, out_rec_insert)?,
1✔
412
            }
413
        } else {
414
            Operation::Update {
415
                new: self.build_projection(new, out_rec_insert)?,
1✔
416
                old: self.build_projection(new, out_rec_delete)?,
1✔
417
            }
418
        };
419

420
        txn.put(db, record_key.as_slice(), new_state.as_slice())?;
2✔
421
        Ok(res)
2✔
422
    }
2✔
423

424
    fn agg_update(
×
425
        &self,
×
426
        txn: &mut LmdbExclusiveTransaction,
×
427
        db: Database,
×
428
        old: &mut Record,
×
429
        new: &mut Record,
×
430
        record_hash: Vec<u8>,
×
431
    ) -> Result<Operation, PipelineError> {
×
432
        let mut out_rec_delete: Vec<Field> = Vec::with_capacity(self.measures.len());
×
433
        let mut out_rec_insert: Vec<Field> = Vec::with_capacity(self.measures.len());
×
434
        let record_key = self.get_record_key(&record_hash, AGG_VALUES_DATASET_ID)?;
×
435

436
        let cur_state = txn.get(db, record_key.as_slice())?.map(|b| b.to_vec());
×
437
        let new_state = self.calc_and_fill_measures(
×
438
            txn,
×
439
            &cur_state,
×
440
            Some(old),
×
441
            Some(new),
×
442
            &mut out_rec_delete,
×
443
            &mut out_rec_insert,
×
444
            AggregatorOperation::Update,
×
445
        )?;
×
446

447
        let res = Operation::Update {
×
448
            new: self.build_projection(new, out_rec_insert)?,
×
449
            old: self.build_projection(old, out_rec_delete)?,
×
450
        };
451

452
        txn.put(db, record_key.as_slice(), new_state.as_slice())?;
×
453

454
        Ok(res)
×
455
    }
×
456

457
    pub fn build_projection(
3✔
458
        &self,
3✔
459
        original: &mut Record,
3✔
460
        measures: Vec<Field>,
3✔
461
    ) -> Result<Record, PipelineError> {
3✔
462
        let original_len = original.values.len();
3✔
463
        original.values.extend(measures);
3✔
464
        let mut output = Vec::<Field>::with_capacity(self.projections.len());
3✔
465
        for exp in &self.projections {
9✔
466
            output.push(exp.evaluate(original, &self.aggregation_schema)?);
6✔
467
        }
468
        original.values.drain(original_len..);
3✔
469
        Ok(Record::new(None, output, None))
3✔
470
    }
3✔
471

472
    pub fn aggregate(
2✔
473
        &self,
2✔
474
        txn: &mut LmdbExclusiveTransaction,
2✔
475
        db: Database,
2✔
476
        mut op: Operation,
2✔
477
    ) -> Result<Vec<Operation>, PipelineError> {
2✔
478
        match op {
2✔
479
            Operation::Insert { ref mut new } => Ok(vec![self.agg_insert(txn, db, new)?]),
2✔
480
            Operation::Delete { ref mut old } => Ok(vec![self.agg_delete(txn, db, old)?]),
×
481
            Operation::Update {
482
                ref mut old,
×
483
                ref mut new,
×
484
            } => {
485
                let (old_record_hash, new_record_hash) = if self.dimensions.is_empty() {
×
486
                    (
×
487
                        vec![AGG_DEFAULT_DIMENSION_ID],
×
488
                        vec![AGG_DEFAULT_DIMENSION_ID],
×
489
                    )
×
490
                } else {
491
                    (
492
                        get_key(&self.input_schema, old, &self.dimensions)?,
×
493
                        get_key(&self.input_schema, new, &self.dimensions)?,
×
494
                    )
495
                };
496

497
                if old_record_hash == new_record_hash {
×
498
                    Ok(vec![self.agg_update(txn, db, old, new, old_record_hash)?])
×
499
                } else {
500
                    Ok(vec![
×
501
                        self.agg_delete(txn, db, old)?,
×
502
                        self.agg_insert(txn, db, new)?,
×
503
                    ])
504
                }
505
            }
506
        }
507
    }
2✔
508
}
509

510
fn get_key(
2✔
511
    schema: &Schema,
2✔
512
    record: &Record,
2✔
513
    dimensions: &[Expression],
2✔
514
) -> Result<Vec<u8>, PipelineError> {
2✔
515
    let mut tot_size = 0_usize;
2✔
516
    let mut buffers = Vec::<Vec<u8>>::with_capacity(dimensions.len());
2✔
517

518
    for dimension in dimensions.iter() {
2✔
519
        let value = dimension.evaluate(record, schema)?;
2✔
520
        let bytes = value.encode();
2✔
521
        tot_size += bytes.len();
2✔
522
        buffers.push(bytes);
2✔
523
    }
524

525
    let mut res_buffer = Vec::<u8>::with_capacity(tot_size);
2✔
526
    for i in buffers {
4✔
527
        res_buffer.extend(i);
2✔
528
    }
2✔
529
    Ok(res_buffer)
2✔
530
}
2✔
531

532
impl Processor for AggregationProcessor {
    fn init(&mut self, state: &mut LmdbEnvironmentManager) -> Result<(), ExecutionError> {
        // Open the aggregation, aggregator-state and meta databases,
        // mapping any pipeline error into an internal execution error.
        internal_err!(self.init_store(state))
    }

    fn commit(&self, _epoch: &Epoch, _tx: &SharedTransaction) -> Result<(), ExecutionError> {
        // All state is written through the shared transaction during `process`,
        // so there is nothing additional to flush at epoch commit time.
        Ok(())
    }

    fn process(
        &mut self,
        _from_port: PortHandle,
        op: Operation,
        fw: &mut dyn ProcessorChannelForwarder,
        txn: &SharedTransaction,
        _reader: &HashMap<PortHandle, Box<dyn RecordReader>>,
    ) -> Result<(), ExecutionError> {
        // Aggregate the incoming operation and forward every resulting output
        // operation on the default port.
        match self.db {
            Some(d) => {
                let ops = internal_err!(self.aggregate(&mut txn.write(), d, op))?;
                for fop in ops {
                    fw.send(fop, DEFAULT_PORT_HANDLE)?;
                }
                Ok(())
            }
            // `init` was never run (or failed), so no database is available.
            _ => Err(ExecutionError::InvalidDatabase),
        }
    }
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc