• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

getdozer / dozer / 5888798292

17 Aug 2023 08:51AM UTC coverage: 76.025% (-1.4%) from 77.415%
5888798292

push

github

web-flow
feat: implement graph on live ui (#1847)

* feat: implement progress

* feat: implement enable progress flag

* feat: implement progress in live

* chore: fix clippy

* chore: always use telemetry metrics

* fix: Only run build once

---------

Co-authored-by: sagar <sagar@getdozer.io>
Co-authored-by: chubei <914745487@qq.com>

536 of 536 new or added lines in 21 files covered. (100.0%)

46101 of 60639 relevant lines covered (76.03%)

40410.07 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

66.44
/dozer-core/src/processor_record.rs
1
use std::hash::Hash;
2
use std::sync::Arc;
3

4
use dozer_storage::errors::StorageError;
5
use dozer_types::{
6
    parking_lot::RwLock,
7
    types::{Field, Lifetime, Operation, Record},
8
};
9

10
use crate::{errors::ExecutionError, executor_operation::ProcessorOperation};
11

12
/// In-memory store that keeps a copy of every `RecordRef` created through
/// `ProcessorRecordStore::create_ref`.
#[derive(Debug)]
×
13
pub struct ProcessorRecordStore {
14
    /// All `RecordRef`s created so far, guarded by a read/write lock so the store can be
    /// used through a shared reference.
    records: RwLock<Vec<RecordRef>>,
×
15
}
16

17
/// Reference-counted handle to an immutable slice of `Field`s.
///
/// Cloning a `RecordRef` clones the inner `Arc`, so the `Field`s themselves are shared
/// rather than copied.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
4,086,412✔
18
pub struct RecordRef(Arc<[Field]>);
19

×
20
// Conversions between the plain `Record` / `Operation` types and their
// `ProcessorRecord` / `ProcessorOperation` counterparts, backed by this store.
impl ProcessorRecordStore {
21
    /// Creates an empty store.
    ///
    /// The body always returns `Ok`; no error path is visible in this function.
    pub fn new() -> Result<Self, ExecutionError> {
6,202✔
22
        Ok(Self {
6,202✔
23
            records: RwLock::new(vec![]),
6,202✔
24
        })
6,202✔
25
    }
6,202✔
26

×
27
    /// Returns the number of `RecordRef`s currently held by the store.
    pub fn num_record(&self) -> usize {
911✔
28
        self.records.read().len()
911✔
29
    }
911✔
30

×
31
    /// Copies `values` into a new `RecordRef`, stores a clone of it in `records`, and
    /// returns it.
    ///
    /// The body always returns `Ok`.
    // NOTE(review): nothing in this file removes entries from `records`, so the vector
    // only grows here — confirm that this is intended.
    pub fn create_ref(&self, values: &[Field]) -> Result<RecordRef, StorageError> {
3,727,466✔
32
        let record = RecordRef(values.to_vec().into());
3,727,466✔
33
        self.records.write().push(record.clone());
3,727,466✔
34
        Ok(record)
3,727,466✔
35
    }
3,727,466✔
36

×
37
    /// Returns an owned copy of the `Field`s behind `record_ref`.
    ///
    /// Reads only from `record_ref`; `self.records` is not consulted. Always returns `Ok`.
    pub fn load_ref(&self, record_ref: &RecordRef) -> Result<Vec<Field>, StorageError> {
394,591✔
38
        Ok(record_ref.0.to_vec())
394,591✔
39
    }
394,591✔
40

×
41
    /// Converts a `Record` into a `ProcessorRecord` that holds a single `RecordRef`
    /// containing all of `record.values`, and carries over the record's lifetime.
    ///
    /// Errors from `create_ref` are propagated.
    pub fn create_record(&self, record: &Record) -> Result<ProcessorRecord, StorageError> {
3,727,420✔
42
        let record_ref = self.create_ref(&record.values)?;
3,727,420✔
43
        let mut processor_record = ProcessorRecord::new();
3,727,420✔
44
        processor_record.push(record_ref);
3,727,420✔
45
        processor_record.set_lifetime(record.lifetime.clone());
3,727,420✔
46
        Ok(processor_record)
3,727,420✔
47
    }
3,727,420✔
48

×
49
    /// Converts a `ProcessorRecord` back into a `Record`.
    ///
    /// Starts from `Record::default()`, appends the fields of every `RecordRef` in
    /// order, then copies the lifetime. Errors from `load_ref` are propagated.
    pub fn load_record(&self, processor_record: &ProcessorRecord) -> Result<Record, StorageError> {
391,621✔
50
        let mut record = Record::default();
391,621✔
51
        for record_ref in processor_record.values.iter() {
394,591✔
52
            let fields = self.load_ref(record_ref)?;
394,591✔
53
            // NOTE(review): `load_ref` already returns an owned `Vec<Field>`, so
            // `.iter().cloned()` clones every field a second time; extending with
            // `fields` directly would avoid the extra copy.
            record.values.extend(fields.iter().cloned());
394,591✔
54
        }
×
55
        record.set_lifetime(processor_record.get_lifetime());
391,621✔
56
        Ok(record)
391,621✔
57
    }
391,621✔
58

×
59
    /// Converts an `Operation` into the `ProcessorOperation` of the same variant,
    /// converting each contained `Record` with `create_record`.
    ///
    /// Errors from `create_record` are propagated.
    pub fn create_operation(
3,670,318✔
60
        &self,
3,670,318✔
61
        operation: &Operation,
3,670,318✔
62
    ) -> Result<ProcessorOperation, StorageError> {
3,670,318✔
63
        match operation {
3,670,318✔
64
            Operation::Delete { old } => {
1,150✔
65
                let old = self.create_record(old)?;
1,150✔
66
                Ok(ProcessorOperation::Delete { old })
1,150✔
67
            }
68
            Operation::Insert { new } => {
3,618,213✔
69
                let new = self.create_record(new)?;
3,618,213✔
70
                Ok(ProcessorOperation::Insert { new })
3,618,213✔
71
            }
72
            Operation::Update { old, new } => {
50,955✔
73
                let old = self.create_record(old)?;
50,955✔
74
                let new = self.create_record(new)?;
50,955✔
75
                Ok(ProcessorOperation::Update { old, new })
50,955✔
76
            }
×
77
        }
×
78
    }
3,670,318✔
79

×
80
    /// Converts a `ProcessorOperation` into the `Operation` of the same variant,
    /// converting each contained `ProcessorRecord` with `load_record`.
    ///
    /// Errors from `load_record` are propagated.
    pub fn load_operation(
60,865✔
81
        &self,
60,865✔
82
        operation: &ProcessorOperation,
60,865✔
83
    ) -> Result<Operation, StorageError> {
60,865✔
84
        match operation {
60,865✔
85
            ProcessorOperation::Delete { old } => {
720✔
86
                let old = self.load_record(old)?;
720✔
87
                Ok(Operation::Delete { old })
720✔
88
            }
×
89
            ProcessorOperation::Insert { new } => {
59,920✔
90
                let new = self.load_record(new)?;
59,920✔
91
                Ok(Operation::Insert { new })
59,920✔
92
            }
93
            ProcessorOperation::Update { old, new } => {
225✔
94
                let old = self.load_record(old)?;
225✔
95
                let new = self.load_record(new)?;
225✔
96
                Ok(Operation::Update { old, new })
225✔
97
            }
×
98
        }
×
99
    }
60,865✔
100
}
×
101

102
/// A record represented as a sequence of shared `RecordRef`s plus an optional lifetime.
///
/// The derived `Default` yields a record with no values and no lifetime.
#[derive(Debug, Clone, PartialEq, Eq, Hash, Default)]
3,729,725✔
103
pub struct ProcessorRecord {
×
104
    /// All `Field`s in this record. The `Field`s are grouped by `Arc` to reduce memory usage.
×
105
    values: Vec<RecordRef>,
106

×
107
    /// Time To Live for this record. If the value is None, the record will never expire.
×
108
    // NOTE(review): presumably boxed to keep `ProcessorRecord` small when no lifetime
    // is set — confirm.
    lifetime: Option<Box<Lifetime>>,
×
109
}
×
110

111
// Construction and mutation helpers for `ProcessorRecord`.
impl ProcessorRecord {
112
    /// Creates an empty record; equivalent to `ProcessorRecord::default()`.
    pub fn new() -> Self {
3,729,725✔
113
        Self::default()
3,729,725✔
114
    }
3,729,725✔
115

×
116
    /// Returns a copy of this record's lifetime, or `None` if no lifetime is set.
    // NOTE(review): `lifetime.clone()` appears to clone the `Box` (a fresh heap
    // allocation) before the value is moved out of it; cloning the boxed `Lifetime`
    // directly would avoid that allocation — confirm.
    pub fn get_lifetime(&self) -> Option<Lifetime> {
408,086✔
117
        self.lifetime.as_ref().map(|lifetime| *lifetime.clone())
408,086✔
118
    }
408,086✔
119
    /// Replaces this record's lifetime, boxing the value when it is `Some`.
    pub fn set_lifetime(&mut self, lifetime: Option<Lifetime>) {
3,727,430✔
120
        self.lifetime = lifetime.map(Box::new);
3,727,430✔
121
    }
3,727,430✔
122

123
    /// Appends all of `other`'s `RecordRef`s to this record.
    ///
    /// Only `other.values` is used; `other.lifetime` is ignored and `self.lifetime`
    /// is left unchanged.
    pub fn extend(&mut self, other: ProcessorRecord) {
4,605✔
124
        self.values.extend(other.values);
4,605✔
125
    }
4,605✔
126

×
127
    /// Appends a single `RecordRef` to the end of this record.
    pub fn push(&mut self, record_ref: RecordRef) {
3,727,465✔
128
        self.values.push(record_ref);
3,727,465✔
129
    }
3,727,465✔
130

×
131
    /// Removes and returns the last `RecordRef`, or `None` if the record has no values.
    pub fn pop(&mut self) -> Option<RecordRef> {
×
132
        self.values.pop()
×
133
    }
×
134
}
×
135

136
// Unit tests for the `Record` <-> `ProcessorRecord` conversion.
#[cfg(test)]
×
137
mod tests {
×
138
    use std::time::Duration;
×
139

140
    use dozer_types::types::Timestamp;
×
141

×
142
    use super::*;
×
143

144
    // A record with four integer fields and a lifetime must come back equal after
    // going through `create_record` followed by `load_record`.
    #[test]
1✔
145
    fn test_record_roundtrip() {
1✔
146
        let mut record = Record::new(vec![
1✔
147
            Field::Int(1),
1✔
148
            Field::Int(2),
1✔
149
            Field::Int(3),
1✔
150
            Field::Int(4),
1✔
151
        ]);
1✔
152
        // Set a lifetime so the round trip also covers the lifetime conversion.
        record.lifetime = Some(Lifetime {
1✔
153
            reference: Timestamp::parse_from_rfc3339("2020-01-01T00:13:00Z").unwrap(),
1✔
154
            duration: Duration::from_secs(10),
1✔
155
        });
1✔
156

1✔
157
        let record_store = ProcessorRecordStore::new().unwrap();
1✔
158
        let processor_record = record_store.create_record(&record).unwrap();
1✔
159
        assert_eq!(record_store.load_record(&processor_record).unwrap(), record);
1✔
160
    }
1✔
161
}
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc