getdozer / dozer / build 4112409903 (pending completion)

GitHub Pull Request #818: chore: fix dag issues
Merge 0e6d61bff into c160ec41f

212 of 212 new or added lines in 23 files covered (100.0%)
23352 of 37718 relevant lines covered (61.91%)
31647.45 hits per line

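The overall figure above is simply covered lines divided by relevant lines. A minimal sketch of that arithmetic as a standalone Rust program, for reference only (the helper name is hypothetical, not part of Coveralls or Dozer):

fn coverage_percent(covered: u64, relevant: u64) -> f64 {
    // Percentage of relevant lines that were hit at least once.
    covered as f64 / relevant as f64 * 100.0
}

fn main() {
    // 23352 of 37718 relevant lines covered, matching the 61.91% reported above.
    println!("{:.2}%", coverage_percent(23352, 37718));
}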
Source File: /dozer-orchestrator/src/simple/orchestrator.rs (0.0% covered)
use super::executor::Executor;
use crate::console_helper::get_colored_text;
use crate::errors::OrchestrationError;
use crate::pipeline::{CacheSinkSettings, PipelineBuilder};
use crate::utils::{
    get_api_dir, get_api_security_config, get_cache_dir, get_flags, get_grpc_config,
    get_pipeline_config, get_pipeline_dir, get_rest_config,
};
use crate::{flatten_joinhandle, Orchestrator};
use dozer_api::auth::{Access, Authorizer};
use dozer_api::generator::protoc::generator::ProtoGenerator;
use dozer_api::{
    actix_web::dev::ServerHandle,
    grpc::{
        self, internal::internal_pipeline_server::start_internal_pipeline_server,
        internal_grpc::PipelineResponse,
    },
    rest, CacheEndpoint,
};
use dozer_cache::cache::{CacheCommonOptions, CacheOptions, CacheReadOptions, CacheWriteOptions};
use dozer_cache::cache::{CacheOptionsKind, LmdbCache};
use dozer_core::dag::app::AppPipeline;
use dozer_core::dag::dag_schemas::DagSchemas;
use dozer_core::dag::errors::ExecutionError::InternalError;
use dozer_sql::pipeline::builder::statement_to_pipeline;
use dozer_sql::pipeline::errors::PipelineError;
use dozer_types::crossbeam::channel::{self, unbounded, Sender};
use dozer_types::log::{info, warn};
use dozer_types::models::api_config::ApiConfig;
use dozer_types::models::api_endpoint::ApiEndpoint;
use dozer_types::models::app_config::Config;
use dozer_types::prettytable::{row, Table};
use dozer_types::serde_yaml;
use dozer_types::tracing::error;
use dozer_types::types::{Operation, Schema, SchemaWithChangesType};
use futures::stream::FuturesUnordered;
use futures::StreamExt;
use std::collections::HashMap;
use std::fs;
use std::path::{Path, PathBuf};
use std::sync::atomic::{AtomicBool, Ordering};
use std::{sync::Arc, thread};
use tokio::sync::{broadcast, oneshot};

#[derive(Default, Clone)]
pub struct SimpleOrchestrator {
    pub config: Config,
    pub cache_common_options: CacheCommonOptions,
    pub cache_read_options: CacheReadOptions,
    pub cache_write_options: CacheWriteOptions,
}

impl SimpleOrchestrator {
    pub fn new(config: &Config) -> Self {
        Self {
            config: config.clone(),
            ..Default::default()
        }
    }
    fn write_internal_config(&self) -> Result<(), OrchestrationError> {
        let path = Path::new(&self.config.home_dir).join("internal_config");
        if path.exists() {
            fs::remove_dir_all(&path).unwrap();
        }
        fs::create_dir_all(&path).unwrap();
        let yaml_path = path.join("config.yaml");
        let f = std::fs::OpenOptions::new()
            .create(true)
            .write(true)
            .open(yaml_path)
            .expect("Couldn't open file");
        let api_config = self.config.api.to_owned().unwrap_or_default();
        let api_internal = api_config.to_owned().api_internal.unwrap_or_default();
        let pipeline_internal = api_config.pipeline_internal.unwrap_or_default();
        let mut internal_content = serde_yaml::Value::default();
        internal_content["api_internal"] = serde_yaml::to_value(api_internal)
            .map_err(OrchestrationError::FailedToWriteConfigYaml)?;
        internal_content["pipeline_internal"] = serde_yaml::to_value(pipeline_internal)
            .map_err(OrchestrationError::FailedToWriteConfigYaml)?;
        serde_yaml::to_writer(f, &internal_content)
            .map_err(OrchestrationError::FailedToWriteConfigYaml)?;
        Ok(())
    }
}

impl Orchestrator for SimpleOrchestrator {
    fn run_api(&mut self, running: Arc<AtomicBool>) -> Result<(), OrchestrationError> {
        // Channel to communicate CtrlC with API Server
        let (tx, rx) = unbounded::<ServerHandle>();
        // gRPC notifier channel
        let cache_dir = get_cache_dir(self.config.to_owned());

        // Flags
        let flags = self.config.flags.clone().unwrap_or_default();

        let mut cache_endpoints = vec![];
        for ce in &self.config.endpoints {
            let mut cache_common_options = self.cache_common_options.clone();
            cache_common_options.set_path(cache_dir.join(ce.name.clone()));
            cache_endpoints.push(CacheEndpoint {
                cache: Arc::new(
                    LmdbCache::new(CacheOptions {
                        common: cache_common_options,
                        kind: CacheOptionsKind::ReadOnly(self.cache_read_options.clone()),
                    })
                    .map_err(OrchestrationError::CacheInitFailed)?,
                ),
                endpoint: ce.to_owned(),
            });
        }

        let ce2 = cache_endpoints.clone();

        let rt = tokio::runtime::Runtime::new().expect("Failed to initialize tokio runtime");
        let (sender_shutdown, receiver_shutdown) = oneshot::channel::<()>();
        rt.block_on(async {
            let mut futures = FuturesUnordered::new();

            // Initialize API Server
            let rest_config = get_rest_config(self.config.to_owned());
            let security = get_api_security_config(self.config.to_owned());
            let rest_handle = tokio::spawn(async move {
                let api_server = rest::ApiServer::new(rest_config, security);
                api_server
                    .run(cache_endpoints, tx)
                    .await
                    .map_err(OrchestrationError::ApiServerFailed)
            });
            // Initiate Push Events
            // create broadcast channel
            let pipeline_config = get_pipeline_config(self.config.to_owned());

            let rx1 = if flags.push_events {
                let (tx, rx1) = broadcast::channel::<PipelineResponse>(16);

                let handle = tokio::spawn(async move {
                    grpc::ApiServer::setup_broad_cast_channel(tx, pipeline_config)
                        .await
                        .map_err(OrchestrationError::GrpcServerFailed)
                });

                futures.push(flatten_joinhandle(handle));

                Some(rx1)
            } else {
                None
            };

            // Initialize GRPC Server

            let api_dir = get_api_dir(self.config.to_owned());
            let grpc_config = get_grpc_config(self.config.to_owned());

            let api_security = get_api_security_config(self.config.to_owned());
            let grpc_server = grpc::ApiServer::new(grpc_config, api_dir, api_security, flags);
            let grpc_handle = tokio::spawn(async move {
                grpc_server
                    .run(ce2, receiver_shutdown, rx1)
                    .await
                    .map_err(OrchestrationError::GrpcServerFailed)
            });

            futures.push(flatten_joinhandle(rest_handle));
            futures.push(flatten_joinhandle(grpc_handle));

            while let Some(result) = futures.next().await {
                result?;
            }
            Ok::<(), OrchestrationError>(())
        })?;

        let server_handle = rx
            .recv()
            .map_err(OrchestrationError::GrpcServerHandleError)?;

        // Waiting for Ctrl+C
        while running.load(Ordering::SeqCst) {}
        sender_shutdown.send(()).unwrap();
        rest::ApiServer::stop(server_handle);

        Ok(())
    }

    fn run_apps(
        &mut self,
        running: Arc<AtomicBool>,
        api_notifier: Option<Sender<bool>>,
    ) -> Result<(), OrchestrationError> {
        let pipeline_home_dir = get_pipeline_dir(self.config.to_owned());
        // gRPC notifier channel
        let (sender, receiver) = channel::unbounded::<PipelineResponse>();
        let internal_app_config = self.config.to_owned();
        let _intern_pipeline_thread = thread::spawn(move || {
            if let Err(e) = start_internal_pipeline_server(internal_app_config, receiver) {
                std::panic::panic_any(OrchestrationError::InternalServerFailed(e));
            }
            warn!("Shutting down internal pipeline server");
        });

        let cache_dir = get_cache_dir(self.config.to_owned());

        let cache_endpoints: Vec<CacheEndpoint> = self.get_cache_endpoints(cache_dir)?;

        if let Some(api_notifier) = api_notifier {
            api_notifier
                .send(true)
                .expect("Failed to notify API server");
        }

        let executor = Executor::new(
            self.config.clone(),
            cache_endpoints,
            running,
            pipeline_home_dir,
        );
        let flags = get_flags(self.config.clone());
        let api_security = get_api_security_config(self.config.clone());
        let settings = CacheSinkSettings::new(flags, api_security);
        let dag_executor = executor.create_dag_executor(Some(sender), settings)?.0;
        Executor::run_dag_executor(dag_executor)
    }

    fn list_connectors(
        &self,
    ) -> Result<HashMap<String, Vec<SchemaWithChangesType>>, OrchestrationError> {
        Executor::get_tables(&self.config.connections)
    }

    fn generate_token(&self) -> Result<String, OrchestrationError> {
        if let Some(api_config) = self.config.api.to_owned() {
            if let Some(api_security) = api_config.api_security {
                match api_security {
                    dozer_types::models::api_security::ApiSecurity::Jwt(secret) => {
                        let auth = Authorizer::new(&secret, None, None);
                        let token = auth.generate_token(Access::All, None).map_err(|err| {
                            OrchestrationError::GenerateTokenFailed(err.to_string())
                        })?;
                        return Ok(token);
                    }
                }
            }
        }
        Err(OrchestrationError::GenerateTokenFailed(
            "Missing api config or security input".to_owned(),
        ))
    }

    fn query(
        &self,
        sql: String,
        sender: Sender<Operation>,
        running: Arc<AtomicBool>,
    ) -> Result<Schema, OrchestrationError> {
        let pipeline_dir = tempdir::TempDir::new("query4")
            .map_err(|e| OrchestrationError::InternalError(Box::new(e)))?;
        let executor = Executor::new(
            self.config.clone(),
            vec![],
            running,
            pipeline_dir.into_path(),
        );

        let dag = executor.query(sql, sender)?;
        let dag_schemas = DagSchemas::new(&dag)?;
        let streaming_sink_index = dag.sink_identifiers().next().expect("Sink is expected");
        let (schema, _ctx) = dag_schemas
            .get_node_input_schemas(streaming_sink_index)
            .values()
            .next()
            .expect("schema is expected")
            .clone();
        Ok(schema)
    }

    fn migrate(&mut self, force: bool) -> Result<(), OrchestrationError> {
        self.write_internal_config()
            .map_err(|e| InternalError(Box::new(e)))?;
        let pipeline_home_dir = get_pipeline_dir(self.config.to_owned());
        let api_dir = get_api_dir(self.config.to_owned());
        let cache_dir = get_cache_dir(self.config.to_owned());

        info!(
            "Initiating app: {}",
            get_colored_text(&self.config.app_name, "35")
        );
        if api_dir.exists() || pipeline_home_dir.exists() || cache_dir.exists() {
            if force {
                self.clean()?;
            } else {
                return Err(OrchestrationError::InitializationFailed(
                    self.config.home_dir.to_string(),
                ));
            }
        }

        info!(
            "Home dir: {}",
            get_colored_text(&self.config.home_dir, "35")
        );
        if let Some(api_config) = &self.config.api {
            print_api_config(api_config)
        }

        print_api_endpoints(&self.config.endpoints);
        validate_endpoints(&self.config.endpoints)?;

        let cache_endpoints: Vec<CacheEndpoint> = self.get_cache_endpoints(cache_dir)?;

        let builder = PipelineBuilder::new(
            self.config.clone(),
            cache_endpoints,
            Arc::new(AtomicBool::new(true)),
            pipeline_home_dir.clone(),
        );

        // Api Path
        let generated_path = api_dir.join("generated");
        if !generated_path.exists() {
            fs::create_dir_all(generated_path.clone()).map_err(|e| InternalError(Box::new(e)))?;
        }

        // Pipeline path
        fs::create_dir_all(pipeline_home_dir.clone()).map_err(|e| {
            OrchestrationError::PipelineDirectoryInitFailed(
                pipeline_home_dir.to_string_lossy().to_string(),
                e,
            )
        })?;
        let api_security = get_api_security_config(self.config.clone());
        let flags = get_flags(self.config.clone());
        let settings = CacheSinkSettings::new(flags, api_security);
        let dag = builder.build(None, generated_path.clone(), settings)?.0;
        let dag_schemas = DagSchemas::new(&dag)?;
        // Every sink will initialize its schema in sink and also in a proto file.
        dag_schemas.prepare()?;

        let mut resources = Vec::new();
        for e in &self.config.endpoints {
            resources.push(e.name.clone());
        }

        // Copy common service to be included in descriptor.
        resources.push("common".to_string());

        ProtoGenerator::copy_common(&generated_path)
            .map_err(|e| OrchestrationError::InternalError(Box::new(e)))?;
        // Generate a descriptor based on all proto files generated within sink.
        ProtoGenerator::generate_descriptor(&generated_path, resources)
            .map_err(|e| OrchestrationError::InternalError(Box::new(e)))?;

        Ok(())
    }

    // Cleaning the entire folder as there will be inconsistencies
    // between pipeline, cache and generated proto files.
    fn clean(&mut self) -> Result<(), OrchestrationError> {
        let home_dir = PathBuf::from(self.config.home_dir.clone());
        if home_dir.exists() {
            fs::remove_dir_all(&home_dir).map_err(|e| InternalError(Box::new(e)))?;
        };
        Ok(())
    }
}

impl SimpleOrchestrator {
    fn get_cache_endpoints(
        &self,
        cache_dir: PathBuf,
    ) -> Result<Vec<CacheEndpoint>, OrchestrationError> {
        let mut cache_endpoints = Vec::new();
        for e in &self.config.endpoints {
            let mut cache_common_options = self.cache_common_options.clone();
            cache_common_options.set_path(cache_dir.join(e.name.clone()));
            cache_endpoints.push(CacheEndpoint {
                cache: Arc::new(
                    LmdbCache::new(CacheOptions {
                        common: cache_common_options,
                        kind: CacheOptionsKind::Write(self.cache_write_options.clone()),
                    })
                    .map_err(|e| OrchestrationError::InternalError(Box::new(e)))?,
                ),
                endpoint: e.to_owned(),
            })
        }
        Ok(cache_endpoints)
    }
}

pub fn validate_sql(sql: String) -> Result<(), PipelineError> {
    statement_to_pipeline(&sql, &mut AppPipeline::new(), None).map_or_else(
        |e| {
            error!(
                "[sql][{}] Transforms validation error: {}",
                get_colored_text("X", "31"),
                e
            );
            Err(e)
        },
        |_| {
            info!(
                "[sql][{}]  Transforms validation completed",
                get_colored_text("✓", "32")
            );
            Ok(())
        },
    )
}

pub fn validate_endpoints(_endpoints: &[ApiEndpoint]) -> Result<(), OrchestrationError> {
    Ok(())
}

fn print_api_config(api_config: &ApiConfig) {
    info!("[API] {}", get_colored_text("Configuration", "35"));
    let mut table_parent = Table::new();

    table_parent.add_row(row!["Type", "IP", "Port"]);
    if let Some(rest_config) = &api_config.rest {
        table_parent.add_row(row!["REST", rest_config.host, rest_config.port]);
    }

    if let Some(grpc_config) = &api_config.grpc {
        table_parent.add_row(row!["GRPC", grpc_config.host, grpc_config.port]);
    }

    table_parent.printstd();
}

fn print_api_endpoints(endpoints: &Vec<ApiEndpoint>) {
    info!("[API] {}", get_colored_text("Endpoints", "35"));
    let mut table_parent = Table::new();

    table_parent.add_row(row!["Path", "Name", "Sql"]);
    for endpoint in endpoints {
        table_parent.add_row(row![endpoint.path, endpoint.name]);
    }

    table_parent.printstd();
}
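Taken together, this file is Dozer's orchestration entry point: migrate prepares the home, pipeline, cache and generated proto directories, run_apps builds and runs the DAG executor, and run_api serves the caches over REST and gRPC. Below is a minimal, hypothetical driver sketch showing how these methods would be wired together; the import paths, config loading and shutdown handling are assumptions and are not taken from this file:

use std::sync::atomic::AtomicBool;
use std::sync::Arc;

// NOTE: these crate paths are assumed re-exports; adjust to the actual module layout.
use dozer_orchestrator::errors::OrchestrationError;
use dozer_orchestrator::simple::SimpleOrchestrator;
use dozer_orchestrator::Orchestrator;
use dozer_types::models::app_config::Config;

fn run(config: &Config) -> Result<(), OrchestrationError> {
    // Construct the orchestrator from an already-parsed Config.
    let mut dozer = SimpleOrchestrator::new(config);

    // Create pipeline/cache/api directories and generate proto descriptors.
    dozer.migrate(false)?;

    // A Ctrl+C handler would flip this flag to false to request shutdown.
    let running = Arc::new(AtomicBool::new(true));

    // Build the DAG executor and run it until `running` is cleared;
    // run_api would typically be started separately against the same config.
    dozer.run_apps(running, None)
}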