
getdozer / dozer / build 5923086724

21 Aug 2023 07:05AM UTC · coverage: 74.763% (-1.2%) from 75.988%

Trigger: push, via github (committed by web-flow)

chore: Remove short form of `enable_progress` because it's conflicting with `dozer cloud` (#1876)

46105 of 61668 relevant lines covered (74.76%) · 39792.37 hits per line

Source File: /dozer-cli/src/pipeline/builder.rs (82.2% of lines covered)
use std::collections::HashMap;
use std::collections::HashSet;
use std::sync::Arc;

use dozer_cache::dozer_log::replication::Log;
use dozer_core::app::App;
use dozer_core::app::AppPipeline;
use dozer_core::app::PipelineEntryPoint;
use dozer_core::node::SinkFactory;
use dozer_core::Dag;
use dozer_core::DEFAULT_PORT_HANDLE;
use dozer_ingestion::connectors::{get_connector, get_connector_info_table};
use dozer_sql::pipeline::builder::statement_to_pipeline;
use dozer_sql::pipeline::builder::{OutputNodeInfo, QueryContext, SchemaSQLContext};
use dozer_types::indicatif::MultiProgress;
use dozer_types::log::debug;
use dozer_types::models::api_endpoint::ApiEndpoint;
use dozer_types::models::connection::Connection;
use dozer_types::models::source::Source;
use std::hash::Hash;
use tokio::runtime::Runtime;
use tokio::sync::Mutex;

use crate::pipeline::dummy_sink::DummySinkFactory;
use crate::pipeline::LogSinkFactory;
use crate::shutdown::ShutdownReceiver;
use crate::ui_helper::transform_to_ui_graph;

use super::source_builder::SourceBuilder;
use crate::errors::OrchestrationError;
use dozer_types::log::{error, info};
use metrics::{describe_counter, increment_counter};
use OrchestrationError::ExecutionError;

pub enum OutputTableInfo {
    Transformed(OutputNodeInfo),
    Original(OriginalTableInfo),
}

pub struct OriginalTableInfo {
    pub table_name: String,
    pub connection_name: String,
}

pub struct CalculatedSources {
    pub original_sources: Vec<String>,
    pub transformed_sources: Vec<String>,
    pub query_context: Option<QueryContext>,
}

type OptionLog = Option<Arc<Mutex<Log>>>;

pub struct PipelineBuilder<'a> {
    connections: &'a [Connection],
    sources: &'a [Source],
    sql: Option<&'a str>,
    /// `ApiEndpoint` and its log.
    endpoint_and_logs: Vec<(ApiEndpoint, OptionLog)>,
    progress: MultiProgress,
}

impl<'a> PipelineBuilder<'a> {
    pub fn new(
        connections: &'a [Connection],
        sources: &'a [Source],
        sql: Option<&'a str>,
        endpoint_and_logs: Vec<(ApiEndpoint, OptionLog)>,
        progress: MultiProgress,
    ) -> Self {
        Self {
            connections,
            sources,
            sql,
            endpoint_and_logs,
            progress,
        }
    }

    // Based on `used_sources`, map each table to its connection name and create sources.
    // To avoid breaking current functionality, the current format is still supported.
    pub async fn get_grouped_tables(
        &self,
        original_sources: &[String],
    ) -> Result<HashMap<Connection, Vec<Source>>, OrchestrationError> {
        let mut grouped_connections: HashMap<Connection, Vec<Source>> = HashMap::new();

        let mut connector_map = HashMap::new();
        for connection in self.connections {
            let connector = get_connector(connection.clone())?;

            if let Ok(info_table) = get_connector_info_table(connection) {
                info!("[{}] Connection parameters\n{info_table}", connection.name);
            }

            let connector_tables = connector.list_tables().await?;

            // Override the source name if one is specified.
            let connector_tables: Vec<Source> = connector_tables
                .iter()
                .map(|table| {
                    match self.sources.iter().find(|s| {
                        // TODO: @dario - Replace this line with the actual schema parsed from SQL
                        s.connection == connection.name && s.table_name == table.name
                    }) {
                        Some(source) => source.clone(),
                        None => Source {
                            name: table.name.clone(),
                            table_name: table.name.clone(),
                            schema: table.schema.clone(),
                            connection: connection.name.clone(),
                            ..Default::default()
                        },
                    }
                })
                .collect();

            connector_map.insert(connection.clone(), connector_tables);
        }

        for table_name in original_sources {
            let mut table_found = false;
            for (connection, tables) in connector_map.iter() {
                if let Some(source) = tables
                    .iter()
                    .find(|table| table.name == table_name.as_str())
                {
                    table_found = true;
                    grouped_connections
                        .entry(connection.clone())
                        .or_default()
                        .push(source.clone());
                }
            }

            if !table_found {
                error!("Table {} not found in any of the connections", table_name);
                return Err(OrchestrationError::SourceValidationError);
            }
        }

        Ok(grouped_connections)
    }

    // Figures out which sources are used in the pipeline,
    // based on the SQL and the API endpoints.
    pub fn calculate_sources(&self) -> Result<CalculatedSources, OrchestrationError> {
        let mut original_sources = vec![];

        let mut query_ctx = None;
        let mut pipeline = AppPipeline::new();

        let mut transformed_sources = vec![];

        if let Some(sql) = &self.sql {
            let query_context = statement_to_pipeline(sql, &mut pipeline, None)
                .map_err(OrchestrationError::PipelineError)?;

            query_ctx = Some(query_context.clone());

            for (name, _) in query_context.output_tables_map {
                if transformed_sources.contains(&name) {
                    return Err(OrchestrationError::DuplicateTable(name));
                }
                transformed_sources.push(name.clone());
            }

            for name in query_context.used_sources {
                // Add all source tables to input tables.
                original_sources.push(name);
            }
        }

        // Add used sources that are read directly from a source.
        for (api_endpoint, _) in &self.endpoint_and_logs {
            let table_name = &api_endpoint.table_name;

            // Don't add the table if it is a result of SQL.
            if !transformed_sources.contains(table_name) {
                original_sources.push(table_name.clone());
            }
        }
        dedup(&mut original_sources);
        dedup(&mut transformed_sources);

        Ok(CalculatedSources {
            original_sources,
            transformed_sources,
            query_context: query_ctx,
        })
    }

    // This function is used both for building and for actual execution.
    pub async fn build(
        self,
        runtime: &Arc<Runtime>,
        shutdown: ShutdownReceiver,
    ) -> Result<dozer_core::Dag<SchemaSQLContext>, OrchestrationError> {
        let calculated_sources = self.calculate_sources()?;

        debug!("Used Sources: {:?}", calculated_sources.original_sources);
        let grouped_connections = self
            .get_grouped_tables(&calculated_sources.original_sources)
            .await?;

        let mut pipelines: Vec<AppPipeline<SchemaSQLContext>> = vec![];

        let mut pipeline = AppPipeline::new();

        let mut available_output_tables: HashMap<String, OutputTableInfo> = HashMap::new();

        // Add all source tables to available output tables.
        for (connection, sources) in &grouped_connections {
            for source in sources {
                available_output_tables.insert(
                    source.name.clone(),
                    OutputTableInfo::Original(OriginalTableInfo {
                        connection_name: connection.name.to_string(),
                        table_name: source.name.clone(),
                    }),
                );
            }
        }

        if let Some(sql) = &self.sql {
            let query_context = statement_to_pipeline(sql, &mut pipeline, None)
                .map_err(OrchestrationError::PipelineError)?;

            for (name, table_info) in query_context.output_tables_map {
                if available_output_tables.contains_key(name.as_str()) {
                    return Err(OrchestrationError::DuplicateTable(name));
                }
                available_output_tables
                    .insert(name.clone(), OutputTableInfo::Transformed(table_info));
            }
        }

        for (api_endpoint, log) in self.endpoint_and_logs {
            let table_name = &api_endpoint.table_name;

            let table_info = available_output_tables
                .get(table_name)
                .ok_or_else(|| OrchestrationError::EndpointTableNotFound(table_name.clone()))?;

            let snk_factory: Box<dyn SinkFactory<SchemaSQLContext>> = if let Some(log) = log {
                Box::new(LogSinkFactory::new(
                    runtime.clone(),
                    log,
                    api_endpoint.name.clone(),
                    self.progress.clone(),
                ))
            } else {
                Box::new(DummySinkFactory)
            };

            match table_info {
                OutputTableInfo::Transformed(table_info) => {
                    pipeline.add_sink(snk_factory, api_endpoint.name.as_str(), None);

                    pipeline.connect_nodes(
                        &table_info.node,
                        table_info.port,
                        api_endpoint.name.as_str(),
                        DEFAULT_PORT_HANDLE,
                    );
                }
                OutputTableInfo::Original(table_info) => {
                    pipeline.add_sink(
                        snk_factory,
                        api_endpoint.name.as_str(),
                        Some(PipelineEntryPoint::new(
                            table_info.table_name.clone(),
                            DEFAULT_PORT_HANDLE,
                        )),
                    );
                }
            }
        }

        pipelines.push(pipeline);

        let source_builder = SourceBuilder::new(grouped_connections, Some(&self.progress));
        let asm = source_builder
            .build_source_manager(runtime, shutdown)
            .await?;
        let mut app = App::new(asm);

        for pipeline in pipelines {
            app.add_pipeline(pipeline);
        }

        let dag = app.into_dag().map_err(ExecutionError)?;

        // Emit metrics for monitoring.
        emit_dag_metrics(&dag);

        Ok(dag)
    }
}

fn dedup<T: Eq + Hash + Clone>(v: &mut Vec<T>) {
    let mut uniques = HashSet::new();
    v.retain(|e| uniques.insert(e.clone()));
}
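// Illustrative addition, not part of the original file: a minimal test sketch
// for the order-preserving `dedup` helper above. `HashSet::insert` returns
// `false` for an element that was already seen, so `retain` keeps only the
// first occurrence of each value while preserving the original order.
#[cfg(test)]
mod dedup_tests {
    use super::dedup;

    #[test]
    fn keeps_first_occurrences_in_order() {
        let mut sources = vec!["users", "orders", "users", "items", "orders"];
        dedup(&mut sources);
        assert_eq!(sources, vec!["users", "orders", "items"]);
    }
}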

pub fn emit_dag_metrics(input_dag: &Dag<SchemaSQLContext>) {
    const GRAPH_NODES: &str = "pipeline_nodes";
    const GRAPH_EDGES: &str = "pipeline_edges";

    describe_counter!(GRAPH_NODES, "Number of nodes in the pipeline");
    describe_counter!(GRAPH_EDGES, "Number of edges in the pipeline");

    let query_graph = transform_to_ui_graph(input_dag);

    for node in query_graph.nodes {
        let node_name = node.name;
        let node_type = node.node_type;
        let node_idx = node.idx;
        let node_id = node.id;
        let node_data = node.data;

        let labels: [(&str, String); 5] = [
            ("node_name", node_name),
            ("node_type", node_type.to_string()),
            ("node_idx", node_idx.to_string()),
            ("node_id", node_id.to_string()),
            ("node_data", node_data.to_string()),
        ];

        increment_counter!(GRAPH_NODES, &labels);
    }

    for edge in query_graph.edges {
        let from = edge.from;
        let to = edge.to;

        let labels: [(&str, String); 2] = [("from", from.to_string()), ("to", to.to_string())];

        increment_counter!(GRAPH_EDGES, &labels);
    }
}
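For reference, the grouping rule in `get_grouped_tables` can be illustrated with a small, std-only sketch: each requested table is bucketed under every connection whose connector reports it, and a table found under no connection is an error (mirroring `OrchestrationError::SourceValidationError`). The `group` helper and the connection names below are hypothetical simplifications; the real code works with the `Connection` and `Source` models and first overrides table definitions with user-declared sources.

use std::collections::HashMap;

// connection name -> tables its connector reports
fn group(
    connector_map: &HashMap<String, Vec<String>>,
    requested: &[String],
) -> Result<HashMap<String, Vec<String>>, String> {
    let mut grouped: HashMap<String, Vec<String>> = HashMap::new();
    for table in requested {
        let mut found = false;
        for (connection, tables) in connector_map {
            if tables.iter().any(|t| t == table) {
                found = true;
                // Same bucketing pattern as the real code:
                // entry(..).or_default().push(..)
                grouped
                    .entry(connection.clone())
                    .or_default()
                    .push(table.clone());
            }
        }
        if !found {
            return Err(format!("Table {table} not found in any of the connections"));
        }
    }
    Ok(grouped)
}

fn main() {
    let connector_map = HashMap::from([
        ("postgres".to_string(), vec!["users".to_string(), "orders".to_string()]),
        ("snowflake".to_string(), vec!["events".to_string()]),
    ]);
    let requested = ["users".to_string(), "events".to_string()];
    let grouped = group(&connector_map, &requested).unwrap();
    assert_eq!(grouped["postgres"], vec!["users".to_string()]);
    assert_eq!(grouped["snowflake"], vec!["events".to_string()]);
    // An unknown table is rejected rather than silently dropped.
    assert!(group(&connector_map, &["missing".to_string()]).is_err());
}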