• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

getdozer / dozer / 3965135367

pending completion
3965135367

Pull #680

github

GitHub
Merge 1add77327 into 56c0cf2b3
Pull Request #680: feat: Implement nested queries and CTE.

506 of 506 new or added lines in 18 files covered. (100.0%)

21999 of 33062 relevant lines covered (66.54%)

50489.13 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

35.91
/dozer-sql/src/pipeline/product/processor.rs
1
use crate::pipeline::errors::PipelineError;
2
use dozer_core::dag::channels::ProcessorChannelForwarder;
3
use dozer_core::dag::dag::DEFAULT_PORT_HANDLE;
4
use dozer_core::dag::epoch::Epoch;
5
use dozer_core::dag::errors::{ExecutionError, JoinError};
6
use dozer_core::dag::node::{PortHandle, Processor};
7
use dozer_core::dag::record_store::RecordReader;
8
use dozer_core::storage::common::Database;
9
use dozer_core::storage::lmdb_storage::{LmdbEnvironmentManager, SharedTransaction};
10
use dozer_types::internal_err;
11
use dozer_types::types::{Operation, Record};
12
use std::collections::HashMap;
13

14
use dozer_core::dag::errors::ExecutionError::InternalError;
15

16
use super::join::{get_lookup_key, JoinExecutor, JoinTable};
17

18
/// Cartesian Product Processor
19
#[derive(Debug)]
×
20
pub struct ProductProcessor {
21
    /// Join operations
22
    join_tables: HashMap<PortHandle, JoinTable>,
23

24
    /// Database to store Join indexes
25
    db: Option<Database>,
26
}
27

28
impl ProductProcessor {
29
    /// Creates a new [`ProductProcessor`].
30
    pub fn new(join_tables: HashMap<PortHandle, JoinTable>) -> Self {
76✔
31
        Self {
76✔
32
            join_tables,
76✔
33
            db: None,
76✔
34
        }
76✔
35
    }
76✔
36

37
    /// Opens the `"product"` database in `env` and stores its handle in `self.db`.
    ///
    /// # Errors
    ///
    /// Propagates any failure of `open_database` as a [`PipelineError`].
    fn init_store(&mut self, env: &mut LmdbEnvironmentManager) -> Result<(), PipelineError> {
        let database = env.open_database("product", true)?;
        self.db = Some(database);
        Ok(())
    }
76✔
42

43
    fn delete(
44
        &self,
45
        from_port: PortHandle,
46
        record: &Record,
47
        transaction: &SharedTransaction,
48
        reader: &HashMap<PortHandle, RecordReader>,
49
    ) -> Result<Vec<Record>, ExecutionError> {
50
        // Get the input Table based on the port of the incoming message
51
        if let Some(input_table) = self.join_tables.get(&from_port) {
3✔
52
            let mut records = vec![record.clone()];
3✔
53

54
            let database = &self.db.ok_or(ExecutionError::InvalidDatabase)?;
3✔
55

56
            let mut input_left_join = &input_table.left;
3✔
57

58
            if let Some(left_join) = input_left_join {
3✔
59
                // generate the key with the fields of the left table used in the join contstraint
60
                let join_key: Vec<u8> = left_join.get_right_record_join_key(record)?;
×
61
                // generate the key with theprimary key fields of the left table
62
                let lookup_key: Vec<u8> = get_lookup_key(record, &input_table.schema)?;
×
63
                // Update the Join index
64
                left_join.delete_right_index(&join_key, &lookup_key, database, transaction)?;
×
65
            }
3✔
66

67
            while let Some(left_join) = input_left_join {
3✔
68
                let join_key: Vec<u8> = left_join.get_right_record_join_key(record)?;
×
69
                records = left_join.execute_left(
×
70
                    records,
×
71
                    &join_key,
×
72
                    database,
×
73
                    transaction,
×
74
                    reader,
×
75
                    &self.join_tables,
×
76
                )?;
×
77

78
                let next_table = self
×
79
                    .join_tables
×
80
                    .get(&left_join.left_table)
×
81
                    .ok_or(ExecutionError::InvalidPortHandle(left_join.left_table))?;
×
82
                input_left_join = &next_table.left;
×
83
            }
84

85
            let mut input_right_join = &input_table.right;
3✔
86

87
            while let Some(right_join) = input_right_join {
3✔
88
                // generate the key with the fields of the left table used in the join contstraint
89
                let join_key: Vec<u8> = right_join.get_left_record_join_key(record)?;
×
90
                // generate the key with theprimary key fields of the left table
91
                let lookup_key: Vec<u8> = get_lookup_key(record, &input_table.schema)?;
×
92
                // Update the Join index
93
                right_join.delete_left_index(&join_key, &lookup_key, database, transaction)?;
×
94

95
                records = right_join.execute_right(
×
96
                    records,
×
97
                    &join_key,
×
98
                    database,
×
99
                    transaction,
×
100
                    reader,
×
101
                    &self.join_tables,
×
102
                )?;
×
103

104
                let next_table = self
×
105
                    .join_tables
×
106
                    .get(&right_join.right_table)
×
107
                    .ok_or(ExecutionError::InvalidPortHandle(right_join.left_table))?;
×
108
                input_right_join = &next_table.right;
×
109
            }
110

111
            return Ok(records);
3✔
112
        }
×
113

×
114
        Err(ExecutionError::InvalidPortHandle(from_port))
×
115
    }
3✔
116

117
    fn insert(
118
        &self,
119
        from_port: PortHandle,
120
        record: &Record,
121
        transaction: &SharedTransaction,
122
        reader: &HashMap<PortHandle, RecordReader>,
123
    ) -> Result<Vec<Record>, ExecutionError> {
124
        // Get the input Table based on the port of the incoming message
125

126
        if let Some(input_table) = self.join_tables.get(&from_port) {
17,278✔
127
            let mut records = vec![record.clone()];
17,278✔
128

129
            let database = &self.db.ok_or(ExecutionError::InvalidDatabase)?;
17,278✔
130

131
            let mut input_left_join = &input_table.left;
17,278✔
132

133
            // Update the Join index
×
134
            if let Some(left_join) = input_left_join {
17,278✔
135
                // generate the key with the fields of the left table used in the join contstraint
×
136
                let join_key: Vec<u8> = left_join.get_right_record_join_key(record)?;
×
137
                // generate the key with theprimary key fields of the left table
×
138
                let lookup_key: Vec<u8> = get_lookup_key(record, &input_table.schema)?;
×
139
                // Update the Join index
×
140
                left_join.insert_right_index(&join_key, &lookup_key, database, transaction)?;
×
141
            }
17,278✔
142

×
143
            while let Some(left_join) = input_left_join {
17,278✔
144
                let join_key: Vec<u8> = left_join.get_right_record_join_key(record)?;
×
145
                records = left_join.execute_left(
×
146
                    records,
×
147
                    &join_key,
×
148
                    database,
×
149
                    transaction,
×
150
                    reader,
×
151
                    &self.join_tables,
×
152
                )?;
×
153

×
154
                let next_table = self.join_tables.get(&left_join.left_table).ok_or(
×
155
                    ExecutionError::JoinError(JoinError::InsertPortError(left_join.left_table)),
×
156
                )?;
×
157
                input_left_join = &next_table.left;
×
158
            }
159

160
            let mut input_right_join = &input_table.right;
17,278✔
161

162
            while let Some(right_join) = input_right_join {
17,278✔
163
                // generate the key with the fields of the left table used in the join contstraint
164
                let join_key: Vec<u8> = right_join.get_left_record_join_key(record)?;
×
165
                // generate the key with theprimary key fields of the left table
166
                let lookup_key: Vec<u8> = get_lookup_key(record, &input_table.schema)?;
×
167
                // Update the Join index
168
                right_join.insert_left_index(&join_key, &lookup_key, database, transaction)?;
×
169

170
                records = right_join.execute_right(
×
171
                    records,
×
172
                    &join_key,
×
173
                    database,
×
174
                    transaction,
×
175
                    reader,
×
176
                    &self.join_tables,
×
177
                )?;
×
178

179
                let next_table = self.join_tables.get(&right_join.right_table).ok_or(
×
180
                    ExecutionError::JoinError(JoinError::InsertPortError(right_join.left_table)),
×
181
                )?;
×
182
                input_right_join = &next_table.right;
×
183
            }
×
184

185
            return Ok(records);
17,278✔
186
        }
×
187

×
188
        Err(ExecutionError::JoinError(JoinError::PortNotConnected(
×
189
            from_port,
×
190
        )))
×
191
    }
17,278✔
192

×
193
    fn update(
3✔
194
        &self,
3✔
195
        from_port: PortHandle,
3✔
196
        old: &Record,
3✔
197
        new: &Record,
3✔
198
        transaction: &SharedTransaction,
3✔
199
        reader: &HashMap<PortHandle, RecordReader>,
3✔
200
    ) -> Result<(Vec<Record>, Vec<Record>), ExecutionError> {
3✔
201
        let input_table = self
3✔
202
            .join_tables
3✔
203
            .get(&from_port)
3✔
204
            .ok_or(ExecutionError::InvalidPortHandle(from_port))?;
3✔
205

×
206
        let database = &self.db.ok_or(ExecutionError::InvalidDatabase)?;
3✔
207

×
208
        let mut old_records = vec![old.clone()];
3✔
209
        let mut new_records = vec![new.clone()];
3✔
210

3✔
211
        let mut input_left_join = &input_table.left;
3✔
212

×
213
        if let Some(left_join) = input_left_join {
3✔
214
            let old_join_key: Vec<u8> = left_join.get_right_record_join_key(old)?;
×
215
            let old_lookup_key: Vec<u8> = get_lookup_key(old, &input_table.schema)?;
×
216
            let new_join_key: Vec<u8> = left_join.get_right_record_join_key(new)?;
×
217
            let new_lookup_key: Vec<u8> = get_lookup_key(new, &input_table.schema)?;
×
218

219
            // Update the Join index
×
220
            left_join.delete_right_index(&old_join_key, &old_lookup_key, database, transaction)?;
×
221
            left_join.insert_right_index(&new_join_key, &new_lookup_key, database, transaction)?;
×
222
        }
3✔
223

×
224
        while let Some(left_join) = input_left_join {
3✔
225
            let old_join_key: Vec<u8> = left_join.get_right_record_join_key(old)?;
×
226
            old_records = left_join.execute_left(
×
227
                old_records,
×
228
                &old_join_key,
×
229
                database,
×
230
                transaction,
×
231
                reader,
×
232
                &self.join_tables,
×
233
            )?;
×
234

×
235
            let new_join_key: Vec<u8> = left_join.get_right_record_join_key(new)?;
×
236
            new_records = left_join.execute_left(
×
237
                new_records,
×
238
                &new_join_key,
×
239
                database,
×
240
                transaction,
×
241
                reader,
×
242
                &self.join_tables,
×
243
            )?;
×
244

×
245
            let next_table = self
×
246
                .join_tables
×
247
                .get(&left_join.left_table)
×
248
                .ok_or(ExecutionError::InvalidPortHandle(left_join.left_table))?;
×
249
            input_left_join = &next_table.left;
×
250
        }
251

×
252
        let mut input_right_join = &input_table.right;
3✔
253

×
254
        if let Some(right_join) = input_right_join {
3✔
255
            let old_join_key: Vec<u8> = right_join.get_left_record_join_key(old)?;
×
256
            let old_lookup_key: Vec<u8> = get_lookup_key(old, &input_table.schema)?;
×
257
            let new_join_key: Vec<u8> = right_join.get_left_record_join_key(new)?;
×
258
            let new_lookup_key: Vec<u8> = get_lookup_key(new, &input_table.schema)?;
×
259

260
            // Update the Join index
×
261
            right_join.delete_left_index(&old_join_key, &old_lookup_key, database, transaction)?;
×
262
            right_join.insert_left_index(&new_join_key, &new_lookup_key, database, transaction)?;
×
263
        }
3✔
264

×
265
        while let Some(right_join) = input_right_join {
3✔
266
            let old_join_key: Vec<u8> = right_join.get_left_record_join_key(old)?;
×
267

×
268
            old_records = right_join.execute_right(
×
269
                old_records,
×
270
                &old_join_key,
×
271
                database,
×
272
                transaction,
×
273
                reader,
×
274
                &self.join_tables,
×
275
            )?;
×
276

×
277
            let new_join_key: Vec<u8> = right_join.get_left_record_join_key(new)?;
×
278

×
279
            new_records = right_join.execute_right(
×
280
                new_records,
×
281
                &new_join_key,
×
282
                database,
×
283
                transaction,
×
284
                reader,
×
285
                &self.join_tables,
×
286
            )?;
×
287

×
288
            let next_table = self
×
289
                .join_tables
×
290
                .get(&right_join.right_table)
×
291
                .ok_or(ExecutionError::InvalidPortHandle(right_join.left_table))?;
×
292
            input_right_join = &next_table.right;
×
293
        }
294

×
295
        Ok((old_records, new_records))
3✔
296
    }
3✔
297
}
298

299
impl Processor for ProductProcessor {
×
300
    fn init(&mut self, state: &mut LmdbEnvironmentManager) -> Result<(), ExecutionError> {
76✔
301
        internal_err!(self.init_store(state))
×
302
    }
76✔
303

×
304
    /// Commit hook; this processor does no work here and always returns `Ok(())`.
    fn commit(&self, _epoch: &Epoch, _tx: &SharedTransaction) -> Result<(), ExecutionError> {
        Ok(())
    }
1,487✔
307

×
308
    fn process(
17,284✔
309
        &mut self,
17,284✔
310
        from_port: PortHandle,
17,284✔
311
        op: Operation,
17,284✔
312
        fw: &mut dyn ProcessorChannelForwarder,
17,284✔
313
        transaction: &SharedTransaction,
17,284✔
314
        reader: &HashMap<PortHandle, RecordReader>,
17,284✔
315
    ) -> Result<(), ExecutionError> {
17,284✔
316
        match op {
17,284✔
317
            Operation::Delete { ref old } => {
3✔
318
                let records = self.delete(from_port, old, transaction, reader)?;
3✔
319

×
320
                for record in records.into_iter() {
3✔
321
                    let _ = fw.send(Operation::Delete { old: record }, DEFAULT_PORT_HANDLE);
3✔
322
                }
3✔
323
            }
×
324
            Operation::Insert { ref new } => {
17,278✔
325
                let records = self.insert(from_port, new, transaction, reader)?;
17,278✔
326

×
327
                for record in records.into_iter() {
17,278✔
328
                    let _ = fw.send(Operation::Insert { new: record }, DEFAULT_PORT_HANDLE);
17,278✔
329
                }
17,278✔
330
            }
×
331
            Operation::Update { ref old, ref new } => {
3✔
332
                let (old_join_records, new_join_records) =
3✔
333
                    self.update(from_port, old, new, transaction, reader)?;
3✔
334

×
335
                for old in old_join_records.into_iter() {
3✔
336
                    let _ = fw.send(Operation::Delete { old }, DEFAULT_PORT_HANDLE);
3✔
337
                }
3✔
338

×
339
                for new in new_join_records.into_iter() {
3✔
340
                    let _ = fw.send(Operation::Insert { new }, DEFAULT_PORT_HANDLE);
3✔
341
                }
3✔
342
            }
343
        }
×
344
        Ok(())
17,284✔
345
    }
17,284✔
346
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc