• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

supabase / etl / 24713061833

21 Apr 2026 08:47AM UTC coverage: 78.316% (+0.07%) from 78.248%
24713061833

push

github

web-flow
refactor: enable extended clippy lint set (#675)

* refactor: explicit_iter_loop

* refactor: implicit_clone

* refactor: manual_let_else

* refactor: map_unwrap_or

* refactor: match_same_arms

* refactor: clippy::redundant_closure_for_method_calls

* refactor: redundant_clone

* refactor: redundant_test_prefix

* refactor: semicolon_if_nothing_returned

* refactor: uninlined_format_args

* refactor: unnested_or_patterns

* feat: enable extra lints

* fix: rebase

* fix: removed test

* fix: after rebase

* fix: fmt

* fix: http tests

* fix: rebase

* fix: fmt

* fix: fmt

414 of 503 new or added lines in 59 files covered. (82.31%)

3292 existing lines in 120 files now uncovered.

24379 of 31129 relevant lines covered (78.32%)

1069.73 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/etl-replicator/src/error_reporting.rs
1
use std::sync::Arc;
2

3
use etl::{
4
    error::{EtlError, EtlResult},
5
    state::{
6
        destination_metadata::{AppliedDestinationTableMetadata, DestinationTableMetadata},
7
        table::TableReplicationPhase,
8
    },
9
    store::{
10
        cleanup::CleanupStore,
11
        schema::SchemaStore,
12
        state::{StateStore, TableReplicationStates},
13
    },
14
    types::{SnapshotId, TableId, TableSchema},
15
};
16
use tracing::info;
17

18
use crate::{error_notification::ErrorNotificationClient, sentry};
19

20
/// State store decorator that reports persisted table replication errors.
21
///
22
/// After [`StateStore::update_table_replication_states`] succeeds, this wrapper
23
/// reports each [`TableReplicationPhase::Errored`] update to Sentry and, when
24
/// configured, to the Supabase error-notification endpoint.
25
#[derive(Debug, Clone)]
26
pub(crate) struct ErrorReportingStateStore<S> {
27
    inner: S,
28
    notification_client: Option<Arc<ErrorNotificationClient>>,
29
}
30

31
/// Persisted table error waiting to be reported.
32
#[derive(Debug)]
33
struct ReportableTableError {
34
    /// Table whose replication state was persisted as errored.
35
    table_id: TableId,
36
    /// Source ETL error captured in the persisted table phase.
37
    source_err: EtlError,
38
}
39

40
impl<S> ErrorReportingStateStore<S> {
41
    /// Creates a reporting wrapper around `inner`.
UNCOV
42
    pub(crate) fn new(inner: S, notification_client: Option<ErrorNotificationClient>) -> Self {
×
UNCOV
43
        Self { inner, notification_client: notification_client.map(Arc::new) }
×
44
    }
×
45

46
    /// Reports persisted errored table state updates.
47
    async fn report_errored_updates(&self, updates: Vec<ReportableTableError>) {
×
48
        let notification_client = self.notification_client.as_ref();
×
49

UNCOV
50
        for update in updates {
×
UNCOV
51
            info!(table_id = update.table_id.0, "reporting table replication error");
×
52

53
            sentry::capture_table_error(update.table_id, &update.source_err);
×
54
            if let Some(notification_client) = notification_client {
×
55
                notification_client
×
56
                    .notify_error(update.source_err.to_string(), &update.source_err)
×
57
                    .await;
×
58
            }
×
59
        }
60
    }
×
61

62
    /// Extracts only the errored state updates that need post-persistence
63
    /// reporting.
64
    fn collect_reportable_errors(
×
65
        updates: &[(TableId, TableReplicationPhase)],
×
66
    ) -> Vec<ReportableTableError> {
×
67
        updates
×
68
            .iter()
×
69
            .filter_map(|(table_id, phase)| match phase {
×
70
                TableReplicationPhase::Errored { source_err, .. } => Some(ReportableTableError {
×
71
                    table_id: *table_id,
×
72
                    source_err: source_err.clone(),
×
73
                }),
×
74
                _ => None,
×
75
            })
×
76
            .collect()
×
UNCOV
77
    }
×
78
}
79

80
impl<S> StateStore for ErrorReportingStateStore<S>
81
where
82
    S: StateStore + Send + Sync,
83
{
84
    async fn get_table_replication_state(
×
85
        &self,
×
86
        table_id: TableId,
×
87
    ) -> EtlResult<Option<TableReplicationPhase>> {
×
88
        self.inner.get_table_replication_state(table_id).await
×
UNCOV
89
    }
×
90

91
    async fn get_table_replication_states(&self) -> EtlResult<TableReplicationStates> {
×
92
        self.inner.get_table_replication_states().await
×
UNCOV
93
    }
×
94

95
    async fn load_table_replication_states(&self) -> EtlResult<usize> {
×
96
        self.inner.load_table_replication_states().await
×
UNCOV
97
    }
×
98

99
    async fn update_table_replication_states(
×
100
        &self,
×
101
        updates: Vec<(TableId, TableReplicationPhase)>,
×
UNCOV
102
    ) -> EtlResult<()> {
×
103
        // We collect all errors in advance, to avoid cloning the whole set of updates.
UNCOV
104
        let reportable_errors = Self::collect_reportable_errors(&updates);
×
105

UNCOV
106
        self.inner.update_table_replication_states(updates).await?;
×
107

108
        // This operation must be infallible or at least not propagate failures,
109
        // otherwise the error thrown here, will be caught and handled by the
110
        // core of etl itself. There is no infinite recursion problem, but it
111
        // might make the system harder to understand.
112
        self.report_errored_updates(reportable_errors).await;
×
113

UNCOV
114
        Ok(())
×
115
    }
×
116

117
    async fn rollback_table_replication_state(
×
118
        &self,
×
119
        table_id: TableId,
×
120
    ) -> EtlResult<TableReplicationPhase> {
×
UNCOV
121
        self.inner.rollback_table_replication_state(table_id).await
×
122
    }
×
123

124
    async fn get_destination_table_metadata(
×
125
        &self,
×
126
        table_id: TableId,
×
127
    ) -> EtlResult<Option<DestinationTableMetadata>> {
×
UNCOV
128
        self.inner.get_destination_table_metadata(table_id).await
×
129
    }
×
130

131
    async fn get_applied_destination_table_metadata(
×
132
        &self,
×
133
        table_id: TableId,
×
134
    ) -> EtlResult<Option<AppliedDestinationTableMetadata>> {
×
135
        self.inner.get_applied_destination_table_metadata(table_id).await
×
136
    }
×
137

138
    async fn load_destination_tables_metadata(&self) -> EtlResult<usize> {
×
139
        self.inner.load_destination_tables_metadata().await
×
140
    }
×
141

142
    async fn store_destination_table_metadata(
×
143
        &self,
×
144
        table_id: TableId,
×
145
        metadata: DestinationTableMetadata,
×
146
    ) -> EtlResult<()> {
×
147
        self.inner.store_destination_table_metadata(table_id, metadata).await
×
148
    }
×
149
}
150

151
impl<S> SchemaStore for ErrorReportingStateStore<S>
152
where
153
    S: SchemaStore + Send + Sync,
154
{
UNCOV
155
    async fn get_table_schema(
×
UNCOV
156
        &self,
×
157
        table_id: &TableId,
×
158
        snapshot_id: SnapshotId,
×
159
    ) -> EtlResult<Option<Arc<TableSchema>>> {
×
160
        self.inner.get_table_schema(table_id, snapshot_id).await
×
161
    }
×
162

163
    async fn get_table_schemas(&self) -> EtlResult<Vec<Arc<TableSchema>>> {
×
UNCOV
164
        self.inner.get_table_schemas().await
×
165
    }
×
166

167
    async fn load_table_schemas(&self) -> EtlResult<usize> {
×
UNCOV
168
        self.inner.load_table_schemas().await
×
169
    }
×
170

171
    async fn store_table_schema(&self, table_schema: TableSchema) -> EtlResult<Arc<TableSchema>> {
×
UNCOV
172
        self.inner.store_table_schema(table_schema).await
×
173
    }
×
174
}
175

176
impl<S> CleanupStore for ErrorReportingStateStore<S>
177
where
178
    S: CleanupStore + Send + Sync,
179
{
UNCOV
180
    async fn cleanup_table_state(&self, table_id: TableId) -> EtlResult<()> {
×
UNCOV
181
        self.inner.cleanup_table_state(table_id).await
×
182
    }
×
183
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc