• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

getdozer / dozer / 6299724219

25 Sep 2023 12:58PM UTC coverage: 77.81% (+0.5%) from 77.275%
6299724219

push

github

chubei
fix: Add `BINDGEN_EXTRA_CLANG_ARGS` to cross compile rocksdb

50223 of 64546 relevant lines covered (77.81%)

148909.49 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

74.33
/dozer-cli/src/simple/orchestrator.rs
1
use super::executor::{run_dag_executor, Executor};
2
use super::Contract;
3
use crate::errors::OrchestrationError;
4
use crate::pipeline::PipelineBuilder;
5
use crate::shutdown::ShutdownReceiver;
6
use crate::simple::build;
7
use crate::simple::helper::validate_config;
8
use crate::utils::{
9
    get_cache_manager_options, get_checkpoint_options, get_default_max_num_records,
10
    get_executor_options,
11
};
12

13
use crate::{flatten_join_handle, join_handle_map_err};
14
use dozer_api::auth::{Access, Authorizer};
15
use dozer_api::grpc::internal::internal_pipeline_server::start_internal_pipeline_server;
16
use dozer_api::{get_api_security, grpc, rest, CacheEndpoint};
17
use dozer_cache::cache::LmdbRwCacheManager;
18
use dozer_cache::dozer_log::camino::Utf8PathBuf;
19
use dozer_cache::dozer_log::home_dir::HomeDir;
20
use dozer_core::app::AppPipeline;
21
use dozer_core::dag_schemas::DagSchemas;
22
use dozer_tracing::LabelsAndProgress;
23
use dozer_types::constants::LOCK_FILE;
24
use dozer_types::models::api_config::{default_app_grpc_host, default_app_grpc_port};
25
use dozer_types::models::flags::{default_dynamic, default_push_events};
26
use tokio::select;
27

28
use crate::console_helper::get_colored_text;
29
use crate::console_helper::GREEN;
30
use crate::console_helper::PURPLE;
31
use crate::console_helper::RED;
32
use dozer_core::errors::ExecutionError;
33
use dozer_ingestion::connectors::{get_connector, SourceSchema, TableInfo};
34
use dozer_sql::builder::statement_to_pipeline;
35
use dozer_sql::errors::PipelineError;
36
use dozer_types::log::info;
37
use dozer_types::models::config::{default_cache_dir, default_home_dir, Config};
38
use dozer_types::tracing::error;
39
use futures::stream::FuturesUnordered;
40
use futures::{FutureExt, StreamExt, TryFutureExt};
41
use metrics::{describe_counter, describe_histogram};
42
use std::collections::HashMap;
43
use std::fs;
44

45
use std::sync::mpsc::Sender;
46
use std::sync::{mpsc, Arc};
47
use std::thread;
48
use tokio::runtime::Runtime;
49
use tokio::sync::broadcast;
50

51
#[derive(Clone)]
60✔
52
pub struct SimpleOrchestrator {
53
    pub base_directory: Utf8PathBuf,
54
    pub config: Config,
55
    pub runtime: Arc<Runtime>,
56
    pub labels: LabelsAndProgress,
57
}
58

59
impl SimpleOrchestrator {
60
    pub fn new(
30✔
61
        base_directory: Utf8PathBuf,
30✔
62
        config: Config,
30✔
63
        runtime: Arc<Runtime>,
30✔
64
        labels: LabelsAndProgress,
30✔
65
    ) -> Self {
30✔
66
        Self {
30✔
67
            base_directory,
30✔
68
            config,
30✔
69
            runtime,
30✔
70
            labels,
30✔
71
        }
30✔
72
    }
30✔
73

74
    pub fn run_api(&mut self, shutdown: ShutdownReceiver) -> Result<(), OrchestrationError> {
30✔
75
        describe_histogram!(
30✔
76
            dozer_api::API_LATENCY_HISTOGRAM_NAME,
×
77
            "The api processing latency in seconds"
×
78
        );
79
        describe_counter!(
30✔
80
            dozer_api::API_REQUEST_COUNTER_NAME,
×
81
            "Number of requests processed by the api"
×
82
        );
83
        self.runtime.block_on(async {
30✔
84
            let mut futures = FuturesUnordered::new();
30✔
85

30✔
86
            // Open `RoCacheEndpoint`s. Streaming operations if necessary.
30✔
87
            let flags = self.config.flags.clone();
30✔
88
            let (operations_sender, operations_receiver) =
30✔
89
                if flags.dynamic.unwrap_or_else(default_dynamic) {
30✔
90
                    let (sender, receiver) = broadcast::channel(16);
30✔
91
                    (Some(sender), Some(receiver))
30✔
92
                } else {
93
                    (None, None)
×
94
                };
95

96
            let internal_grpc_config = &self.config.api.app_grpc;
30✔
97
            let app_server_addr = format!(
30✔
98
                "http://{}:{}",
30✔
99
                internal_grpc_config
30✔
100
                    .host
30✔
101
                    .clone()
30✔
102
                    .unwrap_or_else(default_app_grpc_host),
30✔
103
                internal_grpc_config
30✔
104
                    .port
30✔
105
                    .unwrap_or_else(default_app_grpc_port)
30✔
106
            );
30✔
107
            let cache_manager = Arc::new(
30✔
108
                LmdbRwCacheManager::new(get_cache_manager_options(&self.config))
30✔
109
                    .map_err(OrchestrationError::CacheInitFailed)?,
30✔
110
            );
111
            let default_max_num_records = get_default_max_num_records(&self.config);
30✔
112
            let mut cache_endpoints = vec![];
30✔
113
            for endpoint in &self.config.endpoints {
60✔
114
                let (cache_endpoint, handle) = select! {
30✔
115
                    // If we're shutting down, the cache endpoint will fail to connect
116
                    _shutdown_future = shutdown.create_shutdown_future() => return Ok(()),
×
117
                    result = CacheEndpoint::new(
30✔
118
                        app_server_addr.clone(),
119
                        &*cache_manager,
120
                        endpoint.clone(),
121
                        Box::pin(shutdown.create_shutdown_future()),
122
                        operations_sender.clone(),
123
                        self.labels.clone(),
124

125
                    ) => result?
126
                };
127
                let cache_name = endpoint.name.clone();
30✔
128
                futures.push(flatten_join_handle(join_handle_map_err(handle, move |e| {
30✔
129
                    if e.is_map_full() {
×
130
                        OrchestrationError::CacheFull(cache_name)
×
131
                    } else {
132
                        OrchestrationError::CacheBuildFailed(cache_name, e)
×
133
                    }
134
                })));
30✔
135
                cache_endpoints.push(Arc::new(cache_endpoint));
30✔
136
            }
137

138
            // Initialize API Server
139
            let rest_config = self.config.api.rest.clone();
30✔
140
            let rest_handle = if rest_config.enabled.unwrap_or(true) {
30✔
141
                let security = self.config.api.api_security.clone();
30✔
142
                let cache_endpoints_for_rest = cache_endpoints.clone();
30✔
143
                let shutdown_for_rest = shutdown.create_shutdown_future();
30✔
144
                let api_server =
30✔
145
                    rest::ApiServer::new(rest_config, security, default_max_num_records);
30✔
146
                let api_server = api_server
30✔
147
                    .run(
30✔
148
                        cache_endpoints_for_rest,
30✔
149
                        shutdown_for_rest,
30✔
150
                        self.labels.clone(),
30✔
151
                    )
30✔
152
                    .map_err(OrchestrationError::ApiInitFailed)?;
30✔
153
                tokio::spawn(api_server.map_err(OrchestrationError::RestServeFailed))
30✔
154
            } else {
155
                tokio::spawn(async move { Ok::<(), OrchestrationError>(()) })
×
156
            };
157

158
            // Initialize gRPC Server
159
            let grpc_config = self.config.api.grpc.clone();
30✔
160
            let grpc_handle = if grpc_config.enabled.unwrap_or(true) {
30✔
161
                let api_security = self.config.api.api_security.clone();
30✔
162
                let grpc_server = grpc::ApiServer::new(grpc_config, api_security, flags);
30✔
163
                let shutdown = shutdown.create_shutdown_future();
30✔
164
                let grpc_server = grpc_server
30✔
165
                    .run(
30✔
166
                        cache_endpoints,
30✔
167
                        shutdown,
30✔
168
                        operations_receiver,
30✔
169
                        self.labels.clone(),
30✔
170
                        default_max_num_records,
30✔
171
                    )
30✔
172
                    .await
×
173
                    .map_err(OrchestrationError::ApiInitFailed)?;
30✔
174
                tokio::spawn(async move {
30✔
175
                    grpc_server
30✔
176
                        .await
45✔
177
                        .map_err(OrchestrationError::GrpcServeFailed)
30✔
178
                })
30✔
179
            } else {
180
                tokio::spawn(async move { Ok::<(), OrchestrationError>(()) })
×
181
            };
182

183
            futures.push(flatten_join_handle(rest_handle));
30✔
184
            futures.push(flatten_join_handle(grpc_handle));
30✔
185

186
            while let Some(result) = futures.next().await {
120✔
187
                result?;
90✔
188
            }
189

190
            Ok::<(), OrchestrationError>(())
30✔
191
        })?;
30✔
192

193
        Ok(())
30✔
194
    }
30✔
195

196
    pub fn home_dir(&self) -> Utf8PathBuf {
60✔
197
        self.base_directory.join(
60✔
198
            self.config
60✔
199
                .home_dir
60✔
200
                .clone()
60✔
201
                .unwrap_or_else(default_home_dir),
60✔
202
        )
60✔
203
    }
60✔
204

205
    pub fn cache_dir(&self) -> Utf8PathBuf {
60✔
206
        self.base_directory.join(
60✔
207
            self.config
60✔
208
                .cache_dir
60✔
209
                .clone()
60✔
210
                .unwrap_or_else(default_cache_dir),
60✔
211
        )
60✔
212
    }
60✔
213

214
    pub fn lockfile_path(&self) -> Utf8PathBuf {
60✔
215
        self.base_directory.join(LOCK_FILE)
60✔
216
    }
60✔
217

218
    pub fn run_apps(
30✔
219
        &mut self,
30✔
220
        shutdown: ShutdownReceiver,
30✔
221
        api_notifier: Option<Sender<()>>,
30✔
222
    ) -> Result<(), OrchestrationError> {
30✔
223
        let home_dir = HomeDir::new(self.home_dir(), self.cache_dir());
30✔
224
        let contract = Contract::deserialize(self.lockfile_path().as_std_path())?;
30✔
225
        let executor = self.runtime.block_on(Executor::new(
30✔
226
            &home_dir,
30✔
227
            &contract,
30✔
228
            &self.config.connections,
30✔
229
            &self.config.sources,
30✔
230
            self.config.sql.as_deref(),
30✔
231
            &self.config.endpoints,
30✔
232
            get_checkpoint_options(&self.config),
30✔
233
            self.labels.clone(),
30✔
234
            &self.config.udfs,
30✔
235
        ))?;
30✔
236
        let endpoint_and_logs = executor.endpoint_and_logs().to_vec();
30✔
237
        let dag_executor = self.runtime.block_on(executor.create_dag_executor(
30✔
238
            &self.runtime,
30✔
239
            get_executor_options(&self.config),
30✔
240
            shutdown.clone(),
30✔
241
            self.config.flags.clone(),
30✔
242
        ))?;
30✔
243

244
        let app_grpc_config = &self.config.api.app_grpc;
30✔
245
        let internal_server_future = self
30✔
246
            .runtime
30✔
247
            .block_on(start_internal_pipeline_server(
30✔
248
                endpoint_and_logs,
30✔
249
                app_grpc_config,
30✔
250
                shutdown.create_shutdown_future(),
30✔
251
            ))
30✔
252
            .map_err(OrchestrationError::InternalServerFailed)?;
30✔
253

254
        if let Some(api_notifier) = api_notifier {
30✔
255
            api_notifier.send(()).expect("Failed to notify API server");
30✔
256
        }
30✔
257

258
        let labels = self.labels.clone();
30✔
259
        let pipeline_future = self.runtime.spawn_blocking(move || {
30✔
260
            run_dag_executor(dag_executor, shutdown.get_running_flag(), labels)
30✔
261
        });
30✔
262

30✔
263
        let mut futures = FuturesUnordered::new();
30✔
264
        futures.push(
30✔
265
            internal_server_future
30✔
266
                .map_err(OrchestrationError::GrpcServeFailed)
30✔
267
                .boxed(),
30✔
268
        );
30✔
269
        futures.push(flatten_join_handle(pipeline_future).boxed());
30✔
270

30✔
271
        self.runtime.block_on(async move {
30✔
272
            while let Some(result) = futures.next().await {
195✔
273
                result?;
60✔
274
            }
275
            Ok(())
30✔
276
        })
30✔
277
    }
30✔
278

279
    #[allow(clippy::type_complexity)]
280
    pub fn list_connectors(
×
281
        &self,
×
282
    ) -> Result<HashMap<String, (Vec<TableInfo>, Vec<SourceSchema>)>, OrchestrationError> {
×
283
        self.runtime.block_on(async {
×
284
            let mut schema_map = HashMap::new();
×
285
            for connection in &self.config.connections {
×
286
                let connector = get_connector(connection.clone())?;
×
287
                let schema_tuples = connector.list_all_schemas().await?;
×
288
                schema_map.insert(connection.name.clone(), schema_tuples);
×
289
            }
290

291
            Ok(schema_map)
×
292
        })
×
293
    }
×
294

295
    pub fn generate_token(&self, ttl_in_secs: Option<i32>) -> Result<String, OrchestrationError> {
296
        if let Some(api_security) = get_api_security(self.config.api.api_security.clone()) {
×
297
            match api_security {
×
298
                dozer_types::models::api_security::ApiSecurity::Jwt(secret) => {
×
299
                    let auth = Authorizer::new(&secret, None, None);
×
300
                    let duration = ttl_in_secs.map(|f| std::time::Duration::from_secs(f as u64));
×
301
                    let token = auth
×
302
                        .generate_token(Access::All, duration)
×
303
                        .map_err(OrchestrationError::GenerateTokenFailed)?;
×
304
                    return Ok(token);
×
305
                }
306
            }
307
        }
×
308
        Err(OrchestrationError::MissingSecurityConfig)
×
309
    }
×
310

311
    pub fn build(
30✔
312
        &mut self,
30✔
313
        force: bool,
30✔
314
        shutdown: ShutdownReceiver,
30✔
315
        locked: bool,
30✔
316
    ) -> Result<(), OrchestrationError> {
30✔
317
        let home_dir = self.home_dir();
30✔
318
        let cache_dir = self.cache_dir();
30✔
319
        let home_dir = HomeDir::new(home_dir, cache_dir);
30✔
320

30✔
321
        info!(
30✔
322
            "Initiating app: {}",
×
323
            get_colored_text(&self.config.app_name, PURPLE)
×
324
        );
325
        if force {
30✔
326
            self.clean()?;
×
327
        }
30✔
328
        validate_config(&self.config)?;
30✔
329

330
        // Calculate schemas.
331
        let endpoint_and_logs = self
30✔
332
            .config
30✔
333
            .endpoints
30✔
334
            .iter()
30✔
335
            // We're not really going to run the pipeline, so we don't create logs.
30✔
336
            .map(|endpoint| (endpoint.clone(), None))
30✔
337
            .collect();
30✔
338
        let builder = PipelineBuilder::new(
30✔
339
            &self.config.connections,
30✔
340
            &self.config.sources,
30✔
341
            self.config.sql.as_deref(),
30✔
342
            endpoint_and_logs,
30✔
343
            self.labels.clone(),
30✔
344
            self.config.flags.clone(),
30✔
345
            &self.config.udfs,
30✔
346
        );
30✔
347
        let dag = self
30✔
348
            .runtime
30✔
349
            .block_on(builder.build(&self.runtime, shutdown))?;
30✔
350
        // Populate schemas.
351
        let dag_schemas = DagSchemas::new(dag)?;
30✔
352

353
        // Get current contract.
354
        let enable_token = self.config.api.api_security.is_some();
30✔
355
        let enable_on_event = self
30✔
356
            .config
30✔
357
            .flags
30✔
358
            .push_events
30✔
359
            .unwrap_or_else(default_push_events);
30✔
360
        let version = self.config.version as usize;
30✔
361

362
        let contract = build::Contract::new(
30✔
363
            version,
30✔
364
            &dag_schemas,
30✔
365
            &self.config.connections,
30✔
366
            &self.config.endpoints,
30✔
367
            enable_token,
30✔
368
            enable_on_event,
30✔
369
        )?;
30✔
370

371
        let contract_path = self.lockfile_path();
30✔
372
        let existing_contract = Contract::deserialize(contract_path.as_std_path()).ok();
30✔
373
        if locked {
30✔
374
            let Some(existing_contract) = existing_contract.as_ref() else {
×
375
                return Err(OrchestrationError::LockedNoLockFile);
×
376
            };
377

378
            if &contract != existing_contract {
×
379
                return Err(OrchestrationError::LockedOutdatedLockfile);
×
380
            }
×
381
        }
30✔
382

383
        // Run build
384
        self.runtime.block_on(build::build(
30✔
385
            &home_dir,
30✔
386
            &contract,
30✔
387
            existing_contract.as_ref(),
30✔
388
        ))?;
30✔
389

390
        contract.serialize(contract_path.as_std_path())?;
30✔
391

392
        Ok(())
30✔
393
    }
30✔
394

395
    // Cleaning the entire folder as there will be inconsistencies
396
    // between pipeline, cache and generated proto files.
397
    pub fn clean(&mut self) -> Result<(), OrchestrationError> {
×
398
        let cache_dir = self.cache_dir();
×
399
        if cache_dir.exists() {
×
400
            fs::remove_dir_all(&cache_dir)
×
401
                .map_err(|e| ExecutionError::FileSystemError(cache_dir.into_std_path_buf(), e))?;
×
402
        };
×
403

404
        let home_dir = self.home_dir();
×
405
        if home_dir.exists() {
×
406
            fs::remove_dir_all(&home_dir)
×
407
                .map_err(|e| ExecutionError::FileSystemError(home_dir.into_std_path_buf(), e))?;
×
408
        };
×
409

410
        Ok(())
×
411
    }
×
412

413
    pub fn run_all(
30✔
414
        &mut self,
30✔
415
        shutdown: ShutdownReceiver,
30✔
416
        locked: bool,
30✔
417
    ) -> Result<(), OrchestrationError> {
30✔
418
        let mut dozer_api = self.clone();
30✔
419

30✔
420
        let (tx, rx) = mpsc::channel::<()>();
30✔
421

30✔
422
        self.build(false, shutdown.clone(), locked)?;
30✔
423

424
        let mut dozer_pipeline = self.clone();
30✔
425
        let pipeline_shutdown = shutdown.clone();
30✔
426
        let pipeline_thread =
30✔
427
            thread::spawn(move || dozer_pipeline.run_apps(pipeline_shutdown, Some(tx)));
30✔
428

30✔
429
        // Wait for pipeline to initialize caches before starting api server
30✔
430
        if rx.recv().is_err() {
30✔
431
            // This means the pipeline thread returned before sending a message. Either an error happened or it panicked.
432
            return match pipeline_thread.join() {
×
433
                Ok(Err(e)) => Err(e),
×
434
                Ok(Ok(())) => panic!("An error must have happened"),
×
435
                Err(e) => {
×
436
                    std::panic::panic_any(e);
×
437
                }
438
            };
439
        }
30✔
440

30✔
441
        dozer_api.run_api(shutdown)?;
30✔
442

443
        // wait for pipeline thread to shutdown gracefully
444
        pipeline_thread.join().unwrap()
30✔
445
    }
30✔
446
}
447

448
pub fn validate_sql(sql: String) -> Result<(), PipelineError> {
×
449
    statement_to_pipeline(
×
450
        &sql,
×
451
        &mut AppPipeline::new_with_default_flags(),
×
452
        None,
×
453
        vec![],
×
454
    )
×
455
    .map_or_else(
×
456
        |e| {
×
457
            error!(
×
458
                "[sql][{}] Transforms validation error: {}",
×
459
                get_colored_text("X", RED),
×
460
                e
×
461
            );
×
462
            Err(e)
×
463
        },
×
464
        |_| {
×
465
            info!(
×
466
                "[sql][{}]  Transforms validation completed",
×
467
                get_colored_text("✓", GREEN)
×
468
            );
469
            Ok(())
×
470
        },
×
471
    )
×
472
}
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc