• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

getdozer / dozer / 4315007357

pending completion
4315007357

push

github

GitHub
fix: Sink should only be built after all source checkpoints are checked (#1112)

280 of 280 new or added lines in 24 files covered. (100.0%)

28292 of 38914 relevant lines covered (72.7%)

64132.7 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

67.53
/dozer-core/src/executor/sink_node.rs
1
use std::{borrow::Cow, collections::HashMap, mem::swap};
2

3
use crossbeam::channel::Receiver;
4
use daggy::NodeIndex;
5
use dozer_storage::lmdb_storage::SharedTransaction;
6
use dozer_types::{log::debug, node::NodeHandle};
7

8
use crate::{
9
    builder_dag::NodeKind,
10
    epoch::Epoch,
11
    errors::ExecutionError,
12
    forwarder::StateWriter,
13
    node::{PortHandle, Sink},
14
    record_store::RecordReader,
15
};
16

17
use super::execution_dag::ExecutionDag;
18
use super::{name::Name, receiver_loop::ReceiverLoop, ExecutorOperation};
19

20
/// A sink in the execution DAG.
21
#[derive(Debug)]
×
22
pub struct SinkNode {
23
    /// Node handle in description DAG.
24
    node_handle: NodeHandle,
25
    /// Input port handles.
26
    port_handles: Vec<PortHandle>,
27
    /// Input data channels.
28
    receivers: Vec<Receiver<ExecutorOperation>>,
29
    /// The sink.
30
    sink: Box<dyn Sink>,
31
    /// Record readers of the input ports. Every record reader reads the state of corresponding output port.
32
    record_readers: HashMap<PortHandle, Box<dyn RecordReader>>,
33
    /// The transaction for this node's environment. Sink uses it to persist data.
34
    master_tx: SharedTransaction,
35
    /// This node's state writer, for writing metadata and port state.
36
    state_writer: StateWriter,
37
}
38

39
impl SinkNode {
40
    pub fn new(dag: &mut ExecutionDag, node_index: NodeIndex) -> Self {
336✔
41
        let Some(node) = dag.node_weight_mut(node_index).take() else {
336✔
42
            panic!("Must pass in a node")
×
43
        };
×
44
        let node_handle = node.handle;
336✔
45
        let node_storage = node.storage;
336✔
46
        let NodeKind::Sink(sink) = node.kind else {
336✔
47
            panic!("Must pass in a sink node");
×
48
        };
×
49

×
50
        let (port_handles, receivers, record_readers) =
336✔
51
            dag.collect_receivers_and_record_readers(node_index);
336✔
52

336✔
53
        let state_writer = StateWriter::new(
336✔
54
            node_storage.meta_db,
336✔
55
            HashMap::new(),
336✔
56
            node_storage.master_txn.clone(),
336✔
57
        );
336✔
58

336✔
59
        Self {
336✔
60
            node_handle,
336✔
61
            port_handles,
336✔
62
            receivers,
336✔
63
            sink,
336✔
64
            record_readers,
336✔
65
            master_tx: node_storage.master_txn,
336✔
66
            state_writer,
336✔
67
        }
336✔
68
    }
336✔
69

×
70
    pub fn handle(&self) -> &NodeHandle {
336✔
71
        &self.node_handle
336✔
72
    }
336✔
73
}
74

×
75
impl Name for SinkNode {
×
76
    fn name(&self) -> Cow<str> {
×
77
        Cow::Owned(self.node_handle.to_string())
×
78
    }
×
79
}
80

×
81
impl ReceiverLoop for SinkNode {
×
82
    fn receivers(&mut self) -> Vec<Receiver<ExecutorOperation>> {
336✔
83
        let mut result = vec![];
336✔
84
        swap(&mut self.receivers, &mut result);
336✔
85
        result
336✔
86
    }
336✔
87

×
88
    fn receiver_name(&self, index: usize) -> Cow<str> {
×
89
        Cow::Owned(self.port_handles[index].to_string())
×
90
    }
×
91

×
92
    fn on_op(
4,021,939✔
93
        &mut self,
4,021,939✔
94
        index: usize,
4,021,939✔
95
        op: dozer_types::types::Operation,
4,021,939✔
96
    ) -> Result<(), ExecutionError> {
4,021,939✔
97
        self.sink.process(
4,021,939✔
98
            self.port_handles[index],
4,021,939✔
99
            op,
4,021,939✔
100
            &self.master_tx,
4,021,939✔
101
            &self.record_readers,
4,021,939✔
102
        )
4,021,939✔
103
    }
4,021,939✔
104

×
105
    fn on_commit(&mut self, epoch: &Epoch) -> Result<(), ExecutionError> {
1,615✔
106
        debug!("[{}] Checkpointing - {}", self.node_handle, epoch);
1,615✔
107
        self.sink.commit(epoch, &self.master_tx)?;
1,615✔
108
        self.state_writer.store_commit_info(epoch)
1,615✔
109
    }
1,615✔
110

×
111
    fn on_terminate(&mut self) -> Result<(), ExecutionError> {
331✔
112
        Ok(())
331✔
113
    }
331✔
114

×
115
    fn on_snapshotting_done(&mut self) -> Result<(), ExecutionError> {
×
116
        self.sink.on_source_snapshotting_done()
×
117
    }
×
118
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc