• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

getdozer / dozer / 5978430793

25 Aug 2023 04:54PM UTC coverage: 75.575% (-0.7%) from 76.279%
5978430793

push

github

web-flow
Bump ordered-float from 3.4.0 to 3.9.1 (#1919)

Bumps [ordered-float](https://github.com/reem/rust-ordered-float) from 3.4.0 to 3.9.1.
- [Release notes](https://github.com/reem/rust-ordered-float/releases)
- [Commits](https://github.com/reem/rust-ordered-float/compare/v3.4.0...v3.9.1)

---
updated-dependencies:
- dependency-name: ordered-float
  dependency-type: direct:production
  update-type: version-update:semver-minor
...

Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>

47272 of 62550 relevant lines covered (75.57%)

49425.38 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

30.77
/dozer-sql/src/pipeline/product/set/set_processor.rs
1
use super::operator::{SetAction, SetOperation};
2
use super::record_map::{
3
    AccurateCountingRecordMap, CountingRecordMapEnum, ProbabilisticCountingRecordMap,
4
};
5
use crate::pipeline::errors::{PipelineError, ProductError};
6
use dozer_core::channels::ProcessorChannelForwarder;
7
use dozer_core::epoch::Epoch;
8
use dozer_core::executor_operation::ProcessorOperation;
9
use dozer_core::node::{PortHandle, Processor};
10
use dozer_core::processor_record::{ProcessorRecord, ProcessorRecordStore};
11
use dozer_core::DEFAULT_PORT_HANDLE;
12
use dozer_types::errors::internal::BoxedError;
13
use std::collections::hash_map::RandomState;
14
use std::fmt::{Debug, Formatter};
15

16
pub struct SetProcessor {
17
    _id: String,
18
    /// Set operations
19
    operator: SetOperation,
20
    /// Hashmap containing records with its occurrence
21
    record_map: CountingRecordMapEnum,
22
}
23

24
impl SetProcessor {
25
    /// Creates a new [`SetProcessor`].
26
    pub fn new(
28✔
27
        id: String,
28✔
28
        operator: SetOperation,
28✔
29
        enable_probabilistic_optimizations: bool,
28✔
30
    ) -> Result<Self, PipelineError> {
28✔
31
        let _s = RandomState::new();
28✔
32
        Ok(Self {
28✔
33
            _id: id,
28✔
34
            operator,
28✔
35
            record_map: if enable_probabilistic_optimizations {
28✔
36
                ProbabilisticCountingRecordMap::new().into()
×
37
            } else {
×
38
                AccurateCountingRecordMap::new().into()
28✔
39
            },
×
40
        })
×
41
    }
28✔
42

×
43
    fn delete(
×
44
        &mut self,
×
45
        record: ProcessorRecord,
×
46
    ) -> Result<Vec<(SetAction, ProcessorRecord)>, ProductError> {
×
47
        self.operator
×
48
            .execute(SetAction::Delete, record, &mut self.record_map)
×
49
            .map_err(|err| {
×
50
                ProductError::DeleteError("UNION query error:".to_string(), Box::new(err))
×
51
            })
×
52
    }
×
53

×
54
    fn insert(
560✔
55
        &mut self,
560✔
56
        record: ProcessorRecord,
560✔
57
    ) -> Result<Vec<(SetAction, ProcessorRecord)>, ProductError> {
560✔
58
        self.operator
560✔
59
            .execute(SetAction::Insert, record, &mut self.record_map)
560✔
60
            .map_err(|err| {
560✔
61
                ProductError::InsertError("UNION query error:".to_string(), Box::new(err))
×
62
            })
560✔
63
    }
560✔
64

65
    #[allow(clippy::type_complexity)]
×
66
    fn update(
×
67
        &mut self,
×
68
        old: ProcessorRecord,
×
69
        new: ProcessorRecord,
×
70
    ) -> Result<
×
71
        (
×
72
            Vec<(SetAction, ProcessorRecord)>,
×
73
            Vec<(SetAction, ProcessorRecord)>,
×
74
        ),
×
75
        ProductError,
×
76
    > {
×
77
        let old_records = self
×
78
            .operator
×
79
            .execute(SetAction::Delete, old, &mut self.record_map)
×
80
            .map_err(|err| {
×
81
                ProductError::UpdateOldError("UNION query error:".to_string(), Box::new(err))
×
82
            })?;
×
83

×
84
        let new_records = self
×
85
            .operator
×
86
            .execute(SetAction::Insert, new, &mut self.record_map)
×
87
            .map_err(|err| {
×
88
                ProductError::UpdateNewError("UNION query error:".to_string(), Box::new(err))
×
89
            })?;
×
90

×
91
        Ok((old_records, new_records))
×
92
    }
×
93
}
94

95
impl Debug for SetProcessor {
×
96
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
×
97
        f.debug_tuple("SetProcessor").field(&self.operator).finish()
×
98
    }
×
99
}
100

101
impl Processor for SetProcessor {
×
102
    fn commit(&self, _epoch: &Epoch) -> Result<(), BoxedError> {
28✔
103
        Ok(())
28✔
104
    }
28✔
105

×
106
    fn process(
560✔
107
        &mut self,
560✔
108
        _from_port: PortHandle,
560✔
109
        _record_store: &ProcessorRecordStore,
560✔
110
        op: ProcessorOperation,
560✔
111
        fw: &mut dyn ProcessorChannelForwarder,
560✔
112
    ) -> Result<(), BoxedError> {
560✔
113
        match op {
560✔
114
            ProcessorOperation::Delete { old } => {
×
115
                let records = self.delete(old).map_err(PipelineError::ProductError)?;
×
116

×
117
                for (action, record) in records.into_iter() {
×
118
                    match action {
×
119
                        SetAction::Insert => {
×
120
                            fw.send(
×
121
                                ProcessorOperation::Insert { new: record },
×
122
                                DEFAULT_PORT_HANDLE,
×
123
                            );
×
124
                        }
×
125
                        SetAction::Delete => {
×
126
                            fw.send(
×
127
                                ProcessorOperation::Delete { old: record },
×
128
                                DEFAULT_PORT_HANDLE,
×
129
                            );
×
130
                        }
×
131
                    }
132
                }
133
            }
×
134
            ProcessorOperation::Insert { new } => {
560✔
135
                let records = self.insert(new).map_err(PipelineError::ProductError)?;
560✔
136

×
137
                for (action, record) in records.into_iter() {
560✔
138
                    match action {
420✔
139
                        SetAction::Insert => {
420✔
140
                            fw.send(
420✔
141
                                ProcessorOperation::Insert { new: record },
420✔
142
                                DEFAULT_PORT_HANDLE,
420✔
143
                            );
420✔
144
                        }
420✔
145
                        SetAction::Delete => {
×
146
                            fw.send(
×
147
                                ProcessorOperation::Delete { old: record },
×
148
                                DEFAULT_PORT_HANDLE,
×
149
                            );
×
150
                        }
×
151
                    }
152
                }
153
            }
×
154
            ProcessorOperation::Update { old, new } => {
×
155
                let (old_records, new_records) =
×
156
                    self.update(old, new).map_err(PipelineError::ProductError)?;
×
157

×
158
                for (action, old) in old_records.into_iter() {
×
159
                    match action {
×
160
                        SetAction::Insert => {
×
161
                            fw.send(ProcessorOperation::Insert { new: old }, DEFAULT_PORT_HANDLE);
×
162
                        }
×
163
                        SetAction::Delete => {
×
164
                            fw.send(ProcessorOperation::Delete { old }, DEFAULT_PORT_HANDLE);
×
165
                        }
×
166
                    }
167
                }
168

×
169
                for (action, new) in new_records.into_iter() {
×
170
                    match action {
×
171
                        SetAction::Insert => {
×
172
                            fw.send(ProcessorOperation::Insert { new }, DEFAULT_PORT_HANDLE);
×
173
                        }
×
174
                        SetAction::Delete => {
×
175
                            fw.send(ProcessorOperation::Delete { old: new }, DEFAULT_PORT_HANDLE);
×
176
                        }
×
177
                    }
178
                }
179
            }
180
        }
×
181
        Ok(())
560✔
182
    }
560✔
183
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc