getdozer / dozer, build 6299724219 (github, push)

25 Sep 2023 12:58PM UTC. Coverage: 77.81% (+0.5%) from 77.275%.

chubei: fix: Add `BINDGEN_EXTRA_CLANG_ARGS` to cross compile rocksdb

50223 of 64546 relevant lines covered (77.81%)
148909.49 hits per line

Source file: /dozer-cli/src/pipeline/builder.rs (96.97% of lines covered)
use std::collections::HashMap;
use std::collections::HashSet;
use std::sync::Arc;

use dozer_cache::dozer_log::replication::Log;
use dozer_core::app::App;
use dozer_core::app::AppPipeline;
use dozer_core::app::PipelineEntryPoint;
use dozer_core::node::SinkFactory;
use dozer_core::DEFAULT_PORT_HANDLE;
use dozer_ingestion::connectors::{get_connector, get_connector_info_table};
use dozer_sql::builder::statement_to_pipeline;
use dozer_sql::builder::{OutputNodeInfo, QueryContext};
use dozer_tracing::LabelsAndProgress;
use dozer_types::log::debug;
use dozer_types::models::api_endpoint::ApiEndpoint;
use dozer_types::models::connection::Connection;
use dozer_types::models::flags::Flags;
use dozer_types::models::source::Source;
use dozer_types::models::udf_config::UdfConfig;
use std::hash::Hash;
use tokio::runtime::Runtime;
use tokio::sync::Mutex;

use crate::pipeline::dummy_sink::DummySinkFactory;
use crate::pipeline::LogSinkFactory;
use crate::shutdown::ShutdownReceiver;

use super::source_builder::SourceBuilder;
use crate::errors::OrchestrationError;
use dozer_types::log::info;
use OrchestrationError::ExecutionError;

pub enum OutputTableInfo {
    Transformed(OutputNodeInfo),
    Original(OriginalTableInfo),
}

pub struct OriginalTableInfo {
    pub table_name: String,
    pub connection_name: String,
}

pub struct CalculatedSources {
    pub original_sources: Vec<String>,
    pub transformed_sources: Vec<String>,
    pub query_context: Option<QueryContext>,
}

type OptionLog = Option<Arc<Mutex<Log>>>;

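/// Builds a dozer execution DAG from the configured connections, sources,
/// optional SQL transformations, and API endpoints.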
pub struct PipelineBuilder<'a> {
    connections: &'a [Connection],
    sources: &'a [Source],
    sql: Option<&'a str>,
    /// `ApiEndpoint` and its log.
    endpoint_and_logs: Vec<(ApiEndpoint, OptionLog)>,
    labels: LabelsAndProgress,
    flags: Flags,
    udfs: &'a [UdfConfig],
}

impl<'a> PipelineBuilder<'a> {
    pub fn new(
        connections: &'a [Connection],
        sources: &'a [Source],
        sql: Option<&'a str>,
        endpoint_and_logs: Vec<(ApiEndpoint, OptionLog)>,
        labels: LabelsAndProgress,
        flags: Flags,
        udfs: &'a [UdfConfig],
    ) -> Self {
        Self {
            connections,
            sources,
            sql,
            endpoint_and_logs,
            labels,
            flags,
            udfs,
        }
    }

    // Based on used_sources, map each table to its connection name and create the sources.
    // To avoid breaking current functionality, the current format is still supported.
    pub async fn get_grouped_tables(
        &self,
        original_sources: &[String],
    ) -> Result<HashMap<Connection, Vec<Source>>, OrchestrationError> {
        let mut grouped_connections: HashMap<Connection, Vec<Source>> = HashMap::new();

        let mut connector_map = HashMap::new();
        for connection in self.connections {
            let connector = get_connector(connection.clone())?;

            if let Some(info_table) = get_connector_info_table(connection) {
                info!("[{}] Connection parameters\n{info_table}", connection.name);
            }

            let connector_tables = connector.list_tables().await?;

            // override source name if specified
            let connector_tables: Vec<Source> = connector_tables
                .iter()
                .map(|table| {
                    match self.sources.iter().find(|s| {
                        // TODO: @dario - Replace this line with the actual schema parsed from SQL
                        s.connection == connection.name && s.table_name == table.name
                    }) {
                        Some(source) => source.clone(),
                        None => Source {
                            name: table.name.clone(),
                            table_name: table.name.clone(),
                            schema: table.schema.clone(),
                            connection: connection.name.clone(),
                            ..Default::default()
                        },
                    }
                })
                .collect();

            connector_map.insert(connection.clone(), connector_tables);
        }

        for table_name in original_sources {
            let mut table_found = false;
            for (connection, tables) in connector_map.iter() {
                if let Some(source) = tables
                    .iter()
                    .find(|table| table.name == table_name.as_str())
                {
                    table_found = true;
                    grouped_connections
                        .entry(connection.clone())
                        .or_default()
                        .push(source.clone());
                }
            }

            if !table_found {
                return Err(OrchestrationError::SourceValidationError(
                    table_name.to_string(),
                ));
            }
        }

        Ok(grouped_connections)
    }

    // Figures out which sources the pipeline uses, based on the SQL and the API
    // endpoints. Sources read directly from connectors are "original"; SQL output
    // tables are "transformed".
    pub fn calculate_sources(&self) -> Result<CalculatedSources, OrchestrationError> {
        let mut original_sources = vec![];

        let mut query_ctx = None;
        let mut pipeline = AppPipeline::new((&self.flags).into());

        let mut transformed_sources = vec![];

        if let Some(sql) = &self.sql {
            let query_context = statement_to_pipeline(sql, &mut pipeline, None, self.udfs.to_vec())
                .map_err(OrchestrationError::PipelineError)?;

            query_ctx = Some(query_context.clone());

            transformed_sources = query_context.output_tables_map.keys().cloned().collect();

            for name in query_context.used_sources {
                // Add all source tables to input tables
                original_sources.push(name);
            }
        }

        // Add used sources that endpoints read directly from a source
        for (api_endpoint, _) in &self.endpoint_and_logs {
            let table_name = &api_endpoint.table_name;

            // Don't add if the table is a result of SQL
            if !transformed_sources.contains(table_name) {
                original_sources.push(table_name.clone());
            }
        }
        dedup(&mut original_sources);
        dedup(&mut transformed_sources);

        Ok(CalculatedSources {
            original_sources,
            transformed_sources,
            query_context: query_ctx,
        })
    }

    // Used both when building the pipeline and when actually executing it.
    pub async fn build(
        self,
        runtime: &Arc<Runtime>,
        shutdown: ShutdownReceiver,
    ) -> Result<dozer_core::Dag, OrchestrationError> {
        let calculated_sources = self.calculate_sources()?;

        debug!("Used Sources: {:?}", calculated_sources.original_sources);
        let grouped_connections = self
            .get_grouped_tables(&calculated_sources.original_sources)
            .await?;

        let mut pipelines: Vec<AppPipeline> = vec![];

        let mut pipeline = AppPipeline::new(self.flags.into());

        let mut available_output_tables: HashMap<String, OutputTableInfo> = HashMap::new();

        // Add all source tables to available output tables
        for (connection, sources) in &grouped_connections {
            for source in sources {
                available_output_tables.insert(
                    source.name.clone(),
                    OutputTableInfo::Original(OriginalTableInfo {
                        connection_name: connection.name.to_string(),
                        table_name: source.name.clone(),
                    }),
                );
            }
        }

        if let Some(sql) = &self.sql {
            let query_context = statement_to_pipeline(sql, &mut pipeline, None, self.udfs.to_vec())
                .map_err(OrchestrationError::PipelineError)?;

            for (name, table_info) in query_context.output_tables_map {
                available_output_tables
                    .insert(name.clone(), OutputTableInfo::Transformed(table_info));
            }
        }

        // Check if all output tables are used.
        for (table_name, table_info) in &available_output_tables {
            if matches!(table_info, OutputTableInfo::Transformed(_))
                && !self
                    .endpoint_and_logs
                    .iter()
                    .any(|(endpoint, _)| &endpoint.table_name == table_name)
            {
                return Err(OrchestrationError::OutputTableNotUsed(table_name.clone()));
            }
        }

        for (api_endpoint, log) in self.endpoint_and_logs {
            let table_name = &api_endpoint.table_name;

            let table_info = available_output_tables
                .get(table_name)
                .ok_or_else(|| OrchestrationError::EndpointTableNotFound(table_name.clone()))?;

            let snk_factory: Box<dyn SinkFactory> = if let Some(log) = log {
                Box::new(LogSinkFactory::new(
                    runtime.clone(),
                    log,
                    api_endpoint.name.clone(),
                    self.labels.clone(),
                ))
            } else {
                Box::new(DummySinkFactory)
            };

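            // Wire the sink into the pipeline: a transformed table connects to the
            // output node of its SQL query, while an original table becomes a
            // pipeline entry point fed directly by the source.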
            match table_info {
                OutputTableInfo::Transformed(table_info) => {
                    pipeline.add_sink(snk_factory, api_endpoint.name.as_str(), None);

                    pipeline.connect_nodes(
                        &table_info.node,
                        table_info.port,
                        api_endpoint.name.as_str(),
                        DEFAULT_PORT_HANDLE,
                    );
                }
                OutputTableInfo::Original(table_info) => {
                    pipeline.add_sink(
                        snk_factory,
                        api_endpoint.name.as_str(),
                        Some(PipelineEntryPoint::new(
                            table_info.table_name.clone(),
                            DEFAULT_PORT_HANDLE,
                        )),
                    );
                }
            }
        }

        pipelines.push(pipeline);

        let source_builder = SourceBuilder::new(grouped_connections, self.labels);
        let asm = source_builder
            .build_source_manager(runtime, shutdown)
            .await?;
        let mut app = App::new(asm);

        for pipeline in pipelines {
            app.add_pipeline(pipeline);
        }

        let dag = app.into_dag().map_err(ExecutionError)?;

        Ok(dag)
    }
}

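/// Removes duplicates in place, keeping the first occurrence of each element and
/// preserving the original order.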
fn dedup<T: Eq + Hash + Clone>(v: &mut Vec<T>) {
    let mut uniques = HashSet::new();
    v.retain(|e| uniques.insert(e.clone()));
}
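
For illustration, here is a minimal, self-contained sketch of the order-preserving dedup pattern used above; the source names are made up for the example:

    use std::collections::HashSet;
    use std::hash::Hash;

    fn dedup<T: Eq + Hash + Clone>(v: &mut Vec<T>) {
        let mut uniques = HashSet::new();
        // retain keeps an element only when insert returns true,
        // i.e. when the element has not been seen before.
        v.retain(|e| uniques.insert(e.clone()));
    }

    fn main() {
        let mut sources = vec!["users", "orders", "users", "items", "orders"];
        dedup(&mut sources);
        // First occurrences survive, in their original order.
        assert_eq!(sources, vec!["users", "orders", "items"]);
    }

Unlike Vec::dedup in the standard library, which only collapses consecutive duplicates, this helper removes duplicates anywhere in the vector, so calculate_sources can call it on unsorted source lists.

The grouping step in get_grouped_tables relies on the HashMap entry API in the same way. A standalone sketch of that pattern, with illustrative connection and table names:

    use std::collections::HashMap;

    fn main() {
        let pairs = [("pg", "users"), ("pg", "orders"), ("kafka", "events")];
        let mut grouped: HashMap<&str, Vec<&str>> = HashMap::new();
        for (conn, table) in pairs {
            // entry() fetches or creates the Vec for this key; or_default()
            // inserts an empty Vec the first time a key is seen.
            grouped.entry(conn).or_default().push(table);
        }
        assert_eq!(grouped["pg"], vec!["users", "orders"]);
        assert_eq!(grouped["kafka"], vec!["events"]);
    }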