• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

getdozer / dozer / 4061292542

pending completion
4061292542

Pull #729

github

GitHub
Merge 069171d20 into de98caa91
Pull Request #729: feat: Implement multi-way JOIN

1356 of 1356 new or added lines in 10 files covered. (100.0%)

24817 of 38526 relevant lines covered (64.42%)

39509.54 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

40.81
/dozer-sql/src/pipeline/product/processor.rs
1
use crate::pipeline::errors::PipelineError;
2
use dozer_core::dag::channels::ProcessorChannelForwarder;
3
use dozer_core::dag::dag::DEFAULT_PORT_HANDLE;
4
use dozer_core::dag::epoch::Epoch;
5
use dozer_core::dag::errors::ExecutionError;
6
use dozer_core::dag::node::{PortHandle, Processor};
7
use dozer_core::dag::record_store::RecordReader;
8
use dozer_core::storage::common::Database;
9
use dozer_core::storage::lmdb_storage::{LmdbEnvironmentManager, SharedTransaction};
10
use dozer_types::internal_err;
11

12
use dozer_types::types::{Operation, Record};
13
use std::collections::HashMap;
14

15
use dozer_core::dag::errors::ExecutionError::InternalError;
16

17
use super::join::{JoinAction, JoinSource};
18

19
/// Cartesian Product Processor
×
20
#[derive(Debug)]
×
21
pub struct FromProcessor {
22
    /// Join operations
23
    operator: JoinSource,
24

25
    /// Database to store Join indexes
26
    db: Option<Database>,
27
}
28

29
impl FromProcessor {
30
    /// Creates a new [`FromProcessor`].
×
31
    pub fn new(operator: JoinSource) -> Self {
139✔
32
        Self { operator, db: None }
139✔
33
    }
139✔
34

×
35
    fn init_store(&mut self, env: &mut LmdbEnvironmentManager) -> Result<(), PipelineError> {
139✔
36
        self.db = Some(env.open_database("product", true)?);
139✔
37

×
38
        Ok(())
139✔
39
    }
139✔
40

×
41
    fn delete(
51✔
42
        &self,
51✔
43
        from_port: PortHandle,
51✔
44
        record: &Record,
51✔
45
        transaction: &SharedTransaction,
51✔
46
        reader: &HashMap<PortHandle, Box<dyn RecordReader>>,
51✔
47
    ) -> Result<Vec<(Record, Vec<u8>)>, ExecutionError> {
51✔
48
        let database = &self.db.ok_or(ExecutionError::InvalidDatabase)?;
51✔
49

50
        self.operator.execute(
51✔
51
            &JoinAction::Delete,
51✔
52
            from_port,
51✔
53
            record,
51✔
54
            database,
51✔
55
            transaction,
51✔
56
            reader,
51✔
57
        )
51✔
58
    }
51✔
59

60
    fn insert(
76,549✔
61
        &self,
76,549✔
62
        from_port: PortHandle,
76,549✔
63
        record: &Record,
76,549✔
64
        transaction: &SharedTransaction,
76,549✔
65
        reader: &HashMap<PortHandle, Box<dyn RecordReader>>,
76,549✔
66
    ) -> Result<Vec<(Record, Vec<u8>)>, ExecutionError> {
76,549✔
67
        let database = &self.db.ok_or(ExecutionError::InvalidDatabase)?;
76,549✔
68

×
69
        self.operator.execute(
76,549✔
70
            &JoinAction::Insert,
76,549✔
71
            from_port,
76,549✔
72
            record,
76,549✔
73
            database,
76,549✔
74
            transaction,
76,549✔
75
            reader,
76,549✔
76
        )
76,549✔
77
    }
76,549✔
78

×
79
    #[allow(clippy::type_complexity)]
×
80
    fn update(
27✔
81
        &self,
27✔
82
        from_port: PortHandle,
27✔
83
        old: &Record,
27✔
84
        new: &Record,
27✔
85
        transaction: &SharedTransaction,
27✔
86
        reader: &HashMap<PortHandle, Box<dyn RecordReader>>,
27✔
87
    ) -> Result<(Vec<(Record, Vec<u8>)>, Vec<(Record, Vec<u8>)>), ExecutionError> {
27✔
88
        let database = &self.db.ok_or(ExecutionError::InvalidDatabase)?;
27✔
89

×
90
        let old_records = self.operator.execute(
27✔
91
            &JoinAction::Delete,
27✔
92
            from_port,
27✔
93
            old,
27✔
94
            database,
27✔
95
            transaction,
27✔
96
            reader,
27✔
97
        )?;
27✔
98

×
99
        let new_records = self.operator.execute(
27✔
100
            &JoinAction::Insert,
27✔
101
            from_port,
27✔
102
            new,
27✔
103
            database,
27✔
104
            transaction,
27✔
105
            reader,
27✔
106
        )?;
27✔
107

×
108
        Ok((old_records, new_records))
27✔
109
    }
27✔
110
}
111

×
112
impl Processor for FromProcessor {
×
113
    fn init(&mut self, state: &mut LmdbEnvironmentManager) -> Result<(), ExecutionError> {
139✔
114
        internal_err!(self.init_store(state))
×
115
    }
139✔
116

117
    fn commit(&self, _epoch: &Epoch, _tx: &SharedTransaction) -> Result<(), ExecutionError> {
169✔
118
        Ok(())
169✔
119
    }
169✔
120

121
    fn process(
76,627✔
122
        &mut self,
76,627✔
123
        from_port: PortHandle,
76,627✔
124
        op: Operation,
76,627✔
125
        fw: &mut dyn ProcessorChannelForwarder,
76,627✔
126
        transaction: &SharedTransaction,
76,627✔
127
        reader: &HashMap<PortHandle, Box<dyn RecordReader>>,
76,627✔
128
    ) -> Result<(), ExecutionError> {
76,627✔
129
        // match op.clone() {
76,627✔
130
        //     Operation::Delete { old } => info!("p{from_port}: - {:?}", old.values),
76,627✔
131
        //     Operation::Insert { new } => info!("p{from_port}: + {:?}", new.values),
76,627✔
132
        //     Operation::Update { old, new } => {
76,627✔
133
        //         info!("p{from_port}: - {:?}, + {:?}", old.values, new.values)
76,627✔
134
        //     }
76,627✔
135
        // }
76,627✔
136

76,627✔
137
        match op {
76,627✔
138
            Operation::Delete { ref old } => {
51✔
139
                let records = self.delete(from_port, old, transaction, reader)?;
51✔
140

×
141
                for (record, _lookup_key) in records.into_iter() {
51✔
142
                    let _ = fw.send(Operation::Delete { old: record }, DEFAULT_PORT_HANDLE);
42✔
143
                }
42✔
144
            }
×
145
            Operation::Insert { ref new } => {
76,549✔
146
                let records = self.insert(from_port, new, transaction, reader)?;
76,549✔
147

×
148
                for (record, _lookup_key) in records.into_iter() {
76,549✔
149
                    let _ = fw.send(Operation::Insert { new: record }, DEFAULT_PORT_HANDLE);
71,584✔
150
                }
71,584✔
151
            }
×
152
            Operation::Update { ref old, ref new } => {
27✔
153
                let (old_join_records, new_join_records) =
27✔
154
                    self.update(from_port, old, new, transaction, reader)?;
27✔
155

×
156
                for old in old_join_records.into_iter() {
27✔
157
                    let _ = fw.send(Operation::Delete { old: old.0 }, DEFAULT_PORT_HANDLE);
18✔
158
                }
18✔
159

160
                for new in new_join_records.into_iter() {
27✔
161
                    let _ = fw.send(Operation::Insert { new: new.0 }, DEFAULT_PORT_HANDLE);
18✔
162
                }
18✔
163
            }
164
        }
×
165
        Ok(())
76,633✔
166
    }
76,633✔
167
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc