• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

getdozer / dozer / 5957916370

24 Aug 2023 12:27AM UTC coverage: 75.783% (-0.08%) from 75.86%
5957916370

push

github

web-flow
fix: introduce build states and fix inconsistencies (#1902)

* chore: return if state cant be sent

* chore: remove broadcasting state when stopped

* chore: send build message

* chore: fix build

84 of 84 new or added lines in 7 files covered. (100.0%)

46996 of 62014 relevant lines covered (75.78%)

73508.46 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

43.06
/dozer-cli/src/ui_helper.rs
1
use std::collections::HashMap;
2

3
use dozer_core::{
4
    app::{App, AppPipeline},
5
    appsource::{AppSourceManager, AppSourceMappings},
6
    node::{OutputPortDef, OutputPortType, PortHandle, SourceFactory},
7
    petgraph::visit::{EdgeRef, IntoEdgeReferences, IntoNodeReferences},
8
    Dag,
9
};
10
use dozer_sql::pipeline::builder::{statement_to_pipeline, SchemaSQLContext};
11
use dozer_types::{
12
    grpc_types::cloud::{QueryEdge, QueryGraph, QueryNode, QueryNodeType},
13
    models::{config::Config, connection::Connection, source::Source},
14
};
15

16
use crate::{errors::OrchestrationError, pipeline::source_builder::SourceBuilder};
17

18
#[derive(Debug)]
×
19
struct UISourceFactory {
20
    output_ports: HashMap<PortHandle, String>,
21
}
22
impl SourceFactory<SchemaSQLContext> for UISourceFactory {
23
    fn get_output_schema(
×
24
        &self,
×
25
        _port: &PortHandle,
×
26
    ) -> Result<
×
27
        (dozer_types::types::Schema, SchemaSQLContext),
×
28
        dozer_types::errors::internal::BoxedError,
×
29
    > {
×
30
        todo!()
×
31
    }
×
32

33
    fn get_output_port_name(&self, port: &PortHandle) -> String {
×
34
        self.output_ports.get(port).expect("Port not found").clone()
×
35
    }
×
36

37
    fn get_output_ports(&self) -> Vec<OutputPortDef> {
×
38
        self.output_ports
×
39
            .keys()
×
40
            .map(|port| OutputPortDef::new(*port, OutputPortType::Stateless))
×
41
            .collect()
×
42
    }
×
43

44
    fn build(
×
45
        &self,
×
46
        _output_schemas: HashMap<dozer_core::node::PortHandle, dozer_types::types::Schema>,
×
47
    ) -> Result<Box<dyn dozer_core::node::Source>, dozer_types::errors::internal::BoxedError> {
×
48
        todo!()
×
49
    }
×
50
}
51

52
fn prepare_pipeline_dag(
×
53
    sql: String,
×
54
    connection_sources: HashMap<Connection, Vec<Source>>,
×
55
    connection_source_ports: HashMap<(&str, &str), u16>,
×
56
) -> Result<Dag<SchemaSQLContext>, OrchestrationError> {
×
57
    let mut pipeline = AppPipeline::new();
×
58
    let mut asm: AppSourceManager<dozer_sql::pipeline::builder::SchemaSQLContext> =
×
59
        AppSourceManager::new();
×
60
    connection_sources.iter().for_each(|cs| {
×
61
        let (connection, sources) = cs;
×
62
        let ports = sources
×
63
            .iter()
×
64
            .map(|source| {
×
65
                let port = *connection_source_ports
×
66
                    .get(&(connection.name.as_str(), source.name.as_str()))
×
67
                    .unwrap();
×
68
                (port, source.name.clone())
×
69
            })
×
70
            .collect();
×
71
        let mut ports_with_source_name: HashMap<String, u16> = HashMap::new();
×
72
        connection_source_ports.iter().for_each(|(k, v)| {
×
73
            ports_with_source_name.insert(k.1.to_string(), v.to_owned());
×
74
        });
×
75

×
76
        _ = asm.add(
×
77
            Box::new(UISourceFactory {
×
78
                output_ports: ports,
×
79
            }),
×
80
            AppSourceMappings::new(connection.name.to_string(), ports_with_source_name),
×
81
        );
×
82
    });
×
83
    statement_to_pipeline(&sql, &mut pipeline, None)?;
×
84
    let mut app = App::new(asm);
×
85
    app.add_pipeline(pipeline);
×
86
    let sql_dag = app.into_dag()?;
×
87
    Ok(sql_dag)
×
88
}
×
89

90
pub fn transform_to_ui_graph(input_dag: &Dag<SchemaSQLContext>) -> QueryGraph {
36✔
91
    let input_graph = input_dag.graph();
36✔
92
    let mut nodes = vec![];
36✔
93
    let mut edges: Vec<QueryEdge> = vec![];
36✔
94
    input_graph
36✔
95
        .node_references()
36✔
96
        .for_each(|(node_index, node)| {
102✔
97
            let node_index = node_index.index() as u32;
102✔
98
            match &node.kind {
102✔
99
                dozer_core::NodeKind::Source(_) => {
36✔
100
                    nodes.push(QueryNode {
36✔
101
                        name: node.handle.id.clone(),
36✔
102
                        node_type: QueryNodeType::Connection as i32,
36✔
103
                        idx: node_index,
36✔
104
                        id: node_index,
36✔
105
                        data: node.handle.id.clone(),
36✔
106
                    });
36✔
107
                }
36✔
108
                dozer_core::NodeKind::Processor(processor) => {
30✔
109
                    nodes.push(QueryNode {
30✔
110
                        name: processor.type_name(),
30✔
111
                        node_type: QueryNodeType::Transformer as i32,
30✔
112
                        idx: node_index,
30✔
113
                        id: node_index,
30✔
114
                        data: processor.id(),
30✔
115
                    });
30✔
116
                }
30✔
117
                dozer_core::NodeKind::Sink(_) => {}
36✔
118
            }
119
        });
102✔
120
    input_graph.edge_references().for_each(|edge| {
84✔
121
        let source_node_index = edge.source();
84✔
122
        let target_node_index = edge.target();
84✔
123
        let from_port = edge.weight().from;
84✔
124
        match &input_graph[source_node_index].kind {
84✔
125
            dozer_core::NodeKind::Source(source) => {
54✔
126
                let source_name = source.get_output_port_name(&from_port);
54✔
127
                let new_node_idx = nodes.len() as u32;
54✔
128
                nodes.push(QueryNode {
54✔
129
                    name: source_name.clone(),
54✔
130
                    node_type: QueryNodeType::Source as i32,
54✔
131
                    idx: new_node_idx,
54✔
132
                    id: new_node_idx,
54✔
133
                    data: source_name,
54✔
134
                });
54✔
135
                edges.push(QueryEdge {
54✔
136
                    from: source_node_index.index() as u32,
54✔
137
                    to: new_node_idx,
54✔
138
                });
54✔
139
                edges.push(QueryEdge {
54✔
140
                    from: new_node_idx,
54✔
141
                    to: target_node_index.index() as u32,
54✔
142
                });
54✔
143
            }
54✔
144
            _ => {
30✔
145
                edges.push(QueryEdge {
30✔
146
                    from: source_node_index.index() as u32,
30✔
147
                    to: target_node_index.index() as u32,
30✔
148
                });
30✔
149
            }
30✔
150
        }
151
    });
84✔
152
    QueryGraph { nodes, edges }
36✔
153
}
36✔
154

155
pub fn config_to_ui_dag(config: Config) -> Result<QueryGraph, OrchestrationError> {
×
156
    let sql = config.sql.unwrap_or("".to_string());
×
157
    let mut connection_sources: HashMap<Connection, Vec<Source>> = HashMap::new();
×
158
    for source in config.sources {
×
159
        let connection = config
×
160
            .connections
×
161
            .iter()
×
162
            .find(|connection| connection.name == source.connection)
×
163
            .cloned()
×
164
            .ok_or(OrchestrationError::ConnectionNotFound(
×
165
                source.connection.clone(),
×
166
            ))?;
×
167
        let sources_same_connection = connection_sources.entry(connection).or_insert(vec![]);
×
168
        sources_same_connection.push(source);
×
169
    }
×
170
    let source_builder = SourceBuilder::new(connection_sources.clone(), None);
×
171
    let connection_source_ports = source_builder.get_ports();
×
172
    let sql_dag = prepare_pipeline_dag(sql, connection_sources, connection_source_ports)?;
×
173
    Ok(transform_to_ui_graph(&sql_dag))
×
174
}
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc