• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

getdozer / dozer / 4381907514

pending completion
4381907514

push

github

GitHub
feat: implement tracing using open telemetry (#1176)

510 of 510 new or added lines in 31 files covered. (100.0%)

27878 of 39615 relevant lines covered (70.37%)

47752.4 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

88.99
/dozer-sql/src/pipeline/product/processor.rs
1
use crate::pipeline::errors::{PipelineError, ProductError};
2
use dozer_core::channels::ProcessorChannelForwarder;
3
use dozer_core::epoch::Epoch;
4
use dozer_core::errors::ExecutionError;
5
use dozer_core::node::{PortHandle, Processor};
6
use dozer_core::DEFAULT_PORT_HANDLE;
7

8
use dozer_core::storage::lmdb_storage::SharedTransaction;
9
use dozer_types::types::{Field, Operation, Record};
10
use std::collections::HashMap;
11

12
use super::join::{JoinAction, JoinSource};
13

14
/// Cartesian Product Processor
15
#[derive(Debug)]
×
16
pub struct FromProcessor {
17
    /// Join operations
18
    operator: JoinSource,
19

20
    source_names: HashMap<PortHandle, String>,
21
}
22

23
impl FromProcessor {
24
    /// Creates a new [`FromProcessor`].
25
    pub fn new(
337✔
26
        operator: JoinSource,
337✔
27
        source_names: HashMap<PortHandle, String>,
337✔
28
    ) -> Result<Self, PipelineError> {
337✔
29
        Ok(Self {
337✔
30
            operator,
337✔
31
            source_names,
337✔
32
        })
337✔
33
    }
337✔
34

35
    fn delete(
135✔
36
        &mut self,
135✔
37
        from_port: PortHandle,
135✔
38
        record: &Record,
135✔
39
    ) -> Result<Vec<(JoinAction, Record, Vec<Field>)>, ProductError> {
135✔
40
        self.operator
135✔
41
            .execute(JoinAction::Delete, from_port, record)
135✔
42
            .map_err(|err| ProductError::DeleteError(self.get_port_name(from_port), Box::new(err)))
135✔
43
    }
135✔
44

45
    fn insert(
268,905✔
46
        &mut self,
268,905✔
47
        from_port: PortHandle,
268,905✔
48
        record: &Record,
268,905✔
49
    ) -> Result<Vec<(JoinAction, Record, Vec<Field>)>, ProductError> {
268,905✔
50
        self.operator
268,905✔
51
            .execute(JoinAction::Insert, from_port, record)
268,905✔
52
            .map_err(|err| ProductError::InsertError(self.get_port_name(from_port), Box::new(err)))
268,905✔
53
    }
268,905✔
54

55
    #[allow(clippy::type_complexity)]
56
    fn update(
65✔
57
        &mut self,
65✔
58
        from_port: PortHandle,
65✔
59
        old: &Record,
65✔
60
        new: &Record,
65✔
61
    ) -> Result<
65✔
62
        (
65✔
63
            Vec<(JoinAction, Record, Vec<Field>)>,
65✔
64
            Vec<(JoinAction, Record, Vec<Field>)>,
65✔
65
        ),
65✔
66
        ProductError,
65✔
67
    > {
65✔
68
        let old_records = self
65✔
69
            .operator
65✔
70
            .execute(JoinAction::Delete, from_port, old)
65✔
71
            .map_err(|err| {
65✔
72
                ProductError::UpdateOldError(self.get_port_name(from_port), Box::new(err))
×
73
            })?;
65✔
74

75
        let new_records = self
65✔
76
            .operator
65✔
77
            .execute(JoinAction::Insert, from_port, new)
65✔
78
            .map_err(|err| {
65✔
79
                ProductError::UpdateNewError(self.get_port_name(from_port), Box::new(err))
×
80
            })?;
65✔
81

82
        Ok((old_records, new_records))
65✔
83
    }
65✔
84

85
    fn get_port_name(&self, from_port: u16) -> String {
×
86
        self.source_names
×
87
            .get(&from_port)
×
88
            .unwrap_or(&from_port.to_string())
×
89
            .to_string()
×
90
    }
×
91
}
92

93
impl Processor for FromProcessor {
94
    fn commit(&self, _epoch: &Epoch, _tx: &SharedTransaction) -> Result<(), ExecutionError> {
346✔
95
        Ok(())
346✔
96
    }
346✔
97

98
    fn process(
269,170✔
99
        &mut self,
269,170✔
100
        from_port: PortHandle,
269,170✔
101
        op: Operation,
269,170✔
102
        fw: &mut dyn ProcessorChannelForwarder,
269,170✔
103
        _transaction: &SharedTransaction,
269,170✔
104
    ) -> Result<(), ExecutionError> {
269,170✔
105
        // match op.clone() {
269,170✔
106
        //     Operation::Delete { old } => info!("p{from_port}: - {:?}", old.values),
269,170✔
107
        //     Operation::Insert { new } => info!("p{from_port}: + {:?}", new.values),
269,170✔
108
        //     Operation::Update { old, new } => {
269,170✔
109
        //         info!("p{from_port}: - {:?}, + {:?}", old.values, new.values)
269,170✔
110
        //     }
269,170✔
111
        // }
269,170✔
112

269,170✔
113
        match op {
269,170✔
114
            Operation::Delete { ref old } => {
110✔
115
                let records = self
110✔
116
                    .delete(from_port, old)
110✔
117
                    .map_err(|err| ExecutionError::ProductProcessorError(Box::new(err)))?;
110✔
118

119
                for (action, record, _key) in records.into_iter() {
110✔
120
                    match action {
110✔
121
                        JoinAction::Insert => {
122
                            fw.send(Operation::Insert { new: record }, DEFAULT_PORT_HANDLE)?;
×
123
                        }
124
                        JoinAction::Delete => {
125
                            fw.send(Operation::Delete { old: record }, DEFAULT_PORT_HANDLE)?;
110✔
126
                        }
127
                    }
128
                }
129
            }
130
            Operation::Insert { ref new } => {
268,995✔
131
                let records = self
268,995✔
132
                    .insert(from_port, new)
268,995✔
133
                    .map_err(|err| ExecutionError::ProductProcessorError(Box::new(err)))?;
268,995✔
134

135
                for (action, record, _key) in records.into_iter() {
268,996✔
136
                    match action {
257,721✔
137
                        JoinAction::Insert => {
138
                            fw.send(Operation::Insert { new: record }, DEFAULT_PORT_HANDLE)?;
256,726✔
139
                        }
140
                        JoinAction::Delete => {
141
                            fw.send(Operation::Delete { old: record }, DEFAULT_PORT_HANDLE)?;
995✔
142
                        }
143
                    }
144
                }
145
            }
146
            Operation::Update { ref old, ref new } => {
65✔
147
                let (old_join_records, new_join_records) = self
65✔
148
                    .update(from_port, old, new)
65✔
149
                    .map_err(|err| ExecutionError::ProductProcessorError(Box::new(err)))?;
65✔
150

151
                for (action, old, _key) in old_join_records.into_iter() {
65✔
152
                    match action {
50✔
153
                        JoinAction::Insert => {
154
                            fw.send(Operation::Insert { new: old }, DEFAULT_PORT_HANDLE)?;
×
155
                        }
156
                        JoinAction::Delete => {
157
                            fw.send(Operation::Delete { old }, DEFAULT_PORT_HANDLE)?;
50✔
158
                        }
159
                    }
160
                }
161

162
                for (action, new, _key) in new_join_records.into_iter() {
65✔
163
                    match action {
50✔
164
                        JoinAction::Insert => {
165
                            fw.send(Operation::Insert { new }, DEFAULT_PORT_HANDLE)?;
50✔
166
                        }
167
                        JoinAction::Delete => {
168
                            fw.send(Operation::Delete { old: new }, DEFAULT_PORT_HANDLE)?;
×
169
                        }
170
                    }
171
                }
172
            }
173
        }
174
        Ok(())
269,106✔
175
    }
269,106✔
176
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc