• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

supabase / etl / 25202951682

01 May 2026 04:58AM UTC coverage: 76.898% (+0.5%) from 76.407%
25202951682

Pull #658

github

web-flow
Merge 81fbd1062 into 86d525935
Pull Request #658: Add ClickHouse destination

1450 of 1736 new or added lines in 17 files covered. (83.53%)

7 existing lines in 5 files now uncovered.

29489 of 38348 relevant lines covered (76.9%)

911.69 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

52.94
/etl-api/src/validation/validators.rs
1
//! Built-in validators for ETL pipeline and destination prerequisites.
2

3
use std::collections::HashMap;
4

5
use async_trait::async_trait;
6
use etl::store::both::memory::MemoryStore;
7
use etl_config::{SerializableSecretString, parse_ducklake_url};
8
use etl_destinations::{
9
    bigquery::BigQueryClient,
10
    clickhouse::ClickHouseClient,
11
    ducklake::{DuckLakeDestination, S3Config as DucklakeS3Config},
12
    iceberg::{IcebergClient, S3_ACCESS_KEY_ID, S3_ENDPOINT, S3_SECRET_ACCESS_KEY},
13
};
14
use secrecy::ExposeSecret;
15
use sqlx::FromRow;
16
use url::Url;
17

18
use super::{ValidationContext, ValidationError, ValidationFailure, Validator};
19
use crate::configs::{
20
    destination::{FullApiDestinationConfig, FullApiIcebergConfig},
21
    pipeline::FullApiPipelineConfig,
22
};
23

24
/// Name of the source-database schema managed by ETL. `SourceValidator`
/// checks that the trusted role has USAGE/CREATE on it and controls the
/// tables inside it, or can create it when it does not exist yet.
const ETL_SCHEMA_NAME: &str = "etl";
25

26
/// Validates the connected source role profile for ETL.
///
/// Only does work when the validation context carries a trusted username; it
/// then audits that role's attributes and its privileges on the ETL schema.
#[derive(Debug)]
pub struct SourceValidator;

/// One row of the catalog audit query run by [`SourceValidator`].
///
/// The role attribute columns come from `pg_roles` for the trusted username.
/// The `Option` columns are SQL `NULL` when they do not apply: the schema-level
/// checks are `NULL` when the ETL schema is missing, and
/// `can_create_schema_if_missing` is `NULL` when the schema already exists.
#[derive(Debug, FromRow)]
struct SourceRoleAudit {
    rolcanlogin: bool,
    rolreplication: bool,
    rolbypassrls: bool,
    rolcreaterole: bool,
    rolcreatedb: bool,
    rolinherit: bool,
    /// `true` when `rolvaliduntil` is null (no expiry set on the role).
    rolvaliduntil_is_null: bool,
    /// Whether a namespace with the ETL schema name exists.
    etl_schema_exists: bool,
    /// `has_schema_privilege(role, schema, 'USAGE')`.
    etl_schema_usage: Option<bool>,
    /// `has_schema_privilege(role, schema, 'CREATE')`.
    etl_schema_create: Option<bool>,
    /// Whether the role has `pg_has_role(..., 'USAGE')` on the owner of every
    /// ordinary or partitioned table in the schema; `true` when there are none.
    controls_all_existing_etl_tables: Option<bool>,
    /// `has_database_privilege(role, current_database(), 'CREATE')`.
    can_create_schema_if_missing: Option<bool>,
}
45

46
#[async_trait]
47
impl Validator for SourceValidator {
48
    async fn validate(
49
        &self,
50
        ctx: &ValidationContext,
51
    ) -> Result<Vec<ValidationFailure>, ValidationError> {
28✔
52
        let Some(expected_username) = ctx.trusted_username.as_ref() else {
53
            return Ok(vec![]);
54
        };
55

56
        let source_pool =
57
            ctx.source_pool.as_ref().expect("source pool required for source validation");
58

59
        let current_user: String =
60
            sqlx::query_scalar("select current_user").fetch_one(source_pool).await?;
61

62
        if current_user != *expected_username {
63
            return Ok(vec![ValidationFailure::critical(
64
                "Invalid source username",
65
                format!("connected as '{current_user}' but expected '{expected_username}'"),
66
            )]);
67
        }
68

69
        // This validation is best effort: it relies on catalog metadata and
70
        // privilege checks to confirm the trusted role profile without running
71
        // invasive probes against the customer database.
72
        let audit = sqlx::query_as::<_, SourceRoleAudit>(
73
            r#"
74
            with target as (
75
              -- Load the direct role attributes for the trusted ETL user.
76
              select
77
                oid,
78
                rolcanlogin,
79
                rolreplication,
80
                rolbypassrls,
81
                rolcreaterole,
82
                rolcreatedb,
83
                rolinherit,
84
                rolvaliduntil is null as rolvaliduntil_is_null
85
              from pg_roles
86
              where rolname = $1
87
            ),
88
            etl_schema as (
89
              -- Check whether the etl schema already exists.
90
              select oid
91
              from pg_namespace
92
              where nspname = $2
93
            ),
94
            etl_tables as (
95
              -- List the existing tables in the etl schema, if present.
96
              select
97
                c.relowner
98
              from pg_class c
99
              join etl_schema s on s.oid = c.relnamespace
100
              where c.relkind in ('r', 'p')
101
            ),
102
            etl_table_ownership as (
103
              -- Determine whether the trusted role controls every existing ETL table.
104
              -- pg_has_role(..., 'USAGE') means the owning role's privileges are
105
              -- immediately available without requiring SET ROLE, which matches
106
              -- how ETL connects and operates.
107
              select
108
                coalesce(bool_and(pg_has_role($1, relowner, 'USAGE')), true)
109
                  as controls_all_existing_etl_tables
110
              from etl_tables
111
            )
112
            select
113
              t.rolcanlogin,
114
              t.rolreplication,
115
              t.rolbypassrls,
116
              t.rolcreaterole,
117
              t.rolcreatedb,
118
              t.rolinherit,
119
              t.rolvaliduntil_is_null,
120
              exists(select 1 from etl_schema) as etl_schema_exists,
121
              case
122
                when exists(select 1 from etl_schema)
123
                then has_schema_privilege($1, $2, 'USAGE')
124
                else null
125
              end as etl_schema_usage,
126
              case
127
                when exists(select 1 from etl_schema)
128
                then has_schema_privilege($1, $2, 'CREATE')
129
                else null
130
              end as etl_schema_create,
131
              case
132
                when exists(select 1 from etl_schema)
133
                then (select controls_all_existing_etl_tables from etl_table_ownership)
134
                else null
135
              end as controls_all_existing_etl_tables,
136
              case
137
                when not exists(select 1 from etl_schema)
138
                then has_database_privilege($1, current_database(), 'CREATE')
139
                else null
140
              end as can_create_schema_if_missing
141
            from target t
142
            "#,
143
        )
144
        .bind(expected_username)
145
        .bind(ETL_SCHEMA_NAME)
146
        .fetch_optional(source_pool)
147
        .await?;
148

149
        let Some(audit) = audit else {
150
            return Ok(vec![ValidationFailure::critical(
151
                "Invalid source role attributes",
152
                "role not found",
153
            )]);
154
        };
155

156
        let has_required_role_attributes = audit.rolcanlogin
157
            && audit.rolreplication
158
            && audit.rolbypassrls
159
            && !audit.rolcreaterole
160
            && !audit.rolcreatedb
161
            && audit.rolinherit
162
            && audit.rolvaliduntil_is_null;
163

164
        let mut failures = Vec::new();
165
        if !has_required_role_attributes {
166
            failures.push(ValidationFailure::critical(
167
                "Invalid source role attributes",
168
                "The source database does not grant the trusted username role all permissions ETL \
169
                 needs to work properly.",
170
            ));
171
        }
172

173
        let has_required_etl_schema_permissions = if audit.etl_schema_exists {
174
            audit.etl_schema_usage == Some(true)
175
                && audit.etl_schema_create == Some(true)
176
                && audit.controls_all_existing_etl_tables == Some(true)
177
        } else {
178
            audit.can_create_schema_if_missing == Some(true)
179
        };
180

181
        if !has_required_etl_schema_permissions {
182
            failures.push(ValidationFailure::critical(
183
                "Invalid source etl schema permissions",
184
                format!(
185
                    "The source database does not grant the trusted username role all permissions \
186
                     ETL needs to manage schema {ETL_SCHEMA_NAME} properly."
187
                ),
188
            ));
189
        }
190

191
        Ok(failures)
192
    }
28✔
193
}
194

195
/// Validates that the required publication exists in the source database.
#[derive(Debug)]
pub struct PublicationExistsValidator {
    /// Publication looked up by name in `pg_publication`.
    publication_name: String,
}

impl PublicationExistsValidator {
    /// Creates a validator for the publication called `publication_name`.
    pub fn new(publication_name: String) -> Self {
        PublicationExistsValidator { publication_name }
    }
}
206

207
#[async_trait]
208
impl Validator for PublicationExistsValidator {
209
    async fn validate(
210
        &self,
211
        ctx: &ValidationContext,
212
    ) -> Result<Vec<ValidationFailure>, ValidationError> {
9✔
213
        let source_pool =
214
            ctx.source_pool.as_ref().expect("source pool required for publication validation");
215

216
        let exists: bool =
217
            sqlx::query_scalar("select exists(select 1 from pg_publication where pubname = $1)")
218
                .bind(&self.publication_name)
219
                .fetch_one(source_pool)
220
                .await?;
221

222
        if exists {
223
            Ok(vec![])
224
        } else {
225
            Ok(vec![ValidationFailure::critical(
226
                "Publication Not Found",
227
                format!(
228
                    "Publication '{}' does not exist in the source database. Create it with: \
229
                     CREATE PUBLICATION {} FOR TABLE <table_name>, ...",
230
                    self.publication_name, self.publication_name
231
                ),
232
            )])
233
        }
234
    }
9✔
235
}
236

237
/// Validates that there are enough free replication slots for the pipeline.
#[derive(Debug)]
pub struct ReplicationSlotsValidator {
    /// Upper bound on concurrent table sync workers, each needing its own slot.
    max_table_sync_workers: u16,
}

impl ReplicationSlotsValidator {
    /// Creates a validator sized for `max_table_sync_workers` sync workers.
    pub fn new(max_table_sync_workers: u16) -> Self {
        ReplicationSlotsValidator { max_table_sync_workers }
    }
}
248

249
#[async_trait]
250
impl Validator for ReplicationSlotsValidator {
251
    async fn validate(
252
        &self,
253
        ctx: &ValidationContext,
254
    ) -> Result<Vec<ValidationFailure>, ValidationError> {
9✔
255
        let source_pool = ctx
256
            .source_pool
257
            .as_ref()
258
            .expect("source pool required for replication slots validation");
259

260
        let max_slots: i32 = sqlx::query_scalar(
261
            "select setting::int from pg_settings where name = 'max_replication_slots'",
262
        )
263
        .fetch_one(source_pool)
264
        .await?;
265

266
        let used_slots: i64 = sqlx::query_scalar("select count(*) from pg_replication_slots")
267
            .fetch_one(source_pool)
268
            .await?;
269

270
        let free_slots = max_slots as i64 - used_slots;
271
        // We need 1 slot for the apply worker plus at most `max_table_sync_workers`
272
        // other slots for table sync workers.
273
        let required_slots = self.max_table_sync_workers as i64 + 1;
274

275
        if required_slots <= free_slots {
276
            Ok(vec![])
277
        } else {
278
            Ok(vec![ValidationFailure::critical(
279
                "Insufficient Replication Slots",
280
                format!(
281
                    "Not enough replication slots available.\nFound {free_slots} free slots, but \
282
                     {required_slots} are required at most during initial table copy \
283
                     ({used_slots}/{max_slots} currently in use).\nOnce all tables are copied, \
284
                     only 1 slot will be used.\n\nPlease verify:\n(1) max_replication_slots in \
285
                     postgresql.conf is sufficient\n(2) Unused replication slots can be \
286
                     removed\n(3) max_table_sync_workers can be reduced if needed",
287
                ),
288
            )])
289
        }
290
    }
9✔
291
}
292

293
/// Validates that the WAL level is set to 'logical' for replication.
294
#[derive(Debug)]
295
pub struct WalLevelValidator;
296

297
#[async_trait]
298
impl Validator for WalLevelValidator {
299
    async fn validate(
300
        &self,
301
        ctx: &ValidationContext,
302
    ) -> Result<Vec<ValidationFailure>, ValidationError> {
9✔
303
        let source_pool =
304
            ctx.source_pool.as_ref().expect("source pool required for WAL level validation");
305

306
        let wal_level: String = sqlx::query_scalar("select current_setting('wal_level')")
307
            .fetch_one(source_pool)
308
            .await?;
309

310
        if wal_level == "logical" {
311
            Ok(vec![])
312
        } else {
313
            Ok(vec![ValidationFailure::critical(
314
                "Invalid WAL Level",
315
                format!(
316
                    "WAL level is set to '{wal_level}', but must be 'logical' for replication. \
317
                     Update postgresql.conf with: wal_level = 'logical' and restart PostgreSQL"
318
                ),
319
            )])
320
        }
321
    }
9✔
322
}
323

324
/// Validates that the database user has replication permissions.
325
#[derive(Debug)]
326
pub struct ReplicationPermissionsValidator;
327

328
#[async_trait]
329
impl Validator for ReplicationPermissionsValidator {
330
    async fn validate(
331
        &self,
332
        ctx: &ValidationContext,
333
    ) -> Result<Vec<ValidationFailure>, ValidationError> {
9✔
334
        let source_pool = ctx
335
            .source_pool
336
            .as_ref()
337
            .expect("source pool required for replication permissions validation");
338

339
        // Check if user is superuser OR has replication privilege
340
        let has_permission: bool = sqlx::query_scalar(
341
            "select rolsuper or rolreplication from pg_roles where rolname = current_user",
342
        )
343
        .fetch_one(source_pool)
344
        .await?;
345

346
        if has_permission {
347
            Ok(vec![])
348
        } else {
349
            Ok(vec![ValidationFailure::critical(
350
                "Missing Replication Permission",
351
                "The database user does not have replication privileges",
352
            )])
353
        }
354
    }
9✔
355
}
356

357
/// Validates that a publication contains at least one table.
#[derive(Debug)]
pub struct PublicationHasTablesValidator {
    /// Publication whose table membership is inspected.
    publication_name: String,
}

impl PublicationHasTablesValidator {
    /// Creates a validator for the publication called `publication_name`.
    pub fn new(publication_name: String) -> Self {
        PublicationHasTablesValidator { publication_name }
    }
}
368

369
#[async_trait]
370
impl Validator for PublicationHasTablesValidator {
371
    async fn validate(
372
        &self,
373
        ctx: &ValidationContext,
374
    ) -> Result<Vec<ValidationFailure>, ValidationError> {
9✔
375
        let source_pool = ctx
376
            .source_pool
377
            .as_ref()
378
            .expect("source pool required for publication tables validation");
379

380
        // Check if publication publishes all tables or has specific tables
381
        let result: Option<(bool, i64)> = sqlx::query_as(
382
            r#"
383
            select
384
                p.puballtables,
385
                (select count(*) from pg_publication_tables pt where pt.pubname = p.pubname)
386
            from pg_publication p
387
            where p.pubname = $1
388
            "#,
389
        )
390
        .bind(&self.publication_name)
391
        .fetch_optional(source_pool)
392
        .await?;
393

394
        // If publication doesn't exist, skip this check (PublicationExistsValidator
395
        // handles it)
396
        let Some((puballtables, table_count)) = result else {
397
            return Ok(vec![]);
398
        };
399

400
        if puballtables || table_count > 0 {
401
            Ok(vec![])
402
        } else {
403
            Ok(vec![ValidationFailure::critical(
404
                "Publication Empty",
405
                format!(
406
                    "Publication '{}' exists but contains no tables.\n\nAdd tables with: ALTER \
407
                     PUBLICATION {} ADD TABLE <table_name>",
408
                    self.publication_name, self.publication_name
409
                ),
410
            )])
411
        }
412
    }
9✔
413
}
414

415
/// Validates that all tables in a publication have primary keys.
#[derive(Debug)]
pub struct PrimaryKeysValidator {
    /// Publication whose tables are checked for a primary key constraint.
    publication_name: String,
}

impl PrimaryKeysValidator {
    /// Creates a validator for the publication called `publication_name`.
    pub fn new(publication_name: String) -> Self {
        PrimaryKeysValidator { publication_name }
    }
}
426

427
#[async_trait]
428
impl Validator for PrimaryKeysValidator {
429
    async fn validate(
430
        &self,
431
        ctx: &ValidationContext,
432
    ) -> Result<Vec<ValidationFailure>, ValidationError> {
9✔
433
        let source_pool =
434
            ctx.source_pool.as_ref().expect("source pool required for primary keys validation");
435

436
        // Find tables without primary keys using pg_publication_rel for direct OID
437
        // access
438
        let tables_without_pk: Vec<String> = sqlx::query_scalar(
439
            r#"
440
            select n.nspname || '.' || c.relname
441
            from pg_publication_rel pr
442
            join pg_publication p on p.oid = pr.prpubid
443
            join pg_class c on c.oid = pr.prrelid
444
            join pg_namespace n on n.oid = c.relnamespace
445
            where p.pubname = $1
446
              and not exists (
447
                select 1
448
                from pg_constraint con
449
                where con.conrelid = pr.prrelid
450
                  and con.contype = 'p'
451
              )
452
            order by n.nspname, c.relname
453
            limit 100
454
            "#,
455
        )
456
        .bind(&self.publication_name)
457
        .fetch_all(source_pool)
458
        .await?;
459

460
        if tables_without_pk.is_empty() {
461
            Ok(vec![])
462
        } else {
463
            Ok(vec![ValidationFailure::warning(
464
                "Tables Missing Primary Keys",
465
                format!(
466
                    "Tables without primary keys: {}\n\nPrimary keys are required for UPDATE and \
467
                     DELETE replication.",
468
                    tables_without_pk.join(", ")
469
                ),
470
            )])
471
        }
472
    }
9✔
473
}
474

475
/// Validates that tables in a publication don't have generated columns.
#[derive(Debug)]
pub struct GeneratedColumnsValidator {
    /// Publication whose tables are scanned for generated columns.
    publication_name: String,
}

impl GeneratedColumnsValidator {
    /// Creates a validator for the publication called `publication_name`.
    pub fn new(publication_name: String) -> Self {
        GeneratedColumnsValidator { publication_name }
    }
}
486

487
#[async_trait]
488
impl Validator for GeneratedColumnsValidator {
489
    async fn validate(
490
        &self,
491
        ctx: &ValidationContext,
492
    ) -> Result<Vec<ValidationFailure>, ValidationError> {
9✔
493
        let source_pool = ctx
494
            .source_pool
495
            .as_ref()
496
            .expect("source pool required for generated columns validation");
497

498
        // Find tables with generated columns using pg_publication_rel for direct OID
499
        // access
500
        let tables_with_generated: Vec<String> = sqlx::query_scalar(
501
            r#"
502
            select distinct n.nspname || '.' || c.relname
503
            from pg_publication_rel pr
504
            join pg_publication p on p.oid = pr.prpubid
505
            join pg_class c on c.oid = pr.prrelid
506
            join pg_namespace n on n.oid = c.relnamespace
507
            where p.pubname = $1
508
              and exists (
509
                select 1
510
                from pg_attribute a
511
                where a.attrelid = pr.prrelid
512
                  and a.attnum > 0
513
                  and not a.attisdropped
514
                  and a.attgenerated != ''
515
              )
516
            order by 1
517
            limit 100
518
            "#,
519
        )
520
        .bind(&self.publication_name)
521
        .fetch_all(source_pool)
522
        .await?;
523

524
        if tables_with_generated.is_empty() {
525
            Ok(vec![])
526
        } else {
527
            Ok(vec![ValidationFailure::warning(
528
                "Tables With Generated Columns",
529
                format!(
530
                    "Tables with generated columns: {}\n\nGenerated columns cannot be replicated \
531
                     and will be excluded from the destination.",
532
                    tables_with_generated.join(", ")
533
                ),
534
            )])
535
        }
536
    }
9✔
537
}
538

539
/// Composite validator for pipeline prerequisites.
540
#[derive(Debug)]
541
pub struct PipelineValidator {
542
    config: FullApiPipelineConfig,
543
}
544

545
impl PipelineValidator {
546
    pub fn new(config: FullApiPipelineConfig) -> Self {
9✔
547
        Self { config }
9✔
548
    }
9✔
549

550
    fn sub_validators(&self) -> Vec<Box<dyn Validator>> {
9✔
551
        let max_table_sync_workers = self.config.max_table_sync_workers.unwrap_or(4);
9✔
552
        let publication_name = self.config.publication_name.clone();
9✔
553

554
        vec![
9✔
555
            Box::new(WalLevelValidator),
9✔
556
            Box::new(ReplicationPermissionsValidator),
9✔
557
            Box::new(PublicationExistsValidator::new(publication_name.clone())),
9✔
558
            Box::new(PublicationHasTablesValidator::new(publication_name.clone())),
9✔
559
            Box::new(PrimaryKeysValidator::new(publication_name.clone())),
9✔
560
            Box::new(GeneratedColumnsValidator::new(publication_name)),
9✔
561
            Box::new(ReplicationSlotsValidator::new(max_table_sync_workers)),
9✔
562
        ]
563
    }
9✔
564
}
565

566
#[async_trait]
567
impl Validator for PipelineValidator {
568
    async fn validate(
569
        &self,
570
        ctx: &ValidationContext,
571
    ) -> Result<Vec<ValidationFailure>, ValidationError> {
9✔
572
        let mut failures = Vec::new();
573

574
        for validator in self.sub_validators() {
575
            failures.extend(validator.validate(ctx).await?);
576
        }
577

578
        Ok(failures)
579
    }
9✔
580
}
581

582
/// Validates BigQuery destination connectivity and dataset accessibility.
struct BigQueryValidator {
    project_id: String,
    dataset_id: String,
    /// Service account key (JSON). Redacted from `Debug` output.
    service_account_key: String,
}

// Hand-written instead of derived so the service account key can never leak
// through `{:?}` formatting (e.g. in logs or error reports).
impl std::fmt::Debug for BigQueryValidator {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("BigQueryValidator")
            .field("project_id", &self.project_id)
            .field("dataset_id", &self.dataset_id)
            .field("service_account_key", &"[REDACTED]")
            .finish()
    }
}

impl BigQueryValidator {
    /// Creates a validator for `dataset_id` in `project_id`, authenticating
    /// with `service_account_key`.
    fn new(project_id: String, dataset_id: String, service_account_key: String) -> Self {
        Self { project_id, dataset_id, service_account_key }
    }
}
595

596
#[async_trait]
597
impl Validator for BigQueryValidator {
598
    async fn validate(
599
        &self,
600
        _ctx: &ValidationContext,
601
    ) -> Result<Vec<ValidationFailure>, ValidationError> {
4✔
602
        let Ok(client) =
603
            BigQueryClient::new_with_key(self.project_id.clone(), &self.service_account_key, 1)
604
                .await
605
        else {
606
            return Ok(vec![ValidationFailure::critical(
607
                "BigQuery Authentication Failed",
608
                "Unable to authenticate with BigQuery.\n\nPlease verify:\n(1) The service account \
609
                 key is valid JSON\n(2) The key has not expired or been revoked\n(3) The project \
610
                 ID is correct",
611
            )]);
612
        };
613

614
        match client.dataset_exists(&self.dataset_id).await {
615
            Ok(true) => Ok(vec![]),
616
            Ok(false) => Ok(vec![ValidationFailure::critical(
617
                "BigQuery Dataset Not Found",
618
                format!(
619
                    "Dataset '{}' does not exist in project '{}'.\n\nPlease verify:\n(1) The \
620
                     dataset name is correct\n(2) The dataset exists in the specified \
621
                     project\n(3) The service account has permission to access it",
622
                    self.dataset_id, self.project_id
623
                ),
624
            )]),
625
            Err(_) => Ok(vec![ValidationFailure::critical(
626
                "BigQuery Connection Failed",
627
                "Unable to connect to BigQuery.\n\nPlease verify:\n(1) Network connectivity to \
628
                 Google Cloud\n(2) The service account has the required permissions (BigQuery \
629
                 Data Editor, BigQuery Job User)\n(3) BigQuery API is enabled for your project",
630
            )]),
631
        }
632
    }
4✔
633
}
634

635
/// Validates Clickhouse destination connectivity and dataset accessibility.
636
#[derive(Debug)]
637
struct ClickHouseValidator {
638
    url: Url,
639
    user: String,
640
    password: Option<SerializableSecretString>,
641
    database: String,
642
}
643

644
impl ClickHouseValidator {
NEW
645
    fn new(
×
NEW
646
        url: Url,
×
NEW
647
        user: String,
×
NEW
648
        password: Option<SerializableSecretString>,
×
NEW
649
        database: String,
×
NEW
650
    ) -> Self {
×
NEW
651
        Self { url, user, password, database }
×
NEW
652
    }
×
653
}
654

655
#[async_trait]
656
impl Validator for ClickHouseValidator {
657
    async fn validate(
658
        &self,
659
        _ctx: &ValidationContext,
NEW
660
    ) -> Result<Vec<ValidationFailure>, ValidationError> {
×
661
        let client = ClickHouseClient::new(
662
            self.url.clone(),
663
            self.user.clone(),
NEW
664
            self.password.as_ref().map(|password| password.expose_secret().to_owned()),
×
665
            self.database.clone(),
666
        );
667
        match client.validate_connectivity().await {
668
            Ok(_) => Ok(Vec::new()),
669
            Err(_) => Ok(vec![ValidationFailure::critical(
670
                "ClickHouse Connection Failed",
671
                "Unable to create clickhouse client.\n\nPlease verify:\n(1) The url is valid and \
672
                 accessible\n(2) The username is correct\n(3) You set the right password\n(4) You \
673
                 set the right database name
674
                    ",
675
            )]),
676
        }
NEW
677
    }
×
678
}
679

680
/// Validates Iceberg destination connectivity.
///
/// Holds the API-level Iceberg configuration (Supabase-hosted or REST catalog)
/// from which an `IcebergClient` is built during validation.
#[derive(Debug)]
struct IcebergValidator {
    config: FullApiIcebergConfig,
}
685

686
/// Validates DuckLake destination connectivity.
///
/// Stores the raw DuckLake settings. URLs are parsed and the S3 configuration
/// is assembled only when the validator runs.
struct DucklakeValidator {
    /// Catalog connection URL. It may embed catalog credentials, so it is
    /// redacted from `Debug` output.
    catalog_url: String,
    data_path: String,
    pool_size: u32,
    /// Must be provided together with `s3_secret_access_key`.
    s3_access_key_id: Option<String>,
    /// Redacted from `Debug` output.
    s3_secret_access_key: Option<String>,
    s3_region: Option<String>,
    s3_endpoint: Option<String>,
    s3_url_style: Option<String>,
    s3_use_ssl: Option<bool>,
    metadata_schema: Option<String>,
    duckdb_memory_cache_limit: Option<String>,
    maintenance_target_file_size: Option<String>,
    expire_snapshots_older_than: Option<String>,
}

// Hand-written instead of derived so secrets never leak through `{:?}`
// formatting. For the S3 secret, only its presence is recorded.
impl std::fmt::Debug for DucklakeValidator {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        const REDACTED: &str = "[REDACTED]";
        f.debug_struct("DucklakeValidator")
            .field("catalog_url", &REDACTED)
            .field("data_path", &self.data_path)
            .field("pool_size", &self.pool_size)
            .field("s3_access_key_id", &self.s3_access_key_id)
            .field("s3_secret_access_key", &self.s3_secret_access_key.as_ref().map(|_| REDACTED))
            .field("s3_region", &self.s3_region)
            .field("s3_endpoint", &self.s3_endpoint)
            .field("s3_url_style", &self.s3_url_style)
            .field("s3_use_ssl", &self.s3_use_ssl)
            .field("metadata_schema", &self.metadata_schema)
            .field("duckdb_memory_cache_limit", &self.duckdb_memory_cache_limit)
            .field("maintenance_target_file_size", &self.maintenance_target_file_size)
            .field("expire_snapshots_older_than", &self.expire_snapshots_older_than)
            .finish()
    }
}

impl DucklakeValidator {
    /// Creates a validator from the DuckLake destination settings.
    // Mirrors the flat destination configuration one field per argument.
    #[allow(clippy::too_many_arguments)]
    fn new(
        catalog_url: String,
        data_path: String,
        pool_size: u32,
        s3_access_key_id: Option<String>,
        s3_secret_access_key: Option<String>,
        s3_region: Option<String>,
        s3_endpoint: Option<String>,
        s3_url_style: Option<String>,
        s3_use_ssl: Option<bool>,
        metadata_schema: Option<String>,
        duckdb_memory_cache_limit: Option<String>,
        maintenance_target_file_size: Option<String>,
        expire_snapshots_older_than: Option<String>,
    ) -> Self {
        Self {
            catalog_url,
            data_path,
            pool_size,
            s3_access_key_id,
            s3_secret_access_key,
            s3_region,
            s3_endpoint,
            s3_url_style,
            s3_use_ssl,
            metadata_schema,
            duckdb_memory_cache_limit,
            maintenance_target_file_size,
            expire_snapshots_older_than,
        }
    }
}
738

739
#[async_trait]
740
impl Validator for DucklakeValidator {
741
    async fn validate(
742
        &self,
743
        _ctx: &ValidationContext,
744
    ) -> Result<Vec<ValidationFailure>, ValidationError> {
×
745
        match (&self.s3_access_key_id, &self.s3_secret_access_key) {
746
            (Some(_), None) | (None, Some(_)) => {
747
                return Ok(vec![ValidationFailure::critical(
748
                    "Ducklake S3 Configuration Invalid",
749
                    "DuckLake S3 credentials must include both access key ID and secret access \
750
                     key.",
751
                )]);
752
            }
753
            _ => {}
754
        }
755

756
        let catalog_url = match parse_ducklake_url(&self.catalog_url) {
757
            Ok(url) => url,
758
            Err(error) => {
759
                return Ok(vec![ValidationFailure::critical(
760
                    "Ducklake Catalog Url Invalid",
761
                    error.to_string(),
762
                )]);
763
            }
764
        };
765

766
        let data_path = match parse_ducklake_url(&self.data_path) {
767
            Ok(url) => url,
768
            Err(error) => {
769
                return Ok(vec![ValidationFailure::critical(
770
                    "Ducklake Data Path Invalid",
771
                    error.to_string(),
772
                )]);
773
            }
774
        };
775

776
        let s3_config = self.s3_access_key_id.clone().map(|access_key_id| DucklakeS3Config {
777
            access_key_id,
×
778
            secret_access_key: self
×
779
                .s3_secret_access_key
×
780
                .clone()
×
781
                .expect("ducklake s3 secret access key should be present"),
×
782
            region: self.s3_region.clone().unwrap_or_else(|| "us-east-1".to_owned()),
×
783
            endpoint: self.s3_endpoint.clone(),
×
784
            url_style: self.s3_url_style.clone().unwrap_or_else(|| "path".to_owned()),
×
785
            use_ssl: self.s3_use_ssl.unwrap_or(false),
×
786
        });
×
787

788
        match DuckLakeDestination::new(
789
            catalog_url,
790
            data_path,
791
            self.pool_size,
792
            s3_config,
793
            self.metadata_schema.clone(),
794
            self.duckdb_memory_cache_limit.clone(),
795
            self.maintenance_target_file_size.clone(),
796
            self.expire_snapshots_older_than.clone(),
797
            MemoryStore::new(),
798
        )
799
        .await
800
        {
801
            Ok(_) => Ok(vec![]),
802
            Err(_) => Ok(vec![ValidationFailure::critical(
803
                "Ducklake Connection Failed",
804
                "Unable to connect to DuckLake.\n\nPlease verify:\n(1) The catalog URL and data \
805
                 path are valid and reachable\n(2) DuckLake catalog credentials are embedded \
806
                 correctly in the catalog URL\n(3) The S3-compatible credentials and endpoint are \
807
                 correct when using object storage",
808
            )]),
809
        }
810
    }
×
811
}
812

813
impl IcebergValidator {
814
    fn new(config: FullApiIcebergConfig) -> Self {
2✔
815
        Self { config }
2✔
816
    }
2✔
817
}
818

819
#[async_trait]
820
impl Validator for IcebergValidator {
821
    async fn validate(
822
        &self,
823
        ctx: &ValidationContext,
824
    ) -> Result<Vec<ValidationFailure>, ValidationError> {
2✔
825
        let client = match &self.config {
826
            FullApiIcebergConfig::Supabase {
827
                project_ref,
828
                warehouse_name,
829
                catalog_token,
830
                s3_access_key_id,
831
                s3_secret_access_key,
832
                s3_region,
833
                ..
834
            } => {
835
                IcebergClient::new_with_supabase_catalog(
836
                    project_ref,
837
                    ctx.environment.get_supabase_domain(),
838
                    catalog_token.expose_secret().to_owned(),
839
                    warehouse_name.clone(),
840
                    s3_access_key_id.expose_secret().to_owned(),
841
                    s3_secret_access_key.expose_secret().to_owned(),
842
                    s3_region.clone(),
843
                )
844
                .await
845
            }
846
            FullApiIcebergConfig::Rest {
847
                catalog_uri,
848
                warehouse_name,
849
                s3_access_key_id,
850
                s3_secret_access_key,
851
                s3_endpoint,
852
                ..
853
            } => {
854
                let mut props = HashMap::new();
855
                props.insert(
856
                    S3_ACCESS_KEY_ID.to_owned(),
857
                    s3_access_key_id.expose_secret().to_owned(),
858
                );
859
                props.insert(
860
                    S3_SECRET_ACCESS_KEY.to_owned(),
861
                    s3_secret_access_key.expose_secret().to_owned(),
862
                );
863
                props.insert(S3_ENDPOINT.to_owned(), s3_endpoint.clone());
864

865
                IcebergClient::new_with_rest_catalog(
866
                    catalog_uri.clone(),
867
                    warehouse_name.clone(),
868
                    props,
869
                )
870
                .await
871
            }
872
        };
873
        let Ok(client) = client else {
874
            return Ok(vec![ValidationFailure::critical(
875
                "Iceberg Authentication Failed",
876
                "Unable to authenticate with Iceberg.\n\nPlease verify:\n(1) The catalog token is \
877
                 valid and has not expired\n(2) The S3 access key and secret key are correct\n(3) \
878
                 The catalog URI is properly formatted",
879
            )]);
880
        };
881

882
        match client.validate_connectivity().await {
883
            Ok(()) => Ok(vec![]),
884
            Err(_) => Ok(vec![ValidationFailure::critical(
885
                "Iceberg Connection Failed",
886
                "Unable to connect to Iceberg catalog.\n\nPlease verify:\n(1) Network \
887
                 connectivity to the catalog and S3\n(2) The warehouse name exists in the \
888
                 catalog\n(3) You have the required permissions to access the warehouse\n(4) The \
889
                 S3 endpoint is reachable",
890
            )]),
891
        }
892
    }
2✔
893
}
894

895
/// Composite validator for destination prerequisites.
896
#[derive(Debug)]
897
pub struct DestinationValidator {
898
    config: FullApiDestinationConfig,
899
}
900

901
impl DestinationValidator {
902
    pub fn new(config: FullApiDestinationConfig) -> Self {
6✔
903
        Self { config }
6✔
904
    }
6✔
905
}
906

907
#[async_trait]
908
impl Validator for DestinationValidator {
909
    async fn validate(
910
        &self,
911
        ctx: &ValidationContext,
912
    ) -> Result<Vec<ValidationFailure>, ValidationError> {
6✔
913
        match &self.config {
914
            FullApiDestinationConfig::BigQuery {
915
                project_id,
916
                dataset_id,
917
                service_account_key,
918
                ..
919
            } => {
920
                let validator = BigQueryValidator::new(
921
                    project_id.clone(),
922
                    dataset_id.clone(),
923
                    service_account_key.expose_secret().to_owned(),
924
                );
925
                validator.validate(ctx).await
926
            }
927
            FullApiDestinationConfig::ClickHouse { url, user, password, database } => {
928
                let validator = ClickHouseValidator::new(
929
                    url.clone(),
930
                    user.clone(),
931
                    password.clone(),
932
                    database.clone(),
933
                );
934
                validator.validate(ctx).await
935
            }
936
            FullApiDestinationConfig::Iceberg { config } => {
937
                let validator = IcebergValidator::new(config.clone());
938
                validator.validate(ctx).await
939
            }
940
            FullApiDestinationConfig::Ducklake {
941
                catalog_url,
942
                data_path,
943
                pool_size,
944
                s3_access_key_id,
945
                s3_secret_access_key,
946
                s3_region,
947
                s3_endpoint,
948
                s3_url_style,
949
                s3_use_ssl,
950
                metadata_schema,
951
                duckdb_memory_cache_limit,
952
                maintenance_target_file_size,
953
                expire_snapshots_older_than,
954
            } => {
955
                let validator = DucklakeValidator::new(
956
                    catalog_url.clone(),
957
                    data_path.clone(),
958
                    pool_size.unwrap_or(
959
                        etl_config::shared::DestinationConfig::DEFAULT_DUCKLAKE_POOL_SIZE,
960
                    ),
961
                    s3_access_key_id.as_ref().map(|value| value.expose_secret().to_owned()),
×
962
                    s3_secret_access_key.as_ref().map(|value| value.expose_secret().to_owned()),
×
963
                    s3_region.clone(),
964
                    s3_endpoint.clone(),
965
                    s3_url_style.clone(),
966
                    *s3_use_ssl,
967
                    metadata_schema.clone(),
968
                    duckdb_memory_cache_limit.clone(),
969
                    maintenance_target_file_size.clone(),
970
                    expire_snapshots_older_than.clone(),
971
                );
972
                validator.validate(ctx).await
973
            }
974
        }
975
    }
6✔
976
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc