• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

getdozer / dozer / 4763031797

pending completion
4763031797

Pull #1458

github

GitHub
Merge 6302acc92 into c58df4a0b
Pull Request #1458: fix: uuid postgres type support

20 of 20 new or added lines in 3 files covered. (100.0%)

34612 of 43909 relevant lines covered (78.83%)

11679.41 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

84.75
/dozer-core/src/executor/processor_node.rs
1
use std::{borrow::Cow, mem::swap};
2

3
use crossbeam::channel::Receiver;
4
use daggy::NodeIndex;
5
use dozer_types::log::warn;
6
use dozer_types::node::NodeHandle;
7

8
use crate::{
9
    builder_dag::NodeKind,
10
    errors::ExecutionError,
11
    forwarder::{ProcessorChannelManager, StateWriter},
12
    node::{PortHandle, Processor},
13
};
14

15
use super::{
16
    execution_dag::ExecutionDag, name::Name, receiver_loop::ReceiverLoop, ExecutorOperation,
17
};
18

19
/// A processor in the execution DAG.
20
#[derive(Debug)]
×
21
pub struct ProcessorNode {
22
    /// Node handle in description DAG.
23
    node_handle: NodeHandle,
24
    /// Input port handles.
25
    port_handles: Vec<PortHandle>,
26
    /// Input data channels.
27
    receivers: Vec<Receiver<ExecutorOperation>>,
28
    /// The processor.
29
    processor: Box<dyn Processor>,
30
    /// This node's output channel manager, for forwarding data, writing metadata and writing port state.
31
    channel_manager: ProcessorChannelManager,
32
}
33

34
impl ProcessorNode {
35
    pub fn new(dag: &mut ExecutionDag, node_index: NodeIndex) -> Self {
1,743✔
36
        let Some(node) = dag.node_weight_mut(node_index).take() else {
1,743✔
37
            panic!("Must pass in a node")
×
38
        };
39
        let node_handle = node.handle;
1,743✔
40
        let NodeKind::Processor(processor) = node.kind else {
1,743✔
41
            panic!("Must pass in a processor node");
×
42
        };
43

44
        let (port_handles, receivers) = dag.collect_receivers(node_index);
1,743✔
45

1,743✔
46
        let (senders, record_writers) = dag.collect_senders_and_record_writers(node_index);
1,743✔
47

1,743✔
48
        let state_writer = StateWriter::new(record_writers);
1,743✔
49
        let channel_manager =
1,743✔
50
            ProcessorChannelManager::new(node_handle.clone(), senders, state_writer, true);
1,743✔
51

1,743✔
52
        Self {
1,743✔
53
            node_handle,
1,743✔
54
            port_handles,
1,743✔
55
            receivers,
1,743✔
56
            processor,
1,743✔
57
            channel_manager,
1,743✔
58
        }
1,743✔
59
    }
1,743✔
60

61
    pub fn handle(&self) -> &NodeHandle {
1,743✔
62
        &self.node_handle
1,743✔
63
    }
1,743✔
64
}
65

66
impl Name for ProcessorNode {
67
    fn name(&self) -> Cow<str> {
×
68
        Cow::Owned(self.node_handle.to_string())
×
69
    }
×
70
}
71

72
impl ReceiverLoop for ProcessorNode {
73
    fn receivers(&mut self) -> Vec<Receiver<ExecutorOperation>> {
1,743✔
74
        let mut result = vec![];
1,743✔
75
        swap(&mut self.receivers, &mut result);
1,743✔
76
        result
1,743✔
77
    }
1,743✔
78

79
    fn receiver_name(&self, index: usize) -> Cow<str> {
×
80
        Cow::Owned(self.port_handles[index].to_string())
×
81
    }
×
82

83
    fn on_op(
2,242,604✔
84
        &mut self,
2,242,604✔
85
        index: usize,
2,242,604✔
86
        op: dozer_types::types::Operation,
2,242,604✔
87
    ) -> Result<(), ExecutionError> {
2,242,604✔
88
        let result =
2,242,604✔
89
            self.processor
2,242,604✔
90
                .process(self.port_handles[index], op, &mut self.channel_manager);
2,242,604✔
91
        if let Err(e) = result {
2,242,604✔
92
            warn!("Processor error: {:?}", e);
10,218✔
93
        }
2,232,386✔
94

95
        // TODO: Enable "test_run_dag_proc_err_2" and "test_run_dag_proc_err_3" tests when errors threshold is implemented
96
        Ok(())
2,237,148✔
97
    }
2,237,148✔
98

99
    fn on_commit(&mut self, epoch: &crate::epoch::Epoch) -> Result<(), ExecutionError> {
100
        self.processor.commit(epoch)?;
2,159✔
101
        self.channel_manager.store_and_send_commit(epoch)
2,159✔
102
    }
2,159✔
103

104
    fn on_terminate(&mut self) -> Result<(), ExecutionError> {
1,740✔
105
        self.channel_manager.send_terminate()
1,740✔
106
    }
1,740✔
107

108
    fn on_snapshotting_done(&mut self) -> Result<(), ExecutionError> {
256✔
109
        self.channel_manager.send_snapshotting_done()
256✔
110
    }
256✔
111
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc