• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

getdozer / dozer / 5972853941

25 Aug 2023 06:52AM UTC coverage: 76.247% (+0.8%) from 75.446%
5972853941

push

github

web-flow
feat: make probabilistic optimizations optional and tunable in the YAML config (#1912)

Probabilistic optimization sacrifices accuracy in order to reduce memory consumption. In certain parts of the pipeline, a Bloom Filter is used ([set_processor](https://github.com/getdozer/dozer/blob/2e3ba96c3f4bdf9a691747191ab15617564d8ca2/dozer-sql/src/pipeline/product/set/set_processor.rs#L20)), while in other parts, hash tables that store only the hash of the keys instead of the full keys are used ([aggregation_processor](https://github.com/getdozer/dozer/blob/2e3ba96c3f4bdf9a691747191ab15617564d8ca2/dozer-sql/src/pipeline/aggregation/processor.rs#L59) and [join_processor](https://github.com/getdozer/dozer/blob/2e3ba96c3f4bdf9a691747191ab15617564d8ca2/dozer-sql/src/pipeline/product/join/operator.rs#L57-L58)).

This commit makes these optimizations disabled by default and offers user-configurable flags to enable each of these optimizations separately.

This is an example of how to turn on probabilistic optimizations for each processor in the Dozer configuration.

```
flags:
  enable_probabilistic_optimizations:
    in_sets: true # enable probabilistic optimizations in set operations (UNION, EXCEPT, INTERSECT); Default: false
    in_joins: true # enable probabilistic optimizations in JOIN operations; Default: false
    in_aggregations: true # enable probabilistic optimizations in aggregations (SUM, COUNT, MIN, etc.); Default: false
```

347 of 347 new or added lines in 25 files covered. (100.0%)

47165 of 61858 relevant lines covered (76.25%)

48442.96 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

35.2
/dozer-sql/src/pipeline/product/set/set_processor.rs
1
use super::operator::{SetAction, SetOperation};
2
use super::record_map::{
3
    AccurateCountingRecordMap, CountingRecordMapEnum, ProbabilisticCountingRecordMap,
4
};
5
use crate::pipeline::errors::{PipelineError, ProductError};
6
use dozer_core::channels::ProcessorChannelForwarder;
7
use dozer_core::epoch::Epoch;
8
use dozer_core::executor_operation::ProcessorOperation;
9
use dozer_core::node::{PortHandle, Processor};
10
use dozer_core::processor_record::{ProcessorRecord, ProcessorRecordStore};
11
use dozer_core::DEFAULT_PORT_HANDLE;
12
use dozer_types::errors::internal::BoxedError;
13
use std::collections::hash_map::RandomState;
14
use std::fmt::{Debug, Formatter};
15

16
/// Processor implementing SQL set operations (UNION and friends) by tracking
/// record occurrence counts in a configurable record map.
pub struct SetProcessor {
    // Processor id; retained for identification but not read by the methods
    // visible in this file (hence the underscore prefix).
    _id: String,
    /// Set operations
    operator: SetOperation,
    /// Hashmap containing records with its occurrence count. Either an
    /// accurate map or a probabilistic (memory-saving) one, chosen at
    /// construction time via `enable_probabilistic_optimizations`.
    record_map: CountingRecordMapEnum,
}
23

24
impl SetProcessor {
25
    /// Creates a new [`SetProcessor`].
26
    pub fn new(
28✔
27
        id: String,
28✔
28
        operator: SetOperation,
28✔
29
        enable_probabilistic_optimizations: bool,
28✔
30
    ) -> Result<Self, PipelineError> {
28✔
31
        let _s = RandomState::new();
28✔
32
        Ok(Self {
28✔
33
            _id: id,
28✔
34
            operator,
28✔
35
            record_map: if enable_probabilistic_optimizations {
28✔
36
                ProbabilisticCountingRecordMap::new().into()
×
37
            } else {
38
                AccurateCountingRecordMap::new().into()
28✔
39
            },
40
        })
41
    }
28✔
42

43
    fn delete(
×
44
        &mut self,
×
45
        record: ProcessorRecord,
×
46
    ) -> Result<Vec<(SetAction, ProcessorRecord)>, ProductError> {
×
47
        self.operator
×
48
            .execute(SetAction::Delete, record, &mut self.record_map)
×
49
            .map_err(|err| {
×
50
                ProductError::DeleteError("UNION query error:".to_string(), Box::new(err))
×
51
            })
×
52
    }
×
53

54
    fn insert(
560✔
55
        &mut self,
560✔
56
        record: ProcessorRecord,
560✔
57
    ) -> Result<Vec<(SetAction, ProcessorRecord)>, ProductError> {
560✔
58
        self.operator
560✔
59
            .execute(SetAction::Insert, record, &mut self.record_map)
560✔
60
            .map_err(|err| {
560✔
61
                ProductError::InsertError("UNION query error:".to_string(), Box::new(err))
×
62
            })
560✔
63
    }
560✔
64

65
    #[allow(clippy::type_complexity)]
66
    fn update(
×
67
        &mut self,
×
68
        old: ProcessorRecord,
×
69
        new: ProcessorRecord,
×
70
    ) -> Result<
×
71
        (
×
72
            Vec<(SetAction, ProcessorRecord)>,
×
73
            Vec<(SetAction, ProcessorRecord)>,
×
74
        ),
×
75
        ProductError,
×
76
    > {
×
77
        let old_records = self
×
78
            .operator
×
79
            .execute(SetAction::Delete, old, &mut self.record_map)
×
80
            .map_err(|err| {
×
81
                ProductError::UpdateOldError("UNION query error:".to_string(), Box::new(err))
×
82
            })?;
×
83

84
        let new_records = self
×
85
            .operator
×
86
            .execute(SetAction::Insert, new, &mut self.record_map)
×
87
            .map_err(|err| {
×
88
                ProductError::UpdateNewError("UNION query error:".to_string(), Box::new(err))
×
89
            })?;
×
90

91
        Ok((old_records, new_records))
×
92
    }
×
93
}
94

95
impl Debug for SetProcessor {
96
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
×
97
        f.debug_tuple("SetProcessor").field(&self.operator).finish()
×
98
    }
×
99
}
100

101
impl Processor for SetProcessor {
102
    fn commit(&self, _epoch: &Epoch) -> Result<(), BoxedError> {
28✔
103
        Ok(())
28✔
104
    }
28✔
105

106
    fn process(
560✔
107
        &mut self,
560✔
108
        _from_port: PortHandle,
560✔
109
        _record_store: &ProcessorRecordStore,
560✔
110
        op: ProcessorOperation,
560✔
111
        fw: &mut dyn ProcessorChannelForwarder,
560✔
112
    ) -> Result<(), BoxedError> {
560✔
113
        match op {
560✔
114
            ProcessorOperation::Delete { old } => {
×
115
                let records = self.delete(old).map_err(PipelineError::ProductError)?;
×
116

117
                for (action, record) in records.into_iter() {
×
118
                    match action {
×
119
                        SetAction::Insert => {
×
120
                            fw.send(
×
121
                                ProcessorOperation::Insert { new: record },
×
122
                                DEFAULT_PORT_HANDLE,
×
123
                            );
×
124
                        }
×
125
                        SetAction::Delete => {
×
126
                            fw.send(
×
127
                                ProcessorOperation::Delete { old: record },
×
128
                                DEFAULT_PORT_HANDLE,
×
129
                            );
×
130
                        }
×
131
                    }
132
                }
133
            }
134
            ProcessorOperation::Insert { new } => {
560✔
135
                let records = self.insert(new).map_err(PipelineError::ProductError)?;
560✔
136

137
                for (action, record) in records.into_iter() {
560✔
138
                    match action {
420✔
139
                        SetAction::Insert => {
420✔
140
                            fw.send(
420✔
141
                                ProcessorOperation::Insert { new: record },
420✔
142
                                DEFAULT_PORT_HANDLE,
420✔
143
                            );
420✔
144
                        }
420✔
145
                        SetAction::Delete => {
×
146
                            fw.send(
×
147
                                ProcessorOperation::Delete { old: record },
×
148
                                DEFAULT_PORT_HANDLE,
×
149
                            );
×
150
                        }
×
151
                    }
152
                }
153
            }
154
            ProcessorOperation::Update { old, new } => {
×
155
                let (old_records, new_records) =
×
156
                    self.update(old, new).map_err(PipelineError::ProductError)?;
×
157

158
                for (action, old) in old_records.into_iter() {
×
159
                    match action {
×
160
                        SetAction::Insert => {
×
161
                            fw.send(ProcessorOperation::Insert { new: old }, DEFAULT_PORT_HANDLE);
×
162
                        }
×
163
                        SetAction::Delete => {
×
164
                            fw.send(ProcessorOperation::Delete { old }, DEFAULT_PORT_HANDLE);
×
165
                        }
×
166
                    }
167
                }
168

169
                for (action, new) in new_records.into_iter() {
×
170
                    match action {
×
171
                        SetAction::Insert => {
×
172
                            fw.send(ProcessorOperation::Insert { new }, DEFAULT_PORT_HANDLE);
×
173
                        }
×
174
                        SetAction::Delete => {
×
175
                            fw.send(ProcessorOperation::Delete { old: new }, DEFAULT_PORT_HANDLE);
×
176
                        }
×
177
                    }
178
                }
179
            }
180
        }
181
        Ok(())
560✔
182
    }
560✔
183
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc