• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

getdozer / dozer / 5809849307

pending completion
5809849307

push

github

web-flow
refactor: Remove nested record ref. Add `ProcessorRecordStore` (#1835)

* temp: Remove `ProcessRecordRef` and let `ProcessorRecord` directly reference `Vec<Field>`

* feat: Abstract away `ProcessorRecordStore`

* fix: Fix tests and clippy

794 of 794 new or added lines in 78 files covered. (100.0%)

45486 of 61064 relevant lines covered (74.49%)

58165.65 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

59.42
/dozer-sql/src/pipeline/product/join/processor.rs
1
use dozer_core::channels::ProcessorChannelForwarder;
2
use dozer_core::epoch::Epoch;
3
use dozer_core::executor_operation::ProcessorOperation;
4
use dozer_core::node::{PortHandle, Processor};
5
use dozer_core::processor_record::ProcessorRecordStore;
6
use dozer_core::DEFAULT_PORT_HANDLE;
7
use dozer_types::errors::internal::BoxedError;
8
use dozer_types::labels::Labels;
9
use dozer_types::types::Lifetime;
10
use metrics::{
11
    counter, describe_counter, describe_gauge, describe_histogram, gauge, histogram,
12
    increment_counter,
13
};
14

15
use crate::pipeline::errors::PipelineError;
16

17
use super::operator::{JoinAction, JoinBranch, JoinOperator};
18

×
19
#[derive(Debug)]
×
20
pub struct ProductProcessor {
21
    join_operator: JoinOperator,
22
    labels: Labels,
23
}
24

25
/// Gauge: total number of items in the left lookup table.
const LEFT_LOOKUP_SIZE: &str = "product.left_lookup_size";
/// Gauge: total number of items in the right lookup table.
const RIGHT_LOOKUP_SIZE: &str = "product.right_lookup_size";
/// Counter: operations that did not match the join condition.
const UNSATISFIED_JOINS: &str = "product.unsatisfied_joins";
/// Counter: number of records received by the product processor.
const IN_OPS: &str = "product.in_ops";
/// Counter: number of records forwarded by the product processor.
const OUT_OPS: &str = "product.out_ops";
/// Histogram: processing latency.
const LATENCY: &str = "product.latency";
31

32
impl ProductProcessor {
×
33
    pub fn new(id: String, join_operator: JoinOperator) -> Self {
136✔
34
        describe_gauge!(
136✔
35
            LEFT_LOOKUP_SIZE,
×
36
            "Total number of items in the left lookup table"
×
37
        );
×
38
        describe_gauge!(
136✔
39
            RIGHT_LOOKUP_SIZE,
×
40
            "Total number of items in the right lookup table"
×
41
        );
×
42
        describe_counter!(
136✔
43
            UNSATISFIED_JOINS,
×
44
            "Operations not matching the Join condition"
×
45
        );
×
46
        describe_counter!(
136✔
47
            IN_OPS,
×
48
            "Number of records received by the product processor"
×
49
        );
×
50
        describe_counter!(
136✔
51
            OUT_OPS,
×
52
            "Number of records forwarded by the product processor"
×
53
        );
54

×
55
        describe_histogram!(LATENCY, "Processing latency");
136✔
56

×
57
        let mut labels = Labels::empty();
136✔
58
        labels.push("pid", id);
136✔
59
        Self {
136✔
60
            join_operator,
136✔
61
            labels,
136✔
62
        }
136✔
63
    }
136✔
64

×
65
    fn update_eviction_index(&mut self, lifetime: Lifetime) {
×
66
        let now = &lifetime.reference;
×
67
        let old_instants = self.join_operator.evict_index(&JoinBranch::Left, now);
×
68
        self.join_operator
×
69
            .clean_evict_index(&JoinBranch::Left, &old_instants);
×
70
        let old_instants = self.join_operator.evict_index(&JoinBranch::Right, now);
×
71
        self.join_operator
×
72
            .clean_evict_index(&JoinBranch::Right, &old_instants);
×
73
    }
×
74
}
75

76
impl Processor for ProductProcessor {
×
77
    fn commit(&self, _epoch: &Epoch) -> Result<(), BoxedError> {
152✔
78
        Ok(())
152✔
79
    }
152✔
80

×
81
    fn process(
4,828✔
82
        &mut self,
4,828✔
83
        from_port: PortHandle,
4,828✔
84
        record_store: &ProcessorRecordStore,
4,828✔
85
        op: ProcessorOperation,
4,828✔
86
        fw: &mut dyn ProcessorChannelForwarder,
4,828✔
87
    ) -> Result<(), BoxedError> {
4,828✔
88
        let from_branch = match from_port {
4,828✔
89
            0 => &JoinBranch::Left,
1,440✔
90
            1 => &JoinBranch::Right,
3,388✔
91
            _ => return Err(PipelineError::InvalidPort(from_port).into()),
×
92
        };
×
93

×
94
        let now = std::time::Instant::now();
4,828✔
95
        let records = match op {
4,828✔
96
            ProcessorOperation::Delete { old } => {
164✔
97
                if let Some(lifetime) = old.get_lifetime() {
164✔
98
                    self.update_eviction_index(lifetime);
×
99
                }
164✔
100

×
101
                let old_decoded = record_store.load_record(&old)?;
164✔
102
                self.join_operator
164✔
103
                    .delete(from_branch, record_store, old, old_decoded)
164✔
104
                    .map_err(PipelineError::JoinError)?
164✔
105
            }
×
106
            ProcessorOperation::Insert { new } => {
4,632✔
107
                if let Some(lifetime) = new.get_lifetime() {
4,632✔
108
                    self.update_eviction_index(lifetime);
×
109
                }
4,632✔
110

×
111
                let new_decoded = record_store.load_record(&new)?;
4,632✔
112
                self.join_operator
4,632✔
113
                    .insert(from_branch, record_store, new, new_decoded)
4,632✔
114
                    .map_err(PipelineError::JoinError)?
4,632✔
115
            }
×
116
            ProcessorOperation::Update { old, new } => {
32✔
117
                if let Some(lifetime) = old.get_lifetime() {
32✔
118
                    self.update_eviction_index(lifetime);
×
119
                }
32✔
120

×
121
                let old_decoded = record_store.load_record(&old)?;
32✔
122
                let new_decoded = record_store.load_record(&new)?;
32✔
123

×
124
                let old_records = self
32✔
125
                    .join_operator
32✔
126
                    .delete(from_branch, record_store, old, old_decoded)
32✔
127
                    .map_err(PipelineError::JoinError)?;
32✔
128

129
                let new_records = self
32✔
130
                    .join_operator
32✔
131
                    .insert(from_branch, record_store, new, new_decoded)
32✔
132
                    .map_err(PipelineError::JoinError)?;
32✔
133

×
134
                old_records.into_iter().chain(new_records).collect()
32✔
135
            }
×
136
        };
137

×
138
        let elapsed = now.elapsed();
4,828✔
139
        histogram!(LATENCY, elapsed, self.labels.clone());
4,828✔
140
        increment_counter!(IN_OPS, self.labels.clone());
4,828✔
141

142
        counter!(OUT_OPS, records.len() as u64, self.labels.clone());
4,828✔
143

144
        gauge!(
4,828✔
145
            LEFT_LOOKUP_SIZE,
×
146
            self.join_operator.left_lookup_size() as f64,
×
147
            self.labels.clone()
×
148
        );
×
149
        gauge!(
4,828✔
150
            RIGHT_LOOKUP_SIZE,
×
151
            self.join_operator.right_lookup_size() as f64,
×
152
            self.labels.clone()
×
153
        );
×
154

×
155
        if records.is_empty() {
4,828✔
156
            increment_counter!(UNSATISFIED_JOINS, self.labels.clone());
3,172✔
157
        }
1,656✔
158

×
159
        for (action, record) in records {
6,668✔
160
            match action {
1,840✔
161
                JoinAction::Insert => {
1,528✔
162
                    fw.send(
1,528✔
163
                        ProcessorOperation::Insert { new: record },
1,528✔
164
                        DEFAULT_PORT_HANDLE,
1,528✔
165
                    );
1,528✔
166
                }
1,528✔
167
                JoinAction::Delete => {
312✔
168
                    fw.send(
312✔
169
                        ProcessorOperation::Delete { old: record },
312✔
170
                        DEFAULT_PORT_HANDLE,
312✔
171
                    );
312✔
172
                }
312✔
173
            }
174
        }
175

176
        Ok(())
4,828✔
177
    }
4,828✔
178
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc