• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

getdozer / dozer / 5725710489

pending completion
5725710489

push

github

web-flow
chore: Add `SourceFactory::get_output_port_name` to simplify ui graph generation (#1812)

140 of 140 new or added lines in 13 files covered. (100.0%)

45519 of 60083 relevant lines covered (75.76%)

39458.21 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

39.49
/dozer-cli/src/ui_helper.rs
1
use std::collections::HashMap;
2

3
use dozer_core::{
4
    app::{App, AppPipeline},
5
    appsource::{AppSourceManager, AppSourceMappings},
6
    node::{OutputPortDef, OutputPortType, PortHandle, SourceFactory},
7
    petgraph::visit::{EdgeRef, IntoEdgeReferences, IntoNodeReferences},
8
    Dag,
9
};
10
use dozer_sql::pipeline::builder::{statement_to_pipeline, SchemaSQLContext};
11
use dozer_types::{
12
    grpc_types::cloud::{QueryEdge, QueryGraph, QueryNode, QueryNodeType},
13
    models::{config::Config, connection::Connection, source::Source},
14
};
15

16
use crate::{errors::OrchestrationError, pipeline::source_builder::SourceBuilder};
17

18
#[derive(Debug)]
×
19
struct UISourceFactory {
20
    output_ports: HashMap<PortHandle, String>,
×
21
}
22
impl SourceFactory<SchemaSQLContext> for UISourceFactory {
23
    fn get_output_schema(
×
24
        &self,
×
25
        _port: &PortHandle,
×
26
    ) -> Result<
×
27
        (dozer_types::types::Schema, SchemaSQLContext),
×
28
        dozer_types::errors::internal::BoxedError,
×
29
    > {
×
30
        todo!()
×
31
    }
×
32

×
33
    fn get_output_port_name(&self, port: &PortHandle) -> String {
×
34
        self.output_ports.get(port).expect("Port not found").clone()
×
35
    }
×
36

×
37
    fn get_output_ports(&self) -> Vec<OutputPortDef> {
×
38
        self.output_ports
×
39
            .keys()
×
40
            .map(|port| OutputPortDef::new(*port, OutputPortType::Stateless))
×
41
            .collect()
×
42
    }
×
43

×
44
    fn build(
×
45
        &self,
×
46
        _output_schemas: HashMap<dozer_core::node::PortHandle, dozer_types::types::Schema>,
×
47
    ) -> Result<Box<dyn dozer_core::node::Source>, dozer_types::errors::internal::BoxedError> {
×
48
        todo!()
×
49
    }
×
50
}
×
51

×
52
fn prepare_pipeline_dag(
×
53
    sql: String,
×
54
    connection_sources: HashMap<Connection, Vec<Source>>,
×
55
    connection_source_ports: HashMap<(&str, &str), u16>,
×
56
) -> Result<Dag<SchemaSQLContext>, OrchestrationError> {
×
57
    let mut pipeline = AppPipeline::new();
×
58
    let mut asm: AppSourceManager<dozer_sql::pipeline::builder::SchemaSQLContext> =
×
59
        AppSourceManager::new();
×
60
    connection_sources.iter().for_each(|cs| {
×
61
        let (connection, sources) = cs;
×
62
        let ports = sources
×
63
            .iter()
×
64
            .map(|source| {
×
65
                let port = *connection_source_ports
×
66
                    .get(&(connection.name.as_str(), source.name.as_str()))
×
67
                    .unwrap();
×
68
                (port, source.name.clone())
×
69
            })
×
70
            .collect();
×
71
        let mut ports_with_source_name: HashMap<String, u16> = HashMap::new();
×
72
        connection_source_ports.iter().for_each(|(k, v)| {
×
73
            ports_with_source_name.insert(k.1.to_string(), v.to_owned());
×
74
        });
×
75

×
76
        _ = asm.add(
×
77
            Box::new(UISourceFactory {
×
78
                output_ports: ports,
×
79
            }),
×
80
            AppSourceMappings::new(connection.name.to_string(), ports_with_source_name),
×
81
        );
×
82
    });
×
83
    statement_to_pipeline(&sql, &mut pipeline, None)?;
×
84
    let mut app = App::new(asm);
×
85
    app.add_pipeline(pipeline);
×
86
    let sql_dag = app.into_dag()?;
×
87
    Ok(sql_dag)
×
88
}
×
89

90
pub fn transform_to_ui_graph(input_dag: &Dag<SchemaSQLContext>) -> QueryGraph {
12✔
91
    let input_graph = input_dag.graph();
12✔
92
    let mut nodes = vec![];
12✔
93
    let mut edges: Vec<QueryEdge> = vec![];
12✔
94
    input_graph
12✔
95
        .node_references()
12✔
96
        .for_each(|(node_index, node)| {
44✔
97
            let node_index = node_index.index() as u32;
44✔
98
            match &node.kind {
44✔
99
                dozer_core::NodeKind::Source(_) => {
12✔
100
                    nodes.push(QueryNode {
12✔
101
                        name: node.handle.id.clone(),
12✔
102
                        node_type: QueryNodeType::Connection as i32,
12✔
103
                        idx: node_index,
12✔
104
                        id: node_index,
12✔
105
                        data: node.handle.id.clone(),
12✔
106
                    });
12✔
107
                }
12✔
108
                dozer_core::NodeKind::Processor(processor) => {
20✔
109
                    nodes.push(QueryNode {
20✔
110
                        name: processor.type_name(),
20✔
111
                        node_type: QueryNodeType::Transformer as i32,
20✔
112
                        idx: node_index,
20✔
113
                        id: node_index,
20✔
114
                        data: processor.id(),
20✔
115
                    });
20✔
116
                }
20✔
117
                dozer_core::NodeKind::Sink(_) => {}
12✔
118
            }
×
119
        });
44✔
120
    input_graph.edge_references().for_each(|edge| {
44✔
121
        let source_node_index = edge.source();
44✔
122
        let target_node_index = edge.target();
44✔
123
        let from_port = edge.weight().from;
44✔
124
        match &input_graph[source_node_index].kind {
44✔
125
            dozer_core::NodeKind::Source(source) => {
24✔
126
                let source_name = source.get_output_port_name(&from_port);
24✔
127
                let new_node_idx = nodes.len() as u32;
24✔
128
                nodes.push(QueryNode {
24✔
129
                    name: source_name.clone(),
24✔
130
                    node_type: QueryNodeType::Source as i32,
24✔
131
                    idx: new_node_idx,
24✔
132
                    id: new_node_idx,
24✔
133
                    data: source_name,
24✔
134
                });
24✔
135
                edges.push(QueryEdge {
24✔
136
                    from: source_node_index.index() as u32,
24✔
137
                    to: new_node_idx,
24✔
138
                });
24✔
139
                edges.push(QueryEdge {
24✔
140
                    from: new_node_idx,
24✔
141
                    to: target_node_index.index() as u32,
24✔
142
                });
24✔
143
            }
24✔
144
            _ => {
20✔
145
                edges.push(QueryEdge {
20✔
146
                    from: source_node_index.index() as u32,
20✔
147
                    to: target_node_index.index() as u32,
20✔
148
                });
20✔
149
            }
20✔
150
        }
×
151
    });
44✔
152
    QueryGraph { nodes, edges }
12✔
153
}
12✔
154

×
155
pub fn config_to_ui_dag(config: Config) -> Result<QueryGraph, OrchestrationError> {
×
156
    let sql = config.sql.unwrap_or("".to_string());
×
157
    let mut connection_sources: HashMap<Connection, Vec<Source>> = HashMap::new();
×
158
    for source in config.sources {
×
159
        let connection = config
×
160
            .connections
×
161
            .iter()
×
162
            .find(|connection| connection.name == source.connection)
×
163
            .cloned()
×
164
            .ok_or(OrchestrationError::SourceValidationError)?;
×
165
        let sources_same_connection = connection_sources.entry(connection).or_insert(vec![]);
×
166
        sources_same_connection.push(source);
×
167
    }
×
168
    let source_builder = SourceBuilder::new(connection_sources.clone(), None);
×
169
    let connection_source_ports = source_builder.get_ports();
×
170
    let sql_dag = prepare_pipeline_dag(sql, connection_sources, connection_source_ports)?;
×
171
    Ok(transform_to_ui_graph(&sql_dag))
×
172
}
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc