getdozer / dozer / 6062556821

01 Sep 2023 04:12PM UTC coverage: 77.717%. First build.

push · github · web-flow
fix: resume connectors on network errors (#1956)

* fix: retry on network errors in MySQL connector

Resume database queries on network errors.
Select queries with multiple rows resume from the last row received.
CDC continues from its last position.

* fix: retry on network errors in Postgres connector

As in the MySQL connector, select queries resume from the last row received.
CDC resumes from the position where it stopped.

* fix: retry on network errors in Kafka connector

Detect network failures, reconnect, and resume.

* fix: retry on network errors in Object Store connector

This is not a complete solution; a fuller fix should use the retry infrastructure provided by the object_store crate.

* chore: add sleep between retries (a sketch of the overall retry-and-resume loop follows this commit message)

---------

Co-authored-by: chubei <914745487@qq.com>
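
The connector fixes above share a single pattern: classify the error as a network failure, reconnect, resume from the last position that was successfully received, and sleep between attempts. The sketch below illustrates that pattern only; the Source trait, the is_network_failure heuristic, and read_with_retries are hypothetical stand-ins, not dozer's actual APIs.

use std::time::Duration;

/// Hypothetical trait standing in for a connector's resumable read path;
/// each real connector tracks its own notion of position (e.g. the last
/// row received for a snapshot query, or the replication position for CDC).
trait Source {
    type Item;
    type Error: std::fmt::Display;

    /// Reconnect and continue reading from `position`.
    fn resume(&mut self, position: u64) -> Result<(), Self::Error>;
    /// Fetch the next item, or None when the stream is exhausted.
    fn next(&mut self) -> Result<Option<Self::Item>, Self::Error>;
    /// Position of the last item successfully received.
    fn position(&self) -> u64;
}

/// Rough stand-in for a per-connector network-failure classifier; the real
/// connectors inspect their client library's concrete error type instead.
fn is_network_failure<E: std::fmt::Display>(err: &E) -> bool {
    let msg = err.to_string().to_lowercase();
    msg.contains("connection") || msg.contains("broken pipe") || msg.contains("timed out")
}

/// Read items, retrying on network errors and resuming from the last
/// position received, sleeping between attempts.
fn read_with_retries<S: Source>(
    source: &mut S,
    mut handle: impl FnMut(S::Item),
) -> Result<(), S::Error> {
    const RETRY_DELAY: Duration = Duration::from_millis(500);
    loop {
        match source.next() {
            Ok(Some(item)) => handle(item),
            Ok(None) => return Ok(()),
            Err(err) if is_network_failure(&err) => {
                // Back off, then reconnect and resume from the last
                // position that was successfully received.
                std::thread::sleep(RETRY_DELAY);
                let position = source.position();
                source.resume(position)?;
            }
            Err(err) => return Err(err),
        }
    }
}

The Postgres CDC variant of this loop appears in the covered file below, where resume_lsn plays the role of the resume position.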

638 of 638 new or added lines in 26 files covered (100.0%).

49643 of 63877 relevant lines covered (77.72%)

51433.87 hits per line

Source File: /dozer-ingestion/src/connectors/postgres/replicator.rs (file coverage: 39.05%)

use crate::connectors::postgres::connection::client::Client;
use crate::connectors::postgres::connection::helper::{self, is_network_failure};
use crate::connectors::postgres::xlog_mapper::XlogMapper;
use crate::errors::ConnectorError;
use crate::errors::ConnectorError::PostgresConnectorError;
use crate::errors::PostgresConnectorError::{
    ReplicationStreamEndError, ReplicationStreamError, UnexpectedReplicationMessageError,
};
use crate::ingestion::Ingestor;
use dozer_types::bytes;
use dozer_types::chrono::{TimeZone, Utc};
use dozer_types::ingestion_types::IngestionMessage;
use dozer_types::log::{error, info};
use futures::StreamExt;
use postgres_protocol::message::backend::ReplicationMessage::*;
use postgres_protocol::message::backend::{LogicalReplicationMessage, ReplicationMessage};
use postgres_types::PgLsn;

use std::pin::Pin;
use std::time::SystemTime;
use tokio_postgres::Error;

use super::schema::helper::PostgresTableInfo;
use super::xlog_mapper::MappedReplicationMessage;

pub struct CDCHandler<'a> {
    pub name: String,
    pub ingestor: &'a Ingestor,

    pub replication_conn_config: tokio_postgres::Config,
    pub publication_name: String,
    pub slot_name: String,

    pub start_lsn: PgLsn,
    pub begin_lsn: u64,
    pub offset_lsn: u64,
    pub last_commit_lsn: u64,

    pub offset: u64,
    pub seq_no: u64,
}

impl<'a> CDCHandler<'a> {
    pub async fn start(&mut self, tables: Vec<PostgresTableInfo>) -> Result<(), ConnectorError> {
        let replication_conn_config = self.replication_conn_config.clone();
        let client: Client = helper::connect(replication_conn_config).await?;

        info!(
            "[{}] Starting Replication: {:?}, {:?}",
            self.name.clone(),
            self.start_lsn,
            self.publication_name.clone()
        );

        let lsn = self.start_lsn;
        let options = format!(
            r#"("proto_version" '1', "publication_names" '{publication_name}')"#,
            publication_name = self.publication_name
        );

        self.offset_lsn = u64::from(lsn);
        self.last_commit_lsn = u64::from(lsn);

        let mut stream =
            LogicalReplicationStream::new(client, self.slot_name.clone(), lsn, options)
                .await
                .map_err(|e| ConnectorError::InternalError(Box::new(e)))?;

        let tables_columns = tables
            .into_iter()
            .enumerate()
            .map(|(table_index, table_info)| {
                (table_info.relation_id, (table_index, table_info.columns))
            })
            .collect();
        let mut mapper = XlogMapper::new(tables_columns);

        loop {
            let message = stream.next().await;
            if let Some(Ok(PrimaryKeepAlive(ref k))) = message {
                if k.reply() == 1 {
                    // Postgres' keep alive feedback function expects time from 2000-01-01 00:00:00
                    let since_the_epoch = SystemTime::now()
                        .duration_since(SystemTime::from(
                            Utc.with_ymd_and_hms(2020, 1, 1, 0, 0, 0).unwrap(),
                        ))
                        .unwrap()
                        .as_millis();
                    stream
                        .standby_status_update(
                            PgLsn::from(self.last_commit_lsn),
                            PgLsn::from(self.last_commit_lsn),
                            PgLsn::from(self.last_commit_lsn),
                            since_the_epoch as i64,
                            1,
                        )
                        .await
                        .unwrap();
                }
            } else {
                self.handle_replication_message(message, &mut mapper)
                    .await?;
            }
        }
    }

    pub async fn handle_replication_message(
        &mut self,
        message: Option<Result<ReplicationMessage<LogicalReplicationMessage>, Error>>,
        mapper: &mut XlogMapper,
    ) -> Result<(), ConnectorError> {
        match message {
            Some(Ok(XLogData(body))) => {
                let lsn = body.wal_start();
                let message = mapper
                    .handle_message(body)
                    .map_err(PostgresConnectorError)?;

                match message {
                    Some(MappedReplicationMessage::Commit(commit)) => {
                        self.last_commit_lsn = commit.txid;
                    }
                    Some(MappedReplicationMessage::Begin) => {
                        self.begin_lsn = lsn;
                        self.seq_no = 0;
                    }
                    Some(MappedReplicationMessage::Operation { table_index, op }) => {
                        self.seq_no += 1;
                        if self.begin_lsn != self.offset_lsn || self.offset < self.seq_no {
                            self.ingestor
                                .handle_message(IngestionMessage::new_op(
                                    self.begin_lsn,
                                    self.seq_no,
                                    table_index,
                                    op,
                                ))
                                .map_err(ConnectorError::IngestorError)?;
                        }
                    }
                    None => {}
                }

                Ok(())
            }
            Some(Ok(msg)) => {
                error!("Unexpected message: {:?}", msg);
                Err(PostgresConnectorError(UnexpectedReplicationMessageError))
            }
            Some(Err(e)) => Err(PostgresConnectorError(ReplicationStreamError(
                e.to_string(),
            ))),
            None => Err(PostgresConnectorError(ReplicationStreamEndError)),
        }
    }
}

pub struct LogicalReplicationStream {
    client: Client,
    slot_name: String,
    resume_lsn: PgLsn,
    options: String,
    inner: Pin<Box<tokio_postgres::replication::LogicalReplicationStream>>,
}

impl LogicalReplicationStream {
    pub async fn new(
        mut client: Client,
        slot_name: String,
        lsn: PgLsn,
        options: String,
    ) -> Result<Self, tokio_postgres::Error> {
        let inner =
            Box::pin(Self::open_replication_stream(&mut client, &slot_name, lsn, &options).await?);
        Ok(Self {
            client,
            slot_name,
            resume_lsn: lsn,
            options,
            inner,
        })
    }

    pub async fn next(
        &mut self,
    ) -> Option<Result<ReplicationMessage<LogicalReplicationMessage>, tokio_postgres::Error>> {
        loop {
            let result = self.inner.next().await;
            match result.as_ref() {
                Some(Err(err)) if is_network_failure(err) => {
                    if let Err(err) = self.resume().await {
                        return Some(Err(err));
                    }
                    continue;
                }
                Some(Ok(XLogData(body))) => self.resume_lsn = body.wal_end().into(),
                _ => {}
            }
            return result;
        }
    }

    pub async fn standby_status_update(
        &mut self,
        write_lsn: PgLsn,
        flush_lsn: PgLsn,
        apply_lsn: PgLsn,
        ts: i64,
        reply: u8,
    ) -> Result<(), Error> {
        loop {
            match self
                .inner
                .as_mut()
                .standby_status_update(write_lsn, flush_lsn, apply_lsn, ts, reply)
                .await
            {
                Err(err) if is_network_failure(&err) => {
                    self.resume().await?;
                    continue;
                }
                _ => {}
            }
            break Ok(());
        }
    }

    async fn resume(&mut self) -> Result<(), tokio_postgres::Error> {
        self.client.reconnect().await?;

        let stream = Self::open_replication_stream(
            &mut self.client,
            &self.slot_name,
            self.resume_lsn,
            &self.options,
        )
        .await?;

        self.inner = Box::pin(stream);

        Ok(())
    }

    async fn open_replication_stream(
        client: &mut Client,
        slot_name: &str,
        lsn: PgLsn,
        options: &str,
    ) -> Result<tokio_postgres::replication::LogicalReplicationStream, tokio_postgres::Error> {
        let query = format!(
            r#"START_REPLICATION SLOT {:?} LOGICAL {} {}"#,
            slot_name, lsn, options
        );

        let copy_stream = client.copy_both_simple::<bytes::Bytes>(&query).await?;

        Ok(tokio_postgres::replication::LogicalReplicationStream::new(
            copy_stream,
        ))
    }
}
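
The wrapper above relies on helper::is_network_failure, which lives in another module and is not part of this report. As an illustration only (an assumption about its shape, not dozer's actual helper), a predicate over tokio_postgres::Error could look like this:

// Bring the source() trait method into scope.
use std::error::Error as _;

// Illustrative sketch only: dozer's real helper is in
// connectors::postgres::connection::helper and may classify errors differently.
fn is_network_failure(err: &tokio_postgres::Error) -> bool {
    // The connection was closed out from under us.
    if err.is_closed() {
        return true;
    }
    // Or the error wraps an underlying I/O failure (reset, broken pipe, timeout).
    err.source()
        .and_then(|source| source.downcast_ref::<std::io::Error>())
        .is_some()
}

Resumption itself is driven by resume_lsn: next() advances it to the wal_end() of every XLogData message, so when resume() reconnects, START_REPLICATION restarts the stream from the last WAL position that was actually received.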