• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

getdozer / dozer / 4074492184

pending completion
4074492184

push

github

GitHub
feat: Implement basic continue for ingestion in snowflake (#775)

55 of 55 new or added lines in 4 files covered. (100.0%)

24547 of 36016 relevant lines covered (68.16%)

41582.81 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

95.96
/dozer-core/src/dag/executor/receiver_loop.rs
1
use std::borrow::Cow;
2

3
use crossbeam::channel::Receiver;
4
use dozer_types::log::debug;
5
use dozer_types::{internal_err, types::Operation};
6

7
use crate::dag::{
8
    epoch::Epoch,
9
    errors::ExecutionError::{self, InternalError},
10
    executor_utils::init_select,
11
};
12

13
use super::{name::Name, ExecutorOperation, InputPortState};
14

15
#[derive(Debug, PartialEq)]
5✔
16
enum MappedExecutorOperation {
17
    Data { op: Operation },
18
    Commit { epoch: Epoch },
19
    Terminate,
20
}
21

22
fn map_executor_operation(op: ExecutorOperation) -> MappedExecutorOperation {
9,267,477✔
23
    match op {
9,267,477✔
24
        ExecutorOperation::Delete { old } => MappedExecutorOperation::Data {
5,656✔
25
            op: Operation::Delete { old },
5,656✔
26
        },
5,656✔
27
        ExecutorOperation::Insert { new } => MappedExecutorOperation::Data {
9,195,968✔
28
            op: Operation::Insert { new },
9,195,968✔
29
        },
9,195,968✔
30
        ExecutorOperation::Update { old, new } => MappedExecutorOperation::Data {
61,117✔
31
            op: Operation::Update { old, new },
61,117✔
32
        },
61,117✔
33
        ExecutorOperation::Commit { epoch } => MappedExecutorOperation::Commit { epoch },
4,192✔
34
        ExecutorOperation::Terminate => MappedExecutorOperation::Terminate,
544✔
35
    }
36
}
9,267,477✔
37

38
/// Common code for processor and sink nodes.
39
///
40
/// They both select from their input channels, and respond to "op", "commit", and terminate.
41
pub trait ReceiverLoop: Name {
42
    /// Returns input channels to this node. Will be called exactly once in [`receiver_loop`].
43
    fn receivers(&mut self) -> Vec<Receiver<ExecutorOperation>>;
44
    /// Returns the name of the receiver at `index`. Used for logging.
45
    fn receiver_name(&self, index: usize) -> Cow<str>;
46
    /// Responds to `op` from the receiver at `index`.
47
    fn on_op(&mut self, index: usize, op: Operation) -> Result<(), ExecutionError>;
48
    /// Responds to `commit` of `epoch`.
49
    fn on_commit(&mut self, epoch: &Epoch) -> Result<(), ExecutionError>;
50
    /// Responds to `terminate`.
51
    fn on_terminate(&mut self) -> Result<(), ExecutionError>;
52

53
    /// The loop implementation, calls [`on_op`], [`on_commit`] and [`on_terminate`] at appropriate times.
54
    fn receiver_loop(&mut self) -> Result<(), ExecutionError> {
172✔
55
        let receivers = self.receivers();
172✔
56
        let mut port_states = vec![InputPortState::Open; receivers.len()];
172✔
57

172✔
58
        let mut commits_received: usize = 0;
172✔
59
        let mut common_epoch = Epoch::new(0, Default::default());
172✔
60

172✔
61
        let mut sel = init_select(&receivers);
172✔
62
        loop {
9,135,974✔
63
            let index = sel.ready();
9,135,974✔
64
            match internal_err!(receivers[index].recv().map(map_executor_operation))? {
9,135,974✔
65
                MappedExecutorOperation::Data { op } => {
9,132,113✔
66
                    self.on_op(index, op)?;
9,132,113✔
67
                }
68
                MappedExecutorOperation::Commit { epoch } => {
3,675✔
69
                    assert_eq!(epoch.id, common_epoch.id);
3,675✔
70
                    commits_received += 1;
3,674✔
71
                    sel.remove(index);
3,674✔
72
                    common_epoch.details.extend(epoch.details);
3,674✔
73

3,674✔
74
                    if commits_received == receivers.len() {
3,674✔
75
                        self.on_commit(&common_epoch)?;
3,220✔
76
                        common_epoch = Epoch::new(common_epoch.id + 1, Default::default());
3,220✔
77
                        commits_received = 0;
3,220✔
78
                        sel = init_select(&receivers);
3,220✔
79
                    }
454✔
80
                }
81
                MappedExecutorOperation::Terminate => {
82
                    port_states[index] = InputPortState::Terminated;
177✔
83
                    sel.remove(index);
177✔
84
                    debug!(
177✔
85
                        "[{}] Received Terminate request on port {}",
×
86
                        self.name(),
×
87
                        self.receiver_name(index)
×
88
                    );
89
                    if port_states.iter().all(|v| v == &InputPortState::Terminated) {
214✔
90
                        self.on_terminate()?;
154✔
91
                        debug!("[{}] Quit", self.name());
154✔
92
                        return Ok(());
154✔
93
                    }
23✔
94
                }
95
            }
96
        }
97
    }
169✔
98
}
99

100
#[cfg(test)]
mod tests {
    use std::mem::take;

    use crossbeam::channel::{unbounded, Sender};
    use dozer_types::types::{Field, Record};

    use crate::dag::{
        epoch::{OpIdentifier, SourceStates},
        node::NodeHandle,
    };

    use super::*;

    #[test]
    fn test_map_executor_operation() {
        let old = Record::new(None, vec![Field::Int(1)], None);
        let new = Record::new(None, vec![Field::Int(2)], None);
        let epoch = Epoch::new(1, Default::default());
        assert_eq!(
            map_executor_operation(ExecutorOperation::Insert { new: new.clone() }),
            MappedExecutorOperation::Data {
                op: Operation::Insert { new: new.clone() }
            }
        );
        assert_eq!(
            map_executor_operation(ExecutorOperation::Update {
                old: old.clone(),
                new: new.clone()
            }),
            MappedExecutorOperation::Data {
                op: Operation::Update {
                    old: old.clone(),
                    new
                }
            }
        );
        assert_eq!(
            map_executor_operation(ExecutorOperation::Delete { old: old.clone() }),
            MappedExecutorOperation::Data {
                op: Operation::Delete { old }
            }
        );
        assert_eq!(
            map_executor_operation(ExecutorOperation::Commit {
                epoch: epoch.clone()
            }),
            MappedExecutorOperation::Commit { epoch }
        );
        assert_eq!(
            map_executor_operation(ExecutorOperation::Terminate),
            MappedExecutorOperation::Terminate
        );
    }

    /// Test double that records every callback made by `receiver_loop`.
    struct TestReceiverLoop {
        /// Input channels; handed over to the loop on the first `receivers` call.
        receivers: Vec<Receiver<ExecutorOperation>>,
        /// Every `(port index, operation)` pair passed to `on_op`, in call order.
        ops: Vec<(usize, Operation)>,
        /// Every epoch passed to `on_commit`, in call order.
        commits: Vec<Epoch>,
        /// How many times `on_terminate` was called.
        num_terminations: usize,
    }

    impl Name for TestReceiverLoop {
        fn name(&self) -> Cow<str> {
            Cow::Borrowed("TestReceiverLoop")
        }
    }

    impl ReceiverLoop for TestReceiverLoop {
        fn receivers(&mut self) -> Vec<Receiver<ExecutorOperation>> {
            // Moves the channels out and leaves an empty vec behind.
            take(&mut self.receivers)
        }

        fn receiver_name(&self, index: usize) -> Cow<str> {
            Cow::Owned(format!("receiver_{index}"))
        }

        fn on_op(&mut self, index: usize, op: Operation) -> Result<(), ExecutionError> {
            self.ops.push((index, op));
            Ok(())
        }

        fn on_commit(&mut self, epoch: &Epoch) -> Result<(), ExecutionError> {
            self.commits.push(epoch.clone());
            Ok(())
        }

        fn on_terminate(&mut self) -> Result<(), ExecutionError> {
            self.num_terminations += 1;
            Ok(())
        }
    }

    impl TestReceiverLoop {
        /// Creates a loop with `num_receivers` unbounded input channels and returns it together
        /// with the matching senders (sender `i` feeds port `i`).
        fn new(num_receivers: usize) -> (TestReceiverLoop, Vec<Sender<ExecutorOperation>>) {
            let (senders, receivers) = (0..num_receivers).map(|_| unbounded()).unzip();
            (
                TestReceiverLoop {
                    receivers,
                    ops: vec![],
                    commits: vec![],
                    num_terminations: 0,
                },
                senders,
            )
        }
    }

    /// Builds source states containing a single entry for the node named `node`.
    fn single_source_states(node: &str) -> SourceStates {
        let mut states = SourceStates::new();
        states.insert(
            NodeHandle::new(None, node.to_string()),
            OpIdentifier::new(0, 0),
        );
        states
    }

    /// Sends `Terminate` on every sender, in port order.
    fn terminate_all(senders: &[Sender<ExecutorOperation>]) {
        for sender in senders {
            sender.send(ExecutorOperation::Terminate).unwrap();
        }
    }

    #[test]
    fn receiver_loop_stops_on_terminate() {
        let (mut test_loop, senders) = TestReceiverLoop::new(2);
        terminate_all(&senders);
        test_loop.receiver_loop().unwrap();
        assert_eq!(test_loop.num_terminations, 1);
    }

    #[test]
    fn receiver_loop_forwards_op() {
        let (mut test_loop, senders) = TestReceiverLoop::new(2);
        let record = Record::new(None, vec![Field::Int(1)], None);
        senders[0]
            .send(ExecutorOperation::Insert {
                new: record.clone(),
            })
            .unwrap();
        terminate_all(&senders);
        test_loop.receiver_loop().unwrap();
        assert_eq!(test_loop.ops, vec![(0, Operation::Insert { new: record })]);
    }

    #[test]
    fn receiver_loop_merges_commit_epoch_and_increases_epoch_id() {
        let (mut test_loop, senders) = TestReceiverLoop::new(2);
        let details0 = single_source_states("0");
        let details1 = single_source_states("1");

        // Two consecutive epochs, each committed by port 0 and then by port 1.
        for id in 0..2 {
            senders[0]
                .send(ExecutorOperation::Commit {
                    epoch: Epoch::new(id, details0.clone()),
                })
                .unwrap();
            senders[1]
                .send(ExecutorOperation::Commit {
                    epoch: Epoch::new(id, details1.clone()),
                })
                .unwrap();
        }
        terminate_all(&senders);
        test_loop.receiver_loop().unwrap();

        let mut merged = SourceStates::new();
        merged.extend(details0);
        merged.extend(details1);
        assert_eq!(
            test_loop.commits,
            vec![Epoch::new(0, merged.clone()), Epoch::new(1, merged)]
        );
    }

    #[test]
    #[should_panic]
    fn receiver_loop_panics_on_inconsistent_commit_epoch() {
        let (mut test_loop, senders) = TestReceiverLoop::new(2);
        senders[0]
            .send(ExecutorOperation::Commit {
                epoch: Epoch::new(0, single_source_states("0")),
            })
            .unwrap();
        senders[1]
            .send(ExecutorOperation::Commit {
                epoch: Epoch::new(1, single_source_states("1")),
            })
            .unwrap();
        terminate_all(&senders);
        // The result is discarded on purpose: unwrapping it would let an `Err` return satisfy
        // `should_panic`, while this test is about the epoch-id assertion inside the loop.
        let _ = test_loop.receiver_loop();
    }
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc