• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

getdozer / dozer / 6105410942

07 Sep 2023 04:28AM UTC coverage: 77.562% (-0.1%) from 77.686%
6105410942

push

github

chloeminkyung
feat: onnx image

1141 of 1141 new or added lines in 66 files covered. (100.0%)

49957 of 64409 relevant lines covered (77.56%)

50900.25 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

71.56
/dozer-sql/src/pipeline/product/join/processor.rs
1
use dozer_core::channels::ProcessorChannelForwarder;
2
use dozer_core::dozer_log::storage::Object;
3
use dozer_core::epoch::Epoch;
4
use dozer_core::executor_operation::ProcessorOperation;
5
use dozer_core::node::{PortHandle, Processor};
6
use dozer_core::processor_record::ProcessorRecordStore;
7
use dozer_core::DEFAULT_PORT_HANDLE;
8
use dozer_tracing::Labels;
9
use dozer_types::errors::internal::BoxedError;
10
use dozer_types::types::Lifetime;
11
use metrics::{
12
    counter, describe_counter, describe_gauge, describe_histogram, gauge, histogram,
13
    increment_counter,
14
};
15

16
use crate::pipeline::errors::PipelineError;
17

18
use super::operator::{JoinAction, JoinBranch, JoinOperator};
19

20
#[derive(Debug)]
×
21
pub struct ProductProcessor {
22
    join_operator: JoinOperator,
23
    labels: Labels,
24
}
25

26
const LEFT_LOOKUP_SIZE: &str = "product.left_lookup_size";
27
const RIGHT_LOOKUP_SIZE: &str = "product.right_lookup_size";
28
const UNSATISFIED_JOINS: &str = "product.unsatisfied_joins";
29
const IN_OPS: &str = "product.in_ops";
30
const OUT_OPS: &str = "product.out_ops";
31
const LATENCY: &str = "product.latency";
32

33
impl ProductProcessor {
34
    pub fn new(id: String, join_operator: JoinOperator) -> Self {
238✔
35
        describe_gauge!(
238✔
36
            LEFT_LOOKUP_SIZE,
×
37
            "Total number of items in the left lookup table"
×
38
        );
39
        describe_gauge!(
238✔
40
            RIGHT_LOOKUP_SIZE,
×
41
            "Total number of items in the right lookup table"
×
42
        );
43
        describe_counter!(
238✔
44
            UNSATISFIED_JOINS,
×
45
            "Operations not matching the Join condition"
×
46
        );
47
        describe_counter!(
238✔
48
            IN_OPS,
×
49
            "Number of records received by the product processor"
×
50
        );
51
        describe_counter!(
238✔
52
            OUT_OPS,
×
53
            "Number of records forwarded by the product processor"
×
54
        );
55

56
        describe_histogram!(LATENCY, "Processing latency");
238✔
57

58
        let mut labels = Labels::empty();
238✔
59
        labels.push("pid", id);
238✔
60
        Self {
238✔
61
            join_operator,
238✔
62
            labels,
238✔
63
        }
238✔
64
    }
238✔
65

66
    fn update_eviction_index(&mut self, lifetime: Lifetime) {
×
67
        self.join_operator.evict_index(&lifetime.reference);
×
68
    }
×
69
}
70

71
impl Processor for ProductProcessor {
72
    fn commit(&self, _epoch: &Epoch) -> Result<(), BoxedError> {
252✔
73
        Ok(())
252✔
74
    }
252✔
75

76
    fn process(
8,449✔
77
        &mut self,
8,449✔
78
        from_port: PortHandle,
8,449✔
79
        record_store: &ProcessorRecordStore,
8,449✔
80
        op: ProcessorOperation,
8,449✔
81
        fw: &mut dyn ProcessorChannelForwarder,
8,449✔
82
    ) -> Result<(), BoxedError> {
8,449✔
83
        let from_branch = match from_port {
8,449✔
84
            0 => JoinBranch::Left,
2,520✔
85
            1 => JoinBranch::Right,
5,929✔
86
            _ => return Err(PipelineError::InvalidPort(from_port).into()),
×
87
        };
88

89
        let now = std::time::Instant::now();
8,449✔
90
        let records = match op {
8,449✔
91
            ProcessorOperation::Delete { old } => {
287✔
92
                if let Some(lifetime) = old.get_lifetime() {
287✔
93
                    self.update_eviction_index(lifetime);
×
94
                }
287✔
95

96
                let old_decoded = record_store.load_record(&old)?;
287✔
97
                self.join_operator.delete(from_branch, &old, &old_decoded)
287✔
98
            }
99
            ProcessorOperation::Insert { new } => {
8,106✔
100
                if let Some(lifetime) = new.get_lifetime() {
8,106✔
101
                    self.update_eviction_index(lifetime);
×
102
                }
8,106✔
103

104
                let new_decoded = record_store.load_record(&new)?;
8,106✔
105
                self.join_operator
8,106✔
106
                    .insert(from_branch, &new, &new_decoded)
8,106✔
107
                    .map_err(PipelineError::JoinError)?
8,106✔
108
            }
109
            ProcessorOperation::Update { old, new } => {
56✔
110
                if let Some(lifetime) = old.get_lifetime() {
56✔
111
                    self.update_eviction_index(lifetime);
×
112
                }
56✔
113

114
                let old_decoded = record_store.load_record(&old)?;
56✔
115
                let new_decoded = record_store.load_record(&new)?;
56✔
116

117
                let mut old_records = self.join_operator.delete(from_branch, &old, &old_decoded);
56✔
118

119
                let new_records = self
56✔
120
                    .join_operator
56✔
121
                    .insert(from_branch, &new, &new_decoded)
56✔
122
                    .map_err(PipelineError::JoinError)?;
56✔
123

124
                old_records.extend(new_records);
56✔
125
                old_records
56✔
126
            }
127
        };
128

129
        let elapsed = now.elapsed();
8,449✔
130
        histogram!(LATENCY, elapsed, self.labels.clone());
8,449✔
131
        increment_counter!(IN_OPS, self.labels.clone());
8,449✔
132

133
        counter!(OUT_OPS, records.len() as u64, self.labels.clone());
8,449✔
134

135
        gauge!(
8,449✔
136
            LEFT_LOOKUP_SIZE,
137
            self.join_operator.left_lookup_size() as f64,
×
138
            self.labels.clone()
×
139
        );
140
        gauge!(
8,449✔
141
            RIGHT_LOOKUP_SIZE,
142
            self.join_operator.right_lookup_size() as f64,
×
143
            self.labels.clone()
×
144
        );
145

146
        if records.is_empty() {
8,449✔
147
            increment_counter!(UNSATISFIED_JOINS, self.labels.clone());
5,544✔
148
        }
2,905✔
149

150
        for (action, record) in records {
11,669✔
151
            match action {
3,220✔
152
                JoinAction::Insert => {
2,674✔
153
                    fw.send(
2,674✔
154
                        ProcessorOperation::Insert { new: record },
2,674✔
155
                        DEFAULT_PORT_HANDLE,
2,674✔
156
                    );
2,674✔
157
                }
2,674✔
158
                JoinAction::Delete => {
546✔
159
                    fw.send(
546✔
160
                        ProcessorOperation::Delete { old: record },
546✔
161
                        DEFAULT_PORT_HANDLE,
546✔
162
                    );
546✔
163
                }
546✔
164
            }
165
        }
166

167
        Ok(())
8,449✔
168
    }
8,449✔
169

170
    fn serialize(
×
171
        &mut self,
×
172
        record_store: &ProcessorRecordStore,
×
173
        object: Object,
×
174
    ) -> Result<(), BoxedError> {
×
175
        self.join_operator
×
176
            .serialize(record_store, object)
×
177
            .map_err(Into::into)
×
178
    }
×
179
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc