• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

getdozer / dozer / 6008856021

29 Aug 2023 06:44AM UTC coverage: 76.756% (-1.0%) from 77.736%
6008856021

push

github

web-flow
chore: Remove unused generic type parameter in `dozer-core` (#1929)

330 of 330 new or added lines in 38 files covered. (100.0%)

48977 of 63809 relevant lines covered (76.76%)

48470.73 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

39.24
/dozer-cli/src/ui_helper.rs
1
use std::collections::HashMap;
2

3
use dozer_core::{
4
    app::{App, AppPipeline},
5
    appsource::{AppSourceManager, AppSourceMappings},
6
    node::{OutputPortDef, OutputPortType, PortHandle, SourceFactory},
7
    petgraph::visit::{EdgeRef, IntoEdgeReferences, IntoNodeReferences},
8
    Dag,
9
};
10
use dozer_sql::pipeline::builder::statement_to_pipeline;
11
use dozer_types::{
12
    grpc_types::cloud::{QueryEdge, QueryGraph, QueryNode, QueryNodeType},
13
    models::{config::Config, connection::Connection, flags::Flags, source::Source},
14
};
15

16
use crate::{errors::OrchestrationError, pipeline::source_builder::SourceBuilder};
17

18
#[derive(Debug)]
×
19
struct UISourceFactory {
20
    output_ports: HashMap<PortHandle, String>,
21
}
22
impl SourceFactory for UISourceFactory {
23
    fn get_output_schema(
×
24
        &self,
×
25
        _port: &PortHandle,
×
26
    ) -> Result<dozer_types::types::Schema, dozer_types::errors::internal::BoxedError> {
×
27
        todo!()
×
28
    }
×
29

×
30
    fn get_output_port_name(&self, port: &PortHandle) -> String {
×
31
        self.output_ports.get(port).expect("Port not found").clone()
×
32
    }
×
33

×
34
    fn get_output_ports(&self) -> Vec<OutputPortDef> {
×
35
        self.output_ports
×
36
            .keys()
×
37
            .map(|port| OutputPortDef::new(*port, OutputPortType::Stateless))
×
38
            .collect()
×
39
    }
×
40

×
41
    fn build(
×
42
        &self,
×
43
        _output_schemas: HashMap<dozer_core::node::PortHandle, dozer_types::types::Schema>,
×
44
    ) -> Result<Box<dyn dozer_core::node::Source>, dozer_types::errors::internal::BoxedError> {
×
45
        todo!()
×
46
    }
×
47
}
×
48

×
49
fn prepare_pipeline_dag(
×
50
    sql: String,
×
51
    connection_sources: HashMap<Connection, Vec<Source>>,
×
52
    connection_source_ports: HashMap<(&str, &str), u16>,
×
53
    flags: Flags,
×
54
) -> Result<Dag, OrchestrationError> {
×
55
    let mut pipeline = AppPipeline::new(flags.into());
×
56
    let mut asm = AppSourceManager::new();
×
57
    connection_sources.iter().for_each(|cs| {
×
58
        let (connection, sources) = cs;
×
59
        let ports = sources
×
60
            .iter()
×
61
            .map(|source| {
×
62
                let port = *connection_source_ports
×
63
                    .get(&(connection.name.as_str(), source.name.as_str()))
×
64
                    .unwrap();
×
65
                (port, source.name.clone())
×
66
            })
×
67
            .collect();
×
68
        let mut ports_with_source_name: HashMap<String, u16> = HashMap::new();
×
69
        connection_source_ports.iter().for_each(|(k, v)| {
×
70
            ports_with_source_name.insert(k.1.to_string(), v.to_owned());
×
71
        });
×
72

×
73
        _ = asm.add(
×
74
            Box::new(UISourceFactory {
×
75
                output_ports: ports,
×
76
            }),
×
77
            AppSourceMappings::new(connection.name.to_string(), ports_with_source_name),
×
78
        );
×
79
    });
×
80
    statement_to_pipeline(&sql, &mut pipeline, None)?;
×
81
    let mut app = App::new(asm);
×
82
    app.add_pipeline(pipeline);
×
83
    let sql_dag = app.into_dag()?;
×
84
    Ok(sql_dag)
×
85
}
×
86

×
87
pub fn transform_to_ui_graph(input_dag: &Dag) -> QueryGraph {
60✔
88
    let input_graph = input_dag.graph();
60✔
89
    let mut nodes = vec![];
60✔
90
    let mut edges: Vec<QueryEdge> = vec![];
60✔
91
    input_graph
60✔
92
        .node_references()
60✔
93
        .for_each(|(node_index, node)| {
170✔
94
            let node_index = node_index.index() as u32;
170✔
95
            match &node.kind {
170✔
96
                dozer_core::NodeKind::Source(_) => {
60✔
97
                    nodes.push(QueryNode {
60✔
98
                        name: node.handle.id.clone(),
60✔
99
                        node_type: QueryNodeType::Connection as i32,
60✔
100
                        idx: node_index,
60✔
101
                        id: node_index,
60✔
102
                        data: node.handle.id.clone(),
60✔
103
                    });
60✔
104
                }
60✔
105
                dozer_core::NodeKind::Processor(processor) => {
50✔
106
                    nodes.push(QueryNode {
50✔
107
                        name: processor.type_name(),
50✔
108
                        node_type: QueryNodeType::Transformer as i32,
50✔
109
                        idx: node_index,
50✔
110
                        id: node_index,
50✔
111
                        data: processor.id(),
50✔
112
                    });
50✔
113
                }
50✔
114
                dozer_core::NodeKind::Sink(_) => {}
60✔
115
            }
×
116
        });
170✔
117
    input_graph.edge_references().for_each(|edge| {
140✔
118
        let source_node_index = edge.source();
140✔
119
        let target_node_index = edge.target();
140✔
120
        let from_port = edge.weight().from;
140✔
121
        match &input_graph[source_node_index].kind {
140✔
122
            dozer_core::NodeKind::Source(source) => {
90✔
123
                let source_name = source.get_output_port_name(&from_port);
90✔
124
                let new_node_idx = nodes.len() as u32;
90✔
125
                nodes.push(QueryNode {
90✔
126
                    name: source_name.clone(),
90✔
127
                    node_type: QueryNodeType::Source as i32,
90✔
128
                    idx: new_node_idx,
90✔
129
                    id: new_node_idx,
90✔
130
                    data: source_name,
90✔
131
                });
90✔
132
                edges.push(QueryEdge {
90✔
133
                    from: source_node_index.index() as u32,
90✔
134
                    to: new_node_idx,
90✔
135
                });
90✔
136
                edges.push(QueryEdge {
90✔
137
                    from: new_node_idx,
90✔
138
                    to: target_node_index.index() as u32,
90✔
139
                });
90✔
140
            }
90✔
141
            _ => {
50✔
142
                edges.push(QueryEdge {
50✔
143
                    from: source_node_index.index() as u32,
50✔
144
                    to: target_node_index.index() as u32,
50✔
145
                });
50✔
146
            }
50✔
147
        }
×
148
    });
140✔
149
    QueryGraph { nodes, edges }
60✔
150
}
60✔
151

152
pub fn config_to_ui_dag(config: Config) -> Result<QueryGraph, OrchestrationError> {
×
153
    let sql = config.sql.unwrap_or("".to_string());
×
154
    let mut connection_sources: HashMap<Connection, Vec<Source>> = HashMap::new();
×
155
    for source in config.sources {
×
156
        let connection = config
×
157
            .connections
×
158
            .iter()
×
159
            .find(|connection| connection.name == source.connection)
×
160
            .cloned()
×
161
            .ok_or(OrchestrationError::ConnectionNotFound(
×
162
                source.connection.clone(),
×
163
            ))?;
×
164
        let sources_same_connection = connection_sources.entry(connection).or_insert(vec![]);
×
165
        sources_same_connection.push(source);
×
166
    }
×
167
    let source_builder = SourceBuilder::new(connection_sources.clone(), None);
×
168
    let connection_source_ports = source_builder.get_ports();
×
169
    let sql_dag = prepare_pipeline_dag(
×
170
        sql,
×
171
        connection_sources,
×
172
        connection_source_ports,
×
173
        config.flags.unwrap_or_default(),
×
174
    )?;
×
175
    Ok(transform_to_ui_graph(&sql_dag))
×
176
}
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc