• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

vnvo / deltaforge / 20841599773

09 Jan 2026 04:50AM UTC coverage: 51.06% (-1.1%) from 52.164%
20841599773

push

github

vnvo
chore: fixing clippy warnings

54 of 262 new or added lines in 4 files covered. (20.61%)

590 existing lines in 7 files now uncovered.

4674 of 9154 relevant lines covered (51.06%)

2.55 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/crates/sources/src/postgres/postgres_errors.rs
1
//! PostgreSQL source error types and loop control.
2

3
use common::RetryOutcome;
4
use deltaforge_core::SourceError;
5
use thiserror::Error;
6
use tracing::{error, warn};
7

8
// =============================================================================
9
// PostgresSourceError - for helper functions (NOT the event loop)
10
// =============================================================================
11

12
#[derive(Debug, Error)]
13
pub enum PostgresSourceError {
14
    #[error("invalid postgres dsn: {0}")]
15
    InvalidDsn(String),
16

17
    #[error("replication connection failed: {0}")]
18
    ReplicationConnect(String),
19

20
    #[error("auth failed: {0}")]
21
    Auth(String),
22

23
    #[error("checkpoint error: {0}")]
24
    Checkpoint(String),
25

26
    #[error("LSN parse error: {0}")]
27
    LsnParse(String),
28

29
    #[error("schema resolution failed for {schema}.{table}: {details}")]
30
    Schema {
31
        schema: String,
32
        table: String,
33
        details: String,
34
    },
35

36
    #[error("I/O error: {0}")]
37
    Io(#[from] std::io::Error),
38

39
    #[error("database error: {0}")]
40
    Database(String),
41
}
42

43
/// Shorthand for a `Result` whose error type is [`PostgresSourceError`].
pub type PostgresSourceResult<T> = std::result::Result<T, PostgresSourceError>;
44

45
impl From<PostgresSourceError> for SourceError {
UNCOV
46
    fn from(e: PostgresSourceError) -> Self {
×
UNCOV
47
        match e {
×
UNCOV
48
            PostgresSourceError::InvalidDsn(msg) => SourceError::Incompatible {
×
UNCOV
49
                details: msg.into(),
×
50
            },
×
51
            PostgresSourceError::ReplicationConnect(msg) => {
×
52
                SourceError::Connect {
×
53
                    details: msg.into(),
×
54
                }
×
55
            }
56
            PostgresSourceError::Auth(msg) => SourceError::Auth {
×
57
                details: msg.into(),
×
58
            },
×
UNCOV
59
            PostgresSourceError::Checkpoint(msg) => SourceError::Checkpoint {
×
60
                details: msg.into(),
×
61
            },
×
62
            PostgresSourceError::LsnParse(msg) => SourceError::Incompatible {
×
63
                details: format!("LSN: {msg}").into(),
×
64
            },
×
65
            PostgresSourceError::Schema {
66
                schema,
×
67
                table,
×
UNCOV
68
                details,
×
69
            } => SourceError::Schema {
×
70
                details: format!("{schema}.{table}: {details}").into(),
×
71
            },
×
72
            PostgresSourceError::Io(e) => SourceError::Io(e),
×
73
            PostgresSourceError::Database(msg) => {
×
UNCOV
74
                SourceError::Other(anyhow::anyhow!("{msg}"))
×
75
            }
76
        }
77
    }
×
78
}
79

80
impl From<tokio_postgres::Error> for PostgresSourceError {
81
    fn from(e: tokio_postgres::Error) -> Self {
×
82
        PostgresSourceError::Database(e.to_string())
×
83
    }
×
84
}
85

86
// =============================================================================
87
// LoopControl - for event loop control flow
88
// =============================================================================
89

90
#[derive(Debug)]
91
pub enum LoopControl {
92
    Reconnect,
93
    ReloadSchema {
94
        schema: Option<String>,
95
        table: Option<String>,
96
    },
97
    Stop,
98
    Fail(SourceError),
99
}
100

101
impl LoopControl {
102
    /// Classify RetryOutcome from watchdog/retry_async.
103
    pub fn from_replication_outcome<E: std::fmt::Display>(
×
UNCOV
104
        outcome: RetryOutcome<E>,
×
UNCOV
105
    ) -> Self {
×
UNCOV
106
        match outcome {
×
107
            RetryOutcome::Cancelled => Self::Stop,
×
108
            RetryOutcome::Timeout { .. } => Self::Reconnect,
×
109
            RetryOutcome::Failed(e)
×
UNCOV
110
            | RetryOutcome::Exhausted { last_error: e, .. } => {
×
UNCOV
111
                Self::from_error_message(&e.to_string())
×
112
            }
113
        }
UNCOV
114
    }
×
115

116
    /// Classify PgWireError (main event loop errors).
117
    #[allow(dead_code)]
UNCOV
118
    pub fn from_pgwire_error(err: pgwire_replication::PgWireError) -> Self {
×
119
        use pgwire_replication::PgWireError::*;
120

UNCOV
121
        match err {
×
UNCOV
122
            Io(ref msg) if msg.contains("permission denied") => {
×
UNCOV
123
                error!(error = %msg, "permission denied");
×
UNCOV
124
                Self::Fail(SourceError::Auth {
×
UNCOV
125
                    details: msg.clone().into(),
×
UNCOV
126
                })
×
127
            }
UNCOV
128
            Io(msg) => {
×
UNCOV
129
                warn!(error = %msg, "IO error, will reconnect");
×
UNCOV
130
                Self::Reconnect
×
131
            }
132

UNCOV
133
            Auth(msg) => {
×
UNCOV
134
                error!(error = %msg, "authentication failed");
×
UNCOV
135
                Self::Fail(SourceError::Auth {
×
UNCOV
136
                    details: msg.into(),
×
UNCOV
137
                })
×
138
            }
139

UNCOV
140
            Tls(msg) => {
×
UNCOV
141
                error!(error = %msg, "TLS error");
×
UNCOV
142
                Self::Fail(SourceError::Incompatible {
×
UNCOV
143
                    details: format!("TLS: {msg}").into(),
×
UNCOV
144
                })
×
145
            }
146

UNCOV
147
            Server(ref msg) => Self::classify_server_error(msg),
×
148

UNCOV
149
            Protocol(ref msg) if is_schema_error(msg) => {
×
UNCOV
150
                warn!(error = %msg, "schema error, will reload");
×
UNCOV
151
                Self::ReloadSchema {
×
UNCOV
152
                    schema: None,
×
UNCOV
153
                    table: None,
×
UNCOV
154
                }
×
155
            }
UNCOV
156
            Protocol(msg) => {
×
UNCOV
157
                warn!(error = %msg, "protocol error, will reconnect");
×
UNCOV
158
                Self::Reconnect
×
159
            }
160

UNCOV
161
            Task(msg) => {
×
UNCOV
162
                warn!(error = %msg, "task error, will reconnect");
×
UNCOV
163
                Self::Reconnect
×
164
            }
165

UNCOV
166
            Internal(msg) => {
×
UNCOV
167
                error!(error = %msg, "internal error");
×
UNCOV
168
                Self::Fail(SourceError::Other(anyhow::anyhow!(
×
UNCOV
169
                    "pgwire: {}",
×
UNCOV
170
                    msg
×
UNCOV
171
                )))
×
172
            }
173
        }
UNCOV
174
    }
×
175

176
    /// Classify tokio_postgres errors (control plane queries).
UNCOV
177
    pub fn from_tokio_postgres_error(err: &tokio_postgres::Error) -> Self {
×
UNCOV
178
        if let Some(db_err) = err.as_db_error() {
×
UNCOV
179
            let code = db_err.code().code();
×
UNCOV
180
            if code.starts_with("28") {
×
UNCOV
181
                return Self::Fail(SourceError::Auth {
×
UNCOV
182
                    details: err.to_string().into(),
×
UNCOV
183
                });
×
UNCOV
184
            }
×
UNCOV
185
            if code.starts_with("42") {
×
UNCOV
186
                return Self::ReloadSchema {
×
UNCOV
187
                    schema: None,
×
UNCOV
188
                    table: None,
×
UNCOV
189
                };
×
UNCOV
190
            }
×
UNCOV
191
            if code.starts_with("08")
×
UNCOV
192
                || code.starts_with("53")
×
UNCOV
193
                || code.starts_with("57")
×
194
            {
UNCOV
195
                return Self::Reconnect;
×
UNCOV
196
            }
×
UNCOV
197
        }
×
UNCOV
198
        Self::from_error_message(&err.to_string())
×
UNCOV
199
    }
×
200

UNCOV
201
    fn classify_server_error(msg: &str) -> Self {
×
UNCOV
202
        let lower = msg.to_lowercase();
×
203

204
        // Auth errors (28xxx)
UNCOV
205
        if lower.contains("28p01")
×
UNCOV
206
            || lower.contains("28000")
×
UNCOV
207
            || lower.contains("password authentication failed")
×
UNCOV
208
            || lower.contains("no pg_hba.conf entry")
×
209
        {
UNCOV
210
            error!(error = %msg, "auth failed");
×
UNCOV
211
            return Self::Fail(SourceError::Auth {
×
UNCOV
212
                details: msg.to_string().into(),
×
UNCOV
213
            });
×
UNCOV
214
        }
×
215

216
        // Schema errors (42xxx)
UNCOV
217
        if lower.contains("42p01")
×
UNCOV
218
            || lower.contains("42703")
×
UNCOV
219
            || lower.contains("42704")
×
UNCOV
220
            || is_schema_error(&lower)
×
221
        {
UNCOV
222
            warn!(error = %msg, "schema error, will reload");
×
UNCOV
223
            return Self::ReloadSchema {
×
UNCOV
224
                schema: None,
×
UNCOV
225
                table: None,
×
UNCOV
226
            };
×
UNCOV
227
        }
×
228

229
        // Slot errors
UNCOV
230
        if lower.contains("replication slot") {
×
UNCOV
231
            if lower.contains("does not exist") {
×
UNCOV
232
                error!(error = %msg, "slot not found");
×
UNCOV
233
                return Self::Fail(SourceError::Checkpoint {
×
UNCOV
234
                    details: msg.to_string().into(),
×
UNCOV
235
                });
×
UNCOV
236
            }
×
UNCOV
237
            if lower.contains("already active") {
×
UNCOV
238
                warn!(error = %msg, "slot in use, will retry");
×
UNCOV
239
                return Self::Reconnect;
×
UNCOV
240
            }
×
UNCOV
241
        }
×
242

243
        // WAL removed
UNCOV
244
        if lower.contains("requested wal segment") && lower.contains("removed")
×
245
        {
UNCOV
246
            error!(error = %msg, "WAL removed");
×
UNCOV
247
            return Self::Fail(SourceError::Checkpoint {
×
UNCOV
248
                details: msg.to_string().into(),
×
UNCOV
249
            });
×
UNCOV
250
        }
×
251

252
        // Admin shutdown (57P01) - this is retryable
UNCOV
253
        if lower.contains("57p01")
×
UNCOV
254
            || lower.contains("terminating connection")
×
UNCOV
255
            || lower.contains("administrator command")
×
256
        {
UNCOV
257
            warn!(error = %msg, "connection terminated by admin, will reconnect");
×
UNCOV
258
            return Self::Reconnect;
×
UNCOV
259
        }
×
260

UNCOV
261
        warn!(error = %msg, "server error, will reconnect");
×
UNCOV
262
        Self::Reconnect
×
UNCOV
263
    }
×
264

UNCOV
265
    fn from_error_message(msg: &str) -> Self {
×
UNCOV
266
        let lower = msg.to_lowercase();
×
267

268
        // Invalid DSN/connection string is a config error, not retryable
UNCOV
269
        if lower.contains("invalid connection string")
×
UNCOV
270
            || lower.contains("invalid dsn")
×
271
        {
UNCOV
272
            error!(error = %msg, "invalid connection string - check DSN format");
×
UNCOV
273
            return Self::Fail(SourceError::Incompatible {
×
UNCOV
274
                details: msg.to_string().into(),
×
UNCOV
275
            });
×
UNCOV
276
        }
×
277

UNCOV
278
        if is_auth_error(&lower) {
×
UNCOV
279
            return Self::Fail(SourceError::Auth {
×
UNCOV
280
                details: msg.to_string().into(),
×
UNCOV
281
            });
×
UNCOV
282
        }
×
UNCOV
283
        if is_connection_error(&lower) {
×
UNCOV
284
            return Self::Reconnect;
×
UNCOV
285
        }
×
UNCOV
286
        if is_schema_error(&lower) {
×
UNCOV
287
            return Self::ReloadSchema {
×
UNCOV
288
                schema: None,
×
UNCOV
289
                table: None,
×
UNCOV
290
            };
×
UNCOV
291
        }
×
UNCOV
292
        Self::Fail(SourceError::Other(anyhow::anyhow!("{}", msg)))
×
UNCOV
293
    }
×
294

UNCOV
295
    pub fn is_retryable(&self) -> bool {
×
UNCOV
296
        matches!(self, Self::Reconnect | Self::ReloadSchema { .. })
×
UNCOV
297
    }
×
298
}
299

UNCOV
300
/// Returns `true` when `s` contains one of the known authentication or
/// privilege failure phrases.
///
/// Matching is case-sensitive and every phrase is lowercase.
fn is_auth_error(s: &str) -> bool {
    const AUTH_PHRASES: [&str; 4] = [
        "password authentication failed",
        "permission denied",
        "no pg_hba.conf entry",
        "insufficient privilege",
    ];

    AUTH_PHRASES.iter().any(|&phrase| s.contains(phrase))
}
×
306

UNCOV
307
/// Returns `true` when `s` contains one of the known connection-level
/// failure phrases (or the lowercase SQLSTATE text `57p01`).
///
/// Matching is case-sensitive and every phrase is lowercase.
fn is_connection_error(s: &str) -> bool {
    const CONNECTION_PHRASES: [&str; 9] = [
        "connection reset",
        "connection refused",
        "broken pipe",
        "connection closed",
        "timed out",
        "unexpected eof",
        "terminating connection",
        "administrator command",
        "57p01",
    ];

    CONNECTION_PHRASES.iter().any(|&phrase| s.contains(phrase))
}
×
318

UNCOV
319
/// Returns `true` when `s` says that a relation, column or type
/// "does not exist".
///
/// Both the object word and the phrase must be present. Matching is
/// case-sensitive and all patterns are lowercase.
fn is_schema_error(s: &str) -> bool {
    if !s.contains("does not exist") {
        return false;
    }

    ["relation", "column", "type"]
        .iter()
        .any(|&object_kind| s.contains(object_kind))
}
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc