• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

getdozer / dozer / 4377467257

pending completion
4377467257

push

github

GitHub
implement `HAVING` (#1198)

395 of 395 new or added lines in 6 files covered. (100.0%)

27638 of 38584 relevant lines covered (71.63%)

27777.41 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

38.76
/dozer-sql/src/pipeline/product/set_processor.rs
1
use crate::pipeline::errors::{PipelineError, ProductError};
2
use crate::pipeline::product::set::{SetAction, SetOperation};
3
use bloom::CountingBloomFilter;
4
use dozer_core::channels::ProcessorChannelForwarder;
5
use dozer_core::epoch::Epoch;
6
use dozer_core::errors::ExecutionError;
7
use dozer_core::node::{PortHandle, Processor};
8
use dozer_core::storage::lmdb_storage::SharedTransaction;
9
use dozer_core::DEFAULT_PORT_HANDLE;
10
use dozer_types::types::{Operation, Record};
11
use std::collections::hash_map::RandomState;
12
use std::fmt::{Debug, Formatter};
13

×
14
pub struct SetProcessor {
15
    /// Set operations
16
    operator: SetOperation,
17
    /// Hashmap containing records with its occurrence
18
    record_map: CountingBloomFilter,
19
}
20

21
const BITS_PER_ENTRY: usize = 8;
22
const FALSE_POSITIVE_RATE: f32 = 0.01;
23
const EXPECTED_NUM_ITEMS: u32 = 10000000;
×
24

×
25
impl SetProcessor {
×
26
    /// Creates a new [`SetProcessor`].
×
27
    pub fn new(operator: SetOperation) -> Result<Self, PipelineError> {
22✔
28
        let _s = RandomState::new();
22✔
29
        Ok(Self {
22✔
30
            operator,
22✔
31
            record_map: CountingBloomFilter::with_rate(
22✔
32
                BITS_PER_ENTRY,
22✔
33
                FALSE_POSITIVE_RATE,
22✔
34
                EXPECTED_NUM_ITEMS,
22✔
35
            ),
22✔
36
        })
22✔
37
    }
22✔
38

39
    fn delete(&mut self, record: &Record) -> Result<Vec<(SetAction, Record)>, ProductError> {
20✔
40
        self.operator
20✔
41
            .execute(SetAction::Delete, record, &mut self.record_map)
20✔
42
            .map_err(|err| {
20✔
43
                ProductError::DeleteError("UNION query error:".to_string(), Box::new(err))
×
44
            })
20✔
45
    }
20✔
46

47
    fn insert(&mut self, record: &Record) -> Result<Vec<(SetAction, Record)>, ProductError> {
56,616✔
48
        self.operator
56,616✔
49
            .execute(SetAction::Insert, record, &mut self.record_map)
56,616✔
50
            .map_err(|err| {
56,616✔
51
                ProductError::InsertError("UNION query error:".to_string(), Box::new(err))
×
52
            })
56,616✔
53
    }
56,616✔
54

×
55
    #[allow(clippy::type_complexity)]
×
56
    fn update(
×
57
        &mut self,
×
58
        old: &Record,
×
59
        new: &Record,
×
60
    ) -> Result<(Vec<(SetAction, Record)>, Vec<(SetAction, Record)>), ProductError> {
×
61
        let old_records = self
×
62
            .operator
×
63
            .execute(SetAction::Delete, old, &mut self.record_map)
×
64
            .map_err(|err| {
×
65
                ProductError::UpdateOldError("UNION query error:".to_string(), Box::new(err))
×
66
            })?;
×
67

×
68
        let new_records = self
×
69
            .operator
×
70
            .execute(SetAction::Insert, new, &mut self.record_map)
×
71
            .map_err(|err| {
×
72
                ProductError::UpdateNewError("UNION query error:".to_string(), Box::new(err))
×
73
            })?;
×
74

×
75
        Ok((old_records, new_records))
×
76
    }
×
77
}
×
78

×
79
impl Debug for SetProcessor {
×
80
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
×
81
        f.debug_tuple("SetProcessor").field(&self.operator).finish()
×
82
    }
×
83
}
×
84

×
85
impl Processor for SetProcessor {
×
86
    fn commit(&self, _epoch: &Epoch, _tx: &SharedTransaction) -> Result<(), ExecutionError> {
27✔
87
        Ok(())
27✔
88
    }
27✔
89

×
90
    fn process(
56,636✔
91
        &mut self,
56,636✔
92
        _from_port: PortHandle,
56,636✔
93
        op: Operation,
56,636✔
94
        fw: &mut dyn ProcessorChannelForwarder,
56,636✔
95
        _transaction: &SharedTransaction,
56,636✔
96
    ) -> Result<(), ExecutionError> {
56,636✔
97
        match op {
56,636✔
98
            Operation::Delete { ref old } => {
×
99
                let records = self
×
100
                    .delete(old)
×
101
                    .map_err(|err| ExecutionError::ProductProcessorError(Box::new(err)))?;
×
102

×
103
                for (action, record) in records.into_iter() {
20✔
104
                    match action {
20✔
105
                        SetAction::Insert => {
×
106
                            let _ = fw.send(Operation::Insert { new: record }, DEFAULT_PORT_HANDLE);
×
107
                        }
×
108
                        SetAction::Delete => {
20✔
109
                            let _ = fw.send(Operation::Delete { old: record }, DEFAULT_PORT_HANDLE);
20✔
110
                        }
20✔
111
                    }
×
112
                }
×
113
            }
114
            Operation::Insert { ref new } => {
56,636✔
115
                let records = self
56,636✔
116
                    .insert(new)
56,636✔
117
                    .map_err(|err| ExecutionError::ProductProcessorError(Box::new(err)))?;
56,636✔
118

×
119
                for (action, record) in records.into_iter() {
56,636✔
120
                    match action {
29,345✔
121
                        SetAction::Insert => {
29,345✔
122
                            let _ = fw.send(Operation::Insert { new: record }, DEFAULT_PORT_HANDLE);
29,345✔
123
                        }
29,345✔
124
                        SetAction::Delete => {
×
125
                            let _ = fw.send(Operation::Delete { old: record }, DEFAULT_PORT_HANDLE);
×
126
                        }
×
127
                    }
×
128
                }
×
129
            }
130
            Operation::Update { ref old, ref new } => {
×
131
                let (old_records, new_records) = self
×
132
                    .update(old, new)
×
133
                    .map_err(|err| ExecutionError::ProductProcessorError(Box::new(err)))?;
×
134

×
135
                for (action, old) in old_records.into_iter() {
×
136
                    match action {
×
137
                        SetAction::Insert => {
×
138
                            let _ = fw.send(Operation::Insert { new: old }, DEFAULT_PORT_HANDLE);
×
139
                        }
×
140
                        SetAction::Delete => {
×
141
                            let _ = fw.send(Operation::Delete { old }, DEFAULT_PORT_HANDLE);
×
142
                        }
×
143
                    }
144
                }
×
145

×
146
                for (action, new) in new_records.into_iter() {
×
147
                    match action {
×
148
                        SetAction::Insert => {
×
149
                            let _ = fw.send(Operation::Insert { new }, DEFAULT_PORT_HANDLE);
×
150
                        }
×
151
                        SetAction::Delete => {
×
152
                            let _ = fw.send(Operation::Delete { old: new }, DEFAULT_PORT_HANDLE);
×
153
                        }
×
154
                    }
155
                }
156
            }
157
        }
158
        Ok(())
56,596✔
159
    }
56,596✔
160
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc