• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

getdozer / dozer / 4164814422

pending completion
4164814422

Pull #883

github

GitHub
Merge eae178c73 into 7cb6e2163
Pull Request #883: refactor: Remove unused struct `PipelineDetails`

211 of 211 new or added lines in 12 files covered. (100.0%)

23813 of 35480 relevant lines covered (67.12%)

41873.4 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

89.16
/dozer-core/src/executor/receiver_loop.rs
1
use std::borrow::Cow;
2

3
use crossbeam::channel::{Receiver, Select};
4
use dozer_types::log::debug;
5
use dozer_types::{internal_err, types::Operation};
6

7
use crate::{
8
    epoch::Epoch,
9
    errors::ExecutionError::{self, InternalError},
10
};
11

12
use super::{name::Name, ExecutorOperation, InputPortState};
13

14
#[derive(Debug, PartialEq)]
5✔
15
enum MappedExecutorOperation {
16
    Data { op: Operation },
17
    Commit { epoch: Epoch },
18
    Terminate,
19
}
20

21
fn map_executor_operation(op: ExecutorOperation) -> MappedExecutorOperation {
9,963,319✔
22
    match op {
9,963,319✔
23
        ExecutorOperation::Delete { old } => MappedExecutorOperation::Data {
49,633✔
24
            op: Operation::Delete { old },
49,633✔
25
        },
49,633✔
26
        ExecutorOperation::Insert { new } => MappedExecutorOperation::Data {
9,819,729✔
27
            op: Operation::Insert { new },
9,819,729✔
28
        },
9,819,729✔
29
        ExecutorOperation::Update { old, new } => MappedExecutorOperation::Data {
89,558✔
30
            op: Operation::Update { old, new },
89,558✔
31
        },
89,558✔
32
        ExecutorOperation::Commit { epoch } => MappedExecutorOperation::Commit { epoch },
3,810✔
33
        ExecutorOperation::Terminate => MappedExecutorOperation::Terminate,
589✔
34
    }
35
}
9,963,319✔
36

37
/// Common code for processor and sink nodes.
38
///
39
/// They both select from their input channels, and respond to "op", "commit", and terminate.
40
pub trait ReceiverLoop: Name {
41
    /// Returns input channels to this node. Will be called exactly once in [`receiver_loop`].
42
    fn receivers(&mut self) -> Vec<Receiver<ExecutorOperation>>;
43
    /// Returns the name of the receiver at `index`. Used for logging.
44
    fn receiver_name(&self, index: usize) -> Cow<str>;
45
    /// Responds to `op` from the receiver at `index`.
46
    fn on_op(&mut self, index: usize, op: Operation) -> Result<(), ExecutionError>;
47
    /// Responds to `commit` of `epoch`.
48
    fn on_commit(&mut self, epoch: &Epoch) -> Result<(), ExecutionError>;
49
    /// Responds to `terminate`.
50
    fn on_terminate(&mut self) -> Result<(), ExecutionError>;
51

52
    /// The loop implementation, calls [`on_op`], [`on_commit`] and [`on_terminate`] at appropriate times.
53
    fn receiver_loop(&mut self) -> Result<(), ExecutionError> {
550✔
54
        let receivers = self.receivers();
550✔
55
        debug_assert!(
56
            !receivers.is_empty(),
550✔
57
            "Processor or sink must have at least 1 incoming edge"
×
58
        );
59
        let mut port_states = vec![InputPortState::Open; receivers.len()];
550✔
60

550✔
61
        let mut commits_received: usize = 0;
550✔
62
        let mut common_epoch = Epoch::new(0, Default::default());
550✔
63

550✔
64
        let mut sel = init_select(&receivers);
550✔
65
        loop {
10,014,460✔
66
            let index = sel.ready();
10,014,460✔
67
            match internal_err!(receivers[index].recv().map(map_executor_operation))? {
10,014,460✔
68
                MappedExecutorOperation::Data { op } => {
10,010,059✔
69
                    self.on_op(index, op)?;
10,010,059✔
70
                }
71
                MappedExecutorOperation::Commit { epoch } => {
3,809✔
72
                    assert_eq!(epoch.id, common_epoch.id);
3,809✔
73
                    commits_received += 1;
3,808✔
74
                    sel.remove(index);
3,808✔
75
                    common_epoch.details.extend(epoch.details);
3,808✔
76

3,808✔
77
                    if commits_received == receivers.len() {
3,808✔
78
                        self.on_commit(&common_epoch)?;
3,234✔
79
                        common_epoch = Epoch::new(common_epoch.id + 1, Default::default());
3,234✔
80
                        commits_received = 0;
3,234✔
81
                        sel = init_select(&receivers);
3,234✔
82
                    }
574✔
83
                }
84
                MappedExecutorOperation::Terminate => {
85
                    port_states[index] = InputPortState::Terminated;
588✔
86
                    sel.remove(index);
588✔
87
                    debug!(
588✔
88
                        "[{}] Received Terminate request on port {}",
×
89
                        self.name(),
×
90
                        self.receiver_name(index)
×
91
                    );
92
                    if port_states.iter().all(|v| v == &InputPortState::Terminated) {
673✔
93
                        self.on_terminate()?;
537✔
94
                        debug!("[{}] Quit", self.name());
537✔
95
                        return Ok(());
537✔
96
                    }
51✔
97
                }
98
            }
99
        }
100
    }
547✔
101
}
102

103
fn init_select(receivers: &Vec<Receiver<ExecutorOperation>>) -> Select {
3,784✔
104
    let mut sel = Select::new();
3,784✔
105
    for r in receivers {
8,194✔
106
        sel.recv(r);
4,410✔
107
    }
4,410✔
108
    sel
3,784✔
109
}
3,784✔
110

111
#[cfg(test)]
mod tests {
    use std::mem::take;

    use crossbeam::channel::{unbounded, Sender};
    use dozer_types::{
        node::{NodeHandle, OpIdentifier, SourceStates},
        types::{Field, Record},
    };

    use super::*;

    /// Every `ExecutorOperation` variant maps to the expected `MappedExecutorOperation`.
    #[test]
    fn test_map_executor_operation() {
        let old = Record::new(None, vec![Field::Int(1)], None);
        let new = Record::new(None, vec![Field::Int(2)], None);
        let epoch = Epoch::new(1, Default::default());
        assert_eq!(
            map_executor_operation(ExecutorOperation::Insert { new: new.clone() }),
            MappedExecutorOperation::Data {
                op: Operation::Insert { new: new.clone() }
            }
        );
        assert_eq!(
            map_executor_operation(ExecutorOperation::Update {
                old: old.clone(),
                new: new.clone()
            }),
            MappedExecutorOperation::Data {
                op: Operation::Update {
                    old: old.clone(),
                    new
                }
            }
        );
        assert_eq!(
            map_executor_operation(ExecutorOperation::Delete { old: old.clone() }),
            MappedExecutorOperation::Data {
                op: Operation::Delete { old }
            }
        );
        assert_eq!(
            map_executor_operation(ExecutorOperation::Commit {
                epoch: epoch.clone()
            }),
            MappedExecutorOperation::Commit { epoch }
        );
        assert_eq!(
            map_executor_operation(ExecutorOperation::Terminate),
            MappedExecutorOperation::Terminate
        );
    }

    /// `ReceiverLoop` implementation that records every callback invocation.
    struct TestReceiverLoop {
        /// Input channels; emptied by the first call to `receivers`.
        receivers: Vec<Receiver<ExecutorOperation>>,
        /// `(port index, operation)` pairs passed to `on_op`, in arrival order.
        ops: Vec<(usize, Operation)>,
        /// Epochs passed to `on_commit`, in order.
        commits: Vec<Epoch>,
        /// Number of times `on_terminate` was called.
        num_terminations: usize,
    }

    impl Name for TestReceiverLoop {
        fn name(&self) -> Cow<str> {
            Cow::Borrowed("TestReceiverLoop")
        }
    }

    impl ReceiverLoop for TestReceiverLoop {
        fn receivers(&mut self) -> Vec<Receiver<ExecutorOperation>> {
            // Move the receivers out and leave an empty vector behind.
            take(&mut self.receivers)
        }

        fn receiver_name(&self, index: usize) -> Cow<str> {
            Cow::Owned(format!("receiver_{index}"))
        }

        fn on_op(&mut self, index: usize, op: Operation) -> Result<(), ExecutionError> {
            self.ops.push((index, op));
            Ok(())
        }

        fn on_commit(&mut self, epoch: &Epoch) -> Result<(), ExecutionError> {
            self.commits.push(epoch.clone());
            Ok(())
        }

        fn on_terminate(&mut self) -> Result<(), ExecutionError> {
            self.num_terminations += 1;
            Ok(())
        }
    }

    impl TestReceiverLoop {
        /// Creates a loop with `num_receivers` unbounded input channels and returns it
        /// together with the matching senders (sender `i` feeds receiver `i`).
        fn new(num_receivers: usize) -> (TestReceiverLoop, Vec<Sender<ExecutorOperation>>) {
            let (senders, receivers) = (0..num_receivers).map(|_| unbounded()).unzip();
            (
                TestReceiverLoop {
                    receivers,
                    ops: vec![],
                    commits: vec![],
                    num_terminations: 0,
                },
                senders,
            )
        }
    }

    #[test]
    fn receiver_loop_stops_on_terminate() {
        let (mut test_loop, senders) = TestReceiverLoop::new(2);
        senders[0].send(ExecutorOperation::Terminate).unwrap();
        senders[1].send(ExecutorOperation::Terminate).unwrap();
        test_loop.receiver_loop().unwrap();
        assert_eq!(test_loop.num_terminations, 1);
    }

    #[test]
    fn receiver_loop_forwards_op() {
        let (mut test_loop, senders) = TestReceiverLoop::new(2);
        let record = Record::new(None, vec![Field::Int(1)], None);
        senders[0]
            .send(ExecutorOperation::Insert {
                new: record.clone(),
            })
            .unwrap();
        senders[0].send(ExecutorOperation::Terminate).unwrap();
        senders[1].send(ExecutorOperation::Terminate).unwrap();
        test_loop.receiver_loop().unwrap();
        assert_eq!(test_loop.ops, vec![(0, Operation::Insert { new: record })]);
    }

    #[test]
    fn receiver_loop_merges_commit_epoch_and_increases_epoch_id() {
        let (mut test_loop, senders) = TestReceiverLoop::new(2);
        let mut details = SourceStates::default();
        details.insert(
            NodeHandle::new(None, "0".to_string()),
            OpIdentifier::new(0, 0),
        );
        let mut epoch0 = Epoch::new(0, details);
        let mut details = SourceStates::default();
        details.insert(
            NodeHandle::new(None, "1".to_string()),
            OpIdentifier::new(0, 0),
        );
        let mut epoch1 = Epoch::new(0, details);
        senders[0]
            .send(ExecutorOperation::Commit {
                epoch: epoch0.clone(),
            })
            .unwrap();
        senders[1]
            .send(ExecutorOperation::Commit {
                epoch: epoch1.clone(),
            })
            .unwrap();
        // Second round: same details, next epoch id.
        epoch0.id = 1;
        epoch1.id = 1;
        senders[0]
            .send(ExecutorOperation::Commit {
                epoch: epoch0.clone(),
            })
            .unwrap();
        senders[1]
            .send(ExecutorOperation::Commit {
                epoch: epoch1.clone(),
            })
            .unwrap();
        senders[0].send(ExecutorOperation::Terminate).unwrap();
        senders[1].send(ExecutorOperation::Terminate).unwrap();
        test_loop.receiver_loop().unwrap();

        let mut details = SourceStates::new();
        details.extend(epoch0.details);
        details.extend(epoch1.details);
        assert_eq!(
            test_loop.commits,
            vec![Epoch::new(0, details.clone()), Epoch::new(1, details)]
        );
    }

    #[test]
    #[should_panic]
    fn receiver_loop_panics_on_inconsistent_commit_epoch() {
        let (mut test_loop, senders) = TestReceiverLoop::new(2);
        let mut details = SourceStates::new();
        details.insert(
            NodeHandle::new(None, "0".to_string()),
            OpIdentifier::new(0, 0),
        );
        let epoch0 = Epoch::new(0, details);
        let mut details = SourceStates::new();
        details.insert(
            NodeHandle::new(None, "1".to_string()),
            OpIdentifier::new(0, 0),
        );
        // Epoch id 1 on the second port disagrees with id 0 on the first.
        let epoch1 = Epoch::new(1, details);
        senders[0]
            .send(ExecutorOperation::Commit { epoch: epoch0 })
            .unwrap();
        senders[1]
            .send(ExecutorOperation::Commit { epoch: epoch1 })
            .unwrap();
        senders[0].send(ExecutorOperation::Terminate).unwrap();
        senders[1].send(ExecutorOperation::Terminate).unwrap();
        test_loop.receiver_loop().unwrap();
    }
}
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc