• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

getdozer / dozer / 5819697753

pending completion
5819697753

push

github

web-flow
chore: Add "persist" message to pipeline (#1841)

* chore: Replace tuple with `ClosedEpoch`

* chore: Reference all records in `ProcessorRecordStore`

* chore: Include persist message in `ClosedEpoch`

* chore: `Sink`s call `EpochManager::finalize_epoch` after commit

* chore: Set processor output ports to stateless

* chore: clippy fix

* chore: Rename `EpochCommitDetails` to `EpochCommonInfo`

203 of 203 new or added lines in 11 files covered. (100.0%)

45614 of 59524 relevant lines covered (76.63%)

41187.66 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

96.04
/dozer-core/src/processor_record.rs
1
use std::hash::Hash;
2
use std::sync::Arc;
3

4
use dozer_storage::errors::StorageError;
5
use dozer_types::{
6
    parking_lot::RwLock,
7
    types::{Field, Lifetime, Operation, Record},
8
};
9

10
use crate::{errors::ExecutionError, executor_operation::ProcessorOperation};
11

12
#[derive(Debug)]
×
13
pub struct ProcessorRecordStore {
14
    records: RwLock<Vec<RecordRef>>,
15
}
16

17
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
4,088,174✔
18
pub struct RecordRef(Arc<[Field]>);
19

20
impl ProcessorRecordStore {
21
    pub fn new() -> Result<Self, ExecutionError> {
6,202✔
22
        Ok(Self {
6,202✔
23
            records: RwLock::new(vec![]),
6,202✔
24
        })
6,202✔
25
    }
6,202✔
26

27
    pub fn num_record(&self) -> usize {
931✔
28
        self.records.read().len()
931✔
29
    }
931✔
30

31
    pub fn create_ref(&self, values: &[Field]) -> Result<RecordRef, StorageError> {
3,729,228✔
32
        let record = RecordRef(values.to_vec().into());
3,729,228✔
33
        self.records.write().push(record.clone());
3,729,228✔
34
        Ok(record)
3,729,228✔
35
    }
3,729,228✔
36

37
    pub fn load_ref(&self, record_ref: &RecordRef) -> Result<Vec<Field>, StorageError> {
394,591✔
38
        Ok(record_ref.0.to_vec())
394,591✔
39
    }
394,591✔
40

41
    pub fn create_record(&self, record: &Record) -> Result<ProcessorRecord, StorageError> {
3,729,182✔
42
        let record_ref = self.create_ref(&record.values)?;
3,729,182✔
43
        let mut processor_record = ProcessorRecord::new();
3,729,182✔
44
        processor_record.push(record_ref);
3,729,182✔
45
        processor_record.set_lifetime(record.lifetime.clone());
3,729,182✔
46
        Ok(processor_record)
3,729,182✔
47
    }
3,729,182✔
48

49
    pub fn load_record(&self, processor_record: &ProcessorRecord) -> Result<Record, StorageError> {
391,621✔
50
        let mut record = Record::default();
391,621✔
51
        for record_ref in processor_record.values.iter() {
394,591✔
52
            let fields = self.load_ref(record_ref)?;
394,591✔
53
            record.values.extend(fields.iter().cloned());
394,591✔
54
        }
55
        record.set_lifetime(processor_record.get_lifetime());
391,621✔
56
        Ok(record)
391,621✔
57
    }
391,621✔
58

59
    pub fn create_operation(
3,672,080✔
60
        &self,
3,672,080✔
61
        operation: &Operation,
3,672,080✔
62
    ) -> Result<ProcessorOperation, StorageError> {
3,672,080✔
63
        match operation {
3,672,080✔
64
            Operation::Delete { old } => {
1,150✔
65
                let old = self.create_record(old)?;
1,150✔
66
                Ok(ProcessorOperation::Delete { old })
1,150✔
67
            }
68
            Operation::Insert { new } => {
3,619,975✔
69
                let new = self.create_record(new)?;
3,619,975✔
70
                Ok(ProcessorOperation::Insert { new })
3,619,975✔
71
            }
72
            Operation::Update { old, new } => {
50,955✔
73
                let old = self.create_record(old)?;
50,955✔
74
                let new = self.create_record(new)?;
50,955✔
75
                Ok(ProcessorOperation::Update { old, new })
50,955✔
76
            }
77
        }
78
    }
3,672,080✔
79

80
    pub fn load_operation(
60,865✔
81
        &self,
60,865✔
82
        operation: &ProcessorOperation,
60,865✔
83
    ) -> Result<Operation, StorageError> {
60,865✔
84
        match operation {
60,865✔
85
            ProcessorOperation::Delete { old } => {
720✔
86
                let old = self.load_record(old)?;
720✔
87
                Ok(Operation::Delete { old })
720✔
88
            }
89
            ProcessorOperation::Insert { new } => {
59,920✔
90
                let new = self.load_record(new)?;
59,920✔
91
                Ok(Operation::Insert { new })
59,920✔
92
            }
93
            ProcessorOperation::Update { old, new } => {
225✔
94
                let old = self.load_record(old)?;
225✔
95
                let new = self.load_record(new)?;
225✔
96
                Ok(Operation::Update { old, new })
225✔
97
            }
98
        }
99
    }
60,865✔
100
}
101

102
#[derive(Debug, Clone, PartialEq, Eq, Hash, Default)]
3,731,487✔
103
pub struct ProcessorRecord {
104
    /// All `Field`s in this record. The `Field`s are grouped by `Arc` to reduce memory usage.
105
    values: Vec<RecordRef>,
106

107
    /// Time To Live for this record. If the value is None, the record will never expire.
108
    lifetime: Option<Box<Lifetime>>,
109
}
110

111
impl ProcessorRecord {
112
    pub fn new() -> Self {
3,731,487✔
113
        Self::default()
3,731,487✔
114
    }
3,731,487✔
115

116
    pub fn get_lifetime(&self) -> Option<Lifetime> {
408,086✔
117
        self.lifetime.as_ref().map(|lifetime| *lifetime.clone())
408,086✔
118
    }
408,086✔
119
    pub fn set_lifetime(&mut self, lifetime: Option<Lifetime>) {
3,729,192✔
120
        self.lifetime = lifetime.map(Box::new);
3,729,192✔
121
    }
3,729,192✔
122

123
    pub fn extend(&mut self, other: ProcessorRecord) {
4,605✔
124
        self.values.extend(other.values);
4,605✔
125
    }
4,605✔
126

127
    pub fn push(&mut self, record_ref: RecordRef) {
3,729,227✔
128
        self.values.push(record_ref);
3,729,227✔
129
    }
3,729,227✔
130

131
    pub fn pop(&mut self) -> Option<RecordRef> {
×
132
        self.values.pop()
×
133
    }
×
134
}
135

136
#[cfg(test)]
137
mod tests {
138
    use std::time::Duration;
139

140
    use dozer_types::types::Timestamp;
141

142
    use super::*;
143

144
    #[test]
1✔
145
    fn test_record_roundtrip() {
1✔
146
        let mut record = Record::new(vec![
1✔
147
            Field::Int(1),
1✔
148
            Field::Int(2),
1✔
149
            Field::Int(3),
1✔
150
            Field::Int(4),
1✔
151
        ]);
1✔
152
        record.lifetime = Some(Lifetime {
1✔
153
            reference: Timestamp::parse_from_rfc3339("2020-01-01T00:13:00Z").unwrap(),
1✔
154
            duration: Duration::from_secs(10),
1✔
155
        });
1✔
156

1✔
157
        let record_store = ProcessorRecordStore::new().unwrap();
1✔
158
        let processor_record = record_store.create_record(&record).unwrap();
1✔
159
        assert_eq!(record_store.load_record(&processor_record).unwrap(), record);
1✔
160
    }
1✔
161
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc