• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

supabase / etl / 15885763648

25 Jun 2025 07:49PM UTC coverage: 60.073% (-1.4%) from 61.443%
15885763648

Pull #152

github

web-flow
Merge 2b95001e0 into 9f0201c2d
Pull Request #152: feat: add postgres state store to allow state to be persisted to the source db

81 of 313 new or added lines in 16 files covered. (25.88%)

29 existing lines in 8 files now uncovered.

5576 of 9282 relevant lines covered (60.07%)

27.44 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/replicator/src/core.rs
1
use config::shared::{DestinationConfig, ReplicatorConfig};
2
use etl::v2::config::batch::BatchConfig;
3
use etl::v2::config::pipeline::PipelineConfig;
4
use etl::v2::config::retry::RetryConfig;
5
use etl::v2::destination::base::Destination;
6
use etl::v2::destination::memory::MemoryDestination;
7
use etl::v2::pipeline::{Pipeline, PipelineIdentity};
8
use etl::v2::state::store::base::StateStore;
9
use etl::v2::state::store::postgres::PostgresStateStore;
10
use etl::SslMode;
11
use postgres::tokio::config::PgConnectionConfig;
12
use std::fmt;
13
use std::io::BufReader;
14
use std::time::Duration;
15
use thiserror::Error;
16
use tracing::{error, info, warn};
17

18
use crate::config::load_replicator_config;
19
use crate::migrations::migrate_state_store;
20

21
#[derive(Debug, Error)]
22
pub enum ReplicatorError {
23
    #[error("The destination {0} is currently unsupported")]
24
    UnsupportedDestination(String),
25
}
26

27
pub async fn start_replicator() -> anyhow::Result<()> {
×
28
    let replicator_config = load_replicator_config()?;
×
29

30
    // We set up the certificates and SSL mode.
31
    let mut trusted_root_certs = vec![];
×
32
    let ssl_mode = if replicator_config.source.tls.enabled {
×
33
        let mut root_certs_reader =
×
34
            BufReader::new(replicator_config.source.tls.trusted_root_certs.as_bytes());
×
35
        for cert in rustls_pemfile::certs(&mut root_certs_reader) {
×
36
            let cert = cert?;
×
37
            trusted_root_certs.push(cert);
×
38
        }
39

40
        SslMode::VerifyFull
×
41
    } else {
42
        SslMode::Disable
×
43
    };
44

45
    // We initialize the state store and destination.
46
    let state_store = init_state_store(&replicator_config).await?;
×
47
    let destination = init_destination(&replicator_config).await?;
×
48

49
    // We create the identity of this pipeline.
50
    let identity = PipelineIdentity::new(
×
51
        replicator_config.pipeline.id,
×
52
        &replicator_config.pipeline.publication_name,
×
53
    );
×
54

×
55
    // We prepare the configuration of the pipeline.
×
56
    // TODO: improve v2 pipeline config to make conversions nicer.
×
57
    let pipeline_config = PipelineConfig {
×
58
        pg_connection: PgConnectionConfig {
×
59
            host: replicator_config.source.host,
×
60
            port: replicator_config.source.port,
×
61
            name: replicator_config.source.name,
×
62
            username: replicator_config.source.username,
×
63
            password: replicator_config.source.password.map(Into::into),
×
64
            ssl_mode,
×
65
        },
×
66
        batch: BatchConfig {
×
67
            max_size: replicator_config.pipeline.batch.max_size,
×
68
            max_fill: Duration::from_millis(replicator_config.pipeline.batch.max_fill_ms),
×
69
        },
×
70
        apply_worker_initialization_retry: RetryConfig {
×
71
            max_attempts: replicator_config
×
72
                .pipeline
×
73
                .apply_worker_init_retry
×
74
                .max_attempts,
×
75
            initial_delay: Duration::from_millis(
×
76
                replicator_config
×
77
                    .pipeline
×
78
                    .apply_worker_init_retry
×
79
                    .initial_delay_ms,
×
80
            ),
×
81
            max_delay: Duration::from_millis(
×
82
                replicator_config
×
83
                    .pipeline
×
84
                    .apply_worker_init_retry
×
85
                    .max_delay_ms,
×
86
            ),
×
87
            backoff_factor: replicator_config
×
88
                .pipeline
×
89
                .apply_worker_init_retry
×
90
                .backoff_factor,
×
91
        },
×
92
    };
×
93

×
94
    let pipeline = Pipeline::new(
×
95
        identity,
×
96
        pipeline_config,
×
97
        trusted_root_certs,
×
98
        state_store,
×
99
        destination,
×
100
    );
×
101
    start_pipeline(pipeline).await?;
×
102

103
    Ok(())
×
104
}
×
105

NEW
106
async fn init_state_store(config: &ReplicatorConfig) -> anyhow::Result<impl StateStore + Clone> {
×
NEW
107
    migrate_state_store(config.source.clone()).await?;
×
NEW
108
    Ok(PostgresStateStore::new(
×
NEW
109
        config.pipeline.id,
×
NEW
110
        config.source.clone(),
×
NEW
111
    ))
×
UNCOV
112
}
×
113

114
async fn init_destination(
×
115
    config: &ReplicatorConfig,
×
116
) -> anyhow::Result<impl Destination + Clone + Send + Sync + fmt::Debug + 'static> {
×
117
    match config.destination {
×
118
        DestinationConfig::Memory => Ok(MemoryDestination::new()),
×
119
        _ => {
120
            Err(ReplicatorError::UnsupportedDestination(format!("{:?}", config.destination)).into())
×
121
        }
122
    }
123
}
×
124

125
async fn start_pipeline<S, D>(mut pipeline: Pipeline<S, D>) -> anyhow::Result<()>
×
126
where
×
NEW
127
    S: StateStore + Clone + Send + Sync + 'static,
×
128
    D: Destination + Clone + Send + Sync + fmt::Debug + 'static,
×
129
{
×
130
    // Start the pipeline.
×
131
    pipeline.start().await?;
×
132

133
    // Spawn a task to listen for Ctrl+C and trigger shutdown.
134
    let shutdown_tx = pipeline.shutdown_tx();
×
135
    let shutdown_handle = tokio::spawn(async move {
×
136
        if let Err(e) = tokio::signal::ctrl_c().await {
×
137
            error!("Failed to listen for Ctrl+C: {:?}", e);
×
138
            return;
×
139
        }
×
140

×
141
        info!("Ctrl+C received, shutting down pipeline...");
×
142
        if let Err(e) = shutdown_tx.shutdown() {
×
143
            warn!("Failed to send shutdown signal: {:?}", e);
×
144
        }
×
145
    });
×
146

147
    // Wait for the pipeline to finish (either normally or via shutdown).
148
    let result = pipeline.wait().await;
×
149

150
    // Ensure the shutdown task is finished before returning.
151
    // If the pipeline finished before Ctrl+C, we want to abort the shutdown task.
152
    // If Ctrl+C was pressed, the shutdown task will have already triggered shutdown.
153
    // We don't care about the result of the shutdown_handle, but we should abort it if it's still running.
154
    shutdown_handle.abort();
×
155
    let _ = shutdown_handle.await;
×
156

157
    // Propagate any pipeline error as anyhow error.
158
    result?;
×
159

160
    Ok(())
×
161
}
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc