• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

getdozer / dozer / 5888798292

17 Aug 2023 08:51AM UTC coverage: 76.025% (-1.4%) from 77.415%
5888798292

push

github

web-flow
feat: implement graph on live ui (#1847)

* feat: implement progress

* feat: implement enable progress flag

* feat: implement progress in live

* chore: fix clippy

* chore: always use telemetry metrics

* fix: Only run build once

---------

Co-authored-by: sagar <sagar@getdozer.io>
Co-authored-by: chubei <914745487@qq.com>

536 of 536 new or added lines in 21 files covered. (100.0%)

46101 of 60639 relevant lines covered (76.03%)

40410.07 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

25.0
/dozer-core/src/node.rs
1
use crate::channels::{ProcessorChannelForwarder, SourceChannelForwarder};
2
use crate::epoch::Epoch;
3
use crate::executor_operation::ProcessorOperation;
4
use crate::processor_record::ProcessorRecordStore;
5

6
use dozer_types::errors::internal::BoxedError;
7
use dozer_types::serde::{Deserialize, Serialize};
8
use dozer_types::types::Schema;
9
use std::collections::HashMap;
10
use std::fmt::{Debug, Display, Formatter};
11

12
pub type PortHandle = u16;
13

14
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
22✔
15
#[serde(crate = "dozer_types::serde")]
×
16
pub enum OutputPortType {
17
    Stateless,
18
    StatefulWithPrimaryKeyLookup,
19
}
20

21
impl Display for OutputPortType {
22
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
×
23
        match self {
×
24
            OutputPortType::Stateless => f.write_str("Stateless"),
×
25
            OutputPortType::StatefulWithPrimaryKeyLookup { .. } => {
×
26
                f.write_str("StatefulWithPrimaryKeyLookup")
×
27
            }
×
28
        }
29
    }
×
30
}
×
31

32
#[derive(Debug, Clone)]
×
33
pub struct OutputPortDef {
×
34
    pub handle: PortHandle,
35
    pub typ: OutputPortType,
36
}
37

38
impl OutputPortDef {
39
    pub fn new(handle: PortHandle, typ: OutputPortType) -> Self {
6,217✔
40
        Self { handle, typ }
6,217✔
41
    }
6,217✔
42
}
×
43

44
pub trait SourceFactory<T>: Send + Sync + Debug {
45
    fn get_output_schema(&self, port: &PortHandle) -> Result<(Schema, T), BoxedError>;
46
    fn get_output_port_name(&self, port: &PortHandle) -> String;
47
    fn get_output_ports(&self) -> Vec<OutputPortDef>;
48
    fn build(
49
        &self,
50
        output_schemas: HashMap<PortHandle, Schema>,
51
    ) -> Result<Box<dyn Source>, BoxedError>;
52
}
53

54
pub trait Source: Send + Sync + Debug {
55
    /// Checks if the source can start from the given checkpoint.
56
    /// If this function returns false, the executor will start the source from the beginning.
57
    fn can_start_from(&self, last_checkpoint: (u64, u64)) -> Result<bool, BoxedError>;
58
    fn start(
59
        &self,
60
        fw: &mut dyn SourceChannelForwarder,
61
        last_checkpoint: Option<(u64, u64)>,
62
    ) -> Result<(), BoxedError>;
63
}
64

65
pub trait ProcessorFactory<T>: Send + Sync + Debug {
66
    fn get_output_schema(
67
        &self,
68
        output_port: &PortHandle,
69
        input_schemas: &HashMap<PortHandle, (Schema, T)>,
70
    ) -> Result<(Schema, T), BoxedError>;
71
    fn get_input_ports(&self) -> Vec<PortHandle>;
72
    fn get_output_ports(&self) -> Vec<OutputPortDef>;
73
    fn build(
74
        &self,
75
        input_schemas: HashMap<PortHandle, Schema>,
76
        output_schemas: HashMap<PortHandle, Schema>,
77
        record_store: &ProcessorRecordStore,
78
    ) -> Result<Box<dyn Processor>, BoxedError>;
79
    fn type_name(&self) -> String;
80
    fn id(&self) -> String;
81
}
82

83
pub trait Processor: Send + Sync + Debug {
84
    fn commit(&self, epoch_details: &Epoch) -> Result<(), BoxedError>;
85
    fn process(
86
        &mut self,
87
        from_port: PortHandle,
88
        record_store: &ProcessorRecordStore,
89
        op: ProcessorOperation,
90
        fw: &mut dyn ProcessorChannelForwarder,
91
    ) -> Result<(), BoxedError>;
92
}
93

94
pub trait SinkFactory<T>: Send + Sync + Debug {
95
    fn get_input_ports(&self) -> Vec<PortHandle>;
96
    fn prepare(&self, input_schemas: HashMap<PortHandle, (Schema, T)>) -> Result<(), BoxedError>;
97
    fn build(
98
        &self,
99
        input_schemas: HashMap<PortHandle, Schema>,
100
    ) -> Result<Box<dyn Sink>, BoxedError>;
101
}
102

103
pub trait Sink: Send + Sync + Debug {
104
    fn commit(&mut self, epoch_details: &Epoch) -> Result<(), BoxedError>;
105
    fn process(
106
        &mut self,
107
        from_port: PortHandle,
108
        record_store: &ProcessorRecordStore,
109
        op: ProcessorOperation,
110
    ) -> Result<(), BoxedError>;
111

112
    fn on_source_snapshotting_done(&mut self, connection_name: String) -> Result<(), BoxedError>;
113
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc