• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

getdozer / dozer / 5665828881

pending completion
5665828881

push

github

web-flow
perf: Optimize memory usage for `Lifetime` (#1796)

* perf: Optimize memory usage for `Lifetime`

* perf: Wrap `ProcessorRecord::lifetime` in box to avoid occupying memory when it's `None`

64 of 64 new or added lines in 8 files covered. (100.0%)

45672 of 59729 relevant lines covered (76.47%)

38867.9 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

66.13
/dozer-sql/src/pipeline/projection/processor.rs
1
use crate::pipeline::errors::PipelineError;
2
use crate::pipeline::expression::execution::Expression;
3

4
use dozer_core::channels::ProcessorChannelForwarder;
5
use dozer_core::epoch::Epoch;
6
use dozer_core::executor_operation::ProcessorOperation;
7
use dozer_core::node::{PortHandle, Processor};
8
use dozer_core::processor_record::{ProcessorRecord, ProcessorRecordRef};
9
use dozer_core::DEFAULT_PORT_HANDLE;
10
use dozer_types::errors::internal::BoxedError;
11
use dozer_types::types::Schema;
12

13
#[derive(Debug)]
×
14
pub struct ProjectionProcessor {
15
    expressions: Vec<Expression>,
16
    input_schema: Schema,
17
}
18

19
impl ProjectionProcessor {
20
    pub fn new(input_schema: Schema, expressions: Vec<Expression>) -> Self {
1,394✔
21
        Self {
1,394✔
22
            input_schema,
1,394✔
23
            expressions,
1,394✔
24
        }
1,394✔
25
    }
1,394✔
26

27
    fn delete(&mut self, record: &ProcessorRecordRef) -> Result<ProcessorOperation, PipelineError> {
200✔
28
        let mut output_record = ProcessorRecord::new();
200✔
29
        for expr in &self.expressions {
588✔
30
            output_record
31
                .extend_direct_field(expr.evaluate(record.get_record(), &self.input_schema)?);
388✔
32
        }
33

34
        output_record.set_lifetime(record.get_record().get_lifetime());
200✔
35

200✔
36
        Ok(ProcessorOperation::Delete {
200✔
37
            old: ProcessorRecordRef::new(output_record),
200✔
38
        })
200✔
39
    }
200✔
40

41
    fn insert(&mut self, record: &ProcessorRecordRef) -> Result<ProcessorOperation, PipelineError> {
2,886✔
42
        let mut output_record = ProcessorRecord::new();
2,886✔
43

44
        for expr in self.expressions.clone() {
4,918✔
45
            output_record
46
                .extend_direct_field(expr.evaluate(record.get_record(), &self.input_schema)?);
4,918✔
47
        }
48

49
        output_record.set_lifetime(record.get_record().get_lifetime());
2,886✔
50
        Ok(ProcessorOperation::Insert {
2,886✔
51
            new: ProcessorRecordRef::new(output_record),
2,886✔
52
        })
2,886✔
53
    }
2,886✔
54

55
    fn update(
×
56
        &self,
×
57
        old: &ProcessorRecordRef,
×
58
        new: &ProcessorRecordRef,
×
59
    ) -> Result<ProcessorOperation, PipelineError> {
×
60
        let mut old_output_record = ProcessorRecord::new();
×
61
        let mut new_output_record = ProcessorRecord::new();
×
62
        for expr in &self.expressions {
×
63
            old_output_record
64
                .extend_direct_field(expr.evaluate(old.get_record(), &self.input_schema)?);
×
65
            new_output_record
66
                .extend_direct_field(expr.evaluate(new.get_record(), &self.input_schema)?);
×
67
        }
68

69
        old_output_record.set_lifetime(old.get_record().get_lifetime());
×
70

×
71
        new_output_record.set_lifetime(new.get_record().get_lifetime());
×
72
        Ok(ProcessorOperation::Update {
×
73
            old: ProcessorRecordRef::new(old_output_record),
×
74
            new: ProcessorRecordRef::new(new_output_record),
×
75
        })
×
76
    }
×
77
}
78

79
impl Processor for ProjectionProcessor {
80
    fn process(
3,086✔
81
        &mut self,
3,086✔
82
        _from_port: PortHandle,
3,086✔
83
        op: ProcessorOperation,
3,086✔
84
        fw: &mut dyn ProcessorChannelForwarder,
3,086✔
85
    ) -> Result<(), BoxedError> {
3,086✔
86
        match op {
3,086✔
87
            ProcessorOperation::Delete { ref old } => {
200✔
88
                fw.send(self.delete(old)?, DEFAULT_PORT_HANDLE)
200✔
89
            }
90
            ProcessorOperation::Insert { ref new } => {
2,886✔
91
                fw.send(self.insert(new)?, DEFAULT_PORT_HANDLE)
2,886✔
92
            }
93
            ProcessorOperation::Update { ref old, ref new } => {
×
94
                fw.send(self.update(old, new)?, DEFAULT_PORT_HANDLE)
×
95
            }
96
        };
97
        Ok(())
3,086✔
98
    }
3,086✔
99

100
    fn commit(&self, _epoch: &Epoch) -> Result<(), BoxedError> {
244✔
101
        Ok(())
244✔
102
    }
244✔
103
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc