• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

supabase / etl / 15851302206

24 Jun 2025 01:08PM UTC coverage: 61.443% (+0.2%) from 61.288%
15851302206

push

github

web-flow
feat(security): Add `Secret` to more fields (#151)

43 of 57 new or added lines in 6 files covered. (75.44%)

5 existing lines in 4 files now uncovered.

5654 of 9202 relevant lines covered (61.44%)

30.19 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/replicator/src/core.rs
1
use config::shared::{DestinationConfig, ReplicatorConfig, StateStoreConfig};
2
use etl::v2::config::batch::BatchConfig;
3
use etl::v2::config::pipeline::PipelineConfig;
4
use etl::v2::config::retry::RetryConfig;
5
use etl::v2::destination::base::Destination;
6
use etl::v2::destination::memory::MemoryDestination;
7
use etl::v2::pipeline::{Pipeline, PipelineIdentity};
8
use etl::v2::state::store::base::StateStore;
9
use etl::v2::state::store::memory::MemoryStateStore;
10
use etl::SslMode;
11
use postgres::tokio::config::PgConnectionConfig;
12
use std::fmt;
13
use std::io::BufReader;
14
use std::time::Duration;
15
use thiserror::Error;
16
use tracing::{error, info, warn};
17

18
use crate::config::load_replicator_config;
19

20
#[derive(Debug, Error)]
21
pub enum ReplicatorError {
22
    #[error("The destination {0} is currently unsupported")]
23
    UnsupportedDestination(String),
24
}
25

26
pub async fn start_replicator() -> anyhow::Result<()> {
×
27
    let replicator_config = load_replicator_config()?;
×
28

29
    // We set up the certificates and SSL mode.
30
    let mut trusted_root_certs = vec![];
×
31
    let ssl_mode = if replicator_config.source.tls.enabled {
×
32
        let mut root_certs_reader =
×
33
            BufReader::new(replicator_config.source.tls.trusted_root_certs.as_bytes());
×
34
        for cert in rustls_pemfile::certs(&mut root_certs_reader) {
×
35
            let cert = cert?;
×
36
            trusted_root_certs.push(cert);
×
37
        }
38

39
        SslMode::VerifyFull
×
40
    } else {
41
        SslMode::Disable
×
42
    };
43

44
    // We initialize the state store and destination.
45
    let state_store = init_state_store(&replicator_config).await?;
×
46
    let destination = init_destination(&replicator_config).await?;
×
47

48
    // We create the identity of this pipeline.
49
    let identity = PipelineIdentity::new(
×
50
        replicator_config.pipeline.id,
×
51
        &replicator_config.pipeline.publication_name,
×
52
    );
×
53

×
54
    // We prepare the configuration of the pipeline.
×
55
    // TODO: improve v2 pipeline config to make conversions nicer.
×
56
    let pipeline_config = PipelineConfig {
×
57
        pg_connection: PgConnectionConfig {
×
58
            host: replicator_config.source.host,
×
59
            port: replicator_config.source.port,
×
60
            name: replicator_config.source.name,
×
61
            username: replicator_config.source.username,
×
NEW
62
            password: replicator_config.source.password.map(Into::into),
×
63
            ssl_mode,
×
64
        },
×
65
        batch: BatchConfig {
×
66
            max_size: replicator_config.pipeline.batch.max_size,
×
67
            max_fill: Duration::from_millis(replicator_config.pipeline.batch.max_fill_ms),
×
68
        },
×
69
        apply_worker_initialization_retry: RetryConfig {
×
70
            max_attempts: replicator_config
×
71
                .pipeline
×
72
                .apply_worker_init_retry
×
73
                .max_attempts,
×
74
            initial_delay: Duration::from_millis(
×
75
                replicator_config
×
76
                    .pipeline
×
77
                    .apply_worker_init_retry
×
78
                    .initial_delay_ms,
×
79
            ),
×
80
            max_delay: Duration::from_millis(
×
81
                replicator_config
×
82
                    .pipeline
×
83
                    .apply_worker_init_retry
×
84
                    .max_delay_ms,
×
85
            ),
×
86
            backoff_factor: replicator_config
×
87
                .pipeline
×
88
                .apply_worker_init_retry
×
89
                .backoff_factor,
×
90
        },
×
91
    };
×
92

×
93
    let pipeline = Pipeline::new(
×
94
        identity,
×
95
        pipeline_config,
×
96
        trusted_root_certs,
×
97
        state_store,
×
98
        destination,
×
99
    );
×
100
    start_pipeline(pipeline).await?;
×
101

102
    Ok(())
×
103
}
×
104

105
async fn init_state_store(
×
106
    config: &ReplicatorConfig,
×
107
) -> anyhow::Result<impl StateStore + Clone + Send + Sync + fmt::Debug + 'static> {
×
108
    match config.state_store {
×
109
        StateStoreConfig::Memory => Ok(MemoryStateStore::new()),
×
110
    }
×
111
}
×
112

113
async fn init_destination(
×
114
    config: &ReplicatorConfig,
×
115
) -> anyhow::Result<impl Destination + Clone + Send + Sync + fmt::Debug + 'static> {
×
116
    match config.destination {
×
117
        DestinationConfig::Memory => Ok(MemoryDestination::new()),
×
118
        _ => {
119
            Err(ReplicatorError::UnsupportedDestination(format!("{:?}", config.destination)).into())
×
120
        }
121
    }
122
}
×
123

124
async fn start_pipeline<S, D>(mut pipeline: Pipeline<S, D>) -> anyhow::Result<()>
×
125
where
×
126
    S: StateStore + Clone + Send + Sync + fmt::Debug + 'static,
×
127
    D: Destination + Clone + Send + Sync + fmt::Debug + 'static,
×
128
{
×
129
    // Start the pipeline.
×
130
    pipeline.start().await?;
×
131

132
    // Spawn a task to listen for Ctrl+C and trigger shutdown.
133
    let shutdown_tx = pipeline.shutdown_tx();
×
134
    let shutdown_handle = tokio::spawn(async move {
×
135
        if let Err(e) = tokio::signal::ctrl_c().await {
×
136
            error!("Failed to listen for Ctrl+C: {:?}", e);
×
137
            return;
×
138
        }
×
139

×
140
        info!("Ctrl+C received, shutting down pipeline...");
×
141
        if let Err(e) = shutdown_tx.shutdown() {
×
142
            warn!("Failed to send shutdown signal: {:?}", e);
×
143
        }
×
144
    });
×
145

146
    // Wait for the pipeline to finish (either normally or via shutdown).
147
    let result = pipeline.wait().await;
×
148

149
    // Ensure the shutdown task is finished before returning.
150
    // If the pipeline finished before Ctrl+C, we want to abort the shutdown task.
151
    // If Ctrl+C was pressed, the shutdown task will have already triggered shutdown.
152
    // We don't care about the result of the shutdown_handle, but we should abort it if it's still running.
153
    shutdown_handle.abort();
×
154
    let _ = shutdown_handle.await;
×
155

156
    // Propagate any pipeline error as anyhow error.
157
    result?;
×
158

159
    Ok(())
×
160
}
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc