
getdozer / dozer · build 5725710489 · pending completion · push · github · web-flow
chore: Add `SourceFactory::get_output_port_name` to simplify ui graph generation (#1812)

140 of 140 new or added lines in 13 files covered (100.0%)
45519 of 60083 relevant lines covered (75.76%)
39458.21 hits per line

Source File

/dozer-cli/src/pipeline/builder.rs (82.76% covered)
use std::collections::HashMap;
use std::collections::HashSet;
use std::sync::Arc;

use dozer_cache::dozer_log::replication::Log;
use dozer_core::app::App;
use dozer_core::app::AppPipeline;
use dozer_core::app::PipelineEntryPoint;
use dozer_core::node::SinkFactory;
use dozer_core::Dag;
use dozer_core::DEFAULT_PORT_HANDLE;
use dozer_ingestion::connectors::{get_connector, get_connector_info_table};
use dozer_sql::pipeline::builder::statement_to_pipeline;
use dozer_sql::pipeline::builder::{OutputNodeInfo, QueryContext, SchemaSQLContext};
use dozer_types::indicatif::MultiProgress;
use dozer_types::log::debug;
use dozer_types::models::api_endpoint::ApiEndpoint;
use dozer_types::models::connection::Connection;
use dozer_types::models::source::Source;
use std::hash::Hash;
use tokio::runtime::Runtime;
use tokio::sync::Mutex;

use crate::pipeline::dummy_sink::DummySinkFactory;
use crate::pipeline::LogSinkFactory;
use crate::ui_helper::transform_to_ui_graph;

use super::source_builder::SourceBuilder;
use crate::errors::OrchestrationError;
use dozer_types::log::{error, info};
use metrics::{describe_counter, increment_counter};
use OrchestrationError::ExecutionError;

/// Where an endpoint's table comes from: either the output of a SQL
/// transformation or an original table read directly from a source connection.
pub enum OutputTableInfo {
    Transformed(OutputNodeInfo),
    Original(OriginalTableInfo),
}

pub struct OriginalTableInfo {
    pub table_name: String,
    pub connection_name: String,
}

/// Sources referenced by the pipeline, split into original connector tables and
/// SQL-transformed tables, plus the SQL query context when SQL is provided.
pub struct CalculatedSources {
    pub original_sources: Vec<String>,
    pub transformed_sources: Vec<String>,
    pub query_context: Option<QueryContext>,
}

type OptionLog = Option<Arc<Mutex<Log>>>;

pub struct PipelineBuilder<'a> {
    connections: &'a [Connection],
    sources: &'a [Source],
    sql: Option<&'a str>,
    /// `ApiEndpoint` and its log.
    endpoint_and_logs: Vec<(ApiEndpoint, OptionLog)>,
    progress: MultiProgress,
}

impl<'a> PipelineBuilder<'a> {
    pub fn new(
        connections: &'a [Connection],
        sources: &'a [Source],
        sql: Option<&'a str>,
        endpoint_and_logs: Vec<(ApiEndpoint, OptionLog)>,
        progress: MultiProgress,
    ) -> Self {
        Self {
            connections,
            sources,
            sql,
            endpoint_and_logs,
            progress,
        }
    }

    // Based on used_sources, map each source to its connection name and create sources.
    // To avoid breaking current functionality, the current format is still supported.
    pub async fn get_grouped_tables(
        &self,
        original_sources: &[String],
    ) -> Result<HashMap<Connection, Vec<Source>>, OrchestrationError> {
        let mut grouped_connections: HashMap<Connection, Vec<Source>> = HashMap::new();

        let mut connector_map = HashMap::new();
        for connection in self.connections {
            let connector = get_connector(connection.clone())?;

            if let Ok(info_table) = get_connector_info_table(connection) {
                info!("[{}] Connection parameters\n{info_table}", connection.name);
            }

            let connector_tables = connector.list_tables().await?;

            // override source name if specified
            let connector_tables: Vec<Source> = connector_tables
                .iter()
                .map(|table| {
                    match self.sources.iter().find(|s| {
                        // TODO: @dario - Replace this line with the actual schema parsed from SQL
                        s.connection == connection.name && s.table_name == table.name
                    }) {
                        Some(source) => source.clone(),
                        None => Source {
                            name: table.name.clone(),
                            table_name: table.name.clone(),
                            schema: table.schema.clone(),
                            connection: connection.name.clone(),
                            ..Default::default()
                        },
                    }
                })
                .collect();

            connector_map.insert(connection.clone(), connector_tables);
        }

        for table_name in original_sources {
            let mut table_found = false;
            for (connection, tables) in connector_map.iter() {
                if let Some(source) = tables
                    .iter()
                    .find(|table| table.name == table_name.as_str())
                {
                    table_found = true;
                    grouped_connections
                        .entry(connection.clone())
                        .or_default()
                        .push(source.clone());
                }
            }

            if !table_found {
                error!("Table {} not found in any of the connections", table_name);
                return Err(OrchestrationError::SourceValidationError);
            }
        }

        Ok(grouped_connections)
    }

    // This function figures out the sources that are used in the pipeline,
    // based on the SQL and the API endpoints
    pub fn calculate_sources(&self) -> Result<CalculatedSources, OrchestrationError> {
        let mut original_sources = vec![];

        let mut query_ctx = None;
        let mut pipeline = AppPipeline::new();

        let mut transformed_sources = vec![];

        if let Some(sql) = &self.sql {
            let query_context = statement_to_pipeline(sql, &mut pipeline, None)
                .map_err(OrchestrationError::PipelineError)?;

            query_ctx = Some(query_context.clone());

            for (name, _) in query_context.output_tables_map {
                if transformed_sources.contains(&name) {
                    return Err(OrchestrationError::DuplicateTable(name));
                }
                transformed_sources.push(name.clone());
            }

            for name in query_context.used_sources {
                // Add all source tables to input tables
                original_sources.push(name);
            }
        }

        // Add used sources when an endpoint reads directly from a source
        for (api_endpoint, _) in &self.endpoint_and_logs {
            let table_name = &api_endpoint.table_name;

            // Don't add if the table is a result of SQL
            if !transformed_sources.contains(table_name) {
                original_sources.push(table_name.clone());
            }
        }
        dedup(&mut original_sources);
        dedup(&mut transformed_sources);

        Ok(CalculatedSources {
            original_sources,
            transformed_sources,
            query_context: query_ctx,
        })
    }

    // This function is used both when building and during actual execution
    pub fn build(
        self,
        runtime: Arc<Runtime>,
    ) -> Result<dozer_core::Dag<SchemaSQLContext>, OrchestrationError> {
        let calculated_sources = self.calculate_sources()?;

        debug!("Used Sources: {:?}", calculated_sources.original_sources);
        let grouped_connections =
            runtime.block_on(self.get_grouped_tables(&calculated_sources.original_sources))?;

        let mut pipelines: Vec<AppPipeline<SchemaSQLContext>> = vec![];

        let mut pipeline = AppPipeline::new();

        let mut available_output_tables: HashMap<String, OutputTableInfo> = HashMap::new();

        // Add all source tables to available output tables
        for (connection, sources) in &grouped_connections {
            for source in sources {
                available_output_tables.insert(
                    source.name.clone(),
                    OutputTableInfo::Original(OriginalTableInfo {
                        connection_name: connection.name.to_string(),
                        table_name: source.name.clone(),
                    }),
                );
            }
        }

        if let Some(sql) = &self.sql {
            let query_context = statement_to_pipeline(sql, &mut pipeline, None)
                .map_err(OrchestrationError::PipelineError)?;

            for (name, table_info) in query_context.output_tables_map {
                if available_output_tables.contains_key(name.as_str()) {
                    return Err(OrchestrationError::DuplicateTable(name));
                }
                available_output_tables
                    .insert(name.clone(), OutputTableInfo::Transformed(table_info));
            }
        }

        for (api_endpoint, log) in self.endpoint_and_logs {
            let table_name = &api_endpoint.table_name;

            let table_info = available_output_tables
                .get(table_name)
                .ok_or_else(|| OrchestrationError::EndpointTableNotFound(table_name.clone()))?;

            let snk_factory: Box<dyn SinkFactory<SchemaSQLContext>> = if let Some(log) = log {
                Box::new(LogSinkFactory::new(
                    runtime.clone(),
                    log,
                    api_endpoint.name.clone(),
                    self.progress.clone(),
                ))
            } else {
                Box::new(DummySinkFactory)
            };

            match table_info {
                OutputTableInfo::Transformed(table_info) => {
                    pipeline.add_sink(snk_factory, api_endpoint.name.as_str(), None);

                    pipeline.connect_nodes(
                        &table_info.node,
                        table_info.port,
                        api_endpoint.name.as_str(),
                        DEFAULT_PORT_HANDLE,
                    );
                }
                OutputTableInfo::Original(table_info) => {
                    pipeline.add_sink(
                        snk_factory,
                        api_endpoint.name.as_str(),
                        Some(PipelineEntryPoint::new(
                            table_info.table_name.clone(),
                            DEFAULT_PORT_HANDLE,
                        )),
                    );
                }
            }
        }

        pipelines.push(pipeline);

        let source_builder = SourceBuilder::new(grouped_connections, Some(&self.progress));
        let asm = source_builder.build_source_manager(runtime)?;
        let mut app = App::new(asm);

        Vec::into_iter(pipelines).for_each(|p| {
            app.add_pipeline(p);
        });

        let dag = app.into_dag().map_err(ExecutionError)?;

        debug!("{}", dag);

        // Emit metrics for monitoring
        emit_dag_metrics(&dag);

        Ok(dag)
    }
}

// Removes duplicate elements in place, keeping the first occurrence of each.
fn dedup<T: Eq + Hash + Clone>(v: &mut Vec<T>) {
    let mut uniques = HashSet::new();
    v.retain(|e| uniques.insert(e.clone()));
}

/// Emits the nodes and edges of the pipeline DAG as labelled metrics counters.
pub fn emit_dag_metrics(input_dag: &Dag<SchemaSQLContext>) {
    const GRAPH_NODES: &str = "pipeline_nodes";
    const GRAPH_EDGES: &str = "pipeline_edges";

    describe_counter!(GRAPH_NODES, "Number of nodes in the pipeline");
    describe_counter!(GRAPH_EDGES, "Number of edges in the pipeline");

    let query_graph = transform_to_ui_graph(input_dag);

    for node in query_graph.nodes {
        let node_name = node.name;
        let node_type = node.node_type;
        let node_idx = node.idx;
        let node_id = node.id;
        let node_data = node.data;

        let labels: [(&str, String); 5] = [
            ("node_name", node_name),
            ("node_type", node_type.to_string()),
            ("node_idx", node_idx.to_string()),
            ("node_id", node_id.to_string()),
            ("node_data", node_data.to_string()),
        ];

        increment_counter!(GRAPH_NODES, &labels);
    }

    for edge in query_graph.edges {
        let from = edge.from;
        let to = edge.to;

        let labels: [(&str, String); 2] = [("from", from.to_string()), ("to", to.to_string())];

        increment_counter!(GRAPH_EDGES, &labels);
    }
}
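
For context, a minimal sketch of how this builder might be driven end to end, using only the `PipelineBuilder::new` and `build` signatures defined above. The function name `build_dag_from_config` and the assumption that the arguments come from the orchestrator's loaded configuration are illustrative, and the sketch is written as if it lived alongside this module:

// Hypothetical usage sketch, not part of builder.rs: argument values are
// assumed to come from the orchestrator's loaded configuration.
fn build_dag_from_config(
    connections: &[Connection],
    sources: &[Source],
    sql: Option<&str>,
    endpoint_and_logs: Vec<(ApiEndpoint, Option<Arc<Mutex<Log>>>)>,
) -> Result<Dag<SchemaSQLContext>, OrchestrationError> {
    // A fresh tokio runtime is assumed here; runtime creation errors are not
    // mapped into OrchestrationError, for brevity.
    let runtime = Arc::new(Runtime::new().expect("failed to create tokio runtime"));
    let builder = PipelineBuilder::new(
        connections,
        sources,
        sql,
        endpoint_and_logs,
        MultiProgress::new(),
    );
    builder.build(runtime)
}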