• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

getdozer / dozer / 6033855373

31 Aug 2023 06:28AM UTC coverage: 77.271%. First build
6033855373

Pull #1948

github

chubei
feat: Add `sink_operation` metric
Pull Request #1948: feat: Add `sink_operation` metric

22 of 22 new or added lines in 1 file covered. (100.0%)

49152 of 63610 relevant lines covered (77.27%)

51493.61 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

62.73
/dozer-core/src/executor/sink_node.rs
1
use std::{borrow::Cow, mem::swap, sync::Arc};
2

3
use crossbeam::channel::Receiver;
4
use daggy::NodeIndex;
5
use dozer_tracing::LabelsAndProgress;
6
use dozer_types::node::NodeHandle;
7
use metrics::{describe_counter, describe_histogram, histogram, increment_counter};
8

9
use crate::{
10
    builder_dag::NodeKind,
11
    epoch::{Epoch, EpochManager},
12
    error_manager::ErrorManager,
13
    errors::ExecutionError,
14
    executor_operation::{ExecutorOperation, ProcessorOperation},
15
    node::{PortHandle, Sink},
16
};
17

18
use super::execution_dag::ExecutionDag;
19
use super::{name::Name, receiver_loop::ReceiverLoop};
20

21
/// A sink in the execution DAG.
×
22
#[derive(Debug)]
×
23
pub struct SinkNode {
24
    /// Node handle in description DAG.
25
    node_handle: NodeHandle,
26
    /// Input port handles.
27
    port_handles: Vec<PortHandle>,
28
    /// Input data channels.
29
    receivers: Vec<Receiver<ExecutorOperation>>,
30
    /// The sink.
31
    sink: Box<dyn Sink>,
32
    /// Where all the records from ingested data are stored.
33
    epoch_manager: Arc<EpochManager>,
34
    /// The error manager, for reporting non-fatal errors.
35
    error_manager: Arc<ErrorManager>,
36
    /// The metrics labels.
37
    labels: LabelsAndProgress,
38
}
39

40
/// Name of the counter metric that `on_op` increments once per operation it handles.
const SINK_OPERATION_COUNTER_NAME: &str = "sink_operation";
×
41
/// Name of the histogram metric that `on_commit` records the time elapsed since the
/// epoch's `decision_instant` into.
const PIPELINE_LATENCY_HISTOGRAM_NAME: &str = "pipeline_latency";
×
42

×
43
impl SinkNode {
44
    pub fn new(dag: &mut ExecutionDag, node_index: NodeIndex) -> Self {
633✔
45
        let Some(node) = dag.node_weight_mut(node_index).take() else {
633✔
46
            panic!("Must pass in a node")
×
47
        };
48
        let node_handle = node.handle;
633✔
49
        let NodeKind::Sink(sink) = node.kind else {
633✔
50
            panic!("Must pass in a sink node");
×
51
        };
×
52

×
53
        let (port_handles, receivers) = dag.collect_receivers(node_index);
633✔
54

633✔
55
        describe_counter!(
633✔
56
            SINK_OPERATION_COUNTER_NAME,
×
57
            "Number of operation processed by the sink"
×
58
        );
×
59
        describe_histogram!(
633✔
60
            PIPELINE_LATENCY_HISTOGRAM_NAME,
×
61
            "The pipeline processing latency in seconds"
×
62
        );
×
63

×
64
        Self {
633✔
65
            node_handle,
633✔
66
            port_handles,
633✔
67
            receivers,
633✔
68
            sink,
633✔
69
            epoch_manager: dag.epoch_manager().clone(),
633✔
70
            error_manager: dag.error_manager().clone(),
633✔
71
            labels: dag.labels().clone(),
633✔
72
        }
633✔
73
    }
633✔
74

×
75
    pub fn handle(&self) -> &NodeHandle {
633✔
76
        &self.node_handle
633✔
77
    }
633✔
78
}
×
79

×
80
impl Name for SinkNode {
×
81
    fn name(&self) -> Cow<str> {
×
82
        Cow::Owned(self.node_handle.to_string())
×
83
    }
×
84
}
×
85

×
86
impl ReceiverLoop for SinkNode {
×
87
    fn receivers(&mut self) -> Vec<Receiver<ExecutorOperation>> {
633✔
88
        let mut result = vec![];
633✔
89
        swap(&mut self.receivers, &mut result);
633✔
90
        result
633✔
91
    }
633✔
92

×
93
    fn receiver_name(&self, index: usize) -> Cow<str> {
×
94
        Cow::Owned(self.port_handles[index].to_string())
×
95
    }
×
96

×
97
    fn on_op(&mut self, index: usize, op: ProcessorOperation) -> Result<(), ExecutionError> {
3,707,282✔
98
        let mut labels = self.labels.labels().clone();
3,707,282✔
99
        labels.push("table", self.node_handle.id.clone());
3,707,282✔
100
        const OPERATION_TYPE_LABEL: &str = "operation_type";
3,707,282✔
101
        match &op {
3,707,282✔
102
            ProcessorOperation::Insert { .. } => {
3,625,314✔
103
                labels.push(OPERATION_TYPE_LABEL, "insert");
3,625,314✔
104
            }
3,625,314✔
105
            ProcessorOperation::Delete { .. } => {
864✔
106
                labels.push(OPERATION_TYPE_LABEL, "delete");
864✔
107
            }
864✔
108
            ProcessorOperation::Update { .. } => {
81,104✔
109
                labels.push(OPERATION_TYPE_LABEL, "update");
81,104✔
110
            }
81,104✔
111
        }
×
112

×
113
        if let Err(e) = self.sink.process(
3,707,282✔
114
            self.port_handles[index],
3,707,282✔
115
            self.epoch_manager.record_store(),
3,707,282✔
116
            op,
3,707,282✔
117
        ) {
3,707,282✔
118
            self.error_manager.report(e);
2✔
119
        }
3,707,280✔
120

×
121
        increment_counter!(SINK_OPERATION_COUNTER_NAME, labels);
3,707,282✔
122

123
        Ok(())
3,707,282✔
124
    }
3,707,282✔
125

×
126
    fn on_commit(&mut self, epoch: &Epoch) -> Result<(), ExecutionError> {
×
127
        // debug!("[{}] Checkpointing - {}", self.node_handle, epoch);
×
128
        if let Err(e) = self.sink.commit(epoch) {
1,217✔
129
            self.error_manager.report(e);
×
130
        }
1,217✔
131

132
        if let Ok(duration) = epoch.decision_instant.elapsed() {
1,217✔
133
            let mut labels = self.labels.labels().clone();
1,217✔
134
            labels.push("endpoint", self.node_handle.id.clone());
1,217✔
135
            histogram!(PIPELINE_LATENCY_HISTOGRAM_NAME, duration, labels);
1,217✔
136
        }
×
137

138
        if let Some(checkpoint_writer) = epoch.common_info.checkpoint_writer.as_ref() {
1,217✔
139
            if let Err(e) = self.sink.persist(checkpoint_writer.queue()) {
31✔
140
                self.error_manager.report(e);
×
141
            }
31✔
142
        }
1,186✔
143

144
        Ok(())
1,217✔
145
    }
1,217✔
146

147
    fn on_terminate(&mut self) -> Result<(), ExecutionError> {
628✔
148
        Ok(())
628✔
149
    }
628✔
150

151
    fn on_snapshotting_done(&mut self, connection_name: String) -> Result<(), ExecutionError> {
152
        if let Err(e) = self.sink.on_source_snapshotting_done(connection_name) {
48✔
153
            self.error_manager.report(e);
×
154
        }
48✔
155
        Ok(())
48✔
156
    }
48✔
157
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc