• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

getdozer / dozer / 4020902227

pending completion
4020902227

Pull #743

github

GitHub
Merge 57279c6b6 into a12da35a5
Pull Request #743: Chore clippy fix

165 of 165 new or added lines in 60 files covered. (100.0%)

23638 of 35485 relevant lines covered (66.61%)

38417.79 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

95.73
/dozer-core/src/dag/executor/receiver_loop.rs
1
use std::{borrow::Cow, collections::HashMap};
2

3
use crossbeam::channel::Receiver;
4
use dozer_types::log::debug;
5
use dozer_types::{internal_err, types::Operation};
6

7
use crate::dag::{
8
    epoch::Epoch,
9
    errors::ExecutionError::{self, InternalError},
10
    executor_utils::init_select,
11
};
12

13
use super::{name::Name, ExecutorOperation, InputPortState};
14

15
#[derive(Debug, PartialEq)]
5✔
16
enum MappedExecutorOperation {
17
    Data { op: Operation },
18
    Commit { epoch: Epoch },
19
    Terminate,
20
}
21

22
fn map_executor_operation(op: ExecutorOperation) -> MappedExecutorOperation {
9,297,440✔
23
    match op {
9,297,440✔
24
        ExecutorOperation::Delete { old } => MappedExecutorOperation::Data {
58,383✔
25
            op: Operation::Delete { old },
58,383✔
26
        },
58,383✔
27
        ExecutorOperation::Insert { new } => MappedExecutorOperation::Data {
9,125,649✔
28
            op: Operation::Insert { new },
9,125,649✔
29
        },
9,125,649✔
30
        ExecutorOperation::Update { old, new } => MappedExecutorOperation::Data {
100,825✔
31
            op: Operation::Update { old, new },
100,825✔
32
        },
100,825✔
33
        ExecutorOperation::Commit { epoch } => MappedExecutorOperation::Commit { epoch },
11,955✔
34
        ExecutorOperation::Terminate => MappedExecutorOperation::Terminate,
628✔
35
    }
36
}
9,297,440✔
37

38
/// Common code for processor and sink nodes.
39
///
40
/// They both select from their input channels, and respond to "op", "commit", and terminate.
41
pub trait ReceiverLoop: Name {
42
    /// Returns input channels to this node. Will be called exactly once in [`receiver_loop`].
43
    fn receivers(&mut self) -> Vec<Receiver<ExecutorOperation>>;
44
    /// Returns the name of the receiver at `index`. Used for logging.
45
    fn receiver_name(&self, index: usize) -> Cow<str>;
46
    /// Responds to `op` from the receiver at `index`.
47
    fn on_op(&mut self, index: usize, op: Operation) -> Result<(), ExecutionError>;
48
    /// Responds to `commit` of `epoch`.
49
    fn on_commit(&mut self, epoch: &Epoch) -> Result<(), ExecutionError>;
50
    /// Responds to `terminate`.
51
    fn on_terminate(&mut self) -> Result<(), ExecutionError>;
52

53
    /// The loop implementation, calls [`on_op`], [`on_commit`] and [`on_terminate`] at appropriate times.
54
    fn receiver_loop(&mut self) -> Result<(), ExecutionError> {
199✔
55
        let receivers = self.receivers();
199✔
56
        let mut port_states = vec![InputPortState::Open; receivers.len()];
199✔
57

199✔
58
        let mut commits_received: usize = 0;
199✔
59
        let mut common_epoch = Epoch::new(0, HashMap::new());
199✔
60

199✔
61
        let mut sel = init_select(&receivers);
199✔
62
        loop {
9,042,137✔
63
            let index = sel.ready();
9,042,137✔
64
            match internal_err!(receivers[index].recv().map(map_executor_operation))? {
9,042,137✔
65
                MappedExecutorOperation::Data { op } => {
9,031,114✔
66
                    self.on_op(index, op)?;
9,031,114✔
67
                }
68
                MappedExecutorOperation::Commit { epoch } => {
10,815✔
69
                    assert_eq!(epoch.id, common_epoch.id);
10,815✔
70
                    commits_received += 1;
10,814✔
71
                    sel.remove(index);
10,814✔
72
                    common_epoch.details.extend(epoch.details);
10,814✔
73

10,814✔
74
                    if commits_received == receivers.len() {
10,814✔
75
                        self.on_commit(&common_epoch)?;
9,726✔
76
                        common_epoch = Epoch::new(common_epoch.id + 1, HashMap::new());
9,724✔
77
                        commits_received = 0;
9,724✔
78
                        sel = init_select(&receivers);
9,724✔
79
                    }
1,088✔
80
                }
81
                MappedExecutorOperation::Terminate => {
82
                    port_states[index] = InputPortState::Terminated;
198✔
83
                    sel.remove(index);
198✔
84
                    debug!(
198✔
85
                        "[{}] Received Terminate request on port {}",
×
86
                        self.name(),
×
87
                        self.receiver_name(index)
×
88
                    );
89
                    if port_states.iter().all(|v| v == &InputPortState::Terminated) {
223✔
90
                        self.on_terminate()?;
180✔
91
                        debug!("[{}] Quit", self.name());
180✔
92
                        return Ok(());
180✔
93
                    }
18✔
94
                }
95
            }
96
        }
97
    }
196✔
98
}
99

100
#[cfg(test)]
mod tests {
    //! Unit tests for `map_executor_operation` and the default `ReceiverLoop::receiver_loop`.

    use crossbeam::channel::{unbounded, Sender};
    use dozer_types::types::{Field, Record};

    use crate::dag::node::NodeHandle;

    use super::*;

    /// Every `ExecutorOperation` variant maps to the expected `MappedExecutorOperation`.
    #[test]
    fn test_map_executor_operation() {
        let old = Record::new(None, vec![Field::Int(1)], None);
        let new = Record::new(None, vec![Field::Int(2)], None);
        let epoch = Epoch::new(1, HashMap::new());
        assert_eq!(
            map_executor_operation(ExecutorOperation::Insert { new: new.clone() }),
            MappedExecutorOperation::Data {
                op: Operation::Insert { new: new.clone() }
            }
        );
        assert_eq!(
            map_executor_operation(ExecutorOperation::Update {
                old: old.clone(),
                new: new.clone()
            }),
            MappedExecutorOperation::Data {
                op: Operation::Update {
                    old: old.clone(),
                    new
                }
            }
        );
        assert_eq!(
            map_executor_operation(ExecutorOperation::Delete { old: old.clone() }),
            MappedExecutorOperation::Data {
                op: Operation::Delete { old }
            }
        );
        assert_eq!(
            map_executor_operation(ExecutorOperation::Commit {
                epoch: epoch.clone()
            }),
            MappedExecutorOperation::Commit { epoch }
        );
        assert_eq!(
            map_executor_operation(ExecutorOperation::Terminate),
            MappedExecutorOperation::Terminate
        );
    }

    /// A `ReceiverLoop` implementation that records every callback it receives.
    struct TestReceiverLoop {
        /// Input channels; handed over to the loop on the first `receivers()` call.
        receivers: Vec<Receiver<ExecutorOperation>>,
        /// `(receiver index, operation)` pairs passed to `on_op`, in call order.
        ops: Vec<(usize, Operation)>,
        /// Epochs passed to `on_commit`, in call order.
        commits: Vec<Epoch>,
        /// Number of times `on_terminate` was called.
        num_terminations: usize,
    }

    impl Name for TestReceiverLoop {
        fn name(&self) -> Cow<str> {
            Cow::Borrowed("TestReceiverLoop")
        }
    }

    impl ReceiverLoop for TestReceiverLoop {
        fn receivers(&mut self) -> Vec<Receiver<ExecutorOperation>> {
            // Moves the receivers out and leaves an empty vec behind.
            std::mem::take(&mut self.receivers)
        }

        fn receiver_name(&self, index: usize) -> Cow<str> {
            Cow::Owned(format!("receiver_{index}"))
        }

        fn on_op(&mut self, index: usize, op: Operation) -> Result<(), ExecutionError> {
            self.ops.push((index, op));
            Ok(())
        }

        fn on_commit(&mut self, epoch: &Epoch) -> Result<(), ExecutionError> {
            self.commits.push(epoch.clone());
            Ok(())
        }

        fn on_terminate(&mut self) -> Result<(), ExecutionError> {
            self.num_terminations += 1;
            Ok(())
        }
    }

    impl TestReceiverLoop {
        /// Creates a loop with `num_receivers` unbounded input channels and returns it together
        /// with the matching senders (sender `i` feeds receiver `i`).
        fn new(num_receivers: usize) -> (TestReceiverLoop, Vec<Sender<ExecutorOperation>>) {
            let (senders, receivers) = (0..num_receivers).map(|_| unbounded()).unzip();
            (
                TestReceiverLoop {
                    receivers,
                    ops: vec![],
                    commits: vec![],
                    num_terminations: 0,
                },
                senders,
            )
        }
    }

    /// The loop returns after all ports terminate and calls `on_terminate` exactly once.
    #[test]
    fn receiver_loop_stops_on_terminate() {
        let (mut test_loop, senders) = TestReceiverLoop::new(2);
        senders[0].send(ExecutorOperation::Terminate).unwrap();
        senders[1].send(ExecutorOperation::Terminate).unwrap();
        test_loop.receiver_loop().unwrap();
        assert_eq!(test_loop.num_terminations, 1);
    }

    /// A data operation is forwarded to `on_op` with the index of its receiver.
    #[test]
    fn receiver_loop_forwards_op() {
        let (mut test_loop, senders) = TestReceiverLoop::new(2);
        let record = Record::new(None, vec![Field::Int(1)], None);
        senders[0]
            .send(ExecutorOperation::Insert {
                new: record.clone(),
            })
            .unwrap();
        senders[0].send(ExecutorOperation::Terminate).unwrap();
        senders[1].send(ExecutorOperation::Terminate).unwrap();
        test_loop.receiver_loop().unwrap();
        assert_eq!(test_loop.ops, vec![(0, Operation::Insert { new: record })]);
    }

    /// Commits from all ports are merged into one epoch, and the expected id then advances.
    #[test]
    fn receiver_loop_merges_commit_epoch_and_increases_epoch_id() {
        let (mut test_loop, senders) = TestReceiverLoop::new(2);
        let mut details = HashMap::new();
        details.insert(NodeHandle::new(None, "0".to_string()), (0, 0));
        let mut epoch0 = Epoch::new(0, details);
        let mut details = HashMap::new();
        details.insert(NodeHandle::new(None, "1".to_string()), (0, 0));
        let mut epoch1 = Epoch::new(0, details);
        senders[0]
            .send(ExecutorOperation::Commit {
                epoch: epoch0.clone(),
            })
            .unwrap();
        senders[1]
            .send(ExecutorOperation::Commit {
                epoch: epoch1.clone(),
            })
            .unwrap();
        epoch0.id = 1;
        epoch1.id = 1;
        senders[0]
            .send(ExecutorOperation::Commit {
                epoch: epoch0.clone(),
            })
            .unwrap();
        senders[1]
            .send(ExecutorOperation::Commit {
                epoch: epoch1.clone(),
            })
            .unwrap();
        senders[0].send(ExecutorOperation::Terminate).unwrap();
        senders[1].send(ExecutorOperation::Terminate).unwrap();
        test_loop.receiver_loop().unwrap();

        let mut details = HashMap::new();
        details.extend(epoch0.details);
        details.extend(epoch1.details);
        assert_eq!(
            test_loop.commits,
            vec![Epoch::new(0, details.clone()), Epoch::new(1, details)]
        );
    }

    /// A commit whose epoch id differs from the one being collected makes the loop panic.
    #[test]
    #[should_panic]
    fn receiver_loop_panics_on_inconsistent_commit_epoch() {
        let (mut test_loop, senders) = TestReceiverLoop::new(2);
        let mut details = HashMap::new();
        details.insert(NodeHandle::new(None, "0".to_string()), (0, 0));
        let epoch0 = Epoch::new(0, details);
        let mut details = HashMap::new();
        details.insert(NodeHandle::new(None, "1".to_string()), (0, 0));
        let epoch1 = Epoch::new(1, details);
        senders[0]
            .send(ExecutorOperation::Commit { epoch: epoch0 })
            .unwrap();
        senders[1]
            .send(ExecutorOperation::Commit { epoch: epoch1 })
            .unwrap();
        senders[0].send(ExecutorOperation::Terminate).unwrap();
        senders[1].send(ExecutorOperation::Terminate).unwrap();
        test_loop.receiver_loop().unwrap();
    }
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc