supabase / etl / 19900107178

03 Dec 2025 03:55PM UTC coverage: 82.008% (-0.4%) from 82.382%

Pull Request #487: ref(allocator): Try to use jemalloc for etl-api and etl-replicator
github / web-flow: Merge 5c2ab4c83 into eeef10c29

0 of 91 new or added lines in 2 files covered (0.0%).

2 existing lines in 1 file now uncovered.

16455 of 20065 relevant lines covered (82.01%).

181.74 hits per line.

Source File: /etl/src/workers/table_sync.rs (77.92% covered)
use chrono::Utc;
use etl_config::shared::PipelineConfig;
use etl_postgres::replication::slots::EtlReplicationSlot;
use etl_postgres::replication::worker::WorkerType;
use etl_postgres::types::TableId;
use std::ops::Deref;
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::{Mutex, MutexGuard, Notify, Semaphore};
use tokio::task::JoinHandle;
use tokio_postgres::types::PgLsn;
use tracing::{Instrument, debug, error, info, warn};

use crate::concurrency::shutdown::{ShutdownResult, ShutdownRx};
use crate::concurrency::signal::SignalTx;
use crate::destination::Destination;
use crate::error::{ErrorKind, EtlError, EtlResult};
use crate::replication::apply::{
    ApplyLoopAction, ApplyLoopHook, ApplyLoopResult, start_apply_loop,
};
use crate::replication::client::PgReplicationClient;
use crate::replication::table_sync::{TableSyncResult, start_table_sync};
use crate::state::table::{
    RetryPolicy, TableReplicationError, TableReplicationPhase, TableReplicationPhaseType,
};
use crate::store::schema::SchemaStore;
use crate::store::state::StateStore;
use crate::types::PipelineId;
use crate::workers::base::{Worker, WorkerHandle};
use crate::workers::pool::{TableSyncWorkerPool, TableSyncWorkerPoolInner};
use crate::{bail, etl_error};

/// Maximum time to wait for a phase change before trying again.
const PHASE_CHANGE_REFRESH_FREQUENCY: Duration = Duration::from_millis(100);

/// Maximum time to wait for the slot deletion call to complete.
///
/// The reason for setting a timer on deletion is that we wait for the slot to become unused before
/// deleting it. We want to avoid an infinite wait in case the slot fails to be released,
/// as this could result in a connection being held indefinitely, potentially stalling the processing
/// of new tables.
const MAX_DELETE_SLOT_WAIT: Duration = Duration::from_secs(30);

/// Internal state of [`TableSyncWorkerState`].
#[derive(Debug)]
pub struct TableSyncWorkerStateInner {
    /// Unique identifier for the table whose state this structure tracks.
    table_id: TableId,
    /// Current replication phase - this is the authoritative in-memory state.
    table_replication_phase: TableReplicationPhase,
    /// Notification mechanism for broadcasting state changes to waiting workers.
    phase_change: Arc<Notify>,
    /// Number of consecutive automatic retry attempts.
    retry_attempts: u32,
}

impl TableSyncWorkerStateInner {
    /// Updates the table's replication phase and notifies all waiting workers.
    ///
    /// This method provides the core state transition mechanism for table synchronization.
    /// It atomically updates the in-memory state and broadcasts the change to any workers
    /// that may be waiting for state transitions.
    pub fn set(&mut self, phase: TableReplicationPhase) {
        info!(
            "table phase changing from '{:?}' to '{:?}'",
            self.table_replication_phase, phase
        );

        self.table_replication_phase = phase;

        // Broadcast notification to all active waiters.
        //
        // Note that this notify will not wake up waiters that will be coming in the future since
        // no permit is stored, only active listeners will be notified.
        self.phase_change.notify_waiters();
    }

    /// Returns the number of consecutive automatic retry attempts.
    pub fn retry_attempts(&self) -> u32 {
        self.retry_attempts
    }

    /// Increments the retry attempt counter and returns the updated value.
    pub fn increment_retry_attempts(&mut self) -> u32 {
        self.retry_attempts = self.retry_attempts.saturating_add(1);
        self.retry_attempts
    }

    /// Resets the automatic retry attempt counter to zero.
    pub fn reset_retry_attempts(&mut self) {
        self.retry_attempts = 0;
    }

    /// Updates the table's replication phase with conditional persistence to external storage.
    ///
    /// This method extends the basic [`TableSyncWorkerStateInner::set()`] method with durable persistence capabilities,
    /// ensuring that important state transitions survive process restarts and failures.
    ///
    /// The persistence behavior is controlled by the phase type's storage requirements.
    pub async fn set_and_store<S: StateStore>(
        &mut self,
        phase: TableReplicationPhase,
        state_store: &S,
    ) -> EtlResult<()> {
        // Apply in-memory state change first for immediate visibility
        self.set(phase.clone());

        // Conditionally persist based on phase type requirements
        if phase.as_type().should_store() {
            info!(
                "storing phase change '{:?}' for table {}",
                phase, self.table_id
            );

            // Persist to external storage - this may fail without affecting in-memory state
            state_store
                .update_table_replication_state(self.table_id, phase)
                .await?;
        }

        Ok(())
    }

    /// Rolls back the table's replication state to the previous version.
    ///
    /// This method coordinates rollback operations between persistent storage and
    /// in-memory state. It first queries the state store to retrieve the previous
    /// state, then applies that state to the in-memory representation and notifies
    /// any waiting workers of the change.
    pub async fn rollback<S: StateStore>(&mut self, state_store: &S) -> EtlResult<()> {
        // We roll back the state in the store and then also set the rolled-back state in memory.
        let previous_phase = state_store
            .rollback_table_replication_state(self.table_id)
            .await?;
        self.set(previous_phase);

        Ok(())
    }

    /// Returns the current replication phase for this table.
    ///
    /// This method provides access to the authoritative in-memory state without
    /// requiring coordination with external storage. The returned phase represents
    /// the most current state as seen by the local worker.
    pub fn replication_phase(&self) -> TableReplicationPhase {
        self.table_replication_phase.clone()
    }
}

/// Thread-safe handle for table synchronization worker state management.
///
/// [`TableSyncWorkerState`] provides a thread-safe wrapper around table synchronization
/// state, enabling multiple workers to coordinate and react to state changes. It serves
/// as the primary coordination mechanism between table sync workers and apply workers.
///
/// The state handle supports atomic updates, notifications, and blocking waits for
/// specific phase transitions, making it suitable for complex multi-worker scenarios.
#[derive(Debug, Clone)]
pub struct TableSyncWorkerState {
    inner: Arc<Mutex<TableSyncWorkerStateInner>>,
}

impl TableSyncWorkerState {
    /// Creates a new table sync worker state with the given initial phase.
    ///
    /// This constructor initializes the state management structure with the
    /// specified table ID and replication phase. It sets up the notification
    /// mechanism for coordinating state changes between workers.
    fn new(table_id: TableId, table_replication_phase: TableReplicationPhase) -> Self {
        let inner = TableSyncWorkerStateInner {
            table_id,
            table_replication_phase,
            phase_change: Arc::new(Notify::new()),
            retry_attempts: 0,
        };

        Self {
            inner: Arc::new(Mutex::new(inner)),
        }
    }

    /// Updates table replication state in both memory and persistent storage.
    ///
    /// This static method provides a unified interface for updating table state
    /// regardless of whether the table has an active worker in the pool.
    pub async fn set_and_store<P, S>(
        pool: &P,
        state_store: &S,
        table_id: TableId,
        table_replication_phase: TableReplicationPhase,
    ) -> EtlResult<()>
    where
        P: Deref<Target = TableSyncWorkerPoolInner>,
        S: StateStore,
    {
        let table_sync_worker_state = pool.get_active_worker_state(table_id);

        // In case we have the state in memory, we will atomically update the memory and state store
        // states. Otherwise, we just update the state store.
        if let Some(table_sync_worker_state) = table_sync_worker_state {
            let mut inner = table_sync_worker_state.lock().await;
            inner
                .set_and_store(table_replication_phase, state_store)
                .await?;
        } else {
            state_store
                .update_table_replication_state(table_id, table_replication_phase)
                .await?;
        }

        Ok(())
    }

    /// Waits for the table to reach a specific replication phase type.
    ///
    /// This method blocks until either the table reaches the desired phase or
    /// a shutdown signal is received. It uses an efficient notification system
    /// to avoid polling and provides immediate response to state changes.
    ///
    /// The method returns a `ShutdownResult` that indicates whether the wait
    /// completed successfully or was interrupted by shutdown.
    pub async fn wait_for_phase_type(
        &self,
        phase_type: TableReplicationPhaseType,
        mut shutdown_rx: ShutdownRx,
    ) -> ShutdownResult<MutexGuard<'_, TableSyncWorkerStateInner>, ()> {
        let table_id = {
            let inner = self.inner.lock().await;
            inner.table_id
        };
        info!(
            "waiting for table replication phase '{:?}' for table {:?}",
            phase_type, table_id
        );

        loop {
            tokio::select! {
                biased;

                // Shutdown signal received, exit loop.
                _ = shutdown_rx.changed() => {
                    info!("shutdown signal received, cancelling the wait for phase type {:?}", phase_type);

                    return ShutdownResult::Shutdown(());
                }

                // Try to wait for the phase type.
                acquired = self.wait(phase_type) => {
                    if let Some(acquired) = acquired {
                        return ShutdownResult::Ok(acquired);
                    }
                }
            }
        }
    }

    /// Internal wait implementation with timeout-based retry logic.
    ///
    /// This method implements the core waiting mechanism with built-in timeout
    /// protection to handle potential missed notifications. It combines immediate
    /// state checking with notification-based waiting to provide reliable phase
    /// transition detection.
    async fn wait(
        &self,
        phase_type: TableReplicationPhaseType,
    ) -> Option<MutexGuard<'_, TableSyncWorkerStateInner>> {
        // We grab hold of the phase change notify in case we don't immediately have the state
        // that we want.
        let phase_change = {
            let inner = self.inner.lock().await;
            if inner.table_replication_phase.as_type() == phase_type {
                info!(
                    "table replication phase '{:?}' was already set, no need to wait",
                    phase_type
                );
                return Some(inner);
            }

            inner.phase_change.clone()
        };

        // We wait for a state change within a timeout. This is done since it might be that a
        // notification is missed and in that case we want to avoid blocking indefinitely.
        let _ = tokio::time::timeout(PHASE_CHANGE_REFRESH_FREQUENCY, phase_change.notified()).await;

        // We read the state and return the lock to the state.
        let inner = self.inner.lock().await;
        if inner.table_replication_phase.as_type() == phase_type {
            info!(
                "table replication phase '{:?}' was reached for table {:?}",
                phase_type, inner.table_id
            );
            return Some(inner);
        }

        None
    }
}

impl Deref for TableSyncWorkerState {
    type Target = Mutex<TableSyncWorkerStateInner>;

    fn deref(&self) -> &Self::Target {
        &self.inner
    }
}

/// Handle for monitoring and controlling table sync workers.
///
/// [`TableSyncWorkerHandle`] provides control and observability for table
/// synchronization workers. It exposes both the worker's state and completion
/// status, enabling coordination with other parts of the system.
#[derive(Debug)]
pub struct TableSyncWorkerHandle {
    state: TableSyncWorkerState,
    handle: Option<JoinHandle<EtlResult<()>>>,
}

impl WorkerHandle<TableSyncWorkerState> for TableSyncWorkerHandle {
    /// Returns a handle to the table sync worker's state.
    ///
    /// This method provides access to the worker's state management structure,
    /// enabling external coordination and monitoring of the worker's progress
    /// through different synchronization phases.
    fn state(&self) -> TableSyncWorkerState {
        self.state.clone()
    }

    /// Waits for the table sync worker to complete execution.
    ///
    /// This method blocks until the worker finishes processing, either due to
    /// successful synchronization completion, shutdown signal, or error. It
    /// properly handles panics that might occur within the worker task.
    async fn wait(mut self) -> EtlResult<()> {
        let Some(handle) = self.handle.take() else {
            return Ok(());
        };

        handle.await.map_err(|err| {
            etl_error!(
                ErrorKind::TableSyncWorkerPanic,
                "Table sync worker panicked",
                err
            )
        })??;

        Ok(())
    }
}

/// Worker responsible for synchronizing individual tables from Postgres to destinations.
///
/// [`TableSyncWorker`] handles the complete lifecycle of table synchronization, including
/// initial data copying, incremental catchup, and coordination with apply workers. Each
/// worker is responsible for a single table and manages its own replication slot.
///
/// The worker coordinates with the apply worker through shared state and implements
/// sophisticated retry and error handling logic to ensure robust operation.
#[derive(Debug)]
pub struct TableSyncWorker<S, D> {
    pipeline_id: PipelineId,
    config: Arc<PipelineConfig>,
    pool: TableSyncWorkerPool,
    table_id: TableId,
    store: S,
    destination: D,
    shutdown_rx: ShutdownRx,
    force_syncing_tables_tx: SignalTx,
    run_permit: Arc<Semaphore>,
}

impl<S, D> TableSyncWorker<S, D> {
    /// Creates a new table sync worker with the given configuration and dependencies.
    ///
    /// This constructor initializes all necessary components for table synchronization,
    /// including coordination channels, resource permits, and storage interfaces.
    /// The worker is ready to start synchronization upon creation.
    #[expect(clippy::too_many_arguments)]
    pub fn new(
        pipeline_id: PipelineId,
        config: Arc<PipelineConfig>,
        pool: TableSyncWorkerPool,
        table_id: TableId,
        store: S,
        destination: D,
        shutdown_rx: ShutdownRx,
        force_syncing_tables_tx: SignalTx,
        run_permit: Arc<Semaphore>,
    ) -> Self {
        Self {
            pipeline_id,
            config,
            pool,
            table_id,
            store,
            destination,
            shutdown_rx,
            force_syncing_tables_tx,
            run_permit,
        }
    }

    /// Returns the ID of the table this worker is responsible for synchronizing.
    ///
    /// This method provides access to the table identifier, which is used for
    /// logging, coordination, and state management throughout the synchronization
    /// process.
    pub fn table_id(&self) -> TableId {
        self.table_id
    }
}

impl<S, D> TableSyncWorker<S, D>
where
    S: StateStore + SchemaStore + Clone + Send + Sync + 'static,
    D: Destination + Clone + Send + Sync + 'static,
{
    /// Runs the table sync worker with retry logic and error handling.
    ///
    /// This method implements the retry loop for table synchronization, handling
    /// different error scenarios according to their retry policies. It manages
    /// worker lifecycle, state transitions, and cleanup operations while providing
    /// robust error recovery capabilities.
    async fn guarded_run_table_sync_worker(self, state: TableSyncWorkerState) -> EtlResult<()> {
        let table_id = self.table_id;
        let pool = self.pool.clone();
        let store = self.store.clone();
        let config = self.config.clone();

        // Clone all the fields we need for retries.
        let pipeline_id = self.pipeline_id;
        let destination = self.destination.clone();
        let shutdown_rx = self.shutdown_rx.clone();
        let force_syncing_tables_tx = self.force_syncing_tables_tx.clone();
        let run_permit = self.run_permit.clone();

        loop {
            // Recreate the worker for each attempt.
            let worker = TableSyncWorker {
                pipeline_id,
                config: config.clone(),
                pool: pool.clone(),
                table_id,
                store: store.clone(),
                destination: destination.clone(),
                shutdown_rx: shutdown_rx.clone(),
                force_syncing_tables_tx: force_syncing_tables_tx.clone(),
                run_permit: run_permit.clone(),
            };

            let result = worker.run_table_sync_worker(state.clone()).await;

            match result {
                Ok(_) => {
                    // Worker completed successfully, mark as finished.
                    let mut pool = pool.lock().await;
                    {
                        let mut state_guard = state.lock().await;
                        state_guard.reset_retry_attempts();
                    }
                    pool.mark_worker_finished(table_id);

                    return Ok(());
                }
                Err(err) => {
                    error!("table sync worker failed for table {}: {}", table_id, err);

                    // Convert error to table replication error to determine retry policy.
                    let mut table_error =
                        TableReplicationError::from_etl_error(&config, table_id, &err);
                    let mut retry_policy = table_error.retry_policy().clone();

                    // We lock both the pool and the table sync worker state to be consistent.
                    let mut pool_guard = pool.lock().await;
                    let mut state_guard = state.lock().await;

                    // If it's a timed retry, we want to see if we reached the maximum number of attempts
                    // before trying again. If we did, we switch to a manual retry policy.
                    if let RetryPolicy::TimedRetry { .. } = retry_policy
                        && state_guard.retry_attempts() >= config.table_error_retry_max_attempts
                    {
                        info!(
                            table_id = %table_id,
                            config.table_error_retry_max_attempts,
                            "maximum automatic retry attempts reached, switching to manual retry"
                        );

                        table_error = table_error.with_retry_policy(RetryPolicy::ManualRetry);
                        retry_policy = table_error.retry_policy().clone();

                        state_guard.reset_retry_attempts();
                    }

                    // Update the state and store with the error.
                    if let Err(err) = state_guard.set_and_store(table_error.into(), &store).await {
                        error!(
                            "failed to update table sync worker state for table {}: {}",
                            table_id, err
                        );

                        pool_guard.mark_worker_finished(table_id);

                        return Err(err);
                    };

                    match retry_policy {
                        RetryPolicy::TimedRetry { next_retry } => {
                            let now = Utc::now();
                            if now < next_retry {
                                let sleep_duration = (next_retry - now)
                                    .to_std()
                                    .unwrap_or(Duration::from_secs(0));

                                info!(
                                    "retrying table sync worker for table {} in {:?}",
                                    table_id, sleep_duration
                                );

                                // We drop the lock on the pool while waiting. We do not do the same
                                // for the state guard since we want to hold the lock for that state
                                // since when we are waiting to retry, nobody should be allowed to
                                // modify it.
                                drop(pool_guard);

                                tokio::time::sleep(sleep_duration).await;
                            } else {
                                info!(
                                    "retrying table sync worker for table {} immediately",
                                    table_id
                                );
                            }

                            // We mark that we attempted a retry.
                            state_guard.increment_retry_attempts();

                            // Before rolling back, we acquire the pool lock again for consistency.
                            let mut pool_guard = pool.lock().await;

                            // After sleeping, we rollback to the previous state and retry.
                            //
                            // Note that this rollback goes one state back, which works only because we are
                            // in a table sync worker; this is why it's not done in the apply worker:
                            // - Errored -> Init: okay since it will restart from scratch.
                            // - Errored -> DataSync: okay since it will restart the copy from a new slot.
                            // - Errored -> FinishedCopy: okay since the table was already copied, so it resumes
                            //   streaming from the `confirmed_flush_lsn`.
                            // - Errored -> SyncDone: okay since the table sync will immediately stop.
                            // - Errored -> Ready: same as SyncDone.
                            //
                            // The in-memory states like SyncWait and Catchup won't ever be in a rollback
                            // since they are just states used for synchronization and never saved in the
                            // state store.
                            if let Err(err) = state_guard.rollback(&store).await {
                                error!(
                                    "failed to rollback table sync worker state for table {}: {}",
                                    table_id, err
                                );

                                pool_guard.mark_worker_finished(table_id);

                                return Err(err);
                            };

                            continue;
                        }
                        RetryPolicy::NoRetry | RetryPolicy::ManualRetry => {
                            state_guard.reset_retry_attempts();
                            pool_guard.mark_worker_finished(table_id);

                            return Err(err);
                        }
                    }
                }
            }
        }
    }

    /// Executes the core table synchronization process.
    ///
    /// This method orchestrates the complete table sync workflow: acquiring run
    /// permits, establishing replication connections, performing initial data sync,
    /// running catchup replication, and cleaning up resources. It handles both
    /// the bulk data copy phase and the incremental replication phase.
    async fn run_table_sync_worker(mut self, state: TableSyncWorkerState) -> EtlResult<()> {
        debug!(
            "waiting to acquire a running permit for table sync worker for table {}",
            self.table_id
        );

        // We acquire a permit to run the table sync worker. This helps us limit the number
        // of table sync workers running in parallel which in turn helps limit the max
        // number of concurrent connections to the source database.
        let permit = tokio::select! {
            _ = self.shutdown_rx.changed() => {
                info!("shutting down table sync worker for table {} while waiting for a run permit", self.table_id);
                return Ok(());
            }

            permit = self.run_permit.acquire() => {
                permit
            }
        };

        info!(
            "acquired running permit for table sync worker for table {}",
            self.table_id
        );

        // We create a new replication connection specifically for this table sync worker.
        //
        // Note that this connection must be tied to the lifetime of this worker, otherwise
        // there will be problems when cleaning up the replication slot.
        let replication_client =
            PgReplicationClient::connect(self.config.pg_connection.clone()).await?;

        let result = start_table_sync(
            self.pipeline_id,
            self.config.clone(),
            replication_client.clone(),
            self.table_id,
            state.clone(),
            self.store.clone(),
            self.destination.clone(),
            self.shutdown_rx.clone(),
            self.force_syncing_tables_tx,
        )
        .await;

        let start_lsn = match result {
            Ok(TableSyncResult::SyncCompleted { start_lsn }) => start_lsn,
            Ok(TableSyncResult::SyncStopped | TableSyncResult::SyncNotRequired) => {
                return Ok(());
            }
            Err(err) => {
                error!("table sync failed for table {}: {}", self.table_id, err);
                return Err(err);
            }
        };

        let result = start_apply_loop(
            self.pipeline_id,
            start_lsn,
            self.config,
            replication_client.clone(),
            self.store.clone(),
            self.destination,
            TableSyncWorkerHook::new(self.table_id, state, self.store),
            self.shutdown_rx,
            None,
        )
        .await?;

        // If the apply loop was completed, we perform cleanup since resources are not needed anymore.
        if let ApplyLoopResult::Completed = result {
            // We delete the replication slot used by this table sync worker.
            //
            // Note that if the deletion fails, the slot will remain in the database and will not be
            // removed later, so manual intervention will be required. The reason for not implementing
            // an automatic cleanup mechanism is that it would introduce performance overhead,
            // and we expect this call to fail only rarely.
            let slot_name: String =
                EtlReplicationSlot::for_table_sync_worker(self.pipeline_id, self.table_id)
                    .try_into()?;
            let result = tokio::time::timeout(
                MAX_DELETE_SLOT_WAIT,
                replication_client.delete_slot(&slot_name),
            )
            .await;
            if result.is_err() {
                warn!(
                    "failed to delete the replication slot {slot_name} of the table sync worker {} due to timeout",
                    self.table_id
                );
            }
        }

        // This explicit drop is not strictly necessary but is added to make it extra clear
        // that the scope of the run permit is needed up to here to avoid multiple parallel
        // connections.
        drop(permit);

        info!("table sync worker {} completed successfully", self.table_id);

        Ok(())
    }
}

impl<S, D> Worker<TableSyncWorkerHandle, TableSyncWorkerState> for TableSyncWorker<S, D>
where
    S: StateStore + SchemaStore + Clone + Send + Sync + 'static,
    D: Destination + Clone + Send + Sync + 'static,
{
    type Error = EtlError;

    /// Starts the table sync worker and returns a handle for monitoring.
    ///
    /// This method initializes the worker by loading its replication state from
    /// storage, creating the state management structure, and launching the
    /// synchronization process in a background task.
    async fn start(self) -> EtlResult<TableSyncWorkerHandle> {
        info!("starting table sync worker for table {}", self.table_id);

        let Some(table_replication_phase) = self
            .store
            .get_table_replication_state(self.table_id)
            .await?
        else {
            error!(
                "no replication state found for table {}, cannot start sync worker",
                self.table_id
            );

            bail!(
                ErrorKind::InvalidState,
                "Table replication state not found",
                format!("Replication state missing for table {}", self.table_id)
            );
        };

        info!(
            "loaded table sync worker state for table {}: {:?}",
            self.table_id, table_replication_phase
        );

        let state = TableSyncWorkerState::new(self.table_id, table_replication_phase);

        let table_sync_worker_span = tracing::info_span!(
            "table_sync_worker",
            pipeline_id = self.pipeline_id,
            publication_name = self.config.publication_name,
            table_id = %self.table_id,
        );
        let table_sync_worker = self.guarded_run_table_sync_worker(state.clone());

        let fut = table_sync_worker.instrument(table_sync_worker_span);
        let handle = tokio::spawn(fut);

        Ok(TableSyncWorkerHandle {
            state,
            handle: Some(handle),
        })
    }
}

/// Internal hook for table sync worker integration with apply loop operations.
///
/// [`TableSyncWorkerHook`] implements the coordination logic between table sync
/// workers and the apply loop that processes replication events. This hook enables
/// table sync workers to participate in the apply loop lifecycle while maintaining
/// their specific synchronization behavior.
#[derive(Debug)]
struct TableSyncWorkerHook<S> {
    table_id: TableId,
    table_sync_worker_state: TableSyncWorkerState,
    state_store: S,
}

impl<S> TableSyncWorkerHook<S> {
    /// Creates a new table sync worker hook with the given dependencies.
    ///
    /// This constructor initializes the hook with the table ID, state management
    /// structure, and state store implementation.
    fn new(
        table_id: TableId,
        table_sync_worker_state: TableSyncWorkerState,
        state_store: S,
    ) -> Self {
        Self {
            table_id,
            table_sync_worker_state,
            state_store,
        }
    }
}

impl<S> TableSyncWorkerHook<S>
where
    S: StateStore + Clone,
{
    /// Tries to advance the [`TableReplicationPhase`] of this table based on the current lsn.
    ///
    /// Returns [`ApplyLoopAction::Complete`] when the worker is done with its work, signaling the
    /// caller that the apply loop should be stopped.
    async fn try_advance_phase(
        &self,
        current_lsn: PgLsn,
        update_state: bool,
    ) -> EtlResult<ApplyLoopAction> {
        let mut inner = self.table_sync_worker_state.lock().await;

        // If we caught up with the lsn, we mark this table as `SyncDone` and stop the worker.
        if let TableReplicationPhase::Catchup { lsn } = inner.replication_phase()
            && current_lsn >= lsn
        {
            // If we are told to update the state, we mark the phase as actually changed. We do
            // this because we want to update the actual state only when we are sure that the
            // progress has been persisted to the destination. When `update_state` is `false` this
            // function is used as a lookahead, to determine whether the worker should be stopped.
            if update_state {
                inner
                    .set_and_store(
                        TableReplicationPhase::SyncDone { lsn: current_lsn },
                        &self.state_store,
                    )
                    .await?;

                info!(
                    "table sync worker for table {} is in sync with the apply worker, the worker will terminate",
                    self.table_id
                );
            }

            // If we caught up, we want to complete the apply loop.
            //
            // Note that completion is different from stopping: when completion is returned, the loop
            // is expected to never be run again.
            return Ok(ApplyLoopAction::Complete);
        }

        Ok(ApplyLoopAction::Continue)
    }
}

impl<S> ApplyLoopHook for TableSyncWorkerHook<S>
where
    S: StateStore + Clone + Send + Sync + 'static,
{
    /// Checks if the table sync worker is already synchronized before starting the apply loop.
    ///
    /// This hook method evaluates whether the worker has already caught up with the
    /// apply worker's starting position.
    async fn before_loop(&self, start_lsn: PgLsn) -> EtlResult<ApplyLoopAction> {
        info!("checking if the table sync worker is already caught up with the apply worker");

        self.try_advance_phase(start_lsn, true).await
    }

    /// Processes the table's synchronization state based on current LSN progress.
    ///
    /// This method compares the current LSN against the table's catchup LSN and
    /// transitions the table to `SyncDone` state when synchronization is complete.
    /// If `update_state` is true, it persists the state change; otherwise, it
    /// performs a lookahead check.
    ///
    /// Returns [`ApplyLoopAction::Complete`] when the table sync is complete and the
    /// worker should terminate, [`ApplyLoopAction::Continue`] otherwise.
    async fn process_syncing_tables(
        &self,
        current_lsn: PgLsn,
        update_state: bool,
    ) -> EtlResult<ApplyLoopAction> {
        info!(
            "processing syncing tables for table sync worker with lsn {}",
            current_lsn
        );

        self.try_advance_phase(current_lsn, update_state).await
    }

    /// Handles table replication errors for the table sync worker.
    ///
    /// This method processes errors specific to the table this worker manages.
    /// If the error relates to this worker's table, it updates the state and
    /// signals the worker to terminate. Errors for other tables are ignored.
    async fn mark_table_errored(
        &self,
        table_replication_error: TableReplicationError,
    ) -> EtlResult<ApplyLoopAction> {
        if self.table_id != table_replication_error.table_id() {
            // If the table is different from the one handled by this table sync worker, marking
            // the table will be a noop, and we want to continue the loop.
            return Ok(ApplyLoopAction::Continue);
        }

        // Since we already have access to the table sync worker state, we can avoid going through
        // the pool, and we just modify the state here and also update the state store.
        let mut inner = self.table_sync_worker_state.lock().await;
        inner
            .set_and_store(table_replication_error.into(), &self.state_store)
            .await?;

        // If a table is marked as errored, this worker should stop processing immediately since there
        // is no need to continue, and for this we mark the loop as completed.
        Ok(ApplyLoopAction::Complete)
    }

    /// Determines whether changes should be applied for the given table.
    ///
    /// For table sync workers, changes are only applied if the table matches
    /// the worker's assigned table and the table is not in an error state.
    /// This ensures that table sync workers only process events for their
    /// specific table during the catchup phase.
    ///
    /// This method assumes that it is called when the table is in the `Catchup` phase, which is
    /// why it doesn't check it.
    async fn should_apply_changes(
        &self,
        table_id: TableId,
        _remote_final_lsn: PgLsn,
    ) -> EtlResult<bool> {
        let inner = self.table_sync_worker_state.lock().await;
        let is_errored = matches!(
            inner.table_replication_phase.as_type(),
            TableReplicationPhaseType::Errored
        );

        let should_apply_changes = !is_errored && self.table_id == table_id;

        debug!(
            "table {} should apply changes in {:?}: {}",
            table_id,
            self.worker_type(),
            should_apply_changes
        );

        Ok(should_apply_changes)
    }

    /// Returns the worker type for this hook.
    ///
    /// This method identifies this hook as belonging to a table sync worker
    /// for the specific table, which is used for coordination, logging, and
    /// replication slot management throughout the system.
    fn worker_type(&self) -> WorkerType {
        WorkerType::TableSync {
            table_id: self.table_id,
        }
    }
}
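
Note on the notification pattern used by `TableSyncWorkerState::wait` above: the phase is checked under the lock, the shared `Notify` is then awaited with a `PHASE_CHANGE_REFRESH_FREQUENCY` timeout, and the phase is re-checked afterwards, so a notification that slips through between check and wait can delay a waiter by at most one refresh interval rather than block it forever. The following standalone sketch (not part of this file) shows the same check/wait/re-check loop in isolation; it assumes only the `tokio` crate with the `macros`, `rt-multi-thread`, `sync`, and `time` features, and uses a plain `AtomicBool` in place of the replication phase.

use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::Notify;

#[tokio::main]
async fn main() {
    // Shared condition plus notification handle, standing in for the
    // phase / `phase_change` pair that the worker state keeps behind its mutex.
    let ready = Arc::new(AtomicBool::new(false));
    let notify = Arc::new(Notify::new());

    let waiter = {
        let ready = Arc::clone(&ready);
        let notify = Arc::clone(&notify);
        tokio::spawn(async move {
            loop {
                // Re-check the condition first, like `wait()` re-reads the phase
                // after every wake-up or timeout.
                if ready.load(Ordering::Acquire) {
                    println!("condition reached");
                    break;
                }
                // Bounded wait: a missed `notify_waiters` call costs at most one
                // timeout period instead of blocking indefinitely.
                let _ = tokio::time::timeout(Duration::from_millis(100), notify.notified()).await;
            }
        })
    };

    tokio::time::sleep(Duration::from_millis(250)).await;
    ready.store(true, Ordering::Release);
    // `notify_waiters` wakes only tasks currently parked in `notified()`;
    // later waiters fall back to reading the flag on their next iteration.
    notify.notify_waiters();

    waiter.await.unwrap();
}

As in `wait()`, correctness comes from re-reading the condition after every wake-up, so the notification only ever acts as a hint that shortens the wait.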