• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

getdozer / dozer / 5664079078

pending completion
5664079078

push

github

web-flow
perf: Replace `Vec<u8>` with `u64` in join lookup table to save memory (#1795)

80 of 80 new or added lines in 1 file covered. (100.0%)

45351 of 58795 relevant lines covered (77.13%)

39431.54 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

72.88
/dozer-sql/src/pipeline/product/join/operator.rs
1
use ahash::AHasher;
2
use dozer_core::processor_record::{ProcessorRecord, ProcessorRecordRef};
3
use dozer_types::{
4
    chrono,
5
    types::{Field, Lifetime},
6
};
7
use linked_hash_map::LinkedHashMap;
8
use std::{
9
    collections::HashMap,
10
    fmt::Debug,
11
    hash::{Hash, Hasher},
12
};
13

14
use crate::pipeline::errors::JoinError;
15

16
use super::JoinResult;
17

18
/// Identifies which input of the join an operation originates from.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum JoinBranch {
    /// The left-hand input of the join.
    Left,
    /// The right-hand input of the join.
    Right,
}
22

23
// pub trait JoinOperator: Send + Sync {
24
//     fn delete(&mut self, from: JoinBranch, old: &ProcessorRecord) -> JoinResult<Vec<Record>>;
25
//     fn insert(&mut self, from: JoinBranch, new: &ProcessorRecord) -> JoinResult<Vec<Record>>;
26
//     fn update(&mut self, from: JoinBranch, old: &ProcessorRecord, new: &ProcessorRecord) -> JoinResult<Vec<Record>>;
27
// }
28

29
/// The flavour of join performed by a [`JoinOperator`].
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum JoinType {
    /// Only pairs of matching left and right records are emitted.
    Inner,
    /// Left records without a match are paired with the right default record.
    LeftOuter,
    /// Right records without a match are paired with the left default record.
    RightOuter,
}
35

36
/// The kind of change attached to every joined record the operator emits.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum JoinAction {
    /// The joined record is added to the output.
    Insert,
    /// The joined record is retracted from the output.
    Delete,
}
41

42
/// Location of a record inside a lookup map: `(join_key, primary_key)`.
type IndexKey = (u64, u64);
43

44
#[derive(Debug, Clone)]
×
45
pub struct JoinOperator {
46
    join_type: JoinType,
47

48
    left_join_key_indexes: Vec<usize>,
49
    right_join_key_indexes: Vec<usize>,
50

51
    left_primary_key_indexes: Vec<usize>,
52
    right_primary_key_indexes: Vec<usize>,
53

54
    left_default_record: ProcessorRecordRef,
55
    right_default_record: ProcessorRecordRef,
56

57
    left_map: HashMap<u64, HashMap<u64, Vec<ProcessorRecordRef>>>,
58
    right_map: HashMap<u64, HashMap<u64, Vec<ProcessorRecordRef>>>,
59

60
    left_lifetime_map: LinkedHashMap<Field, Vec<IndexKey>>,
61
    right_lifetime_map: LinkedHashMap<Field, Vec<IndexKey>>,
62
}
63

64
impl JoinOperator {
65
    pub fn new(
136✔
66
        join_type: JoinType,
136✔
67
        left_join_key_indexes: Vec<usize>,
136✔
68
        right_join_key_indexes: Vec<usize>,
136✔
69
        left_primary_key_indexes: Vec<usize>,
136✔
70
        right_primary_key_indexes: Vec<usize>,
136✔
71
        left_default_record: ProcessorRecordRef,
136✔
72
        right_default_record: ProcessorRecordRef,
136✔
73
    ) -> Self {
136✔
74
        Self {
136✔
75
            join_type,
136✔
76
            left_join_key_indexes,
136✔
77
            right_join_key_indexes,
136✔
78
            left_primary_key_indexes,
136✔
79
            right_primary_key_indexes,
136✔
80
            left_default_record,
136✔
81
            right_default_record,
136✔
82
            left_map: HashMap::new(),
136✔
83
            right_map: HashMap::new(),
136✔
84
            left_lifetime_map: LinkedHashMap::new(),
136✔
85
            right_lifetime_map: LinkedHashMap::new(),
136✔
86
        }
136✔
87
    }
136✔
88

89
    pub fn left_lookup_size(&self) -> usize {
×
90
        self.left_map.len()
×
91
    }
×
92

93
    pub fn right_lookup_size(&self) -> usize {
×
94
        self.right_map.len()
×
95
    }
×
96

97
    fn inner_join_from_left(
1,100✔
98
        &self,
1,100✔
99
        action: &JoinAction,
1,100✔
100
        join_key: u64,
1,100✔
101
        left_record: ProcessorRecordRef,
1,100✔
102
    ) -> JoinResult<Vec<(JoinAction, ProcessorRecordRef)>> {
1,100✔
103
        let right_records = get_join_records(&self.right_map, join_key);
1,100✔
104

1,100✔
105
        let output_records = right_records
1,100✔
106
            .into_iter()
1,100✔
107
            .map(|right_record| {
1,100✔
108
                (
432✔
109
                    action.clone(),
432✔
110
                    join_records(left_record.clone(), right_record),
432✔
111
                )
432✔
112
            })
1,100✔
113
            .collect::<Vec<(JoinAction, ProcessorRecordRef)>>();
1,100✔
114

1,100✔
115
        Ok(output_records)
1,100✔
116
    }
1,100✔
117

118
    fn inner_join_from_right(
3,140✔
119
        &self,
3,140✔
120
        action: &JoinAction,
3,140✔
121
        join_key: u64,
3,140✔
122
        right_record: ProcessorRecordRef,
3,140✔
123
    ) -> JoinResult<Vec<(JoinAction, ProcessorRecordRef)>> {
3,140✔
124
        let left_records = get_join_records(&self.left_map, join_key);
3,140✔
125

3,140✔
126
        let output_records = left_records
3,140✔
127
            .into_iter()
3,140✔
128
            .map(|left_record| {
3,140✔
129
                (
720✔
130
                    action.clone(),
720✔
131
                    join_records(left_record, right_record.clone()),
720✔
132
                )
720✔
133
            })
3,140✔
134
            .collect::<Vec<(JoinAction, ProcessorRecordRef)>>();
3,140✔
135

3,140✔
136
        Ok(output_records)
3,140✔
137
    }
3,140✔
138

139
    fn left_join_from_left(
196✔
140
        &self,
196✔
141
        action: &JoinAction,
196✔
142
        join_key: u64,
196✔
143
        left_record: ProcessorRecordRef,
196✔
144
    ) -> JoinResult<Vec<(JoinAction, ProcessorRecordRef)>> {
196✔
145
        let right_records = get_join_records(&self.right_map, join_key);
196✔
146

196✔
147
        // no joining records on the right branch
196✔
148
        if right_records.is_empty() {
196✔
149
            let join_record = join_records(left_record, self.right_default_record.clone());
144✔
150
            return Ok(vec![(action.clone(), join_record)]);
144✔
151
        }
52✔
152

52✔
153
        let output_records = right_records
52✔
154
            .into_iter()
52✔
155
            .map(|right_record| {
52✔
156
                (
52✔
157
                    action.clone(),
52✔
158
                    join_records(left_record.clone(), right_record),
52✔
159
                )
52✔
160
            })
52✔
161
            .collect::<Vec<(JoinAction, ProcessorRecordRef)>>();
52✔
162

52✔
163
        Ok(output_records)
52✔
164
    }
196✔
165

166
    fn left_join_from_right(
148✔
167
        &self,
148✔
168
        action: &JoinAction,
148✔
169
        join_key: u64,
148✔
170
        right_record: ProcessorRecordRef,
148✔
171
    ) -> JoinResult<Vec<(JoinAction, ProcessorRecordRef)>> {
148✔
172
        let left_records = get_join_records(&self.left_map, join_key);
148✔
173

148✔
174
        // if there are no matching records on the left branch, no records will be returned
148✔
175
        if left_records.is_empty() {
148✔
176
            return Ok(vec![]);
40✔
177
        }
108✔
178

108✔
179
        let mut output_records = vec![];
108✔
180

181
        for left_record in left_records.into_iter() {
108✔
182
            let right_matching_count = self.get_right_matching_count(action, &left_record)?;
108✔
183
            let join_record = join_records(left_record.clone(), right_record.clone());
108✔
184

108✔
185
            if right_matching_count > 0 {
108✔
186
                // if there are multiple matching records on the right branch, the left record will be just returned
64✔
187
                output_records.push((action.clone(), join_record));
64✔
188
            } else {
64✔
189
                match action {
44✔
190
                    JoinAction::Insert => {
36✔
191
                        let old_join_record =
36✔
192
                            join_records(left_record, self.right_default_record.clone());
36✔
193

36✔
194
                        // delete the "first left join" record
36✔
195
                        output_records.push((JoinAction::Delete, old_join_record));
36✔
196
                        // insert the new left join record
36✔
197
                        output_records.push((action.clone(), join_record));
36✔
198
                    }
36✔
199
                    JoinAction::Delete => {
8✔
200
                        let new_join_record =
8✔
201
                            join_records(left_record, self.right_default_record.clone());
8✔
202

8✔
203
                        output_records.push((JoinAction::Delete, join_record));
8✔
204
                        output_records.push((JoinAction::Insert, new_join_record));
8✔
205
                    }
8✔
206
                }
207
            }
208
        }
209
        Ok(output_records)
108✔
210
    }
148✔
211

212
    fn right_join_from_left(
144✔
213
        &self,
144✔
214
        action: &JoinAction,
144✔
215
        join_key: u64,
144✔
216
        left_record: ProcessorRecordRef,
144✔
217
    ) -> JoinResult<Vec<(JoinAction, ProcessorRecordRef)>> {
144✔
218
        let right_records = get_join_records(&self.right_map, join_key);
144✔
219

144✔
220
        // if there are no matching records on the left branch, no records will be returned
144✔
221
        if right_records.is_empty() {
144✔
222
            return Ok(vec![]);
40✔
223
        }
104✔
224

104✔
225
        let mut output_records = vec![];
104✔
226

227
        for right_record in right_records.into_iter() {
104✔
228
            let left_matching_count = self.get_left_matching_count(action, &right_record)?;
104✔
229
            let join_record = join_records(left_record.clone(), right_record.clone());
104✔
230

104✔
231
            if left_matching_count > 0 {
104✔
232
                // if there are multiple matching records on the left branch, the right record will be just returned
×
233
                output_records.push((action.clone(), join_record));
×
234
            } else {
×
235
                match action {
104✔
236
                    JoinAction::Insert => {
80✔
237
                        let old_join_record =
80✔
238
                            join_records(self.left_default_record.clone(), right_record);
80✔
239

80✔
240
                        // delete the "first left join" record
80✔
241
                        output_records.push((JoinAction::Delete, old_join_record));
80✔
242
                        // insert the new left join record
80✔
243
                        output_records.push((action.clone(), join_record));
80✔
244
                    }
80✔
245
                    JoinAction::Delete => {
24✔
246
                        let new_join_record =
24✔
247
                            join_records(self.left_default_record.clone(), right_record);
24✔
248

24✔
249
                        output_records.push((JoinAction::Delete, join_record));
24✔
250
                        output_records.push((JoinAction::Insert, new_join_record));
24✔
251
                    }
24✔
252
                }
253
            }
254
        }
255
        Ok(output_records)
104✔
256
    }
144✔
257

258
    fn right_join_from_right(
132✔
259
        &self,
132✔
260
        action: &JoinAction,
132✔
261
        join_key: u64,
132✔
262
        right_record: ProcessorRecordRef,
132✔
263
    ) -> JoinResult<Vec<(JoinAction, ProcessorRecordRef)>> {
132✔
264
        let left_records = get_join_records(&self.left_map, join_key);
132✔
265

132✔
266
        // no joining records on the right branch
132✔
267
        if left_records.is_empty() {
132✔
268
            let join_record = join_records(self.left_default_record.clone(), right_record);
80✔
269
            return Ok(vec![(action.clone(), join_record)]);
80✔
270
        }
52✔
271

52✔
272
        let output_records = left_records
52✔
273
            .into_iter()
52✔
274
            .map(|left_record| {
52✔
275
                (
52✔
276
                    action.clone(),
52✔
277
                    join_records(left_record, right_record.clone()),
52✔
278
                )
52✔
279
            })
52✔
280
            .collect::<Vec<(JoinAction, ProcessorRecordRef)>>();
52✔
281

52✔
282
        Ok(output_records)
52✔
283
    }
132✔
284

285
    fn get_left_matching_count(
104✔
286
        &self,
104✔
287
        action: &JoinAction,
104✔
288
        record: &ProcessorRecordRef,
104✔
289
    ) -> JoinResult<usize> {
104✔
290
        let join_key = get_record_key(record, &self.right_join_key_indexes);
104✔
291

104✔
292
        let mut matching_count = get_join_records(&self.left_map, join_key).len();
104✔
293
        if action == &JoinAction::Insert {
104✔
294
            matching_count -= 1;
80✔
295
        }
80✔
296
        Ok(matching_count)
104✔
297
    }
104✔
298

299
    fn get_right_matching_count(
108✔
300
        &self,
108✔
301
        action: &JoinAction,
108✔
302
        record: &ProcessorRecordRef,
108✔
303
    ) -> JoinResult<usize> {
108✔
304
        let join_key = get_record_key(record, &self.left_join_key_indexes);
108✔
305

108✔
306
        let mut matching_count = get_join_records(&self.right_map, join_key).len();
108✔
307
        if action == &JoinAction::Insert {
108✔
308
            matching_count -= 1;
84✔
309
        }
84✔
310
        Ok(matching_count)
108✔
311
    }
108✔
312

313
    pub fn evict_index(&mut self, from_branch: &JoinBranch, now: &Field) -> Vec<Field> {
×
314
        let (eviction_index, join_index) = match from_branch {
×
315
            JoinBranch::Left => (&self.left_lifetime_map, &mut self.left_map),
×
316
            JoinBranch::Right => (&self.right_lifetime_map, &mut self.right_map),
×
317
        };
318

319
        let mut old_instants = vec![];
×
320
        for (eviction_instant, join_index_keys) in eviction_index.iter() {
×
321
            if eviction_instant <= now {
×
322
                old_instants.push(eviction_instant.clone());
×
323
                for (join_key, primary_key) in join_index_keys {
×
324
                    evict_join_record(join_index, *join_key, *primary_key);
×
325
                }
×
326
            } else {
327
                break;
×
328
            }
329
        }
330
        old_instants
×
331
    }
×
332

333
    pub fn insert_evict_index(
×
334
        &mut self,
×
335
        from_branch: &JoinBranch,
×
336
        lifetime: Lifetime,
×
337
        join_key: u64,
×
338
        primary_key: u64,
×
339
    ) -> JoinResult<()> {
×
340
        let eviction_index = match from_branch {
×
341
            JoinBranch::Left => &mut self.left_lifetime_map,
×
342
            JoinBranch::Right => &mut self.right_lifetime_map,
×
343
        };
344

345
        let eviction_time = match (lifetime.reference,) {
×
346
            (Field::Timestamp(reference),) => {
×
347
                let eviction_time_result = reference.checked_add_signed(
×
348
                    chrono::Duration::nanoseconds(lifetime.duration.0.as_nanos() as i64),
×
349
                );
×
350

351
                if let Some(eviction_time) = eviction_time_result {
×
352
                    Field::Timestamp(eviction_time)
×
353
                } else {
354
                    return Err(JoinError::EvictionTimeOverflow);
×
355
                }
356
            }
357
            _ => return Err(JoinError::EvictionTypeOverflow),
×
358
        };
359

360
        if let Some(join_index_keys) = eviction_index.get_mut(&eviction_time) {
×
361
            join_index_keys.push((join_key, primary_key));
×
362
        } else {
×
363
            eviction_index.insert(eviction_time, vec![(join_key, primary_key)]);
×
364
        }
×
365

×
366
        Ok(())
×
367
    }
×
368

369
    pub fn clean_evict_index(&mut self, from_branch: &JoinBranch, old_instants: &[Field]) {
×
370
        let eviction_index = match from_branch {
×
371
            JoinBranch::Left => &mut self.left_lifetime_map,
×
372
            JoinBranch::Right => &mut self.right_lifetime_map,
×
373
        };
×
374
        for old_instant in old_instants {
×
375
            eviction_index.remove(old_instant);
×
376
        }
×
377
    }
×
378

×
379
    pub fn delete(
×
380
        &mut self,
×
381
        from: &JoinBranch,
382
        old: ProcessorRecordRef,
383
    ) -> JoinResult<Vec<(JoinAction, ProcessorRecordRef)>> {
384
        match (&self.join_type, from) {
196✔
385
            (JoinType::Inner, JoinBranch::Left) => {
386
                let join_key = get_record_key(&old, &self.left_join_key_indexes);
12✔
387

12✔
388
                remove_join_record(
12✔
389
                    &mut self.left_map,
12✔
390
                    &self.left_primary_key_indexes,
12✔
391
                    join_key,
12✔
392
                    &old,
12✔
393
                );
12✔
394

×
395
                let records = self.inner_join_from_left(&JoinAction::Delete, join_key, old)?;
12✔
396
                Ok(records)
12✔
397
            }
398
            (JoinType::Inner, JoinBranch::Right) => {
×
399
                let join_key = get_record_key(&old, &self.right_join_key_indexes);
96✔
400

96✔
401
                remove_join_record(
96✔
402
                    &mut self.right_map,
96✔
403
                    &self.right_primary_key_indexes,
96✔
404
                    join_key,
96✔
405
                    &old,
96✔
406
                );
96✔
407

×
408
                let records = self.inner_join_from_right(&JoinAction::Delete, join_key, old)?;
96✔
409
                Ok(records)
96✔
410
            }
411
            (JoinType::LeftOuter, JoinBranch::Left) => {
×
412
                let join_key = get_record_key(&old, &self.left_join_key_indexes);
28✔
413
                remove_join_record(
28✔
414
                    &mut self.left_map,
28✔
415
                    &self.left_primary_key_indexes,
28✔
416
                    join_key,
28✔
417
                    &old,
28✔
418
                );
28✔
419
                let records = self.left_join_from_left(&JoinAction::Delete, join_key, old)?;
28✔
420
                Ok(records)
28✔
421
            }
×
422
            (JoinType::LeftOuter, JoinBranch::Right) => {
×
423
                let join_key = get_record_key(&old, &self.right_join_key_indexes);
24✔
424
                remove_join_record(
24✔
425
                    &mut self.right_map,
24✔
426
                    &self.right_primary_key_indexes,
24✔
427
                    join_key,
24✔
428
                    &old,
24✔
429
                );
24✔
430
                let records = self.left_join_from_right(&JoinAction::Delete, join_key, old)?;
24✔
431
                Ok(records)
24✔
432
            }
×
433
            (JoinType::RightOuter, JoinBranch::Left) => {
×
434
                let join_key = get_record_key(&old, &self.left_join_key_indexes);
24✔
435
                remove_join_record(
24✔
436
                    &mut self.left_map,
24✔
437
                    &self.left_primary_key_indexes,
24✔
438
                    join_key,
24✔
439
                    &old,
24✔
440
                );
24✔
441
                let records = self.right_join_from_left(&JoinAction::Delete, join_key, old)?;
24✔
442
                Ok(records)
24✔
443
            }
×
444
            (JoinType::RightOuter, JoinBranch::Right) => {
×
445
                let join_key = get_record_key(&old, &self.right_join_key_indexes);
12✔
446
                remove_join_record(
12✔
447
                    &mut self.right_map,
12✔
448
                    &self.right_primary_key_indexes,
12✔
449
                    join_key,
12✔
450
                    &old,
12✔
451
                );
12✔
452
                let records = self.right_join_from_right(&JoinAction::Delete, join_key, old)?;
12✔
453
                Ok(records)
12✔
454
            }
×
455
        }
×
456
    }
196✔
457

458
    pub fn insert(
459
        &mut self,
×
460
        from: &JoinBranch,
461
        new: ProcessorRecordRef,
462
    ) -> JoinResult<Vec<(JoinAction, ProcessorRecordRef)>> {
463
        match (&self.join_type, from) {
4,664✔
464
            (JoinType::Inner, JoinBranch::Left) => {
465
                let join_key = get_record_key(&new, &self.left_join_key_indexes);
1,088✔
466
                let primary_key = get_record_key(&new, &self.left_primary_key_indexes);
1,088✔
467

1,088✔
468
                add_join_record(&mut self.left_map, join_key, primary_key, &new);
1,088✔
469

×
470
                if let Some(lifetime) = new.get_record().lifetime.clone() {
1,088✔
471
                    self.insert_evict_index(from, lifetime, join_key, primary_key)?
×
472
                }
1,088✔
473

×
474
                let records = self.inner_join_from_left(&JoinAction::Insert, join_key, new)?;
1,088✔
475
                Ok(records)
1,088✔
476
            }
477
            (JoinType::Inner, JoinBranch::Right) => {
×
478
                let join_key = get_record_key(&new, &self.right_join_key_indexes);
3,044✔
479
                let primary_key = get_record_key(&new, &self.right_primary_key_indexes);
3,044✔
480

3,044✔
481
                add_join_record(&mut self.right_map, join_key, primary_key, &new);
3,044✔
482

×
483
                if let Some(lifetime) = new.get_record().lifetime.clone() {
3,044✔
484
                    self.insert_evict_index(from, lifetime, join_key, primary_key)?
×
485
                }
3,044✔
486

×
487
                let records = self.inner_join_from_right(&JoinAction::Insert, join_key, new)?;
3,044✔
488

×
489
                Ok(records)
3,044✔
490
            }
×
491
            (JoinType::LeftOuter, JoinBranch::Left) => {
492
                let join_key = get_record_key(&new, &self.left_join_key_indexes);
168✔
493
                let primary_key = get_record_key(&new, &self.left_primary_key_indexes);
168✔
494

168✔
495
                add_join_record(&mut self.left_map, join_key, primary_key, &new);
168✔
496

×
497
                if let Some(lifetime) = new.get_record().lifetime.clone() {
168✔
498
                    self.insert_evict_index(from, lifetime, join_key, primary_key)?
×
499
                }
168✔
500

×
501
                let records = self.left_join_from_left(&JoinAction::Insert, join_key, new)?;
168✔
502

×
503
                Ok(records)
168✔
504
            }
×
505
            (JoinType::LeftOuter, JoinBranch::Right) => {
506
                let join_key = get_record_key(&new, &self.right_join_key_indexes);
124✔
507
                let primary_key = get_record_key(&new, &self.right_primary_key_indexes);
124✔
508

124✔
509
                add_join_record(&mut self.right_map, join_key, primary_key, &new);
124✔
510

×
511
                if let Some(lifetime) = new.get_record().lifetime.clone() {
124✔
512
                    self.insert_evict_index(from, lifetime, join_key, primary_key)?
×
513
                }
124✔
514

×
515
                let records = self.left_join_from_right(&JoinAction::Insert, join_key, new)?;
124✔
516

×
517
                Ok(records)
124✔
518
            }
×
519
            (JoinType::RightOuter, JoinBranch::Left) => {
520
                let join_key = get_record_key(&new, &self.left_join_key_indexes);
120✔
521
                let primary_key = get_record_key(&new, &self.left_primary_key_indexes);
120✔
522

120✔
523
                add_join_record(&mut self.left_map, join_key, primary_key, &new);
120✔
524

×
525
                if let Some(lifetime) = new.get_record().lifetime.clone() {
120✔
526
                    self.insert_evict_index(from, lifetime, join_key, primary_key)?
×
527
                }
120✔
528

×
529
                let records = self.right_join_from_left(&JoinAction::Insert, join_key, new)?;
120✔
530

×
531
                Ok(records)
120✔
532
            }
×
533
            (JoinType::RightOuter, JoinBranch::Right) => {
534
                let join_key = get_record_key(&new, &self.right_join_key_indexes);
120✔
535
                let primary_key = get_record_key(&new, &self.right_primary_key_indexes);
120✔
536

120✔
537
                add_join_record(&mut self.right_map, join_key, primary_key, &new);
120✔
538

×
539
                if let Some(lifetime) = new.get_record().lifetime.clone() {
120✔
540
                    self.insert_evict_index(from, lifetime, join_key, primary_key)?
×
541
                }
120✔
542

×
543
                let records = self.right_join_from_right(&JoinAction::Insert, join_key, new)?;
120✔
544

×
545
                Ok(records)
120✔
546
            }
×
547
        }
548
    }
4,664✔
549
}
550

551
fn add_join_record(
×
552
    join_map: &mut HashMap<u64, HashMap<u64, Vec<ProcessorRecordRef>>>,
553
    join_key: u64,
554
    record_key: u64,
555
    record: &ProcessorRecordRef,
556
) {
557
    if let Some(record_map) = join_map.get_mut(&join_key) {
4,664✔
558
        if let Some(record_vec) = record_map.get_mut(&record_key) {
520✔
559
            record_vec.push(record.clone());
36✔
560
        } else {
484✔
561
            record_map.insert(record_key, vec![record.clone()]);
484✔
562
        }
484✔
563
    } else {
4,144✔
564
        let mut record_map = HashMap::new();
4,144✔
565
        record_map.insert(record_key, vec![record.clone()]);
4,144✔
566
        join_map.insert(join_key, record_map);
4,144✔
567
    }
4,144✔
568
}
4,664✔
569

×
570
fn remove_join_record(
×
571
    join_map: &mut HashMap<u64, HashMap<u64, Vec<ProcessorRecordRef>>>,
×
572
    primary_key_indexes: &[usize],
573
    join_key: u64,
574
    record: &ProcessorRecordRef,
575
) {
576
    if let Some(record_map) = join_map.get_mut(&join_key) {
196✔
577
        let record_key = get_record_key(record, primary_key_indexes);
196✔
578
        if let Some(record_vec) = record_map.get_mut(&record_key) {
196✔
579
            record_vec.pop();
196✔
580
        }
196✔
581
    }
×
582
}
196✔
583

×
584
fn evict_join_record(
×
585
    join_map: &mut HashMap<u64, HashMap<u64, Vec<ProcessorRecordRef>>>,
×
586
    join_key: u64,
587
    primary_key: u64,
588
) {
589
    if let Some(record_map) = join_map.get_mut(&join_key) {
×
590
        if let Some(record_vec) = record_map.get_mut(&primary_key) {
×
591
            record_vec.pop();
×
592
        }
×
593
    }
×
594
}
×
595

×
596
fn get_record_key(record: &ProcessorRecordRef, key_indexes: &[usize]) -> u64 {
9,932✔
597
    let mut hasher = AHasher::default();
9,932✔
598
    for index in key_indexes.iter() {
29,248✔
599
        let val = record.get_record().get_field_by_index(*index as u32);
29,248✔
600
        val.hash(&mut hasher);
29,248✔
601
    }
29,248✔
602
    hasher.finish()
9,932✔
603
}
9,932✔
604

×
605
fn get_join_records(
5,072✔
606
    join_map: &HashMap<u64, HashMap<u64, Vec<ProcessorRecordRef>>>,
5,072✔
607
    join_key: u64,
5,072✔
608
) -> Vec<ProcessorRecordRef> {
5,072✔
609
    let join_map = join_map.get(&join_key);
5,072✔
610

×
611
    if let Some(records_map) = join_map {
5,072✔
612
        records_map.values().flatten().cloned().collect()
1,676✔
613
    } else {
×
614
        vec![]
3,396✔
615
    }
616
}
5,072✔
617

×
618
fn join_records(
1,840✔
619
    left_record: ProcessorRecordRef,
1,840✔
620
    right_record: ProcessorRecordRef,
1,840✔
621
) -> ProcessorRecordRef {
1,840✔
622
    let left_lifetime = left_record.get_record().lifetime.clone();
1,840✔
623
    let right_lifetime = right_record.get_record().lifetime.clone();
1,840✔
624

1,840✔
625
    let mut output_record = ProcessorRecord::new();
1,840✔
626
    output_record.extend_referenced_record(left_record);
1,840✔
627
    output_record.extend_referenced_record(right_record);
1,840✔
628

×
629
    if let Some(left_record_lifetime) = left_lifetime {
1,840✔
630
        if let Some(right_record_lifetime) = right_lifetime {
×
631
            if left_record_lifetime.reference > right_record_lifetime.reference {
×
632
                output_record.set_lifetime(Some(left_record_lifetime));
×
633
            } else {
×
634
                output_record.set_lifetime(Some(right_record_lifetime));
×
635
            }
×
636
        } else {
×
637
            output_record.set_lifetime(Some(left_record_lifetime));
×
638
        }
×
639
    } else if let Some(right_record_lifetime) = right_lifetime {
1,840✔
640
        output_record.set_lifetime(Some(right_record_lifetime));
×
641
    }
1,840✔
642

×
643
    ProcessorRecordRef::new(output_record)
1,840✔
644
}
1,840✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc