• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

getdozer / dozer / 4360628700

pending completion
4360628700

push

github

GitHub
refactor: pipeline memory storage (#1135)

1152 of 1152 new or added lines in 40 files covered. (100.0%)

27472 of 39301 relevant lines covered (69.9%)

28369.11 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

58.97
/dozer-core/src/executor/sink_node.rs
1
use std::{borrow::Cow, collections::HashMap, mem::swap};
2

3
use crossbeam::channel::Receiver;
4
use daggy::NodeIndex;
5
use dozer_storage::lmdb_storage::SharedTransaction;
6
use dozer_types::{log::debug, node::NodeHandle};
7

8
use crate::{
9
    builder_dag::NodeKind,
10
    epoch::Epoch,
11
    errors::ExecutionError,
12
    forwarder::StateWriter,
13
    node::{PortHandle, Sink},
14
};
15

16
use super::execution_dag::ExecutionDag;
17
use super::{name::Name, receiver_loop::ReceiverLoop, ExecutorOperation};
18

19
/// A sink in the execution DAG.
20
#[derive(Debug)]
×
21
pub struct SinkNode {
×
22
    /// Node handle in description DAG.
23
    node_handle: NodeHandle,
24
    /// Input port handles.
25
    port_handles: Vec<PortHandle>,
26
    /// Input data channels.
27
    receivers: Vec<Receiver<ExecutorOperation>>,
28
    /// The sink.
29
    sink: Box<dyn Sink>,
30
    /// The transaction for this node's environment. Sink uses it to persist data.
31
    master_tx: SharedTransaction,
32
    /// This node's state writer, for writing metadata and port state.
33
    state_writer: StateWriter,
34
}
35

36
impl SinkNode {
37
    pub fn new(dag: &mut ExecutionDag, node_index: NodeIndex) -> Self {
332✔
38
        let Some(node) = dag.node_weight_mut(node_index).take() else {
332✔
39
            panic!("Must pass in a node")
×
40
        };
×
41
        let node_handle = node.handle;
332✔
42
        let node_storage = node.storage;
332✔
43
        let NodeKind::Sink(sink) = node.kind else {
332✔
44
            panic!("Must pass in a sink node");
×
45
        };
×
46

×
47
        let (port_handles, receivers) = dag.collect_receivers(node_index);
332✔
48

332✔
49
        let state_writer = StateWriter::new(
332✔
50
            node_storage.meta_db,
332✔
51
            HashMap::new(),
332✔
52
            node_storage.master_txn.clone(),
332✔
53
        );
332✔
54

332✔
55
        Self {
332✔
56
            node_handle,
332✔
57
            port_handles,
332✔
58
            receivers,
332✔
59
            sink,
332✔
60
            master_tx: node_storage.master_txn,
332✔
61
            state_writer,
332✔
62
        }
332✔
63
    }
332✔
64

×
65
    pub fn handle(&self) -> &NodeHandle {
332✔
66
        &self.node_handle
332✔
67
    }
332✔
68
}
×
69

70
impl Name for SinkNode {
×
71
    fn name(&self) -> Cow<str> {
×
72
        Cow::Owned(self.node_handle.to_string())
×
73
    }
×
74
}
75

76
impl ReceiverLoop for SinkNode {
×
77
    fn receivers(&mut self) -> Vec<Receiver<ExecutorOperation>> {
332✔
78
        let mut result = vec![];
332✔
79
        swap(&mut self.receivers, &mut result);
332✔
80
        result
332✔
81
    }
332✔
82

×
83
    fn receiver_name(&self, index: usize) -> Cow<str> {
×
84
        Cow::Owned(self.port_handles[index].to_string())
×
85
    }
×
86

×
87
    fn on_op(
3,953,301✔
88
        &mut self,
3,953,301✔
89
        index: usize,
3,953,301✔
90
        op: dozer_types::types::Operation,
3,953,301✔
91
    ) -> Result<(), ExecutionError> {
3,953,301✔
92
        self.sink
3,953,301✔
93
            .process(self.port_handles[index], op, &self.master_tx)
3,953,301✔
94
    }
3,953,301✔
95

×
96
    fn on_commit(&mut self, epoch: &Epoch) -> Result<(), ExecutionError> {
1,035✔
97
        debug!("[{}] Checkpointing - {}", self.node_handle, epoch);
1,035✔
98
        self.sink.commit(epoch, &self.master_tx)?;
1,035✔
99
        self.state_writer.store_commit_info(epoch)
1,035✔
100
    }
1,035✔
101

×
102
    fn on_terminate(&mut self) -> Result<(), ExecutionError> {
327✔
103
        Ok(())
327✔
104
    }
327✔
105

×
106
    fn on_snapshotting_done(&mut self) -> Result<(), ExecutionError> {
×
107
        self.sink.on_source_snapshotting_done()
×
108
    }
×
109
}
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc