• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

getdozer / dozer / 5978430793

25 Aug 2023 04:54PM UTC coverage: 75.575% (-0.7%) from 76.279%
5978430793

push

github

web-flow
Bump ordered-float from 3.4.0 to 3.9.1 (#1919)

Bumps [ordered-float](https://github.com/reem/rust-ordered-float) from 3.4.0 to 3.9.1.
- [Release notes](https://github.com/reem/rust-ordered-float/releases)
- [Commits](https://github.com/reem/rust-ordered-float/compare/v3.4.0...v3.9.1)

---
updated-dependencies:
- dependency-name: ordered-float
  dependency-type: direct:production
  update-type: version-update:semver-minor
...

Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>

47272 of 62550 relevant lines covered (75.57%)

49425.38 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

40.26
/dozer-cli/src/ui_helper.rs
1
use std::collections::HashMap;
2

3
use dozer_core::{
4
    app::{App, AppPipeline},
5
    appsource::{AppSourceManager, AppSourceMappings},
6
    node::{OutputPortDef, OutputPortType, PortHandle, SourceFactory},
7
    petgraph::visit::{EdgeRef, IntoEdgeReferences, IntoNodeReferences},
8
    Dag,
9
};
10
use dozer_sql::pipeline::builder::{statement_to_pipeline, SchemaSQLContext};
11
use dozer_types::{
12
    grpc_types::cloud::{QueryEdge, QueryGraph, QueryNode, QueryNodeType},
13
    models::{config::Config, connection::Connection, flags::Flags, source::Source},
14
};
15

16
use crate::{errors::OrchestrationError, pipeline::source_builder::SourceBuilder};
17

18
#[derive(Debug)]
×
19
struct UISourceFactory {
20
    output_ports: HashMap<PortHandle, String>,
21
}
22
impl SourceFactory<SchemaSQLContext> for UISourceFactory {
23
    fn get_output_schema(
×
24
        &self,
×
25
        _port: &PortHandle,
×
26
    ) -> Result<
×
27
        (dozer_types::types::Schema, SchemaSQLContext),
×
28
        dozer_types::errors::internal::BoxedError,
×
29
    > {
×
30
        todo!()
×
31
    }
×
32

33
    fn get_output_port_name(&self, port: &PortHandle) -> String {
×
34
        self.output_ports.get(port).expect("Port not found").clone()
×
35
    }
×
36

37
    fn get_output_ports(&self) -> Vec<OutputPortDef> {
×
38
        self.output_ports
×
39
            .keys()
×
40
            .map(|port| OutputPortDef::new(*port, OutputPortType::Stateless))
×
41
            .collect()
×
42
    }
×
43

44
    fn build(
×
45
        &self,
×
46
        _output_schemas: HashMap<dozer_core::node::PortHandle, dozer_types::types::Schema>,
×
47
    ) -> Result<Box<dyn dozer_core::node::Source>, dozer_types::errors::internal::BoxedError> {
×
48
        todo!()
×
49
    }
×
50
}
51

52
fn prepare_pipeline_dag(
×
53
    sql: String,
×
54
    connection_sources: HashMap<Connection, Vec<Source>>,
×
55
    connection_source_ports: HashMap<(&str, &str), u16>,
×
56
    flags: Flags,
×
57
) -> Result<Dag<SchemaSQLContext>, OrchestrationError> {
×
58
    let mut pipeline = AppPipeline::new(flags.into());
×
59
    let mut asm: AppSourceManager<dozer_sql::pipeline::builder::SchemaSQLContext> =
×
60
        AppSourceManager::new();
×
61
    connection_sources.iter().for_each(|cs| {
×
62
        let (connection, sources) = cs;
×
63
        let ports = sources
×
64
            .iter()
×
65
            .map(|source| {
×
66
                let port = *connection_source_ports
×
67
                    .get(&(connection.name.as_str(), source.name.as_str()))
×
68
                    .unwrap();
×
69
                (port, source.name.clone())
×
70
            })
×
71
            .collect();
×
72
        let mut ports_with_source_name: HashMap<String, u16> = HashMap::new();
×
73
        connection_source_ports.iter().for_each(|(k, v)| {
×
74
            ports_with_source_name.insert(k.1.to_string(), v.to_owned());
×
75
        });
×
76

×
77
        _ = asm.add(
×
78
            Box::new(UISourceFactory {
×
79
                output_ports: ports,
×
80
            }),
×
81
            AppSourceMappings::new(connection.name.to_string(), ports_with_source_name),
×
82
        );
×
83
    });
×
84
    statement_to_pipeline(&sql, &mut pipeline, None)?;
×
85
    let mut app = App::new(asm);
×
86
    app.add_pipeline(pipeline);
×
87
    let sql_dag = app.into_dag()?;
×
88
    Ok(sql_dag)
×
89
}
×
90

×
91
pub fn transform_to_ui_graph(input_dag: &Dag<SchemaSQLContext>) -> QueryGraph {
60✔
92
    let input_graph = input_dag.graph();
60✔
93
    let mut nodes = vec![];
60✔
94
    let mut edges: Vec<QueryEdge> = vec![];
60✔
95
    input_graph
60✔
96
        .node_references()
60✔
97
        .for_each(|(node_index, node)| {
170✔
98
            let node_index = node_index.index() as u32;
170✔
99
            match &node.kind {
170✔
100
                dozer_core::NodeKind::Source(_) => {
60✔
101
                    nodes.push(QueryNode {
60✔
102
                        name: node.handle.id.clone(),
60✔
103
                        node_type: QueryNodeType::Connection as i32,
60✔
104
                        idx: node_index,
60✔
105
                        id: node_index,
60✔
106
                        data: node.handle.id.clone(),
60✔
107
                    });
60✔
108
                }
60✔
109
                dozer_core::NodeKind::Processor(processor) => {
50✔
110
                    nodes.push(QueryNode {
50✔
111
                        name: processor.type_name(),
50✔
112
                        node_type: QueryNodeType::Transformer as i32,
50✔
113
                        idx: node_index,
50✔
114
                        id: node_index,
50✔
115
                        data: processor.id(),
50✔
116
                    });
50✔
117
                }
50✔
118
                dozer_core::NodeKind::Sink(_) => {}
60✔
119
            }
×
120
        });
170✔
121
    input_graph.edge_references().for_each(|edge| {
140✔
122
        let source_node_index = edge.source();
140✔
123
        let target_node_index = edge.target();
140✔
124
        let from_port = edge.weight().from;
140✔
125
        match &input_graph[source_node_index].kind {
140✔
126
            dozer_core::NodeKind::Source(source) => {
90✔
127
                let source_name = source.get_output_port_name(&from_port);
90✔
128
                let new_node_idx = nodes.len() as u32;
90✔
129
                nodes.push(QueryNode {
90✔
130
                    name: source_name.clone(),
90✔
131
                    node_type: QueryNodeType::Source as i32,
90✔
132
                    idx: new_node_idx,
90✔
133
                    id: new_node_idx,
90✔
134
                    data: source_name,
90✔
135
                });
90✔
136
                edges.push(QueryEdge {
90✔
137
                    from: source_node_index.index() as u32,
90✔
138
                    to: new_node_idx,
90✔
139
                });
90✔
140
                edges.push(QueryEdge {
90✔
141
                    from: new_node_idx,
90✔
142
                    to: target_node_index.index() as u32,
90✔
143
                });
90✔
144
            }
90✔
145
            _ => {
50✔
146
                edges.push(QueryEdge {
50✔
147
                    from: source_node_index.index() as u32,
50✔
148
                    to: target_node_index.index() as u32,
50✔
149
                });
50✔
150
            }
50✔
151
        }
×
152
    });
140✔
153
    QueryGraph { nodes, edges }
60✔
154
}
60✔
155

×
156
pub fn config_to_ui_dag(config: Config) -> Result<QueryGraph, OrchestrationError> {
×
157
    let sql = config.sql.unwrap_or("".to_string());
×
158
    let mut connection_sources: HashMap<Connection, Vec<Source>> = HashMap::new();
×
159
    for source in config.sources {
×
160
        let connection = config
×
161
            .connections
×
162
            .iter()
×
163
            .find(|connection| connection.name == source.connection)
×
164
            .cloned()
×
165
            .ok_or(OrchestrationError::ConnectionNotFound(
×
166
                source.connection.clone(),
×
167
            ))?;
×
168
        let sources_same_connection = connection_sources.entry(connection).or_insert(vec![]);
×
169
        sources_same_connection.push(source);
×
170
    }
×
171
    let source_builder = SourceBuilder::new(connection_sources.clone(), None);
×
172
    let connection_source_ports = source_builder.get_ports();
×
173
    let sql_dag = prepare_pipeline_dag(
×
174
        sql,
×
175
        connection_sources,
×
176
        connection_source_ports,
×
177
        config.flags.unwrap_or_default(),
×
178
    )?;
×
179
    Ok(transform_to_ui_graph(&sql_dag))
×
180
}
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc