• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

getdozer / dozer / 5693561935

pending completion
5693561935

push

github

web-flow
chore: Remove `AppSourceId` which is no longer used (#1803)

* chore: Remove `AppSourceId` which is no longer used

* chore: Remove `AppSource` and simplify source endpoint finding process

* chore: Use `Box` instead of `Arc` for the factories

* chore: Remove unused parameter in `AppPipeline::connect_nodes`

* chore: Remove an unused `Option`

443 of 443 new or added lines in 22 files covered. (100.0%)

45511 of 58843 relevant lines covered (77.34%)

39550.53 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

44.52
/dozer-cli/src/ui_helper.rs
1
use std::collections::HashMap;
2

3
use dozer_core::{
4
    app::{App, AppPipeline},
5
    appsource::{AppSourceManager, AppSourceMappings},
6
    node::{OutputPortDef, OutputPortType, PortHandle, SourceFactory},
7
    Dag,
8
};
9
use dozer_sql::pipeline::builder::{statement_to_pipeline, SchemaSQLContext};
10
use dozer_types::{
11
    grpc_types::cloud::{QueryEdge, QueryGraph, QueryNode, QueryNodeType},
12
    models::{config::Config, connection::Connection, source::Source},
13
};
14

15
use crate::{errors::OrchestrationError, pipeline::source_builder::SourceBuilder};
16

17
#[derive(Debug)]
×
18
struct UISourceFactory {
19
    output_ports: Vec<PortHandle>,
20
}
21
impl SourceFactory<SchemaSQLContext> for UISourceFactory {
22
    fn get_output_schema(
×
23
        &self,
×
24
        _port: &dozer_core::node::PortHandle,
×
25
    ) -> Result<
×
26
        (dozer_types::types::Schema, SchemaSQLContext),
×
27
        dozer_types::errors::internal::BoxedError,
×
28
    > {
×
29
        todo!()
×
30
    }
×
31

32
    fn get_output_ports(&self) -> Vec<dozer_core::node::OutputPortDef> {
×
33
        self.output_ports
×
34
            .iter()
×
35
            .map(|e| OutputPortDef::new(*e, OutputPortType::Stateless))
×
36
            .collect()
×
37
    }
×
38

39
    fn build(
×
40
        &self,
×
41
        _output_schemas: HashMap<dozer_core::node::PortHandle, dozer_types::types::Schema>,
×
42
    ) -> Result<Box<dyn dozer_core::node::Source>, dozer_types::errors::internal::BoxedError> {
×
43
        todo!()
×
44
    }
×
45
}
46

47
fn prepare_pipeline_dag(
×
48
    sql: String,
×
49
    connection_sources: HashMap<Connection, Vec<Source>>,
×
50
    connection_source_ports: HashMap<(&str, &str), u16>,
×
51
) -> Result<Dag<SchemaSQLContext>, OrchestrationError> {
×
52
    let mut pipeline = AppPipeline::new();
×
53
    let mut asm: AppSourceManager<dozer_sql::pipeline::builder::SchemaSQLContext> =
×
54
        AppSourceManager::new();
×
55
    connection_sources.iter().for_each(|cs| {
×
56
        let (connection, sources) = cs;
×
57
        let ports = sources
×
58
            .iter()
×
59
            .map(|s| {
×
60
                let port = connection_source_ports
×
61
                    .get(&(connection.name.as_str(), s.name.as_str()))
×
62
                    .unwrap()
×
63
                    .to_owned();
×
64
                port
×
65
            })
×
66
            .collect();
×
67
        let mut ports_with_source_name: HashMap<String, u16> = HashMap::new();
×
68
        connection_source_ports.iter().for_each(|(k, v)| {
×
69
            ports_with_source_name.insert(k.1.to_string(), v.to_owned());
×
70
        });
×
71

×
72
        _ = asm.add(
×
73
            Box::new(UISourceFactory {
×
74
                output_ports: ports,
×
75
            }),
×
76
            AppSourceMappings::new(connection.name.to_string(), ports_with_source_name),
×
77
        );
×
78
    });
×
79
    statement_to_pipeline(&sql, &mut pipeline, None)?;
×
80
    let mut app = App::new(asm);
×
81
    app.add_pipeline(pipeline);
×
82
    let sql_dag = app.into_dag()?;
×
83
    Ok(sql_dag)
×
84
}
×
85

86
pub fn transform_to_ui_graph(
12✔
87
    input_dag: &Dag<SchemaSQLContext>,
12✔
88
    port_connection_source: HashMap<u16, (&str, &str)>,
12✔
89
) -> QueryGraph {
12✔
90
    let input_graph = input_dag.graph();
12✔
91
    let mut nodes = vec![];
12✔
92
    let mut edges: Vec<QueryEdge> = vec![];
12✔
93
    input_graph.raw_nodes().iter().enumerate().for_each(|n| {
44✔
94
        let weight = &n.1.weight;
44✔
95
        let idx = n.0;
44✔
96
        match &weight.kind {
44✔
97
            dozer_core::NodeKind::Source(_) => {
12✔
98
                nodes.push(QueryNode {
12✔
99
                    name: weight.handle.id.clone(),
12✔
100
                    node_type: QueryNodeType::Connection as i32,
12✔
101
                    idx: idx as u32,
12✔
102
                    id: idx as u32,
12✔
103
                    data: weight.handle.id.clone(),
12✔
104
                });
12✔
105
            }
12✔
106
            dozer_core::NodeKind::Processor(processor) => {
20✔
107
                nodes.push(QueryNode {
20✔
108
                    name: processor.type_name(),
20✔
109
                    node_type: QueryNodeType::Transformer as i32,
20✔
110
                    idx: idx as u32,
20✔
111
                    id: idx as u32,
20✔
112
                    data: processor.id(),
20✔
113
                });
20✔
114
            }
20✔
115
            dozer_core::NodeKind::Sink(_) => {}
12✔
116
        }
117
    });
44✔
118
    input_graph.raw_edges().iter().for_each(|e| {
44✔
119
        let edge = e;
44✔
120
        let edge_type = edge.weight;
44✔
121
        let sn = edge.source();
44✔
122
        let tg = edge.target();
44✔
123
        let from = edge_type.from;
44✔
124
        let source_by_port = port_connection_source.get(&from);
44✔
125
        if let Some(source_by_port) = source_by_port {
44✔
126
            let source_name = source_by_port.1;
24✔
127
            let new_node_idx = (nodes.len()) as u32;
24✔
128
            nodes.push(QueryNode {
24✔
129
                name: source_name.to_string(),
24✔
130
                node_type: QueryNodeType::Source as i32,
24✔
131
                idx: new_node_idx,
24✔
132
                id: new_node_idx,
24✔
133
                data: source_name.to_string(),
24✔
134
            });
24✔
135
            edges.push(QueryEdge {
24✔
136
                from: sn.index() as u32,
24✔
137
                to: new_node_idx,
24✔
138
            });
24✔
139
            edges.push(QueryEdge {
24✔
140
                from: new_node_idx,
24✔
141
                to: tg.index() as u32,
24✔
142
            });
24✔
143
        } else {
24✔
144
            edges.push(QueryEdge {
20✔
145
                from: sn.index() as u32,
20✔
146
                to: tg.index() as u32,
20✔
147
            });
20✔
148
        }
20✔
149
    });
44✔
150
    QueryGraph { nodes, edges }
12✔
151
}
12✔
152

153
pub fn config_to_ui_dag(config: Config) -> Result<QueryGraph, OrchestrationError> {
×
154
    let sql = config.sql.unwrap_or("".to_string());
×
155
    let mut connection_sources: HashMap<Connection, Vec<Source>> = HashMap::new();
×
156
    for source in config.sources {
×
157
        let connection = config
×
158
            .connections
×
159
            .iter()
×
160
            .find(|connection| connection.name == source.connection)
×
161
            .cloned()
×
162
            .ok_or(OrchestrationError::SourceValidationError)?;
×
163
        let sources_same_connection = connection_sources.entry(connection).or_insert(vec![]);
×
164
        sources_same_connection.push(source);
×
165
    }
166
    let source_builder = SourceBuilder::new(connection_sources.clone(), None);
×
167
    let connection_source_ports = source_builder.get_ports();
×
168
    let port_connection_source: HashMap<u16, (&str, &str)> = connection_source_ports
×
169
        .iter()
×
170
        .map(|(k, v)| (v.to_owned(), k.to_owned()))
×
171
        .collect();
×
172
    let sql_dag = prepare_pipeline_dag(sql, connection_sources, connection_source_ports)?;
×
173
    Ok(transform_to_ui_graph(&sql_dag, port_connection_source))
×
174
}
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc