• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

getdozer / dozer / 4020902227

pending completion
4020902227

Pull #743

github

GitHub
Merge 57279c6b6 into a12da35a5
Pull Request #743: Chore clippy fix

165 of 165 new or added lines in 60 files covered. (100.0%)

23638 of 35485 relevant lines covered (66.61%)

38417.79 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

13.4
/dozer-ingestion/src/connectors/postgres/connection/validator.rs
1
use crate::connectors::postgres::connector::ReplicationSlotInfo;
2

3
use crate::connectors::TableInfo;
4
use crate::errors::PostgresConnectorError;
5
use crate::errors::PostgresConnectorError::{
6
    ColumnNameNotValid, ConnectionFailure, InvalidQueryError, NoAvailableSlotsError,
7
    ReplicationIsNotAvailableForUserError, SlotIsInUseError, SlotNotExistError,
8
    StartLsnIsBeforeLastFlushedLsnError, TableError, TableNameNotValid, WALLevelIsNotCorrect,
9
};
10
use dozer_types::indicatif::ProgressStyle;
11
use postgres::Client;
12
use postgres_types::PgLsn;
13
use regex::Regex;
14
use std::borrow::BorrowMut;
15
use std::collections::HashMap;
16

17
/// The individual checks that [`validate_connection`] runs against a Postgres source.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Validations {
    /// Runs `validate_details` (a trivial `SELECT version()` query).
    Details,
    /// Runs `validate_user` (login and replication rights of the current role).
    User,
    /// Runs `validate_tables` when a table list was supplied.
    Tables,
    /// Runs `validate_wal_level` (the server must report `wal_level = logical`).
    WALLevel,
    /// Runs `validate_slot` when replication slot details were supplied, otherwise
    /// `validate_limit_of_replications`.
    Slot,
}
24

25
pub fn validate_connection(
×
26
    name: &str,
×
27
    config: tokio_postgres::Config,
×
28
    tables: Option<&Vec<TableInfo>>,
×
29
    replication_info: Option<ReplicationSlotInfo>,
×
30
) -> Result<(), PostgresConnectorError> {
×
31
    let validations_order: Vec<Validations> = vec![
×
32
        Validations::Details,
×
33
        Validations::User,
×
34
        Validations::Tables,
×
35
        Validations::WALLevel,
×
36
        Validations::Slot,
×
37
    ];
×
38
    let pb = dozer_types::indicatif::ProgressBar::new(validations_order.len() as u64);
×
39
    pb.set_style(
×
40
        ProgressStyle::with_template(&format!(
×
41
            "[{}] {}",
×
42
            name, "{spinner:.green} {wide_msg} {bar}"
×
43
        ))
×
44
        .unwrap(),
×
45
    );
×
46
    pb.set_message("Validating connection to source");
×
47

48
    let mut client = super::helper::connect(config)?;
×
49

50
    for validation_type in validations_order {
×
51
        match validation_type {
×
52
            Validations::Details => validate_details(client.borrow_mut())?,
×
53
            Validations::User => validate_user(client.borrow_mut())?,
×
54
            Validations::Tables => {
55
                if let Some(tables_info) = &tables {
×
56
                    validate_tables(client.borrow_mut(), tables_info)?;
×
57
                }
×
58
            }
59
            Validations::WALLevel => validate_wal_level(client.borrow_mut())?,
×
60
            Validations::Slot => {
61
                if let Some(replication_details) = &replication_info {
×
62
                    validate_slot(client.borrow_mut(), replication_details)?;
×
63
                } else {
64
                    validate_limit_of_replications(client.borrow_mut())?;
×
65
                }
66
            }
67
        }
68

69
        pb.inc(1);
×
70
    }
71

72
    pb.finish_and_clear();
×
73

×
74
    Ok(())
×
75
}
×
76

77
fn validate_details(client: &mut Client) -> Result<(), PostgresConnectorError> {
78
    client
×
79
        .simple_query("SELECT version()")
×
80
        .map_err(ConnectionFailure)?;
×
81

82
    Ok(())
×
83
}
×
84

85
fn validate_user(client: &mut Client) -> Result<(), PostgresConnectorError> {
×
86
    client
×
87
        .query_one(
×
88
            "
×
89
                SELECT r.rolcanlogin AS can_login, r.rolreplication AS is_replication_role,
×
90
                    ARRAY(SELECT b.rolname
×
91
                             FROM pg_catalog.pg_auth_members m
×
92
                                      JOIN pg_catalog.pg_roles b ON (m.roleid = b.oid)
×
93
                             WHERE m.member = r.oid
×
94
                    ) &&
×
95
                      '{rds_superuser, rdsadmin, rdsrepladmin, rds_replication}'::name[] AS is_aws_replication_role
×
96
                FROM pg_roles r
×
97
                WHERE r.rolname = current_user
×
98
            ",
×
99
            &[],
×
100
        )
×
101
        .map_or(Err(ReplicationIsNotAvailableForUserError), |row| {
×
102
            let can_login: bool = row.get("can_login");
×
103
            let is_replication_role: bool = row.get("is_replication_role");
×
104
            let is_aws_replication_role: bool = row.get("is_aws_replication_role");
×
105

×
106
            if can_login && (is_replication_role || is_aws_replication_role) {
×
107
                Ok(())
×
108
            } else {
109
                Err(ReplicationIsNotAvailableForUserError)
×
110
            }
111
        })
×
112
}
×
113

114
fn validate_wal_level(client: &mut Client) -> Result<(), PostgresConnectorError> {
×
115
    let result = client
×
116
        .query_one("SHOW wal_level", &[])
×
117
        .map_err(|_e| WALLevelIsNotCorrect())?;
×
118

119
    let wal_level: Result<String, _> = result.try_get(0);
×
120
    wal_level.map_or_else(
×
121
        |e| Err(InvalidQueryError(e)),
×
122
        |level| {
×
123
            if level == "logical" {
×
124
                Ok(())
×
125
            } else {
126
                Err(WALLevelIsNotCorrect())
×
127
            }
128
        },
×
129
    )
×
130
}
×
131

132
fn validate_tables_names(table_info: &Vec<TableInfo>) -> Result<(), PostgresConnectorError> {
4✔
133
    let table_regex = Regex::new(r"^([[:lower:]_][[:alnum:]_]*)$").unwrap();
4✔
134
    for t in table_info {
6✔
135
        if !table_regex.is_match(&t.table_name) {
4✔
136
            return Err(TableNameNotValid(t.table_name.clone()));
2✔
137
        }
2✔
138
    }
139

140
    Ok(())
2✔
141
}
4✔
142

143
fn validate_columns_names(table_info: &Vec<TableInfo>) -> Result<(), PostgresConnectorError> {
4✔
144
    let column_name_regex = Regex::new(r"^([[:lower:]_][[:alnum:]_]*)$").unwrap();
4✔
145
    for t in table_info {
6✔
146
        if let Some(columns) = &t.columns {
4✔
147
            for column in columns {
6✔
148
                if !column_name_regex.is_match(column) {
4✔
149
                    return Err(ColumnNameNotValid(column.clone()));
2✔
150
                }
2✔
151
            }
152
        }
×
153
    }
154

155
    Ok(())
2✔
156
}
4✔
157

158
fn validate_tables(
×
159
    client: &mut Client,
×
160
    table_info: &Vec<TableInfo>,
×
161
) -> Result<(), PostgresConnectorError> {
×
162
    let mut tables_names: HashMap<String, bool> = HashMap::new();
×
163
    table_info.iter().for_each(|t| {
×
164
        tables_names.insert(t.table_name.clone(), true);
×
165
    });
×
166

×
167
    validate_tables_names(table_info)?;
×
168
    validate_columns_names(table_info)?;
×
169

170
    let table_name_keys: Vec<String> = tables_names.keys().cloned().collect();
×
171
    let result = client
×
172
        .query(
×
173
            "SELECT table_name FROM information_schema.tables WHERE table_name = ANY($1)",
×
174
            &[&table_name_keys],
×
175
        )
×
176
        .map_err(InvalidQueryError)?;
×
177

178
    for r in result.iter() {
×
179
        let table_name: String = r.try_get(0).map_err(InvalidQueryError)?;
×
180
        tables_names.remove(&table_name);
×
181
    }
182

183
    if !tables_names.is_empty() {
×
184
        let table_name_keys = tables_names.keys().cloned().collect();
×
185
        Err(TableError(table_name_keys))
×
186
    } else {
187
        Ok(())
×
188
    }
189
}
×
190

191
fn validate_slot(
×
192
    client: &mut Client,
×
193
    replication_info: &ReplicationSlotInfo,
×
194
) -> Result<(), PostgresConnectorError> {
×
195
    let result = client
×
196
        .query_one(
×
197
            "SELECT active, confirmed_flush_lsn FROM pg_replication_slots WHERE slot_name = $1",
×
198
            &[&replication_info.name],
×
199
        )
×
200
        .map_err(|_e| SlotNotExistError(replication_info.name.clone()))?;
×
201

202
    let is_already_running: bool = result.try_get(0).map_err(InvalidQueryError)?;
×
203
    if is_already_running {
×
204
        return Err(SlotIsInUseError(replication_info.name.clone()));
×
205
    }
×
206

207
    let flush_lsn: PgLsn = result.try_get(1).map_err(InvalidQueryError)?;
×
208

209
    if flush_lsn.gt(&replication_info.start_lsn) {
×
210
        Err(StartLsnIsBeforeLastFlushedLsnError(
×
211
            flush_lsn.to_string(),
×
212
            replication_info.start_lsn.to_string(),
×
213
        ))
×
214
    } else {
215
        Ok(())
×
216
    }
217
}
×
218

219
fn validate_limit_of_replications(client: &mut Client) -> Result<(), PostgresConnectorError> {
×
220
    let slots_limit_result = client
×
221
        .query_one("SHOW max_replication_slots", &[])
×
222
        .map_err(ConnectionFailure)?;
×
223

224
    let slots_limit_str: String = slots_limit_result.try_get(0).map_err(InvalidQueryError)?;
×
225
    let slots_limit: i64 = slots_limit_str.parse().unwrap();
×
226

227
    let used_slots_result = client
×
228
        .query_one("SELECT COUNT(*) FROM pg_replication_slots;", &[])
×
229
        .map_err(ConnectionFailure)?;
×
230

231
    let used_slots: i64 = used_slots_result.try_get(0).map_err(InvalidQueryError)?;
×
232

233
    if used_slots == slots_limit {
×
234
        Err(NoAvailableSlotsError)
×
235
    } else {
236
        Ok(())
×
237
    }
238
}
×
239

240
#[cfg(test)]
241
mod tests {
242
    use crate::connectors::postgres::connection::validator::{
243
        validate_columns_names, validate_connection, validate_tables_names,
244
    };
245
    use crate::connectors::postgres::connector::ReplicationSlotInfo;
246

247
    use postgres_types::PgLsn;
248

249
    use std::ops::Deref;
250
    use std::panic;
251

252
    use tokio_postgres::NoTls;
253

254
    use crate::connectors::TableInfo;
255
    use crate::errors::PostgresConnectorError;
256
    use serial_test::serial;
257

258
    fn get_config() -> tokio_postgres::Config {
×
259
        let mut config = tokio_postgres::Config::new();
×
260
        config
×
261
            .dbname("users")
×
262
            .user("postgres")
×
263
            .host("localhost")
×
264
            .deref()
×
265
            .clone()
×
266
    }
×
267

268
    /// Validation must fail when the target database does not exist.
    #[test]
    #[ignore]
    #[serial]
    fn test_fail_to_connect() {
        run_test(|| {
            let mut config = get_config();
            config.dbname("not_existing");

            let outcome = validate_connection("pg_test_conn", config, None, None);

            assert!(outcome.is_err());
        });
    }
×
280

281
    /// A freshly created user without replication rights must be rejected with
    /// `ReplicationIsNotAvailableForUserError`.
    #[test]
    #[ignore]
    #[serial]
    fn test_user_not_have_permission_to_use_replication() {
        run_test(|| {
            let mut config = get_config();
            let mut admin = postgres::Config::from(config.clone())
                .connect(NoTls)
                .unwrap();

            admin
                .simple_query("CREATE USER dozer_test_without_permission")
                .expect("User creation failed");
            config.user("dozer_test_without_permission");

            let outcome = validate_connection("pg_test_conn", config, None, None);

            // Remove the temporary user before asserting so a failure does not leave it behind.
            admin
                .simple_query("DROP USER dozer_test_without_permission")
                .expect("User delete failed");

            assert!(outcome.is_err());

            if let Err(e) = outcome {
                assert!(matches!(
                    e,
                    PostgresConnectorError::ReplicationIsNotAvailableForUserError
                ));
            } else {
                panic!("Validation should fail");
            }
        });
    }
×
315

316
    /// Validation must report `TableError` listing a requested table that is absent from
    /// the database.
    #[test]
    #[ignore]
    #[serial]
    fn test_requested_tables_not_exist() {
        run_test(|| {
            let config = get_config();
            let mut client = postgres::Config::from(config.clone())
                .connect(NoTls)
                .unwrap();

            // Make sure the table really is absent before validating.
            client
                .simple_query("DROP TABLE IF EXISTS not_existing")
                .expect("Table drop failed");

            let tables = vec![TableInfo {
                name: "not_existing".to_string(),
                table_name: "not_existing".to_string(),
                id: 0,
                columns: None,
            }];
            let result = validate_connection("pg_test_conn", config, Some(&tables), None);

            match result {
                Ok(_) => panic!("Validation should fail"),
                Err(PostgresConnectorError::TableError(msg)) => {
                    assert_eq!(msg, vec!["not_existing".to_string()]);
                }
                Err(_) => panic!("Unexpected error occurred"),
            }
        });
    }
×
354

355
    /// Validation must report `SlotNotExistError` carrying the requested slot name when the
    /// slot is not present on the server.
    #[test]
    #[ignore]
    #[serial]
    fn test_replication_slot_not_exist() {
        let config = get_config();
        let _client = postgres::Config::from(config.clone())
            .connect(NoTls)
            .unwrap();

        let slot = ReplicationSlotInfo {
            name: "not_existing_slot".to_string(),
            start_lsn: PgLsn::from(0),
        };
        let outcome = validate_connection("pg_test_conn", config, None, Some(slot));

        assert!(outcome.is_err());

        let e = match outcome {
            Ok(_) => panic!("Validation should fail"),
            Err(e) => e,
        };
        assert!(matches!(e, PostgresConnectorError::SlotNotExistError(_)));

        match e {
            PostgresConnectorError::SlotNotExistError(msg) => {
                assert_eq!(msg, "not_existing_slot");
            }
            _ => panic!("Unexpected error occurred"),
        }
    }
385

386
    /// Validation must report `StartLsnIsBeforeLastFlushedLsnError` when the requested
    /// start LSN (0 here) is lower than the flush position of an existing slot.
    #[test]
    #[ignore]
    #[serial]
    fn test_start_lsn_is_before_last_flush_lsn() {
        let config = get_config();
        let mut client = postgres::Config::from(config.clone())
            .connect(NoTls)
            .unwrap();

        client
            .query(
                r#"SELECT pg_create_logical_replication_slot('existing_slot', 'pgoutput');"#,
                &[],
            )
            .expect("Slot creation failed");

        let replication_info = ReplicationSlotInfo {
            name: "existing_slot".to_string(),
            start_lsn: PgLsn::from(0),
        };
        let result = validate_connection("pg_test_conn", config, None, Some(replication_info));

        // Drop the slot before asserting so a failed assertion does not leave it behind.
        client
            .query(r#"SELECT pg_drop_replication_slot('existing_slot');"#, &[])
            .expect("Slot drop failed");

        assert!(result.is_err());

        match result {
            Ok(_) => panic!("Validation should fail"),
            Err(PostgresConnectorError::StartLsnIsBeforeLastFlushedLsnError(_, _)) => {}
            Err(_) => panic!("Unexpected error occurred"),
        }
    }
420

421
    /// Fills the remaining replication slots and checks that validation passes while one
    /// slot is still free and fails with `NoAvailableSlotsError` once none are left.
    #[test]
    #[ignore]
    #[serial]
    fn test_limit_of_replication_slots_reached() {
        let config = get_config();
        let mut client = postgres::Config::from(config.clone())
            .connect(NoTls)
            .unwrap();

        let limit_row = client.query_one("SHOW max_replication_slots", &[]).unwrap();
        let limit_text: String = limit_row.try_get(0).unwrap();
        let slots_limit: i64 = limit_text.parse().unwrap();

        let count_row = client
            .query_one("SELECT COUNT(*) FROM pg_replication_slots;", &[])
            .unwrap();
        let used_slots: i64 = count_row.try_get(0).unwrap();

        // Occupy every free slot except the last one.
        for n in used_slots..slots_limit - 1 {
            let slot_name = format!("slot_{n}");
            client
                .query(
                    r#"SELECT pg_create_logical_replication_slot($1, 'pgoutput');"#,
                    &[&slot_name],
                )
                .unwrap();
        }

        // One replication slot is available
        let with_free_slot = validate_connection("pg_test_conn", config.clone(), None, None);
        assert!(with_free_slot.is_ok());

        let last_slot_name = format!("slot_{}", slots_limit - 1);
        client
            .query(
                r#"SELECT pg_create_logical_replication_slot($1, 'pgoutput');"#,
                &[&last_slot_name],
            )
            .unwrap();

        // No replication slots are available
        let without_free_slot = validate_connection("pg_test_conn", config, None, None);
        assert!(without_free_slot.is_err());

        if !matches!(
            without_free_slot.unwrap_err(),
            PostgresConnectorError::NoAvailableSlotsError
        ) {
            panic!("Unexpected error occurred");
        }

        // Teardown
        for n in used_slots..slots_limit {
            let slot_name = format!("slot_{n}");
            client
                .query(r#"SELECT pg_drop_replication_slot($1);"#, &[&slot_name])
                .expect("Slot drop failed");
        }
    }
481

482
    /// Table names are accepted only when they match the lowercase identifier pattern.
    #[test]
    fn test_validate_tables_names() {
        // (candidate name, whether validation is expected to succeed)
        let cases = vec![
            ("test", true),
            ("Test", false),
            (";Drop table test", false),
            ("test_with_underscore", true),
        ];

        for (table_name, expected_result) in cases {
            let tables = vec![TableInfo {
                name: table_name.to_string(),
                table_name: table_name.to_string(),
                id: 0,
                columns: None,
            }];

            let res = validate_tables_names(&tables);

            assert_eq!(expected_result, res.is_ok());
        }
    }
1✔
502

503
    /// Column names are accepted only when they match the lowercase identifier pattern.
    #[test]
    fn test_validate_column_names() {
        // (candidate name, whether validation is expected to succeed)
        let cases = vec![
            ("test", true),
            ("Test", false),
            (";Drop table test", false),
            ("test_with_underscore", true),
        ];

        for (column_name, expected_result) in cases {
            let tables = vec![TableInfo {
                name: "column_test_table".to_string(),
                table_name: "column_test_table".to_string(),
                id: 0,
                columns: Some(vec![column_name.to_string()]),
            }];

            let res = validate_columns_names(&tables);

            assert_eq!(expected_result, res.is_ok());
        }
    }
1✔
523

524
    fn setup() {
×
525
        let config = postgres::Config::from(get_config());
×
526
        let mut client = config.connect(NoTls).unwrap();
×
527
        client
×
528
            .query("DROP DATABASE IF EXISTS dozer_tests", &[])
×
529
            .unwrap();
×
530
        client.query("CREATE DATABASE dozer_tests", &[]).unwrap();
×
531
    }
×
532

533
    fn run_test<T>(test: T)
×
534
    where
×
535
        T: FnOnce() + panic::UnwindSafe,
×
536
    {
×
537
        setup();
×
538

×
539
        let result = panic::catch_unwind(|| {
×
540
            test();
×
541
        });
×
542

×
543
        assert!(result.is_ok())
×
544
    }
×
545
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc