• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

getdozer / dozer / 4377467257

pending completion
4377467257

push

github

GitHub
implement `HAVING` (#1198)

395 of 395 new or added lines in 6 files covered. (100.0%)

27638 of 38584 relevant lines covered (71.63%)

27777.41 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

88.37
/dozer-sql/src/pipeline/aggregation/processor.rs
1
#![allow(clippy::too_many_arguments)]
2

3
use crate::pipeline::errors::PipelineError;
4
use crate::pipeline::expression::execution::ExpressionExecutor;
5
use crate::pipeline::{aggregation::aggregator::Aggregator, expression::execution::Expression};
6
use dozer_core::channels::ProcessorChannelForwarder;
7
use dozer_core::errors::ExecutionError;
8
use dozer_core::errors::ExecutionError::InternalError;
9
use dozer_core::node::{PortHandle, Processor};
10
use dozer_core::storage::lmdb_storage::SharedTransaction;
11
use dozer_core::DEFAULT_PORT_HANDLE;
12
use dozer_types::types::{Field, FieldType, Operation, Record, Schema};
13
use std::hash::{Hash, Hasher};
14

15
use crate::pipeline::aggregation::aggregator::{
16
    get_aggregator_from_aggregator_type, get_aggregator_type_from_aggregation_expression,
17
    AggregatorType,
18
};
19
use ahash::AHasher;
20
use dozer_core::epoch::Epoch;
21
use hashbrown::HashMap;
22

23
const DEFAULT_SEGMENT_KEY: &str = "DOZER_DEFAULT_SEGMENT_KEY";
24

25
/// Aggregation state kept for a single segment (one entry of
/// `AggregationProcessor::states`).
///
/// * `count`  - number of input records currently contributing to the
///   segment; incremented on insert and decremented on delete.
/// * `states` - one boxed aggregator per measure, in measure order.
/// * `values` - the measure values produced by the most recent change, or
///   `None` while no change has been recorded for the segment yet.
#[derive(Debug)]
×
26
struct AggregationState {
27
    count: usize,
28
    states: Vec<Box<dyn Aggregator>>,
29
    values: Option<Vec<Field>>,
30
}
31

32
impl AggregationState {
33
    pub fn new(types: &[AggregatorType], ret_types: &[FieldType]) -> Self {
21,213✔
34
        let mut states: Vec<Box<dyn Aggregator>> = Vec::new();
21,213✔
35
        for (idx, typ) in types.iter().enumerate() {
21,213✔
36
            let mut aggr = get_aggregator_from_aggregator_type(*typ);
21,213✔
37
            aggr.init(ret_types[idx]);
21,213✔
38
            states.push(aggr);
21,213✔
39
        }
21,213✔
40

41
        Self {
21,214✔
42
            count: 0,
21,214✔
43
            states,
21,214✔
44
            values: None,
21,214✔
45
        }
21,214✔
46
    }
21,214✔
47
}
48

49
/// Processor that maintains per-segment aggregation state and turns each
/// incoming `Operation` into the operations to forward downstream.
///
/// * `dimensions` - expressions whose evaluated values are hashed into the
///   segment key (see `get_key`); when empty, `default_segment_key` is used.
/// * `measures` - for every measure, the expressions evaluated against the
///   input record and handed to that measure's aggregator.
/// * `measures_types` / `measures_return_types` - aggregator type and return
///   field type of every measure, used to build new `AggregationState`s.
/// * `projections` - expressions evaluated to build each output record.
/// * `having` - optional `HAVING` predicate deciding whether the old/new
///   aggregated record is emitted.
/// * `input_schema` - schema used to evaluate dimensions and measures.
/// * `aggregation_schema` - schema used to evaluate `projections`.
/// * `states` - aggregation state per segment key.
/// * `default_segment_key` - hash of `DEFAULT_SEGMENT_KEY`.
/// * `having_eval_schema` - fields of `input_schema` followed by the fields
///   of `aggregation_schema`; used to evaluate `having`.
#[derive(Debug)]
×
50
pub struct AggregationProcessor {
51
    dimensions: Vec<Expression>,
52
    measures: Vec<Vec<Expression>>,
53
    measures_types: Vec<AggregatorType>,
54
    measures_return_types: Vec<FieldType>,
55
    projections: Vec<Expression>,
56
    having: Option<Expression>,
57
    input_schema: Schema,
58
    aggregation_schema: Schema,
59
    states: HashMap<u64, AggregationState>,
60
    default_segment_key: u64,
61
    having_eval_schema: Schema,
62
}
63

64
/// Kind of change `calc_and_fill_measures` applies to a segment's
/// aggregators: `Insert` calls `insert`, `Delete` calls `delete` and `Update`
/// calls `update` with both the deleted and the inserted values.
enum AggregatorOperation {
65
    Insert,
66
    Delete,
67
    Update,
68
}
69

×
70
impl AggregationProcessor {
×
71
    pub fn new(
116✔
72
        dimensions: Vec<Expression>,
116✔
73
        measures: Vec<Expression>,
116✔
74
        projections: Vec<Expression>,
116✔
75
        having: Option<Expression>,
116✔
76
        input_schema: Schema,
116✔
77
        aggregation_schema: Schema,
116✔
78
    ) -> Result<Self, PipelineError> {
116✔
79
        let mut aggr_types = Vec::new();
116✔
80
        let mut aggr_measures = Vec::new();
116✔
81
        let mut aggr_measures_ret_types = Vec::new();
116✔
82

×
83
        for measure in measures {
233✔
84
            let (aggr_measure, aggr_type) =
116✔
85
                get_aggregator_type_from_aggregation_expression(&measure, &input_schema)?;
116✔
86
            aggr_measures.push(aggr_measure);
116✔
87
            aggr_types.push(aggr_type);
116✔
88
            aggr_measures_ret_types.push(measure.get_type(&input_schema)?.return_type)
116✔
89
        }
×
90

×
91
        let mut hasher = AHasher::default();
117✔
92
        DEFAULT_SEGMENT_KEY.hash(&mut hasher);
117✔
93

117✔
94
        let mut having_eval_schema_fields = input_schema.fields.clone();
117✔
95
        having_eval_schema_fields.extend(aggregation_schema.fields.clone());
117✔
96

117✔
97
        Ok(Self {
117✔
98
            dimensions,
117✔
99
            projections,
117✔
100
            input_schema,
117✔
101
            aggregation_schema,
117✔
102
            states: HashMap::new(),
117✔
103
            measures: aggr_measures,
117✔
104
            having,
117✔
105
            measures_types: aggr_types,
117✔
106
            measures_return_types: aggr_measures_ret_types,
117✔
107
            default_segment_key: hasher.finish(),
117✔
108
            having_eval_schema: Schema {
117✔
109
                fields: having_eval_schema_fields,
117✔
110
                primary_index: vec![],
117✔
111
                identifier: None,
117✔
112
            },
117✔
113
        })
117✔
114
    }
117✔
115

×
116
    fn calc_and_fill_measures(
21,434✔
117
        curr_state: &mut AggregationState,
21,434✔
118
        deleted_record: Option<&Record>,
21,434✔
119
        inserted_record: Option<&Record>,
21,434✔
120
        out_rec_delete: &mut Vec<Field>,
21,434✔
121
        out_rec_insert: &mut Vec<Field>,
21,434✔
122
        op: AggregatorOperation,
21,434✔
123
        measures: &Vec<Vec<Expression>>,
21,434✔
124
        input_schema: &Schema,
21,434✔
125
    ) -> Result<Vec<Field>, PipelineError> {
21,434✔
126
        //
21,434✔
127

21,434✔
128
        let mut new_fields: Vec<Field> = Vec::with_capacity(measures.len());
21,434✔
129

×
130
        for (idx, measure) in measures.iter().enumerate() {
21,436✔
131
            let curr_aggr = &mut curr_state.states[idx];
21,436✔
132
            let curr_val_opt: Option<&Field> = curr_state.values.as_ref().map(|e| &e[idx]);
21,436✔
133

134
            let new_val = match op {
21,442✔
135
                AggregatorOperation::Insert => {
×
136
                    let mut inserted_fields = Vec::with_capacity(measure.len());
21,210✔
137
                    for m in measure {
42,422✔
138
                        inserted_fields.push(m.evaluate(inserted_record.unwrap(), input_schema)?);
21,211✔
139
                    }
×
140
                    if let Some(curr_val) = curr_val_opt {
21,211✔
141
                        out_rec_delete.push(curr_val.clone());
11,074✔
142
                    }
20,134✔
143
                    curr_aggr.insert(&inserted_fields)?
21,211✔
144
                }
×
145
                AggregatorOperation::Delete => {
×
146
                    let mut deleted_fields = Vec::with_capacity(measure.len());
172✔
147
                    for m in measure {
345✔
148
                        deleted_fields.push(m.evaluate(deleted_record.unwrap(), input_schema)?);
172✔
149
                    }
×
150
                    if let Some(curr_val) = curr_val_opt {
173✔
151
                        out_rec_delete.push(curr_val.clone());
173✔
152
                    }
173✔
153
                    curr_aggr.delete(&deleted_fields)?
173✔
154
                }
×
155
                AggregatorOperation::Update => {
×
156
                    let mut deleted_fields = Vec::with_capacity(measure.len());
54✔
157
                    for m in measure {
108✔
158
                        deleted_fields.push(m.evaluate(deleted_record.unwrap(), input_schema)?);
54✔
159
                    }
×
160
                    let mut inserted_fields = Vec::with_capacity(measure.len());
54✔
161
                    for m in measure {
108✔
162
                        inserted_fields.push(m.evaluate(inserted_record.unwrap(), input_schema)?);
54✔
163
                    }
164
                    if let Some(curr_val) = curr_val_opt {
54✔
165
                        out_rec_delete.push(curr_val.clone());
54✔
166
                    }
54✔
167
                    curr_aggr.update(&deleted_fields, &inserted_fields)?
54✔
168
                }
×
169
            };
×
170
            out_rec_insert.push(new_val.clone());
21,442✔
171
            new_fields.push(new_val);
21,442✔
172
        }
173
        Ok(new_fields)
21,437✔
174
    }
21,437✔
175

×
176
    fn agg_delete(&mut self, old: &mut Record) -> Result<Vec<Operation>, PipelineError> {
173✔
177
        let mut out_rec_delete: Vec<Field> = Vec::with_capacity(self.measures.len());
173✔
178
        let mut out_rec_insert: Vec<Field> = Vec::with_capacity(self.measures.len());
173✔
179

×
180
        let key = if !self.dimensions.is_empty() {
173✔
181
            get_key(&self.input_schema, old, &self.dimensions)?
167✔
182
        } else {
×
183
            self.default_segment_key
6✔
184
        };
×
185

×
186
        let curr_state_opt = self.states.get_mut(&key);
173✔
187
        assert!(
173✔
188
            curr_state_opt.is_some(),
173✔
189
            "Unable to find aggregator state during DELETE operation"
×
190
        );
×
191
        let mut curr_state = curr_state_opt.unwrap();
173✔
192

×
193
        let new_values = Self::calc_and_fill_measures(
173✔
194
            curr_state,
173✔
195
            Some(old),
173✔
196
            None,
173✔
197
            &mut out_rec_delete,
173✔
198
            &mut out_rec_insert,
173✔
199
            AggregatorOperation::Delete,
173✔
200
            &self.measures,
173✔
201
            &self.input_schema,
173✔
202
        )?;
173✔
203

×
204
        let (out_rec_delete_having_satisfied, out_rec_insert_having_satisfied) = match &self.having
173✔
205
        {
×
206
            None => (true, true),
165✔
207
            Some(having) => (
8✔
208
                Self::having_is_satisfied(
8✔
209
                    &self.having_eval_schema,
8✔
210
                    old,
8✔
211
                    having,
8✔
212
                    &mut out_rec_delete,
8✔
213
                )?,
8✔
214
                Self::having_is_satisfied(
8✔
215
                    &self.having_eval_schema,
8✔
216
                    old,
8✔
217
                    having,
8✔
218
                    &mut out_rec_insert,
8✔
219
                )?,
8✔
220
            ),
221
        };
×
222

×
223
        let res = if curr_state.count == 1 {
173✔
224
            self.states.remove(&key);
101✔
225
            if out_rec_delete_having_satisfied {
101✔
226
                vec![Operation::Delete {
99✔
227
                    old: Self::build_projection(
99✔
228
                        old,
99✔
229
                        out_rec_delete,
99✔
230
                        &self.projections,
99✔
231
                        &self.aggregation_schema,
99✔
232
                    )?,
99✔
233
                }]
234
            } else {
×
235
                vec![]
2✔
236
            }
×
237
        } else {
×
238
            curr_state.count -= 1;
72✔
239
            curr_state.values = Some(new_values);
72✔
240

72✔
241
            Self::generate_op_for_existing_segment(
72✔
242
                out_rec_delete_having_satisfied,
72✔
243
                out_rec_insert_having_satisfied,
72✔
244
                out_rec_delete,
72✔
245
                out_rec_insert,
72✔
246
                old,
72✔
247
                &self.projections,
72✔
248
                &self.aggregation_schema,
72✔
249
            )?
72✔
250
        };
×
251

252
        Ok(res)
173✔
253
    }
173✔
254

×
255
    fn agg_insert(&mut self, new: &mut Record) -> Result<Vec<Operation>, PipelineError> {
21,211✔
256
        let mut out_rec_delete: Vec<Field> = Vec::with_capacity(self.measures.len());
21,211✔
257
        let mut out_rec_insert: Vec<Field> = Vec::with_capacity(self.measures.len());
21,211✔
258

259
        let key = if !self.dimensions.is_empty() {
21,211✔
260
            get_key(&self.input_schema, new, &self.dimensions)?
10,195✔
261
        } else {
×
262
            self.default_segment_key
11,016✔
263
        };
×
264

×
265
        let curr_state = self.states.entry(key).or_insert(AggregationState::new(
21,211✔
266
            &self.measures_types,
21,211✔
267
            &self.measures_return_types,
21,211✔
268
        ));
21,211✔
269

×
270
        let new_values = Self::calc_and_fill_measures(
21,211✔
271
            curr_state,
21,211✔
272
            None,
21,211✔
273
            Some(new),
21,211✔
274
            &mut out_rec_delete,
21,211✔
275
            &mut out_rec_insert,
21,211✔
276
            AggregatorOperation::Insert,
21,211✔
277
            &self.measures,
21,211✔
278
            &self.input_schema,
21,211✔
279
        )?;
21,211✔
280

×
281
        let (out_rec_delete_having_satisfied, out_rec_insert_having_satisfied) = match &self.having
21,211✔
282
        {
×
283
            None => (true, true),
21,195✔
284
            Some(having) => (
16✔
285
                Self::having_is_satisfied(
16✔
286
                    &self.having_eval_schema,
16✔
287
                    new,
16✔
288
                    having,
16✔
289
                    &mut out_rec_delete,
16✔
290
                )?,
16✔
291
                Self::having_is_satisfied(
16✔
292
                    &self.having_eval_schema,
16✔
293
                    new,
16✔
294
                    having,
16✔
295
                    &mut out_rec_insert,
16✔
296
                )?,
16✔
297
            ),
298
        };
×
299

×
300
        let res = if curr_state.count == 0 {
21,211✔
301
            if out_rec_insert_having_satisfied {
10,139✔
302
                vec![Operation::Insert {
10,132✔
303
                    new: Self::build_projection(
10,132✔
304
                        new,
10,132✔
305
                        out_rec_insert,
10,132✔
306
                        &self.projections,
10,132✔
307
                        &self.aggregation_schema,
10,132✔
308
                    )?,
10,132✔
309
                }]
×
310
            } else {
×
311
                vec![]
7✔
312
            }
×
313
        } else {
×
314
            Self::generate_op_for_existing_segment(
11,072✔
315
                out_rec_delete_having_satisfied,
11,072✔
316
                out_rec_insert_having_satisfied,
11,072✔
317
                out_rec_delete,
11,072✔
318
                out_rec_insert,
11,072✔
319
                new,
11,072✔
320
                &self.projections,
11,072✔
321
                &self.aggregation_schema,
11,072✔
322
            )?
11,072✔
323
        };
324

×
325
        curr_state.count += 1;
21,210✔
326
        curr_state.values = Some(new_values);
21,210✔
327

21,210✔
328
        Ok(res)
21,210✔
329
    }
21,210✔
330

×
331
    fn generate_op_for_existing_segment(
11,147✔
332
        out_rec_delete_having_satisfied: bool,
11,147✔
333
        out_rec_insert_having_satisfied: bool,
11,147✔
334
        out_rec_delete: Vec<Field>,
11,147✔
335
        out_rec_insert: Vec<Field>,
11,147✔
336
        rec: &mut Record,
11,147✔
337
        projections: &Vec<Expression>,
11,147✔
338
        aggregation_schema: &Schema,
11,147✔
339
    ) -> Result<Vec<Operation>, PipelineError> {
11,147✔
340
        Ok(
11,147✔
341
            match (
11,147✔
342
                out_rec_delete_having_satisfied,
11,147✔
343
                out_rec_insert_having_satisfied,
11,147✔
344
            ) {
11,147✔
345
                (false, true) => vec![Operation::Insert {
5✔
346
                    new: Self::build_projection(
5✔
347
                        rec,
5✔
348
                        out_rec_insert,
5✔
349
                        projections,
5✔
350
                        aggregation_schema,
5✔
351
                    )?,
5✔
352
                }],
×
353
                (true, false) => vec![Operation::Delete {
3✔
354
                    old: Self::build_projection(
3✔
355
                        rec,
3✔
356
                        out_rec_delete,
3✔
357
                        projections,
3✔
358
                        aggregation_schema,
3✔
359
                    )?,
3✔
360
                }],
361
                (true, true) => vec![Operation::Update {
11,136✔
362
                    new: Self::build_projection(
11,136✔
363
                        rec,
11,136✔
364
                        out_rec_insert,
11,136✔
365
                        projections,
11,136✔
366
                        aggregation_schema,
11,136✔
367
                    )?,
11,136✔
368
                    old: Self::build_projection(
11,136✔
369
                        rec,
11,136✔
370
                        out_rec_delete,
11,136✔
371
                        projections,
11,136✔
372
                        aggregation_schema,
11,136✔
373
                    )?,
11,136✔
374
                }],
×
375
                (false, false) => vec![],
3✔
376
            },
×
377
        )
×
378
    }
11,146✔
379

380
    fn having_is_satisfied(
68✔
381
        having_eval_schema: &Schema,
68✔
382
        original_record: &mut Record,
68✔
383
        having: &Expression,
68✔
384
        out_rec: &mut Vec<Field>,
68✔
385
    ) -> Result<bool, PipelineError> {
68✔
386
        //
68✔
387
        let original_record_len = original_record.values.len();
68✔
388
        Ok(match out_rec.len() {
68✔
389
            0 => false,
8✔
390
            _ => {
391
                original_record
60✔
392
                    .values
60✔
393
                    .extend(out_rec.drain(0..).collect::<Vec<Field>>());
60✔
394
                let r = having
60✔
395
                    .evaluate(original_record, having_eval_schema)?
60✔
396
                    .as_boolean()
60✔
397
                    .unwrap_or(false);
60✔
398
                out_rec.extend(
60✔
399
                    original_record
60✔
400
                        .values
60✔
401
                        .drain(original_record_len..)
60✔
402
                        .collect::<Vec<Field>>(),
60✔
403
                );
60✔
404
                r
60✔
405
            }
406
        })
407
    }
68✔
408

409
    fn agg_update(
54✔
410
        &mut self,
54✔
411
        old: &mut Record,
54✔
412
        new: &mut Record,
54✔
413
        key: u64,
54✔
414
    ) -> Result<Vec<Operation>, PipelineError> {
54✔
415
        let mut out_rec_delete: Vec<Field> = Vec::with_capacity(self.measures.len());
54✔
416
        let mut out_rec_insert: Vec<Field> = Vec::with_capacity(self.measures.len());
54✔
417

54✔
418
        let curr_state_opt = self.states.get_mut(&key);
54✔
419
        assert!(
54✔
420
            curr_state_opt.is_some(),
54✔
421
            "Unable to find aggregator state during UPDATE operation"
×
422
        );
423
        let mut curr_state = curr_state_opt.unwrap();
54✔
424

425
        let new_values = Self::calc_and_fill_measures(
54✔
426
            curr_state,
54✔
427
            Some(old),
54✔
428
            Some(new),
54✔
429
            &mut out_rec_delete,
54✔
430
            &mut out_rec_insert,
54✔
431
            AggregatorOperation::Update,
54✔
432
            &self.measures,
54✔
433
            &self.input_schema,
54✔
434
        )?;
54✔
435

436
        let (out_rec_delete_having_satisfied, out_rec_insert_having_satisfied) = match &self.having
54✔
437
        {
438
            None => (true, true),
44✔
439
            Some(having) => (
10✔
440
                Self::having_is_satisfied(
10✔
441
                    &self.having_eval_schema,
10✔
442
                    old,
10✔
443
                    having,
10✔
444
                    &mut out_rec_delete,
10✔
445
                )?,
10✔
446
                Self::having_is_satisfied(
10✔
447
                    &self.having_eval_schema,
10✔
448
                    new,
10✔
449
                    having,
10✔
450
                    &mut out_rec_insert,
10✔
451
                )?,
10✔
452
            ),
453
        };
454

455
        let res = match (
54✔
456
            out_rec_delete_having_satisfied,
54✔
457
            out_rec_insert_having_satisfied,
54✔
458
        ) {
54✔
459
            (false, true) => vec![Operation::Insert {
2✔
460
                new: Self::build_projection(
2✔
461
                    new,
2✔
462
                    out_rec_insert,
2✔
463
                    &self.projections,
2✔
464
                    &self.aggregation_schema,
2✔
465
                )?,
2✔
466
            }],
467
            (true, false) => vec![Operation::Delete {
2✔
468
                old: Self::build_projection(
2✔
469
                    old,
2✔
470
                    out_rec_delete,
2✔
471
                    &self.projections,
2✔
472
                    &self.aggregation_schema,
2✔
473
                )?,
2✔
474
            }],
475
            (true, true) => vec![Operation::Update {
46✔
476
                new: Self::build_projection(
46✔
477
                    new,
46✔
478
                    out_rec_insert,
46✔
479
                    &self.projections,
46✔
480
                    &self.aggregation_schema,
46✔
481
                )?,
46✔
482
                old: Self::build_projection(
46✔
483
                    old,
46✔
484
                    out_rec_delete,
46✔
485
                    &self.projections,
46✔
486
                    &self.aggregation_schema,
46✔
487
                )?,
46✔
488
            }],
489
            (false, false) => vec![],
4✔
490
        };
491

492
        curr_state.values = Some(new_values);
54✔
493
        Ok(res)
54✔
494
    }
54✔
495

496
    pub fn build_projection(
32,601✔
497
        original: &mut Record,
32,601✔
498
        measures: Vec<Field>,
32,601✔
499
        projections: &Vec<Expression>,
32,601✔
500
        aggregation_schema: &Schema,
32,601✔
501
    ) -> Result<Record, PipelineError> {
32,601✔
502
        let original_len = original.values.len();
32,601✔
503
        original.values.extend(measures);
32,601✔
504
        let mut output = Vec::<Field>::with_capacity(projections.len());
32,601✔
505
        for exp in projections {
95,782✔
506
            output.push(exp.evaluate(original, aggregation_schema)?);
63,180✔
507
        }
508
        original.values.drain(original_len..);
32,602✔
509
        Ok(Record::new(None, output, None))
32,602✔
510
    }
32,602✔
511

512
    pub fn aggregate(&mut self, mut op: Operation) -> Result<Vec<Operation>, PipelineError> {
21,410✔
513
        match op {
21,410✔
514
            Operation::Insert { ref mut new } => Ok(self.agg_insert(new)?),
21,187✔
515
            Operation::Delete { ref mut old } => Ok(self.agg_delete(old)?),
143✔
516
            Operation::Update {
517
                ref mut old,
80✔
518
                ref mut new,
80✔
519
            } => {
520
                let (old_record_hash, new_record_hash) = if self.dimensions.is_empty() {
80✔
521
                    (self.default_segment_key, self.default_segment_key)
10✔
522
                } else {
523
                    (
524
                        get_key(&self.input_schema, old, &self.dimensions)?,
70✔
525
                        get_key(&self.input_schema, new, &self.dimensions)?,
70✔
526
                    )
527
                };
528

529
                if old_record_hash == new_record_hash {
79✔
530
                    Ok(self.agg_update(old, new, old_record_hash)?)
54✔
531
                } else {
532
                    let mut r = Vec::with_capacity(2);
25✔
533
                    r.extend(self.agg_delete(old)?);
25✔
534
                    r.extend(self.agg_insert(new)?);
25✔
535
                    Ok(r)
26✔
536
                }
537
            }
538
        }
539
    }
21,407✔
540
}
541

542
fn get_key(
10,495✔
543
    schema: &Schema,
10,495✔
544
    record: &Record,
10,495✔
545
    dimensions: &[Expression],
10,495✔
546
) -> Result<u64, PipelineError> {
10,495✔
547
    let mut key = Vec::<Field>::with_capacity(dimensions.len());
10,495✔
548
    for dimension in dimensions.iter() {
10,499✔
549
        key.push(dimension.evaluate(record, schema)?);
10,499✔
550
    }
551
    let mut hasher = AHasher::default();
10,497✔
552
    key.hash(&mut hasher);
10,497✔
553
    let v = hasher.finish();
10,497✔
554
    Ok(v)
10,497✔
555
}
10,497✔
556

557
impl Processor for AggregationProcessor {
558
    fn commit(&self, _epoch: &Epoch, _tx: &SharedTransaction) -> Result<(), ExecutionError> {
66✔
559
        Ok(())
66✔
560
    }
66✔
561

562
    fn process(
21,090✔
563
        &mut self,
21,090✔
564
        _from_port: PortHandle,
21,090✔
565
        op: Operation,
21,090✔
566
        fw: &mut dyn ProcessorChannelForwarder,
21,090✔
567
        _txn: &SharedTransaction,
21,090✔
568
    ) -> Result<(), ExecutionError> {
21,090✔
569
        let ops = self.aggregate(op).map_err(|e| InternalError(Box::new(e)))?;
21,090✔
570
        for fop in ops {
42,180✔
571
            fw.send(fop, DEFAULT_PORT_HANDLE)?;
21,090✔
572
        }
573
        Ok(())
21,090✔
574
    }
21,090✔
575
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc