• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

getdozer / dozer / 5672512448

pending completion
5672512448

push

github

web-flow
chore: Change `make_from!` in `from_arrow` to func to improve readability (#1792)

31 of 31 new or added lines in 4 files covered. (100.0%)

45630 of 59777 relevant lines covered (76.33%)

38810.48 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

70.08
/dozer-core/src/processor_record.rs
1
use std::hash::Hash;
2
use std::sync::Arc;
3

4
use dozer_types::types::{Field, Lifetime, Record, Schema};
5

6
#[derive(Debug, PartialEq, Eq, Hash, Default)]
4,252,370✔
7
pub struct ProcessorRecord {
8
    /// Every element of this `Vec` is either a referenced `ProcessorRecord` (can be nested recursively) or a direct field.
9
    values: Vec<RefOrField>,
10
    /// This is a cache of sum of number of fields in `values` recursively. Must be kept consistent with `values`.
11
    total_len: u32,
12

13
    /// Time To Live for this record. If the value is None, the record will never expire.
14
    lifetime: Option<Box<Lifetime>>,
15
}
16

17
#[derive(Debug, PartialEq, Eq, Hash)]
2,220✔
18

19
enum RefOrField {
20
    Ref(ProcessorRecordRef),
×
21
    Field(Field),
22
}
23

24
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
379,930✔
25
pub struct ProcessorRecordRef(Arc<ProcessorRecord>);
26

27
impl ProcessorRecordRef {
×
28
    pub fn new(record: ProcessorRecord) -> Self {
3,848,776✔
29
        ProcessorRecordRef(Arc::new(record))
3,848,776✔
30
    }
3,848,776✔
31

×
32
    pub fn get_record(&self) -> &ProcessorRecord {
803,251✔
33
        &self.0
803,251✔
34
    }
803,251✔
35
}
×
36

×
37
impl From<Record> for ProcessorRecord {
×
38
    fn from(record: Record) -> Self {
3,715,836✔
39
        let mut ref_record = ProcessorRecord::new();
3,715,836✔
40
        for field in record.values {
11,219,958✔
41
            ref_record.extend_direct_field(field);
7,504,122✔
42
        }
7,504,122✔
43
        ref_record
3,715,836✔
44
    }
3,715,836✔
45
}
×
46

×
47
impl ProcessorRecord {
×
48
    pub fn new() -> Self {
4,252,370✔
49
        Self::default()
4,252,370✔
50
    }
4,252,370✔
51

×
52
    pub fn from_referenced_record(record: ProcessorRecordRef) -> Self {
108,965✔
53
        let mut result = ProcessorRecord::new();
108,965✔
54
        result.extend_referenced_record(record);
108,965✔
55
        result
108,965✔
56
    }
108,965✔
57

×
58
    pub fn clone_deref(&self) -> Record {
5,470✔
59
        let mut values: Vec<Field> = Vec::new();
5,470✔
60
        for field in self.get_fields() {
12,215✔
61
            values.push(field.clone());
12,215✔
62
        }
12,215✔
63
        let mut record = Record::new(values);
5,470✔
64
        record.set_lifetime(self.get_lifetime());
5,470✔
65
        record
5,470✔
66
    }
5,470✔
67

×
68
    pub fn get_lifetime(&self) -> Option<Lifetime> {
138,010✔
69
        self.lifetime.as_ref().map(|lifetime| *lifetime.clone())
138,010✔
70
    }
138,010✔
71
    pub fn set_lifetime(&mut self, lifetime: Option<Lifetime>) {
116,085✔
72
        self.lifetime = lifetime.map(Box::new);
116,085✔
73
    }
116,085✔
74

×
75
    pub fn extend_referenced_record(&mut self, other: ProcessorRecordRef) {
113,574✔
76
        self.total_len += other.get_record().total_len;
113,574✔
77

113,574✔
78
        self.values.push(RefOrField::Ref(other));
113,574✔
79
    }
113,574✔
80

×
81
    pub fn extend_direct_field(&mut self, field: Field) {
7,912,476✔
82
        self.values.push(RefOrField::Field(field));
7,912,476✔
83
        self.total_len += 1;
7,912,476✔
84
    }
7,912,476✔
85

×
86
    pub fn get_fields(&self) -> Vec<&Field> {
11,265✔
87
        let mut fields = Vec::new();
11,265✔
88
        self.get_fields_impl(&mut fields);
11,265✔
89
        fields
11,265✔
90
    }
11,265✔
91

×
92
    fn get_fields_impl<'a>(&'a self, fields: &mut Vec<&'a Field>) {
11,269✔
93
        for ref_or_field in &self.values {
29,294✔
94
            match ref_or_field {
18,025✔
95
                RefOrField::Ref(record_ref) => {
4✔
96
                    record_ref.get_record().get_fields_impl(fields);
4✔
97
                }
4✔
98
                RefOrField::Field(field) => {
18,021✔
99
                    fields.push(field);
18,021✔
100
                }
18,021✔
101
            }
×
102
        }
×
103
    }
11,269✔
104

×
105
    // Function to get a field by its index
106
    pub fn get_field_by_index(&self, index: u32) -> &Field {
958,806✔
107
        let mut current_index = index;
958,806✔
108

×
109
        // Iterate through the values and update the counts
×
110
        for field_or_ref in self.values.iter() {
1,593,316✔
111
            match field_or_ref {
1,593,316✔
112
                RefOrField::Ref(record_ref) => {
228,983✔
113
                    // If it's a reference, check if it matches the given index
228,983✔
114
                    let rec = record_ref.get_record();
228,983✔
115
                    let count = rec.total_len;
228,983✔
116
                    if current_index < count {
228,983✔
117
                        return rec.get_field_by_index(current_index);
116,035✔
118
                    }
112,948✔
119
                    current_index -= count;
112,948✔
120
                }
×
121
                RefOrField::Field(field) => {
1,364,333✔
122
                    // If it's a field, check if it matches the given index
1,364,333✔
123
                    if current_index == 0 {
1,364,333✔
124
                        return field;
842,771✔
125
                    }
521,562✔
126
                    current_index -= 1;
521,562✔
127
                }
×
128
            }
×
129
        }
×
130

×
131
        panic!("Index {index} out of range {}", self.total_len);
×
132
    }
958,806✔
133

×
134
    pub fn get_key(&self, indexes: &[usize]) -> Vec<u8> {
261,695✔
135
        debug_assert!(!indexes.is_empty(), "Primary key indexes cannot be empty");
261,695✔
136

×
137
        let mut tot_size = 0_usize;
261,695✔
138
        let mut buffers = Vec::<Vec<u8>>::with_capacity(indexes.len());
261,695✔
139
        for i in indexes {
549,355✔
140
            let bytes = self.get_field_by_index(*i as u32).encode();
287,660✔
141
            tot_size += bytes.len();
287,660✔
142
            buffers.push(bytes);
287,660✔
143
        }
287,660✔
144

145
        let mut res_buffer = Vec::<u8>::with_capacity(tot_size);
261,695✔
146
        for i in buffers {
549,355✔
147
            res_buffer.extend(i);
287,660✔
148
        }
287,660✔
149
        res_buffer
261,695✔
150
    }
261,695✔
151

×
152
    pub fn nulls_from_schema(schema: &Schema) -> ProcessorRecord {
340✔
153
        Self::nulls(schema.fields.len())
340✔
154
    }
340✔
155

×
156
    pub fn nulls(size: usize) -> ProcessorRecord {
341✔
157
        ProcessorRecord {
341✔
158
            values: (0..size).map(|_| RefOrField::Field(Field::Null)).collect(),
2,008✔
159
            total_len: size as u32,
341✔
160
            lifetime: None,
341✔
161
        }
341✔
162
    }
341✔
163
}
×
164

×
165
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a record that owns each of `fields` as a direct field, in order.
    fn record_of(fields: Vec<Field>) -> ProcessorRecord {
        let mut record = ProcessorRecord::new();
        for field in fields {
            record.extend_direct_field(field);
        }
        record
    }

    /// Asserts that `record` holds as many fields as `expected` and that looking up each
    /// index returns the corresponding expected field.
    fn assert_fields(record: &ProcessorRecord, expected: &[Field]) {
        assert_eq!(record.get_fields().len(), expected.len());
        for (index, field) in (0u32..).zip(expected) {
            assert_eq!(record.get_field_by_index(index), field);
        }
    }

    #[test]
    fn test_processor_record_nulls() {
        let record = ProcessorRecord::nulls(3);

        assert_fields(&record, &[Field::Null, Field::Null, Field::Null]);
    }

    #[test]
    fn test_processor_record_extend_direct_field() {
        let record = record_of(vec![Field::Int(1)]);

        assert_fields(&record, &[Field::Int(1)]);
    }

    #[test]
    fn test_processor_record_extend_referenced_record() {
        let other = record_of(vec![Field::Int(1), Field::Int(2)]);
        let mut record = ProcessorRecord::new();
        record.extend_referenced_record(ProcessorRecordRef::new(other));

        assert_fields(&record, &[Field::Int(1), Field::Int(2)]);
    }

    #[test]
    fn test_processor_record_extend_interleave() {
        let other = ProcessorRecordRef::new(record_of(vec![Field::Int(1), Field::Int(2)]));

        // A direct field, then the shared record, then another direct field.
        let mut record = record_of(vec![Field::Int(3)]);
        record.extend_referenced_record(other.clone());
        record.extend_direct_field(Field::Int(4));

        assert_fields(
            &record,
            &[Field::Int(3), Field::Int(1), Field::Int(2), Field::Int(4)],
        );
    }

    #[test]
    fn test_processor_record_extend_nested() {
        // Innermost level: two direct fields.
        let nested = ProcessorRecordRef::new(record_of(vec![Field::Int(1), Field::Int(2)]));

        // Middle level: wraps the innermost record between two direct fields.
        let mut nested_outer = record_of(vec![Field::Int(3)]);
        nested_outer.extend_referenced_record(nested);
        nested_outer.extend_direct_field(Field::Int(4));
        let nested_outer = ProcessorRecordRef::new(nested_outer);

        // Top level: wraps the middle record between two direct fields.
        let mut record = record_of(vec![Field::Int(5)]);
        record.extend_referenced_record(nested_outer);
        record.extend_direct_field(Field::Int(6));

        assert_fields(
            &record,
            &[
                Field::Int(5),
                Field::Int(3),
                Field::Int(1),
                Field::Int(2),
                Field::Int(4),
                Field::Int(6),
            ],
        );
    }
}
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc