• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

getdozer / dozer / 4280059073

pending completion
4280059073

push

github

GitHub
Bump version (#1069)

27464 of 37850 relevant lines covered (72.56%)

52364.8 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

95.74
/dozer-core/src/executor/receiver_loop.rs
1
use std::borrow::Cow;
2

3
use crossbeam::channel::{Receiver, Select};
4
use dozer_types::log::debug;
5
use dozer_types::types::Operation;
6

7
use crate::{epoch::Epoch, errors::ExecutionError};
8

9
use super::{name::Name, ExecutorOperation, InputPortState};
10

11
#[derive(Debug, PartialEq)]
5✔
12
enum MappedExecutorOperation {
13
    Data { op: Operation },
14
    Commit { epoch: Epoch },
15
    Terminate,
16
}
17

18
fn map_executor_operation(op: ExecutorOperation) -> MappedExecutorOperation {
10,612,116✔
19
    match op {
10,612,116✔
20
        ExecutorOperation::Delete { old } => MappedExecutorOperation::Data {
79,672✔
21
            op: Operation::Delete { old },
79,672✔
22
        },
79,672✔
23
        ExecutorOperation::Insert { new } => MappedExecutorOperation::Data {
10,402,997✔
24
            op: Operation::Insert { new },
10,402,997✔
25
        },
10,402,997✔
26
        ExecutorOperation::Update { old, new } => MappedExecutorOperation::Data {
121,273✔
27
            op: Operation::Update { old, new },
121,273✔
28
        },
121,273✔
29
        ExecutorOperation::Commit { epoch } => MappedExecutorOperation::Commit { epoch },
6,706✔
30
        ExecutorOperation::Terminate => MappedExecutorOperation::Terminate,
1,468✔
31
    }
32
}
10,612,116✔
33

34
/// Common code for processor and sink nodes.
35
///
36
/// They both select from their input channels, and respond to "op", "commit", and terminate.
37
pub trait ReceiverLoop: Name {
38
    /// Returns input channels to this node. Will be called exactly once in [`receiver_loop`].
39
    fn receivers(&mut self) -> Vec<Receiver<ExecutorOperation>>;
40
    /// Returns the name of the receiver at `index`. Used for logging.
41
    fn receiver_name(&self, index: usize) -> Cow<str>;
42
    /// Responds to `op` from the receiver at `index`.
43
    fn on_op(&mut self, index: usize, op: Operation) -> Result<(), ExecutionError>;
44
    /// Responds to `commit` of `epoch`.
45
    fn on_commit(&mut self, epoch: &Epoch) -> Result<(), ExecutionError>;
46
    /// Responds to `terminate`.
47
    fn on_terminate(&mut self) -> Result<(), ExecutionError>;
48

49
    /// The loop implementation, calls [`on_op`], [`on_commit`] and [`on_terminate`] at appropriate times.
50
    fn receiver_loop(&mut self) -> Result<(), ExecutionError> {
1,357✔
51
        let receivers = self.receivers();
1,357✔
52
        debug_assert!(
53
            !receivers.is_empty(),
1,357✔
54
            "Processor or sink must have at least 1 incoming edge"
×
55
        );
56
        let mut port_states = vec![InputPortState::Open; receivers.len()];
1,357✔
57

1,357✔
58
        let mut commits_received: usize = 0;
1,357✔
59
        let mut common_epoch = Epoch::new(0, Default::default());
1,357✔
60

1,357✔
61
        let mut sel = init_select(&receivers);
1,357✔
62
        loop {
10,633,121✔
63
            let index = sel.ready();
10,633,121✔
64
            let op = match receivers[index].recv() {
10,633,121✔
65
                Ok(op) => map_executor_operation(op),
10,633,117✔
66
                // Channel disconnected before receiving a terminate. The upstream node had an error.
67
                Err(_) => return Err(ExecutionError::CannotReceiveFromChannel),
4✔
68
            };
69

70
            match op {
10,633,117✔
71
                MappedExecutorOperation::Data { op } => {
10,624,945✔
72
                    self.on_op(index, op)?;
10,624,945✔
73
                }
74
                MappedExecutorOperation::Commit { epoch } => {
6,705✔
75
                    assert_eq!(epoch.id, common_epoch.id);
6,705✔
76
                    commits_received += 1;
6,704✔
77
                    sel.remove(index);
6,704✔
78
                    common_epoch.details.extend(epoch.details);
6,704✔
79

6,704✔
80
                    if commits_received == receivers.len() {
6,704✔
81
                        self.on_commit(&common_epoch)?;
5,655✔
82
                        common_epoch = Epoch::new(common_epoch.id + 1, Default::default());
5,655✔
83
                        commits_received = 0;
5,655✔
84
                        sel = init_select(&receivers);
5,655✔
85
                    }
1,049✔
86
                }
87
                MappedExecutorOperation::Terminate => {
88
                    port_states[index] = InputPortState::Terminated;
1,467✔
89
                    sel.remove(index);
1,467✔
90
                    debug!(
1,467✔
91
                        "[{}] Received Terminate request on port {}",
×
92
                        self.name(),
×
93
                        self.receiver_name(index)
×
94
                    );
95
                    if port_states.iter().all(|v| v == &InputPortState::Terminated) {
1,656✔
96
                        self.on_terminate()?;
1,344✔
97
                        debug!("[{}] Quit", self.name());
1,344✔
98
                        return Ok(());
1,344✔
99
                    }
123✔
100
                }
101
            }
102
        }
103
    }
1,354✔
104
}
105

106
fn init_select(receivers: &Vec<Receiver<ExecutorOperation>>) -> Select {
7,012✔
107
    let mut sel = Select::new();
7,012✔
108
    for r in receivers {
15,197✔
109
        sel.recv(r);
8,185✔
110
    }
8,185✔
111
    sel
7,012✔
112
}
7,012✔
113

114
#[cfg(test)]
115
mod tests {
116
    use std::mem::swap;
117

118
    use crossbeam::channel::{unbounded, Sender};
119
    use dozer_types::{
120
        node::{NodeHandle, OpIdentifier, SourceStates},
121
        types::{Field, Record},
122
    };
123

124
    use super::*;
125

126
    #[test]
1✔
127
    fn test_map_executor_operation() {
1✔
128
        let old = Record::new(None, vec![Field::Int(1)], None);
1✔
129
        let new = Record::new(None, vec![Field::Int(2)], None);
1✔
130
        let epoch = Epoch::new(1, Default::default());
1✔
131
        assert_eq!(
1✔
132
            map_executor_operation(ExecutorOperation::Insert { new: new.clone() }),
1✔
133
            MappedExecutorOperation::Data {
1✔
134
                op: Operation::Insert { new: new.clone() }
1✔
135
            }
1✔
136
        );
1✔
137
        assert_eq!(
1✔
138
            map_executor_operation(ExecutorOperation::Update {
1✔
139
                old: old.clone(),
1✔
140
                new: new.clone()
1✔
141
            }),
1✔
142
            MappedExecutorOperation::Data {
1✔
143
                op: Operation::Update {
1✔
144
                    old: old.clone(),
1✔
145
                    new
1✔
146
                }
1✔
147
            }
1✔
148
        );
1✔
149
        assert_eq!(
1✔
150
            map_executor_operation(ExecutorOperation::Delete { old: old.clone() }),
1✔
151
            MappedExecutorOperation::Data {
1✔
152
                op: Operation::Delete { old }
1✔
153
            }
1✔
154
        );
1✔
155
        assert_eq!(
1✔
156
            map_executor_operation(ExecutorOperation::Commit {
1✔
157
                epoch: epoch.clone()
1✔
158
            }),
1✔
159
            MappedExecutorOperation::Commit { epoch }
1✔
160
        );
1✔
161
        assert_eq!(
1✔
162
            map_executor_operation(ExecutorOperation::Terminate),
1✔
163
            MappedExecutorOperation::Terminate
1✔
164
        );
1✔
165
    }
1✔
166

167
    struct TestReceiverLoop {
168
        receivers: Vec<Receiver<ExecutorOperation>>,
169
        ops: Vec<(usize, Operation)>,
170
        commits: Vec<Epoch>,
171
        num_termations: usize,
172
    }
173

174
    impl Name for TestReceiverLoop {
175
        fn name(&self) -> Cow<str> {
×
176
            Cow::Borrowed("TestReceiverLoop")
×
177
        }
×
178
    }
179

180
    impl ReceiverLoop for TestReceiverLoop {
181
        fn receivers(&mut self) -> Vec<Receiver<ExecutorOperation>> {
4✔
182
            let mut result = vec![];
4✔
183
            swap(&mut self.receivers, &mut result);
4✔
184
            result
4✔
185
        }
4✔
186

187
        fn receiver_name(&self, index: usize) -> Cow<str> {
×
188
            Cow::Owned(format!("receiver_{index}"))
×
189
        }
×
190

191
        fn on_op(&mut self, index: usize, op: Operation) -> Result<(), ExecutionError> {
1✔
192
            self.ops.push((index, op));
1✔
193
            Ok(())
1✔
194
        }
1✔
195

196
        fn on_commit(&mut self, epoch: &Epoch) -> Result<(), ExecutionError> {
2✔
197
            self.commits.push(epoch.clone());
2✔
198
            Ok(())
2✔
199
        }
2✔
200

201
        fn on_terminate(&mut self) -> Result<(), ExecutionError> {
3✔
202
            self.num_termations += 1;
3✔
203
            Ok(())
3✔
204
        }
3✔
205
    }
206

207
    impl TestReceiverLoop {
208
        fn new(num_receivers: usize) -> (TestReceiverLoop, Vec<Sender<ExecutorOperation>>) {
4✔
209
            let (senders, receivers) = (0..num_receivers).map(|_| unbounded()).unzip();
8✔
210
            (
4✔
211
                TestReceiverLoop {
4✔
212
                    receivers,
4✔
213
                    ops: vec![],
4✔
214
                    commits: vec![],
4✔
215
                    num_termations: 0,
4✔
216
                },
4✔
217
                senders,
4✔
218
            )
4✔
219
        }
4✔
220
    }
221

222
    #[test]
1✔
223
    fn receiver_loop_stops_on_terminate() {
1✔
224
        let (mut test_loop, senders) = TestReceiverLoop::new(2);
1✔
225
        senders[0].send(ExecutorOperation::Terminate).unwrap();
1✔
226
        senders[1].send(ExecutorOperation::Terminate).unwrap();
1✔
227
        test_loop.receiver_loop().unwrap();
1✔
228
        assert_eq!(test_loop.num_termations, 1);
1✔
229
    }
1✔
230

231
    #[test]
1✔
232
    fn receiver_loop_forwards_op() {
1✔
233
        let (mut test_loop, senders) = TestReceiverLoop::new(2);
1✔
234
        let record = Record::new(None, vec![Field::Int(1)], None);
1✔
235
        senders[0]
1✔
236
            .send(ExecutorOperation::Insert {
1✔
237
                new: record.clone(),
1✔
238
            })
1✔
239
            .unwrap();
1✔
240
        senders[0].send(ExecutorOperation::Terminate).unwrap();
1✔
241
        senders[1].send(ExecutorOperation::Terminate).unwrap();
1✔
242
        test_loop.receiver_loop().unwrap();
1✔
243
        assert_eq!(test_loop.ops, vec![(0, Operation::Insert { new: record })]);
1✔
244
    }
1✔
245

246
    #[test]
1✔
247
    fn receiver_loop_merges_commit_epoch_and_increases_epoch_id() {
1✔
248
        let (mut test_loop, senders) = TestReceiverLoop::new(2);
1✔
249
        let mut details = SourceStates::default();
1✔
250
        details.insert(
1✔
251
            NodeHandle::new(None, "0".to_string()),
1✔
252
            OpIdentifier::new(0, 0),
1✔
253
        );
1✔
254
        let mut epoch0 = Epoch::new(0, details);
1✔
255
        let mut details = SourceStates::default();
1✔
256
        details.insert(
1✔
257
            NodeHandle::new(None, "1".to_string()),
1✔
258
            OpIdentifier::new(0, 0),
1✔
259
        );
1✔
260
        let mut epoch1 = Epoch::new(0, details);
1✔
261
        senders[0]
1✔
262
            .send(ExecutorOperation::Commit {
1✔
263
                epoch: epoch0.clone(),
1✔
264
            })
1✔
265
            .unwrap();
1✔
266
        senders[1]
1✔
267
            .send(ExecutorOperation::Commit {
1✔
268
                epoch: epoch1.clone(),
1✔
269
            })
1✔
270
            .unwrap();
1✔
271
        epoch0.id = 1;
1✔
272
        epoch1.id = 1;
1✔
273
        senders[0]
1✔
274
            .send(ExecutorOperation::Commit {
1✔
275
                epoch: epoch0.clone(),
1✔
276
            })
1✔
277
            .unwrap();
1✔
278
        senders[1]
1✔
279
            .send(ExecutorOperation::Commit {
1✔
280
                epoch: epoch1.clone(),
1✔
281
            })
1✔
282
            .unwrap();
1✔
283
        senders[0].send(ExecutorOperation::Terminate).unwrap();
1✔
284
        senders[1].send(ExecutorOperation::Terminate).unwrap();
1✔
285
        test_loop.receiver_loop().unwrap();
1✔
286

1✔
287
        let mut details = SourceStates::new();
1✔
288
        details.extend(epoch0.details);
1✔
289
        details.extend(epoch1.details);
1✔
290
        assert_eq!(
1✔
291
            test_loop.commits,
1✔
292
            vec![Epoch::new(0, details.clone()), Epoch::new(1, details)]
1✔
293
        );
1✔
294
    }
1✔
295

296
    #[test]
1✔
297
    #[should_panic]
298
    fn receiver_loop_panics_on_inconsistent_commit_epoch() {
1✔
299
        let (mut test_loop, senders) = TestReceiverLoop::new(2);
1✔
300
        let mut details = SourceStates::new();
1✔
301
        details.insert(
1✔
302
            NodeHandle::new(None, "0".to_string()),
1✔
303
            OpIdentifier::new(0, 0),
1✔
304
        );
1✔
305
        let epoch0 = Epoch::new(0, details);
1✔
306
        let mut details = SourceStates::new();
1✔
307
        details.insert(
1✔
308
            NodeHandle::new(None, "1".to_string()),
1✔
309
            OpIdentifier::new(0, 0),
1✔
310
        );
1✔
311
        let epoch1 = Epoch::new(1, details);
1✔
312
        senders[0]
1✔
313
            .send(ExecutorOperation::Commit { epoch: epoch0 })
1✔
314
            .unwrap();
1✔
315
        senders[1]
1✔
316
            .send(ExecutorOperation::Commit { epoch: epoch1 })
1✔
317
            .unwrap();
1✔
318
        senders[0].send(ExecutorOperation::Terminate).unwrap();
1✔
319
        senders[1].send(ExecutorOperation::Terminate).unwrap();
1✔
320
        test_loop.receiver_loop().unwrap();
1✔
321
    }
1✔
322
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc