• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

getdozer / dozer / 4763381855

pending completion
4763381855

Pull #1461

github

GitHub
Merge 50bf72be2 into c58df4a0b
Pull Request #1461: feat: Make secondary index configurable

135 of 135 new or added lines in 6 files covered. (100.0%)

34877 of 44466 relevant lines covered (78.44%)

11367.99 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

94.97
/dozer-core/src/executor/receiver_loop.rs
1
use std::borrow::Cow;
2

3
use crossbeam::channel::{Receiver, Select};
4
use dozer_types::log::debug;
5
use dozer_types::types::Operation;
6

7
use crate::{epoch::Epoch, errors::ExecutionError};
8

9
use super::{name::Name, ExecutorOperation, InputPortState};
10

11
/// Common code for processor and sink nodes.
12
///
13
/// They both select from their input channels, and respond to "op", "commit", and terminate.
14
pub trait ReceiverLoop: Name {
15
    /// Returns input channels to this node. Will be called exactly once in [`receiver_loop`].
16
    fn receivers(&mut self) -> Vec<Receiver<ExecutorOperation>>;
17
    /// Returns the name of the receiver at `index`. Used for logging.
18
    fn receiver_name(&self, index: usize) -> Cow<str>;
19
    /// Responds to `op` from the receiver at `index`.
20
    fn on_op(&mut self, index: usize, op: Operation) -> Result<(), ExecutionError>;
21
    /// Responds to `commit` of `epoch`.
22
    fn on_commit(&mut self, epoch: &Epoch) -> Result<(), ExecutionError>;
23
    /// Responds to `terminate`.
24
    fn on_terminate(&mut self) -> Result<(), ExecutionError>;
25
    /// Responds to `SnapshottingDone`.
26
    fn on_snapshotting_done(&mut self) -> Result<(), ExecutionError>;
27

28
    /// The loop implementation, calls [`on_op`], [`on_commit`] and [`on_terminate`] at appropriate times.
29
    fn receiver_loop(&mut self) -> Result<(), ExecutionError> {
2,347✔
30
        let receivers = self.receivers();
2,347✔
31
        debug_assert!(
32
            !receivers.is_empty(),
2,347✔
33
            "Processor or sink must have at least 1 incoming edge"
×
34
        );
35
        let mut port_states = vec![InputPortState::Open; receivers.len()];
2,347✔
36

2,347✔
37
        let mut commits_received: usize = 0;
2,347✔
38
        let mut common_epoch = Epoch::new(0, Default::default());
2,347✔
39

2,347✔
40
        let mut sel = init_select(&receivers);
2,347✔
41
        loop {
4,344,206✔
42
            let index = sel.ready();
4,344,206✔
43
            let op = receivers[index]
4,344,206✔
44
                .recv()
4,344,206✔
45
                .map_err(|_| ExecutionError::CannotReceiveFromChannel)?;
4,344,206✔
46

47
            match op {
4,344,205✔
48
                ExecutorOperation::Op { op } => {
4,337,532✔
49
                    self.on_op(index, op)?;
4,337,532✔
50
                }
51
                ExecutorOperation::Commit { epoch } => {
3,663✔
52
                    assert_eq!(epoch.id, common_epoch.id);
3,663✔
53
                    commits_received += 1;
3,662✔
54
                    sel.remove(index);
3,662✔
55
                    common_epoch.details.extend(epoch.details);
3,662✔
56

3,662✔
57
                    if commits_received == receivers.len() {
3,662✔
58
                        self.on_commit(&common_epoch)?;
3,151✔
59
                        common_epoch = Epoch::new(common_epoch.id + 1, Default::default());
3,148✔
60
                        commits_received = 0;
3,148✔
61
                        sel = init_select(&receivers);
3,148✔
62
                    }
511✔
63
                }
64
                ExecutorOperation::Terminate => {
65
                    port_states[index] = InputPortState::Terminated;
2,657✔
66
                    sel.remove(index);
2,657✔
67
                    debug!(
2,657✔
68
                        "[{}] Received Terminate request on port {}",
×
69
                        self.name(),
×
70
                        self.receiver_name(index)
×
71
                    );
72
                    if port_states.iter().all(|v| v == &InputPortState::Terminated) {
3,162✔
73
                        self.on_terminate()?;
2,324✔
74
                        debug!("[{}] Quit", self.name());
2,324✔
75
                        return Ok(());
2,340✔
76
                    }
317✔
77
                }
78
                ExecutorOperation::SnapshottingDone {} => self.on_snapshotting_done()?,
353✔
79
            }
80
        }
81
    }
2,344✔
82
}
83

84
fn init_select(receivers: &Vec<Receiver<ExecutorOperation>>) -> Select {
5,487✔
85
    let mut sel = Select::new();
5,487✔
86
    for r in receivers {
11,796✔
87
        sel.recv(r);
6,309✔
88
    }
6,309✔
89
    sel
5,487✔
90
}
5,487✔
91

92
#[cfg(test)]
mod tests {
    use std::mem::take;

    use crossbeam::channel::{unbounded, Sender};
    use dozer_types::{
        node::{NodeHandle, OpIdentifier, SourceStates},
        types::{Field, Record},
    };

    use super::*;

    /// [`ReceiverLoop`] implementation that only records the callbacks it receives.
    struct TestReceiverLoop {
        receivers: Vec<Receiver<ExecutorOperation>>,
        ops: Vec<(usize, Operation)>,
        commits: Vec<Epoch>,
        snapshotting_done: Vec<()>,
        num_terminations: usize,
    }

    impl Name for TestReceiverLoop {
        fn name(&self) -> Cow<str> {
            Cow::Borrowed("TestReceiverLoop")
        }
    }

    impl ReceiverLoop for TestReceiverLoop {
        fn receivers(&mut self) -> Vec<Receiver<ExecutorOperation>> {
            // Hand the channels over and leave an empty vector behind.
            take(&mut self.receivers)
        }

        fn receiver_name(&self, index: usize) -> Cow<str> {
            Cow::Owned(format!("receiver_{index}"))
        }

        fn on_op(&mut self, index: usize, op: Operation) -> Result<(), ExecutionError> {
            self.ops.push((index, op));
            Ok(())
        }

        fn on_commit(&mut self, epoch: &Epoch) -> Result<(), ExecutionError> {
            self.commits.push(epoch.clone());
            Ok(())
        }

        fn on_terminate(&mut self) -> Result<(), ExecutionError> {
            self.num_terminations += 1;
            Ok(())
        }

        fn on_snapshotting_done(&mut self) -> Result<(), ExecutionError> {
            self.snapshotting_done.push(());
            Ok(())
        }
    }

    impl TestReceiverLoop {
        /// Creates a loop with `num_receivers` unbounded input channels and returns it
        /// together with the matching senders, in the same order.
        fn new(num_receivers: usize) -> (TestReceiverLoop, Vec<Sender<ExecutorOperation>>) {
            let (senders, receivers) = (0..num_receivers).map(|_| unbounded()).unzip();
            let test_loop = TestReceiverLoop {
                receivers,
                ops: Vec::new(),
                commits: Vec::new(),
                snapshotting_done: Vec::new(),
                num_terminations: 0,
            };
            (test_loop, senders)
        }
    }

    /// Sends `Terminate` on every sender, in order.
    fn terminate_all(senders: &[Sender<ExecutorOperation>]) {
        for sender in senders {
            sender.send(ExecutorOperation::Terminate).unwrap();
        }
    }

    /// Sends a `Commit` carrying `epoch` on `sender`.
    fn send_commit(sender: &Sender<ExecutorOperation>, epoch: Epoch) {
        sender.send(ExecutorOperation::Commit { epoch }).unwrap();
    }

    /// Source states holding a single entry for the node named `source`.
    fn single_source_details(source: &str) -> SourceStates {
        let mut details = SourceStates::default();
        details.insert(
            NodeHandle::new(None, source.to_string()),
            OpIdentifier::new(0, 0),
        );
        details
    }

    #[test]
    fn receiver_loop_stops_on_terminate() {
        let (mut test_loop, senders) = TestReceiverLoop::new(2);
        terminate_all(&senders);

        test_loop.receiver_loop().unwrap();

        assert_eq!(test_loop.num_terminations, 1);
    }

    #[test]
    fn receiver_loop_forwards_snapshotting_done() {
        let (mut test_loop, senders) = TestReceiverLoop::new(2);
        senders[0]
            .send(ExecutorOperation::SnapshottingDone {})
            .unwrap();
        terminate_all(&senders);

        test_loop.receiver_loop().unwrap();

        assert_eq!(test_loop.snapshotting_done, vec![()]);
    }

    #[test]
    fn receiver_loop_forwards_op() {
        let (mut test_loop, senders) = TestReceiverLoop::new(2);
        let record = Record::new(None, vec![Field::Int(1)]);
        let insert = ExecutorOperation::Op {
            op: Operation::Insert {
                new: record.clone(),
            },
        };
        senders[0].send(insert).unwrap();
        terminate_all(&senders);

        test_loop.receiver_loop().unwrap();

        assert_eq!(test_loop.ops, vec![(0, Operation::Insert { new: record })]);
    }

    #[test]
    fn receiver_loop_merges_commit_epoch_and_increases_epoch_id() {
        let (mut test_loop, senders) = TestReceiverLoop::new(2);
        let mut epoch0 = Epoch::new(0, single_source_details("0"));
        let mut epoch1 = Epoch::new(0, single_source_details("1"));

        // Two complete commit rounds: both inputs commit epoch 0, then both commit epoch 1.
        for id in 0..2 {
            epoch0.id = id;
            epoch1.id = id;
            send_commit(&senders[0], epoch0.clone());
            send_commit(&senders[1], epoch1.clone());
        }
        terminate_all(&senders);

        test_loop.receiver_loop().unwrap();

        // Each round must report the union of both inputs' source states.
        let mut merged = SourceStates::new();
        merged.extend(epoch0.details);
        merged.extend(epoch1.details);
        assert_eq!(
            test_loop.commits,
            vec![Epoch::new(0, merged.clone()), Epoch::new(1, merged)]
        );
    }

    #[test]
    #[should_panic]
    fn receiver_loop_panics_on_inconsistent_commit_epoch() {
        let (mut test_loop, senders) = TestReceiverLoop::new(2);
        // The two inputs disagree on the epoch id within the same round.
        send_commit(&senders[0], Epoch::new(0, single_source_details("0")));
        send_commit(&senders[1], Epoch::new(1, single_source_details("1")));
        terminate_all(&senders);

        test_loop.receiver_loop().unwrap();
    }
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc