• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

supabase / etl / 22061519674

16 Feb 2026 11:48AM UTC coverage: 74.154% (-0.8%) from 74.949%
22061519674

Pull #603

github

web-flow
Merge 74f14b6de into 7f3f24044
Pull Request #603: feat(core): Improve truncation behavior on table copy and cleanup memory destination

50 of 57 new or added lines in 2 files covered. (87.72%)

261 existing lines in 8 files now uncovered.

17966 of 24228 relevant lines covered (74.15%)

660.45 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

98.53
/etl-api/src/validation/validators.rs
1
//! Built-in validators for ETL pipeline and destination prerequisites.
2

3
use std::collections::HashMap;
4

5
use async_trait::async_trait;
6
use etl_destinations::bigquery::BigQueryClient;
7
use etl_destinations::iceberg::{
8
    IcebergClient, S3_ACCESS_KEY_ID, S3_ENDPOINT, S3_SECRET_ACCESS_KEY,
9
};
10
use secrecy::ExposeSecret;
11

12
use crate::configs::destination::{FullApiDestinationConfig, FullApiIcebergConfig};
13
use crate::configs::pipeline::FullApiPipelineConfig;
14

15
use super::{ValidationContext, ValidationError, ValidationFailure, Validator};
16

17
/// Validates that the required publication exists in the source database.
#[derive(Debug)]
pub struct PublicationExistsValidator {
    // Name of the Postgres publication the pipeline replicates from.
    publication_name: String,
}

impl PublicationExistsValidator {
    /// Builds a validator that checks for `publication_name` on the source.
    pub fn new(publication_name: String) -> Self {
        PublicationExistsValidator { publication_name }
    }
}
28

29
#[async_trait]
30
impl Validator for PublicationExistsValidator {
31
    async fn validate(
32
        &self,
33
        ctx: &ValidationContext,
34
    ) -> Result<Vec<ValidationFailure>, ValidationError> {
8✔
35
        let source_pool = ctx
36
            .source_pool
37
            .as_ref()
38
            .expect("source pool required for publication validation");
39

40
        let exists: bool =
41
            sqlx::query_scalar("select exists(select 1 from pg_publication where pubname = $1)")
42
                .bind(&self.publication_name)
43
                .fetch_one(source_pool)
44
                .await?;
45

46
        if exists {
47
            Ok(vec![])
48
        } else {
49
            Ok(vec![ValidationFailure::critical(
50
                "Publication Not Found",
51
                format!(
52
                    "Publication '{}' does not exist in the source database. \
53
                    Create it with: CREATE PUBLICATION {} FOR TABLE <table_name>, ...",
54
                    self.publication_name, self.publication_name
55
                ),
56
            )])
57
        }
58
    }
8✔
59
}
60

61
/// Validates that there are enough free replication slots for the pipeline.
#[derive(Debug)]
pub struct ReplicationSlotsValidator {
    // Upper bound on concurrently running table sync workers.
    max_table_sync_workers: u16,
}

impl ReplicationSlotsValidator {
    /// Builds a validator sized for `max_table_sync_workers` parallel copies.
    pub fn new(max_table_sync_workers: u16) -> Self {
        ReplicationSlotsValidator {
            max_table_sync_workers,
        }
    }
}
74

75
#[async_trait]
76
impl Validator for ReplicationSlotsValidator {
77
    async fn validate(
78
        &self,
79
        ctx: &ValidationContext,
80
    ) -> Result<Vec<ValidationFailure>, ValidationError> {
8✔
81
        let source_pool = ctx
82
            .source_pool
83
            .as_ref()
84
            .expect("source pool required for replication slots validation");
85

86
        let max_slots: i32 = sqlx::query_scalar(
87
            "select setting::int from pg_settings where name = 'max_replication_slots'",
88
        )
89
        .fetch_one(source_pool)
90
        .await?;
91

92
        let used_slots: i64 = sqlx::query_scalar("select count(*) from pg_replication_slots")
93
            .fetch_one(source_pool)
94
            .await?;
95

96
        let free_slots = max_slots as i64 - used_slots;
97
        // We need 1 slot for the apply worker plus at most `max_table_sync_workers` other slots
98
        // for table sync workers.
99
        let required_slots = self.max_table_sync_workers as i64 + 1;
100

101
        if required_slots <= free_slots {
102
            Ok(vec![])
103
        } else {
104
            Ok(vec![ValidationFailure::critical(
105
                "Insufficient Replication Slots",
106
                format!(
107
                    "Not enough replication slots available.\n\
108
                    Found {free_slots} free slots, but {required_slots} are required at most during initial table copy ({used_slots}/{max_slots} currently in use).\n\
109
                    Once all tables are copied, only 1 slot will be used.\n\n\
110
                    Please verify:\n\
111
                    (1) max_replication_slots in postgresql.conf is sufficient\n\
112
                    (2) Unused replication slots can be removed\n\
113
                    (3) max_table_sync_workers can be reduced if needed",
114
                ),
115
            )])
116
        }
117
    }
8✔
118
}
119

120
/// Validates that the WAL level is set to 'logical' for replication.
121
#[derive(Debug)]
122
pub struct WalLevelValidator;
123

124
#[async_trait]
125
impl Validator for WalLevelValidator {
126
    async fn validate(
127
        &self,
128
        ctx: &ValidationContext,
129
    ) -> Result<Vec<ValidationFailure>, ValidationError> {
8✔
130
        let source_pool = ctx
131
            .source_pool
132
            .as_ref()
133
            .expect("source pool required for WAL level validation");
134

135
        let wal_level: String = sqlx::query_scalar("select current_setting('wal_level')")
136
            .fetch_one(source_pool)
137
            .await?;
138

139
        if wal_level == "logical" {
140
            Ok(vec![])
141
        } else {
142
            Ok(vec![ValidationFailure::critical(
143
                "Invalid WAL Level",
144
                format!(
145
                    "WAL level is set to '{wal_level}', but must be 'logical' for replication. \
146
                    Update postgresql.conf with: wal_level = 'logical' and restart PostgreSQL"
147
                ),
148
            )])
149
        }
150
    }
8✔
151
}
152

153
/// Validates that the database user has replication permissions.
154
#[derive(Debug)]
155
pub struct ReplicationPermissionsValidator;
156

157
#[async_trait]
158
impl Validator for ReplicationPermissionsValidator {
159
    async fn validate(
160
        &self,
161
        ctx: &ValidationContext,
162
    ) -> Result<Vec<ValidationFailure>, ValidationError> {
8✔
163
        let source_pool = ctx
164
            .source_pool
165
            .as_ref()
166
            .expect("source pool required for replication permissions validation");
167

168
        // Check if user is superuser OR has replication privilege
169
        let has_permission: bool = sqlx::query_scalar(
170
            "select rolsuper or rolreplication from pg_roles where rolname = current_user",
171
        )
172
        .fetch_one(source_pool)
173
        .await?;
174

175
        if has_permission {
176
            Ok(vec![])
177
        } else {
178
            Ok(vec![ValidationFailure::critical(
179
                "Missing Replication Permission",
180
                "The database user does not have replication privileges",
181
            )])
182
        }
183
    }
8✔
184
}
185

186
/// Validates that a publication contains at least one table.
#[derive(Debug)]
pub struct PublicationHasTablesValidator {
    // Publication whose table membership is inspected.
    publication_name: String,
}

impl PublicationHasTablesValidator {
    /// Builds a validator for the given publication name.
    pub fn new(publication_name: String) -> Self {
        PublicationHasTablesValidator { publication_name }
    }
}
197

198
#[async_trait]
199
impl Validator for PublicationHasTablesValidator {
200
    async fn validate(
201
        &self,
202
        ctx: &ValidationContext,
203
    ) -> Result<Vec<ValidationFailure>, ValidationError> {
8✔
204
        let source_pool = ctx
205
            .source_pool
206
            .as_ref()
207
            .expect("source pool required for publication tables validation");
208

209
        // Check if publication publishes all tables or has specific tables
210
        let result: Option<(bool, i64)> = sqlx::query_as(
211
            r#"
212
            select
213
                p.puballtables,
214
                (select count(*) from pg_publication_tables pt where pt.pubname = p.pubname)
215
            from pg_publication p
216
            where p.pubname = $1
217
            "#,
218
        )
219
        .bind(&self.publication_name)
220
        .fetch_optional(source_pool)
221
        .await?;
222

223
        // If publication doesn't exist, skip this check (PublicationExistsValidator handles it)
224
        let Some((puballtables, table_count)) = result else {
225
            return Ok(vec![]);
226
        };
227

228
        if puballtables || table_count > 0 {
229
            Ok(vec![])
230
        } else {
231
            Ok(vec![ValidationFailure::critical(
232
                "Publication Empty",
233
                format!(
234
                    "Publication '{}' exists but contains no tables.\n\n\
235
                    Add tables with: ALTER PUBLICATION {} ADD TABLE <table_name>",
236
                    self.publication_name, self.publication_name
237
                ),
238
            )])
239
        }
240
    }
8✔
241
}
242

243
/// Validates that all tables in a publication have primary keys.
#[derive(Debug)]
pub struct PrimaryKeysValidator {
    // Publication whose member tables are checked for primary keys.
    publication_name: String,
}

impl PrimaryKeysValidator {
    /// Builds a validator for the given publication name.
    pub fn new(publication_name: String) -> Self {
        PrimaryKeysValidator { publication_name }
    }
}
254

255
#[async_trait]
256
impl Validator for PrimaryKeysValidator {
257
    async fn validate(
258
        &self,
259
        ctx: &ValidationContext,
260
    ) -> Result<Vec<ValidationFailure>, ValidationError> {
8✔
261
        let source_pool = ctx
262
            .source_pool
263
            .as_ref()
264
            .expect("source pool required for primary keys validation");
265

266
        // Find tables without primary keys using pg_publication_rel for direct OID access
267
        let tables_without_pk: Vec<String> = sqlx::query_scalar(
268
            r#"
269
            select n.nspname || '.' || c.relname
270
            from pg_publication_rel pr
271
            join pg_publication p on p.oid = pr.prpubid
272
            join pg_class c on c.oid = pr.prrelid
273
            join pg_namespace n on n.oid = c.relnamespace
274
            where p.pubname = $1
275
              and not exists (
276
                select 1
277
                from pg_constraint con
278
                where con.conrelid = pr.prrelid
279
                  and con.contype = 'p'
280
              )
281
            order by n.nspname, c.relname
282
            limit 100
283
            "#,
284
        )
285
        .bind(&self.publication_name)
286
        .fetch_all(source_pool)
287
        .await?;
288

289
        if tables_without_pk.is_empty() {
290
            Ok(vec![])
291
        } else {
292
            Ok(vec![ValidationFailure::warning(
293
                "Tables Missing Primary Keys",
294
                format!(
295
                    "Tables without primary keys: {}\n\n\
296
                    Primary keys are required for UPDATE and DELETE replication.",
297
                    tables_without_pk.join(", ")
298
                ),
299
            )])
300
        }
301
    }
8✔
302
}
303

304
/// Validates that tables in a publication don't have generated columns.
#[derive(Debug)]
pub struct GeneratedColumnsValidator {
    // Publication whose member tables are checked for generated columns.
    publication_name: String,
}

impl GeneratedColumnsValidator {
    /// Builds a validator for the given publication name.
    pub fn new(publication_name: String) -> Self {
        GeneratedColumnsValidator { publication_name }
    }
}
315

316
#[async_trait]
317
impl Validator for GeneratedColumnsValidator {
318
    async fn validate(
319
        &self,
320
        ctx: &ValidationContext,
321
    ) -> Result<Vec<ValidationFailure>, ValidationError> {
8✔
322
        let source_pool = ctx
323
            .source_pool
324
            .as_ref()
325
            .expect("source pool required for generated columns validation");
326

327
        // Find tables with generated columns using pg_publication_rel for direct OID access
328
        let tables_with_generated: Vec<String> = sqlx::query_scalar(
329
            r#"
330
            select distinct n.nspname || '.' || c.relname
331
            from pg_publication_rel pr
332
            join pg_publication p on p.oid = pr.prpubid
333
            join pg_class c on c.oid = pr.prrelid
334
            join pg_namespace n on n.oid = c.relnamespace
335
            where p.pubname = $1
336
              and exists (
337
                select 1
338
                from pg_attribute a
339
                where a.attrelid = pr.prrelid
340
                  and a.attnum > 0
341
                  and not a.attisdropped
342
                  and a.attgenerated != ''
343
              )
344
            order by 1
345
            limit 100
346
            "#,
347
        )
348
        .bind(&self.publication_name)
349
        .fetch_all(source_pool)
350
        .await?;
351

352
        if tables_with_generated.is_empty() {
353
            Ok(vec![])
354
        } else {
355
            Ok(vec![ValidationFailure::warning(
356
                "Tables With Generated Columns",
357
                format!(
358
                    "Tables with generated columns: {}\n\n\
359
                    Generated columns cannot be replicated and will be excluded from the destination.",
360
                    tables_with_generated.join(", ")
361
                ),
362
            )])
363
        }
364
    }
8✔
365
}
366

367
/// Composite validator for pipeline prerequisites.
368
#[derive(Debug)]
369
pub struct PipelineValidator {
370
    config: FullApiPipelineConfig,
371
}
372

373
impl PipelineValidator {
374
    pub fn new(config: FullApiPipelineConfig) -> Self {
8✔
375
        Self { config }
8✔
376
    }
8✔
377

378
    fn sub_validators(&self) -> Vec<Box<dyn Validator>> {
8✔
379
        let max_table_sync_workers = self.config.max_table_sync_workers.unwrap_or(4);
8✔
380
        let publication_name = self.config.publication_name.clone();
8✔
381

382
        vec![
8✔
383
            Box::new(WalLevelValidator),
8✔
384
            Box::new(ReplicationPermissionsValidator),
8✔
385
            Box::new(PublicationExistsValidator::new(publication_name.clone())),
8✔
386
            Box::new(PublicationHasTablesValidator::new(publication_name.clone())),
8✔
387
            Box::new(PrimaryKeysValidator::new(publication_name.clone())),
8✔
388
            Box::new(GeneratedColumnsValidator::new(publication_name)),
8✔
389
            Box::new(ReplicationSlotsValidator::new(max_table_sync_workers)),
8✔
390
        ]
391
    }
8✔
392
}
393

394
#[async_trait]
395
impl Validator for PipelineValidator {
396
    async fn validate(
397
        &self,
398
        ctx: &ValidationContext,
399
    ) -> Result<Vec<ValidationFailure>, ValidationError> {
8✔
400
        let mut failures = Vec::new();
401

402
        for validator in self.sub_validators() {
403
            failures.extend(validator.validate(ctx).await?);
404
        }
405

406
        Ok(failures)
407
    }
8✔
408
}
409

410
/// Validates BigQuery destination connectivity and dataset accessibility.
#[derive(Debug)]
struct BigQueryValidator {
    // GCP project that hosts the destination dataset.
    project_id: String,
    // Dataset the pipeline writes into.
    dataset_id: String,
    // Service account key JSON used to authenticate.
    service_account_key: String,
}

impl BigQueryValidator {
    /// Bundles the credentials and target identifiers for validation.
    fn new(project_id: String, dataset_id: String, service_account_key: String) -> Self {
        BigQueryValidator {
            project_id,
            dataset_id,
            service_account_key,
        }
    }
}
427

428
#[async_trait]
429
impl Validator for BigQueryValidator {
430
    async fn validate(
431
        &self,
432
        _ctx: &ValidationContext,
433
    ) -> Result<Vec<ValidationFailure>, ValidationError> {
3✔
434
        let client = match BigQueryClient::new_with_key(
435
            self.project_id.clone(),
436
            &self.service_account_key,
437
            1,
438
        )
439
        .await
440
        {
441
            Ok(client) => client,
442
            Err(_) => {
443
                return Ok(vec![ValidationFailure::critical(
444
                    "BigQuery Authentication Failed",
445
                    "Unable to authenticate with BigQuery.\n\n\
446
                    Please verify:\n\
447
                    (1) The service account key is valid JSON\n\
448
                    (2) The key has not expired or been revoked\n\
449
                    (3) The project ID is correct",
450
                )]);
451
            }
452
        };
453

454
        match client.dataset_exists(&self.dataset_id).await {
455
            Ok(true) => Ok(vec![]),
456
            Ok(false) => Ok(vec![ValidationFailure::critical(
457
                "BigQuery Dataset Not Found",
458
                format!(
459
                    "Dataset '{}' does not exist in project '{}'.\n\n\
460
                    Please verify:\n\
461
                    (1) The dataset name is correct\n\
462
                    (2) The dataset exists in the specified project\n\
463
                    (3) The service account has permission to access it",
464
                    self.dataset_id, self.project_id
465
                ),
466
            )]),
467
            Err(_) => Ok(vec![ValidationFailure::critical(
468
                "BigQuery Connection Failed",
469
                "Unable to connect to BigQuery.\n\n\
470
                Please verify:\n\
471
                (1) Network connectivity to Google Cloud\n\
472
                (2) The service account has the required permissions (BigQuery Data Editor, BigQuery Job User)\n\
473
                (3) BigQuery API is enabled for your project",
474
            )]),
475
        }
476
    }
3✔
477
}
478

479
/// Validates Iceberg destination connectivity.
480
#[derive(Debug)]
481
struct IcebergValidator {
482
    config: FullApiIcebergConfig,
483
}
484

485
impl IcebergValidator {
486
    fn new(config: FullApiIcebergConfig) -> Self {
2✔
487
        Self { config }
2✔
488
    }
2✔
489
}
490

491
#[async_trait]
492
impl Validator for IcebergValidator {
493
    async fn validate(
494
        &self,
495
        ctx: &ValidationContext,
496
    ) -> Result<Vec<ValidationFailure>, ValidationError> {
2✔
497
        let client = match &self.config {
498
            FullApiIcebergConfig::Supabase {
499
                project_ref,
500
                warehouse_name,
501
                catalog_token,
502
                s3_access_key_id,
503
                s3_secret_access_key,
504
                s3_region,
505
                ..
506
            } => {
507
                IcebergClient::new_with_supabase_catalog(
508
                    project_ref,
509
                    ctx.environment.get_supabase_domain(),
510
                    catalog_token.expose_secret().to_string(),
511
                    warehouse_name.clone(),
512
                    s3_access_key_id.expose_secret().to_string(),
513
                    s3_secret_access_key.expose_secret().to_string(),
514
                    s3_region.clone(),
515
                )
516
                .await
517
            }
518
            FullApiIcebergConfig::Rest {
519
                catalog_uri,
520
                warehouse_name,
521
                s3_access_key_id,
522
                s3_secret_access_key,
523
                s3_endpoint,
524
                ..
525
            } => {
526
                let mut props = HashMap::new();
527
                props.insert(
528
                    S3_ACCESS_KEY_ID.to_string(),
529
                    s3_access_key_id.expose_secret().to_string(),
530
                );
531
                props.insert(
532
                    S3_SECRET_ACCESS_KEY.to_string(),
533
                    s3_secret_access_key.expose_secret().to_string(),
534
                );
535
                props.insert(S3_ENDPOINT.to_string(), s3_endpoint.clone());
536

537
                IcebergClient::new_with_rest_catalog(
538
                    catalog_uri.clone(),
539
                    warehouse_name.clone(),
540
                    props,
541
                )
542
                .await
543
            }
544
        };
545

546
        let client = match client {
547
            Ok(client) => client,
548
            Err(_) => {
549
                return Ok(vec![ValidationFailure::critical(
550
                    "Iceberg Authentication Failed",
551
                    "Unable to authenticate with Iceberg.\n\n\
552
                    Please verify:\n\
553
                    (1) The catalog token is valid and has not expired\n\
554
                    (2) The S3 access key and secret key are correct\n\
555
                    (3) The catalog URI is properly formatted",
556
                )]);
557
            }
558
        };
559

560
        match client.validate_connectivity().await {
561
            Ok(()) => Ok(vec![]),
562
            Err(_) => Ok(vec![ValidationFailure::critical(
563
                "Iceberg Connection Failed",
564
                "Unable to connect to Iceberg catalog.\n\n\
565
                Please verify:\n\
566
                (1) Network connectivity to the catalog and S3\n\
567
                (2) The warehouse name exists in the catalog\n\
568
                (3) You have the required permissions to access the warehouse\n\
569
                (4) The S3 endpoint is reachable",
570
            )]),
571
        }
572
    }
2✔
573
}
574

575
/// Composite validator for destination prerequisites.
576
#[derive(Debug)]
577
pub struct DestinationValidator {
578
    config: FullApiDestinationConfig,
579
}
580

581
impl DestinationValidator {
582
    pub fn new(config: FullApiDestinationConfig) -> Self {
5✔
583
        Self { config }
5✔
584
    }
5✔
585
}
586

587
#[async_trait]
588
impl Validator for DestinationValidator {
589
    async fn validate(
590
        &self,
591
        ctx: &ValidationContext,
592
    ) -> Result<Vec<ValidationFailure>, ValidationError> {
5✔
593
        match &self.config {
594
            FullApiDestinationConfig::BigQuery {
595
                project_id,
596
                dataset_id,
597
                service_account_key,
598
                ..
599
            } => {
600
                let validator = BigQueryValidator::new(
601
                    project_id.clone(),
602
                    dataset_id.clone(),
603
                    service_account_key.expose_secret().to_string(),
604
                );
605
                validator.validate(ctx).await
606
            }
607
            FullApiDestinationConfig::Iceberg { config } => {
608
                let validator = IcebergValidator::new(config.clone());
609
                validator.validate(ctx).await
610
            }
611
        }
612
    }
5✔
UNCOV
613
}
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc