• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

getdozer / dozer / 5921125008

21 Aug 2023 02:02AM UTC coverage: 74.902% (-1.2%) from 76.06%
5921125008

push

github

web-flow
Wait for connectors to stop on shutdown (#1865)

* Wait for connectors to stop on shutdown

* Fix shutdown of object store connector

* Propagate errors in object store connector

338 of 338 new or added lines in 14 files covered. (100.0%)

46077 of 61516 relevant lines covered (74.9%)

39792.39 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

34.34
/dozer-ingestion/src/connectors/postgres/replicator.rs
1
use crate::connectors::postgres::connection::helper;
2
use crate::connectors::postgres::xlog_mapper::XlogMapper;
3
use crate::errors::ConnectorError;
4
use crate::errors::ConnectorError::PostgresConnectorError;
5
use crate::errors::PostgresConnectorError::{
6
    ReplicationStreamEndError, ReplicationStreamError, UnexpectedReplicationMessageError,
7
};
8
use crate::ingestion::Ingestor;
9
use dozer_types::bytes;
10
use dozer_types::chrono::{TimeZone, Utc};
11
use dozer_types::ingestion_types::IngestionMessage;
12
use dozer_types::log::{error, info};
13
use futures::StreamExt;
14
use postgres_protocol::message::backend::ReplicationMessage::*;
15
use postgres_protocol::message::backend::{LogicalReplicationMessage, ReplicationMessage};
16
use postgres_types::PgLsn;
17

18
use std::time::SystemTime;
19
use tokio_postgres::replication::LogicalReplicationStream;
20
use tokio_postgres::Error;
21

22
use super::schema::helper::PostgresTableInfo;
23
use super::xlog_mapper::MappedReplicationMessage;
24

25
pub struct CDCHandler<'a> {
26
    pub name: String,
27
    pub ingestor: &'a Ingestor,
28

29
    pub replication_conn_config: tokio_postgres::Config,
30
    pub publication_name: String,
31
    pub slot_name: String,
32

33
    pub start_lsn: PgLsn,
34
    pub begin_lsn: u64,
35
    pub offset_lsn: u64,
36
    pub last_commit_lsn: u64,
37

38
    pub offset: u64,
39
    pub seq_no: u64,
40
}
41

42
impl<'a> CDCHandler<'a> {
43
    pub async fn start(&mut self, tables: Vec<PostgresTableInfo>) -> Result<(), ConnectorError> {
12✔
44
        let replication_conn_config = self.replication_conn_config.clone();
12✔
45
        let client: tokio_postgres::Client = helper::connect(replication_conn_config).await?;
60✔
46

47
        info!(
48
            "[{}] Starting Replication: {:?}, {:?}",
×
49
            self.name.clone(),
×
50
            self.start_lsn,
×
51
            self.publication_name.clone()
×
52
        );
53

54
        let lsn = self.start_lsn;
12✔
55
        let options = format!(
12✔
56
            r#"("proto_version" '1', "publication_names" '{publication_name}')"#,
12✔
57
            publication_name = self.publication_name
12✔
58
        );
12✔
59
        let query = format!(
12✔
60
            r#"START_REPLICATION SLOT {:?} LOGICAL {} {}"#,
12✔
61
            self.slot_name, lsn, options
12✔
62
        );
12✔
63

12✔
64
        self.offset_lsn = u64::from(lsn);
12✔
65
        self.last_commit_lsn = u64::from(lsn);
12✔
66

67
        let copy_stream = client
12✔
68
            .copy_both_simple::<bytes::Bytes>(&query)
12✔
69
            .await
12✔
70
            .map_err(|e| ConnectorError::InternalError(Box::new(e)))?;
12✔
71

72
        let stream = LogicalReplicationStream::new(copy_stream);
12✔
73
        let tables_columns = tables
12✔
74
            .into_iter()
12✔
75
            .enumerate()
12✔
76
            .map(|(table_index, table_info)| {
12✔
77
                (table_info.relation_id, (table_index, table_info.columns))
12✔
78
            })
12✔
79
            .collect();
12✔
80
        let mut mapper = XlogMapper::new(tables_columns);
12✔
81

12✔
82
        tokio::pin!(stream);
12✔
83
        loop {
84
            let message = stream.next().await;
36✔
85
            if let Some(Ok(PrimaryKeepAlive(ref k))) = message {
24✔
86
                if k.reply() == 1 {
24✔
87
                    // Postgres' keep alive feedback function expects time from 2000-01-01 00:00:00
88
                    let since_the_epoch = SystemTime::now()
×
89
                        .duration_since(SystemTime::from(
×
90
                            Utc.with_ymd_and_hms(2020, 1, 1, 0, 0, 0).unwrap(),
×
91
                        ))
×
92
                        .unwrap()
×
93
                        .as_millis();
×
94
                    stream
×
95
                        .as_mut()
×
96
                        .standby_status_update(
×
97
                            PgLsn::from(self.last_commit_lsn),
×
98
                            PgLsn::from(self.last_commit_lsn),
×
99
                            PgLsn::from(self.last_commit_lsn),
×
100
                            since_the_epoch as i64,
×
101
                            1,
×
102
                        )
×
103
                        .await
×
104
                        .unwrap();
×
105
                }
24✔
106
            } else {
107
                self.handle_replication_message(message, &mut mapper)
×
108
                    .await?;
×
109
            }
110
        }
111
    }
×
112

113
    pub async fn handle_replication_message(
×
114
        &mut self,
×
115
        message: Option<Result<ReplicationMessage<LogicalReplicationMessage>, Error>>,
×
116
        mapper: &mut XlogMapper,
×
117
    ) -> Result<(), ConnectorError> {
×
118
        match message {
×
119
            Some(Ok(XLogData(body))) => {
×
120
                let lsn = body.wal_start();
×
121
                let message = mapper
×
122
                    .handle_message(body)
×
123
                    .map_err(PostgresConnectorError)?;
×
124

125
                match message {
×
126
                    Some(MappedReplicationMessage::Commit(commit)) => {
×
127
                        self.last_commit_lsn = commit.txid;
×
128
                    }
×
129
                    Some(MappedReplicationMessage::Begin) => {
×
130
                        self.begin_lsn = lsn;
×
131
                        self.seq_no = 0;
×
132
                    }
×
133
                    Some(MappedReplicationMessage::Operation { table_index, op }) => {
×
134
                        self.seq_no += 1;
×
135
                        if self.begin_lsn != self.offset_lsn || self.offset < self.seq_no {
×
136
                            self.ingestor
×
137
                                .handle_message(IngestionMessage::new_op(
×
138
                                    self.begin_lsn,
×
139
                                    self.seq_no,
×
140
                                    table_index,
×
141
                                    op,
×
142
                                ))
×
143
                                .map_err(ConnectorError::IngestorError)?;
×
144
                        }
×
145
                    }
146
                    None => {}
×
147
                }
148

149
                Ok(())
×
150
            }
151
            Some(Ok(msg)) => {
×
152
                error!("Unexpected message: {:?}", msg);
×
153
                Err(PostgresConnectorError(UnexpectedReplicationMessageError))
×
154
            }
155
            Some(Err(e)) => Err(PostgresConnectorError(ReplicationStreamError(
×
156
                e.to_string(),
×
157
            ))),
×
158
            None => Err(PostgresConnectorError(ReplicationStreamEndError)),
×
159
        }
160
    }
×
161
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc