getdozer / dozer / 5939715234

22 Aug 2023 01:47PM UTC · coverage: 74.755% (-1.3%) from 76.052%

Build 5939715234 · push event via github · committer web-flow

chore: Run e2e tests nightly (#1886)

* chore: Run e2e tests nightly

* chore: Run Dozer CI on default runners

46459 of 62148 relevant lines covered (74.76%)

40132.91 hits per line
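For orientation, the headline figure is just the covered-to-relevant ratio: 46459 / 62148 ≈ 0.74755, i.e. the 74.755% reported for the build (displayed rounded as 74.76% here), and the -1.3% delta is simply 74.755% minus the previous build's 76.052% (≈ -1.297, rounded).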

Source File

/dozer-core/src/executor/sink_node.rs (73.24% covered)
use std::{borrow::Cow, mem::swap, sync::Arc};

use crossbeam::channel::Receiver;
use daggy::NodeIndex;
use dozer_types::{log::debug, node::NodeHandle};
use metrics::{describe_histogram, histogram};

use crate::{
    builder_dag::NodeKind,
    epoch::{Epoch, EpochManager},
    error_manager::ErrorManager,
    errors::ExecutionError,
    executor_operation::{ExecutorOperation, ProcessorOperation},
    node::{PortHandle, Sink},
};

use super::execution_dag::ExecutionDag;
use super::{name::Name, receiver_loop::ReceiverLoop};

/// A sink in the execution DAG.
#[derive(Debug)]
pub struct SinkNode {
    /// Node handle in description DAG.
    node_handle: NodeHandle,
    /// Input port handles.
    port_handles: Vec<PortHandle>,
    /// Input data channels.
    receivers: Vec<Receiver<ExecutorOperation>>,
    /// The sink.
    sink: Box<dyn Sink>,
    /// Where all the records from ingested data are stored.
    epoch_manager: Arc<EpochManager>,
    /// The error manager, for reporting non-fatal errors.
    error_manager: Arc<ErrorManager>,
}

const PIPELINE_LATENCY_HISTOGRAM_NAME: &str = "pipeline_latency";

impl SinkNode {
    pub fn new(dag: &mut ExecutionDag, node_index: NodeIndex) -> Self {
        let Some(node) = dag.node_weight_mut(node_index).take() else {
            panic!("Must pass in a node")
        };
        let node_handle = node.handle;
        let NodeKind::Sink(sink) = node.kind else {
            panic!("Must pass in a sink node");
        };

        let (port_handles, receivers) = dag.collect_receivers(node_index);

        describe_histogram!(
            PIPELINE_LATENCY_HISTOGRAM_NAME,
            "The pipeline processing latency in seconds"
        );

        Self {
            node_handle,
            port_handles,
            receivers,
            sink,
            epoch_manager: dag.epoch_manager().clone(),
            error_manager: dag.error_manager().clone(),
        }
    }

    pub fn handle(&self) -> &NodeHandle {
        &self.node_handle
    }
}

impl Name for SinkNode {
    fn name(&self) -> Cow<str> {
        Cow::Owned(self.node_handle.to_string())
    }
}

impl ReceiverLoop for SinkNode {
    fn receivers(&mut self) -> Vec<Receiver<ExecutorOperation>> {
        let mut result = vec![];
        swap(&mut self.receivers, &mut result);
        result
    }

    fn receiver_name(&self, index: usize) -> Cow<str> {
        Cow::Owned(self.port_handles[index].to_string())
    }

    fn on_op(&mut self, index: usize, op: ProcessorOperation) -> Result<(), ExecutionError> {
        if let Err(e) = self.sink.process(
            self.port_handles[index],
            self.epoch_manager.record_store(),
            op,
        ) {
            self.error_manager.report(e);
        }
        Ok(())
    }

    fn on_commit(&mut self, epoch: &Epoch) -> Result<(), ExecutionError> {
        debug!("[{}] Checkpointing - {}", self.node_handle, epoch);
        if let Err(e) = self.sink.commit(epoch) {
            self.error_manager.report(e);
        }

        if let Ok(duration) = epoch.decision_instant.elapsed() {
            histogram!(PIPELINE_LATENCY_HISTOGRAM_NAME, duration, "endpoint" => self.node_handle.id.clone());
        }

        if let Some(checkpoint_writer) = epoch.common_info.checkpoint_writer.as_ref() {
            if let Err(e) = self.sink.persist(checkpoint_writer.queue()) {
                self.error_manager.report(e);
            }
        }

        Ok(())
    }

    fn on_terminate(&mut self) -> Result<(), ExecutionError> {
        Ok(())
    }

    fn on_snapshotting_done(&mut self, connection_name: String) -> Result<(), ExecutionError> {
        if let Err(e) = self.sink.on_source_snapshotting_done(connection_name) {
            self.error_manager.report(e);
        }
        Ok(())
    }
}
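A pattern worth noting in this file: every fallible sink call (process, commit, persist, on_source_snapshotting_done) is wrapped in `if let Err(e) = ... { self.error_manager.report(e); }` instead of returning the error, so the receiver loop keeps consuming operations after a non-fatal failure. The sketch below is a minimal, standalone illustration of that "report, don't propagate" shape; ErrorManager and process_record here are hypothetical stand-ins, not Dozer's actual types.

// Standalone sketch of the "report, don't propagate" error handling used by
// SinkNode. ErrorManager and process_record are illustrative stand-ins only.
use std::sync::atomic::{AtomicUsize, Ordering};

struct ErrorManager {
    // Number of non-fatal errors reported so far.
    count: AtomicUsize,
}

impl ErrorManager {
    fn report(&self, err: impl std::fmt::Display) {
        self.count.fetch_add(1, Ordering::Relaxed);
        eprintln!("non-fatal sink error: {err}");
    }
}

// Stand-in for a fallible sink operation such as Sink::process.
fn process_record(value: i64) -> Result<(), String> {
    if value < 0 {
        Err(format!("negative value {value} rejected"))
    } else {
        Ok(())
    }
}

fn main() {
    let errors = ErrorManager { count: AtomicUsize::new(0) };
    for value in [1, -2, 3] {
        // Mirrors SinkNode::on_op: a failed operation is reported and the
        // loop keeps consuming the remaining operations.
        if let Err(e) = process_record(value) {
            errors.report(e);
        }
    }
    println!(
        "done, {} non-fatal error(s) reported",
        errors.count.load(Ordering::Relaxed)
    );
}

The point of the pattern, presumably, is that a single bad record or sink hiccup does not tear down the whole executor; whether an accumulated error count eventually becomes fatal is a policy decision left to the ErrorManager.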