getdozer / dozer / 5978430793

25 Aug 2023 04:54PM UTC coverage: 75.575% (-0.7%) from 76.279%
push · github · web-flow

Bump ordered-float from 3.4.0 to 3.9.1 (#1919)

Bumps [ordered-float](https://github.com/reem/rust-ordered-float) from 3.4.0 to 3.9.1.
- [Release notes](https://github.com/reem/rust-ordered-float/releases)
- [Commits](https://github.com/reem/rust-ordered-float/compare/v3.4.0...v3.9.1)

---
updated-dependencies:
- dependency-name: ordered-float
  dependency-type: direct:production
  update-type: version-update:semver-minor
...

Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
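
For readers who don't know the crate being bumped: ordered-float wraps floating-point values so they gain a total order (NaN sorts after every other value) and can be hashed, which is what lets floats act as keys in ordered or hashed collections. A minimal sketch of that core behaviour, not taken from dozer's own code:

use ordered_float::OrderedFloat;
use std::collections::BTreeSet;

fn main() {
    // OrderedFloat<f64> implements Ord and Hash, unlike the raw f64 it wraps.
    let mut set = BTreeSet::new();
    set.insert(OrderedFloat(2.5_f64));
    set.insert(OrderedFloat(1.0_f64));
    set.insert(OrderedFloat(f64::NAN)); // NaN is ordered greatest by this crate
    assert_eq!(set.iter().next(), Some(&OrderedFloat(1.0_f64)));
}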

47272 of 62550 relevant lines covered (75.57%)

49425.38 hits per line
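
(For orientation: the headline percentage is just the ratio of those two counts, 47272 / 62550 ≈ 0.7557, i.e. the 75.57% shown above, and the -0.7% change is measured against the previous build's 76.279%.)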

Source File: /dozer-cli/src/pipeline/builder.rs (79.35% covered)

use std::collections::HashMap;
use std::collections::HashSet;
use std::sync::Arc;

use dozer_cache::dozer_log::replication::Log;
use dozer_core::app::App;
use dozer_core::app::AppPipeline;
use dozer_core::app::PipelineEntryPoint;
use dozer_core::node::SinkFactory;
use dozer_core::Dag;
use dozer_core::DEFAULT_PORT_HANDLE;
use dozer_ingestion::connectors::{get_connector, get_connector_info_table};
use dozer_sql::pipeline::builder::statement_to_pipeline;
use dozer_sql::pipeline::builder::{OutputNodeInfo, QueryContext, SchemaSQLContext};
use dozer_types::indicatif::MultiProgress;
use dozer_types::log::debug;
use dozer_types::models::api_endpoint::ApiEndpoint;
use dozer_types::models::connection::Connection;
use dozer_types::models::flags::Flags;
use dozer_types::models::source::Source;
use dozer_types::parking_lot::Mutex;
use std::hash::Hash;
use tokio::runtime::Runtime;

use crate::pipeline::dummy_sink::DummySinkFactory;
use crate::pipeline::LogSinkFactory;
use crate::shutdown::ShutdownReceiver;
use crate::ui_helper::transform_to_ui_graph;

use super::source_builder::SourceBuilder;
use crate::errors::OrchestrationError;
use dozer_types::log::info;
use metrics::{describe_counter, increment_counter};
use OrchestrationError::ExecutionError;

pub enum OutputTableInfo {
    Transformed(OutputNodeInfo),
    Original(OriginalTableInfo),
}

pub struct OriginalTableInfo {
    pub table_name: String,
    pub connection_name: String,
}

pub struct CalculatedSources {
    pub original_sources: Vec<String>,
    pub transformed_sources: Vec<String>,
    pub query_context: Option<QueryContext>,
}

type OptionLog = Option<Arc<Mutex<Log>>>;

pub struct PipelineBuilder<'a> {
    connections: &'a [Connection],
    sources: &'a [Source],
    sql: Option<&'a str>,
    /// `ApiEndpoint` and its log.
    endpoint_and_logs: Vec<(ApiEndpoint, OptionLog)>,
    progress: MultiProgress,
    flags: Flags,
}

impl<'a> PipelineBuilder<'a> {
    pub fn new(
        connections: &'a [Connection],
        sources: &'a [Source],
        sql: Option<&'a str>,
        endpoint_and_logs: Vec<(ApiEndpoint, OptionLog)>,
        progress: MultiProgress,
        flags: Flags,
    ) -> Self {
        Self {
            connections,
            sources,
            sql,
            endpoint_and_logs,
            progress,
            flags,
        }
    }

    // Based on used_sources, map each table to its connection name and create sources.
    // To avoid breaking current functionality, the current format is still supported.
    pub async fn get_grouped_tables(
        &self,
        original_sources: &[String],
    ) -> Result<HashMap<Connection, Vec<Source>>, OrchestrationError> {
        let mut grouped_connections: HashMap<Connection, Vec<Source>> = HashMap::new();

        let mut connector_map = HashMap::new();
        for connection in self.connections {
            let connector = get_connector(connection.clone())?;

            if let Ok(info_table) = get_connector_info_table(connection) {
                info!("[{}] Connection parameters\n{info_table}", connection.name);
            }

            let connector_tables = connector.list_tables().await?;

            // Override source name if specified.
            let connector_tables: Vec<Source> = connector_tables
                .iter()
                .map(|table| {
                    match self.sources.iter().find(|s| {
                        // TODO: @dario - Replace this line with the actual schema parsed from SQL
                        s.connection == connection.name && s.table_name == table.name
                    }) {
                        Some(source) => source.clone(),
                        None => Source {
                            name: table.name.clone(),
                            table_name: table.name.clone(),
                            schema: table.schema.clone(),
                            connection: connection.name.clone(),
                            ..Default::default()
                        },
                    }
                })
                .collect();

            connector_map.insert(connection.clone(), connector_tables);
        }

        for table_name in original_sources {
            let mut table_found = false;
            for (connection, tables) in connector_map.iter() {
                if let Some(source) = tables
                    .iter()
                    .find(|table| table.name == table_name.as_str())
                {
                    table_found = true;
                    grouped_connections
                        .entry(connection.clone())
                        .or_default()
                        .push(source.clone());
                }
            }

            if !table_found {
                return Err(OrchestrationError::SourceValidationError(
                    table_name.to_string(),
                ));
            }
        }

        Ok(grouped_connections)
    }

    // This function is used to figure out the sources that are used in the pipeline,
    // based on the SQL and the API endpoints.
    pub fn calculate_sources(&self) -> Result<CalculatedSources, OrchestrationError> {
        let mut original_sources = vec![];

        let mut query_ctx = None;
        let mut pipeline = AppPipeline::new((&self.flags).into());

        let mut transformed_sources = vec![];

        if let Some(sql) = &self.sql {
            let query_context = statement_to_pipeline(sql, &mut pipeline, None)
                .map_err(OrchestrationError::PipelineError)?;

            query_ctx = Some(query_context.clone());

            for (name, _) in query_context.output_tables_map {
                if transformed_sources.contains(&name) {
                    return Err(OrchestrationError::DuplicateTable(name));
                }
                transformed_sources.push(name.clone());
            }

            for name in query_context.used_sources {
                // Add all source tables to input tables.
                original_sources.push(name);
            }
        }

        // Add used sources if they are consumed directly from a source.
        for (api_endpoint, _) in &self.endpoint_and_logs {
            let table_name = &api_endpoint.table_name;

            // Don't add if the table is a result of SQL.
            if !transformed_sources.contains(table_name) {
                original_sources.push(table_name.clone());
            }
        }
        dedup(&mut original_sources);
        dedup(&mut transformed_sources);

        Ok(CalculatedSources {
            original_sources,
            transformed_sources,
            query_context: query_ctx,
        })
    }

    // This function is used by both building and actual execution.
    pub async fn build(
        self,
        runtime: &Arc<Runtime>,
        shutdown: ShutdownReceiver,
    ) -> Result<dozer_core::Dag<SchemaSQLContext>, OrchestrationError> {
        let calculated_sources = self.calculate_sources()?;

        debug!("Used Sources: {:?}", calculated_sources.original_sources);
        let grouped_connections = self
            .get_grouped_tables(&calculated_sources.original_sources)
            .await?;

        let mut pipelines: Vec<AppPipeline<SchemaSQLContext>> = vec![];

        let mut pipeline = AppPipeline::new(self.flags.into());

        let mut available_output_tables: HashMap<String, OutputTableInfo> = HashMap::new();

        // Add all source tables to available output tables.
        for (connection, sources) in &grouped_connections {
            for source in sources {
                available_output_tables.insert(
                    source.name.clone(),
                    OutputTableInfo::Original(OriginalTableInfo {
                        connection_name: connection.name.to_string(),
                        table_name: source.name.clone(),
                    }),
                );
            }
        }

        if let Some(sql) = &self.sql {
            let query_context = statement_to_pipeline(sql, &mut pipeline, None)
                .map_err(OrchestrationError::PipelineError)?;

            for (name, table_info) in query_context.output_tables_map {
                if available_output_tables.contains_key(name.as_str()) {
                    return Err(OrchestrationError::DuplicateTable(name));
                }
                available_output_tables
                    .insert(name.clone(), OutputTableInfo::Transformed(table_info));
            }
        }

        for (api_endpoint, log) in self.endpoint_and_logs {
            let table_name = &api_endpoint.table_name;

            let table_info = available_output_tables
                .get(table_name)
                .ok_or_else(|| OrchestrationError::EndpointTableNotFound(table_name.clone()))?;

            let snk_factory: Box<dyn SinkFactory<SchemaSQLContext>> = if let Some(log) = log {
                Box::new(LogSinkFactory::new(
                    runtime.clone(),
                    log,
                    api_endpoint.name.clone(),
                    self.progress.clone(),
                ))
            } else {
                Box::new(DummySinkFactory)
            };

            match table_info {
                OutputTableInfo::Transformed(table_info) => {
                    pipeline.add_sink(snk_factory, api_endpoint.name.as_str(), None);

                    pipeline.connect_nodes(
                        &table_info.node,
                        table_info.port,
                        api_endpoint.name.as_str(),
                        DEFAULT_PORT_HANDLE,
                    );
                }
                OutputTableInfo::Original(table_info) => {
                    pipeline.add_sink(
                        snk_factory,
                        api_endpoint.name.as_str(),
                        Some(PipelineEntryPoint::new(
                            table_info.table_name.clone(),
                            DEFAULT_PORT_HANDLE,
                        )),
                    );
                }
            }
        }

        pipelines.push(pipeline);

        let source_builder = SourceBuilder::new(grouped_connections, Some(&self.progress));
        let asm = source_builder
            .build_source_manager(runtime, shutdown)
            .await?;
        let mut app = App::new(asm);

        Vec::into_iter(pipelines).for_each(|p| {
            app.add_pipeline(p);
        });

        let dag = app.into_dag().map_err(ExecutionError)?;

        // Emit metrics for monitoring.
        emit_dag_metrics(&dag);

        Ok(dag)
    }
}

fn dedup<T: Eq + Hash + Clone>(v: &mut Vec<T>) {
    let mut uniques = HashSet::new();
    v.retain(|e| uniques.insert(e.clone()));
}

pub fn emit_dag_metrics(input_dag: &Dag<SchemaSQLContext>) {
    const GRAPH_NODES: &str = "pipeline_nodes";
    const GRAPH_EDGES: &str = "pipeline_edges";

    describe_counter!(GRAPH_NODES, "Number of nodes in the pipeline");
    describe_counter!(GRAPH_EDGES, "Number of edges in the pipeline");

    let query_graph = transform_to_ui_graph(input_dag);

    for node in query_graph.nodes {
        let node_name = node.name;
        let node_type = node.node_type;
        let node_idx = node.idx;
        let node_id = node.id;
        let node_data = node.data;

        let labels: [(&str, String); 5] = [
            ("node_name", node_name),
            ("node_type", node_type.to_string()),
            ("node_idx", node_idx.to_string()),
            ("node_id", node_id.to_string()),
            ("node_data", node_data.to_string()),
        ];

        increment_counter!(GRAPH_NODES, &labels);
    }

    for edge in query_graph.edges {
        let from = edge.from;
        let to = edge.to;

        let labels: [(&str, String); 2] = [("from", from.to_string()), ("to", to.to_string())];

        increment_counter!(GRAPH_EDGES, &labels);
    }
}
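
Taken together, the file defines a small builder API: calculate_sources works out which tables the pipeline needs, get_grouped_tables groups those tables per connection, and build wires sources, the optional SQL transform, and sinks into a Dag. Below is a rough, hypothetical call-site sketch; only PipelineBuilder's own signatures are taken from the file above, while the surrounding pieces (how the connections, sources, endpoint/log pairs, runtime and shutdown receiver are obtained) are assumptions:

use std::sync::Arc;

use dozer_cache::dozer_log::replication::Log;
use dozer_core::Dag;
use dozer_sql::pipeline::builder::SchemaSQLContext;
use dozer_types::indicatif::MultiProgress;
use dozer_types::models::{
    api_endpoint::ApiEndpoint, connection::Connection, flags::Flags, source::Source,
};
use dozer_types::parking_lot::Mutex;
use tokio::runtime::Runtime;

use crate::errors::OrchestrationError;
use crate::pipeline::builder::PipelineBuilder;
use crate::shutdown::ShutdownReceiver;

// Hypothetical driver, imagined as living elsewhere inside dozer-cli.
async fn build_dag(
    connections: &[Connection],
    sources: &[Source],
    sql: Option<&str>,
    // Each endpoint is paired with an optional replication log; `OptionLog`
    // is private to the builder module, so the alias is spelled out here.
    endpoint_and_logs: Vec<(ApiEndpoint, Option<Arc<Mutex<Log>>>)>,
    flags: Flags,
    runtime: Arc<Runtime>,
    shutdown: ShutdownReceiver,
) -> Result<Dag<SchemaSQLContext>, OrchestrationError> {
    let builder = PipelineBuilder::new(
        connections,
        sources,
        sql,
        endpoint_and_logs,
        MultiProgress::new(),
        flags,
    );
    // `build` consumes the builder, resolves each endpoint's output table and
    // returns the executable DAG.
    builder.build(&runtime, shutdown).await
}

Note that when an endpoint carries no log, build falls back to DummySinkFactory, which fits the file's own comment that the same function serves both building and actual execution.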