• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

getdozer / dozer / 5709656380

pending completion
5709656380

push

github

web-flow
Version bump (#1808)

45512 of 59772 relevant lines covered (76.14%)

39312.43 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

63.91
/dozer-cli/src/simple/orchestrator.rs
1
use super::executor::{run_dag_executor, Executor};
2
use crate::errors::OrchestrationError;
3
use crate::pipeline::PipelineBuilder;
4
use crate::shutdown::ShutdownReceiver;
5
use crate::simple::build;
6
use crate::simple::helper::validate_config;
7
use crate::utils::{
8
    get_api_security_config, get_app_grpc_config, get_cache_manager_options, get_executor_options,
9
    get_grpc_config, get_log_options, get_rest_config,
10
};
11

12
use crate::{flatten_join_handle, join_handle_map_err};
13
use dozer_api::auth::{Access, Authorizer};
14
use dozer_api::grpc::internal::internal_pipeline_server::start_internal_pipeline_server;
15
use dozer_api::{grpc, rest, CacheEndpoint};
16
use dozer_cache::cache::LmdbRwCacheManager;
17
use dozer_cache::dozer_log::home_dir::HomeDir;
18
use dozer_cache::dozer_log::schemas::BuildSchema;
19
use dozer_core::app::AppPipeline;
20
use dozer_core::dag_schemas::DagSchemas;
21
use futures::future::join_all;
22

23
use crate::console_helper::get_colored_text;
24
use crate::console_helper::GREEN;
25
use crate::console_helper::PURPLE;
26
use crate::console_helper::RED;
27
use dozer_core::errors::ExecutionError;
28
use dozer_ingestion::connectors::{get_connector, SourceSchema, TableInfo};
29
use dozer_sql::pipeline::builder::statement_to_pipeline;
30
use dozer_sql::pipeline::errors::PipelineError;
31
use dozer_types::crossbeam::channel::{self, Sender};
32
use dozer_types::indicatif::{MultiProgress, ProgressDrawTarget};
33
use dozer_types::log::info;
34
use dozer_types::models::config::Config;
35
use dozer_types::tracing::error;
36
use futures::stream::FuturesUnordered;
37
use futures::{FutureExt, StreamExt, TryFutureExt};
38
use metrics::{describe_counter, describe_histogram};
39
use std::collections::HashMap;
40
use std::fs;
41
use std::path::PathBuf;
42

43
use std::sync::Arc;
44
use std::thread;
45
use tokio::runtime::Runtime;
46
use tokio::sync::broadcast;
47

48
#[derive(Clone)]
12✔
49
pub struct SimpleOrchestrator {
50
    pub config: Config,
51
    pub runtime: Arc<Runtime>,
52
    pub multi_pb: MultiProgress,
53
}
54

55
impl SimpleOrchestrator {
56
    pub fn new(config: Config, runtime: Arc<Runtime>) -> Self {
6✔
57
        let progress_draw_target = if atty::is(atty::Stream::Stderr) {
6✔
58
            ProgressDrawTarget::stderr()
×
59
        } else {
60
            ProgressDrawTarget::hidden()
6✔
61
        };
62
        Self {
6✔
63
            config,
6✔
64
            runtime,
6✔
65
            multi_pb: MultiProgress::with_draw_target(progress_draw_target),
6✔
66
        }
6✔
67
    }
6✔
68

69
    pub fn run_api(&mut self, shutdown: ShutdownReceiver) -> Result<(), OrchestrationError> {
6✔
70
        describe_histogram!(
6✔
71
            dozer_api::API_LATENCY_HISTOGRAM_NAME,
×
72
            "The api processing latency in seconds"
×
73
        );
74
        describe_counter!(
6✔
75
            dozer_api::API_REQUEST_COUNTER_NAME,
×
76
            "Number of requests processed by the api"
×
77
        );
78
        self.runtime.block_on(async {
6✔
79
            let mut futures = FuturesUnordered::new();
6✔
80

6✔
81
            // Open `RoCacheEndpoint`s. Streaming operations if necessary.
6✔
82
            let flags = self.config.flags.clone().unwrap_or_default();
6✔
83
            let (operations_sender, operations_receiver) = if flags.dynamic {
6✔
84
                let (sender, receiver) = broadcast::channel(16);
6✔
85
                (Some(sender), Some(receiver))
6✔
86
            } else {
87
                (None, None)
×
88
            };
89

90
            let internal_grpc_config = get_app_grpc_config(&self.config);
6✔
91
            let app_server_addr = format!(
6✔
92
                "http://{}:{}",
6✔
93
                internal_grpc_config.host, internal_grpc_config.port
6✔
94
            );
6✔
95
            let cache_manager = Arc::new(
6✔
96
                LmdbRwCacheManager::new(get_cache_manager_options(&self.config))
6✔
97
                    .map_err(OrchestrationError::CacheInitFailed)?,
6✔
98
            );
99
            let mut cache_endpoints = vec![];
6✔
100
            for endpoint in &self.config.endpoints {
12✔
101
                let (cache_endpoint, handle) = CacheEndpoint::new(
6✔
102
                    app_server_addr.clone(),
6✔
103
                    &*cache_manager,
6✔
104
                    endpoint.clone(),
6✔
105
                    Box::pin(shutdown.create_shutdown_future()),
6✔
106
                    operations_sender.clone(),
6✔
107
                    Some(self.multi_pb.clone()),
6✔
108
                )
6✔
109
                .await?;
54✔
110
                let cache_name = endpoint.name.clone();
6✔
111
                futures.push(flatten_join_handle(join_handle_map_err(handle, move |e| {
6✔
112
                    if e.is_map_full() {
×
113
                        OrchestrationError::CacheFull(cache_name)
×
114
                    } else {
115
                        OrchestrationError::CacheBuildFailed(cache_name, e)
×
116
                    }
117
                })));
6✔
118
                cache_endpoints.push(Arc::new(cache_endpoint));
6✔
119
            }
120

121
            // Initialize API Server
122
            let rest_config = get_rest_config(&self.config);
6✔
123
            let rest_handle = if rest_config.enabled {
6✔
124
                let security = get_api_security_config(&self.config).cloned();
6✔
125
                let cache_endpoints_for_rest = cache_endpoints.clone();
6✔
126
                let shutdown_for_rest = shutdown.create_shutdown_future();
6✔
127
                let api_server = rest::ApiServer::new(rest_config, security);
6✔
128
                let api_server = api_server
6✔
129
                    .run(cache_endpoints_for_rest, shutdown_for_rest)
6✔
130
                    .map_err(OrchestrationError::ApiInitFailed)?;
6✔
131
                tokio::spawn(api_server.map_err(OrchestrationError::ApiServeFailed))
6✔
132
            } else {
×
133
                tokio::spawn(async move { Ok::<(), OrchestrationError>(()) })
×
134
            };
135

×
136
            // Initialize gRPC Server
137
            let grpc_config = get_grpc_config(&self.config);
6✔
138
            let grpc_handle = if grpc_config.enabled {
6✔
139
                let api_security = get_api_security_config(&self.config).cloned();
6✔
140
                let grpc_server = grpc::ApiServer::new(grpc_config, api_security, flags);
6✔
141
                let shutdown = shutdown.create_shutdown_future();
6✔
142
                tokio::spawn(async move {
6✔
143
                    grpc_server
6✔
144
                        .run(cache_endpoints, shutdown, operations_receiver)
6✔
145
                        .await
12✔
146
                        .map_err(OrchestrationError::ApiInitFailed)
6✔
147
                })
6✔
148
            } else {
×
149
                tokio::spawn(async move { Ok::<(), OrchestrationError>(()) })
×
150
            };
151

×
152
            futures.push(flatten_join_handle(rest_handle));
6✔
153
            futures.push(flatten_join_handle(grpc_handle));
6✔
154

×
155
            while let Some(result) = futures.next().await {
24✔
156
                result?;
18✔
157
            }
×
158

×
159
            Ok::<(), OrchestrationError>(())
6✔
160
        })?;
6✔
161

×
162
        Ok(())
6✔
163
    }
6✔
164

×
165
    pub fn run_apps(
6✔
166
        &mut self,
6✔
167
        shutdown: ShutdownReceiver,
6✔
168
        api_notifier: Option<Sender<bool>>,
6✔
169
        err_threshold: Option<u32>,
6✔
170
    ) -> Result<(), OrchestrationError> {
6✔
171
        let mut global_err_threshold: Option<u32> =
6✔
172
            self.config.app.as_ref().and_then(|app| app.err_threshold);
6✔
173
        if err_threshold.is_some() {
6✔
174
            global_err_threshold = err_threshold;
×
175
        }
6✔
176

×
177
        let home_dir = HomeDir::new(self.config.home_dir.as_ref(), self.config.cache_dir.clone());
6✔
178
        let executor = self.runtime.block_on(Executor::new(
6✔
179
            &home_dir,
6✔
180
            &self.config.connections,
6✔
181
            &self.config.sources,
6✔
182
            self.config.sql.as_deref(),
6✔
183
            &self.config.endpoints,
6✔
184
            get_log_options(&self.config),
6✔
185
            self.multi_pb.clone(),
6✔
186
        ))?;
6✔
187
        let dag_executor = executor.create_dag_executor(
6✔
188
            self.runtime.clone(),
6✔
189
            get_executor_options(&self.config, global_err_threshold),
6✔
190
        )?;
6✔
191

×
192
        let app_grpc_config = get_app_grpc_config(&self.config);
6✔
193
        let internal_server_future = start_internal_pipeline_server(
6✔
194
            executor.endpoint_and_logs().to_vec(),
6✔
195
            &app_grpc_config,
6✔
196
            shutdown.create_shutdown_future(),
6✔
197
        );
6✔
198

×
199
        if let Some(api_notifier) = api_notifier {
6✔
200
            api_notifier
6✔
201
                .send(true)
6✔
202
                .expect("Failed to notify API server");
6✔
203
        }
6✔
204

×
205
        let running = shutdown.get_running_flag();
6✔
206
        let pipeline_future = self
6✔
207
            .runtime
6✔
208
            .spawn_blocking(|| run_dag_executor(dag_executor, running));
6✔
209

6✔
210
        let mut futures = FuturesUnordered::new();
6✔
211
        futures.push(
6✔
212
            internal_server_future
6✔
213
                .map_err(OrchestrationError::InternalServerFailed)
6✔
214
                .boxed(),
6✔
215
        );
6✔
216
        futures.push(flatten_join_handle(pipeline_future).boxed());
6✔
217

6✔
218
        self.runtime.block_on(async move {
6✔
219
            while let Some(result) = futures.next().await {
24✔
220
                result?;
12✔
221
            }
×
222
            Ok(())
6✔
223
        })
6✔
224
    }
6✔
225

×
226
    #[allow(clippy::type_complexity)]
×
227
    pub fn list_connectors(
×
228
        &self,
×
229
    ) -> Result<HashMap<String, (Vec<TableInfo>, Vec<SourceSchema>)>, OrchestrationError> {
×
230
        self.runtime.block_on(async {
×
231
            let mut schema_map = HashMap::new();
×
232
            for connection in &self.config.connections {
×
233
                let connector = get_connector(connection.clone())?;
×
234
                let schema_tuples = connector.list_all_schemas().await?;
×
235
                schema_map.insert(connection.name.clone(), schema_tuples);
×
236
            }
×
237

×
238
            Ok(schema_map)
×
239
        })
×
240
    }
×
241

×
242
    pub fn generate_token(&self) -> Result<String, OrchestrationError> {
×
243
        if let Some(api_config) = &self.config.api {
×
244
            if let Some(api_security) = &api_config.api_security {
×
245
                match api_security {
×
246
                    dozer_types::models::api_security::ApiSecurity::Jwt(secret) => {
×
247
                        let auth = Authorizer::new(secret, None, None);
×
248
                        let token = auth
×
249
                            .generate_token(Access::All, None)
×
250
                            .map_err(OrchestrationError::GenerateTokenFailed)?;
×
251
                        return Ok(token);
×
252
                    }
×
253
                }
×
254
            }
×
255
        }
×
256
        Err(OrchestrationError::MissingSecurityConfig)
×
257
    }
×
258

×
259
    pub fn build(&mut self, force: bool) -> Result<(), OrchestrationError> {
6✔
260
        let home_dir = HomeDir::new(self.config.home_dir.as_ref(), self.config.cache_dir.clone());
6✔
261

6✔
262
        info!(
6✔
263
            "Initiating app: {}",
×
264
            get_colored_text(&self.config.app_name, PURPLE)
×
265
        );
×
266
        if force {
6✔
267
            self.clean()?;
×
268
        }
6✔
269
        validate_config(&self.config)?;
6✔
270

×
271
        // Calculate schemas.
×
272
        let endpoint_and_logs = self
6✔
273
            .config
6✔
274
            .endpoints
6✔
275
            .iter()
6✔
276
            // We're not really going to run the pipeline, so we don't create logs.
6✔
277
            .map(|endpoint| (endpoint.clone(), None))
6✔
278
            .collect();
6✔
279
        let builder = PipelineBuilder::new(
6✔
280
            &self.config.connections,
6✔
281
            &self.config.sources,
6✔
282
            self.config.sql.as_deref(),
6✔
283
            endpoint_and_logs,
6✔
284
            self.multi_pb.clone(),
6✔
285
        );
6✔
286
        let dag = builder.build(self.runtime.clone())?;
6✔
287
        // Populate schemas.
288
        let dag_schemas = DagSchemas::new(dag)?;
6✔
289

×
290
        // Build endpoints one by one.
×
291
        let schemas = dag_schemas.get_sink_schemas();
6✔
292
        let enable_token = self
6✔
293
            .config
6✔
294
            .api
6✔
295
            .as_ref()
6✔
296
            .map(|api| api.api_security.is_some())
6✔
297
            .unwrap_or(false);
6✔
298
        let enable_on_event = self
6✔
299
            .config
6✔
300
            .flags
6✔
301
            .as_ref()
6✔
302
            .map(|flags| flags.push_events)
6✔
303
            .unwrap_or(false);
6✔
304
        let storage_config = get_log_options(&self.config).storage_config;
6✔
305
        let mut futures = vec![];
6✔
306
        for (endpoint_name, (schema, connections)) in schemas {
12✔
307
            info!("Building endpoint: {endpoint_name}");
6✔
308
            let endpoint = self
6✔
309
                .config
6✔
310
                .endpoints
6✔
311
                .iter()
6✔
312
                .find(|e| e.name == *endpoint_name)
6✔
313
                .expect("Sink name must be the same as endpoint name");
6✔
314
            let (schema, secondary_indexes) = build::modify_schema(&schema, endpoint)?;
6✔
315
            let schema = BuildSchema {
6✔
316
                schema,
6✔
317
                secondary_indexes,
6✔
318
                enable_token,
6✔
319
                enable_on_event,
6✔
320
                connections,
6✔
321
            };
6✔
322

6✔
323
            futures.push(build::build(
6✔
324
                &home_dir,
6✔
325
                endpoint_name,
6✔
326
                schema,
6✔
327
                storage_config.clone(),
6✔
328
            ));
6✔
329
        }
×
330

×
331
        let results = self.runtime.block_on(join_all(futures.into_iter()));
6✔
332
        for result in results {
12✔
333
            result?;
6✔
334
        }
×
335

×
336
        Ok(())
6✔
337
    }
6✔
338

339
    // Cleaning the entire folder as there will be inconsistencies
×
340
    // between pipeline, cache and generated proto files.
×
341
    pub fn clean(&mut self) -> Result<(), OrchestrationError> {
×
342
        let cache_dir = PathBuf::from(self.config.cache_dir.clone());
×
343
        if cache_dir.exists() {
×
344
            fs::remove_dir_all(&cache_dir)
×
345
                .map_err(|e| ExecutionError::FileSystemError(cache_dir, e))?;
×
346
        };
×
347

×
348
        let home_dir = PathBuf::from(self.config.home_dir.clone());
×
349
        if home_dir.exists() {
×
350
            fs::remove_dir_all(&home_dir)
×
351
                .map_err(|e| ExecutionError::FileSystemError(home_dir, e))?;
×
352
        };
×
353

×
354
        Ok(())
×
355
    }
×
356

×
357
    pub fn run_all(
6✔
358
        &mut self,
6✔
359
        shutdown: ShutdownReceiver,
6✔
360
        err_threshold: Option<u32>,
6✔
361
    ) -> Result<(), OrchestrationError> {
6✔
362
        let shutdown_api = shutdown.clone();
6✔
363

6✔
364
        let mut dozer_api = self.clone();
6✔
365

6✔
366
        let (tx, rx) = channel::unbounded::<bool>();
6✔
367

6✔
368
        self.build(false)?;
6✔
369

×
370
        let mut dozer_pipeline = self.clone();
6✔
371
        let pipeline_thread =
6✔
372
            thread::spawn(move || dozer_pipeline.run_apps(shutdown, Some(tx), err_threshold));
6✔
373

6✔
374
        // Wait for pipeline to initialize caches before starting api server
6✔
375
        if rx.recv().is_err() {
6✔
376
            // This means the pipeline thread returned before sending a message. Either an error happened or it panicked.
×
377
            return match pipeline_thread.join() {
×
378
                Ok(Err(e)) => Err(e),
×
379
                Ok(Ok(())) => panic!("An error must have happened"),
×
380
                Err(e) => {
×
381
                    std::panic::panic_any(e);
×
382
                }
×
383
            };
×
384
        }
6✔
385

6✔
386
        dozer_api.run_api(shutdown_api)?;
6✔
387

×
388
        // wait for pipeline thread to shutdown gracefully
×
389
        pipeline_thread.join().unwrap()
6✔
390
    }
6✔
391
}
×
392

×
393
pub fn validate_sql(sql: String) -> Result<(), PipelineError> {
×
394
    statement_to_pipeline(&sql, &mut AppPipeline::new(), None).map_or_else(
×
395
        |e| {
×
396
            error!(
×
397
                "[sql][{}] Transforms validation error: {}",
×
398
                get_colored_text("X", RED),
×
399
                e
×
400
            );
×
401
            Err(e)
×
402
        },
×
403
        |_| {
×
404
            info!(
×
405
                "[sql][{}]  Transforms validation completed",
×
406
                get_colored_text("✓", GREEN)
×
407
            );
×
408
            Ok(())
×
409
        },
×
410
    )
×
411
}
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc