• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

getdozer / dozer / 6299724219

25 Sep 2023 12:58PM UTC coverage: 77.81% (+0.5%) from 77.275%
6299724219

push

github

chubei
fix: Add `BINDGEN_EXTRA_CLANG_ARGS` to cross compile rocksdb

50223 of 64546 relevant lines covered (77.81%)

148909.49 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

94.74
/dozer-core/src/executor/source_node.rs
1
use std::{
2
    sync::{
3
        atomic::{AtomicBool, Ordering},
4
        Arc,
5
    },
6
    time::Duration,
7
};
8

9
use crossbeam::channel::{bounded, Receiver, RecvTimeoutError, Sender};
10
use dozer_types::ingestion_types::IngestionMessage;
11
use dozer_types::{log::debug, node::NodeHandle};
12

13
use crate::{
14
    builder_dag::NodeKind,
15
    channels::SourceChannelForwarder,
16
    errors::ExecutionError,
17
    forwarder::{SourceChannelManager, StateWriter},
18
    node::{PortHandle, Source, SourceState},
19
};
20

21
use super::{execution_dag::ExecutionDag, node::Node, ExecutorOptions};
22

23
impl SourceChannelForwarder for InternalChannelSourceForwarder {
24
    fn send(&mut self, message: IngestionMessage, port: PortHandle) -> Result<(), ExecutionError> {
3,635,617✔
25
        Ok(self.sender.send((port, message))?)
3,635,617✔
26
    }
3,635,617✔
27
}
28

29
/// The sender half of a source in the execution DAG.
30
#[derive(Debug)]
×
31
pub struct SourceSenderNode {
32
    /// Node handle in description DAG.
33
    node_handle: NodeHandle,
34
    /// The source.
35
    source: Box<dyn Source>,
36
    /// Last checkpointed output data sequence numbers.
37
    last_checkpoint: SourceState,
38
    /// The forwarder that will be passed to the source for outputting data.
39
    forwarder: InternalChannelSourceForwarder,
40
}
41

42
impl SourceSenderNode {
43
    pub fn handle(&self) -> &NodeHandle {
636✔
44
        &self.node_handle
636✔
45
    }
636✔
46
}
47

48
impl Node for SourceSenderNode {
49
    fn run(mut self) -> Result<(), ExecutionError> {
636✔
50
        let result = self.source.start(&mut self.forwarder, self.last_checkpoint);
636✔
51
        debug!("[{}-sender] Quit", self.node_handle);
636✔
52
        result.map_err(ExecutionError::Source)
636✔
53
    }
636✔
54
}
55

56
/// The listener part of a source in the execution DAG.
57
#[derive(Debug)]
×
58
pub struct SourceListenerNode {
59
    /// Node handle in description DAG.
60
    node_handle: NodeHandle,
61
    /// Output from corresponding source sender.
62
    receiver: Receiver<(PortHandle, IngestionMessage)>,
63
    /// Receiving timeout.
64
    timeout: Duration,
65
    /// If the execution DAG should be running. Used for determining if a `terminate` message should be sent.
66
    running: Arc<AtomicBool>,
67
    /// This node's output channel manager, for communicating to other sources to coordinate terminate and commit, forwarding data, writing metadata and writing port state.
68
    channel_manager: SourceChannelManager,
69
}
70

71
#[derive(Debug, Clone, PartialEq)]
3,499,610✔
72
enum DataKind {
73
    Data((PortHandle, IngestionMessage)),
74
    NoDataBecauseOfTimeout,
75
    NoDataBecauseOfChannelDisconnection,
76
}
77

78
impl SourceListenerNode {
79
    /// Returns if the node should terminate.
80
    fn send_and_trigger_commit_if_needed(
3,499,610✔
81
        &mut self,
3,499,610✔
82
        data: DataKind,
3,499,610✔
83
    ) -> Result<bool, ExecutionError> {
3,499,610✔
84
        // If termination was requested the or source quit, we try to terminate.
85
        let terminating = data == DataKind::NoDataBecauseOfChannelDisconnection
3,499,610✔
86
            || !self.running.load(Ordering::SeqCst);
3,498,957✔
87
        // If this commit was not requested with termination at the start, we shouldn't terminate either.
88
        let terminating = match data {
3,499,610✔
89
            DataKind::Data((port, message)) => self
3,496,121✔
90
                .channel_manager
3,496,121✔
91
                .send_and_trigger_commit_if_needed(message, port, terminating)?,
3,496,121✔
92
            DataKind::NoDataBecauseOfTimeout | DataKind::NoDataBecauseOfChannelDisconnection => {
93
                self.channel_manager.trigger_commit_if_needed(terminating)?
3,489✔
94
            }
95
        };
96
        if terminating {
3,499,604✔
97
            self.channel_manager.terminate()?;
630✔
98
            debug!("[{}-listener] Quitting", &self.node_handle);
630✔
99
        }
3,498,974✔
100
        Ok(terminating)
3,499,604✔
101
    }
3,499,610✔
102
}
103

104
impl Node for SourceListenerNode {
105
    fn run(mut self) -> Result<(), ExecutionError> {
636✔
106
        loop {
107
            let terminating = match self.receiver.recv_timeout(self.timeout) {
3,499,610✔
108
                Ok(data) => self.send_and_trigger_commit_if_needed(DataKind::Data(data))?,
3,496,121✔
109
                Err(RecvTimeoutError::Timeout) => {
110
                    self.send_and_trigger_commit_if_needed(DataKind::NoDataBecauseOfTimeout)?
2,836✔
111
                }
112
                Err(RecvTimeoutError::Disconnected) => self.send_and_trigger_commit_if_needed(
653✔
113
                    DataKind::NoDataBecauseOfChannelDisconnection,
653✔
114
                )?,
653✔
115
            };
116
            if terminating {
3,499,604✔
117
                return Ok(());
630✔
118
            }
3,498,974✔
119
        }
120
    }
636✔
121
}
122

123
#[derive(Debug)]
×
124
struct InternalChannelSourceForwarder {
125
    sender: Sender<(PortHandle, IngestionMessage)>,
126
}
127

128
impl InternalChannelSourceForwarder {
129
    pub fn new(sender: Sender<(PortHandle, IngestionMessage)>) -> Self {
636✔
130
        Self { sender }
636✔
131
    }
636✔
132
}
133

134
pub fn create_source_nodes(
636✔
135
    dag: &mut ExecutionDag,
636✔
136
    node_index: daggy::NodeIndex,
636✔
137
    options: &ExecutorOptions,
636✔
138
    running: Arc<AtomicBool>,
636✔
139
) -> (SourceSenderNode, SourceListenerNode) {
636✔
140
    // Get the source node.
141
    let Some(node) = dag.node_weight_mut(node_index).take() else {
636✔
142
        panic!("Must pass in a node")
×
143
    };
144
    let node_handle = node.handle;
636✔
145
    let NodeKind::Source {
146
        source,
636✔
147
        port_names,
636✔
148
        last_checkpoint,
636✔
149
    } = node.kind
636✔
150
    else {
151
        panic!("Must pass in a source node");
×
152
    };
153

154
    // Create channel between source sender and source listener.
155
    let (source_sender, source_receiver) = bounded(options.channel_buffer_sz);
636✔
156
    // let (source_sender, source_receiver) = bounded(1);
636✔
157

636✔
158
    // Create source listener.
636✔
159
    let forwarder = InternalChannelSourceForwarder::new(source_sender);
636✔
160
    let source_sender_node = SourceSenderNode {
636✔
161
        node_handle: node_handle.clone(),
636✔
162
        source,
636✔
163
        last_checkpoint,
636✔
164
        forwarder,
636✔
165
    };
636✔
166

636✔
167
    // Create source sender node.
636✔
168
    let (senders, record_writers) = dag.collect_senders_and_record_writers(node_index);
636✔
169
    let state_writer = StateWriter::new(record_writers);
636✔
170
    let channel_manager = SourceChannelManager::new(
636✔
171
        node_handle.clone(),
636✔
172
        port_names,
636✔
173
        senders,
636✔
174
        Some(state_writer),
636✔
175
        options.commit_sz,
636✔
176
        options.commit_time_threshold,
636✔
177
        dag.epoch_manager().clone(),
636✔
178
        dag.error_manager().clone(),
636✔
179
    );
636✔
180
    let source_listener_node = SourceListenerNode {
636✔
181
        node_handle,
636✔
182
        receiver: source_receiver,
636✔
183
        timeout: options.commit_time_threshold,
636✔
184
        running,
636✔
185
        channel_manager,
636✔
186
    };
636✔
187

636✔
188
    (source_sender_node, source_listener_node)
636✔
189
}
636✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc