• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

getdozer / dozer / 6299724219

25 Sep 2023 12:58PM UTC coverage: 77.81% (+0.5%) from 77.275%
6299724219

push

github

chubei
fix: Add `BINDGEN_EXTRA_CLANG_ARGS` to cross compile rocksdb

50223 of 64546 relevant lines covered (77.81%)

148909.49 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

86.3
/dozer-core/src/executor/processor_node.rs
1
use std::sync::Arc;
2
use std::{borrow::Cow, mem::swap};
3

4
use crossbeam::channel::Receiver;
5
use daggy::NodeIndex;
6
use dozer_types::node::NodeHandle;
7

8
use crate::epoch::Epoch;
9
use crate::error_manager::ErrorManager;
10
use crate::executor_operation::{ExecutorOperation, ProcessorOperation};
11
use crate::{
12
    builder_dag::NodeKind,
13
    errors::ExecutionError,
14
    forwarder::ProcessorChannelManager,
15
    node::{PortHandle, Processor},
16
};
17
use dozer_recordstore::ProcessorRecordStore;
18

19
use super::{execution_dag::ExecutionDag, name::Name, receiver_loop::ReceiverLoop};
20

21
/// A processor in the execution DAG.
22
#[derive(Debug)]
×
23
pub struct ProcessorNode {
24
    /// Node handle in description DAG.
25
    node_handle: NodeHandle,
26
    /// The epoch id the processor was constructed for.
27
    initial_epoch_id: u64,
28
    /// Input port handles.
29
    port_handles: Vec<PortHandle>,
30
    /// Input data channels.
31
    receivers: Vec<Receiver<ExecutorOperation>>,
32
    /// The processor.
33
    processor: Box<dyn Processor>,
34
    /// This node's output channel manager, for forwarding data, writing metadata and writing port state.
35
    channel_manager: ProcessorChannelManager,
36
    /// Where all the records from ingested data are stored.
37
    record_store: Arc<ProcessorRecordStore>,
38
    /// The error manager, for reporting non-fatal errors.
39
    error_manager: Arc<ErrorManager>,
40
}
41

42
impl ProcessorNode {
43
    pub fn new(dag: &mut ExecutionDag, node_index: NodeIndex, initial_epoch_id: u64) -> Self {
1,763✔
44
        let Some(node) = dag.node_weight_mut(node_index).take() else {
1,763✔
45
            panic!("Must pass in a node")
×
46
        };
47
        let node_handle = node.handle;
1,763✔
48
        let NodeKind::Processor(processor) = node.kind else {
1,763✔
49
            panic!("Must pass in a processor node");
×
50
        };
51

52
        let (port_handles, receivers) = dag.collect_receivers(node_index);
1,763✔
53

1,763✔
54
        let (senders, _) = dag.collect_senders_and_record_writers(node_index);
1,763✔
55

1,763✔
56
        let channel_manager = ProcessorChannelManager::new(
1,763✔
57
            node_handle.clone(),
1,763✔
58
            senders,
1,763✔
59
            None,
1,763✔
60
            dag.error_manager().clone(),
1,763✔
61
        );
1,763✔
62

1,763✔
63
        Self {
1,763✔
64
            node_handle,
1,763✔
65
            initial_epoch_id,
1,763✔
66
            port_handles,
1,763✔
67
            receivers,
1,763✔
68
            processor,
1,763✔
69
            channel_manager,
1,763✔
70
            record_store: dag.record_store().clone(),
1,763✔
71
            error_manager: dag.error_manager().clone(),
1,763✔
72
        }
1,763✔
73
    }
1,763✔
74

75
    pub fn handle(&self) -> &NodeHandle {
1,763✔
76
        &self.node_handle
1,763✔
77
    }
1,763✔
78
}
79

80
impl Name for ProcessorNode {
81
    fn name(&self) -> Cow<str> {
×
82
        Cow::Owned(self.node_handle.to_string())
×
83
    }
×
84
}
85

86
impl ReceiverLoop for ProcessorNode {
87
    fn initial_epoch_id(&self) -> u64 {
1,763✔
88
        self.initial_epoch_id
1,763✔
89
    }
1,763✔
90

91
    fn receivers(&mut self) -> Vec<Receiver<ExecutorOperation>> {
1,763✔
92
        let mut result = vec![];
1,763✔
93
        swap(&mut self.receivers, &mut result);
1,763✔
94
        result
1,763✔
95
    }
1,763✔
96

97
    fn receiver_name(&self, index: usize) -> Cow<str> {
×
98
        Cow::Owned(self.port_handles[index].to_string())
×
99
    }
×
100

101
    fn on_op(&mut self, index: usize, op: ProcessorOperation) -> Result<(), ExecutionError> {
102
        if let Err(e) = self.processor.process(
5,356,975✔
103
            self.port_handles[index],
5,356,975✔
104
            &self.record_store,
5,356,975✔
105
            op,
5,356,975✔
106
            &mut self.channel_manager,
5,356,975✔
107
        ) {
5,356,975✔
108
            self.error_manager.report(e);
6✔
109
        }
5,356,969✔
110
        Ok(())
5,356,975✔
111
    }
5,356,975✔
112

113
    fn on_commit(&mut self, epoch: &Epoch) -> Result<(), ExecutionError> {
114
        if let Err(e) = self.processor.commit(epoch) {
3,004✔
115
            self.error_manager.report(e);
×
116
        }
3,004✔
117

118
        if let Some(checkpoint_writer) = &epoch.common_info.checkpoint_writer {
3,004✔
119
            let object = checkpoint_writer.create_processor_object(&self.node_handle)?;
45✔
120
            self.processor
44✔
121
                .serialize(&self.record_store, object)
44✔
122
                .map_err(ExecutionError::FailedToCreateCheckpoint)?;
44✔
123
        }
2,959✔
124

125
        self.channel_manager.store_and_send_commit(epoch)
3,003✔
126
    }
3,004✔
127

128
    fn on_terminate(&mut self) -> Result<(), ExecutionError> {
1,755✔
129
        self.channel_manager.send_terminate()
1,755✔
130
    }
1,755✔
131

132
    fn on_snapshotting_done(&mut self, connection_name: String) -> Result<(), ExecutionError> {
128✔
133
        self.channel_manager.send_snapshotting_done(connection_name)
128✔
134
    }
128✔
135
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc