getdozer / dozer / build 3977575075 (pending completion)
GitHub push: Chore/apply flags config (#664)

139 of 139 new or added lines in 10 files covered. (100.0%)
22504 of 34428 relevant lines covered (65.37%)
72228.09 hits per line
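
As a sanity check on the figures above: the overall percentage is simply covered lines divided by relevant lines. A minimal sketch of that arithmetic in Rust (the coverage_percent helper is hypothetical, not part of Coveralls or dozer):

// Hypothetical helper reproducing the report's headline figures.
fn coverage_percent(covered: u64, relevant: u64) -> f64 {
    covered as f64 / relevant as f64 * 100.0
}

fn main() {
    // 22_504 of 34_428 relevant lines covered.
    println!("{:.2}%", coverage_percent(22_504, 34_428)); // prints 65.37%
    // 139 of 139 new or added lines covered.
    println!("{:.1}%", coverage_percent(139, 139)); // prints 100.0%
}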

Source File: /dozer-orchestrator/src/simple/executor.rs (0.0% covered; none of its relevant lines were hit in this build)

use dozer_api::grpc::internal_grpc::PipelineResponse;
use dozer_core::dag::app::{App, AppPipeline};
use dozer_sql::pipeline::builder::{self, statement_to_pipeline};
use dozer_types::indicatif::MultiProgress;
use dozer_types::types::{Operation, SchemaWithChangesType};
use std::collections::HashMap;
use std::path::PathBuf;
use std::sync::atomic::AtomicBool;
use std::sync::Arc;

use dozer_api::CacheEndpoint;
use dozer_types::models::source::Source;

use crate::pipeline::{CacheSinkFactory, CacheSinkSettings, StreamingSinkFactory};
use dozer_core::dag::dag::DEFAULT_PORT_HANDLE;
use dozer_core::dag::executor::{DagExecutor, ExecutorOptions};
use dozer_ingestion::connectors::{get_connector, get_connector_info_table, TableInfo};

use dozer_ingestion::ingestion::{IngestionIterator, Ingestor};

use dozer_types::crossbeam;
use dozer_types::log::{error, info};

use dozer_types::models::connection::Connection;
use dozer_types::parking_lot::RwLock;
use OrchestrationError::ExecutionError;

use crate::console_helper::get_colored_text;
use crate::errors::OrchestrationError;
use crate::pipeline::source_builder::SourceBuilder;
use crate::{validate, validate_schema};

pub struct Executor {
    sources: Vec<Source>,
    cache_endpoints: Vec<CacheEndpoint>,
    pipeline_dir: PathBuf,
    ingestor: Arc<RwLock<Ingestor>>,
    iterator: Arc<RwLock<IngestionIterator>>,
    running: Arc<AtomicBool>,
    progress: MultiProgress,
}
impl Executor {
    pub fn new(
        sources: Vec<Source>,
        cache_endpoints: Vec<CacheEndpoint>,
        ingestor: Arc<RwLock<Ingestor>>,
        iterator: Arc<RwLock<IngestionIterator>>,
        running: Arc<AtomicBool>,
        pipeline_dir: PathBuf,
    ) -> Self {
        Self {
            sources,
            cache_endpoints,
            pipeline_dir,
            ingestor,
            iterator,
            running,
            progress: MultiProgress::new(),
        }
    }

    pub fn get_connection_groups(&self) -> HashMap<String, Vec<Source>> {
        SourceBuilder::group_connections(self.sources.clone())
    }

    pub fn validate_grouped_connections(
        grouped_connections: &HashMap<String, Vec<Source>>,
    ) -> Result<(), OrchestrationError> {
        for sources_group in grouped_connections.values() {
            let first_source = sources_group.get(0).unwrap();

            if let Some(connection) = &first_source.connection {
                let tables: Vec<TableInfo> = sources_group
                    .iter()
                    .map(|source| TableInfo {
                        name: source.name.clone(),
                        table_name: source.table_name.clone(),
                        id: 0,
                        columns: Some(source.columns.clone()),
                    })
                    .collect();

                if let Some(info_table) = get_connector_info_table(connection) {
                    info!("[{}] Connection parameters", connection.name);
                    info_table.printstd();
                }

                validate(connection.clone(), Some(tables.clone()))
                    .map_err(|e| {
                        error!(
                            "[{}] {} Connection validation error: {}",
                            connection.name,
                            get_colored_text("X", "31"),
                            e
                        );
                        OrchestrationError::SourceValidationError
                    })
                    .map(|_| {
                        info!(
                            "[{}] {} Connection validation completed",
                            connection.name,
                            get_colored_text("✓", "32")
                        );
                    })?;

                validate_schema(connection.clone(), &tables).map_or_else(
                    |e| {
                        error!(
                            "[{}] {} Schema validation error: {}",
                            connection.name,
                            get_colored_text("X", "31"),
                            e
                        );
                        Err(OrchestrationError::SourceValidationError)
                    },
                    |r| {
                        let mut all_valid = true;
                        for (table_name, validation_result) in r.into_iter() {
                            let is_valid =
                                validation_result.iter().all(|(_, result)| result.is_ok());

                            if is_valid {
                                info!(
                                    "[{}][{}] {} Schema validation completed",
                                    connection.name,
                                    table_name,
                                    get_colored_text("✓", "32")
                                );
                            } else {
                                all_valid = false;
                                for (_, error) in validation_result {
                                    if let Err(e) = error {
                                        error!(
                                            "[{}][{}] {} Schema validation error: {}",
                                            connection.name,
                                            table_name,
                                            get_colored_text("X", "31"),
                                            e
                                        );
                                    }
                                }
                            }
                        }

                        if !all_valid {
                            return Err(OrchestrationError::SourceValidationError);
                        }

                        Ok(())
                    },
                )?;
            }
        }

        Ok(())
    }

    // This function is used to run a query using a temporary pipeline
    pub fn query(
        &self,
        sql: String,
        sender: crossbeam::channel::Sender<Operation>,
    ) -> Result<dozer_core::dag::dag::Dag, OrchestrationError> {
        let grouped_connections = self.get_connection_groups();

        let (mut pipeline, (query_name, query_port)) =
            statement_to_pipeline(&sql).map_err(OrchestrationError::PipelineError)?;
        pipeline.add_sink(
            Arc::new(StreamingSinkFactory::new(sender)),
            "streaming_sink",
        );
        pipeline
            .connect_nodes(
                &query_name,
                Some(query_port),
                "streaming_sink",
                Some(DEFAULT_PORT_HANDLE),
            )
            .map_err(OrchestrationError::ExecutionError)?;

        let used_sources: Vec<String> = pipeline.get_entry_points_sources_names();

        let asm = SourceBuilder::build_source_manager(
            used_sources,
            grouped_connections,
            self.ingestor.clone(),
            self.iterator.clone(),
            self.running.clone(),
        )?;
        let mut app = App::new(asm);
        app.add_pipeline(pipeline);

        let dag = app.get_dag().map_err(OrchestrationError::ExecutionError)?;
        let path = &self.pipeline_dir;
        let mut exec = DagExecutor::new(
            &dag,
            path.as_path(),
            ExecutorOptions::default(),
            self.running.clone(),
        )?;

        exec.start()?;
        Ok(dag)
    }

    // This function is used by both migrate and actual execution
    pub fn build_pipeline(
        &self,
        notifier: Option<crossbeam::channel::Sender<PipelineResponse>>,
        api_dir: PathBuf,
        settings: CacheSinkSettings,
    ) -> Result<dozer_core::dag::dag::Dag, OrchestrationError> {
        let grouped_connections = self.get_connection_groups();

        Self::validate_grouped_connections(&grouped_connections)?;

        let mut pipelines: Vec<AppPipeline> = vec![];
        let mut used_sources = vec![];
        for cache_endpoint in self.cache_endpoints.iter().cloned() {
            let api_endpoint = cache_endpoint.endpoint.clone();
            let _api_endpoint_name = api_endpoint.name.clone();
            let cache = cache_endpoint.cache;

            // let mut pipeline = PipelineBuilder {}
            //     .build_pipeline(&api_endpoint.sql)
            //     .map_err(OrchestrationError::PipelineError)?;

            let (mut pipeline, (query_name, query_port)) =
                builder::statement_to_pipeline(&api_endpoint.sql)
                    .map_err(OrchestrationError::PipelineError)?;

            pipeline.add_sink(
                Arc::new(CacheSinkFactory::new(
                    vec![DEFAULT_PORT_HANDLE],
                    cache,
                    api_endpoint,
                    notifier.clone(),
                    api_dir.clone(),
                    self.progress.clone(),
                    settings.to_owned(),
                )),
                cache_endpoint.endpoint.name.as_str(),
            );

            pipeline
                .connect_nodes(
                    &query_name,
                    Some(query_port),
                    cache_endpoint.endpoint.name.as_str(),
                    Some(DEFAULT_PORT_HANDLE),
                )
                .map_err(ExecutionError)?;

            for name in pipeline.get_entry_points_sources_names() {
                used_sources.push(name);
            }

            pipelines.push(pipeline);
        }

        let asm = SourceBuilder::build_source_manager(
            used_sources,
            grouped_connections,
            self.ingestor.clone(),
            self.iterator.clone(),
            self.running.clone(),
        )?;
        let mut app = App::new(asm);

        Vec::into_iter(pipelines).for_each(|p| {
            app.add_pipeline(p);
        });

        let dag = app.get_dag().map_err(ExecutionError)?;

        DagExecutor::validate(&dag, &self.pipeline_dir)
            .map(|_| {
                info!("[pipeline] Validation completed");
            })
            .map_err(|e| {
                error!("[pipeline] Validation error: {}", e);
                OrchestrationError::PipelineValidationError
            })?;

        Ok(dag)
    }

    pub fn get_tables(
        connections: &Vec<Connection>,
    ) -> Result<HashMap<String, Vec<SchemaWithChangesType>>, OrchestrationError> {
        let mut schema_map = HashMap::new();
        for connection in connections {
            validate(connection.to_owned(), None)?;

            let connector = get_connector(connection.to_owned())?;
            let schema_tuples = connector.get_schemas(None)?;
            schema_map.insert(connection.name.to_owned(), schema_tuples);
        }

        Ok(schema_map)
    }

    pub fn run(
        &self,
        notifier: Option<crossbeam::channel::Sender<PipelineResponse>>,
        settings: CacheSinkSettings,
    ) -> Result<(), OrchestrationError> {
        let running_wait = self.running.clone();

        let parent_dag = self.build_pipeline(notifier, PathBuf::default(), settings)?;
        let path = &self.pipeline_dir;

        if !path.exists() {
            return Err(OrchestrationError::PipelineDirectoryNotFound(
                path.to_string_lossy().to_string(),
            ));
        }

        let mut exec = DagExecutor::new(
            &parent_dag,
            path.as_path(),
            ExecutorOptions::default(),
            running_wait,
        )?;

        exec.start()?;
        exec.join().map_err(ExecutionError)
    }
}
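
The schema-validation step in validate_grouped_connections above folds per-table results into a single pass/fail outcome via Result::map_or_else. A minimal, self-contained sketch of that pattern, with illustrative names (TableReport, check_tables) that are not part of dozer:

// Sketch of the map_or_else aggregation pattern: an Err on the whole report
// short-circuits, otherwise every table's column checks must pass.
type TableReport = Vec<(String, Vec<(String, Result<(), String>)>)>;

fn check_tables(report: Result<TableReport, String>) -> Result<(), String> {
    report.map_or_else(
        |e| Err(format!("schema fetch failed: {e}")),
        |tables| {
            let mut all_valid = true;
            for (table, columns) in tables {
                if columns.iter().all(|(_, r)| r.is_ok()) {
                    println!("[{table}] schema validation completed");
                } else {
                    all_valid = false;
                    println!("[{table}] schema validation error");
                }
            }
            if all_valid { Ok(()) } else { Err("validation failed".into()) }
        },
    )
}

fn main() {
    let report: Result<TableReport, String> = Ok(vec![
        ("users".into(), vec![("id".into(), Ok(()))]),
        ("orders".into(), vec![("total".into(), Err("unsupported type".into()))]),
    ]);
    assert!(check_tables(report).is_err());
}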