• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

getdozer / dozer / 5630015928

pending completion
5630015928

push

github

web-flow
Bump version (#1779)

42841 of 55898 relevant lines covered (76.64%)

32850.4 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

77.05
/dozer-core/src/executor/sink_node.rs
1
use std::{borrow::Cow, collections::HashMap, mem::swap, sync::Arc};
2

3
use crossbeam::channel::Receiver;
4
use daggy::NodeIndex;
5
use dozer_types::{
6
    epoch::{Epoch, ExecutorOperation},
7
    log::debug,
8
    node::NodeHandle,
9
};
10
use metrics::{describe_histogram, histogram};
11

12
use crate::{
13
    builder_dag::NodeKind,
14
    error_manager::ErrorManager,
15
    errors::ExecutionError,
16
    forwarder::StateWriter,
17
    node::{PortHandle, Sink},
18
};
19

20
use super::execution_dag::ExecutionDag;
21
use super::{name::Name, receiver_loop::ReceiverLoop};
22

23
/// A sink in the execution DAG.
24
#[derive(Debug)]
×
25
pub struct SinkNode {
26
    /// Node handle in description DAG.
27
    node_handle: NodeHandle,
28
    /// Input port handles.
29
    port_handles: Vec<PortHandle>,
30
    /// Input data channels.
31
    receivers: Vec<Receiver<ExecutorOperation>>,
32
    /// The sink.
33
    sink: Box<dyn Sink>,
34
    /// This node's state writer, for writing metadata and port state.
35
    state_writer: StateWriter,
36
    /// The error manager, for reporting non-fatal errors.
37
    error_manager: Arc<ErrorManager>,
38
}
39

40
const PIPELINE_LATENCY_HISTOGRAM_NAME: &str = "pipeline_latency";
41

42
impl SinkNode {
43
    pub fn new(dag: &mut ExecutionDag, node_index: NodeIndex) -> Self {
387✔
44
        let Some(node) = dag.node_weight_mut(node_index).take() else {
387✔
45
            panic!("Must pass in a node")
×
46
        };
47
        let node_handle = node.handle;
387✔
48
        let NodeKind::Sink(sink) = node.kind else {
387✔
49
            panic!("Must pass in a sink node");
×
50
        };
51

52
        let (port_handles, receivers) = dag.collect_receivers(node_index);
387✔
53

387✔
54
        let state_writer = StateWriter::new(HashMap::new());
387✔
55

387✔
56
        describe_histogram!(
387✔
57
            PIPELINE_LATENCY_HISTOGRAM_NAME,
×
58
            "The pipeline processing latency in seconds"
×
59
        );
60

61
        Self {
387✔
62
            node_handle,
387✔
63
            port_handles,
387✔
64
            receivers,
387✔
65
            sink,
387✔
66
            state_writer,
387✔
67
            error_manager: dag.error_manager().clone(),
387✔
68
        }
387✔
69
    }
387✔
70

71
    pub fn handle(&self) -> &NodeHandle {
387✔
72
        &self.node_handle
387✔
73
    }
387✔
74
}
75

76
impl Name for SinkNode {
77
    fn name(&self) -> Cow<str> {
×
78
        Cow::Owned(self.node_handle.to_string())
×
79
    }
×
80
}
81

82
impl ReceiverLoop for SinkNode {
83
    fn receivers(&mut self) -> Vec<Receiver<ExecutorOperation>> {
387✔
84
        let mut result = vec![];
387✔
85
        swap(&mut self.receivers, &mut result);
387✔
86
        result
387✔
87
    }
387✔
88

89
    fn receiver_name(&self, index: usize) -> Cow<str> {
×
90
        Cow::Owned(self.port_handles[index].to_string())
×
91
    }
×
92

93
    fn on_op(
94
        &mut self,
95
        index: usize,
96
        op: dozer_types::types::Operation,
97
    ) -> Result<(), ExecutionError> {
98
        if let Err(e) = self.sink.process(self.port_handles[index], op) {
3,770,030✔
99
            self.error_manager.report(e);
3,311✔
100
        }
3,766,719✔
101
        Ok(())
3,770,030✔
102
    }
3,770,030✔
103

104
    fn on_commit(&mut self, epoch: &Epoch) -> Result<(), ExecutionError> {
830✔
105
        debug!("[{}] Checkpointing - {}", self.node_handle, epoch);
830✔
106
        if let Err(e) = self.sink.commit(epoch) {
830✔
107
            self.error_manager.report(e);
×
108
        }
830✔
109
        self.state_writer.store_commit_info(epoch)?;
830✔
110

111
        if let Ok(duration) = epoch.decision_instant.elapsed() {
830✔
112
            histogram!(PIPELINE_LATENCY_HISTOGRAM_NAME, duration, "endpoint" => self.node_handle.id.clone());
830✔
113
        }
×
114

115
        Ok(())
830✔
116
    }
830✔
117

118
    fn on_terminate(&mut self) -> Result<(), ExecutionError> {
382✔
119
        Ok(())
382✔
120
    }
382✔
121

122
    fn on_snapshotting_done(&mut self, connection_name: String) -> Result<(), ExecutionError> {
123
        if let Err(e) = self.sink.on_source_snapshotting_done(connection_name) {
30✔
124
            self.error_manager.report(e);
×
125
        }
30✔
126
        Ok(())
30✔
127
    }
30✔
128
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc