• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

getdozer / dozer / 6299724219

25 Sep 2023 12:58PM UTC coverage: 77.81% (+0.5%) from 77.275%
6299724219

push

github

chubei
fix: Add `BINDGEN_EXTRA_CLANG_ARGS` to cross compile rocksdb

50223 of 64546 relevant lines covered (77.81%)

148909.49 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

30.0
/dozer-core/src/node.rs
1
use crate::channels::{ProcessorChannelForwarder, SourceChannelForwarder};
2
use crate::epoch::Epoch;
3
use crate::executor_operation::ProcessorOperation;
4
use dozer_recordstore::{ProcessorRecordStore, ProcessorRecordStoreDeserializer};
5

6
use dozer_log::storage::{Object, Queue};
7
use dozer_types::errors::internal::BoxedError;
8
use dozer_types::node::OpIdentifier;
9
use dozer_types::serde::{Deserialize, Serialize};
10
use dozer_types::types::Schema;
11
use std::collections::HashMap;
12
use std::fmt::{Debug, Display, Formatter};
13

14
pub type PortHandle = u16;
15

16
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
×
17
#[serde(crate = "dozer_types::serde")]
18
pub enum OutputPortType {
19
    Stateless,
20
    StatefulWithPrimaryKeyLookup,
21
}
22

23
impl Display for OutputPortType {
24
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
×
25
        match self {
×
26
            OutputPortType::Stateless => f.write_str("Stateless"),
×
27
            OutputPortType::StatefulWithPrimaryKeyLookup { .. } => {
28
                f.write_str("StatefulWithPrimaryKeyLookup")
×
29
            }
30
        }
31
    }
×
32
}
33

34
#[derive(Debug, Clone)]
×
35
pub struct OutputPortDef {
36
    pub handle: PortHandle,
37
    pub typ: OutputPortType,
38
}
39

40
impl OutputPortDef {
41
    pub fn new(handle: PortHandle, typ: OutputPortType) -> Self {
10,967✔
42
        Self { handle, typ }
10,967✔
43
    }
10,967✔
44
}
45

46
pub trait SourceFactory: Send + Sync + Debug {
47
    fn get_output_schema(&self, port: &PortHandle) -> Result<Schema, BoxedError>;
48
    fn get_output_port_name(&self, port: &PortHandle) -> String;
49
    fn get_output_ports(&self) -> Vec<OutputPortDef>;
50
    fn build(
51
        &self,
52
        output_schemas: HashMap<PortHandle, Schema>,
53
    ) -> Result<Box<dyn Source>, BoxedError>;
54
}
55

56
pub type SourceState = HashMap<PortHandle, Option<OpIdentifier>>;
57

58
pub trait Source: Send + Sync + Debug {
59
    fn start(
60
        &self,
61
        fw: &mut dyn SourceChannelForwarder,
62
        last_checkpoint: SourceState,
63
    ) -> Result<(), BoxedError>;
64
}
65

66
pub trait ProcessorFactory: Send + Sync + Debug {
67
    fn get_output_schema(
68
        &self,
69
        output_port: &PortHandle,
70
        input_schemas: &HashMap<PortHandle, Schema>,
71
    ) -> Result<Schema, BoxedError>;
72
    fn get_input_ports(&self) -> Vec<PortHandle>;
73
    fn get_output_ports(&self) -> Vec<OutputPortDef>;
74
    fn build(
75
        &self,
76
        input_schemas: HashMap<PortHandle, Schema>,
77
        output_schemas: HashMap<PortHandle, Schema>,
78
        record_store: &ProcessorRecordStoreDeserializer,
79
        checkpoint_data: Option<Vec<u8>>,
80
    ) -> Result<Box<dyn Processor>, BoxedError>;
81
    fn type_name(&self) -> String;
82
    fn id(&self) -> String;
83
}
84

85
pub trait Processor: Send + Sync + Debug {
86
    fn commit(&self, epoch_details: &Epoch) -> Result<(), BoxedError>;
87
    fn process(
88
        &mut self,
89
        from_port: PortHandle,
90
        record_store: &ProcessorRecordStore,
91
        op: ProcessorOperation,
92
        fw: &mut dyn ProcessorChannelForwarder,
93
    ) -> Result<(), BoxedError>;
94
    fn serialize(
95
        &mut self,
96
        record_store: &ProcessorRecordStore,
97
        object: Object,
98
    ) -> Result<(), BoxedError>;
99
}
100

101
pub trait SinkFactory: Send + Sync + Debug {
102
    fn get_input_ports(&self) -> Vec<PortHandle>;
103
    fn prepare(&self, input_schemas: HashMap<PortHandle, Schema>) -> Result<(), BoxedError>;
104
    fn build(
105
        &self,
106
        input_schemas: HashMap<PortHandle, Schema>,
107
    ) -> Result<Box<dyn Sink>, BoxedError>;
108
}
109

110
pub trait Sink: Send + Sync + Debug {
111
    fn commit(&mut self, epoch_details: &Epoch) -> Result<(), BoxedError>;
112
    fn process(
113
        &mut self,
114
        from_port: PortHandle,
115
        record_store: &ProcessorRecordStore,
116
        op: ProcessorOperation,
117
    ) -> Result<(), BoxedError>;
118
    fn persist(&mut self, queue: &Queue) -> Result<(), BoxedError>;
119

120
    fn on_source_snapshotting_done(&mut self, connection_name: String) -> Result<(), BoxedError>;
121
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc