• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

getdozer / dozer / 4113913291

pending completion
4113913291

Pull #821

github

GitHub
Merge a8cca3f0b into 8f74ec17e
Pull Request #821: refactor: Make `LmdbRoCache` and `LmdbRwCache` `Send` and `Sync`

869 of 869 new or added lines in 45 files covered. (100.0%)

23486 of 37503 relevant lines covered (62.62%)

36806.72 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

77.58
/dozer-sql/src/pipeline/product/processor.rs
1
use crate::pipeline::errors::PipelineError;
2
use dozer_core::channels::ProcessorChannelForwarder;
3
use dozer_core::epoch::Epoch;
4
use dozer_core::errors::ExecutionError;
5
use dozer_core::node::{PortHandle, Processor};
6
use dozer_core::record_store::RecordReader;
7
use dozer_core::storage::common::Database;
8
use dozer_core::storage::lmdb_storage::{LmdbEnvironmentManager, SharedTransaction};
9
use dozer_core::DEFAULT_PORT_HANDLE;
10
use dozer_types::internal_err;
11

12
use dozer_types::types::{Operation, Record};
13
use lmdb::DatabaseFlags;
14
use std::collections::HashMap;
15

16
use dozer_core::errors::ExecutionError::InternalError;
17

18
use super::join::{JoinAction, JoinSource};
19

20
/// Cartesian Product Processor
×
21
#[derive(Debug)]
×
22
pub struct FromProcessor {
23
    /// Join operations
24
    operator: JoinSource,
25

26
    /// Database to store Join indexes
27
    db: Option<Database>,
28
}
29

30
impl FromProcessor {
31
    /// Creates a new [`FromProcessor`].
×
32
    pub fn new(operator: JoinSource) -> Self {
69✔
33
        Self { operator, db: None }
69✔
34
    }
69✔
35

×
36
    fn init_store(&mut self, env: &mut LmdbEnvironmentManager) -> Result<(), PipelineError> {
69✔
37
        self.db = Some(env.create_database(Some("product"), Some(DatabaseFlags::DUP_SORT))?);
69✔
38

×
39
        Ok(())
69✔
40
    }
69✔
41

×
42
    fn delete(
12✔
43
        &self,
12✔
44
        from_port: PortHandle,
12✔
45
        record: &Record,
12✔
46
        transaction: &SharedTransaction,
12✔
47
        reader: &HashMap<PortHandle, Box<dyn RecordReader>>,
12✔
48
    ) -> Result<Vec<(JoinAction, Record, Vec<u8>)>, ExecutionError> {
12✔
49
        let database = &self.db.ok_or(ExecutionError::InvalidDatabase)?;
12✔
50

×
51
        self.operator.execute(
12✔
52
            JoinAction::Delete,
12✔
53
            from_port,
12✔
54
            record,
12✔
55
            database,
12✔
56
            transaction,
12✔
57
            reader,
12✔
58
        )
12✔
59
    }
12✔
60

×
61
    fn insert(
58,533✔
62
        &self,
58,533✔
63
        from_port: PortHandle,
58,533✔
64
        record: &Record,
58,533✔
65
        transaction: &SharedTransaction,
58,533✔
66
        reader: &HashMap<PortHandle, Box<dyn RecordReader>>,
58,533✔
67
    ) -> Result<Vec<(JoinAction, Record, Vec<u8>)>, ExecutionError> {
58,533✔
68
        let database = &self.db.ok_or(ExecutionError::InvalidDatabase)?;
58,533✔
69

×
70
        self.operator.execute(
58,533✔
71
            JoinAction::Insert,
58,533✔
72
            from_port,
58,533✔
73
            record,
58,533✔
74
            database,
58,533✔
75
            transaction,
58,533✔
76
            reader,
58,533✔
77
        )
58,533✔
78
    }
58,533✔
79

80
    #[allow(clippy::type_complexity)]
×
81
    fn update(
12✔
82
        &self,
12✔
83
        from_port: PortHandle,
12✔
84
        old: &Record,
12✔
85
        new: &Record,
12✔
86
        transaction: &SharedTransaction,
12✔
87
        reader: &HashMap<PortHandle, Box<dyn RecordReader>>,
12✔
88
    ) -> Result<
12✔
89
        (
12✔
90
            Vec<(JoinAction, Record, Vec<u8>)>,
12✔
91
            Vec<(JoinAction, Record, Vec<u8>)>,
12✔
92
        ),
12✔
93
        ExecutionError,
12✔
94
    > {
12✔
95
        let database = &self.db.ok_or(ExecutionError::InvalidDatabase)?;
12✔
96

×
97
        let old_records = self.operator.execute(
12✔
98
            JoinAction::Delete,
12✔
99
            from_port,
12✔
100
            old,
12✔
101
            database,
12✔
102
            transaction,
12✔
103
            reader,
12✔
104
        )?;
12✔
105

×
106
        let new_records = self.operator.execute(
12✔
107
            JoinAction::Insert,
12✔
108
            from_port,
12✔
109
            new,
12✔
110
            database,
12✔
111
            transaction,
12✔
112
            reader,
12✔
113
        )?;
12✔
114

×
115
        Ok((old_records, new_records))
12✔
116
    }
12✔
117
}
×
118

×
119
impl Processor for FromProcessor {
×
120
    fn init(&mut self, state: &mut LmdbEnvironmentManager) -> Result<(), ExecutionError> {
69✔
121
        internal_err!(self.init_store(state))
×
122
    }
69✔
123

×
124
    fn commit(&self, _epoch: &Epoch, _tx: &SharedTransaction) -> Result<(), ExecutionError> {
95✔
125
        Ok(())
95✔
126
    }
95✔
127

×
128
    fn process(
58,557✔
129
        &mut self,
58,557✔
130
        from_port: PortHandle,
58,557✔
131
        op: Operation,
58,557✔
132
        fw: &mut dyn ProcessorChannelForwarder,
58,557✔
133
        transaction: &SharedTransaction,
58,557✔
134
        reader: &HashMap<PortHandle, Box<dyn RecordReader>>,
58,557✔
135
    ) -> Result<(), ExecutionError> {
58,557✔
136
        // match op.clone() {
58,557✔
137
        //     Operation::Delete { old } => info!("p{from_port}: - {:?}", old.values),
58,557✔
138
        //     Operation::Insert { new } => info!("p{from_port}: + {:?}", new.values),
58,557✔
139
        //     Operation::Update { old, new } => {
58,557✔
140
        //         info!("p{from_port}: - {:?}, + {:?}", old.values, new.values)
58,557✔
141
        //     }
58,557✔
142
        // }
58,557✔
143

58,557✔
144
        match op {
58,557✔
145
            Operation::Delete { ref old } => {
12✔
146
                let records = self.delete(from_port, old, transaction, reader)?;
12✔
147

×
148
                for (action, record, _key) in records.into_iter() {
12✔
149
                    match action {
6✔
150
                        JoinAction::Insert => {
×
151
                            let _ = fw.send(Operation::Insert { new: record }, DEFAULT_PORT_HANDLE);
×
152
                        }
×
153
                        JoinAction::Delete => {
6✔
154
                            let _ = fw.send(Operation::Delete { old: record }, DEFAULT_PORT_HANDLE);
6✔
155
                        }
6✔
156
                    }
×
157
                }
×
158
            }
×
159
            Operation::Insert { ref new } => {
58,533✔
160
                let records = self.insert(from_port, new, transaction, reader)?;
58,533✔
161

×
162
                for (action, record, _key) in records.into_iter() {
66,069✔
163
                    match action {
66,069✔
164
                        JoinAction::Insert => {
55,183✔
165
                            let _ = fw.send(Operation::Insert { new: record }, DEFAULT_PORT_HANDLE);
55,183✔
166
                        }
55,183✔
167
                        JoinAction::Delete => {
10,886✔
168
                            let _ = fw.send(Operation::Delete { old: record }, DEFAULT_PORT_HANDLE);
10,886✔
169
                        }
10,886✔
170
                    }
171
                }
172
            }
×
173
            Operation::Update { ref old, ref new } => {
12✔
174
                let (old_join_records, new_join_records) =
12✔
175
                    self.update(from_port, old, new, transaction, reader)?;
12✔
176

×
177
                for (action, old, _key) in old_join_records.into_iter() {
12✔
178
                    match action {
6✔
179
                        JoinAction::Insert => {
×
180
                            let _ = fw.send(Operation::Insert { new: old }, DEFAULT_PORT_HANDLE);
×
181
                        }
×
182
                        JoinAction::Delete => {
6✔
183
                            let _ = fw.send(Operation::Delete { old }, DEFAULT_PORT_HANDLE);
6✔
184
                        }
6✔
185
                    }
186
                }
187

×
188
                for (action, new, _key) in new_join_records.into_iter() {
12✔
189
                    match action {
6✔
190
                        JoinAction::Insert => {
6✔
191
                            let _ = fw.send(Operation::Insert { new }, DEFAULT_PORT_HANDLE);
6✔
192
                        }
6✔
193
                        JoinAction::Delete => {
×
194
                            let _ = fw.send(Operation::Delete { old: new }, DEFAULT_PORT_HANDLE);
×
195
                        }
×
196
                    }
197
                }
198
            }
199
        }
×
200
        Ok(())
58,557✔
201
    }
58,557✔
202
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc