• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

supabase / etl / 15848695850

24 Jun 2025 10:53AM UTC coverage: 59.837% (-1.5%) from 61.288%
15848695850

Pull #152

github

imor
move BoxedStateStore to a new file
Pull Request #152: feat: add postgres state store to allow state to be persisted to the source db

86 of 337 new or added lines in 13 files covered. (25.52%)

21 existing lines in 6 files now uncovered.

5563 of 9297 relevant lines covered (59.84%)

27.0 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/replicator/src/core.rs
1
use config::shared::{DestinationConfig, ReplicatorConfig, StateStoreConfig};
2
use etl::v2::config::batch::BatchConfig;
3
use etl::v2::config::pipeline::PipelineConfig;
4
use etl::v2::config::retry::RetryConfig;
5
use etl::v2::destination::base::Destination;
6
use etl::v2::destination::memory::MemoryDestination;
7
use etl::v2::pipeline::{Pipeline, PipelineIdentity};
8
use etl::v2::state::store::base::StateStore;
9
use etl::v2::state::store::boxed::BoxedStateStore;
10
use etl::v2::state::store::memory::MemoryStateStore;
11
use etl::v2::state::store::postgres::PostgresStateStore;
12
use etl::SslMode;
13
use postgres::tokio::config::PgConnectionConfig;
14
use std::fmt;
15
use std::io::BufReader;
16
use std::time::Duration;
17
use thiserror::Error;
18
use tracing::{error, info, warn};
19

20
use crate::config::load_replicator_config;
21
use crate::migrations::migrate_state_store;
22

23
#[derive(Debug, Error)]
24
pub enum ReplicatorError {
25
    #[error("The destination {0} is currently unsupported")]
26
    UnsupportedDestination(String),
27
}
28

29
pub async fn start_replicator() -> anyhow::Result<()> {
×
30
    let replicator_config = load_replicator_config()?;
×
31

32
    // We set up the certificates and SSL mode.
33
    let mut trusted_root_certs = vec![];
×
34
    let ssl_mode = if replicator_config.source.tls.enabled {
×
35
        let mut root_certs_reader =
×
36
            BufReader::new(replicator_config.source.tls.trusted_root_certs.as_bytes());
×
37
        for cert in rustls_pemfile::certs(&mut root_certs_reader) {
×
38
            let cert = cert?;
×
39
            trusted_root_certs.push(cert);
×
40
        }
41

42
        SslMode::VerifyFull
×
43
    } else {
44
        SslMode::Disable
×
45
    };
46

47
    // We initialize the state store and destination.
48
    let state_store = init_state_store(&replicator_config).await?;
×
49
    let destination = init_destination(&replicator_config).await?;
×
50

51
    // We create the identity of this pipeline.
52
    let identity = PipelineIdentity::new(
×
53
        replicator_config.pipeline.id,
×
54
        &replicator_config.pipeline.publication_name,
×
55
    );
×
56

×
57
    // We prepare the configuration of the pipeline.
×
58
    // TODO: improve v2 pipeline config to make conversions nicer.
×
59
    let pipeline_config = PipelineConfig {
×
60
        pg_connection: PgConnectionConfig {
×
61
            host: replicator_config.source.host,
×
62
            port: replicator_config.source.port,
×
63
            name: replicator_config.source.name,
×
64
            username: replicator_config.source.username,
×
65
            password: replicator_config.source.password,
×
66
            ssl_mode,
×
67
        },
×
68
        batch: BatchConfig {
×
69
            max_size: replicator_config.pipeline.batch.max_size,
×
70
            max_fill: Duration::from_millis(replicator_config.pipeline.batch.max_fill_ms),
×
71
        },
×
72
        apply_worker_initialization_retry: RetryConfig {
×
73
            max_attempts: replicator_config
×
74
                .pipeline
×
75
                .apply_worker_init_retry
×
76
                .max_attempts,
×
77
            initial_delay: Duration::from_millis(
×
78
                replicator_config
×
79
                    .pipeline
×
80
                    .apply_worker_init_retry
×
81
                    .initial_delay_ms,
×
82
            ),
×
83
            max_delay: Duration::from_millis(
×
84
                replicator_config
×
85
                    .pipeline
×
86
                    .apply_worker_init_retry
×
87
                    .max_delay_ms,
×
88
            ),
×
89
            backoff_factor: replicator_config
×
90
                .pipeline
×
91
                .apply_worker_init_retry
×
92
                .backoff_factor,
×
93
        },
×
94
    };
×
95

×
96
    let pipeline = Pipeline::new(
×
97
        identity,
×
98
        pipeline_config,
×
99
        trusted_root_certs,
×
100
        state_store,
×
101
        destination,
×
102
    );
×
103
    start_pipeline(pipeline).await?;
×
104

105
    Ok(())
×
106
}
×
107

NEW
108
async fn init_state_store(config: &ReplicatorConfig) -> anyhow::Result<BoxedStateStore> {
×
109
    match config.state_store {
×
NEW
110
        StateStoreConfig::Memory => Ok(BoxedStateStore::new(MemoryStateStore::new())),
×
111
        StateStoreConfig::Postgres => {
NEW
112
            migrate_state_store(&config.source).await?;
×
NEW
113
            Ok(BoxedStateStore::new(PostgresStateStore::new(
×
NEW
114
                config.pipeline.id,
×
NEW
115
                config.source.clone(),
×
NEW
116
            )))
×
117
        }
118
    }
119
}
×
120

121
async fn init_destination(
×
122
    config: &ReplicatorConfig,
×
123
) -> anyhow::Result<impl Destination + Clone + Send + Sync + fmt::Debug + 'static> {
×
124
    match config.destination {
×
125
        DestinationConfig::Memory => Ok(MemoryDestination::new()),
×
126
        _ => {
127
            Err(ReplicatorError::UnsupportedDestination(format!("{:?}", config.destination)).into())
×
128
        }
129
    }
130
}
×
131

132
async fn start_pipeline<S, D>(mut pipeline: Pipeline<S, D>) -> anyhow::Result<()>
×
133
where
×
NEW
134
    S: StateStore + Clone + Send + Sync + 'static,
×
135
    D: Destination + Clone + Send + Sync + fmt::Debug + 'static,
×
136
{
×
137
    // Start the pipeline.
×
138
    pipeline.start().await?;
×
139

140
    // Spawn a task to listen for Ctrl+C and trigger shutdown.
141
    let shutdown_tx = pipeline.shutdown_tx();
×
142
    let shutdown_handle = tokio::spawn(async move {
×
143
        if let Err(e) = tokio::signal::ctrl_c().await {
×
144
            error!("Failed to listen for Ctrl+C: {:?}", e);
×
145
            return;
×
146
        }
×
147

×
148
        info!("Ctrl+C received, shutting down pipeline...");
×
149
        if let Err(e) = shutdown_tx.shutdown() {
×
150
            warn!("Failed to send shutdown signal: {:?}", e);
×
151
        }
×
152
    });
×
153

154
    // Wait for the pipeline to finish (either normally or via shutdown).
155
    let result = pipeline.wait().await;
×
156

157
    // Ensure the shutdown task is finished before returning.
158
    // If the pipeline finished before Ctrl+C, we want to abort the shutdown task.
159
    // If Ctrl+C was pressed, the shutdown task will have already triggered shutdown.
160
    // We don't care about the result of the shutdown_handle, but we should abort it if it's still running.
161
    shutdown_handle.abort();
×
162
    let _ = shutdown_handle.await;
×
163

164
    // Propagate any pipeline error as anyhow error.
165
    result?;
×
166

167
    Ok(())
×
168
}
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc