• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

getdozer / dozer / 4125713559

pending completion
4125713559

Pull #831

github

GitHub
Merge 3efc092d8 into cdb46cf22
Pull Request #831: chore: Improve Join processor errors

134 of 134 new or added lines in 4 files covered. (100.0%)

23044 of 34837 relevant lines covered (66.15%)

35039.7 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

95.56
/dozer-core/src/executor/receiver_loop.rs
1
use std::borrow::Cow;
2

3
use crossbeam::channel::Receiver;
4
use dozer_types::log::debug;
5
use dozer_types::{internal_err, types::Operation};
6

7
use crate::{
8
    epoch::Epoch,
9
    errors::ExecutionError::{self, InternalError},
10
    executor_utils::init_select,
11
};
12

13
use super::{name::Name, ExecutorOperation, InputPortState};
14

15
#[derive(Debug, PartialEq)]
5✔
16
enum MappedExecutorOperation {
17
    Data { op: Operation },
18
    Commit { epoch: Epoch },
19
    Terminate,
20
}
21

22
fn map_executor_operation(op: ExecutorOperation) -> MappedExecutorOperation {
9,374,634✔
23
    match op {
9,374,634✔
24
        ExecutorOperation::Delete { old } => MappedExecutorOperation::Data {
36,683✔
25
            op: Operation::Delete { old },
36,683✔
26
        },
36,683✔
27
        ExecutorOperation::Insert { new } => MappedExecutorOperation::Data {
9,271,496✔
28
            op: Operation::Insert { new },
9,271,496✔
29
        },
9,271,496✔
30
        ExecutorOperation::Update { old, new } => MappedExecutorOperation::Data {
62,019✔
31
            op: Operation::Update { old, new },
62,019✔
32
        },
62,019✔
33
        ExecutorOperation::Commit { epoch } => MappedExecutorOperation::Commit { epoch },
3,849✔
34
        ExecutorOperation::Terminate => MappedExecutorOperation::Terminate,
587✔
35
    }
36
}
9,374,634✔
37

38
/// Common code for processor and sink nodes.
39
///
40
/// They both select from their input channels, and respond to "op", "commit", and terminate.
41
pub trait ReceiverLoop: Name {
42
    /// Returns input channels to this node. Will be called exactly once in [`receiver_loop`].
43
    fn receivers(&mut self) -> Vec<Receiver<ExecutorOperation>>;
44
    /// Returns the name of the receiver at `index`. Used for logging.
45
    fn receiver_name(&self, index: usize) -> Cow<str>;
46
    /// Responds to `op` from the receiver at `index`.
47
    fn on_op(&mut self, index: usize, op: Operation) -> Result<(), ExecutionError>;
48
    /// Responds to `commit` of `epoch`.
49
    fn on_commit(&mut self, epoch: &Epoch) -> Result<(), ExecutionError>;
50
    /// Responds to `terminate`.
51
    fn on_terminate(&mut self) -> Result<(), ExecutionError>;
52

53
    /// The loop implementation, calls [`on_op`], [`on_commit`] and [`on_terminate`] at appropriate times.
54
    fn receiver_loop(&mut self) -> Result<(), ExecutionError> {
224✔
55
        let receivers = self.receivers();
224✔
56
        debug_assert!(
57
            !receivers.is_empty(),
224✔
58
            "Processor or sink must have at least 1 incoming edge"
×
59
        );
60
        let mut port_states = vec![InputPortState::Open; receivers.len()];
224✔
61

224✔
62
        let mut commits_received: usize = 0;
224✔
63
        let mut common_epoch = Epoch::new(0, Default::default());
224✔
64

224✔
65
        let mut sel = init_select(&receivers);
224✔
66
        loop {
9,206,141✔
67
            let index = sel.ready();
9,206,141✔
68
            match internal_err!(receivers[index].recv().map(map_executor_operation))? {
9,206,141✔
69
                MappedExecutorOperation::Data { op } => {
9,202,528✔
70
                    self.on_op(index, op)?;
9,202,528✔
71
                }
72
                MappedExecutorOperation::Commit { epoch } => {
3,371✔
73
                    assert_eq!(epoch.id, common_epoch.id);
3,371✔
74
                    commits_received += 1;
3,370✔
75
                    sel.remove(index);
3,370✔
76
                    common_epoch.details.extend(epoch.details);
3,370✔
77

3,370✔
78
                    if commits_received == receivers.len() {
3,370✔
79
                        self.on_commit(&common_epoch)?;
2,920✔
80
                        common_epoch = Epoch::new(common_epoch.id + 1, Default::default());
2,921✔
81
                        commits_received = 0;
2,921✔
82
                        sel = init_select(&receivers);
2,921✔
83
                    }
450✔
84
                }
85
                MappedExecutorOperation::Terminate => {
86
                    port_states[index] = InputPortState::Terminated;
232✔
87
                    sel.remove(index);
232✔
88
                    debug!(
232✔
89
                        "[{}] Received Terminate request on port {}",
×
90
                        self.name(),
×
91
                        self.receiver_name(index)
×
92
                    );
93
                    if port_states.iter().all(|v| v == &InputPortState::Terminated) {
276✔
94
                        self.on_terminate()?;
205✔
95
                        debug!("[{}] Quit", self.name());
205✔
96
                        return Ok(());
205✔
97
                    }
27✔
98
                }
99
            }
100
        }
101
    }
221✔
102
}
103

104
#[cfg(test)]
mod tests {
    use std::mem::take;

    use crossbeam::channel::{unbounded, Sender};
    use dozer_types::types::{Field, Record};

    use crate::{
        epoch::{OpIdentifier, SourceStates},
        node::NodeHandle,
    };

    use super::*;

    /// Every `ExecutorOperation` variant maps to the expected `MappedExecutorOperation`.
    #[test]
    fn test_map_executor_operation() {
        let old = Record::new(None, vec![Field::Int(1)], None);
        let new = Record::new(None, vec![Field::Int(2)], None);
        let epoch = Epoch::new(1, Default::default());
        assert_eq!(
            map_executor_operation(ExecutorOperation::Insert { new: new.clone() }),
            MappedExecutorOperation::Data {
                op: Operation::Insert { new: new.clone() }
            }
        );
        assert_eq!(
            map_executor_operation(ExecutorOperation::Update {
                old: old.clone(),
                new: new.clone()
            }),
            MappedExecutorOperation::Data {
                op: Operation::Update {
                    old: old.clone(),
                    new
                }
            }
        );
        assert_eq!(
            map_executor_operation(ExecutorOperation::Delete { old: old.clone() }),
            MappedExecutorOperation::Data {
                op: Operation::Delete { old }
            }
        );
        assert_eq!(
            map_executor_operation(ExecutorOperation::Commit {
                epoch: epoch.clone()
            }),
            MappedExecutorOperation::Commit { epoch }
        );
        assert_eq!(
            map_executor_operation(ExecutorOperation::Terminate),
            MappedExecutorOperation::Terminate
        );
    }

    /// `ReceiverLoop` implementation that records every callback it receives.
    struct TestReceiverLoop {
        /// Input channels; emptied by the first call to `receivers`.
        receivers: Vec<Receiver<ExecutorOperation>>,
        /// `(port index, operation)` pairs passed to `on_op`, in arrival order.
        ops: Vec<(usize, Operation)>,
        /// Epochs passed to `on_commit`, in arrival order.
        commits: Vec<Epoch>,
        /// Number of times `on_terminate` was called.
        num_terminations: usize,
    }

    impl Name for TestReceiverLoop {
        fn name(&self) -> Cow<str> {
            Cow::Borrowed("TestReceiverLoop")
        }
    }

    impl ReceiverLoop for TestReceiverLoop {
        fn receivers(&mut self) -> Vec<Receiver<ExecutorOperation>> {
            // Move the channels out, leaving an empty vector behind.
            take(&mut self.receivers)
        }

        fn receiver_name(&self, index: usize) -> Cow<str> {
            Cow::Owned(format!("receiver_{index}"))
        }

        fn on_op(&mut self, index: usize, op: Operation) -> Result<(), ExecutionError> {
            self.ops.push((index, op));
            Ok(())
        }

        fn on_commit(&mut self, epoch: &Epoch) -> Result<(), ExecutionError> {
            self.commits.push(epoch.clone());
            Ok(())
        }

        fn on_terminate(&mut self) -> Result<(), ExecutionError> {
            self.num_terminations += 1;
            Ok(())
        }
    }

    impl TestReceiverLoop {
        /// Creates a loop with `num_receivers` unbounded input channels and returns it together
        /// with the matching senders (sender `i` feeds receiver `i`).
        fn new(num_receivers: usize) -> (TestReceiverLoop, Vec<Sender<ExecutorOperation>>) {
            let (senders, receivers) = (0..num_receivers).map(|_| unbounded()).unzip();
            (
                TestReceiverLoop {
                    receivers,
                    ops: vec![],
                    commits: vec![],
                    num_terminations: 0,
                },
                senders,
            )
        }
    }

    /// Builds source states holding a single entry for the source named `source`.
    fn single_source_details(source: &str) -> SourceStates {
        let mut details = SourceStates::new();
        details.insert(
            NodeHandle::new(None, source.to_string()),
            OpIdentifier::new(0, 0),
        );
        details
    }

    #[test]
    fn receiver_loop_stops_on_terminate() {
        let (mut test_loop, senders) = TestReceiverLoop::new(2);
        senders[0].send(ExecutorOperation::Terminate).unwrap();
        senders[1].send(ExecutorOperation::Terminate).unwrap();
        test_loop.receiver_loop().unwrap();
        assert_eq!(test_loop.num_terminations, 1);
    }

    #[test]
    fn receiver_loop_forwards_op() {
        let (mut test_loop, senders) = TestReceiverLoop::new(2);
        let record = Record::new(None, vec![Field::Int(1)], None);
        senders[0]
            .send(ExecutorOperation::Insert {
                new: record.clone(),
            })
            .unwrap();
        senders[0].send(ExecutorOperation::Terminate).unwrap();
        senders[1].send(ExecutorOperation::Terminate).unwrap();
        test_loop.receiver_loop().unwrap();
        assert_eq!(test_loop.ops, vec![(0, Operation::Insert { new: record })]);
    }

    #[test]
    fn receiver_loop_merges_commit_epoch_and_increases_epoch_id() {
        let (mut test_loop, senders) = TestReceiverLoop::new(2);
        let mut epoch0 = Epoch::new(0, single_source_details("0"));
        let mut epoch1 = Epoch::new(0, single_source_details("1"));
        senders[0]
            .send(ExecutorOperation::Commit {
                epoch: epoch0.clone(),
            })
            .unwrap();
        senders[1]
            .send(ExecutorOperation::Commit {
                epoch: epoch1.clone(),
            })
            .unwrap();
        // Second round of commits, one epoch id later.
        epoch0.id = 1;
        epoch1.id = 1;
        senders[0]
            .send(ExecutorOperation::Commit {
                epoch: epoch0.clone(),
            })
            .unwrap();
        senders[1]
            .send(ExecutorOperation::Commit {
                epoch: epoch1.clone(),
            })
            .unwrap();
        senders[0].send(ExecutorOperation::Terminate).unwrap();
        senders[1].send(ExecutorOperation::Terminate).unwrap();
        test_loop.receiver_loop().unwrap();

        // Each forwarded commit must carry the details of both inputs.
        let mut details = SourceStates::new();
        details.extend(epoch0.details);
        details.extend(epoch1.details);
        assert_eq!(
            test_loop.commits,
            vec![Epoch::new(0, details.clone()), Epoch::new(1, details)]
        );
    }

    // `expected` ties the test to the epoch-id assertion, so an unrelated panic (for example the
    // trailing `unwrap()` on an `Err`) no longer lets it pass.
    #[test]
    #[should_panic(expected = "assertion")]
    fn receiver_loop_panics_on_inconsistent_commit_epoch() {
        let (mut test_loop, senders) = TestReceiverLoop::new(2);
        let epoch0 = Epoch::new(0, single_source_details("0"));
        let epoch1 = Epoch::new(1, single_source_details("1"));
        senders[0]
            .send(ExecutorOperation::Commit { epoch: epoch0 })
            .unwrap();
        senders[1]
            .send(ExecutorOperation::Commit { epoch: epoch1 })
            .unwrap();
        senders[0].send(ExecutorOperation::Terminate).unwrap();
        senders[1].send(ExecutorOperation::Terminate).unwrap();
        test_loop.receiver_loop().unwrap();
    }
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc