• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

getdozer / dozer / 6472204957

09 Oct 2023 06:57PM UTC coverage: 77.582%. First build
6472204957

push

github

web-flow
chore: init schema option at connection level optionally (#2145)

* chore: init schema option at connection level

* chore: fix clippy

40 of 40 new or added lines in 11 files covered. (100.0%)

51962 of 66977 relevant lines covered (77.58%)

165501.95 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/dozer-cli/src/cli/init.rs
1
use crate::errors::{CliError, OrchestrationError};
2
use dozer_types::constants::{DEFAULT_LAMBDAS_DIRECTORY, DEFAULT_QUERIES_DIRECTORY};
3
use dozer_types::log::warn;
4
use dozer_types::models::config::{default_cache_dir, default_home_dir, get_cache_dir};
5
use dozer_types::{
6
    constants::DEFAULT_CONFIG_PATH,
7
    log::info,
8
    models::ingestion_types::{
9
        EthConfig, EthFilter, EthLogConfig, EthProviderConfig, MongodbConfig, MySQLConfig,
10
        S3Details, S3Storage, SnowflakeConfig,
11
    },
12
    models::{
13
        config::Config,
14
        connection::{Connection, ConnectionConfig, PostgresConfig},
15
    },
16
    serde_yaml,
17
};
18
use rustyline::history::DefaultHistory;
19
use rustyline::{
20
    completion::{Completer, Pair},
21
    Context,
22
};
23
use rustyline::{error::ReadlineError, Editor};
24
use rustyline_derive::{Helper, Highlighter, Hinter, Validator};
25
use std::path::{Path, PathBuf};
26

27
#[derive(Helper, Highlighter, Hinter, Validator)]
28
/// Rustyline helper installed on the interactive `init` prompt editor.
///
/// It carries no state. The `Helper`, `Highlighter`, `Hinter` and `Validator`
/// implementations are derived, and completion is supplied by the
/// `Completer` implementation for this type.
pub struct InitHelper {}
29

30
impl Completer for InitHelper {
31
    type Candidate = Pair;
32
    fn complete(
×
33
        &self,
×
34
        line: &str,
×
35
        _pos: usize,
×
36
        _ctx: &Context,
×
37
    ) -> rustyline::Result<(usize, Vec<Self::Candidate>)> {
×
38
        let line = format!("{line}_");
×
39
        let mut tokens = line.split_whitespace();
×
40
        let mut last_token = String::from(tokens.next_back().unwrap());
×
41
        last_token.pop();
×
42
        let candidates: Vec<String> = vec![
×
43
            "Postgres".to_owned(),
×
44
            "Ethereum".to_owned(),
×
45
            "Snowflake".to_owned(),
×
46
            "MySQL".to_owned(),
×
47
            "S3".to_owned(),
×
48
            "MongoDB".to_owned(),
×
49
        ];
×
50
        let mut match_pair: Vec<Pair> = candidates
×
51
            .iter()
×
52
            .filter_map(|f| {
×
53
                if f.to_lowercase().starts_with(&last_token.to_lowercase()) {
×
54
                    Some(Pair {
×
55
                        display: f.to_owned(),
×
56
                        replacement: f.to_owned(),
×
57
                    })
×
58
                } else {
59
                    None
×
60
                }
61
            })
×
62
            .collect();
×
63
        if match_pair.is_empty() {
×
64
            match_pair = vec![Pair {
×
65
                display: "Postgres".to_owned(),
×
66
                replacement: "Postgres".to_owned(),
×
67
            }]
×
68
        }
×
69
        Ok((line.len() - last_token.len() - 1, match_pair))
×
70
    }
×
71
}
72

73
pub fn generate_connection(connection_name: &str) -> Connection {
×
74
    match connection_name {
×
75
        "Snowflake" | "snowflake" | "S" | "s" => {
×
76
            let snowflake_config = SnowflakeConfig {
×
77
                server: "<account_name>.<region_id>.snowflakecomputing.com".to_owned(),
×
78
                port: "443".to_owned(),
×
79
                user: "bob".to_owned(),
×
80
                password: "password".to_owned(),
×
81
                database: "database".to_owned(),
×
82
                schema: "schema".to_owned(),
×
83
                warehouse: "warehouse".to_owned(),
×
84
                driver: Some("SnowflakeDSIIDriver".to_owned()),
×
85
                role: "role".to_owned(),
×
86
                poll_interval_seconds: None,
×
87
            };
×
88
            let connection: Connection = Connection {
×
89
                name: "snowflake".to_owned(),
×
90
                config: ConnectionConfig::Snowflake(snowflake_config),
×
91
            };
×
92
            connection
×
93
        }
94
        "Ethereum" | "ethereum" | "E" | "e" => {
×
95
            let eth_filter = EthFilter {
×
96
                from_block: Some(0),
×
97
                to_block: None,
×
98
                addresses: vec![],
×
99
                topics: vec![],
×
100
            };
×
101
            let ethereum_config = EthConfig {
×
102
                provider: EthProviderConfig::Log(EthLogConfig {
×
103
                    wss_url: "wss://link".to_owned(),
×
104
                    filter: Some(eth_filter),
×
105
                    contracts: vec![],
×
106
                }),
×
107
            };
×
108
            let connection: Connection = Connection {
×
109
                name: "ethereum".to_owned(),
×
110
                config: ConnectionConfig::Ethereum(ethereum_config),
×
111
            };
×
112
            connection
×
113
        }
114
        "MySQL" | "MYSQL" | "mysql" | "Mysql" | "My" => {
×
115
            let mysql_config = MySQLConfig {
×
116
                url: "mysql://<user>:<password>@localhost:3306/<database>".to_owned(),
×
117
                server_id: Some((1).to_owned()),
×
118
            };
×
119
            let connection: Connection = Connection {
×
120
                name: "mysql".to_owned(),
×
121
                config: ConnectionConfig::MySQL(mysql_config),
×
122
            };
×
123
            connection
×
124
        }
125
        "S3" | "s3" => {
×
126
            let s3_details = S3Details {
×
127
                access_key_id: "<your_access_key_id>".to_owned(),
×
128
                secret_access_key: "<your_secret_access_key>".to_owned(),
×
129
                region: "<your_region>".to_owned(),
×
130
                bucket_name: "<your_bucket_name>".to_owned(),
×
131
            };
×
132
            let s3_config = S3Storage {
×
133
                details: s3_details,
×
134
                tables: vec![],
×
135
            };
×
136
            let connection: Connection = Connection {
×
137
                name: "s3".to_owned(),
×
138
                config: ConnectionConfig::S3Storage(s3_config),
×
139
            };
×
140
            connection
×
141
        }
142
        "MongoDB" | "mongodb" | "MONGODB" | "Mongodb" | "Mo" | "MO" => {
×
143
            let mongo_config = MongodbConfig {
×
144
                connection_string:
×
145
                    "mongodb://<username>:<password>@localhost:27017/<database_name>".to_owned(),
×
146
            };
×
147
            let connection: Connection = Connection {
×
148
                name: "mongodb".to_owned(),
×
149
                config: ConnectionConfig::MongoDB(mongo_config),
×
150
            };
×
151
            connection
×
152
        }
153
        _ => {
154
            let postgres_config = PostgresConfig {
×
155
                user: Some("postgres".to_owned()),
×
156
                password: Some("postgres".to_owned()),
×
157
                host: Some("localhost".to_owned()),
×
158
                port: Some(5432),
×
159
                database: Some("users".to_owned()),
×
160
                sslmode: None,
×
161
                connection_url: None,
×
162
                schema: None,
×
163
            };
×
164
            let connection: Connection = Connection {
×
165
                name: "postgres".to_owned(),
×
166
                config: ConnectionConfig::Postgres(postgres_config),
×
167
            };
×
168
            connection
×
169
        }
170
    }
171
}
×
172
type Question = (
173
    String,
174
    Box<dyn Fn((String, &mut Config)) -> Result<(), OrchestrationError>>,
175
);
176
pub fn generate_config_repl() -> Result<(), OrchestrationError> {
×
177
    let mut rl = Editor::<InitHelper, DefaultHistory>::new()
×
178
        .map_err(|e| OrchestrationError::CliError(CliError::ReadlineError(e)))?;
×
179
    rl.set_helper(Some(InitHelper {}));
×
180
    let mut default_config = Config {
×
181
        version: 1,
×
182
        ..Default::default()
×
183
    };
×
184
    let default_app_name = "quick-start-app";
×
185
    let questions: Vec<Question> = vec![
×
186
        (
×
187
            format!("question: App name ({:}): ", default_app_name),
×
188
            Box::new(move |(app_name, config)| {
×
189
                let app_name = app_name.trim();
×
190
                if app_name.is_empty() {
×
191
                    config.app_name = default_app_name.to_string();
×
192
                } else {
×
193
                    config.app_name = app_name.to_string();
×
194
                }
×
195
                Ok(())
×
196
            }),
×
197
        ),
×
198
        (
×
199
            format!("question: Home directory ({:}): ", default_home_dir()),
×
200
            Box::new(move |(home_dir, config)| {
×
201
                if home_dir.is_empty() {
×
202
                    config.home_dir = Some(default_home_dir());
×
203
                    config.cache_dir = Some(default_cache_dir());
×
204
                } else {
×
205
                    config.cache_dir = Some(get_cache_dir(&home_dir));
×
206
                    config.home_dir = Some(home_dir);
×
207
                }
×
208
                Ok(())
×
209
            }),
×
210
        ),
×
211
        (
×
212
            "question: Connection Type - one of: [P]ostgres, [E]thereum, [S]nowflake, [My]SQL, [S3]Storage, [Mo]ngoDB: "
×
213
                .to_string(),
×
214
            Box::new(move |(connection, config)| {
×
215
                let sample_connection = generate_connection(&connection);
×
216
                config.connections.push(sample_connection);
×
217

×
218
                Ok(())
×
219
            }),
×
220
        ),
×
221
        (
×
222
            format!("question: Config path ({:}): ", DEFAULT_CONFIG_PATH),
×
223
            Box::new(move |(yaml_path, config)| {
×
224
                let mut yaml_path = yaml_path.trim();
×
225
                if yaml_path.is_empty() {
×
226
                    yaml_path = DEFAULT_CONFIG_PATH;
×
227
                }
×
228
                let f = std::fs::OpenOptions::new()
×
229
                    .create(true)
×
230
                    .write(true)
×
231
                    .open(yaml_path)
×
232
                    .map_err(|e| {
×
233
                        OrchestrationError::CliError(CliError::FileSystem(
×
234
                            yaml_path.to_string().into(),
×
235
                            e,
×
236
                        ))
×
237
                    })?;
×
238
                serde_yaml::to_writer(f, &config)
×
239
                    .map_err(OrchestrationError::FailedToWriteConfigYaml)?;
×
240

241
                info!("Generating workspace: \n\
×
242
                \n- {} (main configuration)\n- ./queries (folder for sql queries)\n- ./lambdas (folder for lambda functions)
×
243
                \n• More details about our config: https://getdozer.io/docs/reference/configuration/introduction\
×
244
                \n• Connector & Sources: https://getdozer.io/docs/reference/configuration/connectors\
×
245
                \n• Endpoints: https://getdozer.io/docs/reference/configuration/endpoints/",
×
246
                   yaml_path.to_owned());
×
247

248
                let path = PathBuf::from(yaml_path);
×
249
                if let Some(dir) = path.parent() {
×
250
                    let queries_path = Path::new(dir).join(DEFAULT_QUERIES_DIRECTORY);
×
251
                    if let Err(_e) = std::fs::create_dir(queries_path) {
×
252
                        warn!("Cannot create queries directory");
×
253
                    }
×
254

255
                    let lambdas_path = Path::new(dir).join(DEFAULT_LAMBDAS_DIRECTORY);
×
256
                    if let Err(_e) = std::fs::create_dir(lambdas_path) {
×
257
                        warn!("Cannot create lambdas directory");
×
258
                    }
×
259
                }
×
260

261
                Ok(())
×
262
            }),
×
263
        ),
×
264
    ];
×
265
    let result = questions.iter().try_for_each(|(question, func)| {
×
266
        let readline = rl.readline(question);
×
267
        match readline {
×
268
            Ok(input) => func((input, &mut default_config)),
×
269
            Err(err) => Err(OrchestrationError::CliError(CliError::ReadlineError(err))),
×
270
        }
271
    });
×
272

×
273
    match result {
×
274
        Ok(_) => Ok(()),
×
275
        Err(e) => match e {
×
276
            OrchestrationError::CliError(CliError::ReadlineError(ReadlineError::Interrupted)) => {
277
                info!("Exiting..");
×
278
                Ok(())
×
279
            }
280
            OrchestrationError::CliError(CliError::ReadlineError(ReadlineError::Eof)) => {
281
                info!("CTRL-D - exiting...");
×
282
                Ok(())
×
283
            }
284
            _ => Err(e),
×
285
        },
286
    }
287
}
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc