• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

getdozer / dozer / 4302087115

pending completion
4302087115

push

github

GitHub
chore: Move `SnapshottingDone` out of `Operation` so processors don't have to know it.(#1103)

364 of 364 new or added lines in 33 files covered. (100.0%)

28623 of 40224 relevant lines covered (71.16%)

56785.21 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

54.24
/dozer-sql/src/pipeline/product/set_processor.rs
1
use crate::pipeline::errors::{PipelineError, ProductError};
2
use crate::pipeline::product::set::{SetAction, SetOperation};
3
use dozer_core::channels::ProcessorChannelForwarder;
4
use dozer_core::epoch::Epoch;
5
use dozer_core::errors::ExecutionError;
6
use dozer_core::node::{PortHandle, Processor};
7
use dozer_core::record_store::RecordReader;
8
use dozer_core::storage::lmdb_storage::{LmdbExclusiveTransaction, SharedTransaction};
9
use dozer_core::DEFAULT_PORT_HANDLE;
10
use dozer_types::types::{Operation, Record};
11
use lmdb::{Database, DatabaseFlags};
12
use std::collections::HashMap;
13

14
/// Processor that runs a [`SetOperation`] over every incoming record and
/// forwards the resulting insert/delete actions downstream.
#[derive(Debug)]
×
15
pub struct SetProcessor {
16
    /// The set operation executed for each incoming record.
17
    operator: SetOperation,
18
    /// LMDB database handle (created under the name `"set"` in `new`) that is
    /// handed to the operator on every `execute` call.
19
    db: Database,
20
}
21

22
impl SetProcessor {
23
    /// Creates a new [`FromProcessor`].
24
    pub fn new(
22✔
25
        operator: SetOperation,
22✔
26
        txn: &mut LmdbExclusiveTransaction,
22✔
27
    ) -> Result<Self, PipelineError> {
22✔
28
        Ok(Self {
22✔
29
            operator,
22✔
30
            db: txn.create_database(Some("set"), Some(DatabaseFlags::empty()))?,
22✔
31
        })
32
    }
22✔
33

34
    fn delete(
20✔
35
        &mut self,
20✔
36
        _from_port: PortHandle,
20✔
37
        record: &Record,
20✔
38
        txn: &SharedTransaction,
20✔
39
        _reader: &HashMap<PortHandle, Box<dyn RecordReader>>,
20✔
40
    ) -> Result<Vec<(SetAction, Record)>, ProductError> {
20✔
41
        self.operator
20✔
42
            .execute(SetAction::Delete, record, &self.db, txn)
20✔
43
            .map_err(|err| {
20✔
44
                ProductError::DeleteError("UNION query error:".to_string(), Box::new(err))
×
45
            })
20✔
46
    }
20✔
47

48
    fn insert(
56,646✔
49
        &mut self,
56,646✔
50
        _from_port: PortHandle,
56,646✔
51
        record: &Record,
56,646✔
52
        txn: &SharedTransaction,
56,646✔
53
        _reader: &HashMap<PortHandle, Box<dyn RecordReader>>,
56,646✔
54
    ) -> Result<Vec<(SetAction, Record)>, ProductError> {
56,646✔
55
        self.operator
56,646✔
56
            .execute(SetAction::Insert, record, &self.db, txn)
56,646✔
57
            .map_err(|err| {
56,646✔
58
                ProductError::InsertError("UNION query error:".to_string(), Box::new(err))
×
59
            })
56,646✔
60
    }
56,646✔
61

62
    #[allow(clippy::type_complexity)]
63
    fn update(
×
64
        &mut self,
×
65
        _from_port: PortHandle,
×
66
        old: &Record,
×
67
        new: &Record,
×
68
        txn: &SharedTransaction,
×
69
        _reader: &HashMap<PortHandle, Box<dyn RecordReader>>,
×
70
    ) -> Result<(Vec<(SetAction, Record)>, Vec<(SetAction, Record)>), ProductError> {
×
71
        let old_records = self
×
72
            .operator
×
73
            .execute(SetAction::Delete, old, &self.db, txn)
×
74
            .map_err(|err| {
×
75
                ProductError::UpdateOldError("UNION query error:".to_string(), Box::new(err))
×
76
            })?;
×
77

78
        let new_records = self
×
79
            .operator
×
80
            .execute(SetAction::Insert, new, &self.db, txn)
×
81
            .map_err(|err| {
×
82
                ProductError::UpdateNewError("UNION query error:".to_string(), Box::new(err))
×
83
            })?;
×
84

85
        Ok((old_records, new_records))
×
86
    }
×
87
}
88

89
impl Processor for SetProcessor {
90
    fn commit(&self, _epoch: &Epoch, _tx: &SharedTransaction) -> Result<(), ExecutionError> {
62✔
91
        Ok(())
62✔
92
    }
62✔
93

94
    fn process(
56,666✔
95
        &mut self,
56,666✔
96
        from_port: PortHandle,
56,666✔
97
        op: Operation,
56,666✔
98
        fw: &mut dyn ProcessorChannelForwarder,
56,666✔
99
        transaction: &SharedTransaction,
56,666✔
100
        reader: &HashMap<PortHandle, Box<dyn RecordReader>>,
56,666✔
101
    ) -> Result<(), ExecutionError> {
56,666✔
102
        match op {
56,666✔
103
            Operation::Delete { ref old } => {
20✔
104
                let records = self
20✔
105
                    .delete(from_port, old, transaction, reader)
20✔
106
                    .map_err(|err| ExecutionError::ProductProcessorError(Box::new(err)))?;
20✔
107

108
                for (action, record) in records.into_iter() {
20✔
109
                    match action {
20✔
110
                        SetAction::Insert => {
×
111
                            let _ = fw.send(Operation::Insert { new: record }, DEFAULT_PORT_HANDLE);
×
112
                        }
×
113
                        SetAction::Delete => {
20✔
114
                            let _ = fw.send(Operation::Delete { old: record }, DEFAULT_PORT_HANDLE);
20✔
115
                        }
20✔
116
                    }
117
                }
118
            }
119
            Operation::Insert { ref new } => {
56,646✔
120
                let records = self
56,646✔
121
                    .insert(from_port, new, transaction, reader)
56,646✔
122
                    .map_err(|err| ExecutionError::ProductProcessorError(Box::new(err)))?;
56,646✔
123

124
                for (action, record) in records.into_iter() {
56,646✔
125
                    match action {
29,345✔
126
                        SetAction::Insert => {
29,345✔
127
                            let _ = fw.send(Operation::Insert { new: record }, DEFAULT_PORT_HANDLE);
29,345✔
128
                        }
29,345✔
129
                        SetAction::Delete => {
×
130
                            let _ = fw.send(Operation::Delete { old: record }, DEFAULT_PORT_HANDLE);
×
131
                        }
×
132
                    }
133
                }
134
            }
135
            Operation::Update { ref old, ref new } => {
×
136
                let (old_records, new_records) = self
×
137
                    .update(from_port, old, new, transaction, reader)
×
138
                    .map_err(|err| ExecutionError::ProductProcessorError(Box::new(err)))?;
×
139

140
                for (action, old) in old_records.into_iter() {
×
141
                    match action {
×
142
                        SetAction::Insert => {
×
143
                            let _ = fw.send(Operation::Insert { new: old }, DEFAULT_PORT_HANDLE);
×
144
                        }
×
145
                        SetAction::Delete => {
×
146
                            let _ = fw.send(Operation::Delete { old }, DEFAULT_PORT_HANDLE);
×
147
                        }
×
148
                    }
149
                }
150

151
                for (action, new) in new_records.into_iter() {
×
152
                    match action {
×
153
                        SetAction::Insert => {
×
154
                            let _ = fw.send(Operation::Insert { new }, DEFAULT_PORT_HANDLE);
×
155
                        }
×
156
                        SetAction::Delete => {
×
157
                            let _ = fw.send(Operation::Delete { old: new }, DEFAULT_PORT_HANDLE);
×
158
                        }
×
159
                    }
160
                }
161
            }
162
        }
×
163
        Ok(())
56,666✔
164
    }
56,666✔
165
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc