• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

getdozer / dozer / 5972853941

25 Aug 2023 06:52AM UTC coverage: 76.247% (+0.8%) from 75.446%
5972853941

push

github

web-flow
feat: make probabilistic optimizations optional and tunable in the YAML config (#1912)

Probabilistic optimization sacrifices accuracy to reduce memory consumption. In some parts of the pipeline, a Bloom filter is used ([set_processor](https://github.com/getdozer/dozer/blob/2e3ba96c3f4bdf9a691747191ab15617564d8ca2/dozer-sql/src/pipeline/product/set/set_processor.rs#L20)), while in others, hash tables that store only the hashes of the keys, rather than the full keys, are used ([aggregation_processor](https://github.com/getdozer/dozer/blob/2e3ba96c3f4bdf9a691747191ab15617564d8ca2/dozer-sql/src/pipeline/aggregation/processor.rs#L59) and [join_processor](https://github.com/getdozer/dozer/blob/2e3ba96c3f4bdf9a691747191ab15617564d8ca2/dozer-sql/src/pipeline/product/join/operator.rs#L57-L58)).

This commit disables these optimizations by default and adds user-configurable flags to enable each of them separately.

The following example shows how to enable probabilistic optimizations for each processor in the Dozer configuration:

```
flags:
  enable_probabilistic_optimizations:
    in_sets: true # enable probabilistic optimizations in set operations (UNION, EXCEPT, INTERSECT); Default: false
    in_joins: true # enable probabilistic optimizations in JOIN operations; Default: false
    in_aggregations: true # enable probabilistic optimizations in aggregations (SUM, COUNT, MIN, etc.); Default: false
```

347 of 347 new or added lines in 25 files covered. (100.0%)

47165 of 61858 relevant lines covered (76.25%)

48442.96 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

71.38
/dozer-cli/src/simple/orchestrator.rs
1
use super::executor::{run_dag_executor, Executor};
2
use crate::errors::OrchestrationError;
3
use crate::pipeline::PipelineBuilder;
4
use crate::shutdown::ShutdownReceiver;
5
use crate::simple::build;
6
use crate::simple::helper::validate_config;
7
use crate::utils::{
8
    get_api_security_config, get_app_grpc_config, get_cache_manager_options,
9
    get_checkpoint_factory_options, get_executor_options, get_grpc_config, get_rest_config,
10
    get_storage_config,
11
};
12

13
use crate::{flatten_join_handle, join_handle_map_err};
14
use dozer_api::auth::{Access, Authorizer};
15
use dozer_api::grpc::internal::internal_pipeline_server::start_internal_pipeline_server;
16
use dozer_api::{grpc, rest, CacheEndpoint};
17
use dozer_cache::cache::LmdbRwCacheManager;
18
use dozer_cache::dozer_log::home_dir::HomeDir;
19
use dozer_core::app::AppPipeline;
20
use dozer_core::dag_schemas::DagSchemas;
21
use tokio::select;
22

23
use crate::console_helper::get_colored_text;
24
use crate::console_helper::GREEN;
25
use crate::console_helper::PURPLE;
26
use crate::console_helper::RED;
27
use dozer_core::errors::ExecutionError;
28
use dozer_ingestion::connectors::{get_connector, SourceSchema, TableInfo};
29
use dozer_sql::pipeline::builder::statement_to_pipeline;
30
use dozer_sql::pipeline::errors::PipelineError;
31
use dozer_types::crossbeam::channel::{self, Sender};
32
use dozer_types::indicatif::{MultiProgress, ProgressDrawTarget};
33
use dozer_types::log::info;
34
use dozer_types::models::config::Config;
35
use dozer_types::tracing::error;
36
use futures::stream::FuturesUnordered;
37
use futures::{FutureExt, StreamExt, TryFutureExt};
38
use metrics::{describe_counter, describe_histogram};
39
use std::collections::HashMap;
40
use std::fs;
41
use std::path::PathBuf;
42

43
use std::sync::Arc;
44
use std::thread;
45
use tokio::runtime::Runtime;
46
use tokio::sync::broadcast;
47

48
#[derive(Clone)]
60✔
49
pub struct SimpleOrchestrator {
50
    pub config: Config,
51
    pub runtime: Arc<Runtime>,
52
    pub multi_pb: MultiProgress,
53
}
54

55
impl SimpleOrchestrator {
56
    pub fn new(config: Config, runtime: Arc<Runtime>, enable_progress: bool) -> Self {
30✔
57
        let progress_draw_target = if enable_progress && atty::is(atty::Stream::Stderr) {
30✔
58
            ProgressDrawTarget::stderr()
×
59
        } else {
60
            ProgressDrawTarget::hidden()
30✔
61
        };
62

63
        Self {
30✔
64
            config,
30✔
65
            runtime,
30✔
66
            multi_pb: MultiProgress::with_draw_target(progress_draw_target),
30✔
67
        }
30✔
68
    }
30✔
69

70
    pub fn run_api(&mut self, shutdown: ShutdownReceiver) -> Result<(), OrchestrationError> {
30✔
71
        describe_histogram!(
30✔
72
            dozer_api::API_LATENCY_HISTOGRAM_NAME,
×
73
            "The api processing latency in seconds"
×
74
        );
75
        describe_counter!(
30✔
76
            dozer_api::API_REQUEST_COUNTER_NAME,
×
77
            "Number of requests processed by the api"
×
78
        );
79
        self.runtime.block_on(async {
30✔
80
            let mut futures = FuturesUnordered::new();
30✔
81

30✔
82
            // Open `RoCacheEndpoint`s. Streaming operations if necessary.
30✔
83
            let flags = self.config.flags.clone().unwrap_or_default();
30✔
84
            let (operations_sender, operations_receiver) = if flags.dynamic {
30✔
85
                let (sender, receiver) = broadcast::channel(16);
30✔
86
                (Some(sender), Some(receiver))
30✔
87
            } else {
88
                (None, None)
×
89
            };
90

91
            let internal_grpc_config = get_app_grpc_config(&self.config);
30✔
92
            let app_server_addr = format!(
30✔
93
                "http://{}:{}",
30✔
94
                internal_grpc_config.host, internal_grpc_config.port
30✔
95
            );
30✔
96
            let cache_manager = Arc::new(
30✔
97
                LmdbRwCacheManager::new(get_cache_manager_options(&self.config))
30✔
98
                    .map_err(OrchestrationError::CacheInitFailed)?,
30✔
99
            );
100
            let mut cache_endpoints = vec![];
30✔
101
            for endpoint in &self.config.endpoints {
60✔
102
                let (cache_endpoint, handle) = select! {
30✔
103
                    // If we're shutting down, the cache endpoint will fail to connect
104
                    _shutdown_future = shutdown.create_shutdown_future() => return Ok(()),
×
105
                    result = CacheEndpoint::new(
30✔
106
                        app_server_addr.clone(),
107
                        &*cache_manager,
108
                        endpoint.clone(),
109
                        Box::pin(shutdown.create_shutdown_future()),
110
                        operations_sender.clone(),
111
                        Some(self.multi_pb.clone()),
112
                    ) => result?
113
                };
114
                let cache_name = endpoint.name.clone();
30✔
115
                futures.push(flatten_join_handle(join_handle_map_err(handle, move |e| {
30✔
116
                    if e.is_map_full() {
×
117
                        OrchestrationError::CacheFull(cache_name)
×
118
                    } else {
119
                        OrchestrationError::CacheBuildFailed(cache_name, e)
×
120
                    }
121
                })));
30✔
122
                cache_endpoints.push(Arc::new(cache_endpoint));
30✔
123
            }
124

125
            // Initialize API Server
126
            let rest_config = get_rest_config(&self.config);
30✔
127
            let rest_handle = if rest_config.enabled {
30✔
128
                let security = get_api_security_config(&self.config).cloned();
30✔
129
                let cache_endpoints_for_rest = cache_endpoints.clone();
30✔
130
                let shutdown_for_rest = shutdown.create_shutdown_future();
30✔
131
                let api_server = rest::ApiServer::new(rest_config, security);
30✔
132
                let api_server = api_server
30✔
133
                    .run(cache_endpoints_for_rest, shutdown_for_rest)
30✔
134
                    .map_err(OrchestrationError::ApiInitFailed)?;
30✔
135
                tokio::spawn(api_server.map_err(OrchestrationError::RestServeFailed))
30✔
136
            } else {
137
                tokio::spawn(async move { Ok::<(), OrchestrationError>(()) })
×
138
            };
139

140
            // Initialize gRPC Server
141
            let grpc_config = get_grpc_config(&self.config);
30✔
142
            let grpc_handle = if grpc_config.enabled {
30✔
143
                let api_security = get_api_security_config(&self.config).cloned();
30✔
144
                let grpc_server = grpc::ApiServer::new(grpc_config, api_security, flags);
30✔
145
                let shutdown = shutdown.create_shutdown_future();
30✔
146
                let grpc_server = grpc_server
30✔
147
                    .run(cache_endpoints, shutdown, operations_receiver)
30✔
148
                    .await
×
149
                    .map_err(OrchestrationError::ApiInitFailed)?;
30✔
150
                tokio::spawn(async move {
30✔
151
                    grpc_server
30✔
152
                        .await
45✔
153
                        .map_err(OrchestrationError::GrpcServeFailed)
30✔
154
                })
30✔
155
            } else {
156
                tokio::spawn(async move { Ok::<(), OrchestrationError>(()) })
×
157
            };
158

159
            futures.push(flatten_join_handle(rest_handle));
30✔
160
            futures.push(flatten_join_handle(grpc_handle));
30✔
161

162
            while let Some(result) = futures.next().await {
120✔
163
                result?;
90✔
164
            }
165

166
            Ok::<(), OrchestrationError>(())
30✔
167
        })?;
30✔
168

169
        Ok(())
30✔
170
    }
30✔
171

172
    pub fn run_apps(
30✔
173
        &mut self,
30✔
174
        shutdown: ShutdownReceiver,
30✔
175
        api_notifier: Option<Sender<bool>>,
30✔
176
    ) -> Result<(), OrchestrationError> {
30✔
177
        let home_dir = HomeDir::new(self.config.home_dir.clone(), self.config.cache_dir.clone());
30✔
178
        let executor = self.runtime.block_on(Executor::new(
30✔
179
            &home_dir,
30✔
180
            &self.config.connections,
30✔
181
            &self.config.sources,
30✔
182
            self.config.sql.as_deref(),
30✔
183
            &self.config.endpoints,
30✔
184
            get_checkpoint_factory_options(&self.config),
30✔
185
            self.multi_pb.clone(),
30✔
186
        ))?;
30✔
187
        let dag_executor = self.runtime.block_on(executor.create_dag_executor(
30✔
188
            &self.runtime,
30✔
189
            get_executor_options(&self.config),
30✔
190
            shutdown.clone(),
30✔
191
            self.config.flags.clone().unwrap_or_default(),
30✔
192
        ))?;
30✔
193

194
        let app_grpc_config = get_app_grpc_config(&self.config);
30✔
195
        let internal_server_future = self
30✔
196
            .runtime
30✔
197
            .block_on(start_internal_pipeline_server(
30✔
198
                executor.endpoint_and_logs().to_vec(),
30✔
199
                &app_grpc_config,
30✔
200
                shutdown.create_shutdown_future(),
30✔
201
            ))
30✔
202
            .map_err(OrchestrationError::InternalServerFailed)?;
30✔
203

204
        if let Some(api_notifier) = api_notifier {
30✔
205
            api_notifier
30✔
206
                .send(true)
30✔
207
                .expect("Failed to notify API server");
30✔
208
        }
30✔
209

210
        let pipeline_future = self
30✔
211
            .runtime
30✔
212
            .spawn_blocking(move || run_dag_executor(dag_executor, shutdown.get_running_flag()));
30✔
213

30✔
214
        let mut futures = FuturesUnordered::new();
30✔
215
        futures.push(
30✔
216
            internal_server_future
30✔
217
                .map_err(OrchestrationError::GrpcServeFailed)
30✔
218
                .boxed(),
30✔
219
        );
30✔
220
        futures.push(flatten_join_handle(pipeline_future).boxed());
30✔
221

30✔
222
        self.runtime.block_on(async move {
30✔
223
            while let Some(result) = futures.next().await {
190✔
224
                result?;
60✔
225
            }
226
            Ok(())
30✔
227
        })
30✔
228
    }
30✔
229

230
    #[allow(clippy::type_complexity)]
231
    pub fn list_connectors(
×
232
        &self,
×
233
    ) -> Result<HashMap<String, (Vec<TableInfo>, Vec<SourceSchema>)>, OrchestrationError> {
×
234
        self.runtime.block_on(async {
×
235
            let mut schema_map = HashMap::new();
×
236
            for connection in &self.config.connections {
×
237
                let connector = get_connector(connection.clone())?;
×
238
                let schema_tuples = connector.list_all_schemas().await?;
×
239
                schema_map.insert(connection.name.clone(), schema_tuples);
×
240
            }
241

242
            Ok(schema_map)
×
243
        })
×
244
    }
×
245

246
    pub fn generate_token(&self) -> Result<String, OrchestrationError> {
247
        if let Some(api_config) = &self.config.api {
×
248
            if let Some(api_security) = &api_config.api_security {
×
249
                match api_security {
×
250
                    dozer_types::models::api_security::ApiSecurity::Jwt(secret) => {
×
251
                        let auth = Authorizer::new(secret, None, None);
×
252
                        let token = auth
×
253
                            .generate_token(Access::All, None)
×
254
                            .map_err(OrchestrationError::GenerateTokenFailed)?;
×
255
                        return Ok(token);
×
256
                    }
257
                }
258
            }
×
259
        }
×
260
        Err(OrchestrationError::MissingSecurityConfig)
×
261
    }
×
262

263
    pub fn build(
30✔
264
        &mut self,
30✔
265
        force: bool,
30✔
266
        shutdown: ShutdownReceiver,
30✔
267
    ) -> Result<(), OrchestrationError> {
30✔
268
        let home_dir = HomeDir::new(self.config.home_dir.clone(), self.config.cache_dir.clone());
30✔
269

30✔
270
        info!(
30✔
271
            "Initiating app: {}",
×
272
            get_colored_text(&self.config.app_name, PURPLE)
×
273
        );
274
        if force {
30✔
275
            self.clean()?;
×
276
        }
30✔
277
        validate_config(&self.config)?;
30✔
278

279
        // Calculate schemas.
280
        let endpoint_and_logs = self
30✔
281
            .config
30✔
282
            .endpoints
30✔
283
            .iter()
30✔
284
            // We're not really going to run the pipeline, so we don't create logs.
30✔
285
            .map(|endpoint| (endpoint.clone(), None))
30✔
286
            .collect();
30✔
287
        let builder = PipelineBuilder::new(
30✔
288
            &self.config.connections,
30✔
289
            &self.config.sources,
30✔
290
            self.config.sql.as_deref(),
30✔
291
            endpoint_and_logs,
30✔
292
            self.multi_pb.clone(),
30✔
293
            self.config.flags.clone().unwrap_or_default(),
30✔
294
        );
30✔
295
        let dag = self
30✔
296
            .runtime
30✔
297
            .block_on(builder.build(&self.runtime, shutdown))?;
30✔
298
        // Populate schemas.
299
        let dag_schemas = DagSchemas::new(dag)?;
30✔
300

301
        // Get current contract.
302
        let enable_token = self
30✔
303
            .config
30✔
304
            .api
30✔
305
            .as_ref()
30✔
306
            .map(|api| api.api_security.is_some())
30✔
307
            .unwrap_or(false);
30✔
308
        let enable_on_event = self
30✔
309
            .config
30✔
310
            .flags
30✔
311
            .as_ref()
30✔
312
            .map(|flags| flags.push_events)
30✔
313
            .unwrap_or(false);
30✔
314
        let contract = build::Contract::new(
30✔
315
            &dag_schemas,
30✔
316
            &self.config.endpoints,
30✔
317
            enable_token,
30✔
318
            enable_on_event,
30✔
319
        )?;
30✔
320

321
        // Run build
322
        let storage_config = get_storage_config(&self.config);
30✔
323
        self.runtime
30✔
324
            .block_on(build::build(&home_dir, &contract, &storage_config))?;
30✔
325

326
        Ok(())
30✔
327
    }
30✔
328

329
    // Cleaning the entire folder as there will be inconsistencies
330
    // between pipeline, cache and generated proto files.
331
    pub fn clean(&mut self) -> Result<(), OrchestrationError> {
×
332
        let cache_dir = PathBuf::from(self.config.cache_dir.clone());
×
333
        if cache_dir.exists() {
×
334
            fs::remove_dir_all(&cache_dir)
×
335
                .map_err(|e| ExecutionError::FileSystemError(cache_dir, e))?;
×
336
        };
×
337

338
        let home_dir = PathBuf::from(self.config.home_dir.clone());
×
339
        if home_dir.exists() {
×
340
            fs::remove_dir_all(&home_dir)
×
341
                .map_err(|e| ExecutionError::FileSystemError(home_dir, e))?;
×
342
        };
×
343

344
        Ok(())
×
345
    }
×
346

347
    pub fn run_all(&mut self, shutdown: ShutdownReceiver) -> Result<(), OrchestrationError> {
30✔
348
        let mut dozer_api = self.clone();
30✔
349

30✔
350
        let (tx, rx) = channel::unbounded::<bool>();
30✔
351

30✔
352
        self.build(false, shutdown.clone())?;
30✔
353

354
        let mut dozer_pipeline = self.clone();
30✔
355
        let pipeline_shutdown = shutdown.clone();
30✔
356
        let pipeline_thread =
30✔
357
            thread::spawn(move || dozer_pipeline.run_apps(pipeline_shutdown, Some(tx)));
30✔
358

30✔
359
        // Wait for pipeline to initialize caches before starting api server
30✔
360
        if rx.recv().is_err() {
30✔
361
            // This means the pipeline thread returned before sending a message. Either an error happened or it panicked.
362
            return match pipeline_thread.join() {
×
363
                Ok(Err(e)) => Err(e),
×
364
                Ok(Ok(())) => panic!("An error must have happened"),
×
365
                Err(e) => {
×
366
                    std::panic::panic_any(e);
×
367
                }
368
            };
369
        }
30✔
370

30✔
371
        dozer_api.run_api(shutdown)?;
30✔
372

373
        // wait for pipeline thread to shutdown gracefully
374
        pipeline_thread.join().unwrap()
30✔
375
    }
30✔
376
}
377

378
pub fn validate_sql(sql: String) -> Result<(), PipelineError> {
×
379
    statement_to_pipeline(&sql, &mut AppPipeline::new_with_default_flags(), None).map_or_else(
×
380
        |e| {
×
381
            error!(
×
382
                "[sql][{}] Transforms validation error: {}",
×
383
                get_colored_text("X", RED),
×
384
                e
×
385
            );
×
386
            Err(e)
×
387
        },
×
388
        |_| {
×
389
            info!(
×
390
                "[sql][{}]  Transforms validation completed",
×
391
                get_colored_text("✓", GREEN)
×
392
            );
393
            Ok(())
×
394
        },
×
395
    )
×
396
}
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc