• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

supabase / pg_replicate / 15022667797

14 May 2025 01:57PM UTC coverage: 39.539% (-0.4%) from 39.958%
15022667797

Pull #115

github

imor
use correct log file name based on binary name (api or replicator)
Pull Request #115: Improve logging in the API and replicator

0 of 119 new or added lines in 3 files covered. (0.0%)

15 existing lines in 1 file now uncovered.

2451 of 6199 relevant lines covered (39.54%)

18.73 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

49.17
/replicator/src/configuration.rs
1
use std::fmt::Debug;
2

3
use thiserror::Error;
4

5
#[derive(serde::Serialize, serde::Deserialize, PartialEq, Eq)]
6
#[serde(rename_all = "snake_case")]
7
pub enum SourceSettings {
8
    Postgres {
9
        /// Host on which Postgres is running
10
        host: String,
11

12
        /// Port on which Postgres is running
13
        port: u16,
14

15
        /// Postgres database name
16
        name: String,
17

18
        /// Postgres database user name
19
        username: String,
20

21
        /// Postgres database user password
22
        password: Option<String>,
23

24
        /// Postgres slot name
25
        slot_name: String,
26

27
        /// Postgres publication name
28
        publication: String,
29
    },
30
}
31

32
impl Debug for SourceSettings {
33
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
×
34
        match self {
×
35
            Self::Postgres {
×
36
                host,
×
37
                port,
×
38
                name,
×
39
                username,
×
40
                password: _,
×
41
                slot_name,
×
42
                publication,
×
43
            } => f
×
44
                .debug_struct("Postgres")
×
45
                .field("host", host)
×
46
                .field("port", port)
×
47
                .field("name", name)
×
48
                .field("username", username)
×
49
                .field("password", &"REDACTED")
×
50
                .field("slot_name", slot_name)
×
51
                .field("publication", publication)
×
52
                .finish(),
×
53
        }
×
54
    }
×
55
}
56

57
#[derive(serde::Serialize, serde::Deserialize, PartialEq, Eq)]
58
#[serde(rename_all = "snake_case")]
59
pub enum SinkSettings {
60
    BigQuery {
61
        /// BigQuery project id
62
        project_id: String,
63

64
        /// BigQuery dataset id
65
        dataset_id: String,
66

67
        /// BigQuery service account key
68
        service_account_key: String,
69

70
        /// The max_staleness parameter for BigQuery: https://cloud.google.com/bigquery/docs/change-data-capture#create-max-staleness
71
        #[serde(skip_serializing_if = "Option::is_none")]
72
        max_staleness_mins: Option<u16>,
73
    },
74
}
75

76
impl Debug for SinkSettings {
77
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
×
78
        match self {
×
79
            Self::BigQuery {
×
80
                project_id,
×
81
                dataset_id,
×
82
                service_account_key: _,
×
83
                max_staleness_mins,
×
84
            } => f
×
85
                .debug_struct("BigQuery")
×
86
                .field("project_id", project_id)
×
87
                .field("dataset_id", dataset_id)
×
88
                .field("service_account_key", &"REDACTED")
×
89
                .field("max_staleness_mins", max_staleness_mins)
×
90
                .finish(),
×
91
        }
×
92
    }
×
93
}
94

95
#[derive(Debug, serde::Serialize, serde::Deserialize, PartialEq, Eq)]
96
pub struct BatchSettings {
97
    /// maximum batch size in number of events
98
    pub max_size: usize,
99

100
    /// maximum duration, in seconds, to wait for a batch to fill
101
    pub max_fill_secs: u64,
102
}
103

104
#[derive(serde::Serialize, serde::Deserialize, PartialEq, Eq)]
105
pub struct TlsSettings {
106
    /// trusted root certificates in PEM format
107
    pub trusted_root_certs: String,
108

109
    /// true when TLS is enabled
110
    pub enabled: bool,
111
}
112

113
impl Debug for TlsSettings {
114
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
×
115
        f.debug_struct("TlsSettings")
×
116
            .field("trusted_root_certs", &"OMITTED")
×
117
            .field("enabled", &self.enabled)
×
118
            .finish()
×
119
    }
×
120
}
121

122
#[derive(Debug, Error)]
123
pub enum TlsSettingsError {
124
    #[error("Invalid TLS settings: `trusted_root_certs` must be set when `enabled` is true")]
125
    MissingTrustedRootCerts,
126
}
127

128
impl TlsSettings {
NEW
129
    pub fn validate(&self) -> Result<(), TlsSettingsError> {
×
130
        if self.enabled && self.trusted_root_certs.is_empty() {
×
NEW
131
            return Err(TlsSettingsError::MissingTrustedRootCerts);
×
132
        }
×
133
        Ok(())
×
134
    }
×
135
}
136

137
#[derive(Debug, serde::Serialize, serde::Deserialize, PartialEq, Eq)]
138
pub struct Settings {
139
    pub source: SourceSettings,
140
    pub sink: SinkSettings,
141
    pub batch: BatchSettings,
142
    pub tls: TlsSettings,
143
}
144

145
pub fn get_configuration() -> Result<Settings, config::ConfigError> {
×
146
    let base_path = std::env::current_dir().expect("Failed to determine the current directory");
×
147
    let configuration_directory = base_path.join("configuration");
×
148

×
149
    // Detect the running environment.
×
150
    // Default to `dev` if unspecified.
×
151
    let environment: Environment = std::env::var("APP_ENVIRONMENT")
×
152
        .unwrap_or_else(|_| DEV_ENV_NAME.into())
×
153
        .try_into()
×
154
        .expect("Failed to parse APP_ENVIRONMENT.");
×
155

×
156
    let environment_filename = format!("{}.yaml", environment.as_str());
×
157
    let settings = config::Config::builder()
×
158
        .add_source(config::File::from(
×
159
            configuration_directory.join("base.yaml"),
×
160
        ))
×
161
        .add_source(config::File::from(
×
162
            configuration_directory.join(environment_filename),
×
163
        ))
×
164
        // Add in settings from environment variables (with a prefix of APP and '__' as separator)
×
165
        // E.g. `APP_SINK__BIG_QUERY__PROJECT_ID=my-project-id would set `Settings { sink: BigQuery { project_id }}` to my-project-id
×
166
        .add_source(
×
167
            config::Environment::with_prefix("APP")
×
168
                .prefix_separator("_")
×
169
                .separator("__"),
×
170
        )
×
171
        .build()?;
×
172

173
    settings.try_deserialize::<Settings>()
×
174
}
×
175

176
/// Name of the development environment, as written in `APP_ENVIRONMENT` and
/// in the environment-specific configuration file name.
pub const DEV_ENV_NAME: &str = "dev";

/// Name of the production environment, as written in `APP_ENVIRONMENT` and
/// in the environment-specific configuration file name.
pub const PROD_ENV_NAME: &str = "prod";
178

179
/// The possible runtime environments for our application.
///
/// The standard traits are derived so that a value can be logged, compared
/// and passed around by value.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Environment {
    /// Development environment.
    Dev,
    /// Production environment.
    Prod,
}
184

185
impl Environment {
186
    pub fn as_str(&self) -> &'static str {
×
187
        match self {
×
188
            Environment::Dev => DEV_ENV_NAME,
×
189
            Environment::Prod => PROD_ENV_NAME,
×
190
        }
191
    }
×
192
}
193

194
impl TryFrom<String> for Environment {
195
    type Error = String;
196

197
    fn try_from(s: String) -> Result<Self, Self::Error> {
×
198
        match s.to_lowercase().as_str() {
×
199
            "dev" => Ok(Self::Dev),
×
200
            "prod" => Ok(Self::Prod),
×
201
            other => Err(format!(
×
202
                "{other} is not a supported environment. Use either `{DEV_ENV_NAME}` or `{PROD_ENV_NAME}`.",
×
203
            )),
×
204
        }
205
    }
×
206
}
207

208
#[cfg(test)]
mod tests {
    use crate::{
        configuration::{Settings, TlsSettings},
        BatchSettings, SinkSettings, SourceSettings,
    };

    /// Builds the `Settings` value that both tests use as their fixture.
    fn sample_settings() -> Settings {
        let source = SourceSettings::Postgres {
            host: "localhost".to_string(),
            port: 5432,
            name: "postgres".to_string(),
            username: "postgres".to_string(),
            password: Some("postgres".to_string()),
            slot_name: "replicator_slot".to_string(),
            publication: "replicator_publication".to_string(),
        };
        let sink = SinkSettings::BigQuery {
            project_id: "project-id".to_string(),
            dataset_id: "dataset-id".to_string(),
            service_account_key: "key".to_string(),
            max_staleness_mins: None,
        };
        let batch = BatchSettings {
            max_size: 1000,
            max_fill_secs: 10,
        };
        let tls = TlsSettings {
            trusted_root_certs: String::new(),
            enabled: false,
        };

        Settings {
            source,
            sink,
            batch,
            tls,
        }
    }

    /// A JSON document deserializes into the expected `Settings` value.
    #[test]
    pub fn deserialize_settings_test() {
        let settings = r#"{
            "source": {
                "postgres": {
                    "host": "localhost",
                    "port": 5432,
                    "name": "postgres",
                    "username": "postgres",
                    "password": "postgres",
                    "slot_name": "replicator_slot",
                    "publication": "replicator_publication"
                }
            },
            "sink": {
                "big_query": {
                    "project_id": "project-id",
                    "dataset_id": "dataset-id",
                    "service_account_key": "key"
                }
            },
            "batch": {
                "max_size": 1000,
                "max_fill_secs": 10
            },
            "tls": {
                "trusted_root_certs": "",
                "enabled": false
            }
        }"#;

        let actual = serde_json::from_str::<Settings>(settings);
        let expected = sample_settings();

        assert!(actual.is_ok());
        assert_eq!(expected, actual.unwrap());
    }

    /// A `Settings` value serializes to the expected compact JSON; the
    /// `None` `max_staleness_mins` field is absent from the output.
    #[test]
    pub fn serialize_settings_test() {
        let expected = r#"{"source":{"postgres":{"host":"localhost","port":5432,"name":"postgres","username":"postgres","password":"postgres","slot_name":"replicator_slot","publication":"replicator_publication"}},"sink":{"big_query":{"project_id":"project-id","dataset_id":"dataset-id","service_account_key":"key"}},"batch":{"max_size":1000,"max_fill_secs":10},"tls":{"trusted_root_certs":"","enabled":false}}"#;

        let actual = serde_json::to_string(&sample_settings());

        assert!(actual.is_ok());
        assert_eq!(expected, actual.unwrap());
    }
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc