• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

getdozer / dozer / 4302087115

pending completion
4302087115

push

github

GitHub
chore: Move `SnapshottingDone` out of `Operation` so processors don't have to know it.(#1103)

364 of 364 new or added lines in 33 files covered. (100.0%)

28623 of 40224 relevant lines covered (71.16%)

56785.21 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

83.33
/dozer-core/src/executor/processor_node.rs
1
use std::{borrow::Cow, collections::HashMap, mem::swap};
2

3
use crossbeam::channel::Receiver;
4
use daggy::NodeIndex;
5
use dozer_storage::lmdb_storage::SharedTransaction;
6
use dozer_types::node::NodeHandle;
7

8
use crate::{
9
    builder_dag::NodeKind,
10
    errors::ExecutionError,
11
    forwarder::{ProcessorChannelManager, StateWriter},
12
    node::{PortHandle, Processor},
13
    record_store::RecordReader,
14
};
15

16
use super::{
17
    execution_dag::ExecutionDag, name::Name, receiver_loop::ReceiverLoop, ExecutorOperation,
18
};
19

20
/// A processor in the execution DAG.
21
#[derive(Debug)]
×
22
pub struct ProcessorNode {
23
    /// Node handle in description DAG.
24
    node_handle: NodeHandle,
25
    /// Input port handles.
26
    port_handles: Vec<PortHandle>,
27
    /// Input data channels.
28
    receivers: Vec<Receiver<ExecutorOperation>>,
29
    /// The processor.
30
    processor: Box<dyn Processor>,
31
    /// Record readers of the input ports. Every record reader reads the state of corresponding output port.
32
    record_readers: HashMap<PortHandle, Box<dyn RecordReader>>,
33
    /// The transaction for this node's environment. Processor uses it to persist data.
34
    master_tx: SharedTransaction,
35
    /// This node's output channel manager, for forwarding data, writing metadata and writing port state.
36
    channel_manager: ProcessorChannelManager,
37
}
38

39
impl ProcessorNode {
40
    pub fn new(dag: &mut ExecutionDag, node_index: NodeIndex) -> Self {
1,017✔
41
        let node = dag.node_weight_mut(node_index);
1,017✔
42
        let node_handle = node.handle.clone();
1,017✔
43
        let node_storage = node.storage.clone();
1,017✔
44
        let Some(NodeKind::Processor(processor)) = node.kind.take() else {
1,017✔
45
            panic!("Must pass in a processor node");
×
46
        };
47

48
        let (port_handles, receivers, record_readers) =
1,017✔
49
            dag.collect_receivers_and_record_readers(node_index);
1,017✔
50

1,017✔
51
        let (senders, record_writers) = dag.collect_senders_and_record_writers(node_index);
1,017✔
52

1,017✔
53
        let state_writer = StateWriter::new(
1,017✔
54
            node_storage.meta_db,
1,017✔
55
            record_writers,
1,017✔
56
            node_storage.master_txn.clone(),
1,017✔
57
        );
1,017✔
58
        let channel_manager =
1,017✔
59
            ProcessorChannelManager::new(node_handle.clone(), senders, state_writer, true);
1,017✔
60

1,017✔
61
        Self {
1,017✔
62
            node_handle,
1,017✔
63
            port_handles,
1,017✔
64
            receivers,
1,017✔
65
            processor,
1,017✔
66
            record_readers,
1,017✔
67
            master_tx: node_storage.master_txn,
1,017✔
68
            channel_manager,
1,017✔
69
        }
1,017✔
70
    }
1,017✔
71

72
    pub fn handle(&self) -> &NodeHandle {
1,017✔
73
        &self.node_handle
1,017✔
74
    }
1,017✔
75
}
76

77
impl Name for ProcessorNode {
78
    fn name(&self) -> Cow<str> {
×
79
        Cow::Owned(self.node_handle.to_string())
×
80
    }
×
81
}
82

83
impl ReceiverLoop for ProcessorNode {
84
    fn receivers(&mut self) -> Vec<Receiver<ExecutorOperation>> {
1,017✔
85
        let mut result = vec![];
1,017✔
86
        swap(&mut self.receivers, &mut result);
1,017✔
87
        result
1,017✔
88
    }
1,017✔
89

90
    fn receiver_name(&self, index: usize) -> Cow<str> {
×
91
        Cow::Owned(self.port_handles[index].to_string())
×
92
    }
×
93

94
    fn on_op(
6,311,744✔
95
        &mut self,
6,311,744✔
96
        index: usize,
6,311,744✔
97
        op: dozer_types::types::Operation,
6,311,744✔
98
    ) -> Result<(), ExecutionError> {
6,311,744✔
99
        self.processor.process(
6,311,744✔
100
            self.port_handles[index],
6,311,744✔
101
            op,
6,311,744✔
102
            &mut self.channel_manager,
6,311,744✔
103
            &self.master_tx,
6,311,744✔
104
            &self.record_readers,
6,311,744✔
105
        )
6,311,744✔
106
    }
6,311,744✔
107

108
    fn on_commit(&mut self, epoch: &crate::epoch::Epoch) -> Result<(), ExecutionError> {
109
        self.processor.commit(epoch, &self.master_tx)?;
2,651✔
110
        self.channel_manager.store_and_send_commit(epoch)
2,651✔
111
    }
2,651✔
112

113
    fn on_terminate(&mut self) -> Result<(), ExecutionError> {
1,010✔
114
        self.channel_manager.send_terminate()
1,010✔
115
    }
1,010✔
116

117
    fn on_snapshotting_done(&mut self) -> Result<(), ExecutionError> {
×
118
        self.channel_manager.send_snapshotting_done()
×
119
    }
×
120
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc