• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

getdozer / dozer / 5888798292

17 Aug 2023 08:51AM UTC coverage: 76.025% (-1.4%) from 77.415%
5888798292

push

github

web-flow
feat: implement graph on live ui (#1847)

* feat: implement progress

* feat: implement enable progress flag

* feat: implement progress in live

* chore: fix clippy

* chore: always use telemetry metrics

* fix: Only run build once

---------

Co-authored-by: sagar <sagar@getdozer.io>
Co-authored-by: chubei <914745487@qq.com>

536 of 536 new or added lines in 21 files covered. (100.0%)

46101 of 60639 relevant lines covered (76.03%)

40410.07 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

94.71
/dozer-core/src/executor/receiver_loop.rs
1
use std::borrow::Cow;
2
use std::time::SystemTime;
3

4
use crossbeam::channel::{Receiver, Select};
5
use dozer_types::log::debug;
6

7
use crate::{
8
    epoch::Epoch,
9
    errors::ExecutionError,
10
    executor_operation::{ExecutorOperation, ProcessorOperation},
11
};
12

13
use super::{name::Name, InputPortState};
14

15
/// Common code for processor and sink nodes.
16
///
17
/// They both select from their input channels, and respond to "op", "commit", and terminate.
18
pub trait ReceiverLoop: Name {
19
    /// Returns input channels to this node. Will be called exactly once in [`receiver_loop`].
20
    fn receivers(&mut self) -> Vec<Receiver<ExecutorOperation>>;
21
    /// Returns the name of the receiver at `index`. Used for logging.
22
    fn receiver_name(&self, index: usize) -> Cow<str>;
23
    /// Responds to `op` from the receiver at `index`.
24
    fn on_op(&mut self, index: usize, op: ProcessorOperation) -> Result<(), ExecutionError>;
25
    /// Responds to `commit` of `epoch`.
26
    fn on_commit(&mut self, epoch: &Epoch) -> Result<(), ExecutionError>;
27
    /// Responds to `terminate`.
28
    fn on_terminate(&mut self) -> Result<(), ExecutionError>;
29
    /// Responds to `SnapshottingDone`.
30
    fn on_snapshotting_done(&mut self, connection_name: String) -> Result<(), ExecutionError>;
31

32
    /// The loop implementation, calls [`on_op`], [`on_commit`] and [`on_terminate`] at appropriate times.
33
    fn receiver_loop(&mut self) -> Result<(), ExecutionError> {
1,501✔
34
        let receivers = self.receivers();
1,501✔
35
        debug_assert!(
36
            !receivers.is_empty(),
1,501✔
37
            "Processor or sink must have at least 1 incoming edge"
×
38
        );
39
        let mut port_states = vec![InputPortState::Open; receivers.len()];
1,501✔
40

1,501✔
41
        let mut commits_received: usize = 0;
1,501✔
42
        let mut common_epoch = Epoch::new(0, Default::default(), None, SystemTime::now());
1,501✔
43

1,501✔
44
        let mut sel = init_select(&receivers);
1,501✔
45
        loop {
9,099,487✔
46
            let index = sel.ready();
9,099,487✔
47
            let op = receivers[index]
9,099,487✔
48
                .recv()
9,099,487✔
49
                .map_err(|_| ExecutionError::CannotReceiveFromChannel)?;
9,099,487✔
50

51
            match op {
9,099,483✔
52
                ExecutorOperation::Op { op } => {
9,094,512✔
53
                    self.on_op(index, op)?;
9,094,512✔
54
                }
55
                ExecutorOperation::Commit { epoch } => {
3,169✔
56
                    assert_eq!(epoch.common_info.id, common_epoch.common_info.id);
3,169✔
57
                    commits_received += 1;
3,168✔
58
                    sel.remove(index);
3,168✔
59
                    // Commits from all inputs ports must have the same decision instant.
3,168✔
60
                    if commits_received > 1 {
3,168✔
61
                        assert_eq!(common_epoch.decision_instant, epoch.decision_instant);
323✔
62
                    }
2,845✔
63
                    common_epoch.common_info = epoch.common_info;
3,168✔
64
                    common_epoch.decision_instant = epoch.decision_instant;
3,168✔
65
                    common_epoch.details.extend(epoch.details);
3,168✔
66

3,168✔
67
                    if commits_received == receivers.len() {
3,168✔
68
                        self.on_commit(&common_epoch)?;
2,845✔
69
                        common_epoch = Epoch::new(
2,845✔
70
                            common_epoch.common_info.id + 1,
2,845✔
71
                            Default::default(),
2,845✔
72
                            None,
2,845✔
73
                            SystemTime::now(),
2,845✔
74
                        );
2,845✔
75
                        commits_received = 0;
2,845✔
76
                        sel = init_select(&receivers);
2,845✔
77
                    }
323✔
78
                }
79
                ExecutorOperation::Terminate => {
80
                    port_states[index] = InputPortState::Terminated;
1,691✔
81
                    sel.remove(index);
1,691✔
82
                    debug!(
1,691✔
83
                        "[{}] Received Terminate request on port {}",
×
84
                        self.name(),
×
85
                        self.receiver_name(index)
×
86
                    );
87
                    if port_states.iter().all(|v| v == &InputPortState::Terminated) {
2,035✔
88
                        self.on_terminate()?;
1,488✔
89
                        debug!("[{}] Quit", self.name());
1,488✔
90
                        return Ok(());
1,488✔
91
                    }
203✔
92
                }
93
                ExecutorOperation::SnapshottingDone { connection_name } => {
111✔
94
                    self.on_snapshotting_done(connection_name)?;
111✔
95
                }
96
            }
97
        }
98
    }
1,492✔
99
}
100

101
fn init_select(receivers: &Vec<Receiver<ExecutorOperation>>) -> Select {
4,346✔
102
    let mut sel = Select::new();
4,346✔
103
    for r in receivers {
9,219✔
104
        sel.recv(r);
4,873✔
105
    }
4,873✔
106
    sel
4,346✔
107
}
4,346✔
108

109
#[cfg(test)]
mod tests {
    use std::mem::take;

    use crossbeam::channel::{unbounded, Sender};
    use dozer_types::{
        node::{NodeHandle, OpIdentifier, SourceStates},
        types::{Field, Record},
    };

    use crate::processor_record::{ProcessorRecord, ProcessorRecordStore};

    use super::*;

    /// `ReceiverLoop` implementation that records every callback it receives.
    struct TestReceiverLoop {
        receivers: Vec<Receiver<ExecutorOperation>>,
        ops: Vec<(usize, ProcessorOperation)>,
        commits: Vec<Epoch>,
        snapshotting_done: Vec<String>,
        num_terminations: usize,
    }

    impl Name for TestReceiverLoop {
        fn name(&self) -> Cow<str> {
            "TestReceiverLoop".into()
        }
    }

    impl ReceiverLoop for TestReceiverLoop {
        fn receivers(&mut self) -> Vec<Receiver<ExecutorOperation>> {
            // Move the receivers out and leave an empty vector behind.
            take(&mut self.receivers)
        }

        fn receiver_name(&self, index: usize) -> Cow<str> {
            format!("receiver_{index}").into()
        }

        fn on_op(&mut self, index: usize, op: ProcessorOperation) -> Result<(), ExecutionError> {
            self.ops.push((index, op));
            Ok(())
        }

        fn on_commit(&mut self, epoch: &Epoch) -> Result<(), ExecutionError> {
            self.commits.push(epoch.clone());
            Ok(())
        }

        fn on_terminate(&mut self) -> Result<(), ExecutionError> {
            self.num_terminations += 1;
            Ok(())
        }

        fn on_snapshotting_done(&mut self, connection_name: String) -> Result<(), ExecutionError> {
            self.snapshotting_done.push(connection_name);
            Ok(())
        }
    }

    impl TestReceiverLoop {
        /// Creates a loop with `num_receivers` unbounded input channels and returns it
        /// together with the sending halves, in port order.
        fn new(num_receivers: usize) -> (TestReceiverLoop, Vec<Sender<ExecutorOperation>>) {
            let mut senders = Vec::with_capacity(num_receivers);
            let mut receivers = Vec::with_capacity(num_receivers);
            for _ in 0..num_receivers {
                let (sender, receiver) = unbounded();
                senders.push(sender);
                receivers.push(receiver);
            }
            let test_loop = TestReceiverLoop {
                receivers,
                ops: vec![],
                commits: vec![],
                snapshotting_done: vec![],
                num_terminations: 0,
            };
            (test_loop, senders)
        }
    }

    /// Builds source states with a single entry for the node named `node`.
    fn single_source_state(node: &str) -> SourceStates {
        let mut states = SourceStates::new();
        states.insert(
            NodeHandle::new(None, node.to_string()),
            OpIdentifier::new(0, 0),
        );
        states
    }

    /// Sends `Commit` of `epoch` through `sender`.
    fn send_commit(sender: &Sender<ExecutorOperation>, epoch: Epoch) {
        sender.send(ExecutorOperation::Commit { epoch }).unwrap();
    }

    /// Sends `Terminate` through every sender, in port order.
    fn terminate_all(senders: &[Sender<ExecutorOperation>]) {
        for sender in senders {
            sender.send(ExecutorOperation::Terminate).unwrap();
        }
    }

    #[test]
    fn receiver_loop_stops_on_terminate() {
        let (mut test_loop, senders) = TestReceiverLoop::new(2);
        terminate_all(&senders);
        test_loop.receiver_loop().unwrap();
        assert_eq!(test_loop.num_terminations, 1);
    }

    #[test]
    fn receiver_loop_forwards_snapshotting_done() {
        let connection_name = "test_connection".to_string();
        let (mut test_loop, senders) = TestReceiverLoop::new(2);
        let message = ExecutorOperation::SnapshottingDone {
            connection_name: connection_name.clone(),
        };
        senders[0].send(message).unwrap();
        terminate_all(&senders);
        test_loop.receiver_loop().unwrap();
        assert_eq!(test_loop.snapshotting_done, vec![connection_name]);
    }

    #[test]
    fn receiver_loop_forwards_op() {
        let (mut test_loop, senders) = TestReceiverLoop::new(2);
        let record_store = ProcessorRecordStore::new().unwrap();
        let record: ProcessorRecord = record_store
            .create_record(&Record::new(vec![Field::Int(1)]))
            .unwrap();
        let insert = ProcessorOperation::Insert {
            new: record.clone(),
        };
        senders[0]
            .send(ExecutorOperation::Op { op: insert })
            .unwrap();
        terminate_all(&senders);
        test_loop.receiver_loop().unwrap();
        assert_eq!(
            test_loop.ops,
            vec![(0, ProcessorOperation::Insert { new: record })]
        );
    }

    #[test]
    fn receiver_loop_merges_commit_epoch_and_increases_epoch_id() {
        let (mut test_loop, senders) = TestReceiverLoop::new(2);
        let decision_instant = SystemTime::now();
        let mut epoch0 = Epoch::new(0, single_source_state("0"), None, decision_instant);
        let mut epoch1 = Epoch::new(0, single_source_state("1"), None, decision_instant);

        // First epoch: both ports commit with id 0.
        send_commit(&senders[0], epoch0.clone());
        send_commit(&senders[1], epoch1.clone());

        // Second epoch: same details, id 1.
        epoch0.common_info.id = 1;
        epoch1.common_info.id = 1;
        send_commit(&senders[0], epoch0.clone());
        send_commit(&senders[1], epoch1.clone());

        terminate_all(&senders);
        test_loop.receiver_loop().unwrap();

        let mut details = SourceStates::new();
        details.extend(epoch0.details);
        details.extend(epoch1.details);
        let expected = vec![
            Epoch::new(0, details.clone(), None, decision_instant),
            Epoch::new(1, details, None, decision_instant),
        ];
        assert_eq!(test_loop.commits, expected);
    }

    #[test]
    #[should_panic]
    fn receiver_loop_panics_on_inconsistent_commit_epoch() {
        let (mut test_loop, senders) = TestReceiverLoop::new(2);
        let decision_instant = SystemTime::now();
        // The two ports disagree on the epoch id.
        let epoch0 = Epoch::new(0, single_source_state("0"), None, decision_instant);
        let epoch1 = Epoch::new(1, single_source_state("1"), None, decision_instant);
        send_commit(&senders[0], epoch0);
        send_commit(&senders[1], epoch1);
        terminate_all(&senders);
        test_loop.receiver_loop().unwrap();
    }
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc