• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

getdozer / dozer / 6105410942

07 Sep 2023 04:28AM UTC coverage: 77.562% (-0.1%) from 77.686%
6105410942

push

github

chloeminkyung
feat: onnx image

1141 of 1141 new or added lines in 66 files covered. (100.0%)

49957 of 64409 relevant lines covered (77.56%)

50900.25 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

35.25
/dozer-sql/src/pipeline/product/set/set_processor.rs
1
use super::operator::{SetAction, SetOperation};
2
use super::record_map::{
3
    AccurateCountingRecordMap, CountingRecordMap, CountingRecordMapEnum,
4
    ProbabilisticCountingRecordMap,
5
};
6
use crate::pipeline::errors::{PipelineError, ProductError, SetError};
7
use crate::pipeline::utils::serialize::Cursor;
8
use dozer_core::channels::ProcessorChannelForwarder;
9
use dozer_core::dozer_log::storage::Object;
10
use dozer_core::epoch::Epoch;
11
use dozer_core::executor_operation::ProcessorOperation;
12
use dozer_core::node::{PortHandle, Processor};
13
use dozer_core::processor_record::{ProcessorRecord, ProcessorRecordStore};
14
use dozer_core::DEFAULT_PORT_HANDLE;
15
use dozer_types::errors::internal::BoxedError;
16
use std::fmt::{Debug, Formatter};
17

18
pub struct SetProcessor {
19
    _id: String,
20
    /// Set operations
21
    operator: SetOperation,
22
    /// Hashmap containing records with its occurrence
23
    record_map: CountingRecordMapEnum,
24
}
25

26
impl SetProcessor {
27
    /// Creates a new [`SetProcessor`].
28
    pub fn new(
28✔
29
        id: String,
28✔
30
        operator: SetOperation,
28✔
31
        enable_probabilistic_optimizations: bool,
28✔
32
        record_store: &ProcessorRecordStore,
28✔
33
        checkpoint_data: Option<Vec<u8>>,
28✔
34
    ) -> Result<Self, SetError> {
28✔
35
        let mut cursor = checkpoint_data.as_deref().map(Cursor::new);
28✔
36
        Ok(Self {
28✔
37
            _id: id,
28✔
38
            operator,
28✔
39
            record_map: if enable_probabilistic_optimizations {
28✔
40
                ProbabilisticCountingRecordMap::new(cursor.as_mut())?.into()
×
41
            } else {
42
                AccurateCountingRecordMap::new(
28✔
43
                    cursor.as_mut().map(|cursor| (cursor, record_store)),
28✔
44
                )?
28✔
45
                .into()
28✔
46
            },
47
        })
48
    }
28✔
49

50
    fn delete(
×
51
        &mut self,
×
52
        record: ProcessorRecord,
×
53
    ) -> Result<Vec<(SetAction, ProcessorRecord)>, ProductError> {
×
54
        self.operator
×
55
            .execute(SetAction::Delete, record, &mut self.record_map)
×
56
            .map_err(|err| {
×
57
                ProductError::DeleteError("UNION query error:".to_string(), Box::new(err))
×
58
            })
×
59
    }
×
60

61
    fn insert(
560✔
62
        &mut self,
560✔
63
        record: ProcessorRecord,
560✔
64
    ) -> Result<Vec<(SetAction, ProcessorRecord)>, ProductError> {
560✔
65
        self.operator
560✔
66
            .execute(SetAction::Insert, record, &mut self.record_map)
560✔
67
            .map_err(|err| {
560✔
68
                ProductError::InsertError("UNION query error:".to_string(), Box::new(err))
×
69
            })
560✔
70
    }
560✔
71

72
    #[allow(clippy::type_complexity)]
73
    fn update(
×
74
        &mut self,
×
75
        old: ProcessorRecord,
×
76
        new: ProcessorRecord,
×
77
    ) -> Result<
×
78
        (
×
79
            Vec<(SetAction, ProcessorRecord)>,
×
80
            Vec<(SetAction, ProcessorRecord)>,
×
81
        ),
×
82
        ProductError,
×
83
    > {
×
84
        let old_records = self
×
85
            .operator
×
86
            .execute(SetAction::Delete, old, &mut self.record_map)
×
87
            .map_err(|err| {
×
88
                ProductError::UpdateOldError("UNION query error:".to_string(), Box::new(err))
×
89
            })?;
×
90

91
        let new_records = self
×
92
            .operator
×
93
            .execute(SetAction::Insert, new, &mut self.record_map)
×
94
            .map_err(|err| {
×
95
                ProductError::UpdateNewError("UNION query error:".to_string(), Box::new(err))
×
96
            })?;
×
97

98
        Ok((old_records, new_records))
×
99
    }
×
100
}
101

102
impl Debug for SetProcessor {
103
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
×
104
        f.debug_tuple("SetProcessor").field(&self.operator).finish()
×
105
    }
×
106
}
107

108
impl Processor for SetProcessor {
109
    fn commit(&self, _epoch: &Epoch) -> Result<(), BoxedError> {
28✔
110
        Ok(())
28✔
111
    }
28✔
112

113
    fn process(
560✔
114
        &mut self,
560✔
115
        _from_port: PortHandle,
560✔
116
        _record_store: &ProcessorRecordStore,
560✔
117
        op: ProcessorOperation,
560✔
118
        fw: &mut dyn ProcessorChannelForwarder,
560✔
119
    ) -> Result<(), BoxedError> {
560✔
120
        match op {
560✔
121
            ProcessorOperation::Delete { old } => {
×
122
                let records = self.delete(old).map_err(PipelineError::ProductError)?;
×
123

124
                for (action, record) in records.into_iter() {
×
125
                    match action {
×
126
                        SetAction::Insert => {
×
127
                            fw.send(
×
128
                                ProcessorOperation::Insert { new: record },
×
129
                                DEFAULT_PORT_HANDLE,
×
130
                            );
×
131
                        }
×
132
                        SetAction::Delete => {
×
133
                            fw.send(
×
134
                                ProcessorOperation::Delete { old: record },
×
135
                                DEFAULT_PORT_HANDLE,
×
136
                            );
×
137
                        }
×
138
                    }
139
                }
140
            }
141
            ProcessorOperation::Insert { new } => {
560✔
142
                let records = self.insert(new).map_err(PipelineError::ProductError)?;
560✔
143

144
                for (action, record) in records.into_iter() {
560✔
145
                    match action {
420✔
146
                        SetAction::Insert => {
420✔
147
                            fw.send(
420✔
148
                                ProcessorOperation::Insert { new: record },
420✔
149
                                DEFAULT_PORT_HANDLE,
420✔
150
                            );
420✔
151
                        }
420✔
152
                        SetAction::Delete => {
×
153
                            fw.send(
×
154
                                ProcessorOperation::Delete { old: record },
×
155
                                DEFAULT_PORT_HANDLE,
×
156
                            );
×
157
                        }
×
158
                    }
159
                }
160
            }
161
            ProcessorOperation::Update { old, new } => {
×
162
                let (old_records, new_records) =
×
163
                    self.update(old, new).map_err(PipelineError::ProductError)?;
×
164

165
                for (action, old) in old_records.into_iter() {
×
166
                    match action {
×
167
                        SetAction::Insert => {
×
168
                            fw.send(ProcessorOperation::Insert { new: old }, DEFAULT_PORT_HANDLE);
×
169
                        }
×
170
                        SetAction::Delete => {
×
171
                            fw.send(ProcessorOperation::Delete { old }, DEFAULT_PORT_HANDLE);
×
172
                        }
×
173
                    }
174
                }
175

176
                for (action, new) in new_records.into_iter() {
×
177
                    match action {
×
178
                        SetAction::Insert => {
×
179
                            fw.send(ProcessorOperation::Insert { new }, DEFAULT_PORT_HANDLE);
×
180
                        }
×
181
                        SetAction::Delete => {
×
182
                            fw.send(ProcessorOperation::Delete { old: new }, DEFAULT_PORT_HANDLE);
×
183
                        }
×
184
                    }
185
                }
186
            }
187
        }
188
        Ok(())
560✔
189
    }
560✔
190

191
    fn serialize(
×
192
        &mut self,
×
193
        record_store: &ProcessorRecordStore,
×
194
        mut object: Object,
×
195
    ) -> Result<(), BoxedError> {
×
196
        self.record_map
×
197
            .serialize(record_store, &mut object)
×
198
            .map_err(Into::into)
×
199
    }
×
200
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc