• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

supabase / etl / 16051058340

03 Jul 2025 12:57PM UTC coverage: 66.236%. First build
16051058340

Pull #163

github

web-flow
Merge b692f44a1 into ac3aad28c
Pull Request #163: chore: bump edition for all crates to 2024

5 of 28 new or added lines in 12 files covered. (17.86%)

7031 of 10615 relevant lines covered (66.24%)

38.08 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/replicator/src/core.rs
1
use crate::config::load_replicator_config;
2
use crate::migrations::migrate_state_store;
3
use config::shared::{DestinationConfig, PgConnectionConfig};
4
use etl::v2::destination::bigquery::BigQueryDestination;
5
use etl::v2::destination::memory::MemoryDestination;
6
use etl::v2::encryption::bigquery::install_crypto_provider_once;
7
use etl::v2::pipeline::Pipeline;
8
use etl::v2::state::store::base::StateStore;
9
use etl::v2::state::store::postgres::PostgresStateStore;
10
use etl::v2::{destination::base::Destination, pipeline::PipelineId};
11
use secrecy::ExposeSecret;
12
use std::fmt;
13
use tracing::{error, info, warn};
14

15
pub async fn start_replicator() -> anyhow::Result<()> {
×
16
    let replicator_config = load_replicator_config()?;
×
17

18
    // We initialize the state store, which for the replicator is not configurable.
NEW
19
    let state_store = init_state_store(
×
NEW
20
        replicator_config.pipeline.id,
×
NEW
21
        replicator_config.pipeline.pg_connection.clone(),
×
NEW
22
    )
×
NEW
23
    .await?;
×
24

25
    // For each destination, we start the pipeline. This is more verbose due to static dispatch, but
26
    // we prefer more performance at the cost of ergonomics.
27
    match &replicator_config.destination {
×
28
        DestinationConfig::Memory => {
29
            let destination = MemoryDestination::new();
×
30

×
31
            let pipeline = Pipeline::new(
×
32
                replicator_config.pipeline.id,
×
33
                replicator_config.pipeline,
×
34
                state_store,
×
35
                destination,
×
36
            );
×
37
            start_pipeline(pipeline).await?;
×
38
        }
39
        DestinationConfig::BigQuery {
40
            project_id,
×
41
            dataset_id,
×
42
            service_account_key,
×
43
            max_staleness_mins,
×
44
        } => {
×
45
            install_crypto_provider_once();
×
46
            let destination = BigQueryDestination::new_with_key(
×
47
                project_id.clone(),
×
48
                dataset_id.clone(),
×
49
                service_account_key.expose_secret(),
×
50
                *max_staleness_mins,
×
51
            )
×
52
            .await?;
×
53

54
            let pipeline = Pipeline::new(
×
55
                replicator_config.pipeline.id,
×
56
                replicator_config.pipeline,
×
57
                state_store,
×
58
                destination,
×
59
            );
×
60
            start_pipeline(pipeline).await?;
×
61
        }
62
    }
63

64
    Ok(())
×
65
}
×
66

NEW
67
async fn init_state_store(
×
NEW
68
    pipeline_id: PipelineId,
×
NEW
69
    pg_connection_config: PgConnectionConfig,
×
NEW
70
) -> anyhow::Result<impl StateStore + Clone> {
×
NEW
71
    migrate_state_store(&pg_connection_config).await?;
×
NEW
72
    Ok(PostgresStateStore::new(pipeline_id, pg_connection_config))
×
73
}
×
74

75
async fn start_pipeline<S, D>(mut pipeline: Pipeline<S, D>) -> anyhow::Result<()>
×
76
where
×
77
    S: StateStore + Clone + Send + Sync + 'static,
×
78
    D: Destination + Clone + Send + Sync + fmt::Debug + 'static,
×
79
{
×
80
    // Start the pipeline.
×
81
    pipeline.start().await?;
×
82

83
    // Spawn a task to listen for Ctrl+C and trigger shutdown.
84
    let shutdown_tx = pipeline.shutdown_tx();
×
85
    let shutdown_handle = tokio::spawn(async move {
×
86
        if let Err(e) = tokio::signal::ctrl_c().await {
×
87
            error!("Failed to listen for Ctrl+C: {:?}", e);
×
88
            return;
×
89
        }
×
90

×
91
        info!("Ctrl+C received, shutting down pipeline...");
×
92
        if let Err(e) = shutdown_tx.shutdown() {
×
93
            warn!("Failed to send shutdown signal: {:?}", e);
×
94
        }
×
95
    });
×
96

97
    // Wait for the pipeline to finish (either normally or via shutdown).
98
    let result = pipeline.wait().await;
×
99

100
    // Ensure the shutdown task is finished before returning.
101
    // If the pipeline finished before Ctrl+C, we want to abort the shutdown task.
102
    // If Ctrl+C was pressed, the shutdown task will have already triggered shutdown.
103
    // We don't care about the result of the shutdown_handle, but we should abort it if it's still running.
104
    shutdown_handle.abort();
×
105
    let _ = shutdown_handle.await;
×
106

107
    // Propagate any pipeline error as anyhow error.
108
    result?;
×
109

110
    Ok(())
×
111
}
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc