• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

getdozer / dozer / 5954012408

23 Aug 2023 04:32PM UTC coverage: 75.86% (-0.2%) from 76.088%
5954012408

push

github

web-flow
chore: Move ContractService implementation to Contract (#1899)

* chore: Split `build.rs` to several files

* chore: Remove `serde` from `dozer-cli/Cargo.toml`

* chore: Move `ContractService` implementation to `Contract`

461 of 461 new or added lines in 8 files covered. (100.0%)

46996 of 61951 relevant lines covered (75.86%)

73804.64 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

30.0
/dozer-core/src/node.rs
1
use crate::channels::{ProcessorChannelForwarder, SourceChannelForwarder};
2
use crate::epoch::Epoch;
3
use crate::executor_operation::ProcessorOperation;
4
use crate::processor_record::ProcessorRecordStore;
5

6
use dozer_log::storage::Queue;
7
use dozer_types::errors::internal::BoxedError;
8
use dozer_types::serde::{Deserialize, Serialize};
9
use dozer_types::types::Schema;
10
use std::collections::HashMap;
11
use std::fmt::{Debug, Display, Formatter};
12

13
pub type PortHandle = u16;
14

15
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
×
16
#[serde(crate = "dozer_types::serde")]
17
pub enum OutputPortType {
18
    Stateless,
19
    StatefulWithPrimaryKeyLookup,
20
}
21

22
impl Display for OutputPortType {
23
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
×
24
        match self {
×
25
            OutputPortType::Stateless => f.write_str("Stateless"),
×
26
            OutputPortType::StatefulWithPrimaryKeyLookup { .. } => {
27
                f.write_str("StatefulWithPrimaryKeyLookup")
×
28
            }
29
        }
30
    }
×
31
}
32

33
#[derive(Debug, Clone)]
×
34
pub struct OutputPortDef {
35
    pub handle: PortHandle,
36
    pub typ: OutputPortType,
37
}
38

39
impl OutputPortDef {
40
    pub fn new(handle: PortHandle, typ: OutputPortType) -> Self {
19,877✔
41
        Self { handle, typ }
19,877✔
42
    }
19,877✔
43
}
44

45
pub trait SourceFactory<T>: Send + Sync + Debug {
46
    fn get_output_schema(&self, port: &PortHandle) -> Result<(Schema, T), BoxedError>;
47
    fn get_output_port_name(&self, port: &PortHandle) -> String;
48
    fn get_output_ports(&self) -> Vec<OutputPortDef>;
49
    fn build(
50
        &self,
51
        output_schemas: HashMap<PortHandle, Schema>,
52
    ) -> Result<Box<dyn Source>, BoxedError>;
53
}
54

55
pub trait Source: Send + Sync + Debug {
56
    /// Checks if the source can start from the given checkpoint.
57
    /// If this function returns false, the executor will start the source from the beginning.
58
    fn can_start_from(&self, last_checkpoint: (u64, u64)) -> Result<bool, BoxedError>;
59
    fn start(
60
        &self,
61
        fw: &mut dyn SourceChannelForwarder,
62
        last_checkpoint: Option<(u64, u64)>,
63
    ) -> Result<(), BoxedError>;
64
}
65

66
pub trait ProcessorFactory<T>: Send + Sync + Debug {
67
    fn get_output_schema(
68
        &self,
69
        output_port: &PortHandle,
70
        input_schemas: &HashMap<PortHandle, (Schema, T)>,
71
    ) -> Result<(Schema, T), BoxedError>;
72
    fn get_input_ports(&self) -> Vec<PortHandle>;
73
    fn get_output_ports(&self) -> Vec<OutputPortDef>;
74
    fn build(
75
        &self,
76
        input_schemas: HashMap<PortHandle, Schema>,
77
        output_schemas: HashMap<PortHandle, Schema>,
78
        record_store: &ProcessorRecordStore,
79
    ) -> Result<Box<dyn Processor>, BoxedError>;
80
    fn type_name(&self) -> String;
81
    fn id(&self) -> String;
82
}
83

84
pub trait Processor: Send + Sync + Debug {
85
    fn commit(&self, epoch_details: &Epoch) -> Result<(), BoxedError>;
86
    fn process(
87
        &mut self,
88
        from_port: PortHandle,
89
        record_store: &ProcessorRecordStore,
90
        op: ProcessorOperation,
91
        fw: &mut dyn ProcessorChannelForwarder,
92
    ) -> Result<(), BoxedError>;
93
}
94

95
pub trait SinkFactory<T>: Send + Sync + Debug {
96
    fn get_input_ports(&self) -> Vec<PortHandle>;
97
    fn prepare(&self, input_schemas: HashMap<PortHandle, (Schema, T)>) -> Result<(), BoxedError>;
98
    fn build(
99
        &self,
100
        input_schemas: HashMap<PortHandle, Schema>,
101
    ) -> Result<Box<dyn Sink>, BoxedError>;
102
}
103

104
pub trait Sink: Send + Sync + Debug {
105
    fn commit(&mut self, epoch_details: &Epoch) -> Result<(), BoxedError>;
106
    fn process(
107
        &mut self,
108
        from_port: PortHandle,
109
        record_store: &ProcessorRecordStore,
110
        op: ProcessorOperation,
111
    ) -> Result<(), BoxedError>;
112
    fn persist(&mut self, queue: &Queue) -> Result<(), BoxedError>;
113

114
    fn on_source_snapshotting_done(&mut self, connection_name: String) -> Result<(), BoxedError>;
115
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc