• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

supabase / etl / 24781564525

22 Apr 2026 01:40PM UTC coverage: 77.896% (-0.4%) from 78.324%
24781564525

Pull #674

github

web-flow
Merge c71720499 into c2e8201d8
Pull Request #674: ref(tests): Improve shutdown and conversion handling

1273 of 1709 new or added lines in 24 files covered. (74.49%)

292 existing lines in 21 files now uncovered.

25173 of 32316 relevant lines covered (77.9%)

1038.42 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

99.2
/etl/src/replication/table_cache.rs
1
//! Shared per-table protocol state for logical replication workers.
2
//!
3
//! PostgreSQL 15+ supports column-level publication filtering, where only
4
//! specific columns are replicated rather than all columns. The worker that
5
//! currently owns a table therefore needs three pieces of protocol state to
6
//! decode row changes: the schema snapshot to decode against, the publication
7
//! column filter for that snapshot, and the replica-identity semantics for
8
//! that same snapshot.
9
//!
10
//! The apply worker and table sync workers share this state because ownership
11
//! of a table can move between them over time, but at any point exactly one
12
//! worker owns protocol interpretation for that table. Non-owning workers skip
13
//! DDL, RELATION, and DML for that table and rely on the owning worker to
14
//! advance the shared state.
15
//!
16
//! The cache is kept in-memory because PostgreSQL guarantees that `RELATION`
17
//! messages are sent at the start of each connection and after schema changes
18
//! before any dependent DML. Persisted schemas remain the durable source of
19
//! truth; this cache only tracks the latest per-table decoding state needed by
20
//! active workers.
21
//!
22
//! Each table moves between two explicit states:
23
//! - [`SharedTableState::WaitingForRelation`], where we know which schema
24
//!   snapshot future relation decoding must target but do not yet have runtime
25
//!   relation state for that snapshot.
26
//! - [`SharedTableState::Ready`], where the full [`ReplicatedTableSchema`]
27
//!   needed for row decoding is already materialized in memory.
28

29
use std::{collections::HashMap, sync::Arc};
30

31
use etl_postgres::types::{ReplicatedTableSchema, SnapshotId, TableId};
32
use tokio::sync::RwLock;
33
use tracing::warn;
34

35
/// Shared per-table protocol state used to decode logical replication messages.
///
/// [`SharedTableCache`] stores one value of this enum per table id. Both
/// variants carry a snapshot id: `WaitingForRelation` holds it directly, while
/// `Ready` exposes the one recorded inside its schema.
36
#[derive(Debug, Clone)]
37
pub(crate) enum SharedTableState {
38
    /// A newer schema snapshot exists, but the runtime relation state for that
39
    /// snapshot has not been materialized yet.
40
    WaitingForRelation {
41
        /// The latest schema snapshot known for the table.
42
        snapshot_id: SnapshotId,
43
    },
44
    /// The runtime replicated schema for the current snapshot is ready to use.
45
    Ready {
46
        /// The replicated table schema used to decode row events.
        ///
        /// The snapshot id of this state is read from the schema's inner
        /// table schema rather than stored separately.
47
        replicated_table_schema: ReplicatedTableSchema,
48
    },
49
}
50

51
impl SharedTableState {
52
    /// Returns the snapshot that this shared state targets.
    ///
    /// For `WaitingForRelation` this is the stored `snapshot_id`; for `Ready`
    /// it is the `snapshot_id` of the schema returned by
    /// `replicated_table_schema.inner()`.
53
    pub(crate) fn snapshot_id(&self) -> SnapshotId {
535✔
54
        match self {
535✔
55
            Self::WaitingForRelation { snapshot_id } => *snapshot_id,
116✔
56
            Self::Ready { replicated_table_schema } => replicated_table_schema.inner().snapshot_id,
419✔
57
        }
58
    }
535✔
59

60
    /// Returns the runtime replicated schema when it is ready.
    ///
    /// Returns `None` while the state is `WaitingForRelation`, and a borrow of
    /// the stored schema when the state is `Ready`.
61
    pub(crate) fn replicated_table_schema(&self) -> Option<&ReplicatedTableSchema> {
214✔
62
        match self {
214✔
63
            Self::WaitingForRelation { .. } => None,
1✔
64
            Self::Ready { replicated_table_schema } => Some(replicated_table_schema),
213✔
65
        }
66
    }
214✔
67
}
68

69
/// Thread-safe container for shared per-table protocol state.
///
/// The map lives behind an [`Arc`], so cloning the cache produces another
/// handle to the same underlying map rather than an independent copy.
70
#[derive(Debug, Clone, Default)]
71
pub(crate) struct SharedTableCache {
72
    /// Per-table state keyed by table id, guarded by an async read-write lock.
    inner: Arc<RwLock<HashMap<TableId, SharedTableState>>>,
73
}
74

75
impl SharedTableCache {
76
    /// Creates a new empty [`SharedTableCache`] container.
77
    pub(crate) fn new() -> Self {
99✔
78
        Self { inner: Arc::new(RwLock::new(HashMap::new())) }
99✔
79
    }
99✔
80

81
    /// Returns the shared state for a table, if present.
    ///
    /// The state is cloned while the read lock is held, so the caller gets an
    /// owned copy that does not reflect later updates to the cache.
82
    pub(crate) async fn get(&self, table_id: &TableId) -> Option<SharedTableState> {
335✔
83
        let guard = self.inner.read().await;
335✔
84
        guard.get(table_id).cloned()
335✔
85
    }
335✔
86

87
    /// Records that a table is waiting for a new relation-state refresh for
88
    /// the supplied snapshot.
    ///
    /// Delegates to `upsert`, so an update whose snapshot is older than the
    /// cached one is not applied.
89
    pub(crate) async fn note_waiting_for_relation(
44✔
90
        &self,
44✔
91
        table_id: TableId,
44✔
92
        snapshot_id: SnapshotId,
44✔
93
    ) {
44✔
94
        self.upsert(table_id, SharedTableState::WaitingForRelation { snapshot_id }).await;
44✔
95
    }
44✔
96

97
    /// Records that a table has a ready runtime replicated schema.
    ///
    /// Delegates to `upsert`, so an update whose snapshot is older than the
    /// cached one is not applied.
98
    pub(crate) async fn note_ready(
226✔
99
        &self,
226✔
100
        table_id: TableId,
226✔
101
        replicated_table_schema: ReplicatedTableSchema,
226✔
102
    ) {
226✔
103
        self.upsert(table_id, SharedTableState::Ready { replicated_table_schema }).await;
226✔
104
    }
226✔
105

106
    /// Inserts or updates shared table state when the incoming snapshot is not
107
    /// stale.
    ///
    /// The write lock is held across the comparison and the update. A missing
    /// entry is inserted. An existing entry is replaced when the incoming
    /// snapshot id is equal to or newer than the cached one. When the cached
    /// snapshot id is strictly greater, the cached entry is left untouched and
    /// a warning is logged.
    ///
    /// # Panics
    ///
    /// Panics on a stale update when debug assertions are enabled; with them
    /// disabled the stale update is only logged and dropped.
108
    async fn upsert(&self, table_id: TableId, new_state: SharedTableState) {
270✔
109
        let mut guard = self.inner.write().await;
270✔
110
        let snapshot_id = new_state.snapshot_id();
270✔
111
        match guard.get_mut(&table_id) {
270✔
112
            Some(state) if state.snapshot_id() > snapshot_id => {
149✔
113
                warn!(
1✔
114
                    table_id = %table_id,
NEW
115
                    current_snapshot_id = %state.snapshot_id(),
×
116
                    requested_snapshot_id = %snapshot_id,
117
                    "shared table cache received a stale snapshot update; this may indicate out-of-order protocol state"
118
                );
119

120
                // The match guard above already established the opposite of
                // this condition, so the assertion always fails here: it turns
                // a stale update into a panic when debug assertions are on.
                debug_assert!(
1✔
121
                    state.snapshot_id() <= snapshot_id,
1✔
122
                    "shared table cache received stale snapshot update for table {table_id}: \
123
                     current={}, requested={}",
124
                    state.snapshot_id(),
1✔
125
                    snapshot_id
126
                );
127
            }
128
            Some(state) => {
148✔
129
                *state = new_state;
148✔
130
            }
148✔
131
            None => {
121✔
132
                guard.insert(table_id, new_state);
121✔
133
            }
121✔
134
        }
135
    }
269✔
136
}
137

138
#[cfg(test)]
139
mod tests {
140
    use std::collections::HashSet;
141

142
    use etl_postgres::types::{ColumnSchema, TableName, TableSchema};
143
    use tokio_postgres::types::Type;
144

145
    use super::*;
146

147
    /// Builds the schema fixture shared by the tests in this module.
    ///
    /// The fixture describes table id 123 (`public.test_table`) at snapshot 10
    /// with columns `id`, `name` and `age`. Its replication mask is built from
    /// the column set `{id, age}` and its identity mask from bytes `[1, 0, 1]`.
    fn create_test_schema() -> ReplicatedTableSchema {
4✔
148
        let schema = TableSchema::with_snapshot_id(
4✔
149
            TableId::new(123),
4✔
150
            TableName::new("public".to_string(), "test_table".to_string()),
4✔
151
            vec![
4✔
152
                ColumnSchema::new("id".to_string(), Type::INT4, -1, 1, Some(1), false),
4✔
153
                ColumnSchema::new("name".to_string(), Type::TEXT, -1, 2, None, true),
4✔
154
                ColumnSchema::new("age".to_string(), Type::INT4, -1, 3, None, true),
4✔
155
            ],
156
            SnapshotId::new(10.into()),
4✔
157
        );
158

159
        let replicated_columns: HashSet<String> =
4✔
160
            ["id".to_string(), "age".to_string()].into_iter().collect();
4✔
161
        let replication_mask =
4✔
162
            etl_postgres::types::ReplicationMask::build(&schema, &replicated_columns);
4✔
163
        let identity_mask = etl_postgres::types::IdentityMask::from_bytes(vec![1, 0, 1]);
4✔
164
        ReplicatedTableSchema::from_masks(Arc::new(schema), replication_mask, identity_mask)
4✔
165
    }
4✔
166

167
    /// A schema stored with `note_ready` is returned by `get` with the same
    /// snapshot id and the same replication mask.
    #[tokio::test]
168
    async fn note_ready_and_get() {
1✔
169
        let cache = SharedTableCache::new();
1✔
170
        let table_id = TableId::new(123);
1✔
171
        let snapshot_id = SnapshotId::new(10.into());
1✔
172
        let replicated_table_schema = create_test_schema();
1✔
173

174
        cache.note_ready(table_id, replicated_table_schema.clone()).await;
1✔
175

176
        let state = cache.get(&table_id).await.expect("table state should exist");
1✔
177
        assert_eq!(state.snapshot_id(), snapshot_id);
1✔
178
        assert_eq!(
1✔
179
            state
1✔
180
                .replicated_table_schema()
1✔
181
                .expect("replicated table schema should exist")
1✔
182
                .replication_mask()
1✔
183
                .as_slice(),
1✔
184
            replicated_table_schema.replication_mask().as_slice()
1✔
185
        );
1✔
186
    }
1✔
187

188
    /// A `Ready` entry at snapshot 10 is replaced by `WaitingForRelation` when
    /// a newer snapshot (11) is noted for the same table.
    #[tokio::test]
189
    async fn note_waiting_for_relation_invalidates_ready_schema() {
1✔
190
        let cache = SharedTableCache::new();
1✔
191
        let table_id = TableId::new(123);
1✔
192
        let replicated_table_schema = create_test_schema();
1✔
193

194
        cache.note_ready(table_id, replicated_table_schema).await;
1✔
195
        cache.note_waiting_for_relation(table_id, SnapshotId::new(11.into())).await;
1✔
196

197
        let state = cache.get(&table_id).await.expect("table state should exist");
1✔
198
        assert_eq!(state.snapshot_id(), SnapshotId::new(11.into()));
1✔
199
        assert!(matches!(state, SharedTableState::WaitingForRelation { .. }));
1✔
200
    }
1✔
201

202
    /// Noting snapshot 9 after snapshot 10 is cached is a stale update and
    /// must panic.
    ///
    /// The panic comes from a `debug_assert!` in `upsert`, which is compiled
    /// out when debug assertions are disabled (for example under
    /// `cargo test --release`). The test is therefore only built when debug
    /// assertions are enabled; otherwise `should_panic` would fail.
    #[cfg(debug_assertions)]
    #[should_panic(expected = "shared table cache received stale snapshot update")]
203
    #[tokio::test]
204
    async fn older_snapshot_panics() {
1✔
205
        let cache = SharedTableCache::new();
1✔
206
        let table_id = TableId::new(123);
1✔
207
        let replicated_table_schema = create_test_schema();
1✔
208

209
        cache.note_ready(table_id, replicated_table_schema).await;
1✔
210
        cache.note_waiting_for_relation(table_id, SnapshotId::new(9.into())).await;
1✔
211
    }
1✔
212

213
    /// A write through one handle is visible through a clone of it: the state
    /// noted on `cache1` is read back from `cache2` with the same snapshot id
    /// and identity mask.
    #[tokio::test]
214
    async fn clone_shares_state() {
1✔
215
        let cache1 = SharedTableCache::new();
1✔
216
        let cache2 = cache1.clone();
1✔
217
        let table_id = TableId::new(123);
1✔
218
        let snapshot_id = SnapshotId::new(10.into());
1✔
219
        let replicated_table_schema = create_test_schema();
1✔
220

221
        cache1.note_ready(table_id, replicated_table_schema.clone()).await;
1✔
222

223
        let state = cache2.get(&table_id).await.expect("table state should exist");
1✔
224
        assert_eq!(state.snapshot_id(), snapshot_id);
1✔
225
        assert_eq!(
1✔
226
            state
1✔
227
                .replicated_table_schema()
1✔
228
                .expect("replicated table schema should exist")
1✔
229
                .identity_mask()
1✔
230
                .as_slice(),
1✔
231
            replicated_table_schema.identity_mask().as_slice()
1✔
232
        );
1✔
233
    }
1✔
234

235
    /// A table noted as waiting for a relation reports its snapshot id but
    /// exposes no replicated schema.
    #[tokio::test]
236
    async fn waiting_state_exposes_snapshot_without_schema() {
1✔
237
        let cache = SharedTableCache::new();
1✔
238
        let table_id = TableId::new(123);
1✔
239
        let snapshot_id = SnapshotId::new(10.into());
1✔
240

241
        cache.note_waiting_for_relation(table_id, snapshot_id).await;
1✔
242

243
        let state = cache.get(&table_id).await.expect("table state should exist");
1✔
244
        assert_eq!(state.snapshot_id(), snapshot_id);
1✔
245
        assert!(state.replicated_table_schema().is_none());
1✔
246
    }
1✔
247
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc