• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

getdozer / dozer / 5665828881

pending completion
5665828881

push

github

web-flow
perf: Optimize memory usage for `Lifetime` (#1796)

* perf: Optimize memory usage for `Lifetime`

* perf: Wrap `ProcessorRecord::lifetime` in box to avoid occupying memory when it's `None`

64 of 64 new or added lines in 8 files covered. (100.0%)

45672 of 59729 relevant lines covered (76.47%)

38867.9 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

57.89
/dozer-sql/src/pipeline/product/join/processor.rs
1
use dozer_core::channels::ProcessorChannelForwarder;
2
use dozer_core::epoch::Epoch;
3
use dozer_core::executor_operation::ProcessorOperation;
4
use dozer_core::node::{PortHandle, Processor};
5
use dozer_core::DEFAULT_PORT_HANDLE;
6
use dozer_types::errors::internal::BoxedError;
7
use dozer_types::labels::Labels;
8
use dozer_types::types::Lifetime;
9
use metrics::{
10
    counter, describe_counter, describe_gauge, describe_histogram, gauge, histogram,
11
    increment_counter,
12
};
13

14
use crate::pipeline::errors::PipelineError;
15

16
use super::operator::{JoinAction, JoinBranch, JoinOperator};
17

×
18
#[derive(Debug)]
×
19
pub struct ProductProcessor {
20
    join_operator: JoinOperator,
21
    labels: Labels,
22
}
23

24
/// Gauge: number of items in the left lookup table.
const LEFT_LOOKUP_SIZE: &str = "product.left_lookup_size";
/// Gauge: number of items in the right lookup table.
const RIGHT_LOOKUP_SIZE: &str = "product.right_lookup_size";
/// Counter: incoming operations that produced no join output.
const UNSATISFIED_JOINS: &str = "product.unsatisfied_joins";
/// Counter: operations received by the processor.
const IN_OPS: &str = "product.in_ops";
/// Counter: operations forwarded by the processor.
const OUT_OPS: &str = "product.out_ops";
/// Histogram: time spent applying one operation to the join operator.
const LATENCY: &str = "product.latency";
30

31
impl ProductProcessor {
×
32
    pub fn new(id: String, join_operator: JoinOperator) -> Self {
136✔
33
        describe_gauge!(
136✔
34
            LEFT_LOOKUP_SIZE,
×
35
            "Total number of items in the left lookup table"
×
36
        );
×
37
        describe_gauge!(
136✔
38
            RIGHT_LOOKUP_SIZE,
×
39
            "Total number of items in the right lookup table"
×
40
        );
×
41
        describe_counter!(
136✔
42
            UNSATISFIED_JOINS,
×
43
            "Operations not matching the Join condition"
×
44
        );
×
45
        describe_counter!(
136✔
46
            IN_OPS,
×
47
            "Number of records received by the product processor"
×
48
        );
×
49
        describe_counter!(
136✔
50
            OUT_OPS,
×
51
            "Number of records forwarded by the product processor"
×
52
        );
53

×
54
        describe_histogram!(LATENCY, "Processing latency");
136✔
55

×
56
        let mut labels = Labels::empty();
136✔
57
        labels.push("pid", id);
136✔
58
        Self {
136✔
59
            join_operator,
136✔
60
            labels,
136✔
61
        }
136✔
62
    }
136✔
63

×
64
    fn update_eviction_index(&mut self, lifetime: Lifetime) {
×
65
        let now = &lifetime.reference;
×
66
        let old_instants = self.join_operator.evict_index(&JoinBranch::Left, now);
×
67
        self.join_operator
×
68
            .clean_evict_index(&JoinBranch::Left, &old_instants);
×
69
        let old_instants = self.join_operator.evict_index(&JoinBranch::Right, now);
×
70
        self.join_operator
×
71
            .clean_evict_index(&JoinBranch::Right, &old_instants);
×
72
    }
×
73
}
74

75
impl Processor for ProductProcessor {
×
76
    fn commit(&self, _epoch: &Epoch) -> Result<(), BoxedError> {
152✔
77
        Ok(())
152✔
78
    }
152✔
79

×
80
    fn process(
4,828✔
81
        &mut self,
4,828✔
82
        from_port: PortHandle,
4,828✔
83
        op: ProcessorOperation,
4,828✔
84
        fw: &mut dyn ProcessorChannelForwarder,
4,828✔
85
    ) -> Result<(), BoxedError> {
4,828✔
86
        let from_branch = match from_port {
4,828✔
87
            0 => &JoinBranch::Left,
1,440✔
88
            1 => &JoinBranch::Right,
3,388✔
89
            _ => return Err(PipelineError::InvalidPort(from_port).into()),
×
90
        };
91

×
92
        let now = std::time::Instant::now();
4,828✔
93
        let records = match op {
4,828✔
94
            ProcessorOperation::Delete { old } => {
164✔
95
                if let Some(lifetime) = old.get_record().get_lifetime() {
164✔
96
                    self.update_eviction_index(lifetime);
×
97
                }
164✔
98

×
99
                self.join_operator
164✔
100
                    .delete(from_branch, old)
164✔
101
                    .map_err(PipelineError::JoinError)?
164✔
102
            }
×
103
            ProcessorOperation::Insert { new } => {
4,632✔
104
                if let Some(lifetime) = new.get_record().get_lifetime() {
4,632✔
105
                    self.update_eviction_index(lifetime);
×
106
                }
4,632✔
107

×
108
                self.join_operator
4,632✔
109
                    .insert(from_branch, new)
4,632✔
110
                    .map_err(PipelineError::JoinError)?
4,632✔
111
            }
×
112
            ProcessorOperation::Update { old, new } => {
32✔
113
                if let Some(lifetime) = old.get_record().get_lifetime() {
32✔
114
                    self.update_eviction_index(lifetime);
×
115
                }
32✔
116

×
117
                let old_records = self
32✔
118
                    .join_operator
32✔
119
                    .delete(from_branch, old)
32✔
120
                    .map_err(PipelineError::JoinError)?;
32✔
121

×
122
                let new_records = self
32✔
123
                    .join_operator
32✔
124
                    .insert(from_branch, new)
32✔
125
                    .map_err(PipelineError::JoinError)?;
32✔
126

×
127
                old_records.into_iter().chain(new_records).collect()
32✔
128
            }
129
        };
130

×
131
        let elapsed = now.elapsed();
4,828✔
132
        histogram!(LATENCY, elapsed, self.labels.clone());
4,828✔
133
        increment_counter!(IN_OPS, self.labels.clone());
4,828✔
134

×
135
        counter!(OUT_OPS, records.len() as u64, self.labels.clone());
4,828✔
136

×
137
        gauge!(
4,828✔
138
            LEFT_LOOKUP_SIZE,
×
139
            self.join_operator.left_lookup_size() as f64,
×
140
            self.labels.clone()
×
141
        );
×
142
        gauge!(
4,828✔
143
            RIGHT_LOOKUP_SIZE,
×
144
            self.join_operator.right_lookup_size() as f64,
×
145
            self.labels.clone()
×
146
        );
147

×
148
        if records.is_empty() {
4,828✔
149
            increment_counter!(UNSATISFIED_JOINS, self.labels.clone());
3,172✔
150
        }
1,656✔
151

×
152
        for (action, record) in records {
6,668✔
153
            match action {
1,840✔
154
                JoinAction::Insert => {
1,528✔
155
                    fw.send(
1,528✔
156
                        ProcessorOperation::Insert { new: record },
1,528✔
157
                        DEFAULT_PORT_HANDLE,
1,528✔
158
                    );
1,528✔
159
                }
1,528✔
160
                JoinAction::Delete => {
312✔
161
                    fw.send(
312✔
162
                        ProcessorOperation::Delete { old: record },
312✔
163
                        DEFAULT_PORT_HANDLE,
312✔
164
                    );
312✔
165
                }
312✔
166
            }
167
        }
168

×
169
        Ok(())
4,828✔
170
    }
4,828✔
171
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc