• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

getdozer / dozer / 5888798292

17 Aug 2023 08:51AM UTC coverage: 76.025% (-1.4%) from 77.415%
5888798292

push

github

web-flow
feat: implement graph on live ui (#1847)

* feat: implement progress

* feat: implement enable progress flag

* feat: implement progress in live

* chore: fix clippy

* chore: always use telemetry metrics

* fix: Only run build once

---------

Co-authored-by: sagar <sagar@getdozer.io>
Co-authored-by: chubei <914745487@qq.com>

536 of 536 new or added lines in 21 files covered. (100.0%)

46101 of 60639 relevant lines covered (76.03%)

40410.07 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

69.01
/dozer-core/src/executor/sink_node.rs
1
use std::{borrow::Cow, mem::swap, sync::Arc};
2

3
use crossbeam::channel::Receiver;
4
use daggy::NodeIndex;
5
use dozer_types::{log::debug, node::NodeHandle};
6
use metrics::{describe_histogram, histogram};
7

8
use crate::{
9
    builder_dag::NodeKind,
10
    epoch::{Epoch, EpochManager},
11
    error_manager::ErrorManager,
12
    errors::ExecutionError,
13
    executor_operation::{ExecutorOperation, ProcessorOperation},
14
    node::{PortHandle, Sink},
15
};
16

17
use super::execution_dag::ExecutionDag;
18
use super::{name::Name, receiver_loop::ReceiverLoop};
19

20
/// A sink in the execution DAG.
///
/// Bundles the boxed [`Sink`] implementation with its input channels and port
/// handles, plus the epoch manager and error manager shared with the DAG.
21
#[derive(Debug)]
×
22
pub struct SinkNode {
23
    /// Node handle in description DAG. Also used as the node's name and in log output.
24
    node_handle: NodeHandle,
25
    /// Input port handles, looked up by the `index` argument of `receiver_name` and `on_op`.
26
    port_handles: Vec<PortHandle>,
27
    /// Input data channels. Left empty once `ReceiverLoop::receivers` has handed them over.
28
    receivers: Vec<Receiver<ExecutorOperation>>,
29
    /// The sink that operations, commits and snapshotting notifications are forwarded to.
30
    sink: Box<dyn Sink>,
31
    /// Where all the records from ingested data are stored.
    /// Supplies the record store passed to the sink and is told when an epoch is finalized.
32
    epoch_manager: Arc<EpochManager>,
33
    /// The error manager, for reporting non-fatal errors returned by the sink.
34
    error_manager: Arc<ErrorManager>,
35
}
36

37
/// Name of the pipeline latency histogram metric: described in `SinkNode::new`
/// and recorded in `on_commit`.
const PIPELINE_LATENCY_HISTOGRAM_NAME: &str = "pipeline_latency";
38

39
impl SinkNode {
40
    /// Builds the sink node for `node_index`, moving the node out of `dag`.
    ///
    /// Also collects the node's input port handles and receivers from the DAG,
    /// registers the description of the pipeline latency histogram, and clones
    /// the DAG's epoch manager and error manager handles.
    ///
    /// # Panics
    ///
    /// Panics if there is no node at `node_index`, or if that node is not a sink.
    pub fn new(dag: &mut ExecutionDag, node_index: NodeIndex) -> Self {
387✔
41
        // `take()` moves the node out, so the fields below can be used by value.
        let Some(node) = dag.node_weight_mut(node_index).take() else {
387✔
42
            panic!("Must pass in a node")
×
43
        };
44
        let node_handle = node.handle;
387✔
45
        let NodeKind::Sink(sink) = node.kind else {
387✔
46
            panic!("Must pass in a sink node");
×
47
        };
48

49
        let (port_handles, receivers) = dag.collect_receivers(node_index);
387✔
50

387✔
51
        // Runs on every construction, i.e. once per sink node created.
        describe_histogram!(
387✔
52
            PIPELINE_LATENCY_HISTOGRAM_NAME,
×
53
            "The pipeline processing latency in seconds"
×
54
        );
55

56
        Self {
387✔
57
            node_handle,
387✔
58
            port_handles,
387✔
59
            receivers,
387✔
60
            sink,
387✔
61
            epoch_manager: dag.epoch_manager().clone(),
387✔
62
            error_manager: dag.error_manager().clone(),
387✔
63
        }
387✔
64
    }
387✔
65

66
    /// Returns a reference to this node's handle.
    pub fn handle(&self) -> &NodeHandle {
387✔
67
        &self.node_handle
387✔
68
    }
387✔
69
}
70

71
impl Name for SinkNode {
72
    /// Returns the node's name: the string form of its node handle, as an owned `Cow`.
    fn name(&self) -> Cow<str> {
×
73
        Cow::Owned(self.node_handle.to_string())
×
74
    }
×
75
}
76

77
// Sink errors in `on_op`, `on_commit` and `on_snapshotting_done` are reported to
// the error manager instead of being returned; every handler here returns `Ok(())`.
impl ReceiverLoop for SinkNode {
78
    /// Hands over ownership of the input channels.
    ///
    /// `self.receivers` is swapped with an empty vector, so any later call
    /// returns an empty vector.
    fn receivers(&mut self) -> Vec<Receiver<ExecutorOperation>> {
387✔
79
        let mut result = vec![];
387✔
80
        swap(&mut self.receivers, &mut result);
387✔
81
        result
387✔
82
    }
387✔
83

84
    /// Returns the string form of the port handle at `index`.
    ///
    /// Panics if `index` is out of bounds for `port_handles`.
    fn receiver_name(&self, index: usize) -> Cow<str> {
×
85
        Cow::Owned(self.port_handles[index].to_string())
×
86
    }
×
87

88
    /// Forwards `op` to the sink, together with the port handle at `index` and the
    /// epoch manager's record store.
    ///
    /// Panics if `index` is out of bounds for `port_handles`.
    fn on_op(&mut self, index: usize, op: ProcessorOperation) -> Result<(), ExecutionError> {
89
        if let Err(e) = self.sink.process(
3,689,897✔
90
            self.port_handles[index],
3,689,897✔
91
            self.epoch_manager.record_store(),
3,689,897✔
92
            op,
3,689,897✔
93
        ) {
3,689,897✔
94
            self.error_manager.report(e);
2✔
95
        }
3,689,895✔
96
        Ok(())
3,689,897✔
97
    }
3,689,897✔
98

99
    /// Commits `epoch` on the sink, finalizes the epoch, and records the pipeline latency.
    fn on_commit(&mut self, epoch: &Epoch) -> Result<(), ExecutionError> {
945✔
100
        debug!("[{}] Checkpointing - {}", self.node_handle, epoch);
945✔
101
        if let Err(e) = self.sink.commit(epoch) {
945✔
102
            self.error_manager.report(e);
×
103
        }
945✔
104

105
        // Reached whether or not the sink's commit succeeded.
        self.epoch_manager.finalize_epoch(epoch);
945✔
106

×
107
        // Latency is the time elapsed since `epoch.decision_instant`; nothing is recorded
        // when `elapsed()` returns an error.
        if let Ok(duration) = epoch.decision_instant.elapsed() {
945✔
108
            histogram!(PIPELINE_LATENCY_HISTOGRAM_NAME, duration, "endpoint" => self.node_handle.id.clone());
945✔
109
        }
×
110

×
111
        Ok(())
945✔
112
    }
945✔
113

×
114
    /// Does nothing on termination.
    fn on_terminate(&mut self) -> Result<(), ExecutionError> {
382✔
115
        Ok(())
382✔
116
    }
382✔
117

118
    /// Notifies the sink that snapshotting is done for the source `connection_name`.
    fn on_snapshotting_done(&mut self, connection_name: String) -> Result<(), ExecutionError> {
×
119
        if let Err(e) = self.sink.on_source_snapshotting_done(connection_name) {
30✔
120
            self.error_manager.report(e);
×
121
        }
30✔
122
        Ok(())
30✔
123
    }
30✔
124
}
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc