• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

getdozer / dozer / 6011532274

29 Aug 2023 11:17AM UTC coverage: 76.491% (-0.1%) from 76.616%
6011532274

push

github

web-flow
fix: Include connection type in `GenerateDot`. Fix `AggregationProcessorFactory::type_name` (#1934)

170 of 170 new or added lines in 5 files covered. (100.0%)

49016 of 64081 relevant lines covered (76.49%)

48200.58 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/dozer-cli/src/simple/build/contract/service.rs
1
use std::{collections::HashMap, fmt::Display};
2

3
use dozer_core::{
4
    daggy,
5
    petgraph::{
6
        dot,
7
        visit::{EdgeRef, IntoEdgeReferences, IntoEdgesDirected, IntoNodeReferences},
8
        Direction,
9
    },
10
};
11
use dozer_types::grpc_types::{contract::Schema, conversions::field_definition_to_grpc};
12

13
use super::{Contract, NodeKind};
14

15
impl Contract {
16
    pub fn get_source_schemas(&self, connection_name: &str) -> Option<HashMap<String, Schema>> {
×
17
        // Find the source node.
18
        for (node_index, node) in self.pipeline.node_references() {
×
19
            if let NodeKind::Source { port_names, .. } = &node.kind {
×
20
                if node.handle.id == connection_name {
×
21
                    let mut result = HashMap::new();
×
22
                    for edge in self
×
23
                        .pipeline
24
                        .edges_directed(node_index, Direction::Outgoing)
×
25
                    {
×
26
                        let edge = edge.weight();
×
27
                        let name = port_names
×
28
                            .get(&edge.from_port)
×
29
                            .expect("Every port name must have been added")
×
30
                            .clone();
×
31
                        let schema = edge.schema.clone();
×
32
                        result.insert(name, map_schema(schema));
×
33
                    }
×
34
                    return Some(result);
×
35
                }
×
36
            }
×
37
        }
38
        None
×
39
    }
×
40

41
    pub fn get_endpoints_schemas(&self) -> HashMap<String, Schema> {
×
42
        self.endpoints
×
43
            .iter()
×
44
            .map(|(name, endpoint)| (name.clone(), map_schema(endpoint.schema.clone())))
×
45
            .collect()
×
46
    }
×
47

48
    pub fn get_graph_schemas(&self) -> HashMap<String, Schema> {
×
49
        let graph = self.create_ui_graph();
×
50
        let nodes = graph.into_graph().into_nodes_edges().0;
×
51
        nodes
×
52
            .into_iter()
×
53
            .filter_map(|node| {
×
54
                let node = node.weight;
×
55
                node.output_schema
×
56
                    .map(|schema| (node.kind.to_string(), schema))
×
57
            })
×
58
            .collect()
×
59
    }
×
60

×
61
    pub fn generate_dot(&self) -> String {
×
62
        dot::Dot::new(&self.create_ui_graph()).to_string()
×
63
    }
×
64

×
65
    fn create_ui_graph(&self) -> UiGraph {
×
66
        let mut ui_graph = UiGraph::new();
×
67
        let mut pipeline_node_index_to_ui_node_index = HashMap::new();
×
68
        let mut pipeline_source_to_ui_node_index = HashMap::new();
×
69

70
        // Create nodes.
×
71
        for (node_index, node) in self.pipeline.node_references() {
×
72
            match &node.kind {
×
73
                NodeKind::Source { typ, port_names } => {
×
74
                    // Create connection ui node.
×
75
                    let connection_node_index = ui_graph.add_node(UiNodeType {
×
76
                        kind: UiNodeKind::Connection {
×
77
                            typ: typ.clone(),
×
78
                            name: node.handle.id.clone(),
×
79
                        },
×
80
                        output_schema: None,
×
81
                    });
×
82
                    pipeline_node_index_to_ui_node_index.insert(node_index, connection_node_index);
×
83

×
84
                    // Create source ui node. Schema comes from connection's outgoing edge.
85
                    for edge in self
×
86
                        .pipeline
87
                        .edges_directed(node_index, Direction::Outgoing)
×
88
                    {
×
89
                        if let std::collections::hash_map::Entry::Vacant(entry) =
×
90
                            pipeline_source_to_ui_node_index
×
91
                                .entry((node_index, edge.weight().from_port))
×
92
                        {
×
93
                            let edge = edge.weight();
×
94
                            let schema = edge.schema.clone();
×
95
                            let source_node_index = ui_graph.add_node(UiNodeType {
×
96
                                kind: UiNodeKind::Source {
×
97
                                    name: port_names[&edge.from_port].clone(),
×
98
                                },
×
99
                                output_schema: Some(map_schema(schema)),
×
100
                            });
×
101
                            entry.insert(source_node_index);
×
102
                        }
×
103
                    }
×
104
                }
×
105
                NodeKind::Processor { typ } => {
×
106
                    // Create processor ui node. Schema comes from the outgoing edge.
×
107
                    let mut edges = self
×
108
                        .pipeline
×
109
                        .edges_directed(node_index, Direction::Outgoing)
×
110
                        .collect::<Vec<_>>();
×
111
                    assert!(
×
112
                        edges.len() == 1,
×
113
                        "We only support visualizing processors with one output port"
×
114
                    );
×
115
                    let edge = edges.remove(0);
×
116

×
117
                    let processor_node_index = ui_graph.add_node(UiNodeType {
×
118
                        kind: UiNodeKind::Processor {
×
119
                            typ: typ.clone(),
×
120
                            name: node.handle.id.clone(),
×
121
                        },
×
122
                        output_schema: Some(map_schema(edge.weight().schema.clone())),
×
123
                    });
×
124
                    pipeline_node_index_to_ui_node_index.insert(node_index, processor_node_index);
×
125
                }
×
126
                NodeKind::Sink => {
×
127
                    // Create sink ui node. Schema comes from endpoint.
×
128
                    let schema = self.endpoints[&node.handle.id].schema.clone();
×
129
                    let sink_node_index = ui_graph.add_node(UiNodeType {
×
130
                        kind: UiNodeKind::Sink {
×
131
                            name: node.handle.id.clone(),
×
132
                        },
×
133
                        output_schema: Some(map_schema(schema)),
×
134
                    });
×
135
                    pipeline_node_index_to_ui_node_index.insert(node_index, sink_node_index);
×
136
                }
×
137
            }
138
        }
139

×
140
        // Create edges.
×
141
        for edge in self.pipeline.edge_references() {
×
142
            let from_node_index = edge.source();
×
143
            let to_node_index = edge.target();
×
144
            let from_ui_node_index = pipeline_node_index_to_ui_node_index[&from_node_index];
×
145
            let to_ui_node_index = pipeline_node_index_to_ui_node_index[&to_node_index];
×
146

×
147
            let from_node = &self.pipeline[from_node_index];
×
148
            let kind = &from_node.kind;
×
149
            match &kind {
×
150
                NodeKind::Source { .. } => {
×
151
                    let ui_source_node_index = pipeline_source_to_ui_node_index
×
152
                        [&(from_node_index, edge.weight().from_port)];
×
153
                    // Connect ui connection node to ui source node.
×
154
                    ui_graph
×
155
                        .add_edge(from_ui_node_index, ui_source_node_index, UiEdgeType)
×
156
                        .unwrap();
×
157
                    // Connect ui source node to target node.
×
158
                    ui_graph
×
159
                        .add_edge(ui_source_node_index, to_ui_node_index, UiEdgeType)
×
160
                        .unwrap();
×
161
                }
×
162
                _ => {
×
163
                    // Connect ui node to target node.
×
164
                    ui_graph
×
165
                        .add_edge(from_ui_node_index, to_ui_node_index, UiEdgeType)
×
166
                        .unwrap();
×
167
                }
×
168
            }
169
        }
170

171
        ui_graph
×
172
    }
×
173
}
174

175
#[derive(Debug)]
×
176
struct UiNodeType {
177
    kind: UiNodeKind,
178
    output_schema: Option<Schema>,
179
}
180

181
impl Display for UiNodeType {
182
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
×
183
        self.kind.fmt(f)
×
184
    }
×
185
}
186

187
/// The role a node plays in the UI graph, together with the data used to label it.
#[derive(Debug)]
enum UiNodeKind {
    /// A connection, labelled with its type and name.
    Connection { typ: String, name: String },
    /// A named source.
    Source { name: String },
    /// A processor, labelled with its type and name.
    Processor { typ: String, name: String },
    /// A named sink.
    Sink { name: String },
}

/// Formats the kind as three `::`-separated segments: category, type and name.
/// Sources and sinks carry no type, so the category is repeated in the type segment.
impl Display for UiNodeKind {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::Connection { typ, name } => write!(f, "connection::{typ}::{name}"),
            Self::Source { name } => write!(f, "source::source::{name}"),
            Self::Processor { typ, name } => write!(f, "processor::{typ}::{name}"),
            Self::Sink { name } => write!(f, "sink::sink::{name}"),
        }
    }
}
205

206
/// Weight of a UI graph edge. It carries no data; edges only express connectivity.
#[derive(Debug)]
struct UiEdgeType;

/// Edges are displayed as an empty string.
impl Display for UiEdgeType {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str("")
    }
}
214

215
/// Graph built by `Contract::create_ui_graph`: a `daggy::Dag` whose nodes are
/// [`UiNodeType`] values and whose edges are [`UiEdgeType`] markers.
type UiGraph = daggy::Dag<UiNodeType, UiEdgeType>;
216

217
fn map_schema(schema: dozer_types::types::Schema) -> Schema {
×
218
    Schema {
×
219
        primary_index: schema.primary_index.into_iter().map(|i| i as i32).collect(),
×
220
        fields: field_definition_to_grpc(schema.fields),
×
221
    }
×
222
}
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc