• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

getdozer / dozer / 4020902227

pending completion
4020902227

Pull #743

github

GitHub
Merge 57279c6b6 into a12da35a5
Pull Request #743: Chore clippy fix

165 of 165 new or added lines in 60 files covered. (100.0%)

23638 of 35485 relevant lines covered (66.61%)

38417.79 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/dozer-ingestion/src/connectors/postgres/snapshotter.rs
1
use crate::connectors::TableInfo;
2
use crate::ingestion::Ingestor;
3

4
use super::helper;
5
use super::schema_helper::SchemaHelper;
6
use crate::connectors::postgres::connection::helper as connection_helper;
7
use crate::errors::ConnectorError;
8
use crate::errors::PostgresConnectorError::SyncWithSnapshotError;
9
use crate::errors::PostgresConnectorError::{InvalidQueryError, PostgresSchemaError};
10
use dozer_types::ingestion_types::IngestionMessage;
11
use dozer_types::parking_lot::RwLock;
12

13
use crate::errors::ConnectorError::PostgresConnectorError;
14
use postgres::fallible_iterator::FallibleIterator;
15
use postgres_types::PgLsn;
16
use std::cell::RefCell;
17
use std::sync::Arc;
18

19
// 0.4.10
20
pub struct PostgresSnapshotter {
21
    pub tables: Option<Vec<TableInfo>>,
22
    pub conn_config: tokio_postgres::Config,
23
    pub ingestor: Arc<RwLock<Ingestor>>,
24
    pub connector_id: u64,
25
}
26

27
impl PostgresSnapshotter {
28
    pub fn get_tables(
×
29
        &self,
×
30
        tables: Option<Vec<TableInfo>>,
×
31
    ) -> Result<Vec<TableInfo>, ConnectorError> {
×
32
        let helper = SchemaHelper::new(self.conn_config.clone(), None);
×
33
        let arr = helper.get_tables(tables).unwrap();
×
34
        match self.tables.as_ref() {
×
35
            None => Ok(arr),
×
36
            Some(filtered_tables) => {
×
37
                let table_names: Vec<String> = filtered_tables
×
38
                    .iter()
×
39
                    .map(|t| t.table_name.to_owned())
×
40
                    .collect();
×
41
                let arr = arr
×
42
                    .iter()
×
43
                    .filter(|t| table_names.contains(&t.table_name))
×
44
                    .cloned()
×
45
                    .collect();
×
46
                Ok(arr)
×
47
            }
48
        }
49
    }
×
50

51
    pub fn sync_tables(
×
52
        &self,
×
53
        tables: Option<Vec<TableInfo>>,
×
54
        lsn_option: Option<&(PgLsn, u64)>,
×
55
    ) -> Result<Option<Vec<TableInfo>>, ConnectorError> {
×
56
        let client_plain = Arc::new(RefCell::new(
×
57
            connection_helper::connect(self.conn_config.clone()).map_err(PostgresConnectorError)?,
×
58
        ));
59

60
        let lsn = lsn_option.map_or(0u64, |(pg_lsn, _)| u64::from(*pg_lsn));
×
61
        let tables = self.get_tables(tables)?;
×
62

63
        let mut idx: u64 = 0;
×
64
        for table_info in tables.iter() {
×
65
            let column_str: Vec<String> = table_info
×
66
                .columns
×
67
                .clone()
×
68
                .map_or(Err(ConnectorError::ColumnsNotFound), Ok)?
×
69
                .iter()
×
70
                .map(|c| format!("\"{c}\""))
×
71
                .collect();
×
72

×
73
            let column_str = column_str.join(",");
×
74
            let query = format!("select {} from {}", column_str, table_info.table_name);
×
75
            let stmt = client_plain
×
76
                .clone()
×
77
                .borrow_mut()
×
78
                .prepare(&query)
×
79
                .map_err(|e| PostgresConnectorError(InvalidQueryError(e)))?;
×
80
            let columns = stmt.columns();
×
81

82
            // Ingest schema for every table
83
            let schema = helper::map_schema(&table_info.id, columns)?;
×
84

85
            let empty_vec: Vec<String> = Vec::new();
×
86
            for msg in client_plain
×
87
                .clone()
×
88
                .borrow_mut()
×
89
                .query_raw(&stmt, empty_vec)
×
90
                .map_err(|e| PostgresConnectorError(InvalidQueryError(e)))?
×
91
                .iterator()
×
92
            {
93
                match msg {
×
94
                    Ok(msg) => {
×
95
                        let evt = helper::map_row_to_operation_event(
×
96
                            table_info.table_name.to_string(),
×
97
                            schema
×
98
                                .identifier
×
99
                                .map_or(Err(ConnectorError::SchemaIdentifierNotFound), Ok)?,
×
100
                            &msg,
×
101
                            columns,
×
102
                            idx,
×
103
                        )
×
104
                        .map_err(|e| PostgresConnectorError(PostgresSchemaError(e)))?;
×
105

106
                        self.ingestor
×
107
                            .write()
×
108
                            .handle_message(((lsn, idx), IngestionMessage::OperationEvent(evt)))
×
109
                            .map_err(ConnectorError::IngestorError)?;
×
110
                    }
111
                    Err(e) => {
×
112
                        return Err(PostgresConnectorError(SyncWithSnapshotError(e.to_string())))
×
113
                    }
114
                }
115
                idx += 1;
×
116
            }
117
        }
118

119
        Ok(Some(tables))
×
120
    }
×
121
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc