• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

getdozer / dozer / 5928419529

21 Aug 2023 03:36PM UTC coverage: 76.128% (+1.5%) from 74.676%
5928419529

push

github

web-flow
feat: Write record store checkpoints (#1854)

* feat: Write record store checkpoints

* feat: Synchronize pipeline and log persisting

* chore: Construct `CheckpointFactory` before constructing `ProcessFactory`

* chore: Only retry `Error::Storage` in `Queue`. Return `SendError` instead of `String`

* chore: Change an `Option<bool>` to `enum` for readability

818 of 818 new or added lines in 34 files covered. (100.0%)

46482 of 61058 relevant lines covered (76.13%)

40888.73 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

94.55
/dozer-core/src/processor_record.rs
1
use std::hash::Hash;
2
use std::sync::Arc;
3

4
use dozer_storage::errors::StorageError;
5
use dozer_types::{
6
    bincode,
7
    parking_lot::RwLock,
8
    serde::{Deserialize, Serialize},
9
    types::{Field, Lifetime, Operation, Record},
10
};
11

12
use crate::{errors::ExecutionError, executor_operation::ProcessorOperation};
13

14
#[derive(Debug)]
×
15
pub struct ProcessorRecordStore {
16
    records: RwLock<Vec<RecordRef>>,
17
}
18

19
#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
5,869,669✔
20
#[serde(crate = "dozer_types::serde")]
21
pub struct RecordRef(Arc<[Field]>);
22

23
impl ProcessorRecordStore {
24
    pub fn new() -> Result<Self, ExecutionError> {
6,202✔
25
        Ok(Self {
6,202✔
26
            records: RwLock::new(vec![]),
6,202✔
27
        })
6,202✔
28
    }
6,202✔
29

30
    pub fn num_records(&self) -> usize {
1,286✔
31
        self.records.read().len()
1,286✔
32
    }
1,286✔
33

34
    pub fn serialize_slice(&self, start: usize) -> Result<(Vec<u8>, usize), StorageError> {
37✔
35
        let records = self.records.read();
37✔
36
        let slice = &records[start..];
37✔
37
        let data = bincode::serialize(slice).map_err(|e| StorageError::SerializationError {
37✔
38
            typ: "[RecordRef]",
×
39
            reason: Box::new(e),
×
40
        })?;
37✔
41
        Ok((data, slice.len()))
37✔
42
    }
37✔
43

44
    pub fn create_ref(&self, values: &[Field]) -> Result<RecordRef, StorageError> {
3,729,727✔
45
        let record = RecordRef(values.to_vec().into());
3,729,727✔
46
        self.records.write().push(record.clone());
3,729,727✔
47
        Ok(record)
3,729,727✔
48
    }
3,729,727✔
49

50
    pub fn load_ref(&self, record_ref: &RecordRef) -> Result<Vec<Field>, StorageError> {
394,591✔
51
        Ok(record_ref.0.to_vec())
394,591✔
52
    }
394,591✔
53

54
    pub fn create_record(&self, record: &Record) -> Result<ProcessorRecord, StorageError> {
3,729,679✔
55
        let record_ref = self.create_ref(&record.values)?;
3,729,679✔
56
        let mut processor_record = ProcessorRecord::new();
3,729,679✔
57
        processor_record.push(record_ref);
3,729,679✔
58
        processor_record.set_lifetime(record.lifetime.clone());
3,729,679✔
59
        Ok(processor_record)
3,729,679✔
60
    }
3,729,679✔
61

62
    pub fn load_record(&self, processor_record: &ProcessorRecord) -> Result<Record, StorageError> {
391,621✔
63
        let mut record = Record::default();
391,621✔
64
        for record_ref in processor_record.values.iter() {
394,591✔
65
            let fields = self.load_ref(record_ref)?;
394,591✔
66
            record.values.extend(fields.iter().cloned());
394,591✔
67
        }
68
        record.set_lifetime(processor_record.get_lifetime());
391,621✔
69
        Ok(record)
391,621✔
70
    }
391,621✔
71

72
    pub fn create_operation(
3,672,577✔
73
        &self,
3,672,577✔
74
        operation: &Operation,
3,672,577✔
75
    ) -> Result<ProcessorOperation, StorageError> {
3,672,577✔
76
        match operation {
3,672,577✔
77
            Operation::Delete { old } => {
1,150✔
78
                let old = self.create_record(old)?;
1,150✔
79
                Ok(ProcessorOperation::Delete { old })
1,150✔
80
            }
81
            Operation::Insert { new } => {
3,620,472✔
82
                let new = self.create_record(new)?;
3,620,472✔
83
                Ok(ProcessorOperation::Insert { new })
3,620,472✔
84
            }
85
            Operation::Update { old, new } => {
50,955✔
86
                let old = self.create_record(old)?;
50,955✔
87
                let new = self.create_record(new)?;
50,955✔
88
                Ok(ProcessorOperation::Update { old, new })
50,955✔
89
            }
90
        }
91
    }
3,672,577✔
92

93
    pub fn load_operation(
60,865✔
94
        &self,
60,865✔
95
        operation: &ProcessorOperation,
60,865✔
96
    ) -> Result<Operation, StorageError> {
60,865✔
97
        match operation {
60,865✔
98
            ProcessorOperation::Delete { old } => {
720✔
99
                let old = self.load_record(old)?;
720✔
100
                Ok(Operation::Delete { old })
720✔
101
            }
102
            ProcessorOperation::Insert { new } => {
59,920✔
103
                let new = self.load_record(new)?;
59,920✔
104
                Ok(Operation::Insert { new })
59,920✔
105
            }
106
            ProcessorOperation::Update { old, new } => {
225✔
107
                let old = self.load_record(old)?;
225✔
108
                let new = self.load_record(new)?;
225✔
109
                Ok(Operation::Update { old, new })
225✔
110
            }
111
        }
112
    }
60,865✔
113
}
114

115
#[derive(Debug, Clone, PartialEq, Eq, Hash, Default)]
3,731,984✔
116
pub struct ProcessorRecord {
117
    /// All `Field`s in this record. The `Field`s are grouped by `Arc` to reduce memory usage.
118
    values: Vec<RecordRef>,
119

120
    /// Time To Live for this record. If the value is None, the record will never expire.
121
    lifetime: Option<Box<Lifetime>>,
122
}
123

124
impl ProcessorRecord {
125
    pub fn new() -> Self {
3,731,984✔
126
        Self::default()
3,731,984✔
127
    }
3,731,984✔
128

129
    pub fn get_lifetime(&self) -> Option<Lifetime> {
408,086✔
130
        self.lifetime.as_ref().map(|lifetime| *lifetime.clone())
408,086✔
131
    }
408,086✔
132
    pub fn set_lifetime(&mut self, lifetime: Option<Lifetime>) {
3,729,689✔
133
        self.lifetime = lifetime.map(Box::new);
3,729,689✔
134
    }
3,729,689✔
135

136
    pub fn extend(&mut self, other: ProcessorRecord) {
4,605✔
137
        self.values.extend(other.values);
4,605✔
138
    }
4,605✔
139

140
    pub fn push(&mut self, record_ref: RecordRef) {
3,729,724✔
141
        self.values.push(record_ref);
3,729,724✔
142
    }
3,729,724✔
143

144
    pub fn pop(&mut self) -> Option<RecordRef> {
×
145
        self.values.pop()
×
146
    }
×
147
}
148

149
#[cfg(test)]
150
mod tests {
151
    use std::time::Duration;
152

153
    use dozer_types::types::Timestamp;
154

155
    use super::*;
156

157
    #[test]
1✔
158
    fn test_record_roundtrip() {
1✔
159
        let mut record = Record::new(vec![
1✔
160
            Field::Int(1),
1✔
161
            Field::Int(2),
1✔
162
            Field::Int(3),
1✔
163
            Field::Int(4),
1✔
164
        ]);
1✔
165
        record.lifetime = Some(Lifetime {
1✔
166
            reference: Timestamp::parse_from_rfc3339("2020-01-01T00:13:00Z").unwrap(),
1✔
167
            duration: Duration::from_secs(10),
1✔
168
        });
1✔
169

1✔
170
        let record_store = ProcessorRecordStore::new().unwrap();
1✔
171
        let processor_record = record_store.create_record(&record).unwrap();
1✔
172
        assert_eq!(record_store.load_record(&processor_record).unwrap(), record);
1✔
173
    }
1✔
174
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc