• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

getdozer / dozer / 4302087115

pending completion
4302087115

push

github

GitHub
chore: Move `SnapshottingDone` out of `Operation` so processors don't have to know it.(#1103)

364 of 364 new or added lines in 33 files covered. (100.0%)

28623 of 40224 relevant lines covered (71.16%)

56785.21 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

87.2
/dozer-sql/src/pipeline/product/processor.rs
1
use crate::pipeline::errors::{PipelineError, ProductError};
2
use dozer_core::channels::ProcessorChannelForwarder;
3
use dozer_core::epoch::Epoch;
4
use dozer_core::errors::ExecutionError;
5
use dozer_core::node::{PortHandle, Processor};
6
use dozer_core::record_store::RecordReader;
7
use dozer_core::storage::common::Database;
8
use dozer_core::storage::lmdb_storage::{LmdbExclusiveTransaction, SharedTransaction};
9
use dozer_core::DEFAULT_PORT_HANDLE;
10

11
use dozer_types::types::{Operation, Record};
12
use lmdb::DatabaseFlags;
13
use std::collections::HashMap;
14

15
use super::join::{JoinAction, JoinSource};
16

17
/// Cartesian Product Processor
18
#[derive(Debug)]
×
19
pub struct FromProcessor {
20
    /// Join operations
21
    operator: JoinSource,
22

23
    /// Database to store Join indexes
24
    db: Database,
25

26
    source_names: HashMap<PortHandle, String>,
27
}
28

29
impl FromProcessor {
30
    /// Creates a new [`FromProcessor`].
31
    pub fn new(
332✔
32
        operator: JoinSource,
332✔
33
        source_names: HashMap<PortHandle, String>,
332✔
34
        txn: &mut LmdbExclusiveTransaction,
332✔
35
    ) -> Result<Self, PipelineError> {
332✔
36
        Ok(Self {
332✔
37
            operator,
332✔
38
            db: txn.create_database(Some("product"), Some(DatabaseFlags::DUP_SORT))?,
332✔
39
            source_names,
332✔
40
        })
41
    }
332✔
42

43
    fn delete(
135✔
44
        &self,
135✔
45
        from_port: PortHandle,
135✔
46
        record: &Record,
135✔
47
        transaction: &SharedTransaction,
135✔
48
        reader: &HashMap<PortHandle, Box<dyn RecordReader>>,
135✔
49
    ) -> Result<Vec<(JoinAction, Record, Vec<u8>)>, ProductError> {
135✔
50
        self.operator
135✔
51
            .execute(
135✔
52
                JoinAction::Delete,
135✔
53
                from_port,
135✔
54
                record,
135✔
55
                &self.db,
135✔
56
                transaction,
135✔
57
                reader,
135✔
58
            )
135✔
59
            .map_err(|err| ProductError::DeleteError(self.get_port_name(from_port), Box::new(err)))
135✔
60
    }
135✔
61

62
    fn insert(
266,741✔
63
        &self,
266,741✔
64
        from_port: PortHandle,
266,741✔
65
        record: &Record,
266,741✔
66
        transaction: &SharedTransaction,
266,741✔
67
        reader: &HashMap<PortHandle, Box<dyn RecordReader>>,
266,741✔
68
    ) -> Result<Vec<(JoinAction, Record, Vec<u8>)>, ProductError> {
266,741✔
69
        self.operator
266,741✔
70
            .execute(
266,741✔
71
                JoinAction::Insert,
266,741✔
72
                from_port,
266,741✔
73
                record,
266,741✔
74
                &self.db,
266,741✔
75
                transaction,
266,741✔
76
                reader,
266,741✔
77
            )
266,741✔
78
            .map_err(|err| ProductError::InsertError(self.get_port_name(from_port), Box::new(err)))
266,741✔
79
    }
266,741✔
80

81
    #[allow(clippy::type_complexity)]
82
    fn update(
65✔
83
        &self,
65✔
84
        from_port: PortHandle,
65✔
85
        old: &Record,
65✔
86
        new: &Record,
65✔
87
        transaction: &SharedTransaction,
65✔
88
        reader: &HashMap<PortHandle, Box<dyn RecordReader>>,
65✔
89
    ) -> Result<
65✔
90
        (
65✔
91
            Vec<(JoinAction, Record, Vec<u8>)>,
65✔
92
            Vec<(JoinAction, Record, Vec<u8>)>,
65✔
93
        ),
65✔
94
        ProductError,
65✔
95
    > {
65✔
96
        let old_records = self
65✔
97
            .operator
65✔
98
            .execute(
65✔
99
                JoinAction::Delete,
65✔
100
                from_port,
65✔
101
                old,
65✔
102
                &self.db,
65✔
103
                transaction,
65✔
104
                reader,
65✔
105
            )
65✔
106
            .map_err(|err| {
65✔
107
                ProductError::UpdateOldError(self.get_port_name(from_port), Box::new(err))
×
108
            })?;
65✔
109

110
        let new_records = self
65✔
111
            .operator
65✔
112
            .execute(
65✔
113
                JoinAction::Insert,
65✔
114
                from_port,
65✔
115
                new,
65✔
116
                &self.db,
65✔
117
                transaction,
65✔
118
                reader,
65✔
119
            )
65✔
120
            .map_err(|err| {
65✔
121
                ProductError::UpdateNewError(self.get_port_name(from_port), Box::new(err))
×
122
            })?;
65✔
123

124
        Ok((old_records, new_records))
65✔
125
    }
65✔
126

127
    fn get_port_name(&self, from_port: u16) -> String {
×
128
        self.source_names
×
129
            .get(&from_port)
×
130
            .unwrap_or(&from_port.to_string())
×
131
            .to_string()
×
132
    }
×
133
}
134

135
impl Processor for FromProcessor {
136
    fn commit(&self, _epoch: &Epoch, _tx: &SharedTransaction) -> Result<(), ExecutionError> {
542✔
137
        Ok(())
542✔
138
    }
542✔
139

140
    fn process(
266,941✔
141
        &mut self,
266,941✔
142
        from_port: PortHandle,
266,941✔
143
        op: Operation,
266,941✔
144
        fw: &mut dyn ProcessorChannelForwarder,
266,941✔
145
        transaction: &SharedTransaction,
266,941✔
146
        reader: &HashMap<PortHandle, Box<dyn RecordReader>>,
266,941✔
147
    ) -> Result<(), ExecutionError> {
266,941✔
148
        // match op.clone() {
266,941✔
149
        //     Operation::Delete { old } => info!("p{from_port}: - {:?}", old.values),
266,941✔
150
        //     Operation::Insert { new } => info!("p{from_port}: + {:?}", new.values),
266,941✔
151
        //     Operation::Update { old, new } => {
266,941✔
152
        //         info!("p{from_port}: - {:?}, + {:?}", old.values, new.values)
266,941✔
153
        //     }
266,941✔
154
        // }
266,941✔
155

266,941✔
156
        match op {
266,941✔
157
            Operation::Delete { ref old } => {
135✔
158
                let records = self
135✔
159
                    .delete(from_port, old, transaction, reader)
135✔
160
                    .map_err(|err| ExecutionError::ProductProcessorError(Box::new(err)))?;
135✔
161

162
                for (action, record, _key) in records.into_iter() {
135✔
163
                    match action {
110✔
164
                        JoinAction::Insert => {
×
165
                            let _ = fw.send(Operation::Insert { new: record }, DEFAULT_PORT_HANDLE);
×
166
                        }
×
167
                        JoinAction::Delete => {
110✔
168
                            let _ = fw.send(Operation::Delete { old: record }, DEFAULT_PORT_HANDLE);
110✔
169
                        }
110✔
170
                    }
171
                }
172
            }
173
            Operation::Insert { ref new } => {
266,741✔
174
                let records = self
266,741✔
175
                    .insert(from_port, new, transaction, reader)
266,741✔
176
                    .map_err(|err| ExecutionError::ProductProcessorError(Box::new(err)))?;
266,741✔
177

178
                for (action, record, _key) in records.into_iter() {
284,481✔
179
                    match action {
284,481✔
180
                        JoinAction::Insert => {
257,266✔
181
                            let _ = fw.send(Operation::Insert { new: record }, DEFAULT_PORT_HANDLE);
257,266✔
182
                        }
257,266✔
183
                        JoinAction::Delete => {
27,215✔
184
                            let _ = fw.send(Operation::Delete { old: record }, DEFAULT_PORT_HANDLE);
27,215✔
185
                        }
27,215✔
186
                    }
187
                }
188
            }
189
            Operation::Update { ref old, ref new } => {
65✔
190
                let (old_join_records, new_join_records) = self
65✔
191
                    .update(from_port, old, new, transaction, reader)
65✔
192
                    .map_err(|err| ExecutionError::ProductProcessorError(Box::new(err)))?;
65✔
193

194
                for (action, old, _key) in old_join_records.into_iter() {
65✔
195
                    match action {
50✔
196
                        JoinAction::Insert => {
×
197
                            let _ = fw.send(Operation::Insert { new: old }, DEFAULT_PORT_HANDLE);
×
198
                        }
×
199
                        JoinAction::Delete => {
50✔
200
                            let _ = fw.send(Operation::Delete { old }, DEFAULT_PORT_HANDLE);
50✔
201
                        }
50✔
202
                    }
203
                }
204

205
                for (action, new, _key) in new_join_records.into_iter() {
65✔
206
                    match action {
50✔
207
                        JoinAction::Insert => {
50✔
208
                            let _ = fw.send(Operation::Insert { new }, DEFAULT_PORT_HANDLE);
50✔
209
                        }
50✔
210
                        JoinAction::Delete => {
×
211
                            let _ = fw.send(Operation::Delete { old: new }, DEFAULT_PORT_HANDLE);
×
212
                        }
×
213
                    }
214
                }
215
            }
216
        }
×
217
        Ok(())
266,941✔
218
    }
266,941✔
219
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc