• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

getdozer / dozer / 6105410942

07 Sep 2023 04:28AM UTC coverage: 77.562% (-0.1%) from 77.686%
6105410942

push

github

chloeminkyung
feat: onnx image

1141 of 1141 new or added lines in 66 files covered. (100.0%)

49957 of 64409 relevant lines covered (77.56%)

50900.25 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

77.78
/dozer-sql/src/pipeline/product/set/record_map/mod.rs
1
use dozer_core::{
2
    dozer_log::storage::Object,
3
    processor_record::{ProcessorRecord, ProcessorRecordStore},
4
};
5
use dozer_types::serde::{Deserialize, Serialize};
6
use enum_dispatch::enum_dispatch;
7
use std::collections::HashMap;
8

9
use crate::pipeline::utils::serialize::{
10
    deserialize_bincode, deserialize_record, deserialize_u64, serialize_bincode, serialize_record,
11
    serialize_u64, Cursor, DeserializationError, SerializationError,
12
};
13

14
#[enum_dispatch(CountingRecordMap)]
15
pub enum CountingRecordMapEnum {
16
    AccurateCountingRecordMap,
17
    ProbabilisticCountingRecordMap,
18
}
19

20
#[enum_dispatch]
598✔
21
pub trait CountingRecordMap {
22
    /// Inserts a record, or increases its insertion count if it already exixts in the map.
23
    fn insert(&mut self, record: &ProcessorRecord);
24

25
    /// Decreases the insertion count of a record, and removes it if the count reaches zero.
26
    fn remove(&mut self, record: &ProcessorRecord);
27

28
    /// Returns an estimate of the number of times this record has been inserted into the filter.
29
    /// Depending on the implementation, this number may not be accurate.
30
    fn estimate_count(&self, record: &ProcessorRecord) -> u64;
31

32
    /// Clears the map, removing all records.
33
    fn clear(&mut self);
34

35
    /// Serializes the map to a `Object`. `ProcessorRecord`s should be serialized as an `u64`.
36
    fn serialize(
37
        &self,
38
        record_store: &ProcessorRecordStore,
39
        object: &mut Object,
40
    ) -> Result<(), SerializationError>;
41
}
42

43
#[derive(Clone, Debug, PartialEq, Eq)]
×
44
pub struct AccurateCountingRecordMap {
45
    map: HashMap<ProcessorRecord, u64>,
46
}
47

48
impl AccurateCountingRecordMap {
49
    pub fn new(
29✔
50
        cursor_and_record_store: Option<(&mut Cursor, &ProcessorRecordStore)>,
29✔
51
    ) -> Result<Self, DeserializationError> {
29✔
52
        Ok(
53
            if let Some((cursor, record_store)) = cursor_and_record_store {
29✔
54
                let len = deserialize_u64(cursor)? as usize;
×
55
                let mut map = HashMap::with_capacity(len);
×
56
                for _ in 0..len {
×
57
                    let record = deserialize_record(cursor, record_store)?;
×
58
                    let count = deserialize_u64(cursor)?;
×
59
                    map.insert(record, count);
×
60
                }
61
                Self { map }
×
62
            } else {
63
                Self {
29✔
64
                    map: HashMap::new(),
29✔
65
                }
29✔
66
            },
67
        )
68
    }
29✔
69
}
70

71
impl CountingRecordMap for AccurateCountingRecordMap {
72
    fn insert(&mut self, record: &ProcessorRecord) {
284✔
73
        let count = self.map.entry(record.clone()).or_insert(0);
284✔
74
        if *count < u64::max_value() {
284✔
75
            *count += 1;
284✔
76
        }
284✔
77
    }
284✔
78

79
    fn remove(&mut self, record: &ProcessorRecord) {
80
        if let Some(count) = self.map.get_mut(record) {
2✔
81
            *count -= 1;
2✔
82
            if *count == 0 {
2✔
83
                self.map.remove(record);
1✔
84
            }
1✔
85
        }
×
86
    }
2✔
87

88
    fn estimate_count(&self, record: &ProcessorRecord) -> u64 {
292✔
89
        self.map.get(record).copied().unwrap_or(0)
292✔
90
    }
292✔
91

92
    fn clear(&mut self) {
1✔
93
        self.map.clear();
1✔
94
    }
1✔
95

96
    fn serialize(
97
        &self,
98
        record_store: &ProcessorRecordStore,
99
        object: &mut Object,
100
    ) -> Result<(), SerializationError> {
101
        serialize_u64(self.map.len() as u64, object)?;
×
102
        for (key, value) in &self.map {
×
103
            serialize_record(key, record_store, object)?;
×
104
            serialize_u64(*value, object)?;
×
105
        }
106
        Ok(())
×
107
    }
×
108
}
109

110
#[derive(Debug, Serialize, Deserialize)]
×
111
#[serde(crate = "dozer_types::serde")]
112
pub struct ProbabilisticCountingRecordMap {
113
    map: bloom::CountingBloomFilter,
114
}
115

116
impl ProbabilisticCountingRecordMap {
117
    const FALSE_POSITIVE_RATE: f32 = 0.01;
118
    const EXPECTED_NUM_ITEMS: u32 = 10000000;
119

120
    pub fn new(cursor: Option<&mut Cursor>) -> Result<Self, DeserializationError> {
1✔
121
        Ok(if let Some(cursor) = cursor {
1✔
122
            Self {
123
                map: deserialize_bincode(cursor)?,
×
124
            }
125
        } else {
126
            Self {
1✔
127
                map: bloom::CountingBloomFilter::with_rate(
1✔
128
                    Self::FALSE_POSITIVE_RATE,
1✔
129
                    Self::EXPECTED_NUM_ITEMS,
1✔
130
                ),
1✔
131
            }
1✔
132
        })
133
    }
1✔
134
}
135

136
impl CountingRecordMap for ProbabilisticCountingRecordMap {
137
    fn insert(&mut self, record: &ProcessorRecord) {
4✔
138
        self.map.insert(record);
4✔
139
    }
4✔
140

141
    fn remove(&mut self, record: &ProcessorRecord) {
2✔
142
        self.map.remove(record);
2✔
143
    }
2✔
144

145
    fn estimate_count(&self, record: &ProcessorRecord) -> u64 {
12✔
146
        self.map.estimate_count(record) as u64
12✔
147
    }
12✔
148

149
    fn clear(&mut self) {
1✔
150
        self.map.clear();
1✔
151
    }
1✔
152

153
    fn serialize(
×
154
        &self,
×
155
        _record_store: &ProcessorRecordStore,
×
156
        object: &mut Object,
×
157
    ) -> Result<(), SerializationError> {
×
158
        serialize_bincode(&self.map, object)
×
159
    }
×
160
}
161

162
mod bloom;
163

164
#[cfg(test)]
mod tests {
    use dozer_core::processor_record::{ProcessorRecord, ProcessorRecordStore};
    use dozer_types::types::{Field, Record};

    use super::{
        AccurateCountingRecordMap, CountingRecordMap, CountingRecordMapEnum,
        ProbabilisticCountingRecordMap,
    };

    /// Checks the estimated counts of `a` and then `b` against the expected pair.
    fn assert_counts(
        map: &CountingRecordMapEnum,
        a: &ProcessorRecord,
        b: &ProcessorRecord,
        (expected_a, expected_b): (u64, u64),
    ) {
        assert_eq!(map.estimate_count(a), expected_a);
        assert_eq!(map.estimate_count(b), expected_b);
    }

    /// Runs the same insert/remove/clear scenario against any map implementation.
    fn test_map(mut map: CountingRecordMapEnum) {
        let record_store = ProcessorRecordStore::new().unwrap();
        let single_string_record = |value: char| -> ProcessorRecord {
            let fields = vec![Field::String(value.into())];
            record_store.create_record(&Record::new(fields)).unwrap()
        };

        let a = single_string_record('a');
        let b = single_string_record('b');

        // Nothing has been inserted yet.
        assert_counts(&map, &a, &b, (0, 0));

        map.insert(&a);
        map.insert(&b);
        assert_counts(&map, &a, &b, (1, 1));

        // Repeated inserts accumulate.
        map.insert(&b);
        map.insert(&b);
        assert_counts(&map, &a, &b, (1, 3));

        // A single remove decrements by exactly one.
        map.remove(&b);
        assert_counts(&map, &a, &b, (1, 2));

        // Removing the only occurrence brings the count back to zero.
        map.remove(&a);
        assert_counts(&map, &a, &b, (0, 2));

        map.clear();
        assert_counts(&map, &a, &b, (0, 0));
    }

    #[test]
    fn test_maps() {
        test_map(AccurateCountingRecordMap::new(None).unwrap().into());
        test_map(ProbabilisticCountingRecordMap::new(None).unwrap().into());
    }
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc