• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

getdozer / dozer / 6299724219

25 Sep 2023 12:58PM UTC coverage: 77.81% (+0.5%) from 77.275%
6299724219

push

github

chubei
fix: Add `BINDGEN_EXTRA_CLANG_ARGS` to cross compile rocksdb

50223 of 64546 relevant lines covered (77.81%)

148909.49 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

93.43
/dozer-core/src/executor/receiver_loop.rs
1
use std::borrow::Cow;
2

3
use crossbeam::channel::{Receiver, Select};
4
use dozer_types::log::debug;
5

6
use crate::{
7
    epoch::Epoch,
8
    errors::ExecutionError,
9
    executor_operation::{ExecutorOperation, ProcessorOperation},
10
};
11

12
use super::{name::Name, InputPortState};
13

14
/// Common code for processor and sink nodes.
15
///
16
/// They both select from their input channels, and respond to "op", "commit", and terminate.
17
pub trait ReceiverLoop: Name {
18
    /// Returns the epoch id that this node was constructed for.
19
    fn initial_epoch_id(&self) -> u64;
20
    /// Returns input channels to this node. Will be called exactly once in [`receiver_loop`].
21
    fn receivers(&mut self) -> Vec<Receiver<ExecutorOperation>>;
22
    /// Returns the name of the receiver at `index`. Used for logging.
23
    fn receiver_name(&self, index: usize) -> Cow<str>;
24
    /// Responds to `op` from the receiver at `index`.
25
    fn on_op(&mut self, index: usize, op: ProcessorOperation) -> Result<(), ExecutionError>;
26
    /// Responds to `commit` of `epoch`.
27
    fn on_commit(&mut self, epoch: &Epoch) -> Result<(), ExecutionError>;
28
    /// Responds to `terminate`.
29
    fn on_terminate(&mut self) -> Result<(), ExecutionError>;
30
    /// Responds to `SnapshottingDone`.
31
    fn on_snapshotting_done(&mut self, connection_name: String) -> Result<(), ExecutionError>;
32

33
    /// The loop implementation, calls [`on_op`], [`on_commit`] and [`on_terminate`] at appropriate times.
34
    fn receiver_loop(&mut self, initial_epoch_id: u64) -> Result<(), ExecutionError> {
2,401✔
35
        let receivers = self.receivers();
2,401✔
36
        debug_assert!(
37
            !receivers.is_empty(),
2,401✔
38
            "Processor or sink must have at least 1 incoming edge"
×
39
        );
40
        let mut port_states = vec![InputPortState::Open; receivers.len()];
2,401✔
41

2,401✔
42
        let mut commits_received: usize = 0;
2,401✔
43
        let mut epoch_id = initial_epoch_id;
2,401✔
44

2,401✔
45
        let mut sel = init_select(&receivers);
2,401✔
46
        loop {
8,939,459✔
47
            let index = sel.ready();
8,939,459✔
48
            let op = receivers[index]
8,939,459✔
49
                .recv()
8,939,459✔
50
                .map_err(|_| ExecutionError::CannotReceiveFromChannel)?;
8,939,459✔
51

52
            match op {
8,939,454✔
53
                ExecutorOperation::Op { op } => {
8,931,427✔
54
                    self.on_op(index, op)?;
8,931,427✔
55
                }
56
                ExecutorOperation::Commit { epoch } => {
5,147✔
57
                    assert_eq!(epoch.common_info.id, epoch_id);
5,147✔
58
                    commits_received += 1;
5,146✔
59
                    sel.remove(index);
5,146✔
60

5,146✔
61
                    if commits_received == receivers.len() {
5,146✔
62
                        self.on_commit(&epoch)?;
4,546✔
63
                        epoch_id += 1;
4,545✔
64
                        commits_received = 0;
4,545✔
65
                        sel = init_select(&receivers);
4,545✔
66
                    }
600✔
67
                }
68
                ExecutorOperation::Terminate => {
69
                    port_states[index] = InputPortState::Terminated;
2,703✔
70
                    sel.remove(index);
2,703✔
71
                    debug!(
2,703✔
72
                        "[{}] Received Terminate request on port {}",
×
73
                        self.name(),
×
74
                        self.receiver_name(index)
×
75
                    );
76
                    if port_states.iter().all(|v| v == &InputPortState::Terminated) {
3,238✔
77
                        self.on_terminate()?;
2,386✔
78
                        debug!("[{}] Quit", self.name());
2,386✔
79
                        return Ok(());
2,386✔
80
                    }
317✔
81
                }
82
                ExecutorOperation::SnapshottingDone { connection_name } => {
177✔
83
                    self.on_snapshotting_done(connection_name)?;
177✔
84
                }
85
            }
86
        }
87
    }
2,392✔
88
}
89

90
fn init_select(receivers: &Vec<Receiver<ExecutorOperation>>) -> Select {
6,946✔
91
    let mut sel = Select::new();
6,946✔
92
    for r in receivers {
14,810✔
93
        sel.recv(r);
7,864✔
94
    }
7,864✔
95
    sel
6,946✔
96
}
6,946✔
97

98
#[cfg(test)]
99
mod tests {
100
    use std::{mem::swap, sync::Arc, time::SystemTime};
101

102
    use crossbeam::channel::{unbounded, Sender};
103
    use dozer_types::{
104
        node::{NodeHandle, SourceStates},
105
        types::{Field, Record},
106
    };
107

108
    use dozer_recordstore::{ProcessorRecord, ProcessorRecordStore, StoreRecord};
109

110
    use super::*;
111

112
    struct TestReceiverLoop {
113
        receivers: Vec<Receiver<ExecutorOperation>>,
114
        ops: Vec<(usize, ProcessorOperation)>,
115
        commits: Vec<Epoch>,
116
        snapshotting_done: Vec<String>,
117
        num_terminations: usize,
118
    }
119

120
    impl Name for TestReceiverLoop {
121
        fn name(&self) -> Cow<str> {
×
122
            Cow::Borrowed("TestReceiverLoop")
×
123
        }
×
124
    }
125

126
    impl ReceiverLoop for TestReceiverLoop {
127
        fn initial_epoch_id(&self) -> u64 {
×
128
            0
×
129
        }
×
130

131
        fn receivers(&mut self) -> Vec<Receiver<ExecutorOperation>> {
5✔
132
            let mut result = vec![];
5✔
133
            swap(&mut self.receivers, &mut result);
5✔
134
            result
5✔
135
        }
5✔
136

137
        fn receiver_name(&self, index: usize) -> Cow<str> {
×
138
            Cow::Owned(format!("receiver_{index}"))
×
139
        }
×
140

141
        fn on_op(&mut self, index: usize, op: ProcessorOperation) -> Result<(), ExecutionError> {
1✔
142
            self.ops.push((index, op));
1✔
143
            Ok(())
1✔
144
        }
1✔
145

146
        fn on_commit(&mut self, epoch: &Epoch) -> Result<(), ExecutionError> {
2✔
147
            self.commits.push(epoch.clone());
2✔
148
            Ok(())
2✔
149
        }
2✔
150

151
        fn on_terminate(&mut self) -> Result<(), ExecutionError> {
4✔
152
            self.num_terminations += 1;
4✔
153
            Ok(())
4✔
154
        }
4✔
155

156
        fn on_snapshotting_done(&mut self, connection_name: String) -> Result<(), ExecutionError> {
1✔
157
            self.snapshotting_done.push(connection_name);
1✔
158
            Ok(())
1✔
159
        }
1✔
160
    }
161

162
    impl TestReceiverLoop {
163
        fn new(num_receivers: usize) -> (TestReceiverLoop, Vec<Sender<ExecutorOperation>>) {
5✔
164
            let (senders, receivers) = (0..num_receivers).map(|_| unbounded()).unzip();
10✔
165
            (
5✔
166
                TestReceiverLoop {
5✔
167
                    receivers,
5✔
168
                    ops: vec![],
5✔
169
                    commits: vec![],
5✔
170
                    snapshotting_done: vec![],
5✔
171
                    num_terminations: 0,
5✔
172
                },
5✔
173
                senders,
5✔
174
            )
5✔
175
        }
5✔
176
    }
177

178
    #[test]
1✔
179
    fn receiver_loop_stops_on_terminate() {
1✔
180
        let (mut test_loop, senders) = TestReceiverLoop::new(2);
1✔
181
        senders[0].send(ExecutorOperation::Terminate).unwrap();
1✔
182
        senders[1].send(ExecutorOperation::Terminate).unwrap();
1✔
183
        test_loop.receiver_loop(0).unwrap();
1✔
184
        assert_eq!(test_loop.num_terminations, 1);
1✔
185
    }
1✔
186

187
    #[test]
1✔
188
    fn receiver_loop_forwards_snapshotting_done() {
1✔
189
        let connection_name = "test_connection".to_string();
1✔
190
        let (mut test_loop, senders) = TestReceiverLoop::new(2);
1✔
191
        senders[0]
1✔
192
            .send(ExecutorOperation::SnapshottingDone {
1✔
193
                connection_name: connection_name.clone(),
1✔
194
            })
1✔
195
            .unwrap();
1✔
196
        senders[0].send(ExecutorOperation::Terminate).unwrap();
1✔
197
        senders[1].send(ExecutorOperation::Terminate).unwrap();
1✔
198
        test_loop.receiver_loop(0).unwrap();
1✔
199
        assert_eq!(test_loop.snapshotting_done, vec![connection_name])
1✔
200
    }
1✔
201

202
    #[test]
1✔
203
    fn receiver_loop_forwards_op() {
1✔
204
        let (mut test_loop, senders) = TestReceiverLoop::new(2);
1✔
205
        let record_store = ProcessorRecordStore::new(Default::default()).unwrap();
1✔
206
        let record: ProcessorRecord = record_store
1✔
207
            .create_record(&Record::new(vec![Field::Int(1)]))
1✔
208
            .unwrap();
1✔
209
        senders[0]
1✔
210
            .send(ExecutorOperation::Op {
1✔
211
                op: ProcessorOperation::Insert {
1✔
212
                    new: record.clone(),
1✔
213
                },
1✔
214
            })
1✔
215
            .unwrap();
1✔
216
        senders[0].send(ExecutorOperation::Terminate).unwrap();
1✔
217
        senders[1].send(ExecutorOperation::Terminate).unwrap();
1✔
218
        test_loop.receiver_loop(0).unwrap();
1✔
219
        assert_eq!(
1✔
220
            test_loop.ops,
1✔
221
            vec![(0, ProcessorOperation::Insert { new: record })]
1✔
222
        );
1✔
223
    }
1✔
224

225
    #[test]
1✔
226
    fn receiver_loop_increases_epoch_id() {
1✔
227
        let (mut test_loop, senders) = TestReceiverLoop::new(2);
1✔
228
        let mut source_states = SourceStates::default();
1✔
229
        source_states.insert(NodeHandle::new(None, "0".to_string()), Default::default());
1✔
230
        source_states.insert(NodeHandle::new(None, "1".to_string()), Default::default());
1✔
231
        let source_states = Arc::new(source_states);
1✔
232
        let decision_instant = SystemTime::now();
1✔
233
        let mut epoch0 = Epoch::new(0, source_states.clone(), None, decision_instant);
1✔
234
        let mut epoch1 = Epoch::new(0, source_states, None, decision_instant);
1✔
235
        senders[0]
1✔
236
            .send(ExecutorOperation::Commit {
1✔
237
                epoch: epoch0.clone(),
1✔
238
            })
1✔
239
            .unwrap();
1✔
240
        senders[1]
1✔
241
            .send(ExecutorOperation::Commit {
1✔
242
                epoch: epoch1.clone(),
1✔
243
            })
1✔
244
            .unwrap();
1✔
245
        epoch0.common_info.id = 1;
1✔
246
        epoch1.common_info.id = 1;
1✔
247
        senders[0]
1✔
248
            .send(ExecutorOperation::Commit {
1✔
249
                epoch: epoch0.clone(),
1✔
250
            })
1✔
251
            .unwrap();
1✔
252
        senders[1]
1✔
253
            .send(ExecutorOperation::Commit {
1✔
254
                epoch: epoch1.clone(),
1✔
255
            })
1✔
256
            .unwrap();
1✔
257
        senders[0].send(ExecutorOperation::Terminate).unwrap();
1✔
258
        senders[1].send(ExecutorOperation::Terminate).unwrap();
1✔
259
        test_loop.receiver_loop(0).unwrap();
1✔
260

1✔
261
        assert_eq!(test_loop.commits[0].common_info.id, 0);
1✔
262
        assert_eq!(test_loop.commits[0].decision_instant, decision_instant);
1✔
263
        assert_eq!(test_loop.commits[1].common_info.id, 1);
1✔
264
        assert_eq!(test_loop.commits[1].decision_instant, decision_instant);
1✔
265
    }
1✔
266

267
    #[test]
1✔
268
    #[should_panic]
269
    fn receiver_loop_panics_on_inconsistent_commit_epoch() {
1✔
270
        let (mut test_loop, senders) = TestReceiverLoop::new(2);
1✔
271
        let mut source_states = SourceStates::new();
1✔
272
        source_states.insert(NodeHandle::new(None, "0".to_string()), Default::default());
1✔
273
        source_states.insert(NodeHandle::new(None, "1".to_string()), Default::default());
1✔
274
        let source_states = Arc::new(source_states);
1✔
275
        let decision_instant = SystemTime::now();
1✔
276
        let epoch0 = Epoch::new(0, source_states.clone(), None, decision_instant);
1✔
277
        let epoch1 = Epoch::new(1, source_states, None, decision_instant);
1✔
278
        senders[0]
1✔
279
            .send(ExecutorOperation::Commit { epoch: epoch0 })
1✔
280
            .unwrap();
1✔
281
        senders[1]
1✔
282
            .send(ExecutorOperation::Commit { epoch: epoch1 })
1✔
283
            .unwrap();
1✔
284
        senders[0].send(ExecutorOperation::Terminate).unwrap();
1✔
285
        senders[1].send(ExecutorOperation::Terminate).unwrap();
1✔
286
        test_loop.receiver_loop(0).unwrap();
1✔
287
    }
1✔
288
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc