• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

getdozer / dozer / 4283961027

pending completion
4283961027

push

github

GitHub
feat: Blue green cache (#1061)

645 of 645 new or added lines in 45 files covered. (100.0%)

27779 of 39307 relevant lines covered (70.67%)

52489.81 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

59.31
/dozer-core/src/executor/receiver_loop.rs
1
use std::borrow::Cow;
2

3
use crossbeam::channel::{Receiver, Select};
4
use dozer_types::log::debug;
5
use dozer_types::types::Operation;
6

7
use crate::{epoch::Epoch, errors::ExecutionError};
8

9
use super::{name::Name, ExecutorOperation, InputPortState};
10

11
/// Common code for processor and sink nodes.
×
12
///
13
/// They both select from their input channels, and respond to "op", "commit", and terminate.
14
pub trait ReceiverLoop: Name {
15
    /// Returns input channels to this node. Will be called exactly once in [`receiver_loop`].
16
    fn receivers(&mut self) -> Vec<Receiver<ExecutorOperation>>;
17
    /// Returns the name of the receiver at `index`. Used for logging.
18
    fn receiver_name(&self, index: usize) -> Cow<str>;
×
19
    /// Responds to `op` from the receiver at `index`.
×
20
    fn on_op(&mut self, index: usize, op: Operation) -> Result<(), ExecutionError>;
×
21
    /// Responds to `commit` of `epoch`.
×
22
    fn on_commit(&mut self, epoch: &Epoch) -> Result<(), ExecutionError>;
×
23
    /// Responds to `terminate`.
×
24
    fn on_terminate(&mut self) -> Result<(), ExecutionError>;
×
25

×
26
    /// The loop implementation, calls [`on_op`], [`on_commit`] and [`on_terminate`] at appropriate times.
×
27
    fn receiver_loop(&mut self) -> Result<(), ExecutionError> {
1,357✔
28
        let receivers = self.receivers();
1,357✔
29
        debug_assert!(
×
30
            !receivers.is_empty(),
1,357✔
31
            "Processor or sink must have at least 1 incoming edge"
×
32
        );
×
33
        let mut port_states = vec![InputPortState::Open; receivers.len()];
1,357✔
34

1,357✔
35
        let mut commits_received: usize = 0;
1,357✔
36
        let mut common_epoch = Epoch::new(0, Default::default());
1,357✔
37

1,357✔
38
        let mut sel = init_select(&receivers);
1,357✔
39
        loop {
10,796,163✔
40
            let index = sel.ready();
10,796,163✔
41
            let op = receivers[index]
10,796,163✔
42
                .recv()
10,796,163✔
43
                .map_err(|_| ExecutionError::CannotReceiveFromChannel)?;
10,796,163✔
44

45
            match op {
10,796,159✔
46
                ExecutorOperation::Op { op } => {
10,789,155✔
47
                    self.on_op(index, op)?;
10,789,155✔
48
                }
49
                ExecutorOperation::Commit { epoch } => {
5,537✔
50
                    assert_eq!(epoch.id, common_epoch.id);
5,537✔
51
                    commits_received += 1;
5,536✔
52
                    sel.remove(index);
5,536✔
53
                    common_epoch.details.extend(epoch.details);
5,536✔
54

5,536✔
55
                    if commits_received == receivers.len() {
5,536✔
56
                        self.on_commit(&common_epoch)?;
4,744✔
57
                        common_epoch = Epoch::new(common_epoch.id + 1, Default::default());
4,743✔
58
                        commits_received = 0;
4,743✔
59
                        sel = init_select(&receivers);
4,743✔
60
                    }
792✔
61
                }
×
62
                ExecutorOperation::Terminate => {
×
63
                    port_states[index] = InputPortState::Terminated;
1,467✔
64
                    sel.remove(index);
1,467✔
65
                    debug!(
1,467✔
66
                        "[{}] Received Terminate request on port {}",
×
67
                        self.name(),
×
68
                        self.receiver_name(index)
×
69
                    );
70
                    if port_states.iter().all(|v| v == &InputPortState::Terminated) {
1,667✔
71
                        self.on_terminate()?;
1,344✔
72
                        debug!("[{}] Quit", self.name());
1,344✔
73
                        return Ok(());
1,344✔
74
                    }
123✔
75
                }
×
76
            }
×
77
        }
×
78
    }
1,354✔
79
}
×
80

×
81
fn init_select(receivers: &Vec<Receiver<ExecutorOperation>>) -> Select {
6,101✔
82
    let mut sel = Select::new();
6,101✔
83
    for r in receivers {
13,118✔
84
        sel.recv(r);
7,017✔
85
    }
7,017✔
86
    sel
6,101✔
87
}
6,101✔
88

×
89
#[cfg(test)]
mod tests {
    use std::mem::take;

    use crossbeam::channel::{unbounded, Sender};
    use dozer_types::{
        node::{NodeHandle, OpIdentifier, SourceStates},
        types::{Field, Record},
    };

    use super::*;

    /// Test double that records every callback invocation made by `receiver_loop`.
    struct TestReceiverLoop {
        /// Input channels; moved out on the first call to `receivers`.
        receivers: Vec<Receiver<ExecutorOperation>>,
        /// `(receiver index, operation)` pairs passed to `on_op`, in arrival order.
        ops: Vec<(usize, Operation)>,
        /// Epochs passed to `on_commit`, in arrival order.
        commits: Vec<Epoch>,
        /// Number of times `on_terminate` was called.
        num_terminations: usize,
    }

    impl Name for TestReceiverLoop {
        fn name(&self) -> Cow<str> {
            Cow::Borrowed("TestReceiverLoop")
        }
    }

    impl ReceiverLoop for TestReceiverLoop {
        fn receivers(&mut self) -> Vec<Receiver<ExecutorOperation>> {
            // Move the receivers out, leaving an empty vec behind.
            take(&mut self.receivers)
        }

        fn receiver_name(&self, index: usize) -> Cow<str> {
            Cow::Owned(format!("receiver_{index}"))
        }

        fn on_op(&mut self, index: usize, op: Operation) -> Result<(), ExecutionError> {
            self.ops.push((index, op));
            Ok(())
        }

        fn on_commit(&mut self, epoch: &Epoch) -> Result<(), ExecutionError> {
            self.commits.push(epoch.clone());
            Ok(())
        }

        fn on_terminate(&mut self) -> Result<(), ExecutionError> {
            self.num_terminations += 1;
            Ok(())
        }
    }

    impl TestReceiverLoop {
        /// Creates a loop with `num_receivers` unbounded input channels and returns it
        /// together with the matching senders (same order as the receivers).
        fn new(num_receivers: usize) -> (TestReceiverLoop, Vec<Sender<ExecutorOperation>>) {
            let (senders, receivers) = (0..num_receivers).map(|_| unbounded()).unzip();
            (
                TestReceiverLoop {
                    receivers,
                    ops: vec![],
                    commits: vec![],
                    num_terminations: 0,
                },
                senders,
            )
        }
    }

    /// Builds source states containing a single entry for the source named `source`.
    fn single_source_details(source: &str) -> SourceStates {
        let mut details = SourceStates::new();
        details.insert(
            NodeHandle::new(None, source.to_string()),
            OpIdentifier::new(0, 0),
        );
        details
    }

    #[test]
    fn receiver_loop_stops_on_terminate() {
        let (mut test_loop, senders) = TestReceiverLoop::new(2);
        senders[0].send(ExecutorOperation::Terminate).unwrap();
        senders[1].send(ExecutorOperation::Terminate).unwrap();
        test_loop.receiver_loop().unwrap();
        assert_eq!(test_loop.num_terminations, 1);
    }

    #[test]
    fn receiver_loop_forwards_op() {
        let (mut test_loop, senders) = TestReceiverLoop::new(2);
        let record = Record::new(None, vec![Field::Int(1)], None);
        senders[0]
            .send(ExecutorOperation::Op {
                op: Operation::Insert {
                    new: record.clone(),
                },
            })
            .unwrap();
        senders[0].send(ExecutorOperation::Terminate).unwrap();
        senders[1].send(ExecutorOperation::Terminate).unwrap();
        test_loop.receiver_loop().unwrap();
        assert_eq!(test_loop.ops, vec![(0, Operation::Insert { new: record })]);
    }

    #[test]
    fn receiver_loop_merges_commit_epoch_and_increases_epoch_id() {
        let (mut test_loop, senders) = TestReceiverLoop::new(2);
        let mut epoch0 = Epoch::new(0, single_source_details("0"));
        let mut epoch1 = Epoch::new(0, single_source_details("1"));

        // First round: both ports commit epoch 0.
        senders[0]
            .send(ExecutorOperation::Commit {
                epoch: epoch0.clone(),
            })
            .unwrap();
        senders[1]
            .send(ExecutorOperation::Commit {
                epoch: epoch1.clone(),
            })
            .unwrap();

        // Second round: both ports commit epoch 1 with the same details.
        epoch0.id = 1;
        epoch1.id = 1;
        senders[0]
            .send(ExecutorOperation::Commit {
                epoch: epoch0.clone(),
            })
            .unwrap();
        senders[1]
            .send(ExecutorOperation::Commit {
                epoch: epoch1.clone(),
            })
            .unwrap();

        senders[0].send(ExecutorOperation::Terminate).unwrap();
        senders[1].send(ExecutorOperation::Terminate).unwrap();
        test_loop.receiver_loop().unwrap();

        // Each committed epoch must carry the union of both ports' details.
        let mut details = SourceStates::new();
        details.extend(epoch0.details);
        details.extend(epoch1.details);
        assert_eq!(
            test_loop.commits,
            vec![Epoch::new(0, details.clone()), Epoch::new(1, details)]
        );
    }

    #[test]
    #[should_panic]
    fn receiver_loop_panics_on_inconsistent_commit_epoch() {
        let (mut test_loop, senders) = TestReceiverLoop::new(2);
        // The two ports disagree on the epoch id (0 vs 1).
        let epoch0 = Epoch::new(0, single_source_details("0"));
        let epoch1 = Epoch::new(1, single_source_details("1"));
        senders[0]
            .send(ExecutorOperation::Commit { epoch: epoch0 })
            .unwrap();
        senders[1]
            .send(ExecutorOperation::Commit { epoch: epoch1 })
            .unwrap();
        senders[0].send(ExecutorOperation::Terminate).unwrap();
        senders[1].send(ExecutorOperation::Terminate).unwrap();
        test_loop.receiver_loop().unwrap();
    }
}
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc