• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

getdozer / dozer / 3965135367

pending completion
3965135367

Pull #680

github

GitHub
Merge 1add77327 into 56c0cf2b3
Pull Request #680: feat: Implement nested queries and CTE.

506 of 506 new or added lines in 18 files covered. (100.0%)

21999 of 33062 relevant lines covered (66.54%)

50489.13 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

3.35
/dozer-sql/src/pipeline/product/join.rs
1
use std::collections::HashMap;
2

3
use dozer_core::dag::node::PortHandle;
4
use dozer_core::dag::record_store::RecordReader;
5
use dozer_core::storage::common::Database;
6
use dozer_core::storage::errors::StorageError;
7
use dozer_core::storage::lmdb_storage::SharedTransaction;
8
use dozer_core::{dag::errors::ExecutionError, storage::prefix_transaction::PrefixTransaction};
9
use dozer_types::bincode;
10
use dozer_types::errors::types::TypeError;
11
use dozer_types::types::{Record, Schema};
12

13
use crate::pipeline::builder::NameOrAlias;
14
use crate::pipeline::product::join::StorageError::SerializationError;
15

16
/// High bit OR-ed into a port handle to build the key prefix of the right-side
/// (reverse) join index, so that prefix differs from the plain `table as u32`
/// prefix used by the left-side index (assuming port handles fit in 31 bits).
const REVERSE_JOIN_FLAG: u32 = 1 << 31;
17

18
#[derive(Debug, Clone)]
76✔
19
pub struct JoinTable {
20
    pub name: NameOrAlias,
×
21
    pub schema: Schema,
22
    pub left: Option<JoinOperator>,
23
    pub right: Option<JoinOperator>,
24
}
25

26
impl JoinTable {
27
    pub fn from(name: &NameOrAlias, schema: &Schema) -> Self {
76✔
28
        Self {
76✔
29
            name: name.clone(),
76✔
30
            schema: schema.clone(),
76✔
31
            left: None,
76✔
32
            right: None,
76✔
33
        }
76✔
34
    }
76✔
35
}
×
36

×
37
/// Kind of join performed by a join operator.
///
/// Only inner joins exist at the moment. Left/right/full outer joins, cross
/// joins and cross/outer apply are not implemented yet.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum JoinOperatorType {
    /// Inner join: only record pairs whose join keys match are emitted.
    Inner,
}
47

48
pub trait JoinExecutor: Send + Sync {
49
    fn execute_right(
50
        &self,
51
        records: Vec<Record>,
52
        join_key: &[u8],
53
        database: &Database,
54
        transaction: &SharedTransaction,
55
        reader: &HashMap<PortHandle, RecordReader>,
56
        join_tables: &HashMap<PortHandle, JoinTable>,
57
    ) -> Result<Vec<Record>, ExecutionError>;
58

59
    fn execute_left(
60
        &self,
61
        records: Vec<Record>,
62
        join_key: &[u8],
63
        database: &Database,
64
        transaction: &SharedTransaction,
65
        reader: &HashMap<PortHandle, RecordReader>,
66
        join_tables: &HashMap<PortHandle, JoinTable>,
67
    ) -> Result<Vec<Record>, ExecutionError>;
68

69
    fn insert_right_index(
70
        &self,
71
        key: &[u8],
72
        value: &[u8],
73
        db: &Database,
74
        txn: &SharedTransaction,
75
    ) -> Result<(), ExecutionError>;
76

77
    fn insert_left_index(
78
        &self,
79
        key: &[u8],
80
        value: &[u8],
81
        db: &Database,
82
        txn: &SharedTransaction,
83
    ) -> Result<(), ExecutionError>;
84

85
    fn delete_right_index(
86
        &self,
87
        key: &[u8],
88
        value: &[u8],
89
        db: &Database,
90
        txn: &SharedTransaction,
91
    ) -> Result<(), ExecutionError>;
92

93
    fn delete_left_index(
94
        &self,
95
        key: &[u8],
96
        value: &[u8],
97
        db: &Database,
98
        txn: &SharedTransaction,
99
    ) -> Result<(), ExecutionError>;
100
}
101

102
#[derive(Clone, Debug, PartialEq, Eq)]
×
103
pub struct JoinOperator {
104
    /// Type of the Join operation
×
105
    _operator: JoinOperatorType,
106

107
    /// relation on the right side of the JOIN
108
    pub right_table: PortHandle,
109

110
    /// key on the right side of the JOIN
111
    left_join_key_indexes: Vec<usize>,
112

113
    /// relation on the left side of the JOIN
114
    pub left_table: PortHandle,
115

116
    /// key on the left side of the JOIN
117
    right_join_key_indexes: Vec<usize>,
118

119
    /// prefix for the index key
120
    left_prefix: u32,
121
}
122

123
impl JoinOperator {
124
    pub fn new(
×
125
        _operator: JoinOperatorType,
×
126
        right_table: PortHandle,
×
127
        left_join_key_indexes: Vec<usize>,
×
128
        left_table: PortHandle,
×
129
        right_join_key_indexes: Vec<usize>,
×
130
    ) -> Self {
×
131
        Self {
×
132
            _operator,
×
133
            right_table,
×
134
            left_join_key_indexes,
×
135
            left_table,
×
136
            right_join_key_indexes,
×
137
            left_prefix: (right_table as u32),
×
138
        }
×
139
    }
×
140

×
141
    pub fn get_left_record_join_key(&self, record: &Record) -> Result<Vec<u8>, TypeError> {
×
142
        get_composite_key(record, self.left_join_key_indexes.as_slice())
×
143
    }
×
144

×
145
    pub fn get_right_record_join_key(&self, record: &Record) -> Result<Vec<u8>, TypeError> {
×
146
        get_composite_key(record, self.right_join_key_indexes.as_slice())
×
147
    }
×
148

×
149
    fn get_right_join_keys(
×
150
        &self,
×
151
        join_key: &[u8],
×
152
        db: &Database,
×
153
        transaction: &SharedTransaction,
×
154
    ) -> Result<Vec<Vec<u8>>, ExecutionError> {
×
155
        let mut join_keys = vec![];
×
156

×
157
        let mut exclusive_transaction = transaction.write();
×
158
        let right_prefix_transaction = PrefixTransaction::new(
×
159
            &mut exclusive_transaction,
×
160
            self.right_table as u32 | REVERSE_JOIN_FLAG,
×
161
        );
×
162

×
163
        let cursor = right_prefix_transaction.open_cursor(*db)?;
×
164

165
        if !cursor.seek(join_key)? {
×
166
            return Ok(join_keys);
×
167
        }
×
168

×
169
        loop {
×
170
            let entry = cursor.read()?.ok_or(ExecutionError::InternalDatabaseError(
×
171
                StorageError::InvalidRecord,
×
172
            ))?;
×
173

×
174
            if entry.0 != join_key {
×
175
                break;
×
176
            }
×
177

×
178
            join_keys.push(entry.1.to_vec());
×
179

×
180
            if !cursor.next()? {
×
181
                break;
×
182
            }
×
183
        }
×
184

×
185
        Ok(join_keys)
×
186
    }
×
187

×
188
    fn get_left_join_keys(
×
189
        &self,
×
190
        join_key: &[u8],
×
191
        db: &Database,
×
192
        transaction: &SharedTransaction,
×
193
    ) -> Result<Vec<Vec<u8>>, ExecutionError> {
×
194
        let mut join_keys = vec![];
×
195

×
196
        let mut exclusive_transaction = transaction.write();
×
197
        let left_prefix_transaction =
×
198
            PrefixTransaction::new(&mut exclusive_transaction, self.left_table as u32);
×
199

×
200
        let cursor = left_prefix_transaction.open_cursor(*db)?;
×
201

202
        if !cursor.seek(join_key)? {
×
203
            return Ok(join_keys);
×
204
        }
×
205

×
206
        loop {
×
207
            let entry = cursor.read()?.ok_or(ExecutionError::InternalDatabaseError(
×
208
                StorageError::InvalidRecord,
×
209
            ))?;
×
210

×
211
            if entry.0 != join_key {
×
212
                break;
×
213
            }
×
214

×
215
            join_keys.push(entry.1.to_vec());
×
216

×
217
            if !cursor.next()? {
×
218
                break;
×
219
            }
×
220
        }
×
221

×
222
        Ok(join_keys)
×
223
    }
×
224
}
×
225

×
226
impl JoinExecutor for JoinOperator {
227
    fn execute_right(
×
228
        &self,
×
229
        mut records: Vec<Record>,
×
230
        join_key: &[u8],
×
231
        db: &Database,
×
232
        transaction: &SharedTransaction,
×
233
        readers: &HashMap<PortHandle, RecordReader>,
×
234
        _join_tables: &HashMap<PortHandle, JoinTable>,
×
235
    ) -> Result<Vec<Record>, ExecutionError> {
×
236
        let mut result_records = vec![];
×
237
        let reader = readers
×
238
            .get(&self.right_table)
×
239
            .ok_or(ExecutionError::InvalidPortHandle(self.right_table))?;
×
240
        for record in records.iter_mut() {
×
241
            // retrieve the lookup keys for the table on the right side of the join
×
242
            let right_lookup_keys = self.get_right_join_keys(join_key, db, transaction)?;
×
243
            // let right_lookup_keys =
×
244
            //     self.get_right_lookup_keys(&right_join_keys, db, transaction)?;
245

×
246
            // retrieve records for the table on the right side of the join
247
            for right_lookup_key in right_lookup_keys.iter() {
×
248
                if let Some(record_bytes) = reader.get(right_lookup_key)? {
×
249
                    let right_record: Record =
×
250
                        bincode::deserialize(&record_bytes).map_err(|e| SerializationError {
×
251
                            typ: "Record".to_string(),
×
252
                            reason: Box::new(e),
×
253
                        })?;
×
254
                    let join_record = join_records(&mut record.clone(), &mut right_record.clone());
×
255
                    result_records.push(join_record);
×
256
                }
×
257
            }
×
258

×
259
            // let join_schema = Schema::empty();
×
260

261
            // let right_table = join_tables.get(&(self.right_table as PortHandle)).ok_or(
262
            //     ExecutionError::InternalDatabaseError(StorageError::InvalidRecord),
263
            // )?;
264

265
            // if let Some(next_join) = &right_table.right {
266
            //     let next_join_records = next_join.execute_right(
267
            //         result_records,
268
            //         &join_schema,
269
            //         db,
270
            //         transaction,
271
            //         readers,
272
            //         join_tables,
273
            //     );
274
            // }
275
        }
276

277
        Ok(result_records)
×
278
    }
×
279

280
    fn execute_left(
×
281
        &self,
×
282
        mut records: Vec<Record>,
×
283
        join_key: &[u8],
×
284
        db: &Database,
×
285
        transaction: &SharedTransaction,
×
286
        readers: &HashMap<PortHandle, RecordReader>,
×
287
        _join_tables: &HashMap<PortHandle, JoinTable>,
×
288
    ) -> Result<Vec<Record>, ExecutionError> {
×
289
        let mut result_records = vec![];
×
290
        let reader = readers
×
291
            .get(&self.left_table)
×
292
            .ok_or(ExecutionError::InvalidPortHandle(self.left_table))?;
×
293

×
294
        for record in records.iter_mut() {
×
295
            // retrieve the lookup keys for the table on the right side of the join
×
296
            let left_lookup_keys = self.get_left_join_keys(join_key, db, transaction)?;
×
297
            //let left_lookup_keys = self.get_left_lookup_keys(&left_join_keys, db, transaction)?;
×
298

299
            // retrieve records for the table on the right side of the join
×
300
            for left_lookup_key in left_lookup_keys.iter() {
×
301
                if let Some(record_bytes) = reader.get(left_lookup_key)? {
×
302
                    let left_record: Record =
×
303
                        bincode::deserialize(&record_bytes).map_err(|e| SerializationError {
×
304
                            typ: "Record".to_string(),
×
305
                            reason: Box::new(e),
×
306
                        })?;
×
307
                    let join_record = join_records(&mut left_record.clone(), &mut record.clone());
×
308
                    result_records.push(join_record);
×
309
                }
×
310
            }
×
311

×
312
            // let join_schema = Schema::empty();
×
313

314
            // let right_table = join_tables.get(&(self.right_table as PortHandle)).ok_or(
315
            //     ExecutionError::InternalDatabaseError(StorageError::InvalidRecord),
316
            // )?;
317

318
            // if let Some(next_join) = &right_table.right {
319
            //     let next_join_records = next_join.execute_right(
320
            //         result_records,
321
            //         &join_schema,
322
            //         db,
323
            //         transaction,
324
            //         readers,
325
            //         join_tables,
326
            //     );
327
            // }
328
        }
329

330
        Ok(result_records)
×
331
    }
×
332

333
    fn insert_right_index(
×
334
        &self,
×
335
        key: &[u8],
×
336
        value: &[u8],
×
337
        db: &Database,
×
338
        transaction: &SharedTransaction,
×
339
    ) -> Result<(), ExecutionError> {
×
340
        let mut exclusive_transaction = transaction.write();
×
341
        let mut prefix_transaction = PrefixTransaction::new(
×
342
            &mut exclusive_transaction,
×
343
            self.right_table as u32 | REVERSE_JOIN_FLAG,
×
344
        );
×
345

×
346
        prefix_transaction.put(*db, key, value)?;
×
347

×
348
        Ok(())
×
349
    }
×
350

351
    fn insert_left_index(
×
352
        &self,
×
353
        key: &[u8],
×
354
        value: &[u8],
×
355
        db: &Database,
×
356
        transaction: &SharedTransaction,
×
357
    ) -> Result<(), ExecutionError> {
×
358
        let mut exclusive_transaction = transaction.write();
×
359
        let mut prefix_transaction =
×
360
            PrefixTransaction::new(&mut exclusive_transaction, self.left_table as u32);
×
361

×
362
        prefix_transaction.put(*db, key, value)?;
×
363

×
364
        Ok(())
×
365
    }
×
366

367
    fn delete_right_index(
×
368
        &self,
×
369
        key: &[u8],
×
370
        value: &[u8],
×
371
        db: &Database,
×
372
        transaction: &SharedTransaction,
×
373
    ) -> Result<(), ExecutionError> {
×
374
        let mut exclusive_transaction = transaction.write();
×
375
        let mut prefix_transaction = PrefixTransaction::new(
×
376
            &mut exclusive_transaction,
×
377
            self.right_table as u32 | REVERSE_JOIN_FLAG,
×
378
        );
×
379

×
380
        prefix_transaction.del(*db, key, Some(value))?;
×
381

×
382
        Ok(())
×
383
    }
×
384

385
    fn delete_left_index(
×
386
        &self,
×
387
        key: &[u8],
×
388
        value: &[u8],
×
389
        db: &Database,
×
390
        transaction: &SharedTransaction,
×
391
    ) -> Result<(), ExecutionError> {
×
392
        let mut exclusive_transaction = transaction.write();
×
393
        let mut prefix_transaction =
×
394
            PrefixTransaction::new(&mut exclusive_transaction, self.left_table as u32);
×
395

×
396
        prefix_transaction.del(*db, key, Some(value))?;
×
397

×
398
        Ok(())
×
399
    }
×
400
}
401

×
402
fn join_records(left_record: &mut Record, right_record: &mut Record) -> Record {
×
403
    left_record.values.append(&mut right_record.values);
×
404
    Record::new(None, left_record.values.clone(), None)
×
405
}
×
406

×
407
pub fn get_composite_key(record: &Record, key_indexes: &[usize]) -> Result<Vec<u8>, TypeError> {
×
408
    let mut join_key = Vec::with_capacity(64);
×
409

410
    for key_index in key_indexes.iter() {
×
411
        let key_value = record.get_value(*key_index)?;
×
412
        let key_bytes = key_value.encode();
×
413
        join_key.extend(key_bytes.iter());
×
414
    }
×
415

×
416
    Ok(join_key)
×
417
}
×
418

419
pub fn get_lookup_key(record: &Record, schema: &Schema) -> Result<Vec<u8>, TypeError> {
×
420
    get_composite_key(record, schema.primary_index.as_slice())
×
421
}
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc