• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

getdozer / dozer / 4360628700

pending completion
4360628700

push

github

GitHub
refactor: pipeline memory storage (#1135)

1152 of 1152 new or added lines in 40 files covered. (100.0%)

27472 of 39301 relevant lines covered (69.9%)

28369.11 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

20.0
/dozer-core/src/node.rs
1
use crate::channels::{ProcessorChannelForwarder, SourceChannelForwarder};
2
use crate::epoch::Epoch;
3
use crate::errors::ExecutionError;
4
use dozer_storage::lmdb_storage::{LmdbExclusiveTransaction, SharedTransaction};
5

6
use dozer_types::node::SourceStates;
7
use dozer_types::types::{Operation, Schema};
8
use std::collections::HashMap;
9
use std::fmt::{Debug, Display, Formatter};
10

11
pub type PortHandle = u16;
12

13
#[derive(Debug, Clone, Copy)]
3✔
14
pub enum OutputPortType {
×
15
    Stateless,
16
    StatefulWithPrimaryKeyLookup,
17
}
18

19
impl Display for OutputPortType {
20
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
×
21
        match self {
×
22
            OutputPortType::Stateless => f.write_str("Stateless"),
×
23
            OutputPortType::StatefulWithPrimaryKeyLookup { .. } => {
24
                f.write_str("StatefulWithPrimaryKeyLookup")
×
25
            }
×
26
        }
×
27
    }
×
28
}
29

×
30
#[derive(Debug, Clone)]
×
31
pub struct OutputPortDef {
×
32
    pub handle: PortHandle,
33
    pub typ: OutputPortType,
×
34
}
35

36
impl OutputPortDef {
×
37
    pub fn new(handle: PortHandle, typ: OutputPortType) -> Self {
6,655✔
38
        Self { handle, typ }
6,655✔
39
    }
6,655✔
40
}
41

42
pub trait SourceFactory<T>: Send + Sync + Debug {
43
    fn get_output_schema(&self, port: &PortHandle) -> Result<(Schema, T), ExecutionError>;
×
44
    fn get_output_ports(&self) -> Vec<OutputPortDef>;
×
45
    fn build(
×
46
        &self,
47
        output_schemas: HashMap<PortHandle, Schema>,
48
    ) -> Result<Box<dyn Source>, ExecutionError>;
49
}
50

51
pub trait Source: Send + Sync + Debug {
52
    /// Checks if the source can start from the given checkpoint.
53
    /// If this function returns false, the executor will start the source from the beginning.
54
    fn can_start_from(&self, last_checkpoint: (u64, u64)) -> Result<bool, ExecutionError>;
55
    fn start(
56
        &self,
57
        fw: &mut dyn SourceChannelForwarder,
58
        last_checkpoint: Option<(u64, u64)>,
59
    ) -> Result<(), ExecutionError>;
60
}
61

62
pub trait ProcessorFactory<T>: Send + Sync + Debug {
63
    fn get_output_schema(
64
        &self,
65
        output_port: &PortHandle,
66
        input_schemas: &HashMap<PortHandle, (Schema, T)>,
67
    ) -> Result<(Schema, T), ExecutionError>;
68
    fn get_input_ports(&self) -> Vec<PortHandle>;
69
    fn get_output_ports(&self) -> Vec<OutputPortDef>;
70
    fn build(
71
        &self,
72
        input_schemas: HashMap<PortHandle, Schema>,
73
        output_schemas: HashMap<PortHandle, Schema>,
74
        txn: &mut LmdbExclusiveTransaction,
75
    ) -> Result<Box<dyn Processor>, ExecutionError>;
76
}
77

78
pub trait Processor: Send + Sync + Debug {
79
    fn commit(&self, epoch_details: &Epoch, tx: &SharedTransaction) -> Result<(), ExecutionError>;
80
    fn process(
81
        &mut self,
82
        from_port: PortHandle,
83
        op: Operation,
84
        fw: &mut dyn ProcessorChannelForwarder,
85
        tx: &SharedTransaction,
86
    ) -> Result<(), ExecutionError>;
87
}
88

89
pub trait SinkFactory<T>: Send + Sync + Debug {
90
    fn get_input_ports(&self) -> Vec<PortHandle>;
91
    fn prepare(
92
        &self,
93
        input_schemas: HashMap<PortHandle, (Schema, T)>,
94
    ) -> Result<(), ExecutionError>;
95
    fn build(
96
        &self,
97
        input_schemas: HashMap<PortHandle, Schema>,
98
        checkpoint: &SourceStates,
99
    ) -> Result<Box<dyn Sink>, ExecutionError>;
100
}
101

102
pub trait Sink: Send + Sync + Debug {
103
    fn commit(
104
        &mut self,
105
        epoch_details: &Epoch,
106
        tx: &SharedTransaction,
107
    ) -> Result<(), ExecutionError>;
108
    fn process(
109
        &mut self,
110
        from_port: PortHandle,
111
        op: Operation,
112
        state: &SharedTransaction,
113
    ) -> Result<(), ExecutionError>;
114

115
    fn on_source_snapshotting_done(&mut self) -> Result<(), ExecutionError>;
116
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc