• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

supabase / etl / 19900107178

03 Dec 2025 03:55PM UTC coverage: 82.008% (-0.4%) from 82.382%
19900107178

Pull #487

github

web-flow
Merge 5c2ab4c83 into eeef10c29
Pull Request #487: ref(allocator): Try to use jemalloc for etl-api and etl-replicator

0 of 91 new or added lines in 2 files covered. (0.0%)

2 existing lines in 1 file now uncovered.

16455 of 20065 relevant lines covered (82.01%)

181.74 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/etl-replicator/src/main.rs
1
//! ETL replicator service binary.
2
//!
3
//! Initializes and runs the replicator pipeline that handles Postgres logical replication
4
//! and routes data to configured destinations. Includes telemetry, error handling, and
5
//! graceful shutdown capabilities.
6

7
#[cfg(not(target_env = "msvc"))]
8
#[global_allocator]
9
/// Process-wide allocator: installs jemalloc (via `tikv_jemallocator`) as the global allocator
/// on every target whose `target_env` is not `msvc`.
static GLOBAL: tikv_jemallocator::Jemalloc = tikv_jemallocator::Jemalloc;
10

11
/// Jemalloc configuration optimized for high-throughput async CDC workloads.
12
///
13
/// - `background_thread:true`: Offloads memory purging to background threads, improving tail latency.
14
/// - `metadata_thp:auto`: Enables transparent huge pages for jemalloc metadata, reducing TLB misses.
15
/// - `dirty_decay_ms:10000`: Returns unused dirty pages to the OS after 10 seconds.
16
/// - `muzzy_decay_ms:10000`: Returns unused muzzy pages to the OS after 10 seconds.
17
/// - `tcache_max:8192`: Reduces thread-local cache size for better container memory efficiency.
18
/// - `abort_conf:true`: Aborts on invalid configuration for fail-fast behavior.
19
///
20
/// Note: `narenas` should be set via `MALLOC_CONF` env var to match container CPU limits per environment.
21
#[cfg(not(target_env = "msvc"))]
22
#[allow(non_upper_case_globals)]
23
#[unsafe(export_name = "malloc_conf")]
24
// NOTE(review): this exports the unprefixed `malloc_conf` symbol. If the linked jemalloc build
// uses prefixed symbols it may look for a differently named symbol instead — confirm the
// options below are actually picked up on every deployed target.
// NOTE(review): the trailing `\0` is presumably needed because the C side reads this as a
// NUL-terminated string — confirm before editing the literal.
pub static malloc_conf: &[u8] =
25
    b"background_thread:true,metadata_thp:auto,dirty_decay_ms:10000,muzzy_decay_ms:10000,tcache_max:8192,abort_conf:true\0";
26

27
use crate::config::load_replicator_config;
28
use crate::core::start_replicator_with_config;
29
use crate::notification::ErrorNotificationClient;
30
use etl::error::EtlError;
31
use etl_config::Environment;
32
use etl_config::shared::ReplicatorConfig;
33
use etl_telemetry::metrics::init_metrics;
34
use etl_telemetry::tracing::init_tracing_with_top_level_fields;
35
use secrecy::ExposeSecret;
36
use std::sync::Arc;
37
use tracing::{error, info, warn};
38

39
mod config;
40
mod core;
41
mod feature_flags;
42
#[cfg(not(target_env = "msvc"))]
43
mod jemalloc_metrics;
44
mod migrations;
45
mod notification;
46

47
/// Name of the environment variable that carries the version string of this replicator.
///
/// `init_sentry` reads it and, when it is set, records the value as the `version` tag on the
/// Sentry scope.
48
const APP_VERSION_ENV_NAME: &str = "APP_VERSION";
49

50
/// Entry point for the replicator service.
51
///
52
/// Loads configuration, initializes tracing and Sentry, starts the async runtime,
53
/// and launches the replicator pipeline. Handles all errors and ensures proper
54
/// service initialization sequence.
55
fn main() -> anyhow::Result<()> {
×
56
    // Load replicator config
57
    let replicator_config = load_replicator_config()?;
×
58

59
    // Initialize tracing with project reference
60
    let _log_flusher = init_tracing_with_top_level_fields(
×
61
        env!("CARGO_BIN_NAME"),
×
62
        replicator_config.project_ref(),
×
63
        Some(replicator_config.pipeline.id),
×
64
    )?;
×
65

66
    // Initialize Sentry before the async runtime starts
67
    let _sentry_guard = init_sentry()?;
×
68

69
    // Initialize metrics collection
70
    init_metrics(replicator_config.project_ref())?;
×
71

72
    // We start the runtime.
73
    tokio::runtime::Builder::new_multi_thread()
×
74
        .enable_all()
×
75
        .build()?
×
76
        .block_on(async_main(replicator_config))?;
×
77

78
    Ok(())
×
79
}
×
80

81
/// Main async entry point that starts the replicator pipeline.
82
///
83
/// Launches the replicator with the provided configuration and captures any errors
84
/// to Sentry and optionally sends notifications to the Supabase API.
85
async fn async_main(replicator_config: ReplicatorConfig) -> anyhow::Result<()> {
×
86
    // Start the jemalloc metrics collection background task.
87
    #[cfg(not(target_env = "msvc"))]
NEW
88
    jemalloc_metrics::spawn_jemalloc_metrics_task(replicator_config.pipeline.id);
×
89

90
    let notification_client = replicator_config.supabase.as_ref().and_then(
×
91
        |supabase_config| match (&supabase_config.api_url, &supabase_config.api_key) {
×
92
            (Some(api_url), Some(api_key)) => Some(ErrorNotificationClient::new(
×
93
                api_url.clone(),
×
94
                api_key.expose_secret().to_owned(),
×
95
                supabase_config.project_ref.clone(),
×
96
                replicator_config.pipeline.id.to_string(),
×
97
            )),
×
98
            _ => {
99
                warn!(
×
100
                    "missing supabase api url and/or key, failure notifications will not be sent"
×
101
                );
102
                None
×
103
            }
104
        },
×
105
    );
106

107
    // Initialize ConfigCat feature flags if supplied.
108
    let configcat_sdk_key = replicator_config
×
109
        .supabase
×
110
        .as_ref()
×
111
        .and_then(|s| s.configcat_sdk_key.as_deref());
×
112
    let _feature_flags_client = if let Some(configcat_sdk_key) = configcat_sdk_key {
×
113
        Some(feature_flags::init_feature_flags(
×
114
            configcat_sdk_key,
×
115
            replicator_config.project_ref(),
×
116
        )?)
×
117
    } else {
118
        info!("configcat not configured for replicator, skipping initialization");
×
119
        None
×
120
    };
121

122
    // We start the replicator and catch any errors.
123
    if let Err(err) = start_replicator_with_config(replicator_config).await {
×
124
        sentry::capture_error(&*err);
×
125
        error!("an error occurred in the replicator: {err}");
×
126

127
        // Send an error notification if a client is available.
128
        if let Some(client) = notification_client {
×
129
            let error_message = format!("{err}");
×
130
            match err.downcast_ref::<EtlError>() {
×
131
                Some(err) => {
×
132
                    client.notify_error(error_message.clone(), err).await;
×
133
                }
134
                None => {
135
                    client
×
136
                        .notify_error(error_message.clone(), error_message)
×
137
                        .await;
×
138
                }
139
            };
140
        }
×
141

142
        return Err(err);
×
143
    }
×
144

145
    Ok(())
×
146
}
×
147

148
/// Initializes Sentry with replicator-specific configuration.
149
///
150
/// Loads configuration and sets up Sentry if a DSN is provided in the config.
151
/// Tags all errors with the "replicator" service identifier and configures
152
/// panic handling to automatically capture and send panics to Sentry.
153
fn init_sentry() -> anyhow::Result<Option<sentry::ClientInitGuard>> {
×
154
    if let Ok(config) = load_replicator_config()
×
155
        && let Some(sentry_config) = &config.sentry
×
156
    {
157
        info!("initializing sentry with supplied dsn");
×
158

159
        let environment = Environment::load()?;
×
160
        let guard = sentry::init(sentry::ClientOptions {
×
161
            dsn: Some(sentry_config.dsn.expose_secret().parse()?),
×
162
            environment: Some(environment.to_string().into()),
×
163
            integrations: vec![Arc::new(
×
164
                sentry::integrations::panic::PanicIntegration::new(),
×
165
            )],
×
166
            ..Default::default()
×
167
        });
168

169
        // We load the version of the replicator which is specified via environment variable.
170
        let version = std::env::var(APP_VERSION_ENV_NAME);
×
171

172
        // Set service tag to differentiate replicator from other services
173
        sentry::configure_scope(|scope| {
×
174
            scope.set_tag("service", "replicator");
×
175
            if let Ok(version) = version {
×
176
                scope.set_tag("version", version);
×
177
            }
×
178
        });
×
179

180
        return Ok(Some(guard));
×
181
    }
×
182

183
    info!("sentry not configured for replicator, skipping initialization");
×
184

185
    Ok(None)
×
186
}
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc