• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

getdozer / dozer / 4020775132

pending completion
4020775132

Pull #743

github

GitHub
Merge 4fcec7da8 into a12da35a5
Pull Request #743: Chore clippy fix

141 of 141 new or added lines in 55 files covered. (100.0%)

23768 of 35589 relevant lines covered (66.78%)

40602.31 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/dozer-ingestion/src/connectors/postgres/iterator.rs
1
use crate::connectors::TableInfo;
2

3
use crate::errors::{ConnectorError, PostgresConnectorError};
4
use crate::ingestion::Ingestor;
5
use dozer_types::log::debug;
6

7
use dozer_types::parking_lot::RwLock;
8
use std::cell::RefCell;
9
use std::str::FromStr;
10

11
use std::sync::Arc;
12

13
use crate::connectors::postgres::connection::helper;
14
use crate::connectors::postgres::replicator::CDCHandler;
15
use crate::connectors::postgres::snapshotter::PostgresSnapshotter;
16
use crate::errors::ConnectorError::UnexpectedQueryMessageError;
17
use crate::errors::PostgresConnectorError::{
18
    LSNNotStoredError, LsnNotReturnedFromReplicationSlot, LsnParseError,
19
};
20
use postgres::Client;
21
use postgres_types::PgLsn;
22
use tokio::runtime::Runtime;
23
use tokio_postgres::SimpleQueryMessage;
24

25
pub struct Details {
26
    #[allow(dead_code)]
27
    id: u64,
28
    name: String,
29
    publication_name: String,
30
    slot_name: String,
31
    tables: Option<Vec<TableInfo>>,
32
    replication_conn_config: tokio_postgres::Config,
33
    conn_config: tokio_postgres::Config,
34
}
35

36
/// Phase of the replication life cycle a handler is currently in.
///
/// `PartialEq`/`Eq` are derived so callers and tests can compare states
/// directly instead of pattern matching.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ReplicationState {
    /// Nothing has been set up yet: no replication slot, no snapshot.
    Pending,
    /// The initial snapshot of the configured tables is being synced.
    SnapshotInProgress,
    /// CDC events are being streamed from the replication slot.
    Replicating,
}
42

43
pub struct PostgresIterator {
44
    details: Arc<Details>,
45
    ingestor: Arc<RwLock<Ingestor>>,
46
    connector_id: u64,
47
}
48

49
impl PostgresIterator {
50
    #![allow(clippy::too_many_arguments)]
51
    pub fn new(
×
52
        id: u64,
×
53
        name: String,
×
54
        publication_name: String,
×
55
        slot_name: String,
×
56
        tables: Option<Vec<TableInfo>>,
×
57
        replication_conn_config: tokio_postgres::Config,
×
58
        ingestor: Arc<RwLock<Ingestor>>,
×
59
        conn_config: tokio_postgres::Config,
×
60
    ) -> Self {
×
61
        let details = Arc::new(Details {
×
62
            id,
×
63
            name,
×
64
            publication_name,
×
65
            slot_name,
×
66
            tables,
×
67
            replication_conn_config,
×
68
            conn_config,
×
69
        });
×
70
        PostgresIterator {
×
71
            details,
×
72
            ingestor,
×
73
            connector_id: id,
×
74
        }
×
75
    }
×
76
}
77

78
impl PostgresIterator {
79
    pub fn start(&self, lsn: Option<(PgLsn, u64)>) -> Result<(), ConnectorError> {
×
80
        let lsn = RefCell::new(lsn);
×
81
        let state = RefCell::new(ReplicationState::Pending);
×
82
        let details = self.details.clone();
×
83
        let ingestor = self.ingestor.clone();
×
84
        let connector_id = self.connector_id;
×
85

×
86
        let mut stream_inner = PostgresIteratorHandler {
×
87
            details,
×
88
            ingestor,
×
89
            state,
×
90
            lsn,
×
91
            connector_id,
×
92
        };
×
93
        stream_inner._start()
×
94
    }
×
95
}
96

97
pub struct PostgresIteratorHandler {
98
    pub details: Arc<Details>,
99
    pub lsn: RefCell<Option<(PgLsn, u64)>>,
100
    pub state: RefCell<ReplicationState>,
101
    pub ingestor: Arc<RwLock<Ingestor>>,
102
    pub connector_id: u64,
103
}
104

105
impl PostgresIteratorHandler {
106
    /*
107
     Replication involves 3 states
108
        1) Pending
109
        - Initialize a replication slot.
110
        - Initialize snapshots
111

112
        2) SnapshotInProgress
113
        - Sync initial snapshots of specified tables
114
        - Commit with lsn
115

116
        3) Replicating
117
        - Replicate CDC events using lsn
118
    */
119
    pub fn _start(&mut self) -> Result<(), ConnectorError> {
×
120
        let details = Arc::clone(&self.details);
×
121
        let replication_conn_config = details.replication_conn_config.to_owned();
×
122
        let client = Arc::new(RefCell::new(
×
123
            helper::connect(replication_conn_config)
×
124
                .map_err(ConnectorError::PostgresConnectorError)?,
×
125
        ));
126

127
        // TODO: Handle cases:
128
        // - When snapshot replication is not completed
129
        // - When there is gap between available lsn (in case when slot dropped and new created) and last lsn
130
        // - When publication tables changes
131
        let mut tables = details.tables.clone();
×
132
        if self.lsn.clone().into_inner().is_none() {
×
133
            debug!("\nCreating Slot....");
×
134
            if let Ok(true) = self.replication_slot_exists(client.clone()) {
×
135
                // We dont have lsn, so we need to drop replication slot and start from scratch
×
136
                self.drop_replication_slot(client.clone());
×
137
            }
×
138

139
            client
×
140
                .borrow_mut()
×
141
                .simple_query("BEGIN READ ONLY ISOLATION LEVEL REPEATABLE READ;")
×
142
                .map_err(|_e| {
×
143
                    debug!("failed to begin txn for replication");
×
144
                    PostgresConnectorError::BeginReplication
×
145
                })?;
×
146

147
            let replication_slot_lsn = self.create_replication_slot(client.clone())?;
×
148
            if let Some(lsn) = replication_slot_lsn {
×
149
                let parsed_lsn =
×
150
                    PgLsn::from_str(&lsn).map_err(|_| LsnParseError(lsn.to_string()))?;
×
151
                self.lsn.replace(Some((parsed_lsn, 0)));
×
152
            } else {
153
                return Err(ConnectorError::PostgresConnectorError(
×
154
                    LsnNotReturnedFromReplicationSlot,
×
155
                ));
×
156
            }
157

158
            self.state
×
159
                .clone()
×
160
                .replace(ReplicationState::SnapshotInProgress);
×
161

×
162
            /* #####################        SnapshotInProgress         ###################### */
×
163
            debug!("\nInitializing snapshots...");
×
164

165
            let snapshotter = PostgresSnapshotter {
×
166
                tables: details.tables.clone(),
×
167
                conn_config: details.conn_config.to_owned(),
×
168
                ingestor: Arc::clone(&self.ingestor),
×
169
                connector_id: self.connector_id,
×
170
            };
×
171
            tables = snapshotter.sync_tables(details.tables.clone(), self.lsn.borrow().as_ref())?;
×
172

173
            debug!("\nInitialized with tables: {:?}", tables);
×
174

175
            client.borrow_mut().simple_query("COMMIT;").map_err(|_e| {
×
176
                debug!("failed to commit txn for replication");
×
177
                ConnectorError::PostgresConnectorError(PostgresConnectorError::CommitReplication)
×
178
            })?;
×
179
        }
×
180

181
        self.state.clone().replace(ReplicationState::Replicating);
×
182

×
183
        /*  ####################        Replicating         ######################  */
×
184
        self.replicate(tables)
×
185
    }
×
186

187
    fn drop_replication_slot(&self, client: Arc<RefCell<Client>>) {
×
188
        let slot = self.details.slot_name.clone();
×
189
        let res = client
×
190
            .borrow_mut()
×
191
            .simple_query(format!("select pg_drop_replication_slot('{slot}');").as_ref());
×
192
        match res {
×
193
            Ok(_) => debug!("dropped replication slot {}", slot),
×
194
            Err(_) => debug!("failed to drop replication slot..."),
×
195
        };
196
    }
×
197

198
    fn create_replication_slot(
×
199
        &self,
×
200
        client: Arc<RefCell<Client>>,
×
201
    ) -> Result<Option<String>, ConnectorError> {
×
202
        let details = Arc::clone(&self.details);
×
203

×
204
        let create_replication_slot_query = format!(
×
205
            r#"CREATE_REPLICATION_SLOT {:?} LOGICAL "pgoutput" USE_SNAPSHOT"#,
×
206
            details.slot_name
×
207
        );
×
208

209
        let slot_query_row = client
×
210
            .borrow_mut()
×
211
            .simple_query(&create_replication_slot_query)
×
212
            .map_err(|_e| {
×
213
                let slot_name = self.details.slot_name.clone();
×
214
                debug!("failed to create replication slot {}", slot_name);
×
215
                ConnectorError::PostgresConnectorError(PostgresConnectorError::CreateSlotError(
×
216
                    slot_name,
×
217
                ))
×
218
            })?;
×
219

220
        if let SimpleQueryMessage::Row(row) = &slot_query_row[0] {
×
221
            Ok(row.get("consistent_point").map(|lsn| lsn.to_string()))
×
222
        } else {
223
            Err(UnexpectedQueryMessageError)
×
224
        }
225
    }
×
226

227
    fn replication_slot_exists(
×
228
        &self,
×
229
        client: Arc<RefCell<Client>>,
×
230
    ) -> Result<bool, ConnectorError> {
×
231
        let details = Arc::clone(&self.details);
×
232

×
233
        let replication_slot_info_query = format!(
×
234
            r#"SELECT * FROM pg_replication_slots where slot_name = '{}';"#,
×
235
            details.slot_name
×
236
        );
×
237

238
        let slot_query_row = client
×
239
            .borrow_mut()
×
240
            .simple_query(&replication_slot_info_query)
×
241
            .map_err(|_e| {
×
242
                debug!("failed to begin txn for replication");
×
243
                ConnectorError::PostgresConnectorError(PostgresConnectorError::FetchReplicationSlot)
×
244
            })?;
×
245

246
        Ok(!slot_query_row.is_empty())
×
247
    }
×
248

249
    fn replicate(&self, tables: Option<Vec<TableInfo>>) -> Result<(), ConnectorError> {
×
250
        let rt = Runtime::new().unwrap();
×
251
        let ingestor = self.ingestor.clone();
×
252
        let lsn = self.lsn.borrow();
×
253
        let (lsn, offset) = lsn
×
254
            .as_ref()
×
255
            .map_or(Err(LSNNotStoredError), |(x, offset)| Ok((x, offset)))?;
×
256

257
        let publication_name = self.details.publication_name.clone();
×
258
        let slot_name = self.details.slot_name.clone();
×
259
        rt.block_on(async {
×
260
            let mut replicator = CDCHandler {
×
261
                replication_conn_config: self.details.replication_conn_config.clone(),
×
262
                ingestor,
×
263
                start_lsn: *lsn,
×
264
                begin_lsn: 0,
×
265
                offset_lsn: 0,
×
266
                offset: *offset,
×
267
                publication_name,
×
268
                slot_name,
×
269
                last_commit_lsn: 0,
×
270
                connector_id: self.connector_id,
×
271
                seq_no: 0,
×
272
                name: self.details.name.clone(),
×
273
            };
×
274
            replicator.start(tables).await
×
275
        })
×
276
    }
×
277
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc