• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

getdozer / dozer / 4439171992

pending completion
4439171992

push

github

GitHub
fix: Don't panic if operation cannot be read from indexing thread. (#1247)

6 of 6 new or added lines in 1 file covered. (100.0%)

28267 of 40217 relevant lines covered (70.29%)

84996.96 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

88.99
/dozer-sql/src/pipeline/product/processor.rs
1
use crate::pipeline::errors::{PipelineError, ProductError};
2
use dozer_core::channels::ProcessorChannelForwarder;
3
use dozer_core::epoch::Epoch;
4
use dozer_core::errors::ExecutionError;
5
use dozer_core::node::{PortHandle, Processor};
6
use dozer_core::DEFAULT_PORT_HANDLE;
7

8
use dozer_core::storage::lmdb_storage::SharedTransaction;
9
use dozer_types::types::{Field, Operation, Record};
10
use std::collections::HashMap;
11

12
use super::join::{JoinAction, JoinSource};
13

14
/// Cartesian Product Processor
15
#[derive(Debug)]
×
16
pub struct FromProcessor {
17
    /// Join operations
18
    operator: JoinSource,
19

20
    source_names: HashMap<PortHandle, String>,
21
}
22

23
impl FromProcessor {
24
    /// Creates a new [`FromProcessor`].
25
    pub fn new(
469✔
26
        operator: JoinSource,
469✔
27
        source_names: HashMap<PortHandle, String>,
469✔
28
    ) -> Result<Self, PipelineError> {
469✔
29
        Ok(Self {
469✔
30
            operator,
469✔
31
            source_names,
469✔
32
        })
469✔
33
    }
469✔
34

35
    fn delete(
189✔
36
        &mut self,
189✔
37
        from_port: PortHandle,
189✔
38
        record: &Record,
189✔
39
    ) -> Result<Vec<(JoinAction, Record, Vec<Field>)>, ProductError> {
189✔
40
        self.operator
189✔
41
            .execute(JoinAction::Delete, from_port, record)
189✔
42
            .map_err(|err| ProductError::DeleteError(self.get_port_name(from_port), Box::new(err)))
189✔
43
    }
189✔
44

45
    fn insert(
372,106✔
46
        &mut self,
372,106✔
47
        from_port: PortHandle,
372,106✔
48
        record: &Record,
372,106✔
49
    ) -> Result<Vec<(JoinAction, Record, Vec<Field>)>, ProductError> {
372,106✔
50
        self.operator
372,106✔
51
            .execute(JoinAction::Insert, from_port, record)
372,106✔
52
            .map_err(|err| ProductError::InsertError(self.get_port_name(from_port), Box::new(err)))
372,106✔
53
    }
372,106✔
54

55
    #[allow(clippy::type_complexity)]
56
    fn update(
91✔
57
        &mut self,
91✔
58
        from_port: PortHandle,
91✔
59
        old: &Record,
91✔
60
        new: &Record,
91✔
61
    ) -> Result<
91✔
62
        (
91✔
63
            Vec<(JoinAction, Record, Vec<Field>)>,
91✔
64
            Vec<(JoinAction, Record, Vec<Field>)>,
91✔
65
        ),
91✔
66
        ProductError,
91✔
67
    > {
91✔
68
        let old_records = self
91✔
69
            .operator
91✔
70
            .execute(JoinAction::Delete, from_port, old)
91✔
71
            .map_err(|err| {
91✔
72
                ProductError::UpdateOldError(self.get_port_name(from_port), Box::new(err))
×
73
            })?;
91✔
74

75
        let new_records = self
91✔
76
            .operator
91✔
77
            .execute(JoinAction::Insert, from_port, new)
91✔
78
            .map_err(|err| {
91✔
79
                ProductError::UpdateNewError(self.get_port_name(from_port), Box::new(err))
×
80
            })?;
91✔
81

82
        Ok((old_records, new_records))
91✔
83
    }
91✔
84

85
    fn get_port_name(&self, from_port: u16) -> String {
×
86
        self.source_names
×
87
            .get(&from_port)
×
88
            .unwrap_or(&from_port.to_string())
×
89
            .to_string()
×
90
    }
×
91
}
92

93
impl Processor for FromProcessor {
94
    fn commit(&self, _epoch: &Epoch, _tx: &SharedTransaction) -> Result<(), ExecutionError> {
553✔
95
        Ok(())
553✔
96
    }
553✔
97

98
    fn process(
372,428✔
99
        &mut self,
372,428✔
100
        from_port: PortHandle,
372,428✔
101
        op: Operation,
372,428✔
102
        fw: &mut dyn ProcessorChannelForwarder,
372,428✔
103
        _transaction: &SharedTransaction,
372,428✔
104
    ) -> Result<(), ExecutionError> {
372,428✔
105
        // match op.clone() {
372,428✔
106
        //     Operation::Delete { old } => info!("p{from_port}: - {:?}", old.values),
372,428✔
107
        //     Operation::Insert { new } => info!("p{from_port}: + {:?}", new.values),
372,428✔
108
        //     Operation::Update { old, new } => {
372,428✔
109
        //         info!("p{from_port}: - {:?}, + {:?}", old.values, new.values)
372,428✔
110
        //     }
372,428✔
111
        // }
372,428✔
112

372,428✔
113
        match op {
372,428✔
114
            Operation::Delete { ref old } => {
84✔
115
                let records = self
84✔
116
                    .delete(from_port, old)
84✔
117
                    .map_err(|err| ExecutionError::ProductProcessorError(Box::new(err)))?;
84✔
118

119
                for (action, record, _key) in records.into_iter() {
154✔
120
                    match action {
154✔
121
                        JoinAction::Insert => {
122
                            fw.send(Operation::Insert { new: record }, DEFAULT_PORT_HANDLE)?;
×
123
                        }
124
                        JoinAction::Delete => {
125
                            fw.send(Operation::Delete { old: record }, DEFAULT_PORT_HANDLE)?;
154✔
126
                        }
127
                    }
128
                }
129
            }
130
            Operation::Insert { ref new } => {
372,253✔
131
                let records = self
372,253✔
132
                    .insert(from_port, new)
372,253✔
133
                    .map_err(|err| ExecutionError::ProductProcessorError(Box::new(err)))?;
372,253✔
134

135
                for (action, record, _key) in records.into_iter() {
372,253✔
136
                    match action {
356,866✔
137
                        JoinAction::Insert => {
138
                            fw.send(Operation::Insert { new: record }, DEFAULT_PORT_HANDLE)?;
355,473✔
139
                        }
140
                        JoinAction::Delete => {
141
                            fw.send(Operation::Delete { old: record }, DEFAULT_PORT_HANDLE)?;
1,393✔
142
                        }
143
                    }
144
                }
145
            }
146
            Operation::Update { ref old, ref new } => {
91✔
147
                let (old_join_records, new_join_records) = self
91✔
148
                    .update(from_port, old, new)
91✔
149
                    .map_err(|err| ExecutionError::ProductProcessorError(Box::new(err)))?;
91✔
150

151
                for (action, old, _key) in old_join_records.into_iter() {
91✔
152
                    match action {
70✔
153
                        JoinAction::Insert => {
154
                            fw.send(Operation::Insert { new: old }, DEFAULT_PORT_HANDLE)?;
×
155
                        }
156
                        JoinAction::Delete => {
157
                            fw.send(Operation::Delete { old }, DEFAULT_PORT_HANDLE)?;
70✔
158
                        }
159
                    }
160
                }
161

162
                for (action, new, _key) in new_join_records.into_iter() {
91✔
163
                    match action {
70✔
164
                        JoinAction::Insert => {
165
                            fw.send(Operation::Insert { new }, DEFAULT_PORT_HANDLE)?;
70✔
166
                        }
167
                        JoinAction::Delete => {
168
                            fw.send(Operation::Delete { old: new }, DEFAULT_PORT_HANDLE)?;
×
169
                        }
170
                    }
171
                }
172
            }
173
        }
174
        Ok(())
372,714✔
175
    }
372,714✔
176
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc