• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

getdozer / dozer / 5869050642

pending completion
5869050642

Pull #1858

github

supergi0
updated readme
Pull Request #1858: feat: Implement graph for dozer-live ui

419 of 419 new or added lines in 15 files covered. (100.0%)

46002 of 59761 relevant lines covered (76.98%)

52423.06 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/dozer-cli/src/live/graph.rs
1
use std::{
2
    collections::HashMap,
3
    fmt::{Display, Formatter},
4
};
5

6
use dozer_core::{
7
    dag_schemas::{DagSchemas, EdgeHaveSchema},
8
    daggy,
9
    petgraph::{
10
        self,
11
        visit::{EdgeRef, IntoEdgeReferences, IntoEdgesDirected, IntoNodeReferences},
12
        Direction,
13
    },
14
    Dag, NodeKind,
15
};
16
use dozer_sql::pipeline::builder::SchemaSQLContext;
17
use dozer_types::grpc_types::live::Schema;
18

19
use super::helper::map_schema;
20

21
pub struct Node {
22
    id: u32,
23
    label: String,
24
}
25
impl Display for Node {
26
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
×
27
        write!(f, "{}", self.label)
×
28
    }
×
29
}
30

31
pub fn transform_dag_ui(input_dag: &Dag<SchemaSQLContext>) -> daggy::Dag<Node, &str> {
×
32
    let input_graph = input_dag.graph();
×
33

×
34
    let mut dag = daggy::Dag::new();
×
35
    let mut sources: HashMap<String, daggy::NodeIndex> = HashMap::new();
×
36
    input_graph
×
37
        .node_references()
×
38
        .for_each(|(node_index, node)| {
×
39
            let node_index = node_index.index() as u32;
×
40
            let id = node.handle.id.to_string();
×
41
            let label = get_label(&node.kind, true);
×
42

×
43
            dag.add_node(Node {
×
44
                id: node_index,
×
45
                label: format!("{}::{}", label, id),
×
46
            });
×
47
        });
×
48
    input_graph.edge_references().for_each(|edge| {
×
49
        let source_node_index = edge.source();
×
50
        let target_node_index = edge.target();
×
51
        let source_index = find_node(source_node_index.index(), &dag);
×
52
        let target_index = find_node(target_node_index.index(), &dag);
×
53

×
54
        let from_port = edge.weight().from;
×
55
        let kind = &input_graph[source_node_index].kind;
×
56
        let label = get_label(kind, false);
×
57
        match &kind {
×
58
            dozer_core::NodeKind::Source(source) => {
×
59
                let source_name = source.get_output_port_name(&from_port);
×
60
                let new_node_idx = match sources.get(&source_name) {
×
61
                    Some(idx) => *idx,
×
62
                    None => {
63
                        let new_node_idx = dag.node_count();
×
64

×
65
                        let new_node_idx = dag.add_node(Node {
×
66
                            id: new_node_idx as u32,
×
67
                            label: format!("{}::{}", label, source_name),
×
68
                        });
×
69
                        sources.insert(source_name, new_node_idx);
×
70
                        new_node_idx
×
71
                    }
72
                };
73

74
                dag.add_edge(source_index, new_node_idx, "").unwrap();
×
75
                dag.add_edge(new_node_idx, target_index, "").unwrap();
×
76
            }
77
            _ => {
×
78
                dag.add_edge(source_index, target_index, "").unwrap();
×
79
            }
×
80
        }
81
    });
×
82
    dag
×
83
}
×
84

85
fn find_node(id: usize, dag: &daggy::Dag<Node, &str>) -> petgraph::graph::NodeIndex<u32> {
×
86
    dag.node_references()
×
87
        .find(|(_node_index, node)| node.id == id as u32)
×
88
        .map(|(node_index, _)| node_index)
×
89
        .unwrap()
×
90
}
×
91

92
// Return all input schemas for a given node
93
// Hence, source schemas are to be mapped separately.
94
pub fn map_dag_schemas(dag_schemas: DagSchemas<SchemaSQLContext>) -> HashMap<String, Schema> {
×
95
    let mut schemas = HashMap::new();
×
96
    let graph = dag_schemas.into_graph();
×
97
    for (node_index, node) in graph.node_references() {
×
98
        // ignore source schemas
99

100
        match node.kind {
×
101
            NodeKind::Sink(_) => {
102
                for edge in graph.edges_directed(node_index, Direction::Incoming) {
×
103
                    let edge = edge.weight();
×
104
                    let schema = edge.schema();
×
105
                    let id = node.handle.id.to_string();
×
106
                    let label = get_label(&node.kind, false);
×
107
                    let key = format!("{}::{}", label, id);
×
108
                    schemas.insert(key, map_schema(schema.clone()));
×
109
                }
×
110
            }
111
            _ => {
112
                for edge in graph.edges_directed(node_index, Direction::Outgoing) {
×
113
                    let edge = edge.weight();
×
114
                    let schema = edge.schema();
×
115
                    let id = if let NodeKind::Source(source) = &node.kind {
×
116
                        let to_port = edge.output_port;
×
117

×
118
                        source.get_output_port_name(&to_port)
×
119
                    } else {
120
                        node.handle.id.to_string()
×
121
                    };
122
                    let label = get_label(&node.kind, false);
×
123
                    let key = format!("{}::{}", label, id);
×
124
                    schemas.insert(key, map_schema(schema.clone()));
×
125
                }
126
            }
127
        }
128
    }
129
    schemas
×
130
}
×
131

132
fn get_label(kind: &NodeKind<SchemaSQLContext>, match_connection: bool) -> &str {
×
133
    match kind {
×
134
        dozer_core::NodeKind::Source(_) => match match_connection {
×
135
            true => "connection",
×
136
            false => "source",
×
137
        },
138
        dozer_core::NodeKind::Processor(_processor) => "processor",
×
139
        dozer_core::NodeKind::Sink(_) => "sink",
×
140
    }
141
}
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc