• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

supabase / etl / 23427993831

23 Mar 2026 08:25AM UTC coverage: 78.055% (-0.1%) from 78.184%
23427993831

Pull #637

github

web-flow
Merge f70ed5814 into 20f94735f
Pull Request #637: feat(errors): Report errors to Sentry

16 of 158 new or added lines in 7 files covered. (10.13%)

8 existing lines in 2 files now uncovered.

18890 of 24201 relevant lines covered (78.05%)

1351.97 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/etl-replicator/src/error_reporting.rs
1
use etl::error::{EtlError, EtlResult};
2
use etl::state::table::TableReplicationPhase;
3
use etl::store::cleanup::CleanupStore;
4
use etl::store::schema::SchemaStore;
5
use etl::store::state::StateStore;
6
use etl::types::{TableId, TableSchema};
7
use std::collections::{BTreeMap, HashMap};
8
use std::sync::Arc;
9
use tracing::info;
10

11
use crate::error_notification::ErrorNotificationClient;
12
use crate::sentry;
13

14
/// State store decorator that reports persisted table replication errors.
15
///
16
/// After [`StateStore::update_table_replication_states`] succeeds, this wrapper
17
/// reports each [`TableReplicationPhase::Errored`] update to Sentry and, when
18
/// configured, to the Supabase error-notification endpoint.
19
#[derive(Debug, Clone)]
20
pub struct ErrorReportingStateStore<S> {
21
    inner: S,
22
    notification_client: Option<Arc<ErrorNotificationClient>>,
23
}
24

25
/// Persisted table error waiting to be reported.
26
#[derive(Debug)]
27
struct ReportableTableError {
28
    /// Table whose replication state was persisted as errored.
29
    table_id: TableId,
30
    /// Source ETL error captured in the persisted table phase.
31
    source_err: EtlError,
32
}
33

34
impl<S> ErrorReportingStateStore<S> {
35
    /// Creates a reporting wrapper around `inner`.
NEW
36
    pub fn new(inner: S, notification_client: Option<ErrorNotificationClient>) -> Self {
×
NEW
37
        Self {
×
NEW
38
            inner,
×
NEW
39
            notification_client: notification_client.map(Arc::new),
×
NEW
40
        }
×
NEW
41
    }
×
42

43
    /// Reports persisted errored table state updates.
NEW
44
    async fn report_errored_updates(&self, updates: Vec<ReportableTableError>) {
×
NEW
45
        let notification_client = self.notification_client.as_ref();
×
46

NEW
47
        for update in updates {
×
NEW
48
            info!(
×
49
                table_id = update.table_id.0,
50
                "reporting table replication error"
51
            );
52

NEW
53
            sentry::capture_table_error(update.table_id, &update.source_err);
×
NEW
54
            if let Some(notification_client) = notification_client {
×
NEW
55
                notification_client
×
NEW
56
                    .notify_error(update.source_err.to_string(), &update.source_err)
×
NEW
57
                    .await;
×
NEW
58
            }
×
59
        }
NEW
60
    }
×
61

62
    /// Extracts only the errored state updates that need post-persistence reporting.
NEW
63
    fn collect_reportable_errors(
×
NEW
64
        updates: &[(TableId, TableReplicationPhase)],
×
NEW
65
    ) -> Vec<ReportableTableError> {
×
NEW
66
        updates
×
NEW
67
            .iter()
×
NEW
68
            .filter_map(|(table_id, phase)| match phase {
×
NEW
69
                TableReplicationPhase::Errored { source_err, .. } => Some(ReportableTableError {
×
NEW
70
                    table_id: *table_id,
×
NEW
71
                    source_err: source_err.clone(),
×
NEW
72
                }),
×
NEW
73
                _ => None,
×
NEW
74
            })
×
NEW
75
            .collect()
×
NEW
76
    }
×
77
}
78

79
impl<S> StateStore for ErrorReportingStateStore<S>
80
where
81
    S: StateStore + Send + Sync,
82
{
NEW
83
    async fn get_table_replication_state(
×
NEW
84
        &self,
×
NEW
85
        table_id: TableId,
×
NEW
86
    ) -> EtlResult<Option<TableReplicationPhase>> {
×
NEW
87
        self.inner.get_table_replication_state(table_id).await
×
NEW
88
    }
×
89

NEW
90
    async fn get_table_replication_states(
×
NEW
91
        &self,
×
NEW
92
    ) -> EtlResult<BTreeMap<TableId, TableReplicationPhase>> {
×
NEW
93
        self.inner.get_table_replication_states().await
×
NEW
94
    }
×
95

NEW
96
    async fn load_table_replication_states(&self) -> EtlResult<usize> {
×
NEW
97
        self.inner.load_table_replication_states().await
×
NEW
98
    }
×
99

NEW
100
    async fn update_table_replication_states(
×
NEW
101
        &self,
×
NEW
102
        updates: Vec<(TableId, TableReplicationPhase)>,
×
NEW
103
    ) -> EtlResult<()> {
×
104
        // We collect all errors in advance, to avoid cloning the whole set of updates.
NEW
105
        let reportable_errors = Self::collect_reportable_errors(&updates);
×
106

NEW
107
        self.inner.update_table_replication_states(updates).await?;
×
108

109
        // This operation must be infallible or at least not propagate failures, otherwise the
110
        // error thrown here, will be caught and handled by the core of etl itself. There is no
111
        // infinite recursion problem, but it might make the system harder to understand.
NEW
112
        self.report_errored_updates(reportable_errors).await;
×
113

NEW
114
        Ok(())
×
NEW
115
    }
×
116

NEW
117
    async fn rollback_table_replication_state(
×
NEW
118
        &self,
×
NEW
119
        table_id: TableId,
×
NEW
120
    ) -> EtlResult<TableReplicationPhase> {
×
NEW
121
        self.inner.rollback_table_replication_state(table_id).await
×
NEW
122
    }
×
123

NEW
124
    async fn get_table_mapping(&self, source_table_id: &TableId) -> EtlResult<Option<String>> {
×
NEW
125
        self.inner.get_table_mapping(source_table_id).await
×
NEW
126
    }
×
127

NEW
128
    async fn get_table_mappings(&self) -> EtlResult<HashMap<TableId, String>> {
×
NEW
129
        self.inner.get_table_mappings().await
×
NEW
130
    }
×
131

NEW
132
    async fn load_table_mappings(&self) -> EtlResult<usize> {
×
NEW
133
        self.inner.load_table_mappings().await
×
NEW
134
    }
×
135

NEW
136
    async fn store_table_mapping(
×
NEW
137
        &self,
×
NEW
138
        source_table_id: TableId,
×
NEW
139
        destination_table_id: String,
×
NEW
140
    ) -> EtlResult<()> {
×
NEW
141
        self.inner
×
NEW
142
            .store_table_mapping(source_table_id, destination_table_id)
×
NEW
143
            .await
×
NEW
144
    }
×
145
}
146

147
impl<S> SchemaStore for ErrorReportingStateStore<S>
148
where
149
    S: SchemaStore + Send + Sync,
150
{
NEW
151
    async fn get_table_schema(&self, table_id: &TableId) -> EtlResult<Option<Arc<TableSchema>>> {
×
NEW
152
        self.inner.get_table_schema(table_id).await
×
NEW
153
    }
×
154

NEW
155
    async fn get_table_schemas(&self) -> EtlResult<Vec<Arc<TableSchema>>> {
×
NEW
156
        self.inner.get_table_schemas().await
×
NEW
157
    }
×
158

NEW
159
    async fn load_table_schemas(&self) -> EtlResult<usize> {
×
NEW
160
        self.inner.load_table_schemas().await
×
NEW
161
    }
×
162

NEW
163
    async fn store_table_schema(&self, table_schema: TableSchema) -> EtlResult<Arc<TableSchema>> {
×
NEW
164
        self.inner.store_table_schema(table_schema).await
×
NEW
165
    }
×
166
}
167

168
impl<S> CleanupStore for ErrorReportingStateStore<S>
169
where
170
    S: CleanupStore + Send + Sync,
171
{
NEW
172
    async fn cleanup_table_state(&self, table_id: TableId) -> EtlResult<()> {
×
NEW
173
        self.inner.cleanup_table_state(table_id).await
×
NEW
174
    }
×
175
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc