• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

getdozer / dozer / 4315007357

pending completion
4315007357

push

github

GitHub
fix: Sink should only be built after all source checkpoints are checked (#1112)

280 of 280 new or added lines in 24 files covered. (100.0%)

28292 of 38914 relevant lines covered (72.7%)

64132.7 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

67.9
/dozer-core/src/executor/processor_node.rs
1
use std::{borrow::Cow, collections::HashMap, mem::swap};
2

3
use crossbeam::channel::Receiver;
4
use daggy::NodeIndex;
5
use dozer_storage::lmdb_storage::SharedTransaction;
6
use dozer_types::node::NodeHandle;
7

8
use crate::{
9
    builder_dag::NodeKind,
10
    errors::ExecutionError,
11
    forwarder::{ProcessorChannelManager, StateWriter},
12
    node::{PortHandle, Processor},
13
    record_store::RecordReader,
14
};
15

16
use super::{
17
    execution_dag::ExecutionDag, name::Name, receiver_loop::ReceiverLoop, ExecutorOperation,
18
};
19

20
/// A processor in the execution DAG.
21
#[derive(Debug)]
×
22
pub struct ProcessorNode {
23
    /// Node handle in description DAG.
24
    node_handle: NodeHandle,
25
    /// Input port handles.
26
    port_handles: Vec<PortHandle>,
27
    /// Input data channels.
28
    receivers: Vec<Receiver<ExecutorOperation>>,
29
    /// The processor.
30
    processor: Box<dyn Processor>,
31
    /// Record readers of the input ports. Every record reader reads the state of corresponding output port.
32
    record_readers: HashMap<PortHandle, Box<dyn RecordReader>>,
33
    /// The transaction for this node's environment. Processor uses it to persist data.
34
    master_tx: SharedTransaction,
35
    /// This node's output channel manager, for forwarding data, writing metadata and writing port state.
36
    channel_manager: ProcessorChannelManager,
37
}
38

39
impl ProcessorNode {
40
    pub fn new(dag: &mut ExecutionDag, node_index: NodeIndex) -> Self {
1,017✔
41
        let Some(node) = dag.node_weight_mut(node_index).take() else {
1,017✔
42
            panic!("Must pass in a node")
×
43
        };
×
44
        let node_handle = node.handle;
1,017✔
45
        let node_storage = node.storage;
1,017✔
46
        let NodeKind::Processor(processor) = node.kind else {
1,017✔
47
            panic!("Must pass in a processor node");
×
48
        };
×
49

×
50
        let (port_handles, receivers, record_readers) =
1,017✔
51
            dag.collect_receivers_and_record_readers(node_index);
1,017✔
52

1,017✔
53
        let (senders, record_writers) = dag.collect_senders_and_record_writers(node_index);
1,017✔
54

1,017✔
55
        let state_writer = StateWriter::new(
1,017✔
56
            node_storage.meta_db,
1,017✔
57
            record_writers,
1,017✔
58
            node_storage.master_txn.clone(),
1,017✔
59
        );
1,017✔
60
        let channel_manager =
1,017✔
61
            ProcessorChannelManager::new(node_handle.clone(), senders, state_writer, true);
1,017✔
62

1,017✔
63
        Self {
1,017✔
64
            node_handle,
1,017✔
65
            port_handles,
1,017✔
66
            receivers,
1,017✔
67
            processor,
1,017✔
68
            record_readers,
1,017✔
69
            master_tx: node_storage.master_txn,
1,017✔
70
            channel_manager,
1,017✔
71
        }
1,017✔
72
    }
1,017✔
73

×
74
    pub fn handle(&self) -> &NodeHandle {
1,017✔
75
        &self.node_handle
1,017✔
76
    }
1,017✔
77
}
78

×
79
impl Name for ProcessorNode {
×
80
    fn name(&self) -> Cow<str> {
×
81
        Cow::Owned(self.node_handle.to_string())
×
82
    }
×
83
}
84

×
85
impl ReceiverLoop for ProcessorNode {
×
86
    fn receivers(&mut self) -> Vec<Receiver<ExecutorOperation>> {
1,017✔
87
        let mut result = vec![];
1,017✔
88
        swap(&mut self.receivers, &mut result);
1,017✔
89
        result
1,017✔
90
    }
1,017✔
91

×
92
    fn receiver_name(&self, index: usize) -> Cow<str> {
×
93
        Cow::Owned(self.port_handles[index].to_string())
×
94
    }
×
95

×
96
    fn on_op(
5,885,551✔
97
        &mut self,
5,885,551✔
98
        index: usize,
5,885,551✔
99
        op: dozer_types::types::Operation,
5,885,551✔
100
    ) -> Result<(), ExecutionError> {
5,885,551✔
101
        self.processor.process(
5,885,551✔
102
            self.port_handles[index],
5,885,551✔
103
            op,
5,885,551✔
104
            &mut self.channel_manager,
5,885,551✔
105
            &self.master_tx,
5,885,551✔
106
            &self.record_readers,
5,885,551✔
107
        )
5,885,551✔
108
    }
5,885,551✔
109

×
110
    fn on_commit(&mut self, epoch: &crate::epoch::Epoch) -> Result<(), ExecutionError> {
×
111
        self.processor.commit(epoch, &self.master_tx)?;
2,902✔
112
        self.channel_manager.store_and_send_commit(epoch)
2,902✔
113
    }
2,902✔
114

×
115
    fn on_terminate(&mut self) -> Result<(), ExecutionError> {
1,010✔
116
        self.channel_manager.send_terminate()
1,010✔
117
    }
1,010✔
118

×
119
    fn on_snapshotting_done(&mut self) -> Result<(), ExecutionError> {
×
120
        self.channel_manager.send_snapshotting_done()
×
121
    }
×
122
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc