• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

supabase / etl / 25000809530

27 Apr 2026 02:25PM UTC coverage: 78.603% (+0.2%) from 78.38%
25000809530

push

github

web-flow
feat(ducklake): add catalog maintenances (#700)

* feat(ducklake): add catalog maintenances

Signed-off-by: Benjamin <5719034+bnjjj@users.noreply.github.com>

* feat(ducklake): add catalog maintenances

Signed-off-by: Benjamin <5719034+bnjjj@users.noreply.github.com>

---------

Signed-off-by: Benjamin <5719034+bnjjj@users.noreply.github.com>

344 of 379 new or added lines in 9 files covered. (90.77%)

4 existing lines in 4 files now uncovered.

27038 of 34398 relevant lines covered (78.6%)

992.67 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

58.33
/etl-api/src/validation/validators.rs
1
//! Built-in validators for ETL pipeline and destination prerequisites.
2

3
use std::collections::HashMap;
4

5
use async_trait::async_trait;
6
use etl::store::both::memory::MemoryStore;
7
use etl_config::parse_ducklake_url;
8
use etl_destinations::{
9
    bigquery::BigQueryClient,
10
    ducklake::{DuckLakeDestination, S3Config as DucklakeS3Config},
11
    iceberg::{IcebergClient, S3_ACCESS_KEY_ID, S3_ENDPOINT, S3_SECRET_ACCESS_KEY},
12
};
13
use secrecy::ExposeSecret;
14
use sqlx::FromRow;
15

16
use super::{ValidationContext, ValidationError, ValidationFailure, Validator};
17
use crate::configs::{
18
    destination::{FullApiDestinationConfig, FullApiIcebergConfig},
19
    pipeline::FullApiPipelineConfig,
20
};
21

22
const ETL_SCHEMA_NAME: &str = "etl";
23

24
/// Validates the connected source role profile for ETL.
25
#[derive(Debug)]
26
pub struct SourceValidator;
27

28
#[derive(Debug, FromRow)]
29
struct SourceRoleAudit {
30
    rolcanlogin: bool,
31
    rolreplication: bool,
32
    rolbypassrls: bool,
33
    rolcreaterole: bool,
34
    rolcreatedb: bool,
35
    rolinherit: bool,
36
    rolvaliduntil_is_null: bool,
37
    etl_schema_exists: bool,
38
    etl_schema_usage: Option<bool>,
39
    etl_schema_create: Option<bool>,
40
    controls_all_existing_etl_tables: Option<bool>,
41
    can_create_schema_if_missing: Option<bool>,
42
}
43

44
#[async_trait]
45
impl Validator for SourceValidator {
46
    async fn validate(
47
        &self,
48
        ctx: &ValidationContext,
49
    ) -> Result<Vec<ValidationFailure>, ValidationError> {
28✔
50
        let Some(expected_username) = ctx.trusted_username.as_ref() else {
51
            return Ok(vec![]);
52
        };
53

54
        let source_pool =
55
            ctx.source_pool.as_ref().expect("source pool required for source validation");
56

57
        let current_user: String =
58
            sqlx::query_scalar("select current_user").fetch_one(source_pool).await?;
59

60
        if current_user != *expected_username {
61
            return Ok(vec![ValidationFailure::critical(
62
                "Invalid source username",
63
                format!("connected as '{current_user}' but expected '{expected_username}'"),
64
            )]);
65
        }
66

67
        // This validation is best effort: it relies on catalog metadata and
68
        // privilege checks to confirm the trusted role profile without running
69
        // invasive probes against the customer database.
70
        let audit = sqlx::query_as::<_, SourceRoleAudit>(
71
            r#"
72
            with target as (
73
              -- Load the direct role attributes for the trusted ETL user.
74
              select
75
                oid,
76
                rolcanlogin,
77
                rolreplication,
78
                rolbypassrls,
79
                rolcreaterole,
80
                rolcreatedb,
81
                rolinherit,
82
                rolvaliduntil is null as rolvaliduntil_is_null
83
              from pg_roles
84
              where rolname = $1
85
            ),
86
            etl_schema as (
87
              -- Check whether the etl schema already exists.
88
              select oid
89
              from pg_namespace
90
              where nspname = $2
91
            ),
92
            etl_tables as (
93
              -- List the existing tables in the etl schema, if present.
94
              select
95
                c.relowner
96
              from pg_class c
97
              join etl_schema s on s.oid = c.relnamespace
98
              where c.relkind in ('r', 'p')
99
            ),
100
            etl_table_ownership as (
101
              -- Determine whether the trusted role controls every existing ETL table.
102
              -- pg_has_role(..., 'USAGE') means the owning role's privileges are
103
              -- immediately available without requiring SET ROLE, which matches
104
              -- how ETL connects and operates.
105
              select
106
                coalesce(bool_and(pg_has_role($1, relowner, 'USAGE')), true)
107
                  as controls_all_existing_etl_tables
108
              from etl_tables
109
            )
110
            select
111
              t.rolcanlogin,
112
              t.rolreplication,
113
              t.rolbypassrls,
114
              t.rolcreaterole,
115
              t.rolcreatedb,
116
              t.rolinherit,
117
              t.rolvaliduntil_is_null,
118
              exists(select 1 from etl_schema) as etl_schema_exists,
119
              case
120
                when exists(select 1 from etl_schema)
121
                then has_schema_privilege($1, $2, 'USAGE')
122
                else null
123
              end as etl_schema_usage,
124
              case
125
                when exists(select 1 from etl_schema)
126
                then has_schema_privilege($1, $2, 'CREATE')
127
                else null
128
              end as etl_schema_create,
129
              case
130
                when exists(select 1 from etl_schema)
131
                then (select controls_all_existing_etl_tables from etl_table_ownership)
132
                else null
133
              end as controls_all_existing_etl_tables,
134
              case
135
                when not exists(select 1 from etl_schema)
136
                then has_database_privilege($1, current_database(), 'CREATE')
137
                else null
138
              end as can_create_schema_if_missing
139
            from target t
140
            "#,
141
        )
142
        .bind(expected_username)
143
        .bind(ETL_SCHEMA_NAME)
144
        .fetch_optional(source_pool)
145
        .await?;
146

147
        let Some(audit) = audit else {
148
            return Ok(vec![ValidationFailure::critical(
149
                "Invalid source role attributes",
150
                "role not found",
151
            )]);
152
        };
153

154
        let has_required_role_attributes = audit.rolcanlogin
155
            && audit.rolreplication
156
            && audit.rolbypassrls
157
            && !audit.rolcreaterole
158
            && !audit.rolcreatedb
159
            && audit.rolinherit
160
            && audit.rolvaliduntil_is_null;
161

162
        let mut failures = Vec::new();
163
        if !has_required_role_attributes {
164
            failures.push(ValidationFailure::critical(
165
                "Invalid source role attributes",
166
                "The source database does not grant the trusted username role all permissions ETL \
167
                 needs to work properly.",
168
            ));
169
        }
170

171
        let has_required_etl_schema_permissions = if audit.etl_schema_exists {
172
            audit.etl_schema_usage == Some(true)
173
                && audit.etl_schema_create == Some(true)
174
                && audit.controls_all_existing_etl_tables == Some(true)
175
        } else {
176
            audit.can_create_schema_if_missing == Some(true)
177
        };
178

179
        if !has_required_etl_schema_permissions {
180
            failures.push(ValidationFailure::critical(
181
                "Invalid source etl schema permissions",
182
                format!(
183
                    "The source database does not grant the trusted username role all permissions \
184
                     ETL needs to manage schema {ETL_SCHEMA_NAME} properly."
185
                ),
186
            ));
187
        }
188

189
        Ok(failures)
190
    }
28✔
191
}
192

193
/// Validates that the required publication exists in the source database.
194
#[derive(Debug)]
195
pub struct PublicationExistsValidator {
196
    publication_name: String,
197
}
198

199
impl PublicationExistsValidator {
200
    pub fn new(publication_name: String) -> Self {
9✔
201
        Self { publication_name }
9✔
202
    }
9✔
203
}
204

205
#[async_trait]
206
impl Validator for PublicationExistsValidator {
207
    async fn validate(
208
        &self,
209
        ctx: &ValidationContext,
210
    ) -> Result<Vec<ValidationFailure>, ValidationError> {
9✔
211
        let source_pool =
212
            ctx.source_pool.as_ref().expect("source pool required for publication validation");
213

214
        let exists: bool =
215
            sqlx::query_scalar("select exists(select 1 from pg_publication where pubname = $1)")
216
                .bind(&self.publication_name)
217
                .fetch_one(source_pool)
218
                .await?;
219

220
        if exists {
221
            Ok(vec![])
222
        } else {
223
            Ok(vec![ValidationFailure::critical(
224
                "Publication Not Found",
225
                format!(
226
                    "Publication '{}' does not exist in the source database. Create it with: \
227
                     CREATE PUBLICATION {} FOR TABLE <table_name>, ...",
228
                    self.publication_name, self.publication_name
229
                ),
230
            )])
231
        }
232
    }
9✔
233
}
234

235
/// Validates that there are enough free replication slots for the pipeline.
236
#[derive(Debug)]
237
pub struct ReplicationSlotsValidator {
238
    max_table_sync_workers: u16,
239
}
240

241
impl ReplicationSlotsValidator {
242
    pub fn new(max_table_sync_workers: u16) -> Self {
9✔
243
        Self { max_table_sync_workers }
9✔
244
    }
9✔
245
}
246

247
#[async_trait]
248
impl Validator for ReplicationSlotsValidator {
249
    async fn validate(
250
        &self,
251
        ctx: &ValidationContext,
252
    ) -> Result<Vec<ValidationFailure>, ValidationError> {
9✔
253
        let source_pool = ctx
254
            .source_pool
255
            .as_ref()
256
            .expect("source pool required for replication slots validation");
257

258
        let max_slots: i32 = sqlx::query_scalar(
259
            "select setting::int from pg_settings where name = 'max_replication_slots'",
260
        )
261
        .fetch_one(source_pool)
262
        .await?;
263

264
        let used_slots: i64 = sqlx::query_scalar("select count(*) from pg_replication_slots")
265
            .fetch_one(source_pool)
266
            .await?;
267

268
        let free_slots = max_slots as i64 - used_slots;
269
        // We need 1 slot for the apply worker plus at most `max_table_sync_workers`
270
        // other slots for table sync workers.
271
        let required_slots = self.max_table_sync_workers as i64 + 1;
272

273
        if required_slots <= free_slots {
274
            Ok(vec![])
275
        } else {
276
            Ok(vec![ValidationFailure::critical(
277
                "Insufficient Replication Slots",
278
                format!(
279
                    "Not enough replication slots available.\nFound {free_slots} free slots, but \
280
                     {required_slots} are required at most during initial table copy \
281
                     ({used_slots}/{max_slots} currently in use).\nOnce all tables are copied, \
282
                     only 1 slot will be used.\n\nPlease verify:\n(1) max_replication_slots in \
283
                     postgresql.conf is sufficient\n(2) Unused replication slots can be \
284
                     removed\n(3) max_table_sync_workers can be reduced if needed",
285
                ),
286
            )])
287
        }
288
    }
9✔
289
}
290

291
/// Validates that the WAL level is set to 'logical' for replication.
292
#[derive(Debug)]
293
pub struct WalLevelValidator;
294

295
#[async_trait]
296
impl Validator for WalLevelValidator {
297
    async fn validate(
298
        &self,
299
        ctx: &ValidationContext,
300
    ) -> Result<Vec<ValidationFailure>, ValidationError> {
9✔
301
        let source_pool =
302
            ctx.source_pool.as_ref().expect("source pool required for WAL level validation");
303

304
        let wal_level: String = sqlx::query_scalar("select current_setting('wal_level')")
305
            .fetch_one(source_pool)
306
            .await?;
307

308
        if wal_level == "logical" {
309
            Ok(vec![])
310
        } else {
311
            Ok(vec![ValidationFailure::critical(
312
                "Invalid WAL Level",
313
                format!(
314
                    "WAL level is set to '{wal_level}', but must be 'logical' for replication. \
315
                     Update postgresql.conf with: wal_level = 'logical' and restart PostgreSQL"
316
                ),
317
            )])
318
        }
319
    }
9✔
320
}
321

322
/// Validates that the database user has replication permissions.
323
#[derive(Debug)]
324
pub struct ReplicationPermissionsValidator;
325

326
#[async_trait]
327
impl Validator for ReplicationPermissionsValidator {
328
    async fn validate(
329
        &self,
330
        ctx: &ValidationContext,
331
    ) -> Result<Vec<ValidationFailure>, ValidationError> {
9✔
332
        let source_pool = ctx
333
            .source_pool
334
            .as_ref()
335
            .expect("source pool required for replication permissions validation");
336

337
        // Check if user is superuser OR has replication privilege
338
        let has_permission: bool = sqlx::query_scalar(
339
            "select rolsuper or rolreplication from pg_roles where rolname = current_user",
340
        )
341
        .fetch_one(source_pool)
342
        .await?;
343

344
        if has_permission {
345
            Ok(vec![])
346
        } else {
347
            Ok(vec![ValidationFailure::critical(
348
                "Missing Replication Permission",
349
                "The database user does not have replication privileges",
350
            )])
351
        }
352
    }
9✔
353
}
354

355
/// Validates that a publication contains at least one table.
356
#[derive(Debug)]
357
pub struct PublicationHasTablesValidator {
358
    publication_name: String,
359
}
360

361
impl PublicationHasTablesValidator {
362
    pub fn new(publication_name: String) -> Self {
9✔
363
        Self { publication_name }
9✔
364
    }
9✔
365
}
366

367
#[async_trait]
368
impl Validator for PublicationHasTablesValidator {
369
    async fn validate(
370
        &self,
371
        ctx: &ValidationContext,
372
    ) -> Result<Vec<ValidationFailure>, ValidationError> {
9✔
373
        let source_pool = ctx
374
            .source_pool
375
            .as_ref()
376
            .expect("source pool required for publication tables validation");
377

378
        // Check if publication publishes all tables or has specific tables
379
        let result: Option<(bool, i64)> = sqlx::query_as(
380
            r#"
381
            select
382
                p.puballtables,
383
                (select count(*) from pg_publication_tables pt where pt.pubname = p.pubname)
384
            from pg_publication p
385
            where p.pubname = $1
386
            "#,
387
        )
388
        .bind(&self.publication_name)
389
        .fetch_optional(source_pool)
390
        .await?;
391

392
        // If publication doesn't exist, skip this check (PublicationExistsValidator
393
        // handles it)
394
        let Some((puballtables, table_count)) = result else {
395
            return Ok(vec![]);
396
        };
397

398
        if puballtables || table_count > 0 {
399
            Ok(vec![])
400
        } else {
401
            Ok(vec![ValidationFailure::critical(
402
                "Publication Empty",
403
                format!(
404
                    "Publication '{}' exists but contains no tables.\n\nAdd tables with: ALTER \
405
                     PUBLICATION {} ADD TABLE <table_name>",
406
                    self.publication_name, self.publication_name
407
                ),
408
            )])
409
        }
410
    }
9✔
411
}
412

413
/// Validates that all tables in a publication have primary keys.
414
#[derive(Debug)]
415
pub struct PrimaryKeysValidator {
416
    publication_name: String,
417
}
418

419
impl PrimaryKeysValidator {
420
    pub fn new(publication_name: String) -> Self {
9✔
421
        Self { publication_name }
9✔
422
    }
9✔
423
}
424

425
#[async_trait]
426
impl Validator for PrimaryKeysValidator {
427
    async fn validate(
428
        &self,
429
        ctx: &ValidationContext,
430
    ) -> Result<Vec<ValidationFailure>, ValidationError> {
9✔
431
        let source_pool =
432
            ctx.source_pool.as_ref().expect("source pool required for primary keys validation");
433

434
        // Find tables without primary keys using pg_publication_rel for direct OID
435
        // access
436
        let tables_without_pk: Vec<String> = sqlx::query_scalar(
437
            r#"
438
            select n.nspname || '.' || c.relname
439
            from pg_publication_rel pr
440
            join pg_publication p on p.oid = pr.prpubid
441
            join pg_class c on c.oid = pr.prrelid
442
            join pg_namespace n on n.oid = c.relnamespace
443
            where p.pubname = $1
444
              and not exists (
445
                select 1
446
                from pg_constraint con
447
                where con.conrelid = pr.prrelid
448
                  and con.contype = 'p'
449
              )
450
            order by n.nspname, c.relname
451
            limit 100
452
            "#,
453
        )
454
        .bind(&self.publication_name)
455
        .fetch_all(source_pool)
456
        .await?;
457

458
        if tables_without_pk.is_empty() {
459
            Ok(vec![])
460
        } else {
461
            Ok(vec![ValidationFailure::warning(
462
                "Tables Missing Primary Keys",
463
                format!(
464
                    "Tables without primary keys: {}\n\nPrimary keys are required for UPDATE and \
465
                     DELETE replication.",
466
                    tables_without_pk.join(", ")
467
                ),
468
            )])
469
        }
470
    }
9✔
471
}
472

473
/// Validates that tables in a publication don't have generated columns.
474
#[derive(Debug)]
475
pub struct GeneratedColumnsValidator {
476
    publication_name: String,
477
}
478

479
impl GeneratedColumnsValidator {
480
    pub fn new(publication_name: String) -> Self {
9✔
481
        Self { publication_name }
9✔
482
    }
9✔
483
}
484

485
#[async_trait]
486
impl Validator for GeneratedColumnsValidator {
487
    async fn validate(
488
        &self,
489
        ctx: &ValidationContext,
490
    ) -> Result<Vec<ValidationFailure>, ValidationError> {
9✔
491
        let source_pool = ctx
492
            .source_pool
493
            .as_ref()
494
            .expect("source pool required for generated columns validation");
495

496
        // Find tables with generated columns using pg_publication_rel for direct OID
497
        // access
498
        let tables_with_generated: Vec<String> = sqlx::query_scalar(
499
            r#"
500
            select distinct n.nspname || '.' || c.relname
501
            from pg_publication_rel pr
502
            join pg_publication p on p.oid = pr.prpubid
503
            join pg_class c on c.oid = pr.prrelid
504
            join pg_namespace n on n.oid = c.relnamespace
505
            where p.pubname = $1
506
              and exists (
507
                select 1
508
                from pg_attribute a
509
                where a.attrelid = pr.prrelid
510
                  and a.attnum > 0
511
                  and not a.attisdropped
512
                  and a.attgenerated != ''
513
              )
514
            order by 1
515
            limit 100
516
            "#,
517
        )
518
        .bind(&self.publication_name)
519
        .fetch_all(source_pool)
520
        .await?;
521

522
        if tables_with_generated.is_empty() {
523
            Ok(vec![])
524
        } else {
525
            Ok(vec![ValidationFailure::warning(
526
                "Tables With Generated Columns",
527
                format!(
528
                    "Tables with generated columns: {}\n\nGenerated columns cannot be replicated \
529
                     and will be excluded from the destination.",
530
                    tables_with_generated.join(", ")
531
                ),
532
            )])
533
        }
534
    }
9✔
535
}
536

537
/// Composite validator for pipeline prerequisites.
538
#[derive(Debug)]
539
pub struct PipelineValidator {
540
    config: FullApiPipelineConfig,
541
}
542

543
impl PipelineValidator {
544
    pub fn new(config: FullApiPipelineConfig) -> Self {
9✔
545
        Self { config }
9✔
546
    }
9✔
547

548
    fn sub_validators(&self) -> Vec<Box<dyn Validator>> {
9✔
549
        let max_table_sync_workers = self.config.max_table_sync_workers.unwrap_or(4);
9✔
550
        let publication_name = self.config.publication_name.clone();
9✔
551

552
        vec![
9✔
553
            Box::new(WalLevelValidator),
9✔
554
            Box::new(ReplicationPermissionsValidator),
9✔
555
            Box::new(PublicationExistsValidator::new(publication_name.clone())),
9✔
556
            Box::new(PublicationHasTablesValidator::new(publication_name.clone())),
9✔
557
            Box::new(PrimaryKeysValidator::new(publication_name.clone())),
9✔
558
            Box::new(GeneratedColumnsValidator::new(publication_name)),
9✔
559
            Box::new(ReplicationSlotsValidator::new(max_table_sync_workers)),
9✔
560
        ]
561
    }
9✔
562
}
563

564
#[async_trait]
565
impl Validator for PipelineValidator {
566
    async fn validate(
567
        &self,
568
        ctx: &ValidationContext,
569
    ) -> Result<Vec<ValidationFailure>, ValidationError> {
9✔
570
        let mut failures = Vec::new();
571

572
        for validator in self.sub_validators() {
573
            failures.extend(validator.validate(ctx).await?);
574
        }
575

576
        Ok(failures)
577
    }
9✔
578
}
579

580
/// Validates BigQuery destination connectivity and dataset accessibility.
///
/// `Debug` is implemented by hand instead of derived, so the service account key
/// (a credential) is never printed in logs or error output.
struct BigQueryValidator {
    project_id: String,
    dataset_id: String,
    /// Service account key material; treated as a secret.
    service_account_key: String,
}

impl std::fmt::Debug for BigQueryValidator {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("BigQueryValidator")
            .field("project_id", &self.project_id)
            .field("dataset_id", &self.dataset_id)
            .field("service_account_key", &"<redacted>")
            .finish()
    }
}

impl BigQueryValidator {
    /// Creates a validator for `dataset_id` in `project_id`, authenticating with
    /// `service_account_key`.
    fn new(project_id: String, dataset_id: String, service_account_key: String) -> Self {
        Self { project_id, dataset_id, service_account_key }
    }
}
593

594
#[async_trait]
595
impl Validator for BigQueryValidator {
596
    async fn validate(
597
        &self,
598
        _ctx: &ValidationContext,
599
    ) -> Result<Vec<ValidationFailure>, ValidationError> {
4✔
600
        let Ok(client) =
601
            BigQueryClient::new_with_key(self.project_id.clone(), &self.service_account_key, 1)
602
                .await
603
        else {
604
            return Ok(vec![ValidationFailure::critical(
605
                "BigQuery Authentication Failed",
606
                "Unable to authenticate with BigQuery.\n\nPlease verify:\n(1) The service account \
607
                 key is valid JSON\n(2) The key has not expired or been revoked\n(3) The project \
608
                 ID is correct",
609
            )]);
610
        };
611

612
        match client.dataset_exists(&self.dataset_id).await {
613
            Ok(true) => Ok(vec![]),
614
            Ok(false) => Ok(vec![ValidationFailure::critical(
615
                "BigQuery Dataset Not Found",
616
                format!(
617
                    "Dataset '{}' does not exist in project '{}'.\n\nPlease verify:\n(1) The \
618
                     dataset name is correct\n(2) The dataset exists in the specified \
619
                     project\n(3) The service account has permission to access it",
620
                    self.dataset_id, self.project_id
621
                ),
622
            )]),
623
            Err(_) => Ok(vec![ValidationFailure::critical(
624
                "BigQuery Connection Failed",
625
                "Unable to connect to BigQuery.\n\nPlease verify:\n(1) Network connectivity to \
626
                 Google Cloud\n(2) The service account has the required permissions (BigQuery \
627
                 Data Editor, BigQuery Job User)\n(3) BigQuery API is enabled for your project",
628
            )]),
629
        }
630
    }
4✔
631
}
632

633
/// Validates Iceberg destination connectivity.
634
#[derive(Debug)]
635
struct IcebergValidator {
636
    config: FullApiIcebergConfig,
637
}
638

639
/// Validates DuckLake destination connectivity.
///
/// `Debug` is implemented by hand instead of derived. The catalog URL may embed
/// catalog credentials, and the S3 keys are credentials themselves, so none of
/// their values are ever printed.
struct DucklakeValidator {
    catalog_url: String,
    data_path: String,
    pool_size: u32,
    s3_access_key_id: Option<String>,
    s3_secret_access_key: Option<String>,
    s3_region: Option<String>,
    s3_endpoint: Option<String>,
    s3_url_style: Option<String>,
    s3_use_ssl: Option<bool>,
    metadata_schema: Option<String>,
    duckdb_memory_cache_limit: Option<String>,
    maintenance_target_file_size: Option<String>,
    expire_snapshots_older_than: Option<String>,
}

impl std::fmt::Debug for DucklakeValidator {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        const REDACTED: &str = "<redacted>";
        // For optional secrets, reveal only whether a value was supplied.
        let redact = |value: &Option<String>| value.as_ref().map(|_| REDACTED);

        f.debug_struct("DucklakeValidator")
            .field("catalog_url", &REDACTED)
            .field("data_path", &self.data_path)
            .field("pool_size", &self.pool_size)
            .field("s3_access_key_id", &redact(&self.s3_access_key_id))
            .field("s3_secret_access_key", &redact(&self.s3_secret_access_key))
            .field("s3_region", &self.s3_region)
            .field("s3_endpoint", &self.s3_endpoint)
            .field("s3_url_style", &self.s3_url_style)
            .field("s3_use_ssl", &self.s3_use_ssl)
            .field("metadata_schema", &self.metadata_schema)
            .field("duckdb_memory_cache_limit", &self.duckdb_memory_cache_limit)
            .field("maintenance_target_file_size", &self.maintenance_target_file_size)
            .field("expire_snapshots_older_than", &self.expire_snapshots_older_than)
            .finish()
    }
}

impl DucklakeValidator {
    /// Creates a validator from the flat DuckLake destination settings.
    // The arguments mirror the DuckLake destination config fields one-to-one,
    // hence the allowance.
    #[allow(clippy::too_many_arguments)]
    fn new(
        catalog_url: String,
        data_path: String,
        pool_size: u32,
        s3_access_key_id: Option<String>,
        s3_secret_access_key: Option<String>,
        s3_region: Option<String>,
        s3_endpoint: Option<String>,
        s3_url_style: Option<String>,
        s3_use_ssl: Option<bool>,
        metadata_schema: Option<String>,
        duckdb_memory_cache_limit: Option<String>,
        maintenance_target_file_size: Option<String>,
        expire_snapshots_older_than: Option<String>,
    ) -> Self {
        Self {
            catalog_url,
            data_path,
            pool_size,
            s3_access_key_id,
            s3_secret_access_key,
            s3_region,
            s3_endpoint,
            s3_url_style,
            s3_use_ssl,
            metadata_schema,
            duckdb_memory_cache_limit,
            maintenance_target_file_size,
            expire_snapshots_older_than,
        }
    }
}
691

692
#[async_trait]
693
impl Validator for DucklakeValidator {
694
    async fn validate(
695
        &self,
696
        _ctx: &ValidationContext,
697
    ) -> Result<Vec<ValidationFailure>, ValidationError> {
×
698
        match (&self.s3_access_key_id, &self.s3_secret_access_key) {
699
            (Some(_), None) | (None, Some(_)) => {
700
                return Ok(vec![ValidationFailure::critical(
701
                    "Ducklake S3 Configuration Invalid",
702
                    "DuckLake S3 credentials must include both access key ID and secret access \
703
                     key.",
704
                )]);
705
            }
706
            _ => {}
707
        }
708

709
        let catalog_url = match parse_ducklake_url(&self.catalog_url) {
710
            Ok(url) => url,
711
            Err(error) => {
712
                return Ok(vec![ValidationFailure::critical(
713
                    "Ducklake Catalog Url Invalid",
714
                    error.to_string(),
715
                )]);
716
            }
717
        };
718

719
        let data_path = match parse_ducklake_url(&self.data_path) {
720
            Ok(url) => url,
721
            Err(error) => {
722
                return Ok(vec![ValidationFailure::critical(
723
                    "Ducklake Data Path Invalid",
724
                    error.to_string(),
725
                )]);
726
            }
727
        };
728

729
        let s3_config = self.s3_access_key_id.clone().map(|access_key_id| DucklakeS3Config {
730
            access_key_id,
×
731
            secret_access_key: self
×
732
                .s3_secret_access_key
×
733
                .clone()
×
734
                .expect("ducklake s3 secret access key should be present"),
×
735
            region: self.s3_region.clone().unwrap_or_else(|| "us-east-1".to_string()),
×
736
            endpoint: self.s3_endpoint.clone(),
×
737
            url_style: self.s3_url_style.clone().unwrap_or_else(|| "path".to_string()),
×
738
            use_ssl: self.s3_use_ssl.unwrap_or(false),
×
739
        });
×
740

741
        match DuckLakeDestination::new(
742
            catalog_url,
743
            data_path,
744
            self.pool_size,
745
            s3_config,
746
            self.metadata_schema.clone(),
747
            self.duckdb_memory_cache_limit.clone(),
748
            self.maintenance_target_file_size.clone(),
749
            self.expire_snapshots_older_than.clone(),
750
            MemoryStore::new(),
751
        )
752
        .await
753
        {
754
            Ok(_) => Ok(vec![]),
755
            Err(_) => Ok(vec![ValidationFailure::critical(
756
                "Ducklake Connection Failed",
757
                "Unable to connect to DuckLake.\n\nPlease verify:\n(1) The catalog URL and data \
758
                 path are valid and reachable\n(2) DuckLake catalog credentials are embedded \
759
                 correctly in the catalog URL\n(3) The S3-compatible credentials and endpoint are \
760
                 correct when using object storage",
761
            )]),
762
        }
763
    }
×
764
}
765

766
impl IcebergValidator {
767
    fn new(config: FullApiIcebergConfig) -> Self {
2✔
768
        Self { config }
2✔
769
    }
2✔
770
}
771

772
#[async_trait]
773
impl Validator for IcebergValidator {
774
    async fn validate(
775
        &self,
776
        ctx: &ValidationContext,
777
    ) -> Result<Vec<ValidationFailure>, ValidationError> {
2✔
778
        let client = match &self.config {
779
            FullApiIcebergConfig::Supabase {
780
                project_ref,
781
                warehouse_name,
782
                catalog_token,
783
                s3_access_key_id,
784
                s3_secret_access_key,
785
                s3_region,
786
                ..
787
            } => {
788
                IcebergClient::new_with_supabase_catalog(
789
                    project_ref,
790
                    ctx.environment.get_supabase_domain(),
791
                    catalog_token.expose_secret().to_string(),
792
                    warehouse_name.clone(),
793
                    s3_access_key_id.expose_secret().to_string(),
794
                    s3_secret_access_key.expose_secret().to_string(),
795
                    s3_region.clone(),
796
                )
797
                .await
798
            }
799
            FullApiIcebergConfig::Rest {
800
                catalog_uri,
801
                warehouse_name,
802
                s3_access_key_id,
803
                s3_secret_access_key,
804
                s3_endpoint,
805
                ..
806
            } => {
807
                let mut props = HashMap::new();
808
                props.insert(
809
                    S3_ACCESS_KEY_ID.to_string(),
810
                    s3_access_key_id.expose_secret().to_string(),
811
                );
812
                props.insert(
813
                    S3_SECRET_ACCESS_KEY.to_string(),
814
                    s3_secret_access_key.expose_secret().to_string(),
815
                );
816
                props.insert(S3_ENDPOINT.to_string(), s3_endpoint.clone());
817

818
                IcebergClient::new_with_rest_catalog(
819
                    catalog_uri.clone(),
820
                    warehouse_name.clone(),
821
                    props,
822
                )
823
                .await
824
            }
825
        };
826
        let Ok(client) = client else {
827
            return Ok(vec![ValidationFailure::critical(
828
                "Iceberg Authentication Failed",
829
                "Unable to authenticate with Iceberg.\n\nPlease verify:\n(1) The catalog token is \
830
                 valid and has not expired\n(2) The S3 access key and secret key are correct\n(3) \
831
                 The catalog URI is properly formatted",
832
            )]);
833
        };
834

835
        match client.validate_connectivity().await {
836
            Ok(()) => Ok(vec![]),
837
            Err(_) => Ok(vec![ValidationFailure::critical(
838
                "Iceberg Connection Failed",
839
                "Unable to connect to Iceberg catalog.\n\nPlease verify:\n(1) Network \
840
                 connectivity to the catalog and S3\n(2) The warehouse name exists in the \
841
                 catalog\n(3) You have the required permissions to access the warehouse\n(4) The \
842
                 S3 endpoint is reachable",
843
            )]),
844
        }
845
    }
2✔
846
}
847

848
/// Composite validator for destination prerequisites.
849
#[derive(Debug)]
850
pub struct DestinationValidator {
851
    config: FullApiDestinationConfig,
852
}
853

854
impl DestinationValidator {
855
    pub fn new(config: FullApiDestinationConfig) -> Self {
6✔
856
        Self { config }
6✔
857
    }
6✔
858
}
859

860
#[async_trait]
861
impl Validator for DestinationValidator {
862
    async fn validate(
863
        &self,
864
        ctx: &ValidationContext,
865
    ) -> Result<Vec<ValidationFailure>, ValidationError> {
6✔
866
        match &self.config {
867
            FullApiDestinationConfig::BigQuery {
868
                project_id,
869
                dataset_id,
870
                service_account_key,
871
                ..
872
            } => {
873
                let validator = BigQueryValidator::new(
874
                    project_id.clone(),
875
                    dataset_id.clone(),
876
                    service_account_key.expose_secret().to_string(),
877
                );
878
                validator.validate(ctx).await
879
            }
880
            FullApiDestinationConfig::Iceberg { config } => {
881
                let validator = IcebergValidator::new(config.clone());
882
                validator.validate(ctx).await
883
            }
884
            FullApiDestinationConfig::Ducklake {
885
                catalog_url,
886
                data_path,
887
                pool_size,
888
                s3_access_key_id,
889
                s3_secret_access_key,
890
                s3_region,
891
                s3_endpoint,
892
                s3_url_style,
893
                s3_use_ssl,
894
                metadata_schema,
895
                duckdb_memory_cache_limit,
896
                maintenance_target_file_size,
897
                expire_snapshots_older_than,
898
            } => {
899
                let validator = DucklakeValidator::new(
900
                    catalog_url.clone(),
901
                    data_path.clone(),
902
                    pool_size.unwrap_or(
903
                        etl_config::shared::DestinationConfig::DEFAULT_DUCKLAKE_POOL_SIZE,
904
                    ),
905
                    s3_access_key_id.as_ref().map(|value| value.expose_secret().to_string()),
×
906
                    s3_secret_access_key.as_ref().map(|value| value.expose_secret().to_string()),
×
907
                    s3_region.clone(),
908
                    s3_endpoint.clone(),
909
                    s3_url_style.clone(),
910
                    *s3_use_ssl,
911
                    metadata_schema.clone(),
912
                    duckdb_memory_cache_limit.clone(),
913
                    maintenance_target_file_size.clone(),
914
                    expire_snapshots_older_than.clone(),
915
                );
916
                validator.validate(ctx).await
917
            }
918
        }
919
    }
6✔
920
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc