• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

getdozer / dozer / 6011532274

29 Aug 2023 11:17AM UTC coverage: 76.491% (-0.1%) from 76.616%
6011532274

push

github

web-flow
fix: Include connection type in `GenerateDot`. Fix `AggregationProcessorFactory::type_name` (#1934)

170 of 170 new or added lines in 5 files covered. (100.0%)

49016 of 64081 relevant lines covered (76.49%)

48200.58 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

62.81
/dozer-cli/src/simple/build/contract/mod.rs
1
use std::{
2
    collections::{BTreeMap, HashMap, HashSet},
3
    fs::OpenOptions,
4
    path::Path,
5
};
6

7
use dozer_cache::dozer_log::{home_dir::BuildPath, schemas::EndpointSchema};
8
use dozer_core::{
9
    dag_schemas::DagSchemas,
10
    daggy::{self, NodeIndex},
11
    node::PortHandle,
12
    petgraph::{
13
        visit::{EdgeRef, IntoEdgesDirected, IntoNodeReferences},
14
        Direction,
15
    },
16
};
17
use dozer_types::{
18
    models::{
19
        api_endpoint::ApiEndpoint,
20
        connection::{Connection, ConnectionConfig},
21
    },
22
    node::NodeHandle,
23
    types::Schema,
24
};
25
use dozer_types::{
×
26
    serde::{de::DeserializeOwned, Deserialize, Serialize},
27
    serde_json,
28
};
29

30
use crate::errors::BuildError;
31

32
#[derive(Debug, Clone, Serialize, Deserialize)]
85✔
33
#[serde(crate = "dozer_types::serde")]
34
pub struct NodeType {
35
    pub handle: NodeHandle,
36
    pub kind: NodeKind,
37
}
38

39
#[derive(Debug, Clone, Serialize, Deserialize)]
85✔
40
#[serde(crate = "dozer_types::serde")]
41
pub enum NodeKind {
42
    Source {
×
43
        typ: String,
44
        port_names: HashMap<PortHandle, String>,
45
    },
46
    Processor {
47
        typ: String,
48
    },
49
    Sink,
50
}
51

52
#[derive(Debug, Clone, Serialize, Deserialize)]
70✔
53
#[serde(crate = "dozer_types::serde")]
54
pub struct EdgeType {
55
    pub from_port: PortHandle,
56
    pub to_port: PortHandle,
57
    pub schema: Schema,
58
}
59

×
60
/// Pipeline graph whose node weights are [`NodeType`] and whose edge weights are [`EdgeType`].
pub type PipelineContract = daggy::Dag<NodeType, EdgeType>;
×
61

×
62
#[derive(Debug, Clone)]
×
63
pub struct Contract {
×
64
    pub pipeline: PipelineContract,
×
65
    pub endpoints: BTreeMap<String, EndpointSchema>,
×
66
}
×
67

×
68
impl Contract {
×
69
    pub fn new(
30✔
70
        dag_schemas: &DagSchemas,
30✔
71
        connections: &[Connection],
30✔
72
        endpoints: &[ApiEndpoint],
30✔
73
        enable_token: bool,
30✔
74
        enable_on_event: bool,
30✔
75
    ) -> Result<Self, BuildError> {
30✔
76
        let mut endpoint_schemas = BTreeMap::new();
30✔
77
        for endpoint in endpoints {
60✔
78
            let node_index = find_sink(dag_schemas, &endpoint.name)
30✔
79
                .ok_or(BuildError::MissingEndpoint(endpoint.name.clone()))?;
30✔
80

×
81
            let (schema, secondary_indexes) =
30✔
82
                modify_schema::modify_schema(sink_input_schema(dag_schemas, node_index), endpoint)?;
30✔
83

84
            let connections = collect_ancestor_sources(dag_schemas, node_index);
30✔
85

30✔
86
            let schema = EndpointSchema {
30✔
87
                schema,
30✔
88
                secondary_indexes,
30✔
89
                enable_token,
30✔
90
                enable_on_event,
30✔
91
                connections,
30✔
92
            };
30✔
93
            endpoint_schemas.insert(endpoint.name.clone(), schema);
30✔
94
        }
×
95

×
96
        let mut source_types = HashMap::new();
30✔
97
        for (node_index, node) in dag_schemas.graph().node_references() {
85✔
98
            if let dozer_core::NodeKind::Source(_) = &node.kind {
85✔
99
                let connection = connections
30✔
100
                    .iter()
30✔
101
                    .find(|connection| connection.name == node.handle.id)
30✔
102
                    .ok_or(BuildError::MissingConnection(node.handle.id.clone()))?;
30✔
103
                let typ = match &connection.config {
30✔
104
                    None => "None",
×
105
                    Some(ConnectionConfig::Postgres(_)) => "Postgres",
×
106
                    Some(ConnectionConfig::Ethereum(_)) => "Ethereum",
×
107
                    Some(ConnectionConfig::Grpc(_)) => "Grpc",
25✔
108
                    Some(ConnectionConfig::Snowflake(_)) => "Snowflake",
×
109
                    Some(ConnectionConfig::Kafka(_)) => "Kafka",
×
110
                    Some(ConnectionConfig::S3Storage(_)) => "S3Storage",
×
111
                    Some(ConnectionConfig::LocalStorage(_)) => "LocalStorage",
5✔
112
                    Some(ConnectionConfig::DeltaLake(_)) => "DeltaLake",
×
113
                    Some(ConnectionConfig::MongoDB(_)) => "MongoDB",
×
114
                    Some(ConnectionConfig::MySQL(_)) => "MySQL",
×
115
                    Some(ConnectionConfig::Dozer(_)) => "Dozer",
×
116
                };
×
117
                source_types.insert(node_index, typ);
30✔
118
            }
55✔
119
        }
120

×
121
        let graph = dag_schemas.graph();
30✔
122
        let pipeline = graph.map(
30✔
123
            |node_index, node| {
85✔
124
                let handle = node.handle.clone();
85✔
125
                let kind = match &node.kind {
85✔
126
                    dozer_core::NodeKind::Source(source) => {
30✔
127
                        let typ = source_types
30✔
128
                            .get(&node_index)
30✔
129
                            .expect("Source must have a type")
30✔
130
                            .to_string();
30✔
131
                        let port_names = source
30✔
132
                            .get_output_ports()
30✔
133
                            .iter()
30✔
134
                            .map(|port| {
40✔
135
                                let port_name = source.get_output_port_name(&port.handle);
40✔
136
                                (port.handle, port_name)
40✔
137
                            })
40✔
138
                            .collect();
30✔
139
                        NodeKind::Source { typ, port_names }
30✔
140
                    }
141
                    dozer_core::NodeKind::Processor(processor) => NodeKind::Processor {
25✔
142
                        typ: processor.type_name(),
25✔
143
                    },
25✔
144
                    dozer_core::NodeKind::Sink(_) => NodeKind::Sink,
30✔
145
                };
×
146
                NodeType { handle, kind }
85✔
147
            },
85✔
148
            |_, edge| EdgeType {
70✔
149
                from_port: edge.output_port,
70✔
150
                to_port: edge.input_port,
70✔
151
                schema: edge.schema.clone(),
70✔
152
            },
70✔
153
        );
30✔
154

30✔
155
        Ok(Self {
30✔
156
            pipeline,
30✔
157
            endpoints: endpoint_schemas,
30✔
158
        })
30✔
159
    }
30✔
160

161
    pub fn serialize(&self, build_path: &BuildPath) -> Result<(), BuildError> {
×
162
        serde_json_to_path(&build_path.dag_path, &self.pipeline)?;
30✔
163

×
164
        for (endpoint_name, schema) in &self.endpoints {
60✔
165
            let endpoint_path = build_path.get_endpoint_path(endpoint_name);
30✔
166
            serde_json_to_path(&endpoint_path.schema_path, schema)?;
30✔
167
        }
168

×
169
        Ok(())
30✔
170
    }
30✔
171

×
172
    pub fn deserialize(build_path: &BuildPath) -> Result<Self, BuildError> {
×
173
        let pipeline: daggy::Dag<NodeType, EdgeType> = serde_json_from_path(&build_path.dag_path)?;
×
174

×
175
        let mut endpoints = BTreeMap::new();
×
176
        for (node_index, node) in pipeline.node_references() {
×
177
            // Endpoint must have zero out degree.
×
178
            if pipeline
×
179
                .edges_directed(node_index, Direction::Outgoing)
×
180
                .count()
×
181
                > 0
×
182
            {
183
                continue;
×
184
            }
×
185

×
186
            // `NodeHandle::id` is the endpoint name.
×
187
            let endpoint_name = node.handle.id.clone();
×
188
            let endpoint_path = build_path.get_endpoint_path(&endpoint_name);
×
189
            let schema: EndpointSchema = serde_json_from_path(&endpoint_path.schema_path)?;
×
190
            endpoints.insert(endpoint_name, schema);
×
191
        }
×
192

×
193
        Ok(Self {
×
194
            pipeline,
×
195
            endpoints,
×
196
        })
×
197
    }
×
198
}
×
199

×
200
mod service;
×
201

202
/// Sink's `NodeHandle::id` must be `endpoint_name`.
×
203
fn find_sink(dag: &DagSchemas, endpoint_name: &str) -> Option<NodeIndex> {
30✔
204
    dag.graph()
30✔
205
        .node_references()
30✔
206
        .find(|(_node_index, node)| {
55✔
207
            if let dozer_core::NodeKind::Sink(_) = &node.kind {
55✔
208
                node.handle.id == endpoint_name
30✔
209
            } else {
×
210
                false
25✔
211
            }
×
212
        })
55✔
213
        .map(|(node_index, _)| node_index)
30✔
214
}
30✔
215

×
216
fn sink_input_schema(dag: &DagSchemas, node_index: NodeIndex) -> &Schema {
30✔
217
    let edge = dag
30✔
218
        .graph()
30✔
219
        .edges_directed(node_index, Direction::Incoming)
30✔
220
        .next()
30✔
221
        .expect("Sink must have one incoming edge");
30✔
222
    &edge.weight().schema
30✔
223
}
30✔
224

225
fn collect_ancestor_sources(dag: &DagSchemas, node_index: NodeIndex) -> HashSet<String> {
30✔
226
    let mut sources = HashSet::new();
30✔
227
    collect_ancestor_sources_recursive(dag, node_index, &mut sources);
30✔
228
    sources
30✔
229
}
30✔
230

231
fn collect_ancestor_sources_recursive(
100✔
232
    dag: &DagSchemas,
100✔
233
    node_index: NodeIndex,
100✔
234
    sources: &mut HashSet<String>,
100✔
235
) {
100✔
236
    for edge in dag.graph().edges_directed(node_index, Direction::Incoming) {
100✔
237
        let source_node_index = edge.source();
70✔
238
        let source_node = &dag.graph()[source_node_index];
70✔
239
        if matches!(source_node.kind, dozer_core::NodeKind::Source(_)) {
70✔
240
            sources.insert(source_node.handle.id.clone());
45✔
241
        }
45✔
242
        collect_ancestor_sources_recursive(dag, source_node_index, sources);
70✔
243
    }
244
}
100✔
245

246
fn serde_json_to_path(path: impl AsRef<Path>, value: &impl Serialize) -> Result<(), BuildError> {
60✔
247
    let file = OpenOptions::new()
60✔
248
        .create(true)
60✔
249
        .write(true)
60✔
250
        .open(path.as_ref())
60✔
251
        .map_err(|e| BuildError::FileSystem(path.as_ref().into(), e))?;
60✔
252
    serde_json::to_writer_pretty(file, value).map_err(BuildError::SerdeJson)
60✔
253
}
60✔
254

255
fn serde_json_from_path<T>(path: impl AsRef<Path>) -> Result<T, BuildError>
×
256
where
×
257
    T: DeserializeOwned,
×
258
{
×
259
    let file = OpenOptions::new()
×
260
        .read(true)
×
261
        .open(path.as_ref())
×
262
        .map_err(|e| BuildError::FileSystem(path.as_ref().into(), e))?;
×
263
    serde_json::from_reader(file).map_err(BuildError::FailedToLoadExistingContract)
×
264
}
×
265

266
mod modify_schema;
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc