• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

supabase / etl / 15678617192

16 Jun 2025 10:41AM UTC coverage: 59.818% (+0.009%) from 59.809%
15678617192

push

github

web-flow
ref(postgres): Rename `PgDatabaseOptions` structs (#145)

24 of 31 new or added lines in 9 files covered. (77.42%)

2 existing lines in 2 files now uncovered.

5005 of 8367 relevant lines covered (59.82%)

235.01 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/replicator/src/main.rs
1
use std::{io::BufReader, time::Duration, vec};
2

3
use configuration::{
4
    get_configuration, BatchSettings, DestinationSettings, Settings, SourceSettings, TlsSettings,
5
};
6
use etl::{
7
    pipeline::{
8
        batching::{data_pipeline::BatchDataPipeline, BatchConfig},
9
        destinations::bigquery::BigQueryBatchDestination,
10
        sources::postgres::{PostgresSource, TableNamesFrom},
11
        PipelineAction,
12
    },
13
    SslMode,
14
};
15
use postgres::tokio::config::PgConnectionConfig;
16
use telemetry::init_tracing;
17
use tracing::{info, instrument};
18

19
mod configuration;
20

21
// APP_SOURCE__POSTGRES__PASSWORD and APP_DESTINATION__BIG_QUERY__PROJECT_ID environment variables must be set
22
// before running because these are sensitive values which can't be configured in the config files
23
#[tokio::main]
24
async fn main() -> anyhow::Result<()> {
×
25
    let app_name = env!("CARGO_BIN_NAME");
×
26
    // We pass emit_on_span_close = false to avoid emitting logs on span close
×
27
    // for replicator because it is not a web server and we don't need to emit logs
×
28
    // for every closing span.
×
29
    let _log_flusher = init_tracing(app_name, false)?;
×
30
    let settings = get_configuration()?;
×
31
    start_replication(settings).await
×
32
}
×
33

34
#[instrument(name = "replication", skip(settings), fields(project = settings.project))]
35
async fn start_replication(settings: Settings) -> anyhow::Result<()> {
×
36
    rustls::crypto::aws_lc_rs::default_provider()
37
        .install_default()
38
        .expect("failed to install default crypto provider");
39

40
    let SourceSettings::Postgres {
41
        host,
42
        port,
43
        name,
44
        username,
45
        password: _,
46
        slot_name,
47
        publication,
48
    } = &settings.source;
49
    info!(
50
        host,
51
        port,
52
        dbname = name,
53
        username,
54
        slot_name,
55
        publication,
56
        "source settings"
57
    );
58

59
    let DestinationSettings::BigQuery {
60
        project_id,
61
        dataset_id,
62
        service_account_key: _,
63
        max_staleness_mins,
64
    } = &settings.destination;
65

66
    info!(
67
        project_id,
68
        dataset_id, max_staleness_mins, "destination settings"
69
    );
70

71
    let BatchSettings {
72
        max_size,
73
        max_fill_secs,
74
    } = &settings.batch;
75
    info!(max_size, max_fill_secs, "batch settings");
76

77
    let TlsSettings {
78
        trusted_root_certs: _,
79
        enabled,
80
    } = &settings.tls;
81
    info!(tls_enabled = enabled, "tls settings");
82

83
    settings.tls.validate()?;
84

85
    let SourceSettings::Postgres {
86
        host,
87
        port,
88
        name,
89
        username,
90
        password,
91
        slot_name,
92
        publication,
93
    } = settings.source;
×
94

×
95
    let TlsSettings {
×
96
        trusted_root_certs,
×
97
        enabled,
×
98
    } = settings.tls;
×
99

×
100
    let mut trusted_root_certs_vec = vec![];
×
101
    let ssl_mode = if enabled {
×
102
        let mut root_certs_reader = BufReader::new(trusted_root_certs.as_bytes());
×
103
        for cert in rustls_pemfile::certs(&mut root_certs_reader) {
×
104
            let cert = cert?;
×
105
            trusted_root_certs_vec.push(cert);
×
106
        }
×
107

×
108
        SslMode::VerifyFull
×
109
    } else {
×
110
        SslMode::Disable
×
111
    };
×
112

×
NEW
113
    let options = PgConnectionConfig {
×
114
        host,
×
115
        port,
×
116
        name,
×
117
        username,
×
118
        password,
×
119
        ssl_mode,
×
120
    };
×
121

×
122
    let postgres_source = PostgresSource::new(
×
123
        options,
×
124
        trusted_root_certs_vec,
×
125
        Some(slot_name),
×
126
        TableNamesFrom::Publication(publication),
×
127
    )
×
128
    .await?;
×
129

×
130
    let DestinationSettings::BigQuery {
×
131
        project_id,
×
132
        dataset_id,
×
133
        service_account_key,
×
134
        max_staleness_mins,
×
135
    } = settings.destination;
×
136

×
137
    let bigquery_destination = BigQueryBatchDestination::new_with_key(
×
138
        project_id,
×
139
        dataset_id,
×
140
        &service_account_key,
×
141
        max_staleness_mins.unwrap_or(5),
×
142
    )
×
143
    .await?;
×
144

×
145
    let BatchSettings {
×
146
        max_size,
×
147
        max_fill_secs,
×
148
    } = settings.batch;
×
149

150
    let batch_config = BatchConfig::new(max_size, Duration::from_secs(max_fill_secs));
151
    let mut pipeline = BatchDataPipeline::new(
152
        postgres_source,
153
        bigquery_destination,
154
        PipelineAction::Both,
155
        batch_config,
156
    );
157

158
    pipeline.start().await?;
159

160
    Ok(())
161
}
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc