• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

getdozer / dozer / 6299724219

25 Sep 2023 12:58PM UTC coverage: 77.81% (+0.5%) from 77.275%
6299724219

push

github

chubei
fix: Add `BINDGEN_EXTRA_CLANG_ARGS` to cross compile rocksdb

50223 of 64546 relevant lines covered (77.81%)

148909.49 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

81.11
/dozer-core/src/executor/sink_node.rs
1
use std::{borrow::Cow, mem::swap, sync::Arc};
2

3
use crossbeam::channel::Receiver;
4
use daggy::NodeIndex;
5
use dozer_tracing::LabelsAndProgress;
6
use dozer_types::node::NodeHandle;
7
use metrics::{describe_counter, describe_histogram, histogram, increment_counter};
8

9
use crate::{
10
    builder_dag::NodeKind,
11
    epoch::{Epoch, EpochManager},
12
    error_manager::ErrorManager,
13
    errors::ExecutionError,
14
    executor_operation::{ExecutorOperation, ProcessorOperation},
15
    node::{PortHandle, Sink},
16
};
17

18
use super::execution_dag::ExecutionDag;
19
use super::{name::Name, receiver_loop::ReceiverLoop};
20

21
/// A sink in the execution DAG.
22
#[derive(Debug)]
×
23
pub struct SinkNode {
24
    /// Node handle in description DAG.
25
    node_handle: NodeHandle,
26
    /// The epoch id the sink was constructed for.
27
    initial_epoch_id: u64,
28
    /// Input port handles.
29
    port_handles: Vec<PortHandle>,
30
    /// Input data channels.
31
    receivers: Vec<Receiver<ExecutorOperation>>,
32
    /// The sink.
33
    sink: Box<dyn Sink>,
34
    /// Where all the records from ingested data are stored.
35
    epoch_manager: Arc<EpochManager>,
36
    /// The error manager, for reporting non-fatal errors.
37
    error_manager: Arc<ErrorManager>,
38
    /// The metrics labels.
39
    labels: LabelsAndProgress,
40
}
41

42
/// Name of the counter metric that `on_op` increments once per processed operation.
const SINK_OPERATION_COUNTER_NAME: &str = "sink_operation";
43
/// Name of the histogram metric in which `on_commit` records the pipeline latency.
const PIPELINE_LATENCY_HISTOGRAM_NAME: &str = "pipeline_latency";
44

45
impl SinkNode {
46
    pub fn new(dag: &mut ExecutionDag, node_index: NodeIndex, initial_epoch_id: u64) -> Self {
633✔
47
        let Some(node) = dag.node_weight_mut(node_index).take() else {
633✔
48
            panic!("Must pass in a node")
×
49
        };
50
        let node_handle = node.handle;
633✔
51
        let NodeKind::Sink(sink) = node.kind else {
633✔
52
            panic!("Must pass in a sink node");
×
53
        };
54

55
        let (port_handles, receivers) = dag.collect_receivers(node_index);
633✔
56

633✔
57
        describe_counter!(
633✔
58
            SINK_OPERATION_COUNTER_NAME,
×
59
            "Number of operation processed by the sink"
×
60
        );
61
        describe_histogram!(
633✔
62
            PIPELINE_LATENCY_HISTOGRAM_NAME,
×
63
            "The pipeline processing latency in seconds"
×
64
        );
65

66
        Self {
633✔
67
            node_handle,
633✔
68
            initial_epoch_id,
633✔
69
            port_handles,
633✔
70
            receivers,
633✔
71
            sink,
633✔
72
            epoch_manager: dag.epoch_manager().clone(),
633✔
73
            error_manager: dag.error_manager().clone(),
633✔
74
            labels: dag.labels().clone(),
633✔
75
        }
633✔
76
    }
633✔
77

78
    pub fn handle(&self) -> &NodeHandle {
633✔
79
        &self.node_handle
633✔
80
    }
633✔
81
}
82

83
impl Name for SinkNode {
84
    fn name(&self) -> Cow<str> {
×
85
        Cow::Owned(self.node_handle.to_string())
×
86
    }
×
87
}
88

89
impl ReceiverLoop for SinkNode {
90
    fn initial_epoch_id(&self) -> u64 {
633✔
91
        self.initial_epoch_id
633✔
92
    }
633✔
93

94
    fn receivers(&mut self) -> Vec<Receiver<ExecutorOperation>> {
633✔
95
        let mut result = vec![];
633✔
96
        swap(&mut self.receivers, &mut result);
633✔
97
        result
633✔
98
    }
633✔
99

100
    fn receiver_name(&self, index: usize) -> Cow<str> {
×
101
        Cow::Owned(self.port_handles[index].to_string())
×
102
    }
×
103

104
    fn on_op(&mut self, index: usize, op: ProcessorOperation) -> Result<(), ExecutionError> {
3,574,451✔
105
        let mut labels = self.labels.labels().clone();
3,574,451✔
106
        labels.push("table", self.node_handle.id.clone());
3,574,451✔
107
        const OPERATION_TYPE_LABEL: &str = "operation_type";
3,574,451✔
108
        match &op {
3,574,451✔
109
            ProcessorOperation::Insert { .. } => {
3,492,459✔
110
                labels.push(OPERATION_TYPE_LABEL, "insert");
3,492,459✔
111
            }
3,492,459✔
112
            ProcessorOperation::Delete { .. } => {
880✔
113
                labels.push(OPERATION_TYPE_LABEL, "delete");
880✔
114
            }
880✔
115
            ProcessorOperation::Update { .. } => {
81,112✔
116
                labels.push(OPERATION_TYPE_LABEL, "update");
81,112✔
117
            }
81,112✔
118
        }
119

120
        if let Err(e) = self.sink.process(
3,574,451✔
121
            self.port_handles[index],
3,574,451✔
122
            self.epoch_manager.record_store(),
3,574,451✔
123
            op,
3,574,451✔
124
        ) {
3,574,451✔
125
            self.error_manager.report(e);
2✔
126
        }
3,574,449✔
127

128
        increment_counter!(SINK_OPERATION_COUNTER_NAME, labels);
3,574,451✔
129

130
        Ok(())
3,574,451✔
131
    }
3,574,451✔
132

133
    fn on_commit(&mut self, epoch: &Epoch) -> Result<(), ExecutionError> {
134
        // debug!("[{}] Checkpointing - {}", self.node_handle, epoch);
135
        if let Err(e) = self.sink.commit(epoch) {
1,540✔
136
            self.error_manager.report(e);
×
137
        }
1,540✔
138

139
        if let Ok(duration) = epoch.decision_instant.elapsed() {
1,540✔
140
            let mut labels = self.labels.labels().clone();
1,540✔
141
            labels.push("endpoint", self.node_handle.id.clone());
1,540✔
142
            histogram!(PIPELINE_LATENCY_HISTOGRAM_NAME, duration, labels);
1,540✔
143
        }
×
144

145
        if let Some(checkpoint_writer) = epoch.common_info.checkpoint_writer.as_ref() {
1,540✔
146
            if let Err(e) = self.sink.persist(checkpoint_writer.queue()) {
30✔
147
                self.error_manager.report(e);
×
148
            }
30✔
149
        }
1,510✔
150

151
        Ok(())
1,540✔
152
    }
1,540✔
153

154
    fn on_terminate(&mut self) -> Result<(), ExecutionError> {
627✔
155
        Ok(())
627✔
156
    }
627✔
157

158
    fn on_snapshotting_done(&mut self, connection_name: String) -> Result<(), ExecutionError> {
159
        if let Err(e) = self.sink.on_source_snapshotting_done(connection_name) {
48✔
160
            self.error_manager.report(e);
×
161
        }
48✔
162
        Ok(())
48✔
163
    }
48✔
164
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc