• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

getdozer / dozer / 4020902227

pending completion
4020902227

Pull #743

github

GitHub
Merge 57279c6b6 into a12da35a5
Pull Request #743: Chore clippy fix

165 of 165 new or added lines in 60 files covered. (100.0%)

23638 of 35485 relevant lines covered (66.61%)

38417.79 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/dozer-ingestion/src/connectors/postgres/connector.rs
1
use crate::connectors::postgres::schema_helper::SchemaHelper;
2

3
use crate::connectors::postgres::connection::validator::validate_connection;
4
use crate::connectors::postgres::iterator::PostgresIterator;
5
use crate::connectors::{Connector, TableInfo, ValidationResults};
6
use crate::errors::{ConnectorError, PostgresConnectorError};
7
use crate::ingestion::Ingestor;
8
use dozer_types::parking_lot::RwLock;
9
use dozer_types::tracing::{error, info};
10
use dozer_types::types::SchemaWithChangesType;
11
use postgres::Client;
12
use postgres_types::PgLsn;
13

14
use std::sync::Arc;
15
use tokio_postgres::config::ReplicationMode;
16
use tokio_postgres::Config;
17

18
use super::connection::helper;
19

20
#[derive(Clone, Debug)]
×
21
pub struct PostgresConfig {
22
    pub name: String,
23
    pub tables: Option<Vec<TableInfo>>,
24
    pub config: Config,
25
}
26

27
pub struct PostgresConnector {
28
    pub id: u64,
29
    name: String,
30
    tables: Option<Vec<TableInfo>>,
31
    ingestor: Option<Arc<RwLock<Ingestor>>>,
32
    replication_conn_config: Config,
33
    conn_config: Config,
34
    schema_helper: SchemaHelper,
35
}
36

37
#[derive(Debug)]
×
38
pub struct ReplicationSlotInfo {
39
    pub name: String,
40
    pub start_lsn: PgLsn,
41
}
42

43
impl PostgresConnector {
44
    pub fn new(id: u64, config: PostgresConfig) -> PostgresConnector {
×
45
        let mut replication_conn_config = config.config.clone();
×
46
        replication_conn_config.replication_mode(ReplicationMode::Logical);
×
47

×
48
        let helper = SchemaHelper::new(config.config.clone(), None);
×
49

×
50
        // conn_str - replication_conn_config
×
51
        // conn_str_plain- conn_config
×
52

×
53
        PostgresConnector {
×
54
            id,
×
55
            name: config.name,
×
56
            conn_config: config.config,
×
57
            replication_conn_config,
×
58
            tables: config.tables,
×
59
            ingestor: None,
×
60
            schema_helper: helper,
×
61
        }
×
62
    }
×
63

64
    fn get_lsn_with_offset_from_seq(
×
65
        conn_name: String,
×
66
        from_seq: Option<(u64, u64)>,
×
67
    ) -> Option<(PgLsn, u64)> {
×
68
        from_seq.map_or_else(
×
69
            || {
×
70
                info!("[{}] Starting replication from empty database", conn_name);
×
71
                None
×
72
            },
×
73
            |(lsn, checkpoint)| {
×
74
                if lsn > 0 || checkpoint > 0 {
×
75
                    info!(
×
76
                        "[{}] Starting replication from checkpoint ({}/{})",
×
77
                        conn_name, lsn, checkpoint
×
78
                    );
×
79
                    Some((PgLsn::from(lsn), checkpoint))
×
80
                } else {
81
                    info!("[{}] Starting replication from empty database", conn_name);
×
82
                    None
×
83
                }
84
            },
×
85
        )
×
86
    }
×
87
}
88

89
impl Connector for PostgresConnector {
90
    fn get_tables(&self) -> Result<Vec<TableInfo>, ConnectorError> {
×
91
        self.schema_helper.get_tables(None)
×
92
    }
×
93

94
    fn get_schemas(
×
95
        &self,
×
96
        table_names: Option<Vec<TableInfo>>,
×
97
    ) -> Result<Vec<SchemaWithChangesType>, ConnectorError> {
×
98
        self.schema_helper
×
99
            .get_schemas(table_names)
×
100
            .map_err(ConnectorError::PostgresConnectorError)
×
101
    }
×
102

103
    fn initialize(
×
104
        &mut self,
×
105
        ingestor: Arc<RwLock<Ingestor>>,
×
106
        tables: Option<Vec<TableInfo>>,
×
107
    ) -> Result<(), ConnectorError> {
×
108
        let client = helper::connect(self.replication_conn_config.clone())
×
109
            .map_err(ConnectorError::PostgresConnectorError)?;
×
110
        self.tables = tables;
×
111
        self.create_publication(client)?;
×
112
        self.ingestor = Some(ingestor);
×
113
        Ok(())
×
114
    }
×
115

116
    fn start(&self, from_seq: Option<(u64, u64)>) -> Result<(), ConnectorError> {
×
117
        let lsn = PostgresConnector::get_lsn_with_offset_from_seq(self.name.clone(), from_seq);
×
118

119
        let iterator = PostgresIterator::new(
×
120
            self.id,
×
121
            self.name.clone(),
×
122
            self.get_publication_name(),
×
123
            self.get_slot_name(),
×
124
            self.tables.to_owned(),
×
125
            self.replication_conn_config.clone(),
×
126
            self.ingestor
×
127
                .as_ref()
×
128
                .map_or(Err(ConnectorError::InitializationError), Ok)?
×
129
                .clone(),
×
130
            self.conn_config.clone(),
×
131
        );
×
132
        iterator.start(lsn)
×
133
    }
×
134

135
    fn stop(&self) {}
×
136

137
    fn test_connection(&self) -> Result<(), ConnectorError> {
138
        helper::connect(self.replication_conn_config.clone())
×
139
            .map_err(ConnectorError::PostgresConnectorError)?;
×
140
        Ok(())
×
141
    }
×
142

143
    fn validate(&self, tables: Option<Vec<TableInfo>>) -> Result<(), ConnectorError> {
×
144
        let tables_list = tables.or_else(|| self.tables.clone());
×
145
        validate_connection(
×
146
            &self.name,
×
147
            self.conn_config.clone(),
×
148
            tables_list.as_ref(),
×
149
            None,
×
150
        )?;
×
151

152
        Ok(())
×
153
    }
×
154

155
    fn validate_schemas(&self, tables: &[TableInfo]) -> Result<ValidationResults, ConnectorError> {
×
156
        SchemaHelper::validate(&self.schema_helper, tables)
×
157
            .map_err(ConnectorError::PostgresConnectorError)
×
158
    }
×
159
}
160

161
impl PostgresConnector {
162
    fn get_publication_name(&self) -> String {
×
163
        format!("dozer_publication_{}", self.name)
×
164
    }
×
165

166
    fn get_slot_name(&self) -> String {
×
167
        format!("dozer_slot_{}", self.name)
×
168
    }
×
169

170
    fn create_publication(&self, mut client: Client) -> Result<(), ConnectorError> {
×
171
        let publication_name = self.get_publication_name();
×
172
        let table_str: String = match self.tables.as_ref() {
×
173
            None => "ALL TABLES".to_string(),
×
174
            Some(arr) => {
×
175
                let table_names: Vec<String> = arr.iter().map(|t| t.table_name.clone()).collect();
×
176
                format!("TABLE {}", table_names.join(" , "))
×
177
            }
178
        };
179

180
        client
×
181
            .simple_query(format!("DROP PUBLICATION IF EXISTS {publication_name}").as_str())
×
182
            .map_err(|e| {
×
183
                error!("failed to drop publication {}", e.to_string());
×
184
                PostgresConnectorError::DropPublicationError
×
185
            })?;
×
186

187
        client
×
188
            .simple_query(format!("CREATE PUBLICATION {publication_name} FOR {table_str}").as_str())
×
189
            .map_err(|e| {
×
190
                error!("failed to create publication {}", e.to_string());
×
191
                PostgresConnectorError::CreatePublicationError
×
192
            })?;
×
193
        Ok(())
×
194
    }
×
195
}
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc