• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

getdozer / dozer / 3975475267

pending completion
3975475267

Pull #699

github

GitHub
Merge cbe01669c into 02f99a9c0
Pull Request #699: feature: Automatically trim record history in `RecordWriter`

229 of 229 new or added lines in 6 files covered. (100.0%)

22375 of 33744 relevant lines covered (66.31%)

45958.9 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

75.76
/dozer-core/src/dag/executor/processor_node.rs
1
use std::{borrow::Cow, collections::HashMap, mem::swap, path::Path, sync::Arc};
2

3
use crossbeam::channel::{Receiver, Sender};
4
use dozer_types::parking_lot::RwLock;
5

6
use crate::{
7
    dag::{
8
        dag::Edge,
9
        dag_schemas::NodeSchemas,
10
        errors::ExecutionError,
11
        executor_utils::{
12
            build_receivers_lists, create_ports_databases_and_fill_downstream_record_readers,
13
            init_component,
14
        },
15
        forwarder::{ProcessorChannelManager, StateWriter},
16
        node::{NodeHandle, PortHandle, Processor, ProcessorFactory},
17
        record_store::RecordReader,
18
    },
19
    storage::lmdb_storage::SharedTransaction,
20
};
21

22
use super::{name::Name, receiver_loop::ReceiverLoop, ExecutorOperation};
23

24
/// A processor in the execution DAG.
25
#[derive(Debug)]
×
26
pub struct ProcessorNode {
27
    /// Node handle in description DAG.
28
    node_handle: NodeHandle,
29
    /// Input port handles.
30
    port_handles: Vec<PortHandle>,
31
    /// Input data channels.
32
    receivers: Vec<Receiver<ExecutorOperation>>,
33
    /// The processor.
34
    processor: Box<dyn Processor>,
35
    /// Record readers of all stateful ports. Using `self.node_handle`, we can find the record readers of our stateful inputs.
36
    record_readers: Arc<RwLock<HashMap<NodeHandle, HashMap<PortHandle, Box<dyn RecordReader>>>>>,
37
    /// The transaction for this node's environment. Processor uses it to persist data.
38
    master_tx: SharedTransaction,
39
    /// This node's output channel manager, for forwarding data, writing metadata and writing port state.
40
    channel_manager: ProcessorChannelManager,
41
}
42

43
impl ProcessorNode {
44
    /// # Arguments
45
    ///
46
    /// - `node_handle`: Node handle in description DAG.
47
    /// - `processor_factory`: Processor factory in description DAG.
48
    /// - `base_path`: Base path of persisted data for the last execution of the description DAG.
49
    /// - `record_readers`: Record readers of all stateful ports.
50
    /// - `receivers`: Input channels to this processor.
51
    /// - `senders`: Output channels from this processor.
52
    /// - `edges`: All edges in the description DAG, used for creating record readers for input ports which is connected to this processor's stateful output ports.
53
    /// - `node_schemas`: Input and output data schemas.
54
    /// - `retention_queue_size`: Size of retention queue (used by RecordWriter)
55
    #[allow(clippy::too_many_arguments)]
×
56
    pub fn new(
299✔
57
        node_handle: NodeHandle,
299✔
58
        processor_factory: &dyn ProcessorFactory,
299✔
59
        base_path: &Path,
299✔
60
        record_readers: Arc<
299✔
61
            RwLock<HashMap<NodeHandle, HashMap<PortHandle, Box<dyn RecordReader>>>>,
299✔
62
        >,
299✔
63
        receivers: HashMap<PortHandle, Vec<Receiver<ExecutorOperation>>>,
299✔
64
        senders: HashMap<PortHandle, Vec<Sender<ExecutorOperation>>>,
299✔
65
        edges: &[Edge],
299✔
66
        node_schemas: NodeSchemas,
299✔
67
        retention_queue_size: usize,
299✔
68
    ) -> Result<Self, ExecutionError> {
299✔
69
        let mut processor = processor_factory.build(
299✔
70
            node_schemas.input_schemas.clone(),
299✔
71
            node_schemas.output_schemas.clone(),
299✔
72
        )?;
299✔
73
        let state_meta = init_component(&node_handle, base_path, |e| processor.init(e))?;
298✔
74

×
75
        let (master_tx, port_databases) =
297✔
76
            create_ports_databases_and_fill_downstream_record_readers(
297✔
77
                &node_handle,
297✔
78
                edges,
297✔
79
                state_meta.env,
297✔
80
                &processor_factory.get_output_ports(),
297✔
81
                &mut record_readers.write(),
297✔
82
            )?;
297✔
83
        let (port_handles, receivers) = build_receivers_lists(receivers);
297✔
84
        let channel_manager = ProcessorChannelManager::new(
296✔
85
            node_handle.clone(),
297✔
86
            senders,
297✔
87
            StateWriter::new(
297✔
88
                state_meta.meta_db,
297✔
89
                port_databases,
297✔
90
                master_tx.clone(),
297✔
91
                node_schemas.output_schemas,
297✔
92
                retention_queue_size,
297✔
93
            )?,
297✔
94
            true,
×
95
        );
×
96

×
97
        Ok(Self {
296✔
98
            node_handle,
296✔
99
            port_handles,
296✔
100
            receivers,
296✔
101
            processor,
296✔
102
            record_readers,
296✔
103
            master_tx,
296✔
104
            channel_manager,
296✔
105
        })
296✔
106
    }
298✔
107
}
×
108

×
109
impl Name for ProcessorNode {
×
110
    fn name(&self) -> Cow<str> {
×
111
        Cow::Owned(self.node_handle.to_string())
×
112
    }
×
113
}
×
114

×
115
impl ReceiverLoop for ProcessorNode {
×
116
    fn receivers(&mut self) -> Vec<Receiver<ExecutorOperation>> {
296✔
117
        let mut result = vec![];
296✔
118
        swap(&mut self.receivers, &mut result);
296✔
119
        result
296✔
120
    }
296✔
121

×
122
    fn receiver_name(&self, index: usize) -> Cow<str> {
×
123
        Cow::Owned(self.port_handles[index].to_string())
×
124
    }
×
125

×
126
    fn on_op(
5,550,534✔
127
        &mut self,
5,550,534✔
128
        index: usize,
5,550,534✔
129
        op: dozer_types::types::Operation,
5,550,534✔
130
    ) -> Result<(), ExecutionError> {
5,550,534✔
131
        let record_readers = self.record_readers.read();
5,550,534✔
132
        let reader = record_readers
5,550,534✔
133
            .get(&self.node_handle)
5,550,534✔
134
            .ok_or_else(|| ExecutionError::InvalidNodeHandle(self.node_handle.clone()))?;
5,550,534✔
135

×
136
        self.processor.process(
5,550,534✔
137
            self.port_handles[index],
5,550,534✔
138
            op,
5,550,534✔
139
            &mut self.channel_manager,
5,550,534✔
140
            &self.master_tx,
5,550,534✔
141
            reader,
5,550,534✔
142
        )
5,550,534✔
143
    }
5,550,534✔
144

×
145
    fn on_commit(&mut self, epoch: &crate::dag::epoch::Epoch) -> Result<(), ExecutionError> {
×
146
        self.processor.commit(epoch, &self.master_tx)?;
9,687✔
147
        self.channel_manager.store_and_send_commit(epoch)
9,687✔
148
    }
9,687✔
149

×
150
    fn on_terminate(&mut self) -> Result<(), ExecutionError> {
287✔
151
        self.channel_manager.send_terminate()
287✔
152
    }
287✔
153
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc