• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

supabase / etl / 28088805604

24 Jun 2026 09:27AM UTC coverage: 74.565% (-0.007%) from 74.572%
28088805604

Pull #850

github

web-flow
Merge be0c10e4c into 0a7600e50
Pull Request #850: fix(api): Improve tenant deletion reliability

116 of 159 new or added lines in 8 files covered. (72.96%)

38 existing lines in 4 files now uncovered.

39025 of 52337 relevant lines covered (74.56%)

35612.97 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

76.19
/crates/etl-api/src/data/source_database.rs
1
use std::{io::ErrorKind, sync::LazyLock};
2

3
use etl_config::shared::{PgConnectionConfig, PgConnectionOptions};
4
use etl_postgres::replication::connect_to_source_database;
5
use sqlx::{PgPool, error::DatabaseError};
6

7
/// Classification for source database errors exposed by API routes.
8
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
9
pub(crate) enum SourceDatabaseErrorKind {
10
    /// The source database did not respond before the request timeout.
11
    TimedOut,
12
    /// The source database is unavailable or the existing connection was lost.
13
    Unavailable,
14
    /// The source database returned an error that is not a connection lifecycle
15
    /// failure.
16
    Failed,
17
}
18

19
/// Minimum number of connections for the source Postgres connection pool.
20
const MIN_POOL_CONNECTIONS: u32 = 1;
21

22
/// Maximum number of connections for the source Postgres connection pool.
23
const MAX_POOL_CONNECTIONS: u32 = 1;
24
/// Application name for ETL API source database connections.
25
const APP_NAME_API: &str = "supabase_etl_api";
26

27
/// Connection options for API source database queries.
28
///
29
/// Uses strict timeouts to keep API requests responsive under contention.
30
static API_OPTIONS: LazyLock<PgConnectionOptions> =
31
    LazyLock::new(|| PgConnectionOptions::builder(APP_NAME_API).build());
30✔
32

33
/// Connects to the source database with API route defaults.
34
///
35
/// Uses state management options with moderate timeouts suitable for
36
/// administrative queries like listing tables and reading publications. If
37
/// configured, the source connection uses `hostaddr` as the TCP target and
38
/// preserves `host` as the canonical database hostname in stored API state.
39
pub(crate) async fn connect(config: &PgConnectionConfig) -> Result<PgPool, sqlx::Error> {
33✔
40
    connect_to_source_database(
33✔
41
        config,
33✔
42
        MIN_POOL_CONNECTIONS,
33✔
43
        MAX_POOL_CONNECTIONS,
33✔
44
        Some(&API_OPTIONS),
33✔
45
    )
33✔
46
    .await
33✔
47
}
33✔
48

49
/// Classifies a source database error using stable SQLSTATE and IO categories.
50
pub(crate) fn classify_error(error: &sqlx::Error) -> SourceDatabaseErrorKind {
22✔
51
    match error {
19✔
52
        sqlx::Error::Io(error) if error.kind() == ErrorKind::TimedOut => {
4✔
53
            SourceDatabaseErrorKind::TimedOut
3✔
54
        }
55
        error if error_is_unavailable(error) => SourceDatabaseErrorKind::Unavailable,
19✔
56
        _ => SourceDatabaseErrorKind::Failed,
5✔
57
    }
58
}
22✔
59

60
/// Returns true when a source database error means the source is unavailable.
61
pub(crate) fn error_is_unavailable(error: &sqlx::Error) -> bool {
19✔
62
    match error {
19✔
63
        sqlx::Error::PoolTimedOut | sqlx::Error::PoolClosed | sqlx::Error::WorkerCrashed => true,
5✔
64
        sqlx::Error::Io(error) => io_error_is_unavailable(error.kind()),
1✔
65
        sqlx::Error::Database(error) => database_error_is_unavailable(error.as_ref()),
9✔
66
        _ => false,
4✔
67
    }
68
}
19✔
69

70
/// Returns true when an IO error means the source connection cannot be used.
71
fn io_error_is_unavailable(error_kind: ErrorKind) -> bool {
1✔
NEW
72
    matches!(
×
73
        error_kind,
1✔
74
        ErrorKind::ConnectionRefused
75
            | ErrorKind::ConnectionReset
76
            | ErrorKind::ConnectionAborted
77
            | ErrorKind::NotConnected
78
            | ErrorKind::BrokenPipe
79
            | ErrorKind::TimedOut
80
            | ErrorKind::UnexpectedEof
81
    )
82
}
1✔
83

84
/// Returns true when a database SQLSTATE means the source connection cannot be
85
/// used.
86
fn database_error_is_unavailable(error: &dyn DatabaseError) -> bool {
9✔
87
    let Some(code) = error.code() else {
9✔
NEW
88
        return false;
×
89
    };
90

91
    // PostgreSQL recommends matching on SQLSTATE codes rather than localized
92
    // message text. Class 08 covers connection exceptions; 53300 covers server
93
    // connection exhaustion; 57P01-57P05 cover shutdown/crash/database-dropped
94
    // and idle-session lifecycle failures.
95
    code.starts_with("08")
9✔
96
        || matches!(code.as_ref(), "53300" | "57P01" | "57P02" | "57P03" | "57P04" | "57P05")
8✔
97
}
9✔
98

99
#[cfg(test)]
100
mod tests {
101
    use std::{borrow::Cow, error::Error as StdError, fmt};
102

103
    use super::*;
104

105
    /// Test database error that lets tests exercise SQLSTATE classification.
106
    #[derive(Debug)]
107
    struct TestDatabaseError {
108
        /// SQLSTATE exposed by this test error.
109
        code: &'static str,
110
    }
111

112
    impl TestDatabaseError {
113
        /// Creates a new test database error with the provided SQLSTATE.
114
        fn new(code: &'static str) -> Self {
8✔
115
            Self { code }
8✔
116
        }
8✔
117
    }
118

119
    impl fmt::Display for TestDatabaseError {
NEW
120
        fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
×
NEW
121
            write!(f, "test database error {}", self.code)
×
NEW
122
        }
×
123
    }
124

125
    impl StdError for TestDatabaseError {}
126

127
    impl DatabaseError for TestDatabaseError {
NEW
128
        fn message(&self) -> &str {
×
NEW
129
            "test database error"
×
NEW
130
        }
×
131

132
        fn code(&self) -> Option<Cow<'_, str>> {
8✔
133
            Some(Cow::Borrowed(self.code))
8✔
134
        }
8✔
135

NEW
136
        fn as_error(&self) -> &(dyn StdError + Send + Sync + 'static) {
×
NEW
137
            self
×
NEW
138
        }
×
139

NEW
140
        fn as_error_mut(&mut self) -> &mut (dyn StdError + Send + Sync + 'static) {
×
NEW
141
            self
×
NEW
142
        }
×
143

NEW
144
        fn into_error(self: Box<Self>) -> Box<dyn StdError + Send + Sync + 'static> {
×
NEW
145
            self
×
NEW
146
        }
×
147

NEW
148
        fn kind(&self) -> sqlx::error::ErrorKind {
×
NEW
149
            sqlx::error::ErrorKind::Other
×
NEW
150
        }
×
151
    }
152

153
    #[test]
154
    fn pool_timeout_is_reported_as_service_unavailable() {
1✔
155
        let error = sqlx::Error::PoolTimedOut;
1✔
156

157
        assert_eq!(classify_error(&error), SourceDatabaseErrorKind::Unavailable);
1✔
158
    }
1✔
159

160
    #[test]
161
    fn io_timeout_is_reported_as_gateway_timeout() {
1✔
162
        let error = sqlx::Error::Io(std::io::Error::new(ErrorKind::TimedOut, "timed out"));
1✔
163

164
        assert_eq!(classify_error(&error), SourceDatabaseErrorKind::TimedOut);
1✔
165
    }
1✔
166

167
    #[test]
168
    fn connection_io_errors_are_reported_as_service_unavailable() {
1✔
169
        let error =
1✔
170
            sqlx::Error::Io(std::io::Error::new(ErrorKind::ConnectionReset, "connection reset"));
1✔
171

172
        assert_eq!(classify_error(&error), SourceDatabaseErrorKind::Unavailable);
1✔
173
    }
1✔
174

175
    #[test]
176
    fn connection_sqlstate_errors_are_reported_as_service_unavailable() {
1✔
177
        for code in ["08006", "53300", "57P01", "57P02", "57P03", "57P04", "57P05"] {
7✔
178
            let error = sqlx::Error::Database(Box::new(TestDatabaseError::new(code)));
7✔
179

180
            assert_eq!(classify_error(&error), SourceDatabaseErrorKind::Unavailable);
7✔
181
        }
182
    }
1✔
183

184
    #[test]
185
    fn non_connection_sqlstate_errors_are_reported_as_upstream_failures() {
1✔
186
        let error = sqlx::Error::Database(Box::new(TestDatabaseError::new("42501")));
1✔
187

188
        assert_eq!(classify_error(&error), SourceDatabaseErrorKind::Failed);
1✔
189
    }
1✔
190

191
    #[test]
192
    fn protocol_errors_are_reported_as_upstream_failures() {
1✔
193
        let error = sqlx::Error::Protocol("bad source response".to_owned());
1✔
194

195
        assert_eq!(classify_error(&error), SourceDatabaseErrorKind::Failed);
1✔
196
    }
1✔
197
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc