• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

getdozer / dozer / 3968308782

pending completion
3968308782

push

github

GitHub
fix: implement record versioning in `RecordWriter` and `RecordReader` (#682)

343 of 343 new or added lines in 18 files covered. (100.0%)

22167 of 33175 relevant lines covered (66.82%)

33425.31 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

78.49
/dozer-core/src/dag/executor/processor_node.rs
1
use std::{borrow::Cow, collections::HashMap, mem::swap, path::Path, sync::Arc};
2

3
use crossbeam::channel::{Receiver, Sender};
4
use dozer_types::parking_lot::RwLock;
5

6
use crate::{
7
    dag::{
8
        dag::Edge,
9
        dag_schemas::NodeSchemas,
10
        errors::ExecutionError,
11
        executor_utils::{
12
            build_receivers_lists, create_ports_databases_and_fill_downstream_record_readers,
13
            init_component,
14
        },
15
        forwarder::{ProcessorChannelManager, StateWriter},
16
        node::{NodeHandle, PortHandle, Processor, ProcessorFactory},
17
        record_store::RecordReader,
18
    },
19
    storage::lmdb_storage::SharedTransaction,
20
};
21

22
use super::{name::Name, receiver_loop::ReceiverLoop, ExecutorOperation};
23

24
/// A processor in the execution DAG.
25
#[derive(Debug)]
×
26
pub struct ProcessorNode {
27
    /// Node handle in description DAG.
28
    node_handle: NodeHandle,
29
    /// Input port handles.
30
    port_handles: Vec<PortHandle>,
31
    /// Input data channels.
32
    receivers: Vec<Receiver<ExecutorOperation>>,
33
    /// The processor.
34
    processor: Box<dyn Processor>,
35
    /// Record readers of all stateful ports. Using `self.node_handle`, we can find the record readers of our stateful inputs.
36
    record_readers: Arc<RwLock<HashMap<NodeHandle, HashMap<PortHandle, Box<dyn RecordReader>>>>>,
37
    /// The transaction for this node's environment. Processor uses it to persist data.
38
    master_tx: SharedTransaction,
39
    /// This node's output channel manager, for forwarding data, writing metadata and writing port state.
40
    channel_manager: ProcessorChannelManager,
41
}
42

43
impl ProcessorNode {
44
    /// # Arguments
45
    ///
46
    /// - `node_handle`: Node handle in description DAG.
47
    /// - `processor_factory`: Processor factory in description DAG.
48
    /// - `base_path`: Base path of persisted data for the last execution of the description DAG.
49
    /// - `record_readers`: Record readers of all stateful ports.
50
    /// - `receivers`: Input channels to this processor.
51
    /// - `senders`: Output channels from this processor.
52
    /// - `edges`: All edges in the description DAG, used for creating record readers for input ports which is connected to this processor's stateful output ports.
53
    /// - `node_schemas`: Input and output data schemas.
54
    #[allow(clippy::too_many_arguments)]
55
    pub fn new(
298✔
56
        node_handle: NodeHandle,
298✔
57
        processor_factory: &dyn ProcessorFactory,
298✔
58
        base_path: &Path,
298✔
59
        record_readers: Arc<
298✔
60
            RwLock<HashMap<NodeHandle, HashMap<PortHandle, Box<dyn RecordReader>>>>,
298✔
61
        >,
298✔
62
        receivers: HashMap<PortHandle, Vec<Receiver<ExecutorOperation>>>,
298✔
63
        senders: HashMap<PortHandle, Vec<Sender<ExecutorOperation>>>,
298✔
64
        edges: &[Edge],
298✔
65
        node_schemas: NodeSchemas,
298✔
66
    ) -> Result<Self, ExecutionError> {
298✔
67
        let mut processor = processor_factory.build(
298✔
68
            node_schemas.input_schemas.clone(),
298✔
69
            node_schemas.output_schemas.clone(),
298✔
70
        )?;
298✔
71
        let state_meta = init_component(&node_handle, base_path, |e| processor.init(e))?;
297✔
72

×
73
        let (master_tx, port_databases) =
296✔
74
            create_ports_databases_and_fill_downstream_record_readers(
296✔
75
                &node_handle,
296✔
76
                edges,
296✔
77
                state_meta.env,
296✔
78
                &processor_factory.get_output_ports(),
296✔
79
                &mut record_readers.write(),
296✔
80
            )?;
296✔
81
        let (port_handles, receivers) = build_receivers_lists(receivers);
296✔
82
        let channel_manager = ProcessorChannelManager::new(
295✔
83
            node_handle.clone(),
296✔
84
            senders,
296✔
85
            StateWriter::new(
296✔
86
                state_meta.meta_db,
296✔
87
                port_databases,
296✔
88
                master_tx.clone(),
296✔
89
                node_schemas.output_schemas,
296✔
90
            )?,
296✔
91
            true,
92
        );
×
93

×
94
        Ok(Self {
295✔
95
            node_handle,
295✔
96
            port_handles,
295✔
97
            receivers,
295✔
98
            processor,
295✔
99
            record_readers,
295✔
100
            master_tx,
295✔
101
            channel_manager,
295✔
102
        })
295✔
103
    }
297✔
104
}
105

×
106
impl Name for ProcessorNode {
×
107
    fn name(&self) -> Cow<str> {
×
108
        Cow::Owned(self.node_handle.to_string())
×
109
    }
×
110
}
111

×
112
impl ReceiverLoop for ProcessorNode {
×
113
    fn receivers(&mut self) -> Vec<Receiver<ExecutorOperation>> {
295✔
114
        let mut result = vec![];
295✔
115
        swap(&mut self.receivers, &mut result);
295✔
116
        result
295✔
117
    }
295✔
118

×
119
    fn receiver_name(&self, index: usize) -> Cow<str> {
×
120
        Cow::Owned(self.port_handles[index].to_string())
×
121
    }
×
122

×
123
    fn on_op(
5,275,199✔
124
        &mut self,
5,275,199✔
125
        index: usize,
5,275,199✔
126
        op: dozer_types::types::Operation,
5,275,199✔
127
    ) -> Result<(), ExecutionError> {
5,275,199✔
128
        let record_readers = self.record_readers.read();
5,275,199✔
129
        let reader = record_readers
5,275,199✔
130
            .get(&self.node_handle)
5,275,199✔
131
            .ok_or_else(|| ExecutionError::InvalidNodeHandle(self.node_handle.clone()))?;
5,275,199✔
132

×
133
        self.processor.process(
5,275,199✔
134
            self.port_handles[index],
5,275,199✔
135
            op,
5,275,199✔
136
            &mut self.channel_manager,
5,275,199✔
137
            &self.master_tx,
5,275,199✔
138
            reader,
5,275,199✔
139
        )
5,275,199✔
140
    }
5,275,199✔
141

×
142
    fn on_commit(&mut self, epoch: &crate::dag::epoch::Epoch) -> Result<(), ExecutionError> {
×
143
        self.processor.commit(epoch, &self.master_tx)?;
14,040✔
144
        self.channel_manager.store_and_send_commit(epoch)
14,040✔
145
    }
14,040✔
146

×
147
    fn on_terminate(&mut self) -> Result<(), ExecutionError> {
286✔
148
        self.channel_manager.send_terminate()
286✔
149
    }
286✔
150
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc