• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

getdozer / dozer / 6034126612

31 Aug 2023 07:03AM UTC coverage: 77.119%. First build
6034126612

Pull #1945

github

Jesse-Bakker
Write dozer.lock
Pull Request #1945: Write dozer.lock

86 of 86 new or added lines in 9 files covered. (100.0%)

49169 of 63757 relevant lines covered (77.12%)

69984.15 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

61.76
/dozer-cli/src/simple/orchestrator.rs
1
use super::executor::{run_dag_executor, Executor};
2
use super::Contract;
3
use crate::errors::OrchestrationError;
4
use crate::pipeline::PipelineBuilder;
5
use crate::shutdown::ShutdownReceiver;
6
use crate::simple::build;
7
use crate::simple::helper::validate_config;
8
use crate::utils::{
9
    get_api_security_config, get_app_grpc_config, get_cache_manager_options,
10
    get_checkpoint_factory_options, get_executor_options, get_grpc_config, get_rest_config,
11
    get_storage_config,
12
};
13

14
use crate::{flatten_join_handle, join_handle_map_err};
15
use dozer_api::auth::{Access, Authorizer};
16
use dozer_api::grpc::internal::internal_pipeline_server::start_internal_pipeline_server;
17
use dozer_api::{grpc, rest, CacheEndpoint};
18
use dozer_cache::cache::LmdbRwCacheManager;
19
use dozer_cache::dozer_log::camino::Utf8PathBuf;
20
use dozer_cache::dozer_log::home_dir::HomeDir;
21
use dozer_core::app::AppPipeline;
22
use dozer_core::dag_schemas::DagSchemas;
23
use dozer_tracing::LabelsAndProgress;
24
use dozer_types::constants::LOCK_FILE;
25
use dozer_types::models::flags::default_push_events;
26
use tokio::select;
27

28
use crate::console_helper::get_colored_text;
29
use crate::console_helper::GREEN;
30
use crate::console_helper::PURPLE;
31
use crate::console_helper::RED;
32
use dozer_core::errors::ExecutionError;
33
use dozer_ingestion::connectors::{get_connector, SourceSchema, TableInfo};
34
use dozer_sql::pipeline::builder::statement_to_pipeline;
35
use dozer_sql::pipeline::errors::PipelineError;
36
use dozer_types::crossbeam::channel::{self, Sender};
37
use dozer_types::log::info;
38
use dozer_types::models::config::Config;
39
use dozer_types::tracing::error;
40
use futures::stream::FuturesUnordered;
41
use futures::{FutureExt, StreamExt, TryFutureExt};
42
use metrics::{describe_counter, describe_histogram};
43
use std::collections::HashMap;
44
use std::fs;
45

46
use std::sync::Arc;
47
use std::thread;
48
use tokio::runtime::Runtime;
49
use tokio::sync::broadcast;
×
50

51
/// Orchestrator that builds a Dozer application and runs its pipeline and API servers.
///
/// The struct is `Clone`; `run_all` clones it so the pipeline and the API side each
/// work on their own copy.
#[derive(Clone)]
60✔
52
pub struct SimpleOrchestrator {
53
    /// Directory that the configured home/cache directories and the lock file are joined onto.
    pub base_directory: Utf8PathBuf,
54
    /// Application configuration (connections, sources, SQL, endpoints, flags, API settings).
    pub config: Config,
55
    /// Shared Tokio runtime on which the async work of this orchestrator is driven.
    pub runtime: Arc<Runtime>,
56
    /// Labels/progress handle cloned into the executor, pipeline builder and API servers.
    pub labels: LabelsAndProgress,
57
}
×
58

×
59
impl SimpleOrchestrator {
×
60
    pub fn new(
30✔
61
        base_directory: Utf8PathBuf,
30✔
62
        config: Config,
30✔
63
        runtime: Arc<Runtime>,
30✔
64
        labels: LabelsAndProgress,
30✔
65
    ) -> Self {
30✔
66
        Self {
30✔
67
            base_directory,
30✔
68
            config,
30✔
69
            runtime,
30✔
70
            labels,
30✔
71
        }
30✔
72
    }
30✔
73

×
74
    pub fn run_api(&mut self, shutdown: ShutdownReceiver) -> Result<(), OrchestrationError> {
30✔
75
        describe_histogram!(
30✔
76
            dozer_api::API_LATENCY_HISTOGRAM_NAME,
×
77
            "The api processing latency in seconds"
×
78
        );
×
79
        describe_counter!(
30✔
80
            dozer_api::API_REQUEST_COUNTER_NAME,
×
81
            "Number of requests processed by the api"
×
82
        );
×
83
        self.runtime.block_on(async {
30✔
84
            let mut futures = FuturesUnordered::new();
30✔
85

30✔
86
            // Open `RoCacheEndpoint`s. Streaming operations if necessary.
30✔
87
            let flags = self.config.flags.clone().unwrap_or_default();
30✔
88
            let (operations_sender, operations_receiver) = if flags.dynamic {
30✔
89
                let (sender, receiver) = broadcast::channel(16);
30✔
90
                (Some(sender), Some(receiver))
30✔
91
            } else {
×
92
                (None, None)
×
93
            };
×
94

×
95
            let internal_grpc_config = get_app_grpc_config(&self.config);
30✔
96
            let app_server_addr = format!(
30✔
97
                "http://{}:{}",
30✔
98
                internal_grpc_config.host, internal_grpc_config.port
30✔
99
            );
30✔
100
            let cache_manager = Arc::new(
30✔
101
                LmdbRwCacheManager::new(get_cache_manager_options(&self.config))
30✔
102
                    .map_err(OrchestrationError::CacheInitFailed)?,
30✔
103
            );
×
104
            let mut cache_endpoints = vec![];
30✔
105
            for endpoint in &self.config.endpoints {
60✔
106
                let (cache_endpoint, handle) = select! {
30✔
107
                    // If we're shutting down, the cache endpoint will fail to connect
108
                    _shutdown_future = shutdown.create_shutdown_future() => return Ok(()),
×
109
                    result = CacheEndpoint::new(
30✔
110
                        app_server_addr.clone(),
×
111
                        &*cache_manager,
×
112
                        endpoint.clone(),
×
113
                        Box::pin(shutdown.create_shutdown_future()),
114
                        operations_sender.clone(),
×
115
                        self.labels.clone(),
×
116
                    ) => result?
×
117
                };
×
118
                let cache_name = endpoint.name.clone();
30✔
119
                futures.push(flatten_join_handle(join_handle_map_err(handle, move |e| {
30✔
120
                    if e.is_map_full() {
×
121
                        OrchestrationError::CacheFull(cache_name)
×
122
                    } else {
×
123
                        OrchestrationError::CacheBuildFailed(cache_name, e)
×
124
                    }
×
125
                })));
30✔
126
                cache_endpoints.push(Arc::new(cache_endpoint));
30✔
127
            }
×
128

×
129
            // Initialize API Server
×
130
            let rest_config = get_rest_config(&self.config);
30✔
131
            let rest_handle = if rest_config.enabled {
30✔
132
                let security = get_api_security_config(&self.config).cloned();
30✔
133
                let cache_endpoints_for_rest = cache_endpoints.clone();
30✔
134
                let shutdown_for_rest = shutdown.create_shutdown_future();
30✔
135
                let api_server = rest::ApiServer::new(rest_config, security);
30✔
136
                let api_server = api_server
30✔
137
                    .run(
30✔
138
                        cache_endpoints_for_rest,
30✔
139
                        shutdown_for_rest,
30✔
140
                        self.labels.clone(),
30✔
141
                    )
30✔
142
                    .map_err(OrchestrationError::ApiInitFailed)?;
30✔
143
                tokio::spawn(api_server.map_err(OrchestrationError::RestServeFailed))
30✔
144
            } else {
×
145
                tokio::spawn(async move { Ok::<(), OrchestrationError>(()) })
×
146
            };
×
147

×
148
            // Initialize gRPC Server
×
149
            let grpc_config = get_grpc_config(&self.config);
30✔
150
            let grpc_handle = if grpc_config.enabled {
30✔
151
                let api_security = get_api_security_config(&self.config).cloned();
30✔
152
                let grpc_server = grpc::ApiServer::new(grpc_config, api_security, flags);
30✔
153
                let shutdown = shutdown.create_shutdown_future();
30✔
154
                let grpc_server = grpc_server
30✔
155
                    .run(
30✔
156
                        cache_endpoints,
30✔
157
                        shutdown,
30✔
158
                        operations_receiver,
30✔
159
                        self.labels.clone(),
30✔
160
                    )
30✔
161
                    .await
×
162
                    .map_err(OrchestrationError::ApiInitFailed)?;
30✔
163
                tokio::spawn(async move {
30✔
164
                    grpc_server
30✔
165
                        .await
45✔
166
                        .map_err(OrchestrationError::GrpcServeFailed)
30✔
167
                })
30✔
168
            } else {
×
169
                tokio::spawn(async move { Ok::<(), OrchestrationError>(()) })
×
170
            };
×
171

×
172
            futures.push(flatten_join_handle(rest_handle));
30✔
173
            futures.push(flatten_join_handle(grpc_handle));
30✔
174

×
175
            while let Some(result) = futures.next().await {
120✔
176
                result?;
90✔
177
            }
×
178

×
179
            Ok::<(), OrchestrationError>(())
30✔
180
        })?;
30✔
181

×
182
        Ok(())
30✔
183
    }
30✔
184

×
185
    /// Returns the configured home directory joined onto the base directory.
    pub fn home_dir(&self) -> Utf8PathBuf {
        let base = &self.base_directory;
        base.join(&self.config.home_dir)
    }
60✔
188

×
189
    /// Returns the configured cache directory joined onto the base directory.
    pub fn cache_dir(&self) -> Utf8PathBuf {
        let base = &self.base_directory;
        base.join(&self.config.cache_dir)
    }
60✔
192

×
193
    pub fn lockfile_path(&self) -> Utf8PathBuf {
60✔
194
        self.base_directory.join(LOCK_FILE)
60✔
195
    }
60✔
196

×
197
    pub fn run_apps(
30✔
198
        &mut self,
30✔
199
        shutdown: ShutdownReceiver,
30✔
200
        api_notifier: Option<Sender<bool>>,
30✔
201
    ) -> Result<(), OrchestrationError> {
30✔
202
        let home_dir = HomeDir::new(self.home_dir(), self.cache_dir());
30✔
203
        let contract = Contract::deserialize(self.lockfile_path().as_std_path())?;
30✔
204
        let executor = self.runtime.block_on(Executor::new(
30✔
205
            &home_dir,
30✔
206
            &contract,
30✔
207
            &self.config.connections,
30✔
208
            &self.config.sources,
30✔
209
            self.config.sql.as_deref(),
30✔
210
            &self.config.endpoints,
30✔
211
            get_checkpoint_factory_options(&self.config),
30✔
212
            self.labels.clone(),
30✔
213
        ))?;
30✔
214
        let dag_executor = self.runtime.block_on(executor.create_dag_executor(
30✔
215
            &self.runtime,
30✔
216
            get_executor_options(&self.config),
30✔
217
            shutdown.clone(),
30✔
218
            self.config.flags.clone().unwrap_or_default(),
30✔
219
        ))?;
30✔
220

×
221
        let app_grpc_config = get_app_grpc_config(&self.config);
30✔
222
        let internal_server_future = self
30✔
223
            .runtime
30✔
224
            .block_on(start_internal_pipeline_server(
30✔
225
                executor.endpoint_and_logs().to_vec(),
30✔
226
                &app_grpc_config,
30✔
227
                shutdown.create_shutdown_future(),
30✔
228
            ))
30✔
229
            .map_err(OrchestrationError::InternalServerFailed)?;
30✔
230

231
        if let Some(api_notifier) = api_notifier {
30✔
232
            api_notifier
30✔
233
                .send(true)
30✔
234
                .expect("Failed to notify API server");
30✔
235
        }
30✔
236

×
237
        let labels = self.labels.clone();
30✔
238
        let pipeline_future = self.runtime.spawn_blocking(move || {
30✔
239
            run_dag_executor(dag_executor, shutdown.get_running_flag(), labels)
30✔
240
        });
30✔
241

30✔
242
        let mut futures = FuturesUnordered::new();
30✔
243
        futures.push(
30✔
244
            internal_server_future
30✔
245
                .map_err(OrchestrationError::GrpcServeFailed)
30✔
246
                .boxed(),
30✔
247
        );
30✔
248
        futures.push(flatten_join_handle(pipeline_future).boxed());
30✔
249

30✔
250
        self.runtime.block_on(async move {
30✔
251
            while let Some(result) = futures.next().await {
190✔
252
                result?;
60✔
253
            }
×
254
            Ok(())
30✔
255
        })
30✔
256
    }
30✔
257

×
258
    #[allow(clippy::type_complexity)]
×
259
    pub fn list_connectors(
×
260
        &self,
×
261
    ) -> Result<HashMap<String, (Vec<TableInfo>, Vec<SourceSchema>)>, OrchestrationError> {
×
262
        self.runtime.block_on(async {
×
263
            let mut schema_map = HashMap::new();
×
264
            for connection in &self.config.connections {
×
265
                let connector = get_connector(connection.clone())?;
×
266
                let schema_tuples = connector.list_all_schemas().await?;
×
267
                schema_map.insert(connection.name.clone(), schema_tuples);
×
268
            }
×
269

×
270
            Ok(schema_map)
×
271
        })
×
272
    }
×
273

×
274
    pub fn generate_token(&self) -> Result<String, OrchestrationError> {
×
275
        if let Some(api_config) = &self.config.api {
×
276
            if let Some(api_security) = &api_config.api_security {
×
277
                match api_security {
×
278
                    dozer_types::models::api_security::ApiSecurity::Jwt(secret) => {
×
279
                        let auth = Authorizer::new(secret, None, None);
×
280
                        let token = auth
×
281
                            .generate_token(Access::All, None)
×
282
                            .map_err(OrchestrationError::GenerateTokenFailed)?;
×
283
                        return Ok(token);
×
284
                    }
×
285
                }
×
286
            }
×
287
        }
×
288
        Err(OrchestrationError::MissingSecurityConfig)
×
289
    }
×
290

×
291
    pub fn build(
30✔
292
        &mut self,
30✔
293
        force: bool,
30✔
294
        shutdown: ShutdownReceiver,
30✔
295
    ) -> Result<(), OrchestrationError> {
30✔
296
        let home_dir = self.home_dir();
30✔
297
        let cache_dir = self.cache_dir();
30✔
298
        let home_dir = HomeDir::new(home_dir, cache_dir);
30✔
299

30✔
300
        info!(
30✔
301
            "Initiating app: {}",
×
302
            get_colored_text(&self.config.app_name, PURPLE)
×
303
        );
×
304
        if force {
30✔
305
            self.clean()?;
×
306
        }
30✔
307
        validate_config(&self.config)?;
30✔
308

×
309
        // Calculate schemas.
×
310
        let endpoint_and_logs = self
30✔
311
            .config
30✔
312
            .endpoints
30✔
313
            .iter()
30✔
314
            // We're not really going to run the pipeline, so we don't create logs.
30✔
315
            .map(|endpoint| (endpoint.clone(), None))
30✔
316
            .collect();
30✔
317
        let builder = PipelineBuilder::new(
30✔
318
            &self.config.connections,
30✔
319
            &self.config.sources,
30✔
320
            self.config.sql.as_deref(),
30✔
321
            endpoint_and_logs,
30✔
322
            self.labels.clone(),
30✔
323
            self.config.flags.clone().unwrap_or_default(),
30✔
324
        );
30✔
325
        let dag = self
30✔
326
            .runtime
30✔
327
            .block_on(builder.build(&self.runtime, shutdown))?;
30✔
328
        // Populate schemas.
×
329
        let dag_schemas = DagSchemas::new(dag)?;
30✔
330

331
        // Get current contract.
×
332
        let enable_token = self
30✔
333
            .config
30✔
334
            .api
30✔
335
            .as_ref()
30✔
336
            .map(|api| api.api_security.is_some())
30✔
337
            .unwrap_or(false);
30✔
338
        let enable_on_event = self
30✔
339
            .config
30✔
340
            .flags
30✔
341
            .as_ref()
30✔
342
            .map(|flags| flags.push_events)
30✔
343
            .unwrap_or_else(default_push_events);
30✔
344
        let version = self.config.version as usize;
30✔
345

×
346
        let contract = build::Contract::new(
30✔
347
            version,
30✔
348
            &dag_schemas,
30✔
349
            &self.config.connections,
30✔
350
            &self.config.endpoints,
30✔
351
            enable_token,
30✔
352
            enable_on_event,
30✔
353
        )?;
30✔
354

×
355
        let contract_path = self.lockfile_path();
30✔
356
        let existing_contract = Contract::deserialize(contract_path.as_std_path()).ok();
30✔
357

30✔
358
        // Run build
30✔
359
        let storage_config = get_storage_config(&self.config);
30✔
360
        self.runtime.block_on(build::build(
30✔
361
            &home_dir,
30✔
362
            &contract,
30✔
363
            existing_contract.as_ref(),
30✔
364
            &storage_config,
30✔
365
        ))?;
30✔
366

×
367
        contract.serialize(contract_path.as_std_path())?;
30✔
368

×
369
        Ok(())
30✔
370
    }
30✔
371

×
372
    // Cleaning the entire folder as there will be inconsistencies
×
373
    // between pipeline, cache and generated proto files.
×
374
    pub fn clean(&mut self) -> Result<(), OrchestrationError> {
×
375
        let cache_dir = self.cache_dir();
×
376
        if cache_dir.exists() {
×
377
            fs::remove_dir_all(&cache_dir)
×
378
                .map_err(|e| ExecutionError::FileSystemError(cache_dir.into_std_path_buf(), e))?;
×
379
        };
×
380

×
381
        let home_dir = self.home_dir();
×
382
        if home_dir.exists() {
×
383
            fs::remove_dir_all(&home_dir)
×
384
                .map_err(|e| ExecutionError::FileSystemError(home_dir.into_std_path_buf(), e))?;
×
385
        };
×
386

×
387
        Ok(())
×
388
    }
×
389

×
390
    pub fn run_all(&mut self, shutdown: ShutdownReceiver) -> Result<(), OrchestrationError> {
30✔
391
        let mut dozer_api = self.clone();
30✔
392

30✔
393
        let (tx, rx) = channel::unbounded::<bool>();
30✔
394

30✔
395
        self.build(false, shutdown.clone())?;
30✔
396

×
397
        let mut dozer_pipeline = self.clone();
30✔
398
        let pipeline_shutdown = shutdown.clone();
30✔
399
        let pipeline_thread =
30✔
400
            thread::spawn(move || dozer_pipeline.run_apps(pipeline_shutdown, Some(tx)));
30✔
401

30✔
402
        // Wait for pipeline to initialize caches before starting api server
30✔
403
        if rx.recv().is_err() {
30✔
404
            // This means the pipeline thread returned before sending a message. Either an error happened or it panicked.
×
405
            return match pipeline_thread.join() {
×
406
                Ok(Err(e)) => Err(e),
×
407
                Ok(Ok(())) => panic!("An error must have happened"),
×
408
                Err(e) => {
×
409
                    std::panic::panic_any(e);
×
410
                }
411
            };
412
        }
30✔
413

30✔
414
        dozer_api.run_api(shutdown)?;
30✔
415

416
        // wait for pipeline thread to shutdown gracefully
417
        pipeline_thread.join().unwrap()
30✔
418
    }
30✔
419
}
420

421
pub fn validate_sql(sql: String) -> Result<(), PipelineError> {
×
422
    statement_to_pipeline(&sql, &mut AppPipeline::new_with_default_flags(), None).map_or_else(
×
423
        |e| {
×
424
            error!(
×
425
                "[sql][{}] Transforms validation error: {}",
×
426
                get_colored_text("X", RED),
×
427
                e
×
428
            );
×
429
            Err(e)
×
430
        },
×
431
        |_| {
×
432
            info!(
×
433
                "[sql][{}]  Transforms validation completed",
×
434
                get_colored_text("✓", GREEN)
×
435
            );
436
            Ok(())
×
437
        },
×
438
    )
×
439
}
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc