• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

getdozer / dozer / 4295401807

pending completion
4295401807

push

github

GitHub
Bump version (#1099)

28685 of 39545 relevant lines covered (72.54%)

52105.29 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

96.23
/dozer-core/src/executor/source_node.rs
1
use std::{
2
    sync::{
3
        atomic::{AtomicBool, Ordering},
4
        Arc,
5
    },
6
    time::Duration,
7
};
8

9
use crossbeam::channel::{bounded, Receiver, RecvTimeoutError, Sender};
10
use dozer_types::types::Operation;
11
use dozer_types::{
12
    log::debug,
13
    node::{NodeHandle, OpIdentifier},
14
};
15

16
use crate::{
17
    builder_dag::NodeKind,
18
    channels::SourceChannelForwarder,
19
    errors::ExecutionError,
20
    forwarder::{SourceChannelManager, StateWriter},
21
    node::{PortHandle, Source},
22
};
23

24
use super::{execution_dag::ExecutionDag, node::Node, ExecutorOptions};
25

26
/// Lets a source push its output into the internal channel held in `self.sender`.
impl SourceChannelForwarder for InternalChannelSourceForwarder {
27
    /// Packs the arguments into a `(port, txid, seq_in_tx, op)` tuple and sends it on
    /// `self.sender`.
    ///
    /// # Errors
    ///
    /// A failed channel send is propagated through `?` as an `ExecutionError`.
    fn send(
4,416,715✔
28
        &mut self,
4,416,715✔
29
        txid: u64,
4,416,715✔
30
        seq_in_tx: u64,
4,416,715✔
31
        op: Operation,
4,416,715✔
32
        port: PortHandle,
4,416,715✔
33
    ) -> Result<(), ExecutionError> {
4,416,715✔
34
        Ok(self.sender.send((port, txid, seq_in_tx, op))?)
4,416,715✔
35
    }
4,416,715✔
36
}
37

38
/// The sender half of a source in the execution DAG.
///
/// Owns the source itself; running the node starts the source and hands it `forwarder`
/// as its output.
39
#[derive(Debug)]
×
40
pub struct SourceSenderNode {
41
    /// Node handle in description DAG.
42
    node_handle: NodeHandle,
43
    /// The source.
44
    source: Box<dyn Source>,
45
    /// Last checkpointed output data sequence number, passed to the source on start as an
    /// optional `(txid, seq_in_tx)` pair.
46
    last_checkpoint: Option<OpIdentifier>,
47
    /// The forwarder that will be passed to the source for outputting data.
48
    forwarder: InternalChannelSourceForwarder,
49
}
50

51
impl SourceSenderNode {
52
    /// Returns a reference to this node's handle in the description DAG.
    pub fn handle(&self) -> &NodeHandle {
342✔
53
        &self.node_handle
342✔
54
    }
342✔
55
}
56

57
impl Node for SourceSenderNode {
58
    /// Starts the source and returns whatever `Source::start` returns.
    ///
    /// The source receives `self.forwarder` for output and the last checkpoint converted to an
    /// optional `(txid, seq_in_tx)` pair. A debug message is logged once `start` has returned,
    /// regardless of whether it succeeded.
    fn run(mut self) -> Result<(), ExecutionError> {
342✔
59
        let result = self.source.start(
342✔
60
            &mut self.forwarder,
342✔
61
            self.last_checkpoint
342✔
62
                .map(|op_id| (op_id.txid, op_id.seq_in_tx)),
342✔
63
        );
342✔
64
        debug!("[{}-sender] Quit", self.node_handle);
342✔
65
        result
336✔
66
    }
336✔
67
}
68

69
/// The listener part of a source in the execution DAG.
///
/// Receives the `(port, txid, seq_in_tx, op)` messages produced by the matching source sender
/// and passes them on through `channel_manager`.
70
#[derive(Debug)]
×
71
pub struct SourceListenerNode {
72
    /// Node handle in description DAG.
73
    node_handle: NodeHandle,
74
    /// Output from corresponding source sender.
75
    receiver: Receiver<(PortHandle, u64, u64, Operation)>,
76
    /// Receiving timeout, used for every `recv_timeout` call on `receiver`.
77
    timeout: Duration,
78
    /// If the execution DAG should be running. Used for determining if a `terminate` message should be sent.
79
    running: Arc<AtomicBool>,
80
    /// This node's output channel manager, for communicating to other sources to coordinate terminate and commit, forwarding data, writing metadata and writing port state.
81
    channel_manager: SourceChannelManager,
82
}
83

84
/// Outcome of one receive attempt on the source listener's channel.
#[derive(Debug, Clone, PartialEq)]
4,294,162✔
85
enum DataKind {
86
    /// A `(port, txid, seq_in_tx, op)` message was received.
    Data((PortHandle, u64, u64, Operation)),
87
    /// The receive attempt ended with `RecvTimeoutError::Timeout`.
    NoDataBecauseOfTimeout,
88
    /// The receive attempt ended with `RecvTimeoutError::Disconnected`.
    NoDataBecauseOfChannelDisconnection,
89
}
90

91
impl SourceListenerNode {
92
    /// Hands one receive outcome to the channel manager and terminates if it says so.
    ///
    /// Termination is requested when the source channel is disconnected or the `running` flag
    /// is `false`. The channel manager call returns the final decision; when that is `true`,
    /// `channel_manager.terminate()` is called and a debug message is logged.
    ///
    /// Returns if the node should terminate.
    ///
    /// # Errors
    ///
    /// Any error from the channel manager calls is propagated with `?`.
    fn send_and_trigger_commit_if_needed(
4,296,111✔
94
        &mut self,
4,296,111✔
95
        data: DataKind,
4,296,111✔
96
    ) -> Result<bool, ExecutionError> {
4,296,111✔
97
        // If termination was requested or the source quit, we try to terminate.
98
        let terminating = data == DataKind::NoDataBecauseOfChannelDisconnection
4,296,111✔
99
            || !self.running.load(Ordering::SeqCst);
4,295,709✔
100
        // If this commit was not requested with termination at the start, we shouldn't terminate either.
101
        let terminating = match data {
4,296,111✔
102
            DataKind::Data((port, txid, seq_in_tx, op)) => self
4,292,841✔
103
                .channel_manager
4,292,841✔
104
                .send_and_trigger_commit_if_needed(txid, seq_in_tx, op, port, terminating)?,
4,292,841✔
105
            DataKind::NoDataBecauseOfTimeout | DataKind::NoDataBecauseOfChannelDisconnection => {
106
                self.channel_manager.trigger_commit_if_needed(terminating)?
3,270✔
107
            }
108
        };
109
        if terminating {
4,296,106✔
110
            self.channel_manager.terminate()?;
1,395✔
111
            debug!("[{}-listener] Quitting", &self.node_handle);
1,395✔
112
        }
4,294,711✔
113
        Ok(terminating)
4,295,048✔
114
    }
4,295,053✔
115
}
116

117
impl Node for SourceListenerNode {
118
    /// Receives from the source channel in a loop until termination is decided.
    ///
    /// Each iteration waits on `receiver` for at most `timeout`, maps the result (data, timeout
    /// or disconnection) to a `DataKind`, and passes it to `send_and_trigger_commit_if_needed`.
    /// Returns `Ok(())` as soon as that call reports `true`.
    ///
    /// # Errors
    ///
    /// Any error from `send_and_trigger_commit_if_needed` ends the loop and is returned.
    fn run(mut self) -> Result<(), ExecutionError> {
342✔
119
        loop {
120
            let terminating = match self.receiver.recv_timeout(self.timeout) {
4,296,073✔
121
                Ok(data) => self.send_and_trigger_commit_if_needed(DataKind::Data(data))?,
4,292,803✔
122
                Err(RecvTimeoutError::Timeout) => {
123
                    self.send_and_trigger_commit_if_needed(DataKind::NoDataBecauseOfTimeout)?
2,868✔
124
                }
125
                Err(RecvTimeoutError::Disconnected) => self.send_and_trigger_commit_if_needed(
402✔
126
                    DataKind::NoDataBecauseOfChannelDisconnection,
402✔
127
                )?,
402✔
128
            };
129
            if terminating {
4,296,068✔
130
                return Ok(());
337✔
131
            }
4,295,731✔
132
        }
133
    }
342✔
134
}
135

136
/// `SourceChannelForwarder` implementation that writes source output into a channel.
#[derive(Debug)]
×
137
struct InternalChannelSourceForwarder {
138
    /// Sending half of the channel; each message is a `(port, txid, seq_in_tx, op)` tuple.
    sender: Sender<(PortHandle, u64, u64, Operation)>,
139
}
140

141
impl InternalChannelSourceForwarder {
142
    /// Creates a forwarder that sends on the given channel `sender`.
    pub fn new(sender: Sender<(PortHandle, u64, u64, Operation)>) -> Self {
342✔
143
        Self { sender }
342✔
144
    }
342✔
145
}
146

147
/// Builds the sender node and the listener node for the source at `node_index`.
///
/// The two nodes are connected by a bounded channel of capacity `options.channel_buffer_sz`:
/// the sender node's forwarder holds the sending half and the listener node holds the
/// receiving half. The listener's receive timeout is `options.commit_time_threshold`, and it
/// keeps the shared `running` flag.
///
/// The source and its last checkpoint are moved out of the DAG node via `node.kind.take()`.
///
/// # Panics
///
/// Panics with "Must pass in a source node" if `node.kind.take()` does not yield a
/// `NodeKind::Source`.
pub fn create_source_nodes(
342✔
148
    dag: &mut ExecutionDag,
342✔
149
    node_index: daggy::NodeIndex,
342✔
150
    options: &ExecutorOptions,
342✔
151
    running: Arc<AtomicBool>,
342✔
152
) -> (SourceSenderNode, SourceListenerNode) {
342✔
153
    // Get the source node.
342✔
154
    let node = dag.node_weight_mut(node_index);
342✔
155
    let node_handle = node.handle.clone();
342✔
156
    let node_storage = node.storage.clone();
342✔
157
    let Some(NodeKind::Source(source, last_checkpoint)) = node.kind.take() else {
342✔
158
        panic!("Must pass in a source node");
×
159
    };
160

161
    // Create channel between source sender and source listener.
162
    let (source_sender, source_receiver) = bounded(options.channel_buffer_sz);
342✔
163
    // let (source_sender, source_receiver) = bounded(1);
342✔
164

342✔
165
    // Create source sender node.
342✔
166
    let forwarder = InternalChannelSourceForwarder::new(source_sender);
342✔
167
    let source_sender_node = SourceSenderNode {
342✔
168
        node_handle: node_handle.clone(),
342✔
169
        source,
342✔
170
        last_checkpoint,
342✔
171
        forwarder,
342✔
172
    };
342✔
173

342✔
174
    // Create source listener node.
342✔
175
    let (senders, record_writers) = dag.collect_senders_and_record_writers(node_index);
342✔
176
    let state_writer = StateWriter::new(
342✔
177
        node_storage.meta_db,
342✔
178
        record_writers,
342✔
179
        node_storage.master_txn,
342✔
180
    );
342✔
181
    let channel_manager = SourceChannelManager::new(
342✔
182
        node_handle.clone(),
342✔
183
        senders,
342✔
184
        state_writer,
342✔
185
        true,
342✔
186
        options.commit_sz,
342✔
187
        options.commit_time_threshold,
342✔
188
        dag.epoch_manager().clone(),
342✔
189
    );
342✔
190
    let source_listener_node = SourceListenerNode {
342✔
191
        node_handle,
342✔
192
        receiver: source_receiver,
342✔
193
        timeout: options.commit_time_threshold,
342✔
194
        running,
342✔
195
        channel_manager,
342✔
196
    };
342✔
197

342✔
198
    (source_sender_node, source_listener_node)
342✔
199
}
342✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc