• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

getdozer / dozer / 4302087115

pending completion
4302087115

push

github

GitHub
chore: Move `SnapshottingDone` out of `Operation` so processors don't have to know it.(#1103)

364 of 364 new or added lines in 33 files covered. (100.0%)

28623 of 40224 relevant lines covered (71.16%)

56785.21 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

82.54
/dozer-core/src/executor/sink_node.rs
1
use std::{borrow::Cow, collections::HashMap, mem::swap};
2

3
use crossbeam::channel::Receiver;
4
use daggy::NodeIndex;
5
use dozer_storage::lmdb_storage::SharedTransaction;
6
use dozer_types::{log::debug, node::NodeHandle};
7

8
use crate::{
9
    builder_dag::NodeKind,
10
    epoch::Epoch,
11
    errors::ExecutionError,
12
    forwarder::StateWriter,
13
    node::{PortHandle, Sink},
14
    record_store::RecordReader,
15
};
16

17
use super::execution_dag::ExecutionDag;
18
use super::{name::Name, receiver_loop::ReceiverLoop, ExecutorOperation};
19

20
/// A sink in the execution DAG.
21
#[derive(Debug)]
×
22
pub struct SinkNode {
23
    /// Node handle in description DAG.
24
    node_handle: NodeHandle,
25
    /// Input port handles.
26
    port_handles: Vec<PortHandle>,
27
    /// Input data channels.
28
    receivers: Vec<Receiver<ExecutorOperation>>,
29
    /// The sink.
30
    sink: Box<dyn Sink>,
31
    /// Record readers of the input ports. Every record reader reads the state of corresponding output port.
32
    record_readers: HashMap<PortHandle, Box<dyn RecordReader>>,
33
    /// The transaction for this node's environment. Sink uses it to persist data.
34
    master_tx: SharedTransaction,
35
    /// This node's state writer, for writing metadata and port state.
36
    state_writer: StateWriter,
37
}
38

39
impl SinkNode {
40
    pub fn new(dag: &mut ExecutionDag, node_index: NodeIndex) -> Self {
336✔
41
        let node = dag.node_weight_mut(node_index);
336✔
42
        let node_handle = node.handle.clone();
336✔
43
        let node_storage = node.storage.clone();
336✔
44
        let Some(NodeKind::Sink(sink)) = node.kind.take() else {
336✔
45
            panic!("Must pass in a sink node");
×
46
        };
47

48
        let (port_handles, receivers, record_readers) =
336✔
49
            dag.collect_receivers_and_record_readers(node_index);
336✔
50

336✔
51
        let state_writer = StateWriter::new(
336✔
52
            node_storage.meta_db,
336✔
53
            HashMap::new(),
336✔
54
            node_storage.master_txn.clone(),
336✔
55
        );
336✔
56

336✔
57
        Self {
336✔
58
            node_handle,
336✔
59
            port_handles,
336✔
60
            receivers,
336✔
61
            sink,
336✔
62
            record_readers,
336✔
63
            master_tx: node_storage.master_txn,
336✔
64
            state_writer,
336✔
65
        }
336✔
66
    }
336✔
67

68
    pub fn handle(&self) -> &NodeHandle {
336✔
69
        &self.node_handle
336✔
70
    }
336✔
71
}
72

73
impl Name for SinkNode {
74
    fn name(&self) -> Cow<str> {
×
75
        Cow::Owned(self.node_handle.to_string())
×
76
    }
×
77
}
78

79
impl ReceiverLoop for SinkNode {
80
    fn receivers(&mut self) -> Vec<Receiver<ExecutorOperation>> {
336✔
81
        let mut result = vec![];
336✔
82
        swap(&mut self.receivers, &mut result);
336✔
83
        result
336✔
84
    }
336✔
85

86
    fn receiver_name(&self, index: usize) -> Cow<str> {
×
87
        Cow::Owned(self.port_handles[index].to_string())
×
88
    }
×
89

90
    fn on_op(
4,111,819✔
91
        &mut self,
4,111,819✔
92
        index: usize,
4,111,819✔
93
        op: dozer_types::types::Operation,
4,111,819✔
94
    ) -> Result<(), ExecutionError> {
4,111,819✔
95
        self.sink.process(
4,111,819✔
96
            self.port_handles[index],
4,111,819✔
97
            op,
4,111,819✔
98
            &self.master_tx,
4,111,819✔
99
            &self.record_readers,
4,111,819✔
100
        )
4,111,819✔
101
    }
4,111,819✔
102

103
    fn on_commit(&mut self, epoch: &Epoch) -> Result<(), ExecutionError> {
1,318✔
104
        debug!("[{}] Checkpointing - {}", self.node_handle, epoch);
1,318✔
105
        self.sink.commit(epoch, &self.master_tx)?;
1,318✔
106
        self.state_writer.store_commit_info(epoch)
1,318✔
107
    }
1,318✔
108

109
    fn on_terminate(&mut self) -> Result<(), ExecutionError> {
331✔
110
        Ok(())
331✔
111
    }
331✔
112

113
    fn on_snapshotting_done(&mut self) -> Result<(), ExecutionError> {
×
114
        self.sink.on_source_snapshotting_done()
×
115
    }
×
116
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc