• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

getdozer / dozer / 6008064648

29 Aug 2023 04:53AM UTC coverage: 78.078%. First build
6008064648

Pull #1928

github

supergi0
added mongodb template
Pull Request #1928: Added MongoDB, MySQL and S3 templates to dozer init

39 of 39 new or added lines in 1 file covered. (100.0%)

49079 of 62859 relevant lines covered (78.08%)

49112.78 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/dozer-cli/src/cli/init.rs
1
use crate::errors::{CliError, OrchestrationError};
2
use dozer_types::constants::{DEFAULT_LAMBDAS_DIRECTORY, DEFAULT_QUERIES_DIRECTORY};
3
use dozer_types::log::warn;
4
use dozer_types::models::config::{default_cache_dir, default_home_dir, get_cache_dir};
5
use dozer_types::{
6
    constants::DEFAULT_CONFIG_PATH,
7
    ingestion_types::{EthConfig, EthFilter, EthLogConfig, EthProviderConfig, SnowflakeConfig, MySQLConfig, S3Details, S3Storage, MongodbConfig},
8
    log::info,
9
    models::{
10
        config::Config,
11
        connection::{Connection, ConnectionConfig, PostgresConfig},
12
    },
13
    serde_yaml,
14
};
15
use rustyline::history::DefaultHistory;
16
use rustyline::{
17
    completion::{Completer, Pair},
18
    Context,
19
};
20
use rustyline::{error::ReadlineError, Editor};
21
use rustyline_derive::{Helper, Highlighter, Hinter, Validator};
22
use std::path::{Path, PathBuf};
23

24
#[derive(Helper, Highlighter, Hinter, Validator)]
25
pub struct InitHelper {}
26

27
impl Completer for InitHelper {
28
    type Candidate = Pair;
29
    fn complete(
×
30
        &self,
×
31
        line: &str,
×
32
        _pos: usize,
×
33
        _ctx: &Context,
×
34
    ) -> rustyline::Result<(usize, Vec<Self::Candidate>)> {
×
35
        let line = format!("{line}_");
×
36
        let mut tokens = line.split_whitespace();
×
37
        let mut last_token = String::from(tokens.next_back().unwrap());
×
38
        last_token.pop();
×
39
        let candidates: Vec<String> = vec![
×
40
            "Postgres".to_owned(),
×
41
            "Ethereum".to_owned(),
×
42
            "Snowflake".to_owned(),
×
43
            "MySQL".to_owned(),
×
44
            "S3".to_owned(),
×
45
            "MongoDB".to_owned(),
×
46
        ];
×
47
        let mut match_pair: Vec<Pair> = candidates
×
48
            .iter()
×
49
            .filter_map(|f| {
×
50
                if f.to_lowercase().starts_with(&last_token.to_lowercase()) {
×
51
                    Some(Pair {
×
52
                        display: f.to_owned(),
×
53
                        replacement: f.to_owned(),
×
54
                    })
×
55
                } else {
56
                    None
×
57
                }
58
            })
×
59
            .collect();
×
60
        if match_pair.is_empty() {
×
61
            match_pair = vec![Pair {
×
62
                display: "Postgres".to_owned(),
×
63
                replacement: "Postgres".to_owned(),
×
64
            }]
×
65
        }
×
66
        Ok((line.len() - last_token.len() - 1, match_pair))
×
67
    }
×
68
}
69

70
pub fn generate_connection(connection_name: &str) -> Connection {
×
71
    match connection_name {
×
72
        "Snowflake" | "snowflake" | "S" | "s" => {
×
73
            let snowflake_config = SnowflakeConfig {
×
74
                server: "<account_name>.<region_id>.snowflakecomputing.com".to_owned(),
×
75
                port: "443".to_owned(),
×
76
                user: "bob".to_owned(),
×
77
                password: "password".to_owned(),
×
78
                database: "database".to_owned(),
×
79
                schema: "schema".to_owned(),
×
80
                warehouse: "warehouse".to_owned(),
×
81
                driver: Some("SnowflakeDSIIDriver".to_owned()),
×
82
                role: "role".to_owned(),
×
83
            };
×
84
            let connection: Connection = Connection {
×
85
                name: "snowflake".to_owned(),
×
86
                config: Some(ConnectionConfig::Snowflake(snowflake_config)),
×
87
            };
×
88
            connection
×
89
        }
90
        "Ethereum" | "ethereum" | "E" | "e" => {
×
91
            let eth_filter = EthFilter {
×
92
                from_block: Some(0),
×
93
                to_block: None,
×
94
                addresses: vec![],
×
95
                topics: vec![],
×
96
            };
×
97
            let ethereum_config = EthConfig {
×
98
                provider: Some(EthProviderConfig::Log(EthLogConfig {
×
99
                    wss_url: "wss://link".to_owned(),
×
100
                    filter: Some(eth_filter),
×
101
                    contracts: vec![],
×
102
                })),
×
103
            };
×
104
            let connection: Connection = Connection {
×
105
                name: "ethereum".to_owned(),
×
106
                config: Some(ConnectionConfig::Ethereum(ethereum_config)),
×
107
            };
×
108
            connection
×
109
        }
110
        "MySQL" | "MYSQL" | "mysql" | "Mysql" | "My" => {
×
111
            let mysql_config = MySQLConfig {
×
112
                url: "mysql://<user>:<password>@localhost:3306/<database>".to_owned(),
×
113
                server_id: Some((1).to_owned()),
×
114
            };
×
115
            let connection: Connection = Connection {
×
116
                name: "mysql".to_owned(),
×
117
                config: Some(ConnectionConfig::MySQL(mysql_config)),
×
118
            };
×
119
            connection
×
120
        }
121
        "S3" | "s3" => {
×
122
            let s3_details = S3Details {
×
123
                access_key_id: "<your_access_key_id>".to_owned(),
×
124
                secret_access_key: "<your_secret_access_key>".to_owned(),
×
125
                region: "<your_region>".to_owned(),
×
126
                bucket_name: "<your_bucket_name>".to_owned(),
×
127
            };
×
128
            let s3_config = S3Storage {
×
129
              details: Some(s3_details),
×
130
              tables: vec![],
×
131
            };
×
132
            let connection: Connection = Connection {
×
133
                name: "s3".to_owned(),
×
134
                config: Some(ConnectionConfig::S3Storage(s3_config)),
×
135
            };
×
136
            connection
×
137
        }
138
        "MongoDB" | "mongodb" | "MONGODB" | "Mongodb" | "Mo" | "MO" => {
×
139
            let mongo_config = MongodbConfig {
×
140
               connection_string: "mongodb://<username>:<password>@localhost:27017/<database_name>".to_owned(),
×
141
            };
×
142
            let connection: Connection = Connection {
×
143
                name: "mongodb".to_owned(),
×
144
                config: Some(ConnectionConfig::MongoDB(mongo_config)),
×
145
            };
×
146
            connection
×
147
        }
148
        _ => {
149
            let postgres_config = PostgresConfig {
×
150
                user: Some("postgres".to_owned()),
×
151
                password: Some("postgres".to_owned()),
×
152
                host: Some("localhost".to_owned()),
×
153
                port: Some(5432),
×
154
                database: Some("users".to_owned()),
×
155
                sslmode: None,
×
156
                connection_url: None,
×
157
            };
×
158
            let connection: Connection = Connection {
×
159
                name: "postgres".to_owned(),
×
160
                config: Some(ConnectionConfig::Postgres(postgres_config)),
×
161
            };
×
162
            connection
×
163
        }
164
    }
165
}
×
166
type Question = (
167
    String,
168
    Box<dyn Fn((String, &mut Config)) -> Result<(), OrchestrationError>>,
169
);
170
pub fn generate_config_repl() -> Result<(), OrchestrationError> {
×
171
    let mut rl = Editor::<InitHelper, DefaultHistory>::new()
×
172
        .map_err(|e| OrchestrationError::CliError(CliError::ReadlineError(e)))?;
×
173
    rl.set_helper(Some(InitHelper {}));
×
174
    let mut default_config = Config::default();
×
175
    let default_app_name = "quick-start-app";
×
176
    let questions: Vec<Question> = vec![
×
177
        (
×
178
            format!("question: App name ({:}): ", default_app_name),
×
179
            Box::new(move |(app_name, config)| {
×
180
                let app_name = app_name.trim();
×
181
                if app_name.is_empty() {
×
182
                    config.app_name = default_app_name.to_string();
×
183
                } else {
×
184
                    config.app_name = app_name.to_string();
×
185
                }
×
186
                Ok(())
×
187
            }),
×
188
        ),
×
189
        (
×
190
            format!("question: Home directory ({:}): ", default_home_dir()),
×
191
            Box::new(move |(home_dir, config)| {
×
192
                if home_dir.is_empty() {
×
193
                    config.home_dir = default_home_dir();
×
194
                    config.cache_dir = default_cache_dir();
×
195
                } else {
×
196
                    config.home_dir = home_dir;
×
197
                    config.cache_dir = get_cache_dir(&config.home_dir);
×
198
                }
×
199
                Ok(())
×
200
            }),
×
201
        ),
×
202
        (
×
203
            "question: Connection Type - one of: [P]ostgres, [E]thereum, [S]nowflake, [My]SQL, [S3]Storage, [Mo]ngoDB: "
×
204
                .to_string(),
×
205
            Box::new(move |(connection, config)| {
×
206
                let sample_connection = generate_connection(&connection);
×
207
                config.connections.push(sample_connection);
×
208

×
209
                Ok(())
×
210
            }),
×
211
        ),
×
212
        (
×
213
            format!("question: Config path ({:}): ", DEFAULT_CONFIG_PATH),
×
214
            Box::new(move |(yaml_path, config)| {
×
215
                let mut yaml_path = yaml_path.trim();
×
216
                if yaml_path.is_empty() {
×
217
                    yaml_path = DEFAULT_CONFIG_PATH;
×
218
                }
×
219
                let f = std::fs::OpenOptions::new()
×
220
                    .create(true)
×
221
                    .write(true)
×
222
                    .open(yaml_path)
×
223
                    .map_err(|e| {
×
224
                        OrchestrationError::CliError(CliError::FileSystem(
×
225
                            yaml_path.to_string().into(),
×
226
                            e,
×
227
                        ))
×
228
                    })?;
×
229
                serde_yaml::to_writer(f, &config)
×
230
                    .map_err(OrchestrationError::FailedToWriteConfigYaml)?;
×
231

232
                info!("Generating workspace: \n\
×
233
                \n- {} (main configuration)\n- ./queries (folder for sql queries)\n- ./lambdas (folder for lambda functions)
×
234
                \n• More details about our config: https://getdozer.io/docs/reference/configuration/introduction\
×
235
                \n• Connector & Sources: https://getdozer.io/docs/reference/configuration/connectors\
×
236
                \n• Endpoints: https://getdozer.io/docs/reference/configuration/endpoints/",
×
237
                   yaml_path.to_owned());
×
238

239
                let path = PathBuf::from(yaml_path);
×
240
                if let Some(dir) = path.parent() {
×
241
                    let queries_path = Path::new(dir).join(DEFAULT_QUERIES_DIRECTORY);
×
242
                    if let Err(_e) = std::fs::create_dir(queries_path) {
×
243
                        warn!("Cannot create queries directory");
×
244
                    }
×
245

246
                    let lambdas_path = Path::new(dir).join(DEFAULT_LAMBDAS_DIRECTORY);
×
247
                    if let Err(_e) = std::fs::create_dir(lambdas_path) {
×
248
                        warn!("Cannot create lambdas directory");
×
249
                    }
×
250
                }
×
251

252
                Ok(())
×
253
            }),
×
254
        ),
×
255
    ];
×
256
    let result = questions.iter().try_for_each(|(question, func)| {
×
257
        let readline = rl.readline(question);
×
258
        match readline {
×
259
            Ok(input) => func((input, &mut default_config)),
×
260
            Err(err) => Err(OrchestrationError::CliError(CliError::ReadlineError(err))),
×
261
        }
262
    });
×
263

×
264
    match result {
×
265
        Ok(_) => Ok(()),
×
266
        Err(e) => match e {
×
267
            OrchestrationError::CliError(CliError::ReadlineError(ReadlineError::Interrupted)) => {
268
                info!("Exiting..");
×
269
                Ok(())
×
270
            }
271
            OrchestrationError::CliError(CliError::ReadlineError(ReadlineError::Eof)) => {
272
                info!("CTRL-D - exiting...");
×
273
                Ok(())
×
274
            }
275
            _ => Err(e),
×
276
        },
277
    }
278
}
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc