• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

getdozer / dozer / 4123314638

pending completion
4123314638

Pull #831

github

GitHub
Merge b8720faa6 into f4fe30c14
Pull Request #831: chore: Improve Join processor errors

136 of 136 new or added lines in 4 files covered. (100.0%)

23567 of 34840 relevant lines covered (67.64%)

39485.33 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

97.6
/dozer-core/src/executor/source_node.rs
1
use std::{
2
    collections::HashMap,
3
    path::Path,
4
    sync::{
5
        atomic::{AtomicBool, Ordering},
6
        Arc,
7
    },
8
    time::Duration,
9
};
10

11
use crossbeam::channel::{Receiver, RecvTimeoutError, Sender};
12
use dozer_types::log::debug;
13
use dozer_types::{
14
    internal_err,
15
    parking_lot::RwLock,
16
    types::{Operation, Schema},
17
};
18

19
use crate::{
20
    channels::SourceChannelForwarder,
21
    epoch::{EpochManager, OpIdentifier},
22
    errors::ExecutionError::{self, InternalError},
23
    executor_utils::{create_ports_databases_and_fill_downstream_record_readers, init_component},
24
    forwarder::{SourceChannelManager, StateWriter},
25
    node::{NodeHandle, OutputPortDef, PortHandle, Source, SourceFactory},
26
    record_store::RecordReader,
27
    Edge,
28
};
29

30
use super::{node::Node, ExecutorOperation};
31

32
#[derive(Debug)]
×
33
struct InternalChannelSourceForwarder {
34
    sender: Sender<(PortHandle, u64, u64, Operation)>,
35
}
36

37
impl InternalChannelSourceForwarder {
38
    pub fn new(sender: Sender<(PortHandle, u64, u64, Operation)>) -> Self {
164✔
39
        Self { sender }
164✔
40
    }
164✔
41
}
42

43
impl SourceChannelForwarder for InternalChannelSourceForwarder {
44
    fn send(
4,056,059✔
45
        &mut self,
4,056,059✔
46
        txid: u64,
4,056,059✔
47
        seq_in_tx: u64,
4,056,059✔
48
        op: Operation,
4,056,059✔
49
        port: PortHandle,
4,056,059✔
50
    ) -> Result<(), ExecutionError> {
4,056,059✔
51
        internal_err!(self.sender.send((port, txid, seq_in_tx, op)))
9✔
52
    }
4,056,059✔
53
}
54

55
/// The sender half of a source in the execution DAG.
56
#[derive(Debug)]
×
57
pub struct SourceSenderNode {
58
    /// Node handle in description DAG.
59
    node_handle: NodeHandle,
60
    /// The source.
61
    source: Box<dyn Source>,
62
    /// Last checkpointed output data sequence number.
63
    last_checkpoint: Option<OpIdentifier>,
64
    /// The forwarder that will be passed to the source for outputting data.
65
    forwarder: InternalChannelSourceForwarder,
66
    /// If the execution DAG should be running. Used for terminating the execution DAG.
67
    running: Arc<AtomicBool>,
68
}
69

70
impl SourceSenderNode {
71
    /// # Arguments
72
    ///
73
    /// - `node_handle`: Node handle in description DAG.
74
    /// - `source_factory`: Source factory in description DAG.
75
    /// - `output_schemas`: Output data schemas.
76
    /// - `last_checkpoint`: Last checkpointed output of this source.
77
    /// - `sender`: Channel to send data to.
78
    /// - `running`: If the execution DAG should still be running.
79
    pub fn new<T: Clone>(
78✔
80
        node_handle: NodeHandle,
78✔
81
        source_factory: &dyn SourceFactory<T>,
78✔
82
        output_schemas: HashMap<PortHandle, Schema>,
78✔
83
        last_checkpoint: Option<OpIdentifier>,
78✔
84
        sender: Sender<(PortHandle, u64, u64, Operation)>,
78✔
85
        running: Arc<AtomicBool>,
78✔
86
    ) -> Result<Self, ExecutionError> {
78✔
87
        let source = source_factory.build(output_schemas)?;
78✔
88
        let forwarder = InternalChannelSourceForwarder::new(sender);
77✔
89
        Ok(Self {
77✔
90
            node_handle,
77✔
91
            source,
77✔
92
            last_checkpoint,
77✔
93
            forwarder,
77✔
94
            running,
77✔
95
        })
77✔
96
    }
78✔
97
}
98

99
impl Node for SourceSenderNode {
100
    fn run(mut self) -> Result<(), ExecutionError> {
164✔
101
        let result = self.source.start(
164✔
102
            &mut self.forwarder,
164✔
103
            self.last_checkpoint
164✔
104
                .map(|op_id| (op_id.txid, op_id.seq_in_tx)),
164✔
105
        );
164✔
106
        self.running.store(false, Ordering::SeqCst);
164✔
107
        debug!("[{}-sender] Quit", self.node_handle);
164✔
108
        result
164✔
109
    }
164✔
110
}
111

112
/// The listener part of a source in the execution DAG.
113
#[derive(Debug)]
×
114
pub struct SourceListenerNode {
115
    /// Node handle in description DAG.
116
    node_handle: NodeHandle,
117
    /// Output from corresponding source sender.
118
    receiver: Receiver<(PortHandle, u64, u64, Operation)>,
119
    /// Receiving timeout.
120
    timeout: Duration,
121
    /// If the execution DAG should be running. Used for determining if a `terminate` message should be sent.
122
    running: Arc<AtomicBool>,
123
    /// This node's output channel manager, for communicating to other sources to coordinate terminate and commit, forwarding data, writing metadata and writing port state.
124
    channel_manager: SourceChannelManager,
125
}
126

127
impl SourceListenerNode {
128
    /// # Arguments
129
    ///
130
    /// - `node_handle`: Node handle in description DAG.
131
    /// - `receiver`: Channel on which the data comes in.
132
    /// - `timeout`: Listener timeout. After this timeout, the listener will check if a commit or terminate needs to happen.
133
    /// - `base_path`: Base path of persisted data for the last execution of the description DAG.
134
    /// - `output_ports`: Output port definition of the source in description DAG.
135
    /// - `record_readers`: Record readers of all stateful ports.
136
    /// - `senders`: Output channels from this processor.
137
    /// - `edges`: All edges in the description DAG, used for creating record readers for input ports which are connected to this processor's stateful output ports.
138
    /// - `running`: If the execution DAG should still be running.
139
    /// - `epoch_manager`: Used for coordinating commit and terminate between sources. Shared by all sources.
140
    /// - `output_schemas`: Output data schemas.
141
    /// - `retention_queue_size`: Size of retention queue (used by RecordWriter)
142
    #[allow(clippy::too_many_arguments)]
143
    pub(crate) fn new(
166✔
144
        node_handle: NodeHandle,
166✔
145
        receiver: Receiver<(PortHandle, u64, u64, Operation)>,
166✔
146
        timeout: Duration,
166✔
147
        base_path: &Path,
166✔
148
        output_ports: &[OutputPortDef],
166✔
149
        record_readers: Arc<
166✔
150
            RwLock<HashMap<NodeHandle, HashMap<PortHandle, Box<dyn RecordReader>>>>,
166✔
151
        >,
166✔
152
        senders: HashMap<PortHandle, Vec<Sender<ExecutorOperation>>>,
166✔
153
        edges: &[Edge],
166✔
154
        running: Arc<AtomicBool>,
166✔
155
        commit_sz: u32,
166✔
156
        max_duration_between_commits: Duration,
166✔
157
        epoch_manager: Arc<EpochManager>,
166✔
158
        output_schemas: HashMap<PortHandle, Schema>,
166✔
159
        retention_queue_size: usize,
166✔
160
    ) -> Result<Self, ExecutionError> {
166✔
161
        let state_meta = init_component(&node_handle, base_path, |_| Ok(()))?;
166✔
162
        let (master_tx, port_databases) =
165✔
163
            create_ports_databases_and_fill_downstream_record_readers(
165✔
164
                &node_handle,
165✔
165
                edges,
165✔
166
                state_meta.env,
165✔
167
                output_ports,
165✔
168
                &mut record_readers.write(),
165✔
169
            )?;
165✔
170
        let channel_manager = SourceChannelManager::new(
165✔
171
            node_handle.clone(),
165✔
172
            senders,
165✔
173
            StateWriter::new(
165✔
174
                state_meta.meta_db,
165✔
175
                port_databases,
165✔
176
                master_tx,
165✔
177
                output_schemas,
165✔
178
                retention_queue_size,
165✔
179
            )?,
165✔
180
            true,
181
            commit_sz,
165✔
182
            max_duration_between_commits,
165✔
183
            epoch_manager,
165✔
184
        );
165✔
185
        Ok(Self {
165✔
186
            node_handle,
165✔
187
            receiver,
165✔
188
            timeout,
165✔
189
            running,
165✔
190
            channel_manager,
165✔
191
        })
165✔
192
    }
166✔
193
}
194

195
impl SourceListenerNode {
196
    /// Returns whether the node should terminate.
197
    fn send_and_trigger_commit_if_needed(
3,901,347✔
198
        &mut self,
3,901,347✔
199
        data: Option<(PortHandle, u64, u64, Operation)>,
3,901,347✔
200
    ) -> Result<bool, ExecutionError> {
3,901,347✔
201
        // First check if termination was requested.
3,901,347✔
202
        let terminating = !self.running.load(Ordering::SeqCst);
3,901,347✔
203
        // If this commit was not requested with termination at the start, we shouldn't terminate either.
204
        let terminating = match data {
3,901,347✔
205
            Some((port, txid, seq_in_tx, op)) => self
3,899,634✔
206
                .channel_manager
3,899,634✔
207
                .send_and_trigger_commit_if_needed(txid, seq_in_tx, op, port, terminating)?,
3,899,634✔
208
            None => self.channel_manager.trigger_commit_if_needed(terminating)?,
1,713✔
209
        };
210
        if terminating {
3,901,340✔
211
            self.channel_manager.terminate()?;
2,505✔
212
            debug!("[{}-listener] Quitting", &self.node_handle);
2,505✔
213
        }
3,898,835✔
214
        Ok(terminating)
3,898,991✔
215
    }
3,898,998✔
216
}
217

218
impl Node for SourceListenerNode {
219
    fn run(mut self) -> Result<(), ExecutionError> {
165✔
220
        loop {
3,898,131✔
221
            match self.receiver.recv_timeout(self.timeout) {
3,898,131✔
222
                Ok(data) => {
3,896,590✔
223
                    if self.send_and_trigger_commit_if_needed(Some(data))? {
3,896,590✔
224
                        return Ok(());
6✔
225
                    }
3,896,577✔
226
                }
227
                Err(e) => {
1,541✔
228
                    if self.send_and_trigger_commit_if_needed(None)? {
1,541✔
229
                        return Ok(());
150✔
230
                    }
1,391✔
231
                    // Channel disconnected but running flag not set to false: the source sender must have panicked.
1,391✔
232
                    if self.running.load(Ordering::SeqCst) && e == RecvTimeoutError::Disconnected {
1,391✔
233
                        return Err(ExecutionError::ChannelDisconnected);
2✔
234
                    }
1,389✔
235
                }
236
            }
237
        }
238
    }
165✔
239
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc