• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

supabase / pg_replicate / 15061832574

16 May 2025 06:06AM UTC coverage: 40.541% (+0.7%) from 39.82%
15061832574

Pull #119

github

imor
rename root span and function names
Pull Request #119: emit project field in the root span in replicator

10 of 51 new or added lines in 5 files covered. (19.61%)

20 existing lines in 1 file now uncovered.

2486 of 6132 relevant lines covered (40.54%)

19.4 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/replicator/src/main.rs
1
use std::{io::BufReader, time::Duration, vec};
2

3
use configuration::{
4
    get_configuration, BatchSettings, Settings, SinkSettings, SourceSettings, TlsSettings,
5
};
6
use pg_replicate::{
7
    pipeline::{
8
        batching::{data_pipeline::BatchDataPipeline, BatchConfig},
9
        sinks::bigquery::BigQueryBatchSink,
10
        sources::postgres::{PostgresSource, TableNamesFrom},
11
        PipelineAction,
12
    },
13
    SslMode,
14
};
15
use telemetry::init_tracing;
16
use tracing::{info, instrument};
17

18
mod configuration;
19

20
// APP_SOURCE__POSTGRES__PASSWORD and APP_SINK__BIG_QUERY__PROJECT_ID environment variables must be set
21
// before running because these are sensitive values which can't be configured in the config files
22
#[tokio::main]
23
async fn main() -> anyhow::Result<()> {
×
24
    let app_name = env!("CARGO_BIN_NAME");
×
NEW
25
    // We pass emit_on_span_close = false to avoid emitting logs on span close
×
NEW
26
    // for replicator because it is not a web server and we don't need to emit logs
×
NEW
27
    // for every closing span.
×
NEW
28
    let _log_flusher = init_tracing(app_name, false)?;
×
NEW
29
    let settings = get_configuration()?;
×
NEW
30
    start_replication(settings).await
×
NEW
31
}
×
32

33
#[instrument(name = "replication", skip(settings), fields(project = settings.project))]
34
async fn start_replication(settings: Settings) -> anyhow::Result<()> {
35
    rustls::crypto::aws_lc_rs::default_provider()
36
        .install_default()
37
        .expect("failed to install default crypto provider");
38

39
    let SourceSettings::Postgres {
40
        host,
41
        port,
42
        name,
43
        username,
44
        password: _,
45
        slot_name,
46
        publication,
47
    } = &settings.source;
48
    info!(
49
        host,
50
        port,
51
        dbname = name,
52
        username,
53
        slot_name,
54
        publication,
55
        "source settings"
56
    );
57

58
    let SinkSettings::BigQuery {
59
        project_id,
60
        dataset_id,
61
        service_account_key: _,
62
        max_staleness_mins,
63
    } = &settings.sink;
64

65
    info!(project_id, dataset_id, max_staleness_mins, "sink settings");
66

67
    let BatchSettings {
68
        max_size,
69
        max_fill_secs,
70
    } = &settings.batch;
71
    info!(max_size, max_fill_secs, "batch settings");
72

73
    let TlsSettings {
74
        trusted_root_certs: _,
75
        enabled,
76
    } = &settings.tls;
77
    info!(tls_enabled = enabled, "tls settings");
78

79
    settings.tls.validate()?;
80

81
    let SourceSettings::Postgres {
82
        host,
83
        port,
84
        name,
85
        username,
86
        password,
87
        slot_name,
88
        publication,
89
    } = settings.source;
90

91
    let TlsSettings {
92
        trusted_root_certs,
93
        enabled,
94
    } = settings.tls;
95

96
    let mut trusted_root_certs_vec = vec![];
97
    let ssl_mode = if enabled {
98
        let mut root_certs_reader = BufReader::new(trusted_root_certs.as_bytes());
99
        for cert in rustls_pemfile::certs(&mut root_certs_reader) {
100
            let cert = cert?;
101
            trusted_root_certs_vec.push(cert);
102
        }
103

104
        SslMode::VerifyFull
105
    } else {
106
        SslMode::Disable
107
    };
108

109
    let postgres_source = PostgresSource::new(
110
        &host,
111
        port,
112
        &name,
113
        &username,
114
        password,
115
        ssl_mode,
116
        trusted_root_certs_vec,
117
        Some(slot_name),
118
        TableNamesFrom::Publication(publication),
119
    )
120
    .await?;
121

122
    let SinkSettings::BigQuery {
123
        project_id,
124
        dataset_id,
125
        service_account_key,
126
        max_staleness_mins,
127
    } = settings.sink;
128

129
    let bigquery_sink = BigQueryBatchSink::new_with_key(
130
        project_id,
131
        dataset_id,
132
        &service_account_key,
133
        max_staleness_mins.unwrap_or(5),
134
    )
135
    .await?;
136

137
    let BatchSettings {
138
        max_size,
139
        max_fill_secs,
140
    } = settings.batch;
141

142
    let batch_config = BatchConfig::new(max_size, Duration::from_secs(max_fill_secs));
143
    let mut pipeline = BatchDataPipeline::new(
144
        postgres_source,
145
        bigquery_sink,
146
        PipelineAction::Both,
147
        batch_config,
148
    );
149

150
    pipeline.start().await?;
151

152
    Ok(())
153
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc