• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

getdozer / dozer / 4105700744

pending completion
4105700744

Pull #814

github

GitHub
Merge d70d4d25f into 016b3ada5
Pull Request #814: Bump clap from 4.0.32 to 4.1.4

23457 of 37651 relevant lines covered (62.3%)

44725.4 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

66.62
/dozer-sql/src/pipeline/product/join.rs
1
use std::collections::HashMap;
2

3
use dozer_core::{
4
    dag::{errors::ExecutionError, node::PortHandle, record_store::RecordReader},
5
    storage::{
6
        errors::StorageError, lmdb_storage::SharedTransaction,
7
        prefix_transaction::PrefixTransaction,
8
    },
9
};
10
use dozer_types::{
11
    errors::types::TypeError,
12
    types::{Record, Schema},
13
};
14
use lmdb::Database;
15

16
/// Action attached to every record that flows through a join.
///
/// Only `Insert` and `Delete` exist; an `Update` variant is present in the
/// source only as a commented-out placeholder.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum JoinAction {
    /// The accompanying record is being added.
    Insert,
    /// The accompanying record is being removed.
    Delete,
    // Update,
}
22

23
/// The flavour of join a `JoinOperator` evaluates.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum JoinOperatorType {
    /// Inner join.
    Inner,
    /// Left outer join.
    LeftOuter,
    /// Right outer join.
    RightOuter,
}
29

30
/// A pair of field indexes, one per side of a join.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct JoinConstraint {
    /// Index of the key field on the left side.
    pub left_key_index: usize,
    /// Index of the key field on the right side.
    pub right_key_index: usize,
}
35

36
#[derive(Clone, Debug)]
233✔
37
pub enum JoinSource {
38
    Table(JoinTable),
39
    Join(JoinOperator),
40
}
41

42
impl JoinSource {
43
    pub fn execute(
225,823✔
44
        &self,
225,823✔
45
        action: JoinAction,
225,823✔
46
        from_port: PortHandle,
225,823✔
47
        record: &Record,
225,823✔
48
        database: &Database,
225,823✔
49
        transaction: &SharedTransaction,
225,823✔
50
        readers: &HashMap<PortHandle, Box<dyn RecordReader>>,
225,823✔
51
    ) -> Result<Vec<(JoinAction, Record, Vec<u8>)>, ExecutionError> {
225,823✔
52
        match self {
225,823✔
53
            JoinSource::Table(table) => table.execute(action, from_port, record),
108,523✔
54
            JoinSource::Join(join) => {
117,300✔
55
                join.execute(action, from_port, record, database, transaction, readers)
117,300✔
56
            }
57
        }
58
    }
225,823✔
59

60
    pub fn lookup(
148,660✔
61
        &self,
148,660✔
62
        lookup_key: &[u8],
148,660✔
63
        database: &Database,
148,660✔
64
        transaction: &SharedTransaction,
148,660✔
65
        readers: &HashMap<PortHandle, Box<dyn RecordReader>>,
148,660✔
66
    ) -> Result<Vec<(Record, Vec<u8>)>, ExecutionError> {
148,660✔
67
        match self {
148,660✔
68
            JoinSource::Table(table) => table.lookup(lookup_key, readers),
128,760✔
69
            JoinSource::Join(join) => join.lookup(lookup_key, database, transaction, readers),
19,900✔
70
        }
71
    }
148,660✔
72

73
    pub fn get_output_schema(&self) -> Schema {
22,668✔
74
        match self {
22,668✔
75
            JoinSource::Table(table) => table.schema.clone(),
22,660✔
76
            JoinSource::Join(join) => join.schema.clone(),
8✔
77
        }
78
    }
22,668✔
79

80
    pub fn get_sources(&self) -> Vec<PortHandle> {
261,996✔
81
        match self {
261,996✔
82
            JoinSource::Table(table) => vec![table.get_source()],
235,336✔
83
            JoinSource::Join(join) => join.get_sources(),
26,660✔
84
        }
85
    }
261,996✔
86
}
87

88
#[derive(Clone, Debug)]
225✔
89
pub struct JoinTable {
90
    port: PortHandle,
91

92
    pub schema: Schema,
93
}
94

95
impl JoinTable {
96
    pub fn new(port: PortHandle, schema: Schema) -> Self {
177✔
97
        Self { port, schema }
177✔
98
    }
177✔
99

100
    pub fn get_source(&self) -> PortHandle {
235,336✔
101
        self.port
235,336✔
102
    }
235,336✔
103

104
    fn execute(
108,523✔
105
        &self,
108,523✔
106
        action: JoinAction,
108,523✔
107
        from_port: PortHandle,
108,523✔
108
        record: &Record,
108,523✔
109
    ) -> Result<Vec<(JoinAction, Record, Vec<u8>)>, ExecutionError> {
108,523✔
110
        if self.port == from_port {
108,523✔
111
            let lookup_key = self.encode_lookup_key(record, &self.schema)?;
108,523✔
112
            Ok(vec![(action, record.clone(), lookup_key)])
108,523✔
113
        } else {
×
114
            Err(ExecutionError::InvalidPortHandle(self.port))
×
115
        }
×
116
    }
108,523✔
117

×
118
    fn lookup(
128,760✔
119
        &self,
128,760✔
120
        lookup_key: &[u8],
128,760✔
121
        readers: &HashMap<PortHandle, Box<dyn RecordReader>>,
128,760✔
122
    ) -> Result<Vec<(Record, Vec<u8>)>, ExecutionError> {
128,760✔
123
        let reader = readers
128,760✔
124
            .get(&self.port)
128,760✔
125
            .ok_or(ExecutionError::InvalidPortHandle(self.port))?;
128,760✔
126

×
127
        let (version, id) = self.decode_lookup_key(lookup_key);
128,760✔
128

128,760✔
129
        let mut output_records = vec![];
128,760✔
130
        if let Some(record) = reader.get(&id, version)? {
128,760✔
131
            output_records.push((record, lookup_key.to_vec()));
128,760✔
132
        }
128,760✔
133
        Ok(output_records)
128,760✔
134
    }
128,760✔
135

×
136
    fn encode_lookup_key(&self, record: &Record, schema: &Schema) -> Result<Vec<u8>, TypeError> {
108,523✔
137
        let mut lookup_key = Vec::with_capacity(64);
108,523✔
138
        if let Some(version) = record.version {
108,523✔
139
            lookup_key.extend_from_slice(&version.to_be_bytes());
108,364✔
140
        } else {
108,523✔
141
            lookup_key.extend_from_slice(&[0_u8; 4]);
159✔
142
        }
159✔
143

×
144
        for key_index in schema.primary_index.iter() {
108,523✔
145
            let key_value = record.get_value(*key_index)?;
108,364✔
146
            let key_bytes = key_value.encode();
108,364✔
147
            lookup_key.extend_from_slice(&key_bytes);
108,364✔
148
        }
149

×
150
        Ok(lookup_key)
108,523✔
151
    }
108,523✔
152

×
153
    fn decode_lookup_key(&self, lookup_key: &[u8]) -> (u32, Vec<u8>) {
128,760✔
154
        let (version_bytes, id) = lookup_key.split_at(4);
128,760✔
155
        let version = u32::from_be_bytes(version_bytes.try_into().unwrap());
128,760✔
156
        (version, id.to_vec())
128,760✔
157
    }
128,760✔
158
}
159

×
160
#[derive(Clone, Debug)]
48✔
161
pub struct JoinOperator {
162
    _operator: JoinOperatorType,
163

164
    left_join_key: Vec<usize>,
165
    right_join_key: Vec<usize>,
166

167
    schema: Schema,
168

169
    left_source: Box<JoinSource>,
170
    right_source: Box<JoinSource>,
171

172
    // Lookup indexes
173
    left_lookup_index: u32,
174

175
    right_lookup_index: u32,
176
}
177

178
pub struct JoinBranch {
179
    pub join_key: Vec<usize>,
180
    pub source: Box<JoinSource>,
181
    pub lookup_index: u32,
182
}
183

184
impl JoinOperator {
×
185
    pub fn new(
40✔
186
        operator: JoinOperatorType,
40✔
187
        schema: Schema,
40✔
188
        left_join_branch: JoinBranch,
40✔
189
        right_join_branch: JoinBranch,
40✔
190
    ) -> Self {
40✔
191
        Self {
40✔
192
            _operator: operator,
40✔
193
            left_join_key: left_join_branch.join_key,
40✔
194
            right_join_key: right_join_branch.join_key,
40✔
195
            schema,
40✔
196
            left_source: left_join_branch.source,
40✔
197
            right_source: right_join_branch.source,
40✔
198
            left_lookup_index: left_join_branch.lookup_index,
40✔
199
            right_lookup_index: right_join_branch.lookup_index,
40✔
200
        }
40✔
201
    }
40✔
202

×
203
    /// Returns the ports of the left branch followed by those of the right
    /// branch.
    pub fn get_sources(&self) -> Vec<PortHandle> {
        let mut ports = self.left_source.get_sources();
        ports.extend(self.right_source.get_sources());
        ports
    }
26,660✔
210

×
211
    pub fn execute(
117,300✔
212
        &self,
117,300✔
213
        action: JoinAction,
117,300✔
214
        from_port: PortHandle,
117,300✔
215
        record: &Record,
117,300✔
216
        database: &Database,
117,300✔
217
        transaction: &SharedTransaction,
117,300✔
218
        readers: &HashMap<PortHandle, Box<dyn RecordReader>>,
117,300✔
219
    ) -> Result<Vec<(JoinAction, Record, Vec<u8>)>, ExecutionError> {
117,300✔
220
        // if the source port is under the left branch of the join
117,300✔
221
        if self.left_source.get_sources().contains(&from_port) {
117,300✔
222
            let mut output_records = vec![];
25,924✔
223

224
            // forward the record and the current join constraints to the left source
×
225
            let mut left_records = self.left_source.execute(
25,924✔
226
                action,
25,924✔
227
                from_port,
25,924✔
228
                record,
25,924✔
229
                database,
25,924✔
230
                transaction,
25,924✔
231
                readers,
25,924✔
232
            )?;
25,924✔
233

234
            // update left join index
×
235
            for (_join_action, left_record, left_lookup_key) in left_records.iter_mut() {
25,924✔
236
                let left_join_key: Vec<u8> = encode_join_key(left_record, &self.left_join_key)?;
25,036✔
237
                self.update_index(
25,036✔
238
                    _join_action.clone(),
25,036✔
239
                    &left_join_key,
25,036✔
240
                    left_lookup_key,
25,036✔
241
                    self.left_lookup_index,
25,036✔
242
                    database,
25,036✔
243
                    transaction,
25,036✔
244
                )?;
25,036✔
245

×
246
                let join_records = match self._operator {
25,036✔
247
                    JoinOperatorType::Inner => self.inner_join_left(
24,220✔
248
                        _join_action.clone(),
24,220✔
249
                        left_join_key,
24,220✔
250
                        database,
24,220✔
251
                        transaction,
24,220✔
252
                        readers,
24,220✔
253
                        left_record,
24,220✔
254
                        left_lookup_key,
24,220✔
255
                    )?,
24,220✔
256
                    JoinOperatorType::LeftOuter => self.left_join(
816✔
257
                        _join_action.clone(),
816✔
258
                        left_join_key,
816✔
259
                        database,
816✔
260
                        transaction,
816✔
261
                        readers,
816✔
262
                        left_record,
816✔
263
                        left_lookup_key,
816✔
264
                    )?,
816✔
265
                    JoinOperatorType::RightOuter => self.right_join_reverse(
×
266
                        _join_action.clone(),
×
267
                        left_join_key,
×
268
                        database,
×
269
                        transaction,
×
270
                        readers,
×
271
                        left_record,
×
272
                        left_lookup_key,
×
273
                    )?,
×
274
                };
×
275

×
276
                output_records.extend(join_records);
25,036✔
277
            }
×
278

×
279
            Ok(output_records)
25,924✔
280
        } else if self.right_source.get_sources().contains(&from_port) {
91,376✔
281
            let mut output_records = vec![];
91,376✔
282

283
            // forward the record and the current join constraints to the left source
×
284
            let mut right_records = self.right_source.execute(
91,376✔
285
                action,
91,376✔
286
                from_port,
91,376✔
287
                record,
91,376✔
288
                database,
91,376✔
289
                transaction,
91,376✔
290
                readers,
91,376✔
291
            )?;
91,376✔
292

×
293
            // update right join index
294
            for (_join_action, right_record, right_lookup_key) in right_records.iter_mut() {
91,376✔
295
                let right_join_key: Vec<u8> = encode_join_key(right_record, &self.right_join_key)?;
91,376✔
296
                self.update_index(
91,376✔
297
                    _join_action.clone(),
91,376✔
298
                    &right_join_key,
91,376✔
299
                    right_lookup_key,
91,376✔
300
                    self.right_lookup_index,
91,376✔
301
                    database,
91,376✔
302
                    transaction,
91,376✔
303
                )?;
91,376✔
304

×
305
                let join_records = match self._operator {
91,376✔
306
                    JoinOperatorType::Inner => self.inner_join_right(
69,532✔
307
                        _join_action.clone(),
69,532✔
308
                        right_join_key,
69,532✔
309
                        database,
69,532✔
310
                        transaction,
69,532✔
311
                        readers,
69,532✔
312
                        right_record,
69,532✔
313
                        right_lookup_key,
69,532✔
314
                    )?,
69,532✔
315
                    JoinOperatorType::RightOuter => self.right_join(
×
316
                        _join_action.clone(),
×
317
                        right_join_key,
×
318
                        database,
×
319
                        transaction,
×
320
                        readers,
×
321
                        right_record,
×
322
                        right_lookup_key,
×
323
                    )?,
×
324
                    JoinOperatorType::LeftOuter => self.left_join_reverse(
21,844✔
325
                        _join_action.clone(),
21,844✔
326
                        right_join_key,
21,844✔
327
                        database,
21,844✔
328
                        transaction,
21,844✔
329
                        readers,
21,844✔
330
                        right_record,
21,844✔
331
                        right_lookup_key,
21,844✔
332
                    )?,
21,844✔
333
                };
×
334
                output_records.extend(join_records);
91,376✔
335
            }
×
336

×
337
            return Ok(output_records);
91,376✔
338
        } else {
×
339
            return Err(ExecutionError::InvalidPortHandle(from_port));
×
340
        }
×
341
    }
117,300✔
342

×
343
    #[allow(clippy::too_many_arguments)]
344
    fn inner_join_left(
24,220✔
345
        &self,
24,220✔
346
        action: JoinAction,
24,220✔
347
        left_join_key: Vec<u8>,
24,220✔
348
        database: &Database,
24,220✔
349
        transaction: &SharedTransaction,
24,220✔
350
        readers: &HashMap<u16, Box<dyn RecordReader>>,
24,220✔
351
        left_record: &mut Record,
24,220✔
352
        left_lookup_key: &mut [u8],
24,220✔
353
    ) -> Result<Vec<(JoinAction, Record, Vec<u8>)>, ExecutionError> {
24,220✔
354
        let right_lookup_keys = self.read_index(
24,220✔
355
            &left_join_key,
24,220✔
356
            self.right_lookup_index,
24,220✔
357
            database,
24,220✔
358
            transaction,
24,220✔
359
        )?;
24,220✔
360
        let mut output_records = vec![];
24,220✔
361

×
362
        for right_lookup_key in right_lookup_keys.iter() {
24,220✔
363
            // lookup on the right branch to find matching records
×
364
            let mut right_records =
1,872✔
365
                self.right_source
1,872✔
366
                    .lookup(right_lookup_key, database, transaction, readers)?;
1,872✔
367

×
368
            for (right_record, right_lookup_key) in right_records.iter_mut() {
1,872✔
369
                let join_record = join_records(left_record, right_record);
1,872✔
370
                let join_lookup_key =
1,872✔
371
                    self.encode_join_lookup_key(left_lookup_key, right_lookup_key);
1,872✔
372

1,872✔
373
                output_records.push((action.clone(), join_record, join_lookup_key));
1,872✔
374
            }
1,872✔
375
        }
×
376
        Ok(output_records)
24,220✔
377
    }
24,220✔
378

×
379
    #[allow(clippy::too_many_arguments)]
×
380
    fn inner_join_right(
69,532✔
381
        &self,
69,532✔
382
        action: JoinAction,
69,532✔
383
        right_join_key: Vec<u8>,
69,532✔
384
        database: &Database,
69,532✔
385
        transaction: &SharedTransaction,
69,532✔
386
        readers: &HashMap<u16, Box<dyn RecordReader>>,
69,532✔
387
        right_record: &mut Record,
69,532✔
388
        right_lookup_key: &mut [u8],
69,532✔
389
    ) -> Result<Vec<(JoinAction, Record, Vec<u8>)>, ExecutionError> {
69,532✔
390
        let left_lookup_keys = self.read_index(
69,532✔
391
            &right_join_key,
69,532✔
392
            self.left_lookup_index,
69,532✔
393
            database,
69,532✔
394
            transaction,
69,532✔
395
        )?;
69,532✔
396

×
397
        let mut output_records = vec![];
69,532✔
398
        for left_lookup_key in left_lookup_keys.iter() {
85,216✔
399
            // lookup on the left branch to find matching records
×
400
            let mut left_records =
85,216✔
401
                self.left_source
85,216✔
402
                    .lookup(left_lookup_key, database, transaction, readers)?;
85,216✔
403

×
404
            for (left_record, left_lookup_key) in left_records.iter_mut() {
85,216✔
405
                // join the records
85,216✔
406
                let join_record = join_records(left_record, right_record);
85,216✔
407
                let join_lookup_key =
85,216✔
408
                    self.encode_join_lookup_key(left_lookup_key, right_lookup_key);
85,216✔
409
                output_records.push((action.clone(), join_record, join_lookup_key));
85,216✔
410
            }
85,216✔
411
        }
×
412
        Ok(output_records)
69,532✔
413
    }
69,532✔
414

×
415
    #[allow(clippy::too_many_arguments)]
416
    fn left_join(
816✔
417
        &self,
816✔
418
        action: JoinAction,
816✔
419
        left_join_key: Vec<u8>,
816✔
420
        database: &Database,
816✔
421
        transaction: &SharedTransaction,
816✔
422
        readers: &HashMap<u16, Box<dyn RecordReader>>,
816✔
423
        left_record: &mut Record,
816✔
424
        left_lookup_key: &mut [u8],
816✔
425
    ) -> Result<Vec<(JoinAction, Record, Vec<u8>)>, ExecutionError> {
816✔
426
        let right_lookup_keys = self.read_index(
816✔
427
            &left_join_key,
816✔
428
            self.right_lookup_index,
816✔
429
            database,
816✔
430
            transaction,
816✔
431
        )?;
816✔
432
        let mut output_records = vec![];
816✔
433

816✔
434
        if right_lookup_keys.is_empty() {
816✔
435
            // no matching records on the right branch
×
436
            let right_record = Record::from_schema(&self.right_source.get_output_schema());
816✔
437
            let join_record = join_records(left_record, &right_record);
816✔
438
            let join_lookup_key = self.encode_join_lookup_key(left_lookup_key, &[]);
816✔
439
            output_records.push((action, join_record, join_lookup_key));
816✔
440

816✔
441
            return Ok(output_records);
816✔
442
        }
×
443

×
444
        for right_lookup_key in right_lookup_keys.iter() {
×
445
            // lookup on the right branch to find matching records
×
446
            let mut right_records =
×
447
                self.right_source
×
448
                    .lookup(right_lookup_key, database, transaction, readers)?;
×
449

×
450
            for (right_record, right_lookup_key) in right_records.iter_mut() {
×
451
                let join_record = join_records(left_record, right_record);
×
452
                let join_lookup_key =
×
453
                    self.encode_join_lookup_key(left_lookup_key, right_lookup_key);
×
454

×
455
                output_records.push((action.clone(), join_record, join_lookup_key));
×
456
            }
×
457
        }
×
458
        Ok(output_records)
×
459
    }
816✔
460

461
    #[allow(clippy::too_many_arguments)]
×
462
    fn right_join(
×
463
        &self,
×
464
        action: JoinAction,
×
465
        right_join_key: Vec<u8>,
×
466
        database: &Database,
×
467
        transaction: &SharedTransaction,
×
468
        readers: &HashMap<u16, Box<dyn RecordReader>>,
×
469
        right_record: &mut Record,
×
470
        right_lookup_key: &mut [u8],
×
471
    ) -> Result<Vec<(JoinAction, Record, Vec<u8>)>, ExecutionError> {
×
472
        let left_lookup_keys = self.read_index(
×
473
            &right_join_key,
×
474
            self.left_lookup_index,
×
475
            database,
×
476
            transaction,
×
477
        )?;
×
478

×
479
        let mut output_records = vec![];
×
480

×
481
        if left_lookup_keys.is_empty() {
×
482
            // no matching records on the right branch
×
483
            let left_record = Record::from_schema(&self.left_source.get_output_schema());
×
484
            let join_record = join_records(&left_record, right_record);
×
485
            let join_lookup_key = self.encode_join_lookup_key(right_lookup_key, &[]);
×
486
            output_records.push((action, join_record, join_lookup_key));
×
487

×
488
            return Ok(output_records);
×
489
        }
×
490

×
491
        for left_lookup_key in left_lookup_keys.iter() {
×
492
            // lookup on the left branch to find matching records
493
            let mut left_records =
×
494
                self.left_source
×
495
                    .lookup(left_lookup_key, database, transaction, readers)?;
×
496

497
            for (left_record, left_lookup_key) in left_records.iter_mut() {
×
498
                // join the records
×
499
                let join_record = join_records(left_record, right_record);
×
500
                let join_lookup_key =
×
501
                    self.encode_join_lookup_key(left_lookup_key, right_lookup_key);
×
502
                output_records.push((action.clone(), join_record, join_lookup_key));
×
503
            }
×
504
        }
×
505
        Ok(output_records)
×
506
    }
×
507

×
508
    #[allow(clippy::too_many_arguments)]
×
509
    fn right_join_reverse(
×
510
        &self,
×
511
        action: JoinAction,
×
512
        left_join_key: Vec<u8>,
×
513
        database: &Database,
×
514
        transaction: &SharedTransaction,
×
515
        readers: &HashMap<u16, Box<dyn RecordReader>>,
×
516
        left_record: &mut Record,
×
517
        left_lookup_key: &mut [u8],
×
518
    ) -> Result<Vec<(JoinAction, Record, Vec<u8>)>, ExecutionError> {
×
519
        let right_lookup_keys = self.read_index(
×
520
            &left_join_key,
×
521
            self.right_lookup_index,
×
522
            database,
×
523
            transaction,
×
524
        )?;
×
525
        let mut output_records = vec![];
×
526

×
527
        // if there are no matching records on the left branch, no records will be returned
×
528
        if right_lookup_keys.is_empty() {
×
529
            return Ok(output_records);
×
530
        }
×
531

532
        for right_lookup_key in right_lookup_keys.iter() {
×
533
            // lookup on the right branch to find matching records
×
534
            let mut right_records =
×
535
                self.right_source
×
536
                    .lookup(right_lookup_key, database, transaction, readers)?;
×
537

×
538
            for (right_record, right_lookup_key) in right_records.iter_mut() {
×
539
                let left_matching_count =
×
540
                    self.get_left_matching_count(&action, right_record, database, transaction)?;
×
541

×
542
                let join_record = join_records(left_record, right_record);
×
543
                let join_lookup_key =
×
544
                    self.encode_join_lookup_key(left_lookup_key, right_lookup_key);
×
545

×
546
                if left_matching_count > 0 {
×
547
                    // if there are multiple matching records on the left branch, the right record will be just returned
×
548
                    output_records.push((action.clone(), join_record, join_lookup_key));
×
549
                } else {
×
550
                    match action {
×
551
                        JoinAction::Insert => {
×
552
                            let old_join_record = join_records(
×
553
                                &Record::from_schema(&self.left_source.get_output_schema()),
×
554
                                right_record,
×
555
                            );
×
556
                            let old_join_lookup_key =
×
557
                                self.encode_join_lookup_key(left_lookup_key, &[]);
×
558
                            output_records.push((
×
559
                                JoinAction::Delete,
×
560
                                old_join_record,
×
561
                                old_join_lookup_key,
×
562
                            ));
×
563

×
564
                            output_records.push((JoinAction::Insert, join_record, join_lookup_key));
×
565
                        }
×
566
                        JoinAction::Delete => {
×
567
                            let new_join_record = join_records(
×
568
                                &Record::from_schema(&self.left_source.get_output_schema()),
×
569
                                right_record,
×
570
                            );
×
571
                            let new_join_lookup_key =
×
572
                                self.encode_join_lookup_key(left_lookup_key, &[]);
×
573
                            output_records.push((JoinAction::Delete, join_record, join_lookup_key));
×
574
                            output_records.push((
×
575
                                JoinAction::Insert,
×
576
                                new_join_record,
×
577
                                new_join_lookup_key,
×
578
                            ));
×
579
                        }
×
580
                    }
×
581
                }
582
            }
×
583
        }
×
584
        Ok(output_records)
×
585
    }
×
586

×
587
    #[allow(clippy::too_many_arguments)]
×
588
    fn left_join_reverse(
21,844✔
589
        &self,
21,844✔
590
        action: JoinAction,
21,844✔
591
        right_join_key: Vec<u8>,
21,844✔
592
        database: &Database,
21,844✔
593
        transaction: &SharedTransaction,
21,844✔
594
        readers: &HashMap<u16, Box<dyn RecordReader>>,
21,844✔
595
        right_record: &mut Record,
21,844✔
596
        right_lookup_key: &mut [u8],
21,844✔
597
    ) -> Result<Vec<(JoinAction, Record, Vec<u8>)>, ExecutionError> {
21,844✔
598
        let left_lookup_keys = self.read_index(
21,844✔
599
            &right_join_key,
21,844✔
600
            self.left_lookup_index,
21,844✔
601
            database,
21,844✔
602
            transaction,
21,844✔
603
        )?;
21,844✔
604

×
605
        let mut output_records = vec![];
21,844✔
606

21,844✔
607
        // if there are no matching records on the left branch, no records will be returned
21,844✔
608
        if left_lookup_keys.is_empty() {
21,844✔
609
            return Ok(output_records);
72✔
610
        }
21,772✔
611

×
612
        for left_lookup_key in left_lookup_keys.iter() {
21,772✔
613
            // lookup on the left branch to find matching records
×
614
            let mut left_records =
21,772✔
615
                self.left_source
21,772✔
616
                    .lookup(left_lookup_key, database, transaction, readers)?;
21,772✔
617

×
618
            for (left_record, left_lookup_key) in left_records.iter_mut() {
21,772✔
619
                let right_matching_count =
21,772✔
620
                    self.get_right_matching_count(&action, left_record, database, transaction)?;
21,772✔
621

×
622
                let join_record = join_records(left_record, right_record);
21,772✔
623
                let join_lookup_key =
21,772✔
624
                    self.encode_join_lookup_key(left_lookup_key, right_lookup_key);
21,772✔
625

21,772✔
626
                if right_matching_count > 0 {
21,772✔
627
                    // if there are multiple matching records on the right branch, the left record will be just returned
×
628
                    output_records.push((action.clone(), join_record, join_lookup_key));
×
629
                } else {
×
630
                    match action {
21,772✔
631
                        JoinAction::Insert => {
21,772✔
632
                            let old_join_record = join_records(
21,772✔
633
                                left_record,
21,772✔
634
                                &Record::from_schema(&self.right_source.get_output_schema()),
21,772✔
635
                            );
21,772✔
636
                            let old_join_lookup_key =
21,772✔
637
                                self.encode_join_lookup_key(left_lookup_key, &[]);
21,772✔
638

21,772✔
639
                            // delete the "first left join" record
21,772✔
640
                            output_records.push((
21,772✔
641
                                JoinAction::Delete,
21,772✔
642
                                old_join_record,
21,772✔
643
                                old_join_lookup_key,
21,772✔
644
                            ));
21,772✔
645
                            // insert the new left join record
21,772✔
646
                            output_records.push((action.clone(), join_record, join_lookup_key));
21,772✔
647
                        }
21,772✔
648
                        JoinAction::Delete => {
×
649
                            let new_join_record = join_records(
×
650
                                left_record,
×
651
                                &Record::from_schema(&self.right_source.get_output_schema()),
×
652
                            );
×
653
                            let new_join_lookup_key =
×
654
                                self.encode_join_lookup_key(left_lookup_key, &[]);
×
655
                            output_records.push((action.clone(), join_record, join_lookup_key));
×
656
                            output_records.push((
×
657
                                JoinAction::Insert,
×
658
                                new_join_record,
×
659
                                new_join_lookup_key,
×
660
                            ));
×
661
                        }
×
662
                    }
663
                }
664

665
                // join the records
666
                // let join_record = join_records(left_record, right_record);
667
                // let join_lookup_key =
668
                //     self.encode_join_lookup_key(left_lookup_key, right_lookup_key);
669
                // output_records.push((action.clone(), join_record, join_lookup_key));
670
            }
671
        }
672
        Ok(output_records)
21,772✔
673
    }
21,844✔
674

675
    fn get_right_matching_count(
21,772✔
676
        &self,
21,772✔
677
        action: &JoinAction,
21,772✔
678
        left_record: &mut Record,
21,772✔
679
        database: &Database,
21,772✔
680
        transaction: &SharedTransaction,
21,772✔
681
    ) -> Result<usize, ExecutionError> {
21,772✔
682
        let left_join_key: Vec<u8> = encode_join_key(left_record, &self.left_join_key)?;
21,772✔
683
        let right_lookup_keys = self.read_index(
21,772✔
684
            &left_join_key,
21,772✔
685
            self.right_lookup_index,
21,772✔
686
            database,
21,772✔
687
            transaction,
21,772✔
688
        )?;
21,772✔
689
        let mut records_count = right_lookup_keys.len();
21,772✔
690
        if action == &JoinAction::Insert {
21,772✔
691
            records_count -= 1;
21,772✔
692
        }
21,772✔
693
        Ok(records_count)
21,772✔
694
    }
21,772✔
695

696
    fn get_left_matching_count(
×
697
        &self,
×
698
        action: &JoinAction,
×
699
        right_record: &mut Record,
×
700
        database: &Database,
×
701
        transaction: &SharedTransaction,
×
702
    ) -> Result<usize, ExecutionError> {
×
703
        let right_join_key: Vec<u8> = encode_join_key(right_record, &self.right_join_key)?;
×
704
        let left_lookup_keys = self.read_index(
×
705
            &right_join_key,
×
706
            self.left_lookup_index,
×
707
            database,
×
708
            transaction,
×
709
        )?;
×
710
        let mut records_count = left_lookup_keys.len();
×
711
        if action == &JoinAction::Insert {
×
712
            records_count -= 1;
×
713
        }
×
714
        Ok(records_count)
×
715
    }
×
716

717
    fn lookup(
19,900✔
718
        &self,
19,900✔
719
        lookup_key: &[u8],
19,900✔
720
        database: &Database,
19,900✔
721
        transaction: &SharedTransaction,
19,900✔
722
        readers: &HashMap<PortHandle, Box<dyn RecordReader>>,
19,900✔
723
    ) -> Result<Vec<(Record, Vec<u8>)>, ExecutionError> {
19,900✔
724
        let mut output_records = vec![];
19,900✔
725

19,900✔
726
        let (left_loookup_key, right_lookup_key) = self.decode_join_lookup_key(lookup_key);
19,900✔
727

728
        let mut left_records =
19,900✔
729
            self.left_source
19,900✔
730
                .lookup(&left_loookup_key, database, transaction, readers)?;
19,900✔
731

732
        let mut right_records =
19,900✔
733
            self.right_source
19,900✔
734
                .lookup(&right_lookup_key, database, transaction, readers)?;
19,900✔
735

736
        for (left_record, left_lookup_key) in left_records.iter_mut() {
19,900✔
737
            for (right_record, right_lookup_key) in right_records.iter_mut() {
19,900✔
738
                let join_record = join_records(left_record, right_record);
19,900✔
739
                let join_lookup_key =
19,900✔
740
                    self.encode_join_lookup_key(left_lookup_key, right_lookup_key);
19,900✔
741

19,900✔
742
                output_records.push((join_record, join_lookup_key));
19,900✔
743
            }
19,900✔
744
        }
745

746
        Ok(output_records)
19,900✔
747
    }
19,900✔
748

749
    pub fn update_index(
116,412✔
750
        &self,
116,412✔
751
        action: JoinAction,
116,412✔
752
        key: &[u8],
116,412✔
753
        value: &[u8],
116,412✔
754
        prefix: u32,
116,412✔
755
        database: &Database,
116,412✔
756
        transaction: &SharedTransaction,
116,412✔
757
    ) -> Result<(), ExecutionError> {
116,412✔
758
        let mut exclusive_transaction = transaction.write();
116,412✔
759
        let mut prefix_transaction = PrefixTransaction::new(&mut exclusive_transaction, prefix);
116,412✔
760

116,412✔
761
        match action {
116,412✔
762
            JoinAction::Insert => {
763
                prefix_transaction.put(*database, key, value)?;
116,380✔
764
            }
765
            JoinAction::Delete => {
766
                prefix_transaction.del(*database, key, Some(value))?;
32✔
767
            }
768
        }
769

770
        Ok(())
116,412✔
771
    }
116,412✔
772

773
    fn read_index(
138,184✔
774
        &self,
138,184✔
775
        join_key: &[u8],
138,184✔
776
        prefix: u32,
138,184✔
777
        database: &Database,
138,184✔
778
        transaction: &SharedTransaction,
138,184✔
779
    ) -> Result<Vec<Vec<u8>>, ExecutionError> {
138,184✔
780
        let mut join_keys = vec![];
138,184✔
781

138,184✔
782
        let mut exclusive_transaction = transaction.write();
138,184✔
783
        let right_prefix_transaction = PrefixTransaction::new(&mut exclusive_transaction, prefix);
138,184✔
784

785
        let cursor = right_prefix_transaction.open_cursor(*database)?;
138,184✔
786

787
        if !cursor.seek(join_key)? {
138,184✔
788
            return Ok(join_keys);
23,476✔
789
        }
114,708✔
790

791
        loop {
792
            let entry = cursor.read()?.ok_or(ExecutionError::InternalDatabaseError(
223,240✔
793
                StorageError::InvalidRecord,
223,240✔
794
            ))?;
223,240✔
795

796
            if entry.0 != join_key {
223,240✔
797
                break;
92,608✔
798
            }
130,632✔
799

130,632✔
800
            join_keys.push(entry.1.to_vec());
130,632✔
801

130,632✔
802
            if !cursor.next()? {
130,632✔
803
                break;
22,100✔
804
            }
108,532✔
805
        }
806

807
        Ok(join_keys)
114,708✔
808
    }
138,184✔
809

810
    fn encode_join_lookup_key(&self, left_lookup_key: &[u8], right_lookup_key: &[u8]) -> Vec<u8> {
151,348✔
811
        let mut composite_lookup_key = Vec::with_capacity(64);
151,348✔
812
        composite_lookup_key.extend_from_slice(&(left_lookup_key.len() as u32).to_be_bytes());
151,348✔
813
        composite_lookup_key.extend_from_slice(left_lookup_key);
151,348✔
814
        composite_lookup_key.extend_from_slice(&(right_lookup_key.len() as u32).to_be_bytes());
151,348✔
815
        composite_lookup_key.extend_from_slice(right_lookup_key);
151,348✔
816
        composite_lookup_key
151,348✔
817
    }
151,348✔
818

819
    fn decode_join_lookup_key(&self, join_lookup_key: &[u8]) -> (Vec<u8>, Vec<u8>) {
19,900✔
820
        let mut offset = 0;
19,900✔
821

19,900✔
822
        let left_length = u32::from_be_bytes([
19,900✔
823
            join_lookup_key[offset],
19,900✔
824
            join_lookup_key[offset + 1],
19,900✔
825
            join_lookup_key[offset + 2],
19,900✔
826
            join_lookup_key[offset + 3],
19,900✔
827
        ]);
19,900✔
828
        offset += 4;
19,900✔
829
        let left_lookup_key = &join_lookup_key[offset..offset + left_length as usize];
19,900✔
830
        offset += left_length as usize;
19,900✔
831

19,900✔
832
        let right_length = u32::from_be_bytes([
19,900✔
833
            join_lookup_key[offset],
19,900✔
834
            join_lookup_key[offset + 1],
19,900✔
835
            join_lookup_key[offset + 2],
19,900✔
836
            join_lookup_key[offset + 3],
19,900✔
837
        ]);
19,900✔
838
        offset += 4;
19,900✔
839
        let right_lookup_key = &join_lookup_key[offset..offset + right_length as usize];
19,900✔
840

19,900✔
841
        (left_lookup_key.to_vec(), right_lookup_key.to_vec())
19,900✔
842
    }
19,900✔
843
}
844

845
fn join_records(left_record: &Record, right_record: &Record) -> Record {
151,348✔
846
    let concat_values = [left_record.values.clone(), right_record.values.clone()].concat();
151,348✔
847
    Record::new(None, concat_values, None)
151,348✔
848
}
151,348✔
849

850
/// Encodes the fields of `record` selected by `join_keys` into a single byte key.
///
/// For each index, in order, the field's `encode()` output is appended, preceded by its
/// length as a big-endian `u32`. The visible code always returns `Ok`.
///
/// # Panics
/// Panics if an index in `join_keys` is out of range for `record.values`.
fn encode_join_key(record: &Record, join_keys: &[usize]) -> Result<Vec<u8>, TypeError> {
    let mut encoded_key = vec![];
    for &field_index in join_keys {
        let field_bytes = record.values[field_index].encode();
        let field_len = field_bytes.len() as u32;
        encoded_key.extend_from_slice(&field_len.to_be_bytes());
        encoded_key.extend_from_slice(field_bytes.as_slice());
    }
    Ok(encoded_key)
}
138,184✔
860

861
// fn join_records(left_record: &Record, right_record: &Record) -> Record {
862
//     let concat_values = [left_record.values.clone(), right_record.values.clone()].concat();
863
//     let mut left_version = 0;
864
//     if let Some(version) = left_record.version {
865
//         left_version = version;
866
//     }
867
//     let mut right_version = 0;
868
//     if let Some(version) = right_record.version {
869
//         right_version = version;
870
//     }
871
//     Record::new(
872
//         None,
873
//         concat_values,
874
//         Some((left_version * 100) + right_version),
875
//     )
876
// }
877

878
// fn encode_join_key(record: &Record, join_keys: &[usize]) -> Result<Vec<u8>, TypeError> {
879
//     let mut composite_lookup_key = vec![];
880
//     let mut version = 0_u32;
881
//     if let Some(record_version) = &record.version {
882
//         version = *record_version;
883
//     }
884
//     composite_lookup_key.extend_from_slice(&version.to_be_bytes());
885
//     for key in join_keys.iter() {
886
//         let value = &record.values[*key].encode();
887
//         let length = value.len() as u32;
888
//         composite_lookup_key.extend_from_slice(&length.to_be_bytes());
889
//         composite_lookup_key.extend_from_slice(value.as_slice());
890
//     }
891
//     Ok(composite_lookup_key)
892
// }
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc