• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

getdozer / dozer / 4283961027

pending completion
4283961027

push

github

GitHub
feat: Blue green cache (#1061)

645 of 645 new or added lines in 45 files covered. (100.0%)

27779 of 39307 relevant lines covered (70.67%)

52489.81 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

22.22
/dozer-core/src/node.rs
1
use crate::channels::{ProcessorChannelForwarder, SourceChannelForwarder};
2
use crate::epoch::Epoch;
3
use crate::errors::ExecutionError;
4
use crate::record_store::RecordReader;
5
use dozer_storage::lmdb_storage::{LmdbExclusiveTransaction, SharedTransaction};
6

7
use dozer_types::node::SourceStates;
8
use dozer_types::types::{Operation, Schema};
9
use std::collections::HashMap;
10
use std::fmt::{Debug, Display, Formatter};
11

12
/// Numeric handle identifying a port; the traits in this file use it as the key for
/// per-port schemas and record readers.
pub type PortHandle = u16;
13

×
14
/// Kind of an output port, as carried by `OutputPortDef::typ`.
///
/// NOTE(review): the variant names suggest how records emitted on the port are stored
/// and looked up downstream; the exact semantics are not defined in this file.
#[derive(Debug, Clone, Copy)]
pub enum OutputPortType {
    /// Port without any associated state.
    Stateless,
    /// Stateful port whose records are looked up by primary key.
    StatefulWithPrimaryKeyLookup {
        /// Presumably: fetch the previous record version when forwarding a delete
        /// (TODO confirm against the executor).
        retr_old_records_for_deletes: bool,
        /// Presumably: fetch the previous record version when forwarding an update
        /// (TODO confirm against the executor).
        retr_old_records_for_updates: bool,
    },
    /// Port whose records are looked up by an auto-generated row key.
    AutogenRowKeyLookup,
}

impl Display for OutputPortType {
    /// Writes the bare variant name; the fields of `StatefulWithPrimaryKeyLookup`
    /// are not included in the output.
    ///
    /// The name is emitted through [`Formatter::pad`], so width, fill, alignment and
    /// precision flags (e.g. `{:>30}`) are honored just like they are for `&str`.
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        let name = match self {
            OutputPortType::Stateless => "Stateless",
            OutputPortType::StatefulWithPrimaryKeyLookup { .. } => {
                "StatefulWithPrimaryKeyLookup"
            }
            OutputPortType::AutogenRowKeyLookup => "AutogenRowKeyLookup",
        };
        // `write_str` would bypass the formatter's flags; `pad` applies them.
        f.pad(name)
    }
}
35

×
36
/// Definition of a single output port: a port handle paired with the port's type.
#[derive(Debug, Clone)]
×
37
pub struct OutputPortDef {
38
    /// Handle of the port being described.
    pub handle: PortHandle,
39
    /// Type of the port.
    pub typ: OutputPortType,
40
}
41

42
impl OutputPortDef {
×
43
    pub fn new(handle: PortHandle, typ: OutputPortType) -> Self {
6,682✔
44
        Self { handle, typ }
6,682✔
45
    }
6,682✔
46
}
47

48
/// Factory that describes the output ports of a source and builds the [`Source`] itself.
///
/// `T` is an extra value returned alongside each output schema; its meaning is not
/// defined in this file.
pub trait SourceFactory<T>: Send + Sync + Debug {
49
    /// Returns the schema of the output port `port`, together with a `T` value.
    fn get_output_schema(&self, port: &PortHandle) -> Result<(Schema, T), ExecutionError>;
50
    /// Returns the definitions of the output ports.
    fn get_output_ports(&self) -> Result<Vec<OutputPortDef>, ExecutionError>;
51
    /// Builds the source from a map of schemas keyed by port handle.
    fn build(
52
        &self,
53
        output_schemas: HashMap<PortHandle, Schema>,
54
    ) -> Result<Box<dyn Source>, ExecutionError>;
55
}
56

57
/// A built source, as returned by [`SourceFactory::build`].
pub trait Source: Send + Sync + Debug {
58
    /// Checks if the source can start from the given checkpoint.
59
    /// If this function returns false, the executor will start the source from the beginning.
60
    fn can_start_from(&self, last_checkpoint: (u64, u64)) -> Result<bool, ExecutionError>;
61
    /// Starts the source, handing it the forwarder `fw` and an optional checkpoint.
    ///
    /// NOTE(review): presumably `last_checkpoint == None` means "start from the
    /// beginning" and `fw` is where produced operations are sent — confirm against
    /// the executor.
    fn start(
62
        &self,
63
        fw: &mut dyn SourceChannelForwarder,
64
        last_checkpoint: Option<(u64, u64)>,
65
    ) -> Result<(), ExecutionError>;
66
}
67

68
/// Factory that describes the input and output ports of a processor and builds the
/// [`Processor`] itself.
///
/// `T` is an extra value carried alongside each schema; its meaning is not defined in
/// this file.
pub trait ProcessorFactory<T>: Send + Sync + Debug {
69
    /// Returns the schema (and a `T` value) of `output_port`, given the schemas of the
    /// input ports keyed by port handle.
    fn get_output_schema(
70
        &self,
71
        output_port: &PortHandle,
72
        input_schemas: &HashMap<PortHandle, (Schema, T)>,
73
    ) -> Result<(Schema, T), ExecutionError>;
74
    /// Returns the handles of the input ports.
    fn get_input_ports(&self) -> Vec<PortHandle>;
75
    /// Returns the definitions of the output ports.
    fn get_output_ports(&self) -> Vec<OutputPortDef>;
76
    /// Builds the processor from its input and output schemas (keyed by port handle)
    /// and an exclusive LMDB transaction.
    fn build(
77
        &self,
78
        input_schemas: HashMap<PortHandle, Schema>,
79
        output_schemas: HashMap<PortHandle, Schema>,
80
        txn: &mut LmdbExclusiveTransaction,
81
    ) -> Result<Box<dyn Processor>, ExecutionError>;
82
}
83

84
/// A built processor, as returned by [`ProcessorFactory::build`].
pub trait Processor: Send + Sync + Debug {
85
    /// Commit hook receiving the epoch details and the shared transaction.
    ///
    /// NOTE(review): takes `&self`, whereas `Sink::commit` takes `&mut self`.
    fn commit(&self, epoch_details: &Epoch, tx: &SharedTransaction) -> Result<(), ExecutionError>;
86
    /// Processes one operation `op` that arrived on input port `from_port`.
    ///
    /// Also receives the forwarder `fw`, the shared transaction `tx` and a map of
    /// record readers keyed by port handle.
    /// NOTE(review): presumably results are emitted through `fw` — confirm.
    fn process(
87
        &mut self,
88
        from_port: PortHandle,
89
        op: Operation,
90
        fw: &mut dyn ProcessorChannelForwarder,
91
        tx: &SharedTransaction,
92
        reader: &HashMap<PortHandle, Box<dyn RecordReader>>,
93
    ) -> Result<(), ExecutionError>;
94
}
95

96
/// Factory that describes the input ports of a sink and builds the [`Sink`] itself.
///
/// `T` is an extra value carried alongside each schema; its meaning is not defined in
/// this file.
pub trait SinkFactory<T>: Send + Sync + Debug {
97
    /// Returns the handles of the input ports.
    fn get_input_ports(&self) -> Vec<PortHandle>;
98
    /// Receives the input schemas (each with its `T` value), keyed by port handle.
    ///
    /// NOTE(review): presumably called before `build` so the sink can validate or set
    /// up for these schemas — confirm against the executor.
    fn prepare(
99
        &self,
100
        input_schemas: HashMap<PortHandle, (Schema, T)>,
101
    ) -> Result<(), ExecutionError>;
102
    /// Builds the sink from its input schemas (keyed by port handle) and the source
    /// states passed as `checkpoint`.
    fn build(
103
        &self,
104
        input_schemas: HashMap<PortHandle, Schema>,
105
        checkpoint: &SourceStates,
106
    ) -> Result<Box<dyn Sink>, ExecutionError>;
107
}
108

109
/// A built sink, as returned by [`SinkFactory::build`].
pub trait Sink: Send + Sync + Debug {
110
    /// Commit hook receiving the epoch details and the shared transaction.
    fn commit(
111
        &mut self,
112
        epoch_details: &Epoch,
113
        tx: &SharedTransaction,
114
    ) -> Result<(), ExecutionError>;
115
    /// Processes one operation `op` that arrived on input port `from_port`.
    ///
    /// Also receives the shared transaction `state` and a map of record readers keyed
    /// by port handle. Unlike `Processor::process`, no forwarder is passed.
    fn process(
116
        &mut self,
117
        from_port: PortHandle,
118
        op: Operation,
119
        state: &SharedTransaction,
120
        reader: &HashMap<PortHandle, Box<dyn RecordReader>>,
121
    ) -> Result<(), ExecutionError>;
122
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc