• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

getdozer / dozer / 4007818786

pending completion
4007818786

Pull #733

github

GitHub
Merge baf5c38aa into 6c0ac2b2c
Pull Request #733: Bump diesel from 2.0.2 to 2.0.3

23389 of 34432 relevant lines covered (67.93%)

40326.78 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

77.73
/dozer-sql/src/pipeline/product/join.rs
1
use std::collections::HashMap;
2

3
use dozer_core::dag::node::PortHandle;
4
use dozer_core::dag::record_store::RecordReader;
5
use dozer_core::storage::common::Database;
6
use dozer_core::storage::errors::StorageError;
7
use dozer_core::storage::lmdb_storage::SharedTransaction;
8
use dozer_core::{dag::errors::ExecutionError, storage::prefix_transaction::PrefixTransaction};
9
use dozer_types::errors::types::TypeError;
10
use dozer_types::types::{Record, Schema};
11

12
const REVERSE_JOIN_FLAG: u32 = 0x80000000;
13

14
/// A table taking part in a join: its schema together with the join operators
/// (if any) attached to it.
#[derive(Debug, Clone)]
148✔
15
pub struct JoinTable {
16
    /// Schema of the table's records (copied from the schema given to `JoinTable::from`).
    pub schema: Schema,
×
17
    /// Optional join operator on the left; `JoinTable::from` initialises it to `None`.
    // NOTE(review): presumably the join in which this table is the right-hand relation — confirm.
    pub left: Option<JoinOperator>,
18
    /// Optional join operator on the right; `JoinTable::from` initialises it to `None`.
    // NOTE(review): presumably the join in which this table is the left-hand relation — confirm.
    pub right: Option<JoinOperator>,
19
}
20

21
impl JoinTable {
22
    pub fn from(schema: &Schema) -> Self {
139✔
23
        Self {
139✔
24
            schema: schema.clone(),
139✔
25
            left: None,
139✔
26
            right: None,
139✔
27
        }
139✔
28
    }
139✔
29
}
30

31
/// Kind of join carried by a `JoinOperator`.
///
/// Only `Inner` is defined; the remaining variants are commented out and
/// therefore cannot be constructed.
#[derive(Clone, Debug, PartialEq, Eq)]
27✔
32
pub enum JoinOperatorType {
33
    /// Inner join.
    Inner,
34
    // LeftOuter,
35
    // RightOuter,
×
36
    // FullOuter,
37
    // CrossJoin,
38
    // CrossApply,
39
    // OuterApply,
40
}
41

42
/// Operations needed to execute a join and to maintain the indexes it relies on.
///
/// Implementors must be `Send + Sync`. The behaviour notes below describe the
/// `JoinOperator` implementation in this file.
pub trait JoinExecutor: Send + Sync {
43
    /// Joins each of `records` with the right-table records indexed under `join_key`.
    ///
    /// `reader` maps a port handle to the reader used to fetch the stored records.
    /// Returns the joined records, or an `ExecutionError` if the port handle is
    /// unknown or a storage operation fails.
    fn execute_right(
44
        &self,
45
        records: Vec<Record>,
46
        join_key: &[u8],
47
        database: &Database,
48
        transaction: &SharedTransaction,
49
        reader: &HashMap<PortHandle, Box<dyn RecordReader>>,
50
        join_tables: &HashMap<PortHandle, JoinTable>,
51
    ) -> Result<Vec<Record>, ExecutionError>;
52

53
    /// Joins each of `records` with the left-table records indexed under `join_key`.
    ///
    /// Mirror image of `execute_right`: the looked-up left record supplies the
    /// leading values of every joined record.
    fn execute_left(
54
        &self,
55
        records: Vec<Record>,
56
        join_key: &[u8],
57
        database: &Database,
58
        transaction: &SharedTransaction,
59
        reader: &HashMap<PortHandle, Box<dyn RecordReader>>,
60
        join_tables: &HashMap<PortHandle, JoinTable>,
61
    ) -> Result<(Vec<Record>), ExecutionError>;
62

63
    /// Adds the `key` -> `value` entry to the index kept for the right table.
    fn insert_right_index(
64
        &self,
65
        key: &[u8],
66
        value: &[u8],
67
        db: &Database,
68
        txn: &SharedTransaction,
69
    ) -> Result<(), ExecutionError>;
70

71
    /// Adds the `key` -> `value` entry to the index kept for the left table.
    fn insert_left_index(
72
        &self,
73
        key: &[u8],
74
        value: &[u8],
75
        db: &Database,
76
        txn: &SharedTransaction,
77
    ) -> Result<(), ExecutionError>;
78

79
    /// Removes the `key` -> `value` entry from the index kept for the right table.
    fn delete_right_index(
80
        &self,
81
        key: &[u8],
82
        value: &[u8],
83
        db: &Database,
84
        txn: &SharedTransaction,
85
    ) -> Result<(), ExecutionError>;
86

87
    /// Removes the `key` -> `value` entry from the index kept for the left table.
    fn delete_left_index(
88
        &self,
89
        key: &[u8],
90
        value: &[u8],
91
        db: &Database,
92
        txn: &SharedTransaction,
93
    ) -> Result<(), ExecutionError>;
94
}
95

96
/// A join between a left and a right relation, identified by their port handles,
/// together with the field positions that form the join key on each side.
#[derive(Clone, Debug, PartialEq, Eq)]
27✔
97
pub struct JoinOperator {
98
    /// Type of the Join operation
99
    _operator: JoinOperatorType,
100

×
101
    /// relation on the right side of the JOIN
102
    pub right_table: PortHandle,
103

104
    /// positions of the fields forming the join key of a record from the left
    /// side of the JOIN (read by `get_left_record_join_key`)
105
    left_join_key_indexes: Vec<usize>,
106

107
    /// relation on the left side of the JOIN
108
    pub left_table: PortHandle,
109

110
    /// positions of the fields forming the join key of a record from the right
    /// side of the JOIN (read by `get_right_record_join_key`)
111
    right_join_key_indexes: Vec<usize>,
112

113
    /// prefix for the index key
    // NOTE(review): `new` initialises this from `right_table`, and nothing else
    // in this file reads it — confirm whether that is intended.
114
    left_prefix: u32,
115
}
116

117
impl JoinOperator {
118
    pub fn new(
9✔
119
        _operator: JoinOperatorType,
9✔
120
        right_table: PortHandle,
9✔
121
        left_join_key_indexes: Vec<usize>,
9✔
122
        left_table: PortHandle,
9✔
123
        right_join_key_indexes: Vec<usize>,
9✔
124
    ) -> Self {
9✔
125
        Self {
9✔
126
            _operator,
9✔
127
            right_table,
9✔
128
            left_join_key_indexes,
9✔
129
            left_table,
9✔
130
            right_join_key_indexes,
9✔
131
            left_prefix: (right_table as u32),
9✔
132
        }
9✔
133
    }
9✔
134

×
135
    pub fn get_left_record_join_key(&self, record: &Record) -> Result<Vec<u8>, TypeError> {
1,215✔
136
        get_composite_key(record, self.left_join_key_indexes.as_slice())
1,215✔
137
    }
1,215✔
138

139
    pub fn get_right_record_join_key(&self, record: &Record) -> Result<Vec<u8>, TypeError> {
65,532✔
140
        get_composite_key(record, self.right_join_key_indexes.as_slice())
65,532✔
141
    }
65,532✔
142

143
    fn get_right_lookup_keys(
1,209✔
144
        &self,
1,209✔
145
        join_key: &[u8],
1,209✔
146
        db: &Database,
1,209✔
147
        transaction: &SharedTransaction,
1,209✔
148
    ) -> Result<Vec<(Vec<u8>, u32)>, ExecutionError> {
1,209✔
149
        let mut join_keys = vec![];
1,209✔
150

1,209✔
151
        let mut exclusive_transaction = transaction.write();
1,209✔
152
        let right_prefix_transaction = PrefixTransaction::new(
1,209✔
153
            &mut exclusive_transaction,
1,209✔
154
            self.right_table as u32 | REVERSE_JOIN_FLAG,
1,209✔
155
        );
1,209✔
156

×
157
        let cursor = right_prefix_transaction.open_cursor(*db)?;
1,209✔
158

×
159
        if !cursor.seek(join_key)? {
1,209✔
160
            return Ok(join_keys);
1,209✔
161
        }
×
162

163
        loop {
×
164
            let entry = cursor.read()?.ok_or(ExecutionError::InternalDatabaseError(
×
165
                StorageError::InvalidRecord,
×
166
            ))?;
×
167

168
            if entry.0 != join_key {
×
169
                break;
×
170
            }
×
171

×
172
            let (version_bytes, key_bytes) = entry.1.split_at(4);
×
173
            let version = u32::from_be_bytes(version_bytes.try_into().unwrap());
×
174
            join_keys.push((key_bytes.to_vec(), version));
×
175

×
176
            if !cursor.next()? {
×
177
                break;
×
178
            }
×
179
        }
×
180

×
181
        Ok(join_keys)
×
182
    }
1,209✔
183

184
    fn get_left_lookup_keys(
32,766✔
185
        &self,
32,766✔
186
        join_key: &[u8],
32,766✔
187
        db: &Database,
32,766✔
188
        transaction: &SharedTransaction,
32,766✔
189
    ) -> Result<Vec<(Vec<u8>, u32)>, ExecutionError> {
32,766✔
190
        let mut join_keys = vec![];
32,766✔
191

32,766✔
192
        let mut exclusive_transaction = transaction.write();
32,766✔
193
        let left_prefix_transaction =
32,766✔
194
            PrefixTransaction::new(&mut exclusive_transaction, self.left_table as u32);
32,766✔
195

196
        let cursor = left_prefix_transaction.open_cursor(*db)?;
32,766✔
197

198
        if !cursor.seek(join_key)? {
32,766✔
199
            return Ok(join_keys);
108✔
200
        }
32,658✔
201

202
        loop {
203
            let entry = cursor.read()?.ok_or(ExecutionError::InternalDatabaseError(
65,196✔
204
                StorageError::InvalidRecord,
65,196✔
205
            ))?;
65,196✔
206

207
            if entry.0 != join_key {
65,196✔
208
                break;
32,538✔
209
            }
32,658✔
210

32,658✔
211
            let (version_bytes, key_bytes) = entry.1.split_at(4);
32,658✔
212
            let version = u32::from_be_bytes(version_bytes.try_into().unwrap());
32,658✔
213
            join_keys.push((key_bytes.to_vec(), version));
32,658✔
214

32,658✔
215
            if !cursor.next()? {
32,658✔
216
                break;
120✔
217
            }
32,538✔
218
        }
219

220
        Ok(join_keys)
32,658✔
221
    }
32,766✔
222
}
223

224
impl JoinExecutor for JoinOperator {
225
    fn execute_right(
1,209✔
226
        &self,
1,209✔
227
        mut records: Vec<Record>,
1,209✔
228
        join_key: &[u8],
1,209✔
229
        db: &Database,
1,209✔
230
        transaction: &SharedTransaction,
1,209✔
231
        readers: &HashMap<PortHandle, Box<dyn RecordReader>>,
1,209✔
232
        _join_tables: &HashMap<PortHandle, JoinTable>,
1,209✔
233
    ) -> Result<Vec<Record>, ExecutionError> {
1,209✔
234
        let mut result_records = vec![];
1,209✔
235
        let reader = readers
1,209✔
236
            .get(&self.right_table)
1,209✔
237
            .ok_or(ExecutionError::InvalidPortHandle(self.right_table))?;
1,209✔
238
        for record in records.iter_mut() {
1,209✔
239
            // retrieve the lookup keys for the table on the right side of the join
240
            let right_lookup_keys = self.get_right_lookup_keys(join_key, db, transaction)?;
1,209✔
241

242
            // retrieve records for the table on the right side of the join
243
            for (right_lookup_key, right_lookup_version) in right_lookup_keys.iter() {
1,209✔
244
                if let Some(right_record) = reader.get(right_lookup_key, *right_lookup_version)? {
×
245
                    let join_record = join_records(&mut record.clone(), &mut right_record.clone());
×
246
                    result_records.push(join_record);
×
247
                }
×
248
            }
249

250
            // let join_schema = Schema::empty();
251

252
            // let right_table = join_tables.get(&(self.right_table as PortHandle)).ok_or(
253
            //     ExecutionError::InternalDatabaseError(StorageError::InvalidRecord),
254
            // )?;
255

256
            // if let Some(next_join) = &right_table.right {
257
            //     let next_join_records = next_join.execute_right(
258
            //         result_records,
259
            //         &join_schema,
260
            //         db,
261
            //         transaction,
262
            //         readers,
263
            //         join_tables,
264
            //     );
265
            // }
266
        }
267

268
        Ok(result_records)
1,209✔
269
    }
1,209✔
270

271
    fn execute_left(
32,766✔
272
        &self,
32,766✔
273
        mut records: Vec<Record>,
32,766✔
274
        join_key: &[u8],
32,766✔
275
        db: &Database,
32,766✔
276
        transaction: &SharedTransaction,
32,766✔
277
        readers: &HashMap<PortHandle, Box<dyn RecordReader>>,
32,766✔
278
        _join_tables: &HashMap<PortHandle, JoinTable>,
32,766✔
279
    ) -> Result<Vec<Record>, ExecutionError> {
32,766✔
280
        let mut result_records = vec![];
32,766✔
281
        let reader = readers
32,766✔
282
            .get(&self.left_table)
32,766✔
283
            .ok_or(ExecutionError::InvalidPortHandle(self.left_table))?;
32,766✔
284

285
        for record in records.iter_mut() {
32,766✔
286
            // retrieve the lookup keys for the table on the right side of the join
287
            let left_lookup_keys = self.get_left_lookup_keys(join_key, db, transaction)?;
32,766✔
288

289
            // retrieve records for the table on the right side of the join
290
            for (left_lookup_key, left_lookup_version) in left_lookup_keys.iter() {
32,766✔
291
                if let Some(left_record) = reader.get(left_lookup_key, *left_lookup_version)? {
32,658✔
292
                    let join_record = join_records(&mut left_record.clone(), &mut record.clone());
32,658✔
293
                    result_records.push(join_record);
32,658✔
294
                }
32,658✔
295
            }
296

297
            // let join_schema = Schema::empty();
298

299
            // let right_table = join_tables.get(&(self.right_table as PortHandle)).ok_or(
300
            //     ExecutionError::InternalDatabaseError(StorageError::InvalidRecord),
301
            // )?;
302

303
            // if let Some(next_join) = &right_table.right {
304
            //     let next_join_records = next_join.execute_right(
305
            //         result_records,
306
            //         &join_schema,
307
            //         db,
308
            //         transaction,
309
            //         readers,
310
            //         join_tables,
311
            //     );
312
            // }
313
        }
314

315
        Ok(result_records)
32,766✔
316
    }
32,766✔
317

318
    fn insert_right_index(
32,766✔
319
        &self,
32,766✔
320
        key: &[u8],
32,766✔
321
        value: &[u8],
32,766✔
322
        db: &Database,
32,766✔
323
        transaction: &SharedTransaction,
32,766✔
324
    ) -> Result<(), ExecutionError> {
32,766✔
325
        let mut exclusive_transaction = transaction.write();
32,766✔
326
        let mut prefix_transaction = PrefixTransaction::new(
32,766✔
327
            &mut exclusive_transaction,
32,766✔
328
            self.right_table as u32 | REVERSE_JOIN_FLAG,
32,766✔
329
        );
32,766✔
330

32,766✔
331
        prefix_transaction.put(*db, key, value)?;
32,766✔
332

×
333
        Ok(())
32,766✔
334
    }
32,766✔
335

×
336
    fn insert_left_index(
1,203✔
337
        &self,
1,203✔
338
        key: &[u8],
1,203✔
339
        value: &[u8],
1,203✔
340
        db: &Database,
1,203✔
341
        transaction: &SharedTransaction,
1,203✔
342
    ) -> Result<(), ExecutionError> {
1,203✔
343
        let mut exclusive_transaction = transaction.write();
1,203✔
344
        let mut prefix_transaction =
1,203✔
345
            PrefixTransaction::new(&mut exclusive_transaction, self.left_table as u32);
1,203✔
346

1,203✔
347
        prefix_transaction.put(*db, key, value)?;
1,203✔
348

349
        Ok(())
1,203✔
350
    }
1,203✔
351

352
    fn delete_right_index(
×
353
        &self,
×
354
        key: &[u8],
×
355
        value: &[u8],
×
356
        db: &Database,
×
357
        transaction: &SharedTransaction,
×
358
    ) -> Result<(), ExecutionError> {
×
359
        let mut exclusive_transaction = transaction.write();
×
360
        let mut prefix_transaction = PrefixTransaction::new(
×
361
            &mut exclusive_transaction,
×
362
            self.right_table as u32 | REVERSE_JOIN_FLAG,
×
363
        );
×
364

×
365
        prefix_transaction.del(*db, key, Some(value))?;
×
366

×
367
        Ok(())
×
368
    }
×
369

×
370
    fn delete_left_index(
6✔
371
        &self,
6✔
372
        key: &[u8],
6✔
373
        value: &[u8],
6✔
374
        db: &Database,
6✔
375
        transaction: &SharedTransaction,
6✔
376
    ) -> Result<(), ExecutionError> {
6✔
377
        let mut exclusive_transaction = transaction.write();
6✔
378
        let mut prefix_transaction =
6✔
379
            PrefixTransaction::new(&mut exclusive_transaction, self.left_table as u32);
6✔
380

6✔
381
        prefix_transaction.del(*db, key, Some(value))?;
6✔
382

×
383
        Ok(())
6✔
384
    }
6✔
385
}
×
386

387
fn join_records(left_record: &mut Record, right_record: &mut Record) -> Record {
32,658✔
388
    left_record.values.append(&mut right_record.values);
32,658✔
389
    Record::new(None, left_record.values.clone(), None)
32,658✔
390
}
32,658✔
391

392
/// Concatenates the encoded values of the fields at `key_indexes`, in the order
/// given, into a single byte key.
///
/// # Errors
/// Returns a `TypeError` if `record` has no value at one of the requested indexes.
pub fn get_composite_key(record: &Record, key_indexes: &[usize]) -> Result<Vec<u8>, TypeError> {
    let mut composite = Vec::with_capacity(64);

    for &index in key_indexes {
        let encoded = record.get_value(index)?.encode();
        composite.extend(encoded.iter());
    }

    Ok(composite)
}
100,722✔
403

404
pub fn get_lookup_key(record: &Record, schema: &Schema) -> Result<Vec<u8>, TypeError> {
33,975✔
405
    let mut lookup_key = Vec::with_capacity(64);
33,975✔
406
    if let Some(version) = record.version {
33,975✔
407
        lookup_key.extend_from_slice(&version.to_be_bytes());
33,975✔
408
    } else {
33,975✔
409
        lookup_key.extend_from_slice(&[0_u8; 4]);
×
410
    }
×
411

412
    let key = get_composite_key(record, schema.primary_index.as_slice())?;
33,975✔
413
    lookup_key.extend(key);
33,975✔
414
    Ok(lookup_key)
33,975✔
415
}
33,975✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc