• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

getdozer / dozer / 5819169315

pending completion
5819169315

push

github

web-flow
Fix JavaScript capitalization (#1840)

45504 of 61139 relevant lines covered (74.43%)

58086.5 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

67.9
/dozer-core/src/executor/processor_node.rs
1
use std::sync::Arc;
2
use std::{borrow::Cow, mem::swap};
3

4
use crossbeam::channel::Receiver;
5
use daggy::NodeIndex;
6
use dozer_types::epoch::Epoch;
7
use dozer_types::node::NodeHandle;
8

9
use crate::error_manager::ErrorManager;
10
use crate::executor_operation::{ExecutorOperation, ProcessorOperation};
11
use crate::processor_record::ProcessorRecordStore;
12
use crate::{
13
    builder_dag::NodeKind,
14
    errors::ExecutionError,
15
    forwarder::{ProcessorChannelManager, StateWriter},
16
    node::{PortHandle, Processor},
17
};
18

19
use super::{execution_dag::ExecutionDag, name::Name, receiver_loop::ReceiverLoop};
20

21
/// A processor in the execution DAG.
×
22
#[derive(Debug)]
×
23
pub struct ProcessorNode {
24
    /// Node handle in description DAG.
25
    node_handle: NodeHandle,
26
    /// Input port handles.
27
    port_handles: Vec<PortHandle>,
28
    /// Input data channels.
29
    receivers: Vec<Receiver<ExecutorOperation>>,
30
    /// The processor.
31
    processor: Box<dyn Processor>,
32
    /// This node's output channel manager, for forwarding data, writing metadata and writing port state.
33
    channel_manager: ProcessorChannelManager,
34
    /// Where all the records from ingested data are stored.
35
    record_store: ProcessorRecordStore,
36
    /// The error manager, for reporting non-fatal errors.
37
    error_manager: Arc<ErrorManager>,
38
}
×
39

×
40
impl ProcessorNode {
×
41
    pub fn new(dag: &mut ExecutionDag, node_index: NodeIndex) -> Self {
1,109✔
42
        let Some(node) = dag.node_weight_mut(node_index).take() else {
1,109✔
43
            panic!("Must pass in a node")
×
44
        };
×
45
        let node_handle = node.handle;
1,109✔
46
        let NodeKind::Processor(processor) = node.kind else {
1,109✔
47
            panic!("Must pass in a processor node");
×
48
        };
×
49

×
50
        let (port_handles, receivers) = dag.collect_receivers(node_index);
1,109✔
51

1,109✔
52
        let (senders, record_writers) = dag.collect_senders_and_record_writers(node_index);
1,109✔
53

1,109✔
54
        let state_writer = StateWriter::new(record_writers);
1,109✔
55
        let channel_manager = ProcessorChannelManager::new(
1,109✔
56
            node_handle.clone(),
1,109✔
57
            senders,
1,109✔
58
            state_writer,
1,109✔
59
            true,
1,109✔
60
            dag.error_manager().clone(),
1,109✔
61
        );
1,109✔
62

1,109✔
63
        Self {
1,109✔
64
            node_handle,
1,109✔
65
            port_handles,
1,109✔
66
            receivers,
1,109✔
67
            processor,
1,109✔
68
            channel_manager,
1,109✔
69
            record_store: dag.record_store().clone(),
1,109✔
70
            error_manager: dag.error_manager().clone(),
1,109✔
71
        }
1,109✔
72
    }
1,109✔
73

74
    pub fn handle(&self) -> &NodeHandle {
1,109✔
75
        &self.node_handle
1,109✔
76
    }
1,109✔
77
}
×
78

×
79
impl Name for ProcessorNode {
80
    fn name(&self) -> Cow<str> {
×
81
        Cow::Owned(self.node_handle.to_string())
×
82
    }
×
83
}
×
84

×
85
impl ReceiverLoop for ProcessorNode {
×
86
    fn receivers(&mut self) -> Vec<Receiver<ExecutorOperation>> {
1,109✔
87
        let mut result = vec![];
1,109✔
88
        swap(&mut self.receivers, &mut result);
1,109✔
89
        result
1,109✔
90
    }
1,109✔
91

92
    fn receiver_name(&self, index: usize) -> Cow<str> {
×
93
        Cow::Owned(self.port_handles[index].to_string())
×
94
    }
×
95

×
96
    fn on_op(&mut self, index: usize, op: ProcessorOperation) -> Result<(), ExecutionError> {
×
97
        if let Err(e) = self.processor.process(
5,501,644✔
98
            self.port_handles[index],
5,501,644✔
99
            &self.record_store,
5,501,644✔
100
            op,
5,501,644✔
101
            &mut self.channel_manager,
5,501,644✔
102
        ) {
5,501,644✔
103
            self.error_manager.report(e);
6✔
104
        }
5,501,638✔
105
        Ok(())
5,501,644✔
106
    }
5,501,644✔
107

×
108
    fn on_commit(&mut self, epoch: &Epoch) -> Result<(), ExecutionError> {
109
        if let Err(e) = self.processor.commit(epoch) {
1,768✔
110
            self.error_manager.report(e);
×
111
        }
1,768✔
112
        self.channel_manager.store_and_send_commit(epoch)
1,768✔
113
    }
1,768✔
114

×
115
    fn on_terminate(&mut self) -> Result<(), ExecutionError> {
1,102✔
116
        self.channel_manager.send_terminate()
1,102✔
117
    }
1,102✔
118

119
    fn on_snapshotting_done(&mut self, connection_name: String) -> Result<(), ExecutionError> {
80✔
120
        self.channel_manager.send_snapshotting_done(connection_name)
80✔
121
    }
80✔
122
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc