• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

getdozer / dozer / 5672512448

pending completion
5672512448

push

github

web-flow
chore: Change `make_from!` in `from_arrow` to func to improve readability (#1792)

31 of 31 new or added lines in 4 files covered. (100.0%)

45630 of 59777 relevant lines covered (76.33%)

38810.48 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

29.23
/dozer-cli/src/lib.rs
1
pub mod cli;
×
2
pub mod errors;
3
pub mod pipeline;
4
pub mod shutdown;
5
pub mod simple;
6
mod ui_helper;
7
use dozer_core::{app::AppPipeline, errors::ExecutionError};
8
use dozer_ingestion::connectors::SourceSchema;
9
use dozer_sql::pipeline::{builder::statement_to_pipeline, errors::PipelineError};
10
use dozer_types::{crossbeam::channel::Sender, log::debug};
11
use errors::OrchestrationError;
12
use shutdown::{ShutdownReceiver, ShutdownSender};
13
use std::{
14
    backtrace::{Backtrace, BacktraceStatus},
15
    collections::HashMap,
16
    panic, process,
17
    thread::current,
18
};
19
use tokio::task::JoinHandle;
20
#[cfg(feature = "cloud")]
21
pub mod cloud_app_context;
22
#[cfg(feature = "cloud")]
23
mod cloud_helper;
24
pub mod config_helper;
25
pub mod console_helper;
26
#[cfg(feature = "cloud")]
27
mod progress_printer;
28
#[cfg(test)]
29
mod tests;
30
mod utils;
31

32
pub trait Orchestrator {
33
    fn build(&mut self, force: bool) -> Result<(), OrchestrationError>;
34
    fn clean(&mut self) -> Result<(), OrchestrationError>;
35
    fn run_all(
36
        &mut self,
37
        shutdown: ShutdownReceiver,
38
        err_threshold: Option<u32>,
39
    ) -> Result<(), OrchestrationError>;
40
    fn run_api(&mut self, shutdown: ShutdownReceiver) -> Result<(), OrchestrationError>;
41
    fn run_apps(
42
        &mut self,
43
        shutdown: ShutdownReceiver,
44
        api_notifier: Option<Sender<bool>>,
45
        err_threshold: Option<u32>,
46
    ) -> Result<(), OrchestrationError>;
47
    #[allow(clippy::type_complexity)]
48
    fn list_connectors(
49
        &self,
50
    ) -> Result<HashMap<String, (Vec<TableInfo>, Vec<SourceSchema>)>, OrchestrationError>;
51
    fn generate_token(&self) -> Result<String, OrchestrationError>;
52
}
53

54
#[cfg(feature = "cloud")]
55
pub trait CloudOrchestrator {
56
    fn deploy(
57
        &mut self,
58
        cloud: Cloud,
59
        deploy: DeployCommandArgs,
60
        config_paths: Vec<String>,
61
    ) -> Result<(), OrchestrationError>;
62
    fn delete(&mut self, cloud: Cloud) -> Result<(), OrchestrationError>;
63
    fn list(&mut self, cloud: Cloud, list: ListCommandArgs) -> Result<(), OrchestrationError>;
64
    fn status(&mut self, cloud: Cloud) -> Result<(), OrchestrationError>;
65
    fn monitor(&mut self, cloud: Cloud) -> Result<(), OrchestrationError>;
66
    fn trace_logs(&mut self, cloud: Cloud, logs: LogCommandArgs) -> Result<(), OrchestrationError>;
67
    fn login(
68
        &mut self,
69
        cloud: Cloud,
70
        organisation_name: Option<String>,
71
    ) -> Result<(), OrchestrationError>;
72
    fn execute_secrets_command(
73
        &mut self,
74
        cloud: Cloud,
75
        command: SecretsCommand,
76
    ) -> Result<(), OrchestrationError>;
77
}
78

79
// Re-exports
80
pub use dozer_ingestion::{
81
    connectors::{get_connector, TableInfo},
82
    errors::ConnectorError,
83
};
84
pub use dozer_sql::pipeline::builder::QueryContext;
85
pub use ui_helper::config_to_ui_dag;
86
pub fn wrapped_statement_to_pipeline(sql: &str) -> Result<QueryContext, PipelineError> {
×
87
    let mut pipeline = AppPipeline::new();
×
88
    statement_to_pipeline(sql, &mut pipeline, None)
×
89
}
×
90

91
#[cfg(feature = "cloud")]
92
use crate::cli::cloud::{
93
    Cloud, DeployCommandArgs, ListCommandArgs, LogCommandArgs, SecretsCommand,
94
};
95
pub use dozer_types::models::connection::Connection;
96
use dozer_types::tracing::error;
97

98
async fn flatten_join_handle(
24✔
99
    handle: JoinHandle<Result<(), OrchestrationError>>,
24✔
100
) -> Result<(), OrchestrationError> {
24✔
101
    match handle.await {
24✔
102
        Ok(Ok(_)) => Ok(()),
24✔
103
        Ok(Err(err)) => Err(err),
×
104
        Err(err) => Err(OrchestrationError::JoinError(err)),
×
105
    }
106
}
24✔
107

108
fn join_handle_map_err<E: Send + 'static>(
6✔
109
    handle: JoinHandle<Result<(), E>>,
6✔
110
    f: impl FnOnce(E) -> OrchestrationError + Send + 'static,
6✔
111
) -> JoinHandle<Result<(), OrchestrationError>> {
6✔
112
    tokio::spawn(async move {
6✔
113
        match handle.await {
6✔
114
            Ok(Ok(_)) => Ok(()),
6✔
115
            Ok(Err(err)) => Err(f(err)),
×
116
            Err(err) => Err(OrchestrationError::JoinError(err)),
×
117
        }
118
    })
6✔
119
}
6✔
120

121
pub fn set_panic_hook() {
2✔
122
    panic::set_hook(Box::new(move |panic_info| {
2✔
123
        // All the orchestrator errors are captured here
124
        if let Some(e) = panic_info.payload().downcast_ref::<OrchestrationError>() {
×
125
            error!("{}", e);
×
126
            debug!("{:?}", e);
×
127
        // All the connector errors are captured here
128
        } else if let Some(e) = panic_info.payload().downcast_ref::<ConnectorError>() {
×
129
            error!("{}", e);
×
130
            debug!("{:?}", e);
×
131
        // All the pipeline errors are captured here
132
        } else if let Some(e) = panic_info.payload().downcast_ref::<ExecutionError>() {
×
133
            error!("{}", e);
×
134
            debug!("{:?}", e);
×
135
        // If any errors are sent as strings.
136
        } else if let Some(s) = panic_info.payload().downcast_ref::<&str>() {
×
137
            error!("{s:?}");
×
138
        } else {
139
            error!("{}", panic_info);
×
140
        }
141

142
        let backtrace = Backtrace::capture();
×
143
        if backtrace.status() == BacktraceStatus::Captured {
×
144
            error!(
×
145
                "thread '{}' panicked at '{}'\n stack backtrace:\n{}",
×
146
                current()
×
147
                    .name()
×
148
                    .map(ToString::to_string)
×
149
                    .unwrap_or_default(),
×
150
                panic_info
×
151
                    .location()
×
152
                    .map(ToString::to_string)
×
153
                    .unwrap_or_default(),
×
154
                backtrace
×
155
            );
×
156
        }
×
157

158
        process::exit(1);
×
159
    }));
2✔
160
}
2✔
161

162
pub fn set_ctrl_handler(shutdown_sender: ShutdownSender) {
×
163
    let mut shutdown = Some(shutdown_sender);
×
164
    ctrlc::set_handler(move || {
×
165
        if let Some(shutdown) = shutdown.take() {
×
166
            shutdown.shutdown()
×
167
        }
×
168
    })
×
169
    .expect("Error setting Ctrl-C handler");
×
170
}
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc