• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

supabase / etl / 24713061833

21 Apr 2026 08:47AM UTC coverage: 78.316% (+0.07%) from 78.248%
24713061833

push

github

web-flow
refactor: enable extended clippy lint set (#675)

* refactor: explicit_iter_loop

* refactor: implicit_clone

* refactor: manual_let_else

* refactor: map_unwrap_or

* refactor: match_same_arms

* refactor: clippy::redundant_closure_for_method_calls

* refactor: redundant_clone

* refactor: redundant_test_prefix

* refactor: semicolon_if_nothing_returned

* refactor: uninlined_format_args

* refactor: unnested_or_patterns

* feat: enable extra lints

* fix: rebase

* fix: removed test

* fix: after rebase

* fix: fmt

* fix: http tests

* fix: rebase

* fix: fmt

* fix: fmt

414 of 503 new or added lines in 59 files covered. (82.31%)

3292 existing lines in 120 files now uncovered.

24379 of 31129 relevant lines covered (78.32%)

1069.73 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

44.19
/etl-replicator/src/error_notification.rs
1
use std::{
2
    collections::hash_map::DefaultHasher,
3
    error::Error,
4
    hash::{Hash, Hasher},
5
    time::Duration,
6
};
7

8
use serde::{Deserialize, Serialize};
9
use tracing::{info, warn};
10

11
/// Request payload for error notifications.
12
///
13
/// Contains error information to be sent to the Supabase API for tracking
14
/// and monitoring purposes.
15
#[derive(Debug, Clone, Serialize, Deserialize)]
16
struct NotificationRequest {
17
    /// Unique identifier for the pipeline that encountered the error.
18
    pipeline_id: String,
19
    /// Human-readable error message describing the failure.
20
    error_message: String,
21
    /// Stable hash of the error for grouping and deduplication.
22
    ///
23
    /// The hash is computed from error kind, description, and detail to
24
    /// provide a consistent identifier across multiple occurrences of the
25
    /// same error type.
26
    error_hash: String,
27
}
28

29
/// Response from the error notification API.
30
///
31
/// Contains information about whether the notification was successfully
32
/// processed and if it was deduplicated based on the error hash.
33
#[derive(Debug, Clone, Serialize, Deserialize)]
34
struct NotificationResponse {
35
    /// Success message from the API.
36
    message: String,
37
    /// Whether the notification was deduplicated based on the error hash.
38
    deduplicated: bool,
39
}
40

41
/// Client for sending error notifications to Supabase API.
42
///
43
/// Provides async methods to notify external systems about errors that occur
44
/// during replication. Uses reqwest for HTTP communication and handles
45
/// errors gracefully without blocking pipeline operations.
46
#[derive(Debug, Clone)]
47
pub(crate) struct ErrorNotificationClient {
48
    /// HTTP client for making requests.
49
    client: reqwest::Client,
50
    /// Supabase API URL for error notifications.
51
    api_url: String,
52
    /// Supabase API key for authentication.
53
    api_key: String,
54
    /// Supabase project reference.
55
    project_ref: String,
56
    /// Pipeline identifier.
57
    pipeline_id: String,
58
}
59

60
impl ErrorNotificationClient {
61
    /// Creates a new error notification client.
62
    ///
63
    /// The client is configured with the necessary credentials and endpoints
64
    /// to send error notifications to the Supabase API.
65
    pub(crate) fn new(
×
66
        api_url: String,
×
UNCOV
67
        api_key: String,
×
68
        project_ref: String,
×
69
        pipeline_id: String,
×
70
    ) -> Self {
×
71
        let client =
×
72
            reqwest::Client::builder().timeout(Duration::from_secs(10)).build().unwrap_or_default();
×
73

74
        Self { client, api_url, api_key, project_ref, pipeline_id }
×
75
    }
×
76

77
    /// Sends an error notification to the Supabase API.
78
    ///
79
    /// This method is fire-and-forget - it logs any failures but does not
80
    /// propagate them to avoid disrupting the pipeline. The notification is
81
    /// sent asynchronously without blocking pipeline operations.
82
    pub(crate) async fn notify_error<H: Hash>(&self, error_message: String, error_hash: H) {
×
83
        let error_hash = compute_error_hash(error_hash);
×
84

85
        let notification = NotificationRequest {
×
86
            pipeline_id: self.pipeline_id.clone(),
×
87
            error_message,
×
88
            error_hash,
×
89
        };
×
90

91
        info!(
×
92
            error_hash = %notification.error_hash,
93
            "sending error notification to supabase api"
94
        );
95

96
        match self.send_notification(notification).await {
×
97
            Ok(response) => {
×
98
                info!(
×
99
                    message = %response.message,
100
                    deduplicated = %response.deduplicated,
101
                    "error notification sent successfully"
102
                );
103
            }
104
            Err(err) => {
×
105
                warn!(
×
106
                    error = %err,
107
                    "failed to send error notification, continuing without notification"
108
                );
109
            }
110
        }
111
    }
×
112

113
    /// Returns the URL for the error notification endpoint.
114
    fn error_notification_url(&self) -> String {
×
115
        format!("{}/system/replication/{}/pipeline-error", self.api_url, self.project_ref)
×
UNCOV
116
    }
×
117

118
    /// Sends the notification request to the API endpoint.
119
    async fn send_notification(
×
UNCOV
120
        &self,
×
UNCOV
121
        notification: NotificationRequest,
×
122
    ) -> Result<NotificationResponse, Box<dyn Error>> {
×
123
        let response = self
×
124
            .client
×
125
            .post(self.error_notification_url())
×
126
            .header("apikey", &self.api_key)
×
127
            .header("Content-Type", "application/json")
×
128
            .json(&notification)
×
129
            .send()
×
130
            .await?;
×
131

132
        if !response.status().is_success() {
×
133
            let status = response.status();
×
UNCOV
134
            let body =
×
135
                response.text().await.unwrap_or_else(|_| "<unable to read body>".to_string());
×
136
            return Err(format!("API returned status {status}: {body}").into());
×
137
        }
×
138

139
        let notification_response = response.json::<NotificationResponse>().await?;
×
140
        Ok(notification_response)
×
141
    }
×
142
}
143

144
/// Computes a stable hash for an error.
145
///
146
/// This provides a consistent identifier across multiple occurrences of the
147
/// same error type, enabling grouping and deduplication in monitoring systems.
148
fn compute_error_hash<H: Hash>(error_hash: H) -> String {
6✔
149
    let mut hasher = DefaultHasher::new();
6✔
150
    error_hash.hash(&mut hasher);
6✔
151
    let hash_value = hasher.finish();
6✔
152

153
    format!("{hash_value:016x}")
6✔
154
}
6✔
155

156
#[cfg(test)]
157
mod tests {
158
    use etl::error::{ErrorKind, EtlError};
159

160
    use super::*;
161

162
    #[test]
163
    fn compute_error_hash_stability() {
1✔
164
        let err1 =
1✔
165
            EtlError::from((ErrorKind::SourceConnectionFailed, "Database connection failed"));
1✔
166
        let err2 =
1✔
167
            EtlError::from((ErrorKind::SourceConnectionFailed, "Database connection failed"));
1✔
168

169
        let hash1 = compute_error_hash(&err1);
1✔
170
        let hash2 = compute_error_hash(&err2);
1✔
171

172
        // Hashes should be identical for the same error kind and description.
173
        assert_eq!(hash1, hash2);
1✔
174
    }
1✔
175

176
    #[test]
177
    fn compute_error_hash_with_detail() {
1✔
178
        let err1 = EtlError::from((
1✔
179
            ErrorKind::SourceQueryFailed,
1✔
180
            "Query execution failed",
1✔
181
            "Table 'users' not found".to_string(),
1✔
182
        ));
1✔
183
        let err2 = EtlError::from((
1✔
184
            ErrorKind::SourceQueryFailed,
1✔
185
            "Query execution failed",
1✔
186
            "Table 'users' not found".to_string(),
1✔
187
        ));
1✔
188

189
        let hash1 = compute_error_hash(&err1);
1✔
190
        let hash2 = compute_error_hash(&err2);
1✔
191

192
        assert_eq!(hash1, hash2);
1✔
193
    }
1✔
194

195
    #[test]
196
    fn compute_error_hash_different_errors() {
1✔
197
        let err1 =
1✔
198
            EtlError::from((ErrorKind::SourceConnectionFailed, "Database connection failed"));
1✔
199
        let err2 = EtlError::from((ErrorKind::SourceQueryFailed, "Query execution failed"));
1✔
200

201
        let hash1 = compute_error_hash(&err1);
1✔
202
        let hash2 = compute_error_hash(&err2);
1✔
203

204
        // Different errors should produce different hashes.
205
        assert_ne!(hash1, hash2);
1✔
206
    }
1✔
207
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc