• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

getdozer / dozer / 4123314638

pending completion
4123314638

Pull #831

github

GitHub
Merge b8720faa6 into f4fe30c14
Pull Request #831: chore: Improve Join processor errors

136 of 136 new or added lines in 4 files covered. (100.0%)

23567 of 34840 relevant lines covered (67.64%)

39485.33 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

79.31
/dozer-sql/src/pipeline/product/processor.rs
1
use crate::pipeline::errors::PipelineError;
2
use dozer_core::channels::ProcessorChannelForwarder;
3
use dozer_core::epoch::Epoch;
4
use dozer_core::errors::ExecutionError;
5
use dozer_core::node::{PortHandle, Processor};
6
use dozer_core::record_store::RecordReader;
7
use dozer_core::storage::common::Database;
8
use dozer_core::storage::lmdb_storage::{LmdbEnvironmentManager, SharedTransaction};
9
use dozer_core::DEFAULT_PORT_HANDLE;
10
use dozer_types::internal_err;
11

12
use dozer_types::types::{Operation, Record};
13
use lmdb::DatabaseFlags;
14
use std::collections::HashMap;
15

16
use dozer_core::errors::ExecutionError::InternalError;
17

18
use super::join::{JoinAction, JoinSource};
19

20
/// Cartesian Product Processor
×
21
#[derive(Debug)]
×
22
pub struct FromProcessor {
23
    /// Join operations
24
    operator: JoinSource,
25

26
    /// Database to store Join indexes
27
    db: Option<Database>,
28
}
29

30
impl FromProcessor {
31
    /// Creates a new [`FromProcessor`].
×
32
    pub fn new(operator: JoinSource) -> Self {
105✔
33
        Self { operator, db: None }
105✔
34
    }
105✔
35

×
36
    fn init_store(&mut self, env: &mut LmdbEnvironmentManager) -> Result<(), PipelineError> {
105✔
37
        self.db = Some(env.create_database(Some("product"), Some(DatabaseFlags::DUP_SORT))?);
105✔
38

×
39
        Ok(())
105✔
40
    }
105✔
41

×
42
    fn delete(
42✔
43
        &self,
42✔
44
        from_port: PortHandle,
42✔
45
        record: &Record,
42✔
46
        transaction: &SharedTransaction,
42✔
47
        reader: &HashMap<PortHandle, Box<dyn RecordReader>>,
42✔
48
    ) -> Result<Vec<(JoinAction, Record, Vec<u8>)>, ExecutionError> {
42✔
49
        let database = &self.db.ok_or(ExecutionError::InvalidDatabase)?;
42✔
50

×
51
        self.operator
42✔
52
            .execute(
42✔
53
                JoinAction::Delete,
42✔
54
                from_port,
42✔
55
                record,
42✔
56
                database,
42✔
57
                transaction,
42✔
58
                reader,
42✔
59
            )
42✔
60
            .map_err(|err| ExecutionError::ProductProcessorError(Box::new(err)))
42✔
61
    }
42✔
62

×
63
    fn insert(
72,259✔
64
        &self,
72,259✔
65
        from_port: PortHandle,
72,259✔
66
        record: &Record,
72,259✔
67
        transaction: &SharedTransaction,
72,259✔
68
        reader: &HashMap<PortHandle, Box<dyn RecordReader>>,
72,259✔
69
    ) -> Result<Vec<(JoinAction, Record, Vec<u8>)>, ExecutionError> {
72,259✔
70
        let database = &self.db.ok_or(ExecutionError::InvalidDatabase)?;
72,259✔
71

×
72
        self.operator
72,259✔
73
            .execute(
72,259✔
74
                JoinAction::Insert,
72,259✔
75
                from_port,
72,259✔
76
                record,
72,259✔
77
                database,
72,259✔
78
                transaction,
72,259✔
79
                reader,
72,259✔
80
            )
72,259✔
81
            .map_err(|err| ExecutionError::ProductProcessorError(Box::new(err)))
72,259✔
82
    }
72,259✔
83

×
84
    #[allow(clippy::type_complexity)]
×
85
    fn update(
22✔
86
        &self,
22✔
87
        from_port: PortHandle,
22✔
88
        old: &Record,
22✔
89
        new: &Record,
22✔
90
        transaction: &SharedTransaction,
22✔
91
        reader: &HashMap<PortHandle, Box<dyn RecordReader>>,
22✔
92
    ) -> Result<
22✔
93
        (
22✔
94
            Vec<(JoinAction, Record, Vec<u8>)>,
22✔
95
            Vec<(JoinAction, Record, Vec<u8>)>,
22✔
96
        ),
22✔
97
        ExecutionError,
22✔
98
    > {
22✔
99
        let database = &self.db.ok_or(ExecutionError::InvalidDatabase)?;
22✔
100

×
101
        let old_records = self
22✔
102
            .operator
22✔
103
            .execute(
22✔
104
                JoinAction::Delete,
22✔
105
                from_port,
22✔
106
                old,
22✔
107
                database,
22✔
108
                transaction,
22✔
109
                reader,
22✔
110
            )
22✔
111
            .map_err(|err| ExecutionError::ProductProcessorError(Box::new(err)))?;
22✔
112

×
113
        let new_records = self
22✔
114
            .operator
22✔
115
            .execute(
22✔
116
                JoinAction::Insert,
22✔
117
                from_port,
22✔
118
                new,
22✔
119
                database,
22✔
120
                transaction,
22✔
121
                reader,
22✔
122
            )
22✔
123
            .map_err(|err| ExecutionError::ProductProcessorError(Box::new(err)))?;
22✔
124

×
125
        Ok((old_records, new_records))
22✔
126
    }
22✔
127
}
×
128

×
129
impl Processor for FromProcessor {
×
130
    fn init(&mut self, state: &mut LmdbEnvironmentManager) -> Result<(), ExecutionError> {
105✔
131
        internal_err!(self.init_store(state))
×
132
    }
105✔
133

×
134
    fn commit(&self, _epoch: &Epoch, _tx: &SharedTransaction) -> Result<(), ExecutionError> {
139✔
135
        Ok(())
139✔
136
    }
139✔
137

×
138
    fn process(
72,323✔
139
        &mut self,
72,323✔
140
        from_port: PortHandle,
72,323✔
141
        op: Operation,
72,323✔
142
        fw: &mut dyn ProcessorChannelForwarder,
72,323✔
143
        transaction: &SharedTransaction,
72,323✔
144
        reader: &HashMap<PortHandle, Box<dyn RecordReader>>,
72,323✔
145
    ) -> Result<(), ExecutionError> {
72,323✔
146
        // match op.clone() {
72,323✔
147
        //     Operation::Delete { old } => info!("p{from_port}: - {:?}", old.values),
72,323✔
148
        //     Operation::Insert { new } => info!("p{from_port}: + {:?}", new.values),
72,323✔
149
        //     Operation::Update { old, new } => {
72,323✔
150
        //         info!("p{from_port}: - {:?}, + {:?}", old.values, new.values)
72,323✔
151
        //     }
72,323✔
152
        // }
72,323✔
153

72,323✔
154
        match op {
72,323✔
155
            Operation::Delete { ref old } => {
42✔
156
                let records = self.delete(from_port, old, transaction, reader)?;
42✔
157

158
                for (action, record, _key) in records.into_iter() {
42✔
159
                    match action {
32✔
160
                        JoinAction::Insert => {
×
161
                            let _ = fw.send(Operation::Insert { new: record }, DEFAULT_PORT_HANDLE);
×
162
                        }
×
163
                        JoinAction::Delete => {
32✔
164
                            let _ = fw.send(Operation::Delete { old: record }, DEFAULT_PORT_HANDLE);
32✔
165
                        }
32✔
166
                    }
×
167
                }
×
168
            }
×
169
            Operation::Insert { ref new } => {
72,259✔
170
                let records = self.insert(from_port, new, transaction, reader)?;
72,259✔
171

172
                for (action, record, _key) in records.into_iter() {
79,355✔
173
                    match action {
79,355✔
174
                        JoinAction::Insert => {
68,469✔
175
                            let _ = fw.send(Operation::Insert { new: record }, DEFAULT_PORT_HANDLE);
68,469✔
176
                        }
68,469✔
177
                        JoinAction::Delete => {
10,886✔
178
                            let _ = fw.send(Operation::Delete { old: record }, DEFAULT_PORT_HANDLE);
10,886✔
179
                        }
10,886✔
180
                    }
×
181
                }
×
182
            }
×
183
            Operation::Update { ref old, ref new } => {
22✔
184
                let (old_join_records, new_join_records) =
22✔
185
                    self.update(from_port, old, new, transaction, reader)?;
22✔
186

187
                for (action, old, _key) in old_join_records.into_iter() {
22✔
188
                    match action {
16✔
189
                        JoinAction::Insert => {
×
190
                            let _ = fw.send(Operation::Insert { new: old }, DEFAULT_PORT_HANDLE);
×
191
                        }
×
192
                        JoinAction::Delete => {
16✔
193
                            let _ = fw.send(Operation::Delete { old }, DEFAULT_PORT_HANDLE);
16✔
194
                        }
16✔
195
                    }
×
196
                }
197

198
                for (action, new, _key) in new_join_records.into_iter() {
22✔
199
                    match action {
16✔
200
                        JoinAction::Insert => {
16✔
201
                            let _ = fw.send(Operation::Insert { new }, DEFAULT_PORT_HANDLE);
16✔
202
                        }
16✔
203
                        JoinAction::Delete => {
×
204
                            let _ = fw.send(Operation::Delete { old: new }, DEFAULT_PORT_HANDLE);
×
205
                        }
×
206
                    }
207
                }
208
            }
209
        }
210
        Ok(())
72,323✔
211
    }
72,323✔
212
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc