• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

getdozer / dozer / 4444942462

pending completion
4444942462

push

github

GitHub
Update README.md (#1254)

28368 of 38601 relevant lines covered (73.49%)

64494.67 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

88.99
/dozer-sql/src/pipeline/product/processor.rs
1
use crate::pipeline::errors::{PipelineError, ProductError};
2
use dozer_core::channels::ProcessorChannelForwarder;
3
use dozer_core::epoch::Epoch;
4
use dozer_core::errors::ExecutionError;
5
use dozer_core::node::{PortHandle, Processor};
6
use dozer_core::DEFAULT_PORT_HANDLE;
7

8
use dozer_core::storage::lmdb_storage::SharedTransaction;
9
use dozer_types::types::{Field, Operation, Record};
10
use std::collections::HashMap;
11

12
use super::join::{JoinAction, JoinSource};
13

14
/// Cartesian Product Processor
///
/// Runs every incoming record through a [`JoinSource`] and forwards the
/// records it produces on `DEFAULT_PORT_HANDLE`.
15
#[derive(Debug)]
×
16
pub struct FromProcessor {
17
    /// Join operations
    ///
    /// Executed with `JoinAction::Insert` / `JoinAction::Delete` for each
    /// record received on an input port.
18
    operator: JoinSource,
19

20
    /// Names looked up by input port when building `ProductError` values;
    /// a port without an entry is reported by its numeric handle instead.
    source_names: HashMap<PortHandle, String>,
21
}
22

23
impl FromProcessor {
24
    /// Creates a new [`FromProcessor`].
25
    pub fn new(
271✔
26
        operator: JoinSource,
271✔
27
        source_names: HashMap<PortHandle, String>,
271✔
28
    ) -> Result<Self, PipelineError> {
271✔
29
        Ok(Self {
271✔
30
            operator,
271✔
31
            source_names,
271✔
32
        })
271✔
33
    }
271✔
34

35
    fn delete(
108✔
36
        &mut self,
108✔
37
        from_port: PortHandle,
108✔
38
        record: &Record,
108✔
39
    ) -> Result<Vec<(JoinAction, Record, Vec<Field>)>, ProductError> {
108✔
40
        self.operator
108✔
41
            .execute(JoinAction::Delete, from_port, record)
108✔
42
            .map_err(|err| ProductError::DeleteError(self.get_port_name(from_port), Box::new(err)))
108✔
43
    }
108✔
44

45
    fn insert(
216,803✔
46
        &mut self,
216,803✔
47
        from_port: PortHandle,
216,803✔
48
        record: &Record,
216,803✔
49
    ) -> Result<Vec<(JoinAction, Record, Vec<Field>)>, ProductError> {
216,803✔
50
        self.operator
216,803✔
51
            .execute(JoinAction::Insert, from_port, record)
216,803✔
52
            .map_err(|err| ProductError::InsertError(self.get_port_name(from_port), Box::new(err)))
216,803✔
53
    }
216,803✔
54

55
    #[allow(clippy::type_complexity)]
56
    fn update(
52✔
57
        &mut self,
52✔
58
        from_port: PortHandle,
52✔
59
        old: &Record,
52✔
60
        new: &Record,
52✔
61
    ) -> Result<
52✔
62
        (
52✔
63
            Vec<(JoinAction, Record, Vec<Field>)>,
52✔
64
            Vec<(JoinAction, Record, Vec<Field>)>,
52✔
65
        ),
52✔
66
        ProductError,
52✔
67
    > {
52✔
68
        let old_records = self
52✔
69
            .operator
52✔
70
            .execute(JoinAction::Delete, from_port, old)
52✔
71
            .map_err(|err| {
52✔
72
                ProductError::UpdateOldError(self.get_port_name(from_port), Box::new(err))
×
73
            })?;
52✔
74

75
        let new_records = self
52✔
76
            .operator
52✔
77
            .execute(JoinAction::Insert, from_port, new)
52✔
78
            .map_err(|err| {
52✔
79
                ProductError::UpdateNewError(self.get_port_name(from_port), Box::new(err))
×
80
            })?;
52✔
81

82
        Ok((old_records, new_records))
52✔
83
    }
52✔
84

85
    fn get_port_name(&self, from_port: u16) -> String {
×
86
        self.source_names
×
87
            .get(&from_port)
×
88
            .unwrap_or(&from_port.to_string())
×
89
            .to_string()
×
90
    }
×
91
}
92

93
impl Processor for FromProcessor {
94
    fn commit(&self, _epoch: &Epoch, _tx: &SharedTransaction) -> Result<(), ExecutionError> {
311✔
95
        Ok(())
311✔
96
    }
311✔
97

98
    fn process(
216,967✔
99
        &mut self,
216,967✔
100
        from_port: PortHandle,
216,967✔
101
        op: Operation,
216,967✔
102
        fw: &mut dyn ProcessorChannelForwarder,
216,967✔
103
        _transaction: &SharedTransaction,
216,967✔
104
    ) -> Result<(), ExecutionError> {
216,967✔
105
        // match op.clone() {
216,967✔
106
        //     Operation::Delete { old } => info!("p{from_port}: - {:?}", old.values),
216,967✔
107
        //     Operation::Insert { new } => info!("p{from_port}: + {:?}", new.values),
216,967✔
108
        //     Operation::Update { old, new } => {
216,967✔
109
        //         info!("p{from_port}: - {:?}, + {:?}", old.values, new.values)
216,967✔
110
        //     }
216,967✔
111
        // }
216,967✔
112

216,967✔
113
        match op {
216,967✔
114
            Operation::Delete { ref old } => {
8✔
115
                let records = self
8✔
116
                    .delete(from_port, old)
8✔
117
                    .map_err(|err| ExecutionError::ProductProcessorError(Box::new(err)))?;
8✔
118

119
                for (action, record, _key) in records.into_iter() {
88✔
120
                    match action {
88✔
121
                        JoinAction::Insert => {
122
                            fw.send(Operation::Insert { new: record }, DEFAULT_PORT_HANDLE)?;
×
123
                        }
124
                        JoinAction::Delete => {
125
                            fw.send(Operation::Delete { old: record }, DEFAULT_PORT_HANDLE)?;
88✔
126
                        }
127
                    }
128
                }
129
            }
130
            Operation::Insert { ref new } => {
216,907✔
131
                let records = self
216,907✔
132
                    .insert(from_port, new)
216,907✔
133
                    .map_err(|err| ExecutionError::ProductProcessorError(Box::new(err)))?;
216,907✔
134

135
                for (action, record, _key) in records.into_iter() {
216,907✔
136
                    match action {
208,026✔
137
                        JoinAction::Insert => {
138
                            fw.send(Operation::Insert { new: record }, DEFAULT_PORT_HANDLE)?;
207,230✔
139
                        }
140
                        JoinAction::Delete => {
141
                            fw.send(Operation::Delete { old: record }, DEFAULT_PORT_HANDLE)?;
796✔
142
                        }
143
                    }
144
                }
145
            }
146
            Operation::Update { ref old, ref new } => {
52✔
147
                let (old_join_records, new_join_records) = self
52✔
148
                    .update(from_port, old, new)
52✔
149
                    .map_err(|err| ExecutionError::ProductProcessorError(Box::new(err)))?;
52✔
150

151
                for (action, old, _key) in old_join_records.into_iter() {
52✔
152
                    match action {
40✔
153
                        JoinAction::Insert => {
154
                            fw.send(Operation::Insert { new: old }, DEFAULT_PORT_HANDLE)?;
×
155
                        }
156
                        JoinAction::Delete => {
157
                            fw.send(Operation::Delete { old }, DEFAULT_PORT_HANDLE)?;
40✔
158
                        }
159
                    }
160
                }
161

162
                for (action, new, _key) in new_join_records.into_iter() {
52✔
163
                    match action {
40✔
164
                        JoinAction::Insert => {
165
                            fw.send(Operation::Insert { new }, DEFAULT_PORT_HANDLE)?;
40✔
166
                        }
167
                        JoinAction::Delete => {
168
                            fw.send(Operation::Delete { old: new }, DEFAULT_PORT_HANDLE)?;
×
169
                        }
170
                    }
171
                }
172
            }
173
        }
174
        Ok(())
216,987✔
175
    }
216,987✔
176
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc