• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

getdozer / dozer / 4360628700

pending completion
4360628700

push

github

GitHub
refactor: pipeline memory storage (#1135)

1152 of 1152 new or added lines in 40 files covered. (100.0%)

27472 of 39301 relevant lines covered (69.9%)

28369.11 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

37.04
/dozer-sql/src/pipeline/product/set_processor.rs
1
use crate::pipeline::errors::{PipelineError, ProductError};
2
use crate::pipeline::product::set::{SetAction, SetOperation};
3
use dozer_core::channels::ProcessorChannelForwarder;
4
use dozer_core::epoch::Epoch;
5
use dozer_core::errors::ExecutionError;
6
use dozer_core::node::{PortHandle, Processor};
7
use dozer_core::storage::lmdb_storage::SharedTransaction;
8
use dozer_core::DEFAULT_PORT_HANDLE;
9
use dozer_types::types::{Operation, Record};
10
use hashbrown::HashMap;
11
use std::collections::hash_map::RandomState;
12

13
#[derive(Debug)]
×
14
pub struct SetProcessor {
×
15
    /// Set operations
16
    operator: SetOperation,
17
    /// Hashmap containing records with its occurrence
18
    record_map: HashMap<Record, usize>,
19
}
20

21
impl SetProcessor {
22
    /// Creates a new [`SetProcessor`].
23
    pub fn new(operator: SetOperation) -> Result<Self, PipelineError> {
22✔
24
        let _s = RandomState::new();
22✔
25
        Ok(Self {
22✔
26
            operator,
22✔
27
            record_map: HashMap::new(),
22✔
28
        })
22✔
29
    }
22✔
30

×
31
    fn delete(&mut self, record: &Record) -> Result<Vec<(SetAction, Record)>, ProductError> {
20✔
32
        self.operator
20✔
33
            .execute(SetAction::Delete, record, &mut self.record_map)
20✔
34
            .map_err(|err| {
20✔
35
                ProductError::DeleteError("UNION query error:".to_string(), Box::new(err))
×
36
            })
20✔
37
    }
20✔
38

×
39
    fn insert(&mut self, record: &Record) -> Result<Vec<(SetAction, Record)>, ProductError> {
56,616✔
40
        self.operator
56,616✔
41
            .execute(SetAction::Insert, record, &mut self.record_map)
56,616✔
42
            .map_err(|err| {
56,616✔
43
                ProductError::InsertError("UNION query error:".to_string(), Box::new(err))
×
44
            })
56,616✔
45
    }
56,616✔
46

×
47
    #[allow(clippy::type_complexity)]
48
    fn update(
×
49
        &mut self,
×
50
        old: &Record,
×
51
        new: &Record,
×
52
    ) -> Result<(Vec<(SetAction, Record)>, Vec<(SetAction, Record)>), ProductError> {
×
53
        let old_records = self
×
54
            .operator
×
55
            .execute(SetAction::Delete, old, &mut self.record_map)
×
56
            .map_err(|err| {
×
57
                ProductError::UpdateOldError("UNION query error:".to_string(), Box::new(err))
×
58
            })?;
×
59

×
60
        let new_records = self
×
61
            .operator
×
62
            .execute(SetAction::Insert, new, &mut self.record_map)
×
63
            .map_err(|err| {
×
64
                ProductError::UpdateNewError("UNION query error:".to_string(), Box::new(err))
×
65
            })?;
×
66

×
67
        Ok((old_records, new_records))
×
68
    }
×
69
}
×
70

×
71
impl Processor for SetProcessor {
×
72
    fn commit(&self, _epoch: &Epoch, _tx: &SharedTransaction) -> Result<(), ExecutionError> {
27✔
73
        Ok(())
27✔
74
    }
27✔
75

×
76
    fn process(
56,651✔
77
        &mut self,
56,651✔
78
        _from_port: PortHandle,
56,651✔
79
        op: Operation,
56,651✔
80
        fw: &mut dyn ProcessorChannelForwarder,
56,651✔
81
        _transaction: &SharedTransaction,
56,651✔
82
    ) -> Result<(), ExecutionError> {
56,651✔
83
        match op {
56,651✔
84
            Operation::Delete { ref old } => {
20✔
85
                let records = self
20✔
86
                    .delete(old)
20✔
87
                    .map_err(|err| ExecutionError::ProductProcessorError(Box::new(err)))?;
20✔
88

89
                for (action, record) in records.into_iter() {
20✔
90
                    match action {
20✔
91
                        SetAction::Insert => {
×
92
                            let _ = fw.send(Operation::Insert { new: record }, DEFAULT_PORT_HANDLE);
×
93
                        }
×
94
                        SetAction::Delete => {
20✔
95
                            let _ = fw.send(Operation::Delete { old: record }, DEFAULT_PORT_HANDLE);
20✔
96
                        }
20✔
97
                    }
×
98
                }
×
99
            }
×
100
            Operation::Insert { ref new } => {
56,631✔
101
                let records = self
56,631✔
102
                    .insert(new)
56,631✔
103
                    .map_err(|err| ExecutionError::ProductProcessorError(Box::new(err)))?;
56,631✔
104

×
105
                for (action, record) in records.into_iter() {
56,631✔
106
                    match action {
29,345✔
107
                        SetAction::Insert => {
29,345✔
108
                            let _ = fw.send(Operation::Insert { new: record }, DEFAULT_PORT_HANDLE);
29,345✔
109
                        }
29,345✔
110
                        SetAction::Delete => {
×
111
                            let _ = fw.send(Operation::Delete { old: record }, DEFAULT_PORT_HANDLE);
×
112
                        }
×
113
                    }
×
114
                }
×
115
            }
×
116
            Operation::Update { ref old, ref new } => {
×
117
                let (old_records, new_records) = self
×
118
                    .update(old, new)
×
119
                    .map_err(|err| ExecutionError::ProductProcessorError(Box::new(err)))?;
×
120

×
121
                for (action, old) in old_records.into_iter() {
×
122
                    match action {
×
123
                        SetAction::Insert => {
×
124
                            let _ = fw.send(Operation::Insert { new: old }, DEFAULT_PORT_HANDLE);
×
125
                        }
×
126
                        SetAction::Delete => {
×
127
                            let _ = fw.send(Operation::Delete { old }, DEFAULT_PORT_HANDLE);
×
128
                        }
×
129
                    }
×
130
                }
×
131

×
132
                for (action, new) in new_records.into_iter() {
×
133
                    match action {
×
134
                        SetAction::Insert => {
×
135
                            let _ = fw.send(Operation::Insert { new }, DEFAULT_PORT_HANDLE);
×
136
                        }
×
137
                        SetAction::Delete => {
×
138
                            let _ = fw.send(Operation::Delete { old: new }, DEFAULT_PORT_HANDLE);
×
139
                        }
×
140
                    }
×
141
                }
×
142
            }
×
143
        }
×
144
        Ok(())
56,566✔
145
    }
56,566✔
146
}
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc