
getdozer / dozer, build 5888798292 (push via GitHub, committed by web-flow)

17 Aug 2023 08:51AM UTC coverage: 76.025% (-1.4%) from 77.415%
feat: implement graph on live ui (#1847)

* feat: implement progress

* feat: implement enable progress flag

* feat: implement progress in live

* chore: fix clippy

* chore: always use telemetry metrics

* fix: Only run build once

---------

Co-authored-by: sagar <sagar@getdozer.io>
Co-authored-by: chubei <914745487@qq.com>

536 of 536 new or added lines in 21 files covered. (100.0%)

46101 of 60639 relevant lines covered (76.03%)

40410.07 hits per line

Source File: /dozer-cli/src/pipeline/builder.rs (89.17% covered)
use std::collections::HashMap;
use std::collections::HashSet;
use std::sync::Arc;

use dozer_cache::dozer_log::replication::Log;
use dozer_core::app::App;
use dozer_core::app::AppPipeline;
use dozer_core::app::PipelineEntryPoint;
use dozer_core::node::SinkFactory;
use dozer_core::Dag;
use dozer_core::DEFAULT_PORT_HANDLE;
use dozer_ingestion::connectors::{get_connector, get_connector_info_table};
use dozer_sql::pipeline::builder::statement_to_pipeline;
use dozer_sql::pipeline::builder::{OutputNodeInfo, QueryContext, SchemaSQLContext};
use dozer_types::indicatif::MultiProgress;
use dozer_types::log::debug;
use dozer_types::models::api_endpoint::ApiEndpoint;
use dozer_types::models::connection::Connection;
use dozer_types::models::source::Source;
use std::hash::Hash;
use tokio::runtime::Runtime;
use tokio::sync::Mutex;

use crate::pipeline::dummy_sink::DummySinkFactory;
use crate::pipeline::LogSinkFactory;
use crate::ui_helper::transform_to_ui_graph;

use super::source_builder::SourceBuilder;
use crate::errors::OrchestrationError;
use dozer_types::log::{error, info};
use metrics::{describe_counter, increment_counter};
use OrchestrationError::ExecutionError;

pub enum OutputTableInfo {
    Transformed(OutputNodeInfo),
    Original(OriginalTableInfo),
}

pub struct OriginalTableInfo {
    pub table_name: String,
    pub connection_name: String,
}

pub struct CalculatedSources {
    pub original_sources: Vec<String>,
    pub transformed_sources: Vec<String>,
    pub query_context: Option<QueryContext>,
}

type OptionLog = Option<Arc<Mutex<Log>>>;

pub struct PipelineBuilder<'a> {
    connections: &'a [Connection],
    sources: &'a [Source],
    sql: Option<&'a str>,
    /// `ApiEndpoint` and its log.
    endpoint_and_logs: Vec<(ApiEndpoint, OptionLog)>,
    progress: MultiProgress,
}

impl<'a> PipelineBuilder<'a> {
    pub fn new(
        connections: &'a [Connection],
        sources: &'a [Source],
        sql: Option<&'a str>,
        endpoint_and_logs: Vec<(ApiEndpoint, OptionLog)>,
        progress: MultiProgress,
    ) -> Self {
        Self {
            connections,
            sources,
            sql,
            endpoint_and_logs,
            progress,
        }
    }

    // Based on used_sources, map each table to its connection name and create sources.
    // To avoid breaking current functionality, the current format is still supported.
    pub async fn get_grouped_tables(
        &self,
        original_sources: &[String],
    ) -> Result<HashMap<Connection, Vec<Source>>, OrchestrationError> {
        let mut grouped_connections: HashMap<Connection, Vec<Source>> = HashMap::new();

        let mut connector_map = HashMap::new();
        for connection in self.connections {
            let connector = get_connector(connection.clone())?;

            if let Ok(info_table) = get_connector_info_table(connection) {
                info!("[{}] Connection parameters\n{info_table}", connection.name);
            }

            let connector_tables = connector.list_tables().await?;

            // Override the source name if one is specified.
            let connector_tables: Vec<Source> = connector_tables
                .iter()
                .map(|table| {
                    match self.sources.iter().find(|s| {
                        // TODO: @dario - Replace this line with the actual schema parsed from SQL
                        s.connection == connection.name && s.table_name == table.name
                    }) {
                        Some(source) => source.clone(),
                        None => Source {
                            name: table.name.clone(),
                            table_name: table.name.clone(),
                            schema: table.schema.clone(),
                            connection: connection.name.clone(),
                            ..Default::default()
                        },
                    }
                })
                .collect();

            connector_map.insert(connection.clone(), connector_tables);
        }

        for table_name in original_sources {
            let mut table_found = false;
            for (connection, tables) in connector_map.iter() {
                if let Some(source) = tables
                    .iter()
                    .find(|table| table.name == table_name.as_str())
                {
                    table_found = true;
                    grouped_connections
                        .entry(connection.clone())
                        .or_default()
                        .push(source.clone());
                }
            }

            if !table_found {
                error!("Table {} not found in any of the connections", table_name);
                return Err(OrchestrationError::SourceValidationError);
            }
        }

        Ok(grouped_connections)
    }

    // Figures out which sources are used in the pipeline,
    // based on the SQL and the API endpoints.
    pub fn calculate_sources(&self) -> Result<CalculatedSources, OrchestrationError> {
        let mut original_sources = vec![];

        let mut query_ctx = None;
        let mut pipeline = AppPipeline::new();

        let mut transformed_sources = vec![];

        if let Some(sql) = &self.sql {
            let query_context = statement_to_pipeline(sql, &mut pipeline, None)
                .map_err(OrchestrationError::PipelineError)?;

            query_ctx = Some(query_context.clone());

            for (name, _) in query_context.output_tables_map {
                if transformed_sources.contains(&name) {
                    return Err(OrchestrationError::DuplicateTable(name));
                }
                transformed_sources.push(name.clone());
            }

            for name in query_context.used_sources {
                // Add all source tables to input tables
                original_sources.push(name);
            }
        }

        // Add used sources when an endpoint reads directly from a source.
        for (api_endpoint, _) in &self.endpoint_and_logs {
            let table_name = &api_endpoint.table_name;

            // Don't add the table if it is a result of SQL.
            if !transformed_sources.contains(table_name) {
                original_sources.push(table_name.clone());
            }
        }
        dedup(&mut original_sources);
        dedup(&mut transformed_sources);

        Ok(CalculatedSources {
            original_sources,
            transformed_sources,
            query_context: query_ctx,
        })
    }

    // This function is used by both building and actual execution.
    pub async fn build(
        self,
        runtime: &Arc<Runtime>,
    ) -> Result<dozer_core::Dag<SchemaSQLContext>, OrchestrationError> {
        let calculated_sources = self.calculate_sources()?;

        debug!("Used Sources: {:?}", calculated_sources.original_sources);
        let grouped_connections = self
            .get_grouped_tables(&calculated_sources.original_sources)
            .await?;

        let mut pipelines: Vec<AppPipeline<SchemaSQLContext>> = vec![];

        let mut pipeline = AppPipeline::new();

        let mut available_output_tables: HashMap<String, OutputTableInfo> = HashMap::new();

        // Add all source tables to available output tables.
        for (connection, sources) in &grouped_connections {
            for source in sources {
                available_output_tables.insert(
                    source.name.clone(),
                    OutputTableInfo::Original(OriginalTableInfo {
                        connection_name: connection.name.to_string(),
                        table_name: source.name.clone(),
                    }),
                );
            }
        }

        if let Some(sql) = &self.sql {
            let query_context = statement_to_pipeline(sql, &mut pipeline, None)
                .map_err(OrchestrationError::PipelineError)?;

            for (name, table_info) in query_context.output_tables_map {
                if available_output_tables.contains_key(name.as_str()) {
                    return Err(OrchestrationError::DuplicateTable(name));
                }
                available_output_tables
                    .insert(name.clone(), OutputTableInfo::Transformed(table_info));
            }
        }

        for (api_endpoint, log) in self.endpoint_and_logs {
            let table_name = &api_endpoint.table_name;

            let table_info = available_output_tables
                .get(table_name)
                .ok_or_else(|| OrchestrationError::EndpointTableNotFound(table_name.clone()))?;

            let snk_factory: Box<dyn SinkFactory<SchemaSQLContext>> = if let Some(log) = log {
                Box::new(LogSinkFactory::new(
                    runtime.clone(),
                    log,
                    api_endpoint.name.clone(),
                    self.progress.clone(),
                ))
            } else {
                Box::new(DummySinkFactory)
            };

            match table_info {
                OutputTableInfo::Transformed(table_info) => {
                    pipeline.add_sink(snk_factory, api_endpoint.name.as_str(), None);

                    pipeline.connect_nodes(
                        &table_info.node,
                        table_info.port,
                        api_endpoint.name.as_str(),
                        DEFAULT_PORT_HANDLE,
                    );
                }
                OutputTableInfo::Original(table_info) => {
                    pipeline.add_sink(
                        snk_factory,
                        api_endpoint.name.as_str(),
                        Some(PipelineEntryPoint::new(
                            table_info.table_name.clone(),
                            DEFAULT_PORT_HANDLE,
                        )),
                    );
                }
            }
        }

        pipelines.push(pipeline);

        let source_builder = SourceBuilder::new(grouped_connections, Some(&self.progress));
        let asm = source_builder.build_source_manager(runtime).await?;
        let mut app = App::new(asm);

        Vec::into_iter(pipelines).for_each(|p| {
            app.add_pipeline(p);
        });

        let dag = app.into_dag().map_err(ExecutionError)?;

        // Emit metrics for monitoring.
        emit_dag_metrics(&dag);

        Ok(dag)
    }
}

// Removes duplicates in place, keeping the first occurrence of each element.
fn dedup<T: Eq + Hash + Clone>(v: &mut Vec<T>) {
    let mut uniques = HashSet::new();
    v.retain(|e| uniques.insert(e.clone()));
}

pub fn emit_dag_metrics(input_dag: &Dag<SchemaSQLContext>) {
    const GRAPH_NODES: &str = "pipeline_nodes";
    const GRAPH_EDGES: &str = "pipeline_edges";

    describe_counter!(GRAPH_NODES, "Number of nodes in the pipeline");
    describe_counter!(GRAPH_EDGES, "Number of edges in the pipeline");

    let query_graph = transform_to_ui_graph(input_dag);

    for node in query_graph.nodes {
        let node_name = node.name;
        let node_type = node.node_type;
        let node_idx = node.idx;
        let node_id = node.id;
        let node_data = node.data;

        let labels: [(&str, String); 5] = [
            ("node_name", node_name),
            ("node_type", node_type.to_string()),
            ("node_idx", node_idx.to_string()),
            ("node_id", node_id.to_string()),
            ("node_data", node_data.to_string()),
        ];

        increment_counter!(GRAPH_NODES, &labels);
    }

    for edge in query_graph.edges {
        let from = edge.from;
        let to = edge.to;

        let labels: [(&str, String); 2] = [("from", from.to_string()), ("to", to.to_string())];

        increment_counter!(GRAPH_EDGES, &labels);
    }
}
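
A note on the grouping step in `get_grouped_tables` above: it collects the resolved `Source` entries under their `Connection` with the standard `entry().or_default().push()` idiom. The following is a minimal, standalone sketch of that idiom using plain strings in place of dozer's `Connection` and `Source` types; the names and data are hypothetical and it is not part of the dozer codebase.

use std::collections::HashMap;

fn main() {
    // (connection, table) pairs, roughly as several connectors might report them
    let tables = [
        ("postgres_main", "users"),
        ("postgres_main", "orders"),
        ("snowflake_dw", "events"),
    ];

    // Group table names under their connection, mirroring grouped_connections above
    let mut grouped: HashMap<&str, Vec<&str>> = HashMap::new();
    for (connection, table) in tables {
        grouped.entry(connection).or_default().push(table);
    }

    assert_eq!(grouped["postgres_main"], vec!["users", "orders"]);
    assert_eq!(grouped["snowflake_dw"], vec!["events"]);
}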
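
Similarly, the `dedup` helper removes duplicates in place while preserving first-occurrence order: `HashSet::insert` returns false for values already seen, and `Vec::retain` drops exactly those elements. Below is a standalone sketch of the same behaviour; the function name and sample data are illustrative, not project code.

use std::collections::HashSet;
use std::hash::Hash;

// Keep only the first occurrence of each element, in the original order.
fn dedup_preserving_order<T: Eq + Hash + Clone>(v: &mut Vec<T>) {
    let mut seen = HashSet::new();
    v.retain(|e| seen.insert(e.clone()));
}

fn main() {
    let mut sources = vec!["users", "orders", "users", "items", "orders"];
    dedup_preserving_order(&mut sources);
    assert_eq!(sources, vec!["users", "orders", "items"]);
}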