• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

getdozer / dozer / 5972853941

25 Aug 2023 06:52AM UTC coverage: 76.247% (+0.8%) from 75.446%
5972853941

push

github

web-flow
feat: make probabilistic optimizations optional and tunable in the YAML config (#1912)

Probabilistic optimization sacrifices accuracy in order to reduce memory consumption. In certain parts of the pipeline, a Bloom Filter is used ([set_processor](https://github.com/getdozer/dozer/blob/2e3ba96c3f4bdf9a691747191ab15617564d8ca2/dozer-sql/src/pipeline/product/set/set_processor.rs#L20)), while in other parts, hash tables that store only the hash of the keys instead of the full keys are used ([aggregation_processor](https://github.com/getdozer/dozer/blob/2e3ba96c3f4bdf9a691747191ab15617564d8ca2/dozer-sql/src/pipeline/aggregation/processor.rs#L59) and [join_processor](https://github.com/getdozer/dozer/blob/2e3ba96c3f4bdf9a691747191ab15617564d8ca2/dozer-sql/src/pipeline/product/join/operator.rs#L57-L58)).

This commit makes these optimizations disabled by default and offers user-configurable flags to enable each of these optimizations separately.

This is an example of how to turn on probabilistic optimizations for each processor in the Dozer configuration.

```
flags:
  enable_probabilistic_optimizations:
    in_sets: true # enable probabilistic optimizations in set operations (UNION, EXCEPT, INTERSECT); Default: false
    in_joins: true # enable probabilistic optimizations in JOIN operations; Default: false
    in_aggregations: true # enable probabilistic optimizations in aggregations (SUM, COUNT, MIN, etc.); Default: false
```

347 of 347 new or added lines in 25 files covered. (100.0%)

47165 of 61858 relevant lines covered (76.25%)

48442.96 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

29.69
/dozer-cli/src/lib.rs
1
pub mod cli;
2
pub mod errors;
3
pub mod live;
4
pub mod pipeline;
5
pub mod shutdown;
6
pub mod simple;
7
mod ui_helper;
8
use dozer_core::{app::AppPipeline, errors::ExecutionError};
9
use dozer_sql::pipeline::{builder::statement_to_pipeline, errors::PipelineError};
10
use dozer_types::log::debug;
11
use errors::OrchestrationError;
12
use shutdown::ShutdownSender;
13
use std::{
14
    backtrace::{Backtrace, BacktraceStatus},
15
    panic, process,
16
    thread::current,
17
};
18
use tokio::task::JoinHandle;
19
#[cfg(feature = "cloud")]
20
pub mod cloud_app_context;
21
#[cfg(feature = "cloud")]
22
mod cloud_helper;
23
pub mod config_helper;
24
pub mod console_helper;
25
#[cfg(feature = "cloud")]
26
mod progress_printer;
27
#[cfg(test)]
28
mod tests;
29
mod utils;
30

31
/// Set of cloud-related operations an orchestrator must provide.
///
/// The trait is only compiled when the `cloud` feature is enabled. Every
/// method takes `&mut self` together with a `Cloud` value (imported from
/// `crate::cli::cloud`) and returns `Result<(), OrchestrationError>`.
/// Only the signatures are declared here; no implementation is visible in
/// this file.
///
/// NOTE(review): what each method actually does (deploy, delete, list,
/// status, monitor, trace_logs, login, execute_secrets_command) is suggested
/// by its name only — confirm against the implementing type.
#[cfg(feature = "cloud")]
32
pub trait CloudOrchestrator {
33
    fn deploy(
34
        &mut self,
35
        cloud: Cloud,
36
        deploy: DeployCommandArgs,
37
        config_paths: Vec<String>,
38
    ) -> Result<(), OrchestrationError>;
39
    fn delete(&mut self, cloud: Cloud) -> Result<(), OrchestrationError>;
40
    fn list(&mut self, cloud: Cloud, list: ListCommandArgs) -> Result<(), OrchestrationError>;
41
    fn status(&mut self, cloud: Cloud) -> Result<(), OrchestrationError>;
42
    fn monitor(&mut self, cloud: Cloud) -> Result<(), OrchestrationError>;
43
    fn trace_logs(&mut self, cloud: Cloud, logs: LogCommandArgs) -> Result<(), OrchestrationError>;
44
    fn login(
45
        &mut self,
46
        cloud: Cloud,
47
        organisation_slug: Option<String>,
48
        profile: Option<String>,
49
        client_id: Option<String>,
50
        client_secret: Option<String>,
51
    ) -> Result<(), OrchestrationError>;
52
    fn execute_secrets_command(
53
        &mut self,
54
        cloud: Cloud,
55
        command: SecretsCommand,
56
    ) -> Result<(), OrchestrationError>;
57
}
58

59
// Re-exports
60
pub use dozer_ingestion::{
61
    connectors::{get_connector, TableInfo},
62
    errors::ConnectorError,
63
};
64
pub use dozer_sql::pipeline::builder::QueryContext;
65
pub use ui_helper::config_to_ui_dag;
66
/// Convenience wrapper around `statement_to_pipeline`.
///
/// Creates a fresh `AppPipeline` via `AppPipeline::new_with_default_flags()`,
/// passes `sql` and that pipeline to `statement_to_pipeline` with `None` as
/// the third argument, and returns its result unchanged. The pipeline itself
/// is dropped when the function returns; only the `QueryContext` (or the
/// `PipelineError`) reaches the caller.
pub fn wrapped_statement_to_pipeline(sql: &str) -> Result<QueryContext, PipelineError> {
×
67
    let mut pipeline = AppPipeline::new_with_default_flags();
×
68
    statement_to_pipeline(sql, &mut pipeline, None)
×
69
}
×
70

71
#[cfg(feature = "cloud")]
72
use crate::cli::cloud::{
73
    Cloud, DeployCommandArgs, ListCommandArgs, LogCommandArgs, SecretsCommand,
74
};
75
pub use dozer_types::models::connection::Connection;
76
use dozer_types::tracing::error;
77

78
/// Awaits a task handle and collapses its nested result into a single
/// `Result<(), OrchestrationError>`.
///
/// * `Ok(Ok(_))` from the handle becomes `Ok(())`.
/// * `Ok(Err(err))` (the task itself returned an error) is returned as
///   `Err(err)` unchanged.
/// * `Err(err)` (awaiting the handle failed) is wrapped in
///   `OrchestrationError::JoinError`.
async fn flatten_join_handle(
120✔
79
    handle: JoinHandle<Result<(), OrchestrationError>>,
120✔
80
) -> Result<(), OrchestrationError> {
120✔
81
    match handle.await {
120✔
82
        Ok(Ok(_)) => Ok(()),
120✔
83
        Ok(Err(err)) => Err(err),
×
84
        Err(err) => Err(OrchestrationError::JoinError(err)),
×
85
    }
86
}
120✔
87

88
/// Adapts a task handle whose error type is `E` into one whose error type is
/// `OrchestrationError`.
///
/// Spawns a new task with `tokio::spawn` that awaits `handle` and then:
///
/// * maps `Ok(Ok(_))` to `Ok(())`;
/// * converts an error returned by the task with `f`;
/// * wraps a failure to await the handle in `OrchestrationError::JoinError`.
///
/// The returned `JoinHandle` belongs to the newly spawned wrapper task, not
/// to the original one. `f` is invoked at most once, and only on the
/// `Ok(Err(_))` path.
fn join_handle_map_err<E: Send + 'static>(
30✔
89
    handle: JoinHandle<Result<(), E>>,
30✔
90
    f: impl FnOnce(E) -> OrchestrationError + Send + 'static,
30✔
91
) -> JoinHandle<Result<(), OrchestrationError>> {
30✔
92
    tokio::spawn(async move {
30✔
93
        match handle.await {
30✔
94
            Ok(Ok(_)) => Ok(()),
30✔
95
            Ok(Err(err)) => Err(f(err)),
×
96
            Err(err) => Err(OrchestrationError::JoinError(err)),
×
97
        }
98
    })
30✔
99
}
30✔
100

101
/// Installs a process-wide panic hook that logs the panic and then exits.
///
/// The hook inspects the panic payload and logs it according to its type:
///
/// * `OrchestrationError`, `ConnectorError`, `ExecutionError` — logged with
///   `error!` (Display) and again with `debug!` (Debug);
/// * `&str` — logged with `error!` using Debug formatting, so the text is
///   emitted quoted;
/// * anything else — the whole `panic_info` is logged with `error!`. Payloads
///   of type `String` are not matched by the `&str` branch, so they are
///   handled by this fallback as well.
///
/// Afterwards a `Backtrace` is captured; if its status is
/// `BacktraceStatus::Captured`, the current thread name, the panic location
/// and the backtrace are logged with `error!` (missing thread name or
/// location is rendered as an empty string).
///
/// Finally the hook calls `process::exit(1)`, so a panic on any thread
/// terminates the whole process with exit status 1 instead of unwinding.
pub fn set_panic_hook() {
5✔
102
    panic::set_hook(Box::new(move |panic_info| {
5✔
103
        // All the orchestrator errors are captured here
104
        if let Some(e) = panic_info.payload().downcast_ref::<OrchestrationError>() {
×
105
            error!("{}", e);
×
106
            debug!("{:?}", e);
×
107
        // All the connector errors are captured here
108
        } else if let Some(e) = panic_info.payload().downcast_ref::<ConnectorError>() {
×
109
            error!("{}", e);
×
110
            debug!("{:?}", e);
×
111
        // All the pipeline errors are captured here
112
        } else if let Some(e) = panic_info.payload().downcast_ref::<ExecutionError>() {
×
113
            error!("{}", e);
×
114
            debug!("{:?}", e);
×
115
        // If any errors are sent as strings.
116
        } else if let Some(s) = panic_info.payload().downcast_ref::<&str>() {
×
117
            error!("{s:?}");
×
118
        } else {
119
            error!("{}", panic_info);
×
120
        }
121

122
        let backtrace = Backtrace::capture();
×
123
        if backtrace.status() == BacktraceStatus::Captured {
×
124
            error!(
×
125
                "thread '{}' panicked at '{}'\n stack backtrace:\n{}",
×
126
                current()
×
127
                    .name()
×
128
                    .map(ToString::to_string)
×
129
                    .unwrap_or_default(),
×
130
                panic_info
×
131
                    .location()
×
132
                    .map(ToString::to_string)
×
133
                    .unwrap_or_default(),
×
134
                backtrace
×
135
            );
×
136
        }
×
137

138
        process::exit(1);
×
139
    }));
5✔
140
}
5✔
141

142
/// Registers a Ctrl-C handler that triggers shutdown through
/// `shutdown_sender`.
///
/// The sender is moved into the handler closure wrapped in an `Option` and
/// removed with `take()` on the first invocation, so `shutdown()` is called
/// at most once; later invocations of the handler do nothing.
///
/// # Panics
///
/// Panics with "Error setting Ctrl-C handler" if `ctrlc::set_handler`
/// returns an error.
pub fn set_ctrl_handler(shutdown_sender: ShutdownSender) {
×
143
    let mut shutdown = Some(shutdown_sender);
×
144
    ctrlc::set_handler(move || {
×
145
        if let Some(shutdown) = shutdown.take() {
×
146
            shutdown.shutdown()
×
147
        }
×
148
    })
×
149
    .expect("Error setting Ctrl-C handler");
×
150
}
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc