• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

getdozer / dozer / 4370280743

pending completion
4370280743

push

github

GitHub
Bump async-trait from 0.1.65 to 0.1.66 (#1179)

27808 of 38702 relevant lines covered (71.85%)

25323.55 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

25.0
/dozer-core/src/node.rs
1
use crate::channels::{ProcessorChannelForwarder, SourceChannelForwarder};
2
use crate::epoch::Epoch;
3
use crate::errors::ExecutionError;
4
use dozer_storage::lmdb_storage::{LmdbExclusiveTransaction, SharedTransaction};
5

6
use dozer_types::types::{Operation, Schema};
7
use std::collections::HashMap;
8
use std::fmt::{Debug, Display, Formatter};
9

10
/// Identifier of a port; a plain `u16` alias used wherever ports are named in this module.
pub type PortHandle = u16;
11

12
/// Kind of an output port.
///
/// The variant names are also the strings produced by the `Display` impl.
#[derive(Debug, Clone, Copy)]
pub enum OutputPortType {
    /// Port declared as stateless.
    Stateless,
    /// Port declared as stateful with primary-key lookup.
    // NOTE(review): the exact runtime meaning of this variant is defined by
    // its consumers, which are not visible here.
    StatefulWithPrimaryKeyLookup,
}
17

18
impl Display for OutputPortType {
19
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
×
20
        match self {
×
21
            OutputPortType::Stateless => f.write_str("Stateless"),
×
22
            OutputPortType::StatefulWithPrimaryKeyLookup { .. } => {
×
23
                f.write_str("StatefulWithPrimaryKeyLookup")
×
24
            }
×
25
        }
26
    }
×
27
}
×
28

29
#[derive(Debug, Clone)]
×
30
pub struct OutputPortDef {
×
31
    pub handle: PortHandle,
32
    pub typ: OutputPortType,
33
}
34

35
impl OutputPortDef {
36
    pub fn new(handle: PortHandle, typ: OutputPortType) -> Self {
6,653✔
37
        Self { handle, typ }
6,653✔
38
    }
6,653✔
39
}
×
40

41
pub trait SourceFactory<T>: Send + Sync + Debug {
42
    fn get_output_schema(&self, port: &PortHandle) -> Result<(Schema, T), ExecutionError>;
43
    fn get_output_ports(&self) -> Vec<OutputPortDef>;
44
    fn build(
45
        &self,
46
        output_schemas: HashMap<PortHandle, Schema>,
47
    ) -> Result<Box<dyn Source>, ExecutionError>;
48
}
49

50
pub trait Source: Send + Sync + Debug {
51
    /// Checks if the source can start from the given checkpoint.
52
    /// If this function returns false, the executor will start the source from the beginning.
53
    fn can_start_from(&self, last_checkpoint: (u64, u64)) -> Result<bool, ExecutionError>;
54
    fn start(
55
        &self,
56
        fw: &mut dyn SourceChannelForwarder,
57
        last_checkpoint: Option<(u64, u64)>,
58
    ) -> Result<(), ExecutionError>;
59
}
60

61
pub trait ProcessorFactory<T>: Send + Sync + Debug {
62
    fn get_output_schema(
63
        &self,
64
        output_port: &PortHandle,
65
        input_schemas: &HashMap<PortHandle, (Schema, T)>,
66
    ) -> Result<(Schema, T), ExecutionError>;
67
    fn get_input_ports(&self) -> Vec<PortHandle>;
68
    fn get_output_ports(&self) -> Vec<OutputPortDef>;
69
    fn build(
70
        &self,
71
        input_schemas: HashMap<PortHandle, Schema>,
72
        output_schemas: HashMap<PortHandle, Schema>,
73
        txn: &mut LmdbExclusiveTransaction,
74
    ) -> Result<Box<dyn Processor>, ExecutionError>;
75
}
76

77
pub trait Processor: Send + Sync + Debug {
78
    fn commit(&self, epoch_details: &Epoch, tx: &SharedTransaction) -> Result<(), ExecutionError>;
79
    fn process(
80
        &mut self,
81
        from_port: PortHandle,
82
        op: Operation,
83
        fw: &mut dyn ProcessorChannelForwarder,
84
        tx: &SharedTransaction,
85
    ) -> Result<(), ExecutionError>;
86
}
87

88
pub trait SinkFactory<T>: Send + Sync + Debug {
89
    fn get_input_ports(&self) -> Vec<PortHandle>;
90
    fn prepare(
91
        &self,
92
        input_schemas: HashMap<PortHandle, (Schema, T)>,
93
    ) -> Result<(), ExecutionError>;
94
    fn build(
95
        &self,
96
        input_schemas: HashMap<PortHandle, Schema>,
97
    ) -> Result<Box<dyn Sink>, ExecutionError>;
98
}
99

100
pub trait Sink: Send + Sync + Debug {
101
    fn commit(&mut self, tx: &SharedTransaction) -> Result<(), ExecutionError>;
102
    fn process(
103
        &mut self,
104
        from_port: PortHandle,
105
        op: Operation,
106
        state: &SharedTransaction,
107
    ) -> Result<(), ExecutionError>;
108

109
    fn on_source_snapshotting_done(&mut self) -> Result<(), ExecutionError>;
110
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc