• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

getdozer / dozer / 4392484403

pending completion
4392484403

push

github

GitHub
feat: Asynchronous indexing (#1206)

270 of 270 new or added lines in 13 files covered. (100.0%)

28714 of 38777 relevant lines covered (74.05%)

89484.24 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

88.99
/dozer-sql/src/pipeline/product/processor.rs
1
use crate::pipeline::errors::{PipelineError, ProductError};
2
use dozer_core::channels::ProcessorChannelForwarder;
3
use dozer_core::epoch::Epoch;
4
use dozer_core::errors::ExecutionError;
5
use dozer_core::node::{PortHandle, Processor};
6
use dozer_core::DEFAULT_PORT_HANDLE;
7

8
use dozer_core::storage::lmdb_storage::SharedTransaction;
9
use dozer_types::types::{Field, Operation, Record};
10
use std::collections::HashMap;
11

12
use super::join::{JoinAction, JoinSource};
13

14
/// Cartesian Product Processor
15
#[derive(Debug)]
×
16
pub struct FromProcessor {
17
    /// Join operations
18
    operator: JoinSource,
19

20
    source_names: HashMap<PortHandle, String>,
21
}
22

23
impl FromProcessor {
24
    /// Creates a new [`FromProcessor`].
25
    pub fn new(
667✔
26
        operator: JoinSource,
667✔
27
        source_names: HashMap<PortHandle, String>,
667✔
28
    ) -> Result<Self, PipelineError> {
667✔
29
        Ok(Self {
667✔
30
            operator,
667✔
31
            source_names,
667✔
32
        })
667✔
33
    }
667✔
34

35
    fn delete(
270✔
36
        &mut self,
270✔
37
        from_port: PortHandle,
270✔
38
        record: &Record,
270✔
39
    ) -> Result<Vec<(JoinAction, Record, Vec<Field>)>, ProductError> {
270✔
40
        self.operator
270✔
41
            .execute(JoinAction::Delete, from_port, record)
270✔
42
            .map_err(|err| ProductError::DeleteError(self.get_port_name(from_port), Box::new(err)))
270✔
43
    }
270✔
44

45
    fn insert(
527,801✔
46
        &mut self,
527,801✔
47
        from_port: PortHandle,
527,801✔
48
        record: &Record,
527,801✔
49
    ) -> Result<Vec<(JoinAction, Record, Vec<Field>)>, ProductError> {
527,801✔
50
        self.operator
527,801✔
51
            .execute(JoinAction::Insert, from_port, record)
527,801✔
52
            .map_err(|err| ProductError::InsertError(self.get_port_name(from_port), Box::new(err)))
527,801✔
53
    }
527,801✔
54

55
    #[allow(clippy::type_complexity)]
56
    fn update(
130✔
57
        &mut self,
130✔
58
        from_port: PortHandle,
130✔
59
        old: &Record,
130✔
60
        new: &Record,
130✔
61
    ) -> Result<
130✔
62
        (
130✔
63
            Vec<(JoinAction, Record, Vec<Field>)>,
130✔
64
            Vec<(JoinAction, Record, Vec<Field>)>,
130✔
65
        ),
130✔
66
        ProductError,
130✔
67
    > {
130✔
68
        let old_records = self
130✔
69
            .operator
130✔
70
            .execute(JoinAction::Delete, from_port, old)
130✔
71
            .map_err(|err| {
130✔
72
                ProductError::UpdateOldError(self.get_port_name(from_port), Box::new(err))
×
73
            })?;
130✔
74

75
        let new_records = self
130✔
76
            .operator
130✔
77
            .execute(JoinAction::Insert, from_port, new)
130✔
78
            .map_err(|err| {
130✔
79
                ProductError::UpdateNewError(self.get_port_name(from_port), Box::new(err))
×
80
            })?;
130✔
81

82
        Ok((old_records, new_records))
130✔
83
    }
130✔
84

85
    fn get_port_name(&self, from_port: u16) -> String {
×
86
        self.source_names
×
87
            .get(&from_port)
×
88
            .unwrap_or(&from_port.to_string())
×
89
            .to_string()
×
90
    }
×
91
}
92

93
impl Processor for FromProcessor {
94
    fn commit(&self, _epoch: &Epoch, _tx: &SharedTransaction) -> Result<(), ExecutionError> {
737✔
95
        Ok(())
737✔
96
    }
737✔
97

98
    fn process(
528,341✔
99
        &mut self,
528,341✔
100
        from_port: PortHandle,
528,341✔
101
        op: Operation,
528,341✔
102
        fw: &mut dyn ProcessorChannelForwarder,
528,341✔
103
        _transaction: &SharedTransaction,
528,341✔
104
    ) -> Result<(), ExecutionError> {
528,341✔
105
        // match op.clone() {
528,341✔
106
        //     Operation::Delete { old } => info!("p{from_port}: - {:?}", old.values),
528,341✔
107
        //     Operation::Insert { new } => info!("p{from_port}: + {:?}", new.values),
528,341✔
108
        //     Operation::Update { old, new } => {
528,341✔
109
        //         info!("p{from_port}: - {:?}, + {:?}", old.values, new.values)
528,341✔
110
        //     }
528,341✔
111
        // }
528,341✔
112

528,341✔
113
        match op {
528,341✔
114
            Operation::Delete { ref old } => {
230✔
115
                let records = self
230✔
116
                    .delete(from_port, old)
230✔
117
                    .map_err(|err| ExecutionError::ProductProcessorError(Box::new(err)))?;
230✔
118

119
                for (action, record, _key) in records.into_iter() {
230✔
120
                    match action {
220✔
121
                        JoinAction::Insert => {
122
                            fw.send(Operation::Insert { new: record }, DEFAULT_PORT_HANDLE)?;
×
123
                        }
124
                        JoinAction::Delete => {
125
                            fw.send(Operation::Delete { old: record }, DEFAULT_PORT_HANDLE)?;
220✔
126
                        }
127
                    }
128
                }
129
            }
130
            Operation::Insert { ref new } => {
527,981✔
131
                let records = self
527,981✔
132
                    .insert(from_port, new)
527,981✔
133
                    .map_err(|err| ExecutionError::ProductProcessorError(Box::new(err)))?;
527,981✔
134

135
                for (action, record, _key) in records.into_iter() {
527,981✔
136
                    match action {
505,531✔
137
                        JoinAction::Insert => {
138
                            fw.send(Operation::Insert { new: record }, DEFAULT_PORT_HANDLE)?;
503,541✔
139
                        }
140
                        JoinAction::Delete => {
141
                            fw.send(Operation::Delete { old: record }, DEFAULT_PORT_HANDLE)?;
1,990✔
142
                        }
143
                    }
144
                }
145
            }
146
            Operation::Update { ref old, ref new } => {
130✔
147
                let (old_join_records, new_join_records) = self
130✔
148
                    .update(from_port, old, new)
130✔
149
                    .map_err(|err| ExecutionError::ProductProcessorError(Box::new(err)))?;
130✔
150

151
                for (action, old, _key) in old_join_records.into_iter() {
130✔
152
                    match action {
100✔
153
                        JoinAction::Insert => {
154
                            fw.send(Operation::Insert { new: old }, DEFAULT_PORT_HANDLE)?;
×
155
                        }
156
                        JoinAction::Delete => {
157
                            fw.send(Operation::Delete { old }, DEFAULT_PORT_HANDLE)?;
100✔
158
                        }
159
                    }
160
                }
161

162
                for (action, new, _key) in new_join_records.into_iter() {
130✔
163
                    match action {
100✔
164
                        JoinAction::Insert => {
165
                            fw.send(Operation::Insert { new }, DEFAULT_PORT_HANDLE)?;
100✔
166
                        }
167
                        JoinAction::Delete => {
168
                            fw.send(Operation::Delete { old: new }, DEFAULT_PORT_HANDLE)?;
×
169
                        }
170
                    }
171
                }
172
            }
173
        }
174
        Ok(())
528,161✔
175
    }
528,161✔
176
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc