• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

getdozer / dozer / 5610743655

pending completion
5610743655

push

github

web-flow
feat: Register errors from error manager in jaeger (#1753)

11 of 11 new or added lines in 2 files covered. (100.0%)

42948 of 56422 relevant lines covered (76.12%)

41547.83 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

62.61
/dozer-sql/src/pipeline/product/join/processor.rs
1
use dozer_core::channels::ProcessorChannelForwarder;
2
use dozer_core::epoch::Epoch;
3
use dozer_core::node::{PortHandle, Processor};
4
use dozer_core::DEFAULT_PORT_HANDLE;
5
use dozer_types::errors::internal::BoxedError;
6
use dozer_types::labels::Labels;
7
use dozer_types::types::Operation;
8
use metrics::{
9
    counter, describe_counter, describe_gauge, describe_histogram, gauge, histogram,
10
    increment_counter,
11
};
12

13
use crate::pipeline::errors::PipelineError;
14

15
use super::operator::{JoinAction, JoinBranch, JoinOperator};
16

17
#[derive(Debug)]
×
18
pub struct ProductProcessor {
19
    join_operator: JoinOperator,
20
    labels: Labels,
21
}
22

23
// Metric names emitted by the product processor.

/// Gauge: number of items in the left lookup table.
const LEFT_LOOKUP_SIZE: &'static str = "product.left_lookup_size";
/// Gauge: number of items in the right lookup table.
const RIGHT_LOOKUP_SIZE: &'static str = "product.right_lookup_size";
/// Counter: operations that produced no joined record.
const UNSATISFIED_JOINS: &'static str = "product.unsatisfied_joins";
/// Counter: operations received by the processor.
const IN_OPS: &'static str = "product.in_ops";
/// Counter: records forwarded by the processor.
const OUT_OPS: &'static str = "product.out_ops";
/// Histogram: time spent processing a single operation.
const LATENCY: &'static str = "product.latency";
29

30
impl ProductProcessor {
31
    pub fn new(id: String, join_operator: JoinOperator) -> Self {
136✔
32
        describe_gauge!(
136✔
33
            LEFT_LOOKUP_SIZE,
×
34
            "Total number of items in the left lookup table"
×
35
        );
36
        describe_gauge!(
136✔
37
            RIGHT_LOOKUP_SIZE,
×
38
            "Total number of items in the right lookup table"
×
39
        );
40
        describe_counter!(
136✔
41
            UNSATISFIED_JOINS,
×
42
            "Operations not matching the Join condition"
×
43
        );
44
        describe_counter!(
136✔
45
            IN_OPS,
×
46
            "Number of records received by the product processor"
×
47
        );
48
        describe_counter!(
136✔
49
            OUT_OPS,
×
50
            "Number of records forwarded by the product processor"
×
51
        );
52

53
        describe_histogram!(LATENCY, "Processing latency");
136✔
54

55
        let mut labels = Labels::empty();
136✔
56
        labels.push("pid", id);
136✔
57
        Self {
136✔
58
            join_operator,
136✔
59
            labels,
136✔
60
        }
136✔
61
    }
136✔
62

63
    fn update_eviction_index(&mut self, lifetime: &dozer_types::types::Lifetime) {
×
64
        let now = &lifetime.reference;
×
65
        let old_instants = self.join_operator.evict_index(&JoinBranch::Left, now);
×
66
        self.join_operator
×
67
            .clean_evict_index(&JoinBranch::Left, &old_instants);
×
68
        let old_instants = self.join_operator.evict_index(&JoinBranch::Right, now);
×
69
        self.join_operator
×
70
            .clean_evict_index(&JoinBranch::Right, &old_instants);
×
71
    }
×
72
}
×
73

×
74
impl Processor for ProductProcessor {
×
75
    fn commit(&self, _epoch: &Epoch) -> Result<(), BoxedError> {
152✔
76
        Ok(())
152✔
77
    }
152✔
78

×
79
    fn process(
4,828✔
80
        &mut self,
4,828✔
81
        from_port: PortHandle,
4,828✔
82
        op: Operation,
4,828✔
83
        fw: &mut dyn ProcessorChannelForwarder,
4,828✔
84
    ) -> Result<(), BoxedError> {
4,828✔
85
        let from_branch = match from_port {
4,828✔
86
            0 => &JoinBranch::Left,
1,440✔
87
            1 => &JoinBranch::Right,
3,388✔
88
            _ => return Err(PipelineError::InvalidPort(from_port).into()),
×
89
        };
×
90

×
91
        let now = std::time::Instant::now();
4,828✔
92
        let records = match op {
4,828✔
93
            Operation::Delete { ref old } => {
168✔
94
                if let Some(lifetime) = &old.lifetime {
168✔
95
                    self.update_eviction_index(lifetime);
4✔
96
                }
164✔
97

×
98
                self.join_operator
168✔
99
                    .delete(from_branch, old)
168✔
100
                    .map_err(PipelineError::JoinError)?
168✔
101
            }
102
            Operation::Insert { ref new } => {
4,628✔
103
                if let Some(lifetime) = &new.lifetime {
4,628✔
104
                    self.update_eviction_index(lifetime);
×
105
                }
4,628✔
106

×
107
                self.join_operator
4,628✔
108
                    .insert(from_branch, new)
4,628✔
109
                    .map_err(PipelineError::JoinError)?
4,628✔
110
            }
×
111
            Operation::Update { ref old, ref new } => {
32✔
112
                if let Some(lifetime) = &old.lifetime {
32✔
113
                    self.update_eviction_index(lifetime);
×
114
                }
32✔
115

×
116
                let old_records = self
32✔
117
                    .join_operator
32✔
118
                    .delete(from_branch, old)
32✔
119
                    .map_err(PipelineError::JoinError)?;
32✔
120

×
121
                let new_records = self
32✔
122
                    .join_operator
32✔
123
                    .insert(from_branch, new)
32✔
124
                    .map_err(PipelineError::JoinError)?;
32✔
125

×
126
                old_records.into_iter().chain(new_records).collect()
32✔
127
            }
×
128
        };
×
129

×
130
        let elapsed = now.elapsed();
4,828✔
131
        histogram!(LATENCY, elapsed, self.labels.clone());
4,828✔
132
        increment_counter!(IN_OPS, self.labels.clone());
4,828✔
133

×
134
        counter!(OUT_OPS, records.len() as u64, self.labels.clone());
4,828✔
135

136
        gauge!(
4,828✔
137
            LEFT_LOOKUP_SIZE,
138
            self.join_operator.left_lookup_size() as f64,
×
139
            self.labels.clone()
×
140
        );
141
        gauge!(
4,828✔
142
            RIGHT_LOOKUP_SIZE,
143
            self.join_operator.right_lookup_size() as f64,
×
144
            self.labels.clone()
×
145
        );
146

147
        if records.is_empty() {
4,828✔
148
            increment_counter!(UNSATISFIED_JOINS, self.labels.clone());
3,176✔
149
        }
1,652✔
150

151
        for (action, record) in records {
6,660✔
152
            match action {
1,836✔
153
                JoinAction::Insert => {
1,524✔
154
                    fw.send(Operation::Insert { new: record }, DEFAULT_PORT_HANDLE);
1,524✔
155
                }
1,524✔
156
                JoinAction::Delete => {
308✔
157
                    fw.send(Operation::Delete { old: record }, DEFAULT_PORT_HANDLE);
308✔
158
                }
308✔
159
            }
160
        }
161

162
        Ok(())
4,824✔
163
    }
4,824✔
164
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc