• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

getdozer / dozer / 5830737820

pending completion
5830737820

push

github

web-flow
fix: Should use tokio mutex in async context (#1842)

3 of 3 new or added lines in 1 file covered. (100.0%)

45592 of 61344 relevant lines covered (74.32%)

56624.89 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

66.25
/dozer-core/src/executor/processor_node.rs
1
use std::sync::Arc;
2
use std::{borrow::Cow, mem::swap};
3

4
use crossbeam::channel::Receiver;
5
use daggy::NodeIndex;
6
use dozer_types::node::NodeHandle;
7

8
use crate::epoch::Epoch;
9
use crate::error_manager::ErrorManager;
10
use crate::executor_operation::{ExecutorOperation, ProcessorOperation};
11
use crate::processor_record::ProcessorRecordStore;
12
use crate::{
13
    builder_dag::NodeKind,
14
    errors::ExecutionError,
15
    forwarder::ProcessorChannelManager,
16
    node::{PortHandle, Processor},
17
};
18

19
use super::{execution_dag::ExecutionDag, name::Name, receiver_loop::ReceiverLoop};
20

21
/// A processor in the execution DAG.
×
22
#[derive(Debug)]
×
23
pub struct ProcessorNode {
24
    /// Node handle in description DAG.
25
    node_handle: NodeHandle,
26
    /// Input port handles.
27
    port_handles: Vec<PortHandle>,
28
    /// Input data channels.
29
    receivers: Vec<Receiver<ExecutorOperation>>,
30
    /// The processor.
31
    processor: Box<dyn Processor>,
32
    /// This node's output channel manager, for forwarding data, writing metadata and writing port state.
33
    channel_manager: ProcessorChannelManager,
34
    /// Where all the records from ingested data are stored.
35
    record_store: Arc<ProcessorRecordStore>,
36
    /// The error manager, for reporting non-fatal errors.
37
    error_manager: Arc<ErrorManager>,
38
}
×
39

×
40
impl ProcessorNode {
×
41
    pub fn new(dag: &mut ExecutionDag, node_index: NodeIndex) -> Self {
1,109✔
42
        let Some(node) = dag.node_weight_mut(node_index).take() else {
1,109✔
43
            panic!("Must pass in a node")
×
44
        };
×
45
        let node_handle = node.handle;
1,109✔
46
        let NodeKind::Processor(processor) = node.kind else {
1,109✔
47
            panic!("Must pass in a processor node");
×
48
        };
×
49

×
50
        let (port_handles, receivers) = dag.collect_receivers(node_index);
1,109✔
51

1,109✔
52
        let (senders, _) = dag.collect_senders_and_record_writers(node_index);
1,109✔
53

1,109✔
54
        let channel_manager = ProcessorChannelManager::new(
1,109✔
55
            node_handle.clone(),
1,109✔
56
            senders,
1,109✔
57
            None,
1,109✔
58
            dag.error_manager().clone(),
1,109✔
59
        );
1,109✔
60

1,109✔
61
        Self {
1,109✔
62
            node_handle,
1,109✔
63
            port_handles,
1,109✔
64
            receivers,
1,109✔
65
            processor,
1,109✔
66
            channel_manager,
1,109✔
67
            record_store: dag.record_store().clone(),
1,109✔
68
            error_manager: dag.error_manager().clone(),
1,109✔
69
        }
1,109✔
70
    }
1,109✔
71

×
72
    pub fn handle(&self) -> &NodeHandle {
1,109✔
73
        &self.node_handle
1,109✔
74
    }
1,109✔
75
}
76

×
77
impl Name for ProcessorNode {
×
78
    fn name(&self) -> Cow<str> {
×
79
        Cow::Owned(self.node_handle.to_string())
×
80
    }
×
81
}
82

×
83
impl ReceiverLoop for ProcessorNode {
×
84
    fn receivers(&mut self) -> Vec<Receiver<ExecutorOperation>> {
1,109✔
85
        let mut result = vec![];
1,109✔
86
        swap(&mut self.receivers, &mut result);
1,109✔
87
        result
1,109✔
88
    }
1,109✔
89

×
90
    fn receiver_name(&self, index: usize) -> Cow<str> {
×
91
        Cow::Owned(self.port_handles[index].to_string())
×
92
    }
×
93

×
94
    fn on_op(&mut self, index: usize, op: ProcessorOperation) -> Result<(), ExecutionError> {
×
95
        if let Err(e) = self.processor.process(
5,405,329✔
96
            self.port_handles[index],
5,405,329✔
97
            &self.record_store,
5,405,329✔
98
            op,
5,405,329✔
99
            &mut self.channel_manager,
5,405,329✔
100
        ) {
5,405,329✔
101
            self.error_manager.report(e);
6✔
102
        }
5,405,323✔
103
        Ok(())
5,405,329✔
104
    }
5,405,329✔
105

×
106
    fn on_commit(&mut self, epoch: &Epoch) -> Result<(), ExecutionError> {
×
107
        if let Err(e) = self.processor.commit(epoch) {
1,872✔
108
            self.error_manager.report(e);
×
109
        }
1,872✔
110
        self.channel_manager.store_and_send_commit(epoch)
1,872✔
111
    }
1,872✔
112

113
    fn on_terminate(&mut self) -> Result<(), ExecutionError> {
1,102✔
114
        self.channel_manager.send_terminate()
1,102✔
115
    }
1,102✔
116

117
    fn on_snapshotting_done(&mut self, connection_name: String) -> Result<(), ExecutionError> {
80✔
118
        self.channel_manager.send_snapshotting_done(connection_name)
80✔
119
    }
80✔
120
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc