• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

getdozer / dozer / 5978430793

25 Aug 2023 04:54PM UTC coverage: 75.575% (-0.7%) from 76.279%
5978430793

push

github

web-flow
Bump ordered-float from 3.4.0 to 3.9.1 (#1919)

Bumps [ordered-float](https://github.com/reem/rust-ordered-float) from 3.4.0 to 3.9.1.
- [Release notes](https://github.com/reem/rust-ordered-float/releases)
- [Commits](https://github.com/reem/rust-ordered-float/compare/v3.4.0...v3.9.1)

---
updated-dependencies:
- dependency-name: ordered-float
  dependency-type: direct:production
  update-type: version-update:semver-minor
...

Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>

47272 of 62550 relevant lines covered (75.57%)

49425.38 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

70.49
/dozer-sql/src/pipeline/product/join/operator.rs
1
use crate::pipeline::utils::record_hashtable_key::{get_record_hash, RecordKey};
2
use dozer_core::processor_record::{ProcessorRecord, ProcessorRecordStore};
3
use dozer_types::{
4
    chrono,
5
    types::{Field, Lifetime, Record, Timestamp},
6
};
7
use linked_hash_map::LinkedHashMap;
8
use std::{collections::HashMap, fmt::Debug};
9

10
use crate::pipeline::errors::JoinError;
11

12
use super::JoinResult;
13

14
pub enum JoinBranch {
15
    Left,
16
    Right,
17
}
18

19
// pub trait JoinOperator: Send + Sync {
20
//     fn delete(&mut self, from: JoinBranch, old: &ProcessorRecord) -> JoinResult<Vec<Record>>;
21
//     fn insert(&mut self, from: JoinBranch, new: &ProcessorRecord) -> JoinResult<Vec<Record>>;
22
//     fn update(&mut self, from: JoinBranch, old: &ProcessorRecord, new: &ProcessorRecord) -> JoinResult<Vec<Record>>;
23
// }
24

25
#[derive(Clone, Debug, PartialEq, Eq)]
×
26
pub enum JoinType {
27
    Inner,
28
    LeftOuter,
29
    RightOuter,
×
30
}
31

32
#[derive(Clone, Debug, PartialEq, Eq)]
2,905✔
33
pub enum JoinAction {
34
    Insert,
35
    Delete,
36
}
×
37

38
type JoinKey = RecordKey;
39
type IndexKey = (JoinKey, u64); // (join_key, primary_key)
40

41
#[derive(Debug, Clone)]
×
42
pub struct JoinOperator {
43
    join_type: JoinType,
44

×
45
    left_join_key_indexes: Vec<usize>,
46
    right_join_key_indexes: Vec<usize>,
47

48
    left_primary_key_indexes: Vec<usize>,
49
    right_primary_key_indexes: Vec<usize>,
50

51
    left_default_record: ProcessorRecord,
52
    right_default_record: ProcessorRecord,
53

54
    left_map: HashMap<JoinKey, HashMap<u64, Vec<ProcessorRecord>>>,
55
    right_map: HashMap<JoinKey, HashMap<u64, Vec<ProcessorRecord>>>,
56

57
    left_lifetime_map: LinkedHashMap<Timestamp, Vec<IndexKey>>,
58
    right_lifetime_map: LinkedHashMap<Timestamp, Vec<IndexKey>>,
59

60
    accurate_keys: bool,
61
}
62

63
impl JoinOperator {
64
    pub fn new(
238✔
65
        join_type: JoinType,
238✔
66
        (left_join_key_indexes, right_join_key_indexes): (Vec<usize>, Vec<usize>),
238✔
67
        (left_primary_key_indexes, right_primary_key_indexes): (Vec<usize>, Vec<usize>),
238✔
68
        (left_default_record, right_default_record): (ProcessorRecord, ProcessorRecord),
238✔
69
        enable_probabilistic_optimizations: bool,
238✔
70
    ) -> Self {
238✔
71
        let accurate_keys = !enable_probabilistic_optimizations;
238✔
72
        Self {
238✔
73
            join_type,
238✔
74
            left_join_key_indexes,
238✔
75
            right_join_key_indexes,
238✔
76
            left_primary_key_indexes,
238✔
77
            right_primary_key_indexes,
238✔
78
            left_default_record,
238✔
79
            right_default_record,
238✔
80
            left_map: HashMap::new(),
238✔
81
            right_map: HashMap::new(),
238✔
82
            left_lifetime_map: LinkedHashMap::new(),
238✔
83
            right_lifetime_map: LinkedHashMap::new(),
238✔
84
            accurate_keys,
238✔
85
        }
238✔
86
    }
238✔
87

×
88
    pub fn left_lookup_size(&self) -> usize {
×
89
        self.left_map.len()
×
90
    }
×
91

×
92
    pub fn right_lookup_size(&self) -> usize {
×
93
        self.right_map.len()
×
94
    }
×
95

×
96
    fn inner_join_from_left(
1,925✔
97
        &self,
1,925✔
98
        action: &JoinAction,
1,925✔
99
        join_key: &JoinKey,
1,925✔
100
        left_record: ProcessorRecord,
1,925✔
101
    ) -> JoinResult<Vec<(JoinAction, ProcessorRecord)>> {
1,925✔
102
        let right_records = get_join_records(&self.right_map, join_key);
1,925✔
103

1,925✔
104
        let output_records = right_records
1,925✔
105
            .into_iter()
1,925✔
106
            .map(|right_record| {
1,925✔
107
                (
777✔
108
                    action.clone(),
777✔
109
                    join_records(left_record.clone(), right_record),
777✔
110
                )
777✔
111
            })
1,925✔
112
            .collect::<Vec<(JoinAction, ProcessorRecord)>>();
1,925✔
113

1,925✔
114
        Ok(output_records)
1,925✔
115
    }
1,925✔
116

×
117
    fn inner_join_from_right(
5,495✔
118
        &self,
5,495✔
119
        action: &JoinAction,
5,495✔
120
        join_key: &JoinKey,
5,495✔
121
        right_record: ProcessorRecord,
5,495✔
122
    ) -> JoinResult<Vec<(JoinAction, ProcessorRecord)>> {
5,495✔
123
        let left_records = get_join_records(&self.left_map, join_key);
5,495✔
124

5,495✔
125
        let output_records = left_records
5,495✔
126
            .into_iter()
5,495✔
127
            .map(|left_record| {
5,495✔
128
                (
1,239✔
129
                    action.clone(),
1,239✔
130
                    join_records(left_record, right_record.clone()),
1,239✔
131
                )
1,239✔
132
            })
5,495✔
133
            .collect::<Vec<(JoinAction, ProcessorRecord)>>();
5,495✔
134

5,495✔
135
        Ok(output_records)
5,495✔
136
    }
5,495✔
137

×
138
    fn left_join_from_left(
343✔
139
        &self,
343✔
140
        action: &JoinAction,
343✔
141
        join_key: &JoinKey,
343✔
142
        left_record: ProcessorRecord,
343✔
143
    ) -> JoinResult<Vec<(JoinAction, ProcessorRecord)>> {
343✔
144
        let right_records = get_join_records(&self.right_map, join_key);
343✔
145

343✔
146
        // no joining records on the right branch
343✔
147
        if right_records.is_empty() {
343✔
148
            let join_record = join_records(left_record, self.right_default_record.clone());
252✔
149
            return Ok(vec![(action.clone(), join_record)]);
252✔
150
        }
91✔
151

91✔
152
        let output_records = right_records
91✔
153
            .into_iter()
91✔
154
            .map(|right_record| {
91✔
155
                (
91✔
156
                    action.clone(),
91✔
157
                    join_records(left_record.clone(), right_record),
91✔
158
                )
91✔
159
            })
91✔
160
            .collect::<Vec<(JoinAction, ProcessorRecord)>>();
91✔
161

91✔
162
        Ok(output_records)
91✔
163
    }
343✔
164

×
165
    fn left_join_from_right(
259✔
166
        &self,
259✔
167
        action: &JoinAction,
259✔
168
        join_key: &JoinKey,
259✔
169
        record_store: &ProcessorRecordStore,
259✔
170
        right_record: ProcessorRecord,
259✔
171
    ) -> JoinResult<Vec<(JoinAction, ProcessorRecord)>> {
259✔
172
        let left_records = get_join_records(&self.left_map, join_key);
259✔
173

259✔
174
        // if there are no matching records on the left branch, no records will be returned
259✔
175
        if left_records.is_empty() {
259✔
176
            return Ok(vec![]);
70✔
177
        }
189✔
178

189✔
179
        let mut output_records = vec![];
189✔
180

×
181
        for left_record in left_records.into_iter() {
189✔
182
            let left_record_decoded = record_store.load_record(&left_record)?;
189✔
183
            let right_matching_count =
189✔
184
                self.get_right_matching_count(action, &left_record_decoded)?;
189✔
185
            let join_record = join_records(left_record.clone(), right_record.clone());
189✔
186

189✔
187
            if right_matching_count > 0 {
189✔
188
                // if there are multiple matching records on the right branch, the left record will be just returned
112✔
189
                output_records.push((action.clone(), join_record));
112✔
190
            } else {
112✔
191
                match action {
77✔
192
                    JoinAction::Insert => {
63✔
193
                        let old_join_record =
63✔
194
                            join_records(left_record, self.right_default_record.clone());
63✔
195

63✔
196
                        // delete the "first left join" record
63✔
197
                        output_records.push((JoinAction::Delete, old_join_record));
63✔
198
                        // insert the new left join record
63✔
199
                        output_records.push((action.clone(), join_record));
63✔
200
                    }
63✔
201
                    JoinAction::Delete => {
14✔
202
                        let new_join_record =
14✔
203
                            join_records(left_record, self.right_default_record.clone());
14✔
204

14✔
205
                        output_records.push((JoinAction::Delete, join_record));
14✔
206
                        output_records.push((JoinAction::Insert, new_join_record));
14✔
207
                    }
14✔
208
                }
×
209
            }
210
        }
211
        Ok(output_records)
189✔
212
    }
259✔
213

×
214
    fn right_join_from_left(
252✔
215
        &self,
252✔
216
        action: &JoinAction,
252✔
217
        join_key: &JoinKey,
252✔
218
        record_store: &ProcessorRecordStore,
252✔
219
        left_record: ProcessorRecord,
252✔
220
    ) -> JoinResult<Vec<(JoinAction, ProcessorRecord)>> {
252✔
221
        let right_records = get_join_records(&self.right_map, join_key);
252✔
222

252✔
223
        // if there are no matching records on the left branch, no records will be returned
252✔
224
        if right_records.is_empty() {
252✔
225
            return Ok(vec![]);
70✔
226
        }
182✔
227

182✔
228
        let mut output_records = vec![];
182✔
229

×
230
        for right_record in right_records.into_iter() {
182✔
231
            let right_record_decoded = record_store.load_record(&right_record)?;
182✔
232
            let left_matching_count =
182✔
233
                self.get_left_matching_count(action, &right_record_decoded)?;
182✔
234
            let join_record = join_records(left_record.clone(), right_record.clone());
182✔
235

182✔
236
            if left_matching_count > 0 {
182✔
237
                // if there are multiple matching records on the left branch, the right record will be just returned
×
238
                output_records.push((action.clone(), join_record));
×
239
            } else {
×
240
                match action {
182✔
241
                    JoinAction::Insert => {
140✔
242
                        let old_join_record =
140✔
243
                            join_records(self.left_default_record.clone(), right_record);
140✔
244

140✔
245
                        // delete the "first left join" record
140✔
246
                        output_records.push((JoinAction::Delete, old_join_record));
140✔
247
                        // insert the new left join record
140✔
248
                        output_records.push((action.clone(), join_record));
140✔
249
                    }
140✔
250
                    JoinAction::Delete => {
42✔
251
                        let new_join_record =
42✔
252
                            join_records(self.left_default_record.clone(), right_record);
42✔
253

42✔
254
                        output_records.push((JoinAction::Delete, join_record));
42✔
255
                        output_records.push((JoinAction::Insert, new_join_record));
42✔
256
                    }
42✔
257
                }
×
258
            }
259
        }
260
        Ok(output_records)
182✔
261
    }
252✔
262

×
263
    fn right_join_from_right(
231✔
264
        &self,
231✔
265
        action: &JoinAction,
231✔
266
        join_key: &JoinKey,
231✔
267
        right_record: ProcessorRecord,
231✔
268
    ) -> JoinResult<Vec<(JoinAction, ProcessorRecord)>> {
231✔
269
        let left_records = get_join_records(&self.left_map, join_key);
231✔
270

231✔
271
        // no joining records on the right branch
231✔
272
        if left_records.is_empty() {
231✔
273
            let join_record = join_records(self.left_default_record.clone(), right_record);
140✔
274
            return Ok(vec![(action.clone(), join_record)]);
140✔
275
        }
91✔
276

91✔
277
        let output_records = left_records
91✔
278
            .into_iter()
91✔
279
            .map(|left_record| {
91✔
280
                (
91✔
281
                    action.clone(),
91✔
282
                    join_records(left_record, right_record.clone()),
91✔
283
                )
91✔
284
            })
91✔
285
            .collect::<Vec<(JoinAction, ProcessorRecord)>>();
91✔
286

91✔
287
        Ok(output_records)
91✔
288
    }
231✔
289

×
290
    fn get_left_matching_count(&self, action: &JoinAction, record: &Record) -> JoinResult<usize> {
182✔
291
        let join_key = self.get_join_key(record, &self.right_join_key_indexes);
182✔
292

182✔
293
        let mut matching_count = get_join_records(&self.left_map, &join_key).len();
182✔
294
        if action == &JoinAction::Insert {
182✔
295
            matching_count -= 1;
140✔
296
        }
140✔
297
        Ok(matching_count)
182✔
298
    }
182✔
299

×
300
    fn get_right_matching_count(&self, action: &JoinAction, record: &Record) -> JoinResult<usize> {
189✔
301
        let join_key = self.get_join_key(record, &self.left_join_key_indexes);
189✔
302

189✔
303
        let mut matching_count = get_join_records(&self.right_map, &join_key).len();
189✔
304
        if action == &JoinAction::Insert {
189✔
305
            matching_count -= 1;
147✔
306
        }
147✔
307
        Ok(matching_count)
189✔
308
    }
189✔
309

×
310
    pub fn evict_index(&mut self, from_branch: &JoinBranch, now: &Timestamp) -> Vec<Timestamp> {
×
311
        let (eviction_index, join_index) = match from_branch {
×
312
            JoinBranch::Left => (&self.left_lifetime_map, &mut self.left_map),
×
313
            JoinBranch::Right => (&self.right_lifetime_map, &mut self.right_map),
×
314
        };
×
315

316
        let mut old_instants = vec![];
×
317
        for (eviction_instant, join_index_keys) in eviction_index.iter() {
×
318
            if eviction_instant <= now {
×
319
                old_instants.push(*eviction_instant);
×
320
                for (join_key, primary_key) in join_index_keys {
×
321
                    evict_join_record(join_index, join_key, *primary_key);
×
322
                }
×
323
            } else {
×
324
                break;
×
325
            }
×
326
        }
327
        old_instants
×
328
    }
×
329

×
330
    pub fn insert_evict_index(
×
331
        &mut self,
×
332
        from_branch: &JoinBranch,
×
333
        lifetime: Lifetime,
×
334
        join_key: JoinKey,
×
335
        primary_key: u64,
×
336
    ) -> JoinResult<()> {
×
337
        let eviction_index = match from_branch {
×
338
            JoinBranch::Left => &mut self.left_lifetime_map,
×
339
            JoinBranch::Right => &mut self.right_lifetime_map,
×
340
        };
×
341

342
        let eviction_time = {
×
343
            let eviction_time_result =
×
344
                lifetime
×
345
                    .reference
×
346
                    .checked_add_signed(chrono::Duration::nanoseconds(
×
347
                        lifetime.duration.as_nanos() as i64,
×
348
                    ));
×
349

×
350
            if let Some(eviction_time) = eviction_time_result {
×
351
                eviction_time
×
352
            } else {
×
353
                return Err(JoinError::EvictionTimeOverflow);
×
354
            }
×
355
        };
356

357
        if let Some(join_index_keys) = eviction_index.get_mut(&eviction_time) {
×
358
            join_index_keys.push((join_key, primary_key));
×
359
        } else {
×
360
            eviction_index.insert(eviction_time, vec![(join_key, primary_key)]);
×
361
        }
×
362

×
363
        Ok(())
×
364
    }
×
365

×
366
    pub fn clean_evict_index(&mut self, from_branch: &JoinBranch, old_instants: &[Timestamp]) {
×
367
        let eviction_index = match from_branch {
×
368
            JoinBranch::Left => &mut self.left_lifetime_map,
×
369
            JoinBranch::Right => &mut self.right_lifetime_map,
×
370
        };
×
371
        for old_instant in old_instants {
×
372
            eviction_index.remove(old_instant);
×
373
        }
×
374
    }
×
375

×
376
    pub fn delete(
377
        &mut self,
378
        from: &JoinBranch,
379
        record_store: &ProcessorRecordStore,
380
        old: ProcessorRecord,
381
        old_decoded: Record,
382
    ) -> JoinResult<Vec<(JoinAction, ProcessorRecord)>> {
383
        match (&self.join_type, from) {
343✔
384
            (JoinType::Inner, JoinBranch::Left) => {
×
385
                let join_key = self.get_join_key(&old_decoded, &self.left_join_key_indexes);
21✔
386

21✔
387
                remove_join_record(
21✔
388
                    &mut self.left_map,
21✔
389
                    &self.left_primary_key_indexes,
21✔
390
                    &join_key,
21✔
391
                    &old_decoded,
21✔
392
                );
21✔
393

×
394
                let records = self.inner_join_from_left(&JoinAction::Delete, &join_key, old)?;
21✔
395
                Ok(records)
21✔
396
            }
×
397
            (JoinType::Inner, JoinBranch::Right) => {
398
                let join_key = self.get_join_key(&old_decoded, &self.right_join_key_indexes);
168✔
399

168✔
400
                remove_join_record(
168✔
401
                    &mut self.right_map,
168✔
402
                    &self.right_primary_key_indexes,
168✔
403
                    &join_key,
168✔
404
                    &old_decoded,
168✔
405
                );
168✔
406

×
407
                let records = self.inner_join_from_right(&JoinAction::Delete, &join_key, old)?;
168✔
408
                Ok(records)
168✔
409
            }
×
410
            (JoinType::LeftOuter, JoinBranch::Left) => {
411
                let join_key = self.get_join_key(&old_decoded, &self.left_join_key_indexes);
49✔
412
                remove_join_record(
49✔
413
                    &mut self.left_map,
49✔
414
                    &self.left_primary_key_indexes,
49✔
415
                    &join_key,
49✔
416
                    &old_decoded,
49✔
417
                );
49✔
418
                let records = self.left_join_from_left(&JoinAction::Delete, &join_key, old)?;
49✔
419
                Ok(records)
49✔
420
            }
×
421
            (JoinType::LeftOuter, JoinBranch::Right) => {
422
                let join_key = self.get_join_key(&old_decoded, &self.right_join_key_indexes);
42✔
423
                remove_join_record(
42✔
424
                    &mut self.right_map,
42✔
425
                    &self.right_primary_key_indexes,
42✔
426
                    &join_key,
42✔
427
                    &old_decoded,
42✔
428
                );
42✔
429
                let records =
42✔
430
                    self.left_join_from_right(&JoinAction::Delete, &join_key, record_store, old)?;
42✔
431
                Ok(records)
42✔
432
            }
×
433
            (JoinType::RightOuter, JoinBranch::Left) => {
434
                let join_key = self.get_join_key(&old_decoded, &self.left_join_key_indexes);
42✔
435
                remove_join_record(
42✔
436
                    &mut self.left_map,
42✔
437
                    &self.left_primary_key_indexes,
42✔
438
                    &join_key,
42✔
439
                    &old_decoded,
42✔
440
                );
42✔
441
                let records =
42✔
442
                    self.right_join_from_left(&JoinAction::Delete, &join_key, record_store, old)?;
42✔
443
                Ok(records)
42✔
444
            }
×
445
            (JoinType::RightOuter, JoinBranch::Right) => {
446
                let join_key = self.get_join_key(&old_decoded, &self.right_join_key_indexes);
21✔
447
                remove_join_record(
21✔
448
                    &mut self.right_map,
21✔
449
                    &self.right_primary_key_indexes,
21✔
450
                    &join_key,
21✔
451
                    &old_decoded,
21✔
452
                );
21✔
453
                let records = self.right_join_from_right(&JoinAction::Delete, &join_key, old)?;
21✔
454
                Ok(records)
21✔
455
            }
×
456
        }
457
    }
343✔
458

×
459
    pub fn insert(
460
        &mut self,
461
        from: &JoinBranch,
462
        record_store: &ProcessorRecordStore,
463
        new: ProcessorRecord,
464
        new_decoded: Record,
465
    ) -> JoinResult<Vec<(JoinAction, ProcessorRecord)>> {
466
        match (&self.join_type, from) {
8,162✔
467
            (JoinType::Inner, JoinBranch::Left) => {
×
468
                let join_key = self.get_join_key(&new_decoded, &self.left_join_key_indexes);
1,904✔
469
                let primary_key = get_record_key_hash(&new_decoded, &self.left_primary_key_indexes);
1,904✔
470

1,904✔
471
                add_join_record(&mut self.left_map, join_key.clone(), primary_key, &new);
1,904✔
472

×
473
                if let Some(lifetime) = new.get_lifetime() {
1,904✔
474
                    self.insert_evict_index(from, lifetime, join_key.clone(), primary_key)?
×
475
                }
1,904✔
476

×
477
                let records = self.inner_join_from_left(&JoinAction::Insert, &join_key, new)?;
1,904✔
478
                Ok(records)
1,904✔
479
            }
×
480
            (JoinType::Inner, JoinBranch::Right) => {
481
                let join_key = self.get_join_key(&new_decoded, &self.right_join_key_indexes);
5,327✔
482
                let primary_key =
5,327✔
483
                    get_record_key_hash(&new_decoded, &self.right_primary_key_indexes);
5,327✔
484

5,327✔
485
                add_join_record(&mut self.right_map, join_key.clone(), primary_key, &new);
5,327✔
486

487
                if let Some(lifetime) = new.get_lifetime() {
5,327✔
488
                    self.insert_evict_index(from, lifetime, join_key.clone(), primary_key)?
×
489
                }
5,327✔
490

491
                let records = self.inner_join_from_right(&JoinAction::Insert, &join_key, new)?;
5,327✔
492

493
                Ok(records)
5,327✔
494
            }
495
            (JoinType::LeftOuter, JoinBranch::Left) => {
496
                let join_key = self.get_join_key(&new_decoded, &self.left_join_key_indexes);
294✔
497
                let primary_key = get_record_key_hash(&new_decoded, &self.left_primary_key_indexes);
294✔
498

294✔
499
                add_join_record(&mut self.left_map, join_key.clone(), primary_key, &new);
294✔
500

501
                if let Some(lifetime) = new.get_lifetime() {
294✔
502
                    self.insert_evict_index(from, lifetime, join_key.clone(), primary_key)?
×
503
                }
294✔
504

505
                let records = self.left_join_from_left(&JoinAction::Insert, &join_key, new)?;
294✔
506

507
                Ok(records)
294✔
508
            }
509
            (JoinType::LeftOuter, JoinBranch::Right) => {
510
                let join_key = self.get_join_key(&new_decoded, &self.right_join_key_indexes);
217✔
511
                let primary_key =
217✔
512
                    get_record_key_hash(&new_decoded, &self.right_primary_key_indexes);
217✔
513

217✔
514
                add_join_record(&mut self.right_map, join_key.clone(), primary_key, &new);
217✔
515

×
516
                if let Some(lifetime) = new.get_lifetime() {
217✔
517
                    self.insert_evict_index(from, lifetime, join_key.clone(), primary_key)?
×
518
                }
217✔
519

×
520
                let records =
217✔
521
                    self.left_join_from_right(&JoinAction::Insert, &join_key, record_store, new)?;
217✔
522

×
523
                Ok(records)
217✔
524
            }
525
            (JoinType::RightOuter, JoinBranch::Left) => {
×
526
                let join_key = self.get_join_key(&new_decoded, &self.left_join_key_indexes);
210✔
527
                let primary_key = get_record_key_hash(&new_decoded, &self.left_primary_key_indexes);
210✔
528

210✔
529
                add_join_record(&mut self.left_map, join_key.clone(), primary_key, &new);
210✔
530

×
531
                if let Some(lifetime) = new.get_lifetime() {
210✔
532
                    self.insert_evict_index(from, lifetime, join_key.clone(), primary_key)?
×
533
                }
210✔
534

×
535
                let records =
210✔
536
                    self.right_join_from_left(&JoinAction::Insert, &join_key, record_store, new)?;
210✔
537

×
538
                Ok(records)
210✔
539
            }
540
            (JoinType::RightOuter, JoinBranch::Right) => {
×
541
                let join_key = self.get_join_key(&new_decoded, &self.right_join_key_indexes);
210✔
542
                let primary_key =
210✔
543
                    get_record_key_hash(&new_decoded, &self.right_primary_key_indexes);
210✔
544

210✔
545
                add_join_record(&mut self.right_map, join_key.clone(), primary_key, &new);
210✔
546

×
547
                if let Some(lifetime) = new.get_lifetime() {
210✔
548
                    self.insert_evict_index(from, lifetime, join_key.clone(), primary_key)?
×
549
                }
210✔
550

551
                let records = self.right_join_from_right(&JoinAction::Insert, &join_key, new)?;
210✔
552

553
                Ok(records)
210✔
554
            }
×
555
        }
556
    }
8,162✔
557

558
    fn get_join_key(&self, record: &Record, key_indexes: &[usize]) -> JoinKey {
8,876✔
559
        if self.accurate_keys {
8,876✔
560
            JoinKey::Accurate(get_record_key_fields(record, key_indexes))
8,876✔
561
        } else {
562
            JoinKey::Hash(get_record_key_hash(record, key_indexes))
×
563
        }
×
564
    }
8,876✔
565
}
×
566

×
567
fn add_join_record(
×
568
    join_map: &mut HashMap<JoinKey, HashMap<u64, Vec<ProcessorRecord>>>,
×
569
    join_key: JoinKey,
×
570
    record_key: u64,
×
571
    record: &ProcessorRecord,
×
572
) {
×
573
    if let Some(record_map) = join_map.get_mut(&join_key) {
8,162✔
574
        if let Some(record_vec) = record_map.get_mut(&record_key) {
910✔
575
            record_vec.push(record.clone());
63✔
576
        } else {
847✔
577
            record_map.insert(record_key, vec![record.clone()]);
847✔
578
        }
847✔
579
    } else {
7,252✔
580
        let mut record_map = HashMap::new();
7,252✔
581
        record_map.insert(record_key, vec![record.clone()]);
7,252✔
582
        join_map.insert(join_key, record_map);
7,252✔
583
    }
7,252✔
584
}
8,162✔
585

×
586
fn remove_join_record(
×
587
    join_map: &mut HashMap<JoinKey, HashMap<u64, Vec<ProcessorRecord>>>,
×
588
    primary_key_indexes: &[usize],
×
589
    join_key: &JoinKey,
590
    record: &Record,
591
) {
592
    if let Some(record_map) = join_map.get_mut(join_key) {
343✔
593
        let record_key = get_record_key_hash(record, primary_key_indexes);
343✔
594
        if let Some(record_vec) = record_map.get_mut(&record_key) {
343✔
595
            record_vec.pop();
343✔
596
        }
343✔
597
    }
×
598
}
343✔
599

×
600
fn evict_join_record(
×
601
    join_map: &mut HashMap<JoinKey, HashMap<u64, Vec<ProcessorRecord>>>,
602
    join_key: &JoinKey,
×
603
    primary_key: u64,
×
604
) {
×
605
    if let Some(record_map) = join_map.get_mut(join_key) {
×
606
        if let Some(record_vec) = record_map.get_mut(&primary_key) {
×
607
            record_vec.pop();
×
608
        }
×
609
    }
×
610
}
×
611

×
612
fn get_record_key_hash(record: &Record, key_indexes: &[usize]) -> u64 {
8,505✔
613
    let key_fields = key_indexes.iter().map(|i| &record.values[*i]);
42,308✔
614
    get_record_hash(key_fields)
8,505✔
615
}
8,505✔
616

617
fn get_record_key_fields(record: &Record, key_indexes: &[usize]) -> Vec<Field> {
8,876✔
618
    key_indexes
8,876✔
619
        .iter()
8,876✔
620
        .map(|i| record.values[*i].clone())
8,876✔
621
        .collect()
8,876✔
622
}
8,876✔
623

624
fn get_join_records(
8,876✔
625
    join_map: &HashMap<JoinKey, HashMap<u64, Vec<ProcessorRecord>>>,
8,876✔
626
    join_key: &JoinKey,
8,876✔
627
) -> Vec<ProcessorRecord> {
8,876✔
628
    let join_map = join_map.get(join_key);
8,876✔
629

×
630
    if let Some(records_map) = join_map {
8,876✔
631
        records_map.values().flatten().cloned().collect()
2,940✔
632
    } else {
×
633
        vec![]
5,936✔
634
    }
×
635
}
8,876✔
636

×
637
fn join_records(left_record: ProcessorRecord, right_record: ProcessorRecord) -> ProcessorRecord {
3,220✔
638
    let left_lifetime = left_record.get_lifetime();
3,220✔
639
    let right_lifetime = right_record.get_lifetime();
3,220✔
640

3,220✔
641
    let mut output_record = ProcessorRecord::new();
3,220✔
642
    output_record.extend(left_record);
3,220✔
643
    output_record.extend(right_record);
3,220✔
644

×
645
    if let Some(left_record_lifetime) = left_lifetime {
3,220✔
646
        if let Some(right_record_lifetime) = right_lifetime {
×
647
            if left_record_lifetime.reference > right_record_lifetime.reference {
×
648
                output_record.set_lifetime(Some(left_record_lifetime));
×
649
            } else {
×
650
                output_record.set_lifetime(Some(right_record_lifetime));
×
651
            }
×
652
        } else {
×
653
            output_record.set_lifetime(Some(left_record_lifetime));
×
654
        }
×
655
    } else if let Some(right_record_lifetime) = right_lifetime {
3,220✔
656
        output_record.set_lifetime(Some(right_record_lifetime));
×
657
    }
3,220✔
658

659
    output_record
3,220✔
660
}
3,220✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc