• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

supabase / etl / 23802622604

31 Mar 2026 02:27PM UTC coverage: 78.225% (+0.5%) from 77.743%
23802622604

Pull #627

github

web-flow
Merge 47f7c3c5b into 33aba3583
Pull Request #627: Add ducklake destination

4931 of 6213 new or added lines in 24 files covered. (79.37%)

8 existing lines in 5 files now uncovered.

24159 of 30884 relevant lines covered (78.22%)

1076.57 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

60.53
/etl-api/src/validation/validators.rs
1
//! Built-in validators for ETL pipeline and destination prerequisites.
2

3
use std::collections::HashMap;
4

5
use async_trait::async_trait;
6
use etl::store::both::memory::MemoryStore;
7
use etl_config::parse_ducklake_url;
8
use etl_destinations::bigquery::BigQueryClient;
9
use etl_destinations::ducklake::{DuckLakeDestination, S3Config as DucklakeS3Config};
10
use etl_destinations::iceberg::{
11
    IcebergClient, S3_ACCESS_KEY_ID, S3_ENDPOINT, S3_SECRET_ACCESS_KEY,
12
};
13
use secrecy::ExposeSecret;
14
use sqlx::FromRow;
15

16
use crate::configs::destination::{FullApiDestinationConfig, FullApiIcebergConfig};
17
use crate::configs::pipeline::FullApiPipelineConfig;
18

19
use super::{ValidationContext, ValidationError, ValidationFailure, Validator};
20

21
/// Name of the schema on the source database whose existence and privileges
/// are audited by [`SourceValidator`].
const ETL_SCHEMA_NAME: &str = "etl";
22

23
/// Checks that the role ETL is connected as on the source database matches
/// the expected (trusted) role profile.
///
/// The validator carries no state of its own; its inputs come from the
/// context handed to `validate`.
#[derive(Debug)]
pub struct SourceValidator;
26

27
#[derive(Debug, FromRow)]
28
struct SourceRoleAudit {
29
    rolcanlogin: bool,
30
    rolreplication: bool,
31
    rolbypassrls: bool,
32
    rolcreaterole: bool,
33
    rolcreatedb: bool,
34
    rolinherit: bool,
35
    rolvaliduntil_is_null: bool,
36
    etl_schema_exists: bool,
37
    etl_schema_usage: Option<bool>,
38
    etl_schema_create: Option<bool>,
39
    controls_all_existing_etl_tables: Option<bool>,
40
    can_create_schema_if_missing: Option<bool>,
41
}
42

43
#[async_trait]
44
impl Validator for SourceValidator {
45
    async fn validate(
46
        &self,
47
        ctx: &ValidationContext,
48
    ) -> Result<Vec<ValidationFailure>, ValidationError> {
26✔
49
        let Some(expected_username) = ctx.trusted_username.as_ref() else {
50
            return Ok(vec![]);
51
        };
52

53
        let source_pool = ctx
54
            .source_pool
55
            .as_ref()
56
            .expect("source pool required for source validation");
57

58
        let current_user: String = sqlx::query_scalar("select current_user")
59
            .fetch_one(source_pool)
60
            .await?;
61

62
        if current_user != *expected_username {
63
            return Ok(vec![ValidationFailure::critical(
64
                "Invalid source username",
65
                format!("connected as '{current_user}' but expected '{expected_username}'"),
66
            )]);
67
        }
68

69
        // This validation is best effort: it relies on catalog metadata and
70
        // privilege checks to confirm the trusted role profile without running
71
        // invasive probes against the customer database.
72
        let audit = sqlx::query_as::<_, SourceRoleAudit>(
73
            r#"
74
            with target as (
75
              -- Load the direct role attributes for the trusted ETL user.
76
              select
77
                oid,
78
                rolcanlogin,
79
                rolreplication,
80
                rolbypassrls,
81
                rolcreaterole,
82
                rolcreatedb,
83
                rolinherit,
84
                rolvaliduntil is null as rolvaliduntil_is_null
85
              from pg_roles
86
              where rolname = $1
87
            ),
88
            etl_schema as (
89
              -- Check whether the etl schema already exists.
90
              select oid
91
              from pg_namespace
92
              where nspname = $2
93
            ),
94
            etl_tables as (
95
              -- List the existing tables in the etl schema, if present.
96
              select
97
                c.relowner
98
              from pg_class c
99
              join etl_schema s on s.oid = c.relnamespace
100
              where c.relkind in ('r', 'p')
101
            ),
102
            etl_table_ownership as (
103
              -- Determine whether the trusted role controls every existing ETL table.
104
              -- pg_has_role(..., 'USAGE') means the owning role's privileges are
105
              -- immediately available without requiring SET ROLE, which matches
106
              -- how ETL connects and operates.
107
              select
108
                coalesce(bool_and(pg_has_role($1, relowner, 'USAGE')), true)
109
                  as controls_all_existing_etl_tables
110
              from etl_tables
111
            )
112
            select
113
              t.rolcanlogin,
114
              t.rolreplication,
115
              t.rolbypassrls,
116
              t.rolcreaterole,
117
              t.rolcreatedb,
118
              t.rolinherit,
119
              t.rolvaliduntil_is_null,
120
              exists(select 1 from etl_schema) as etl_schema_exists,
121
              case
122
                when exists(select 1 from etl_schema)
123
                then has_schema_privilege($1, $2, 'USAGE')
124
                else null
125
              end as etl_schema_usage,
126
              case
127
                when exists(select 1 from etl_schema)
128
                then has_schema_privilege($1, $2, 'CREATE')
129
                else null
130
              end as etl_schema_create,
131
              case
132
                when exists(select 1 from etl_schema)
133
                then (select controls_all_existing_etl_tables from etl_table_ownership)
134
                else null
135
              end as controls_all_existing_etl_tables,
136
              case
137
                when not exists(select 1 from etl_schema)
138
                then has_database_privilege($1, current_database(), 'CREATE')
139
                else null
140
              end as can_create_schema_if_missing
141
            from target t
142
            "#,
143
        )
144
        .bind(expected_username)
145
        .bind(ETL_SCHEMA_NAME)
146
        .fetch_optional(source_pool)
147
        .await?;
148

149
        let Some(audit) = audit else {
150
            return Ok(vec![ValidationFailure::critical(
151
                "Invalid source role attributes",
152
                "role not found",
153
            )]);
154
        };
155

156
        let has_required_role_attributes = audit.rolcanlogin
157
            && audit.rolreplication
158
            && audit.rolbypassrls
159
            && !audit.rolcreaterole
160
            && !audit.rolcreatedb
161
            && audit.rolinherit
162
            && audit.rolvaliduntil_is_null;
163

164
        let mut failures = Vec::new();
165
        if !has_required_role_attributes {
166
            failures.push(ValidationFailure::critical(
167
                "Invalid source role attributes",
168
                "The source database does not grant the trusted username role all permissions ETL needs to work properly.",
169
            ));
170
        }
171

172
        let has_required_etl_schema_permissions = if audit.etl_schema_exists {
173
            audit.etl_schema_usage == Some(true)
174
                && audit.etl_schema_create == Some(true)
175
                && audit.controls_all_existing_etl_tables == Some(true)
176
        } else {
177
            audit.can_create_schema_if_missing == Some(true)
178
        };
179

180
        if !has_required_etl_schema_permissions {
181
            failures.push(ValidationFailure::critical(
182
                "Invalid source etl schema permissions",
183
                format!(
184
                    "The source database does not grant the trusted username role all permissions ETL needs to manage schema {ETL_SCHEMA_NAME} properly."
185
                ),
186
            ));
187
        }
188

189
        Ok(failures)
190
    }
26✔
191
}
192

193
/// Checks that a publication with the configured name is present on the
/// source database.
#[derive(Debug)]
pub struct PublicationExistsValidator {
    /// Name looked up in `pg_publication`.
    publication_name: String,
}

impl PublicationExistsValidator {
    /// Creates a validator for the publication called `publication_name`.
    pub fn new(publication_name: String) -> Self {
        PublicationExistsValidator { publication_name }
    }
}
204

205
#[async_trait]
206
impl Validator for PublicationExistsValidator {
207
    async fn validate(
208
        &self,
209
        ctx: &ValidationContext,
210
    ) -> Result<Vec<ValidationFailure>, ValidationError> {
9✔
211
        let source_pool = ctx
212
            .source_pool
213
            .as_ref()
214
            .expect("source pool required for publication validation");
215

216
        let exists: bool =
217
            sqlx::query_scalar("select exists(select 1 from pg_publication where pubname = $1)")
218
                .bind(&self.publication_name)
219
                .fetch_one(source_pool)
220
                .await?;
221

222
        if exists {
223
            Ok(vec![])
224
        } else {
225
            Ok(vec![ValidationFailure::critical(
226
                "Publication Not Found",
227
                format!(
228
                    "Publication '{}' does not exist in the source database. \
229
                    Create it with: CREATE PUBLICATION {} FOR TABLE <table_name>, ...",
230
                    self.publication_name, self.publication_name
231
                ),
232
            )])
233
        }
234
    }
9✔
235
}
236

237
/// Checks that the source database has enough unused replication slots for
/// the pipeline.
#[derive(Debug)]
pub struct ReplicationSlotsValidator {
    /// Upper bound on concurrent table sync workers; each may need a slot.
    max_table_sync_workers: u16,
}

impl ReplicationSlotsValidator {
    /// Creates a validator sized for `max_table_sync_workers` sync workers.
    pub fn new(max_table_sync_workers: u16) -> Self {
        ReplicationSlotsValidator {
            max_table_sync_workers,
        }
    }
}
250

251
#[async_trait]
252
impl Validator for ReplicationSlotsValidator {
253
    async fn validate(
254
        &self,
255
        ctx: &ValidationContext,
256
    ) -> Result<Vec<ValidationFailure>, ValidationError> {
9✔
257
        let source_pool = ctx
258
            .source_pool
259
            .as_ref()
260
            .expect("source pool required for replication slots validation");
261

262
        let max_slots: i32 = sqlx::query_scalar(
263
            "select setting::int from pg_settings where name = 'max_replication_slots'",
264
        )
265
        .fetch_one(source_pool)
266
        .await?;
267

268
        let used_slots: i64 = sqlx::query_scalar("select count(*) from pg_replication_slots")
269
            .fetch_one(source_pool)
270
            .await?;
271

272
        let free_slots = max_slots as i64 - used_slots;
273
        // We need 1 slot for the apply worker plus at most `max_table_sync_workers` other slots
274
        // for table sync workers.
275
        let required_slots = self.max_table_sync_workers as i64 + 1;
276

277
        if required_slots <= free_slots {
278
            Ok(vec![])
279
        } else {
280
            Ok(vec![ValidationFailure::critical(
281
                "Insufficient Replication Slots",
282
                format!(
283
                    "Not enough replication slots available.\n\
284
                    Found {free_slots} free slots, but {required_slots} are required at most during initial table copy ({used_slots}/{max_slots} currently in use).\n\
285
                    Once all tables are copied, only 1 slot will be used.\n\n\
286
                    Please verify:\n\
287
                    (1) max_replication_slots in postgresql.conf is sufficient\n\
288
                    (2) Unused replication slots can be removed\n\
289
                    (3) max_table_sync_workers can be reduced if needed",
290
                ),
291
            )])
292
        }
293
    }
9✔
294
}
295

296
/// Checks that the source database reports `wal_level = logical`, which the
/// validator's failure message states is required for replication.
#[derive(Debug)]
pub struct WalLevelValidator;
299

300
#[async_trait]
301
impl Validator for WalLevelValidator {
302
    async fn validate(
303
        &self,
304
        ctx: &ValidationContext,
305
    ) -> Result<Vec<ValidationFailure>, ValidationError> {
9✔
306
        let source_pool = ctx
307
            .source_pool
308
            .as_ref()
309
            .expect("source pool required for WAL level validation");
310

311
        let wal_level: String = sqlx::query_scalar("select current_setting('wal_level')")
312
            .fetch_one(source_pool)
313
            .await?;
314

315
        if wal_level == "logical" {
316
            Ok(vec![])
317
        } else {
318
            Ok(vec![ValidationFailure::critical(
319
                "Invalid WAL Level",
320
                format!(
321
                    "WAL level is set to '{wal_level}', but must be 'logical' for replication. \
322
                    Update postgresql.conf with: wal_level = 'logical' and restart PostgreSQL"
323
                ),
324
            )])
325
        }
326
    }
9✔
327
}
328

329
/// Checks that the connected database user is allowed to replicate, i.e. is a
/// superuser or has the replication role attribute.
#[derive(Debug)]
pub struct ReplicationPermissionsValidator;
332

333
#[async_trait]
334
impl Validator for ReplicationPermissionsValidator {
335
    async fn validate(
336
        &self,
337
        ctx: &ValidationContext,
338
    ) -> Result<Vec<ValidationFailure>, ValidationError> {
9✔
339
        let source_pool = ctx
340
            .source_pool
341
            .as_ref()
342
            .expect("source pool required for replication permissions validation");
343

344
        // Check if user is superuser OR has replication privilege
345
        let has_permission: bool = sqlx::query_scalar(
346
            "select rolsuper or rolreplication from pg_roles where rolname = current_user",
347
        )
348
        .fetch_one(source_pool)
349
        .await?;
350

351
        if has_permission {
352
            Ok(vec![])
353
        } else {
354
            Ok(vec![ValidationFailure::critical(
355
                "Missing Replication Permission",
356
                "The database user does not have replication privileges",
357
            )])
358
        }
359
    }
9✔
360
}
361

362
/// Checks that the configured publication publishes at least one table.
#[derive(Debug)]
pub struct PublicationHasTablesValidator {
    /// Name of the publication to inspect.
    publication_name: String,
}

impl PublicationHasTablesValidator {
    /// Creates a validator for the publication called `publication_name`.
    pub fn new(publication_name: String) -> Self {
        PublicationHasTablesValidator { publication_name }
    }
}
373

374
#[async_trait]
375
impl Validator for PublicationHasTablesValidator {
376
    async fn validate(
377
        &self,
378
        ctx: &ValidationContext,
379
    ) -> Result<Vec<ValidationFailure>, ValidationError> {
9✔
380
        let source_pool = ctx
381
            .source_pool
382
            .as_ref()
383
            .expect("source pool required for publication tables validation");
384

385
        // Check if publication publishes all tables or has specific tables
386
        let result: Option<(bool, i64)> = sqlx::query_as(
387
            r#"
388
            select
389
                p.puballtables,
390
                (select count(*) from pg_publication_tables pt where pt.pubname = p.pubname)
391
            from pg_publication p
392
            where p.pubname = $1
393
            "#,
394
        )
395
        .bind(&self.publication_name)
396
        .fetch_optional(source_pool)
397
        .await?;
398

399
        // If publication doesn't exist, skip this check (PublicationExistsValidator handles it)
400
        let Some((puballtables, table_count)) = result else {
401
            return Ok(vec![]);
402
        };
403

404
        if puballtables || table_count > 0 {
405
            Ok(vec![])
406
        } else {
407
            Ok(vec![ValidationFailure::critical(
408
                "Publication Empty",
409
                format!(
410
                    "Publication '{}' exists but contains no tables.\n\n\
411
                    Add tables with: ALTER PUBLICATION {} ADD TABLE <table_name>",
412
                    self.publication_name, self.publication_name
413
                ),
414
            )])
415
        }
416
    }
9✔
417
}
418

419
/// Checks the tables of a publication for missing primary keys.
#[derive(Debug)]
pub struct PrimaryKeysValidator {
    /// Name of the publication whose tables are inspected.
    publication_name: String,
}

impl PrimaryKeysValidator {
    /// Creates a validator for the publication called `publication_name`.
    pub fn new(publication_name: String) -> Self {
        PrimaryKeysValidator { publication_name }
    }
}
430

431
#[async_trait]
432
impl Validator for PrimaryKeysValidator {
433
    async fn validate(
434
        &self,
435
        ctx: &ValidationContext,
436
    ) -> Result<Vec<ValidationFailure>, ValidationError> {
9✔
437
        let source_pool = ctx
438
            .source_pool
439
            .as_ref()
440
            .expect("source pool required for primary keys validation");
441

442
        // Find tables without primary keys using pg_publication_rel for direct OID access
443
        let tables_without_pk: Vec<String> = sqlx::query_scalar(
444
            r#"
445
            select n.nspname || '.' || c.relname
446
            from pg_publication_rel pr
447
            join pg_publication p on p.oid = pr.prpubid
448
            join pg_class c on c.oid = pr.prrelid
449
            join pg_namespace n on n.oid = c.relnamespace
450
            where p.pubname = $1
451
              and not exists (
452
                select 1
453
                from pg_constraint con
454
                where con.conrelid = pr.prrelid
455
                  and con.contype = 'p'
456
              )
457
            order by n.nspname, c.relname
458
            limit 100
459
            "#,
460
        )
461
        .bind(&self.publication_name)
462
        .fetch_all(source_pool)
463
        .await?;
464

465
        if tables_without_pk.is_empty() {
466
            Ok(vec![])
467
        } else {
468
            Ok(vec![ValidationFailure::warning(
469
                "Tables Missing Primary Keys",
470
                format!(
471
                    "Tables without primary keys: {}\n\n\
472
                    Primary keys are required for UPDATE and DELETE replication.",
473
                    tables_without_pk.join(", ")
474
                ),
475
            )])
476
        }
477
    }
9✔
478
}
479

480
/// Checks the tables of a publication for generated columns.
#[derive(Debug)]
pub struct GeneratedColumnsValidator {
    /// Name of the publication whose tables are inspected.
    publication_name: String,
}

impl GeneratedColumnsValidator {
    /// Creates a validator for the publication called `publication_name`.
    pub fn new(publication_name: String) -> Self {
        GeneratedColumnsValidator { publication_name }
    }
}
491

492
#[async_trait]
493
impl Validator for GeneratedColumnsValidator {
494
    async fn validate(
495
        &self,
496
        ctx: &ValidationContext,
497
    ) -> Result<Vec<ValidationFailure>, ValidationError> {
9✔
498
        let source_pool = ctx
499
            .source_pool
500
            .as_ref()
501
            .expect("source pool required for generated columns validation");
502

503
        // Find tables with generated columns using pg_publication_rel for direct OID access
504
        let tables_with_generated: Vec<String> = sqlx::query_scalar(
505
            r#"
506
            select distinct n.nspname || '.' || c.relname
507
            from pg_publication_rel pr
508
            join pg_publication p on p.oid = pr.prpubid
509
            join pg_class c on c.oid = pr.prrelid
510
            join pg_namespace n on n.oid = c.relnamespace
511
            where p.pubname = $1
512
              and exists (
513
                select 1
514
                from pg_attribute a
515
                where a.attrelid = pr.prrelid
516
                  and a.attnum > 0
517
                  and not a.attisdropped
518
                  and a.attgenerated != ''
519
              )
520
            order by 1
521
            limit 100
522
            "#,
523
        )
524
        .bind(&self.publication_name)
525
        .fetch_all(source_pool)
526
        .await?;
527

528
        if tables_with_generated.is_empty() {
529
            Ok(vec![])
530
        } else {
531
            Ok(vec![ValidationFailure::warning(
532
                "Tables With Generated Columns",
533
                format!(
534
                    "Tables with generated columns: {}\n\n\
535
                    Generated columns cannot be replicated and will be excluded from the destination.",
536
                    tables_with_generated.join(", ")
537
                ),
538
            )])
539
        }
540
    }
9✔
541
}
542

543
/// Composite validator for pipeline prerequisites.
544
#[derive(Debug)]
545
pub struct PipelineValidator {
546
    config: FullApiPipelineConfig,
547
}
548

549
impl PipelineValidator {
550
    pub fn new(config: FullApiPipelineConfig) -> Self {
9✔
551
        Self { config }
9✔
552
    }
9✔
553

554
    fn sub_validators(&self) -> Vec<Box<dyn Validator>> {
9✔
555
        let max_table_sync_workers = self.config.max_table_sync_workers.unwrap_or(4);
9✔
556
        let publication_name = self.config.publication_name.clone();
9✔
557

558
        vec![
9✔
559
            Box::new(WalLevelValidator),
9✔
560
            Box::new(ReplicationPermissionsValidator),
9✔
561
            Box::new(PublicationExistsValidator::new(publication_name.clone())),
9✔
562
            Box::new(PublicationHasTablesValidator::new(publication_name.clone())),
9✔
563
            Box::new(PrimaryKeysValidator::new(publication_name.clone())),
9✔
564
            Box::new(GeneratedColumnsValidator::new(publication_name)),
9✔
565
            Box::new(ReplicationSlotsValidator::new(max_table_sync_workers)),
9✔
566
        ]
567
    }
9✔
568
}
569

570
#[async_trait]
571
impl Validator for PipelineValidator {
572
    async fn validate(
573
        &self,
574
        ctx: &ValidationContext,
575
    ) -> Result<Vec<ValidationFailure>, ValidationError> {
9✔
576
        let mut failures = Vec::new();
577

578
        for validator in self.sub_validators() {
579
            failures.extend(validator.validate(ctx).await?);
580
        }
581

582
        Ok(failures)
583
    }
9✔
584
}
585

586
/// Validates BigQuery destination connectivity and dataset accessibility.
///
/// `Debug` is implemented by hand so that the service account key never ends
/// up in logs or error output.
struct BigQueryValidator {
    /// Google Cloud project that owns the dataset.
    project_id: String,
    /// Dataset whose existence is checked.
    dataset_id: String,
    /// Service account key material, held in plain text; treat as a secret.
    service_account_key: String,
}

impl std::fmt::Debug for BigQueryValidator {
    /// Formats the validator with the service account key redacted.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("BigQueryValidator")
            .field("project_id", &self.project_id)
            .field("dataset_id", &self.dataset_id)
            .field("service_account_key", &"[REDACTED]")
            .finish()
    }
}

impl BigQueryValidator {
    /// Creates a validator for `dataset_id` in `project_id`, authenticating
    /// with `service_account_key`.
    fn new(project_id: String, dataset_id: String, service_account_key: String) -> Self {
        Self {
            project_id,
            dataset_id,
            service_account_key,
        }
    }
}
603

604
#[async_trait]
605
impl Validator for BigQueryValidator {
606
    async fn validate(
607
        &self,
608
        _ctx: &ValidationContext,
609
    ) -> Result<Vec<ValidationFailure>, ValidationError> {
4✔
610
        let client = match BigQueryClient::new_with_key(
611
            self.project_id.clone(),
612
            &self.service_account_key,
613
            1,
614
        )
615
        .await
616
        {
617
            Ok(client) => client,
618
            Err(_) => {
619
                return Ok(vec![ValidationFailure::critical(
620
                    "BigQuery Authentication Failed",
621
                    "Unable to authenticate with BigQuery.\n\n\
622
                    Please verify:\n\
623
                    (1) The service account key is valid JSON\n\
624
                    (2) The key has not expired or been revoked\n\
625
                    (3) The project ID is correct",
626
                )]);
627
            }
628
        };
629

630
        match client.dataset_exists(&self.dataset_id).await {
631
            Ok(true) => Ok(vec![]),
632
            Ok(false) => Ok(vec![ValidationFailure::critical(
633
                "BigQuery Dataset Not Found",
634
                format!(
635
                    "Dataset '{}' does not exist in project '{}'.\n\n\
636
                    Please verify:\n\
637
                    (1) The dataset name is correct\n\
638
                    (2) The dataset exists in the specified project\n\
639
                    (3) The service account has permission to access it",
640
                    self.dataset_id, self.project_id
641
                ),
642
            )]),
643
            Err(_) => Ok(vec![ValidationFailure::critical(
644
                "BigQuery Connection Failed",
645
                "Unable to connect to BigQuery.\n\n\
646
                Please verify:\n\
647
                (1) Network connectivity to Google Cloud\n\
648
                (2) The service account has the required permissions (BigQuery Data Editor, BigQuery Job User)\n\
649
                (3) BigQuery API is enabled for your project",
650
            )]),
651
        }
652
    }
4✔
653
}
654

655
/// Validates Iceberg destination connectivity.
656
#[derive(Debug)]
657
struct IcebergValidator {
658
    config: FullApiIcebergConfig,
659
}
660

661
/// Validates DuckLake destination connectivity.
///
/// `Debug` is implemented by hand: the S3 credentials are secrets and the
/// catalog URL may carry embedded catalog credentials, so none of them may be
/// printed verbatim.
struct DucklakeValidator {
    /// Catalog location; may embed catalog credentials.
    catalog_url: String,
    /// Location where table data is stored.
    data_path: String,
    /// Connection pool size handed to the destination.
    pool_size: u32,
    /// S3 access key ID; must be set together with the secret access key.
    s3_access_key_id: Option<String>,
    /// S3 secret access key; must be set together with the access key ID.
    s3_secret_access_key: Option<String>,
    /// S3 region; a default is applied during validation when unset.
    s3_region: Option<String>,
    /// Custom S3-compatible endpoint, if any.
    s3_endpoint: Option<String>,
    /// S3 URL style; a default is applied during validation when unset.
    s3_url_style: Option<String>,
    /// Whether to use SSL for S3; a default is applied during validation when unset.
    s3_use_ssl: Option<bool>,
    /// Optional metadata schema name passed through to the destination.
    metadata_schema: Option<String>,
}

impl std::fmt::Debug for DucklakeValidator {
    /// Formats the validator with credentials and the catalog URL redacted.
    ///
    /// Optional secrets keep their `Some`/`None` shape so that it stays
    /// visible whether a value was configured at all.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        const REDACTED: &str = "[REDACTED]";

        f.debug_struct("DucklakeValidator")
            .field("catalog_url", &REDACTED)
            .field("data_path", &self.data_path)
            .field("pool_size", &self.pool_size)
            .field(
                "s3_access_key_id",
                &self.s3_access_key_id.as_ref().map(|_| REDACTED),
            )
            .field(
                "s3_secret_access_key",
                &self.s3_secret_access_key.as_ref().map(|_| REDACTED),
            )
            .field("s3_region", &self.s3_region)
            .field("s3_endpoint", &self.s3_endpoint)
            .field("s3_url_style", &self.s3_url_style)
            .field("s3_use_ssl", &self.s3_use_ssl)
            .field("metadata_schema", &self.metadata_schema)
            .finish()
    }
}

impl DucklakeValidator {
    /// Creates a validator from the individual DuckLake destination settings.
    // The constructor mirrors the flat field list of the destination config,
    // hence the long parameter list.
    #[allow(clippy::too_many_arguments)]
    fn new(
        catalog_url: String,
        data_path: String,
        pool_size: u32,
        s3_access_key_id: Option<String>,
        s3_secret_access_key: Option<String>,
        s3_region: Option<String>,
        s3_endpoint: Option<String>,
        s3_url_style: Option<String>,
        s3_use_ssl: Option<bool>,
        metadata_schema: Option<String>,
    ) -> Self {
        Self {
            catalog_url,
            data_path,
            pool_size,
            s3_access_key_id,
            s3_secret_access_key,
            s3_region,
            s3_endpoint,
            s3_url_style,
            s3_use_ssl,
            metadata_schema,
        }
    }
}
704

705
#[async_trait]
706
impl Validator for DucklakeValidator {
707
    async fn validate(
708
        &self,
709
        _ctx: &ValidationContext,
NEW
710
    ) -> Result<Vec<ValidationFailure>, ValidationError> {
×
711
        match (&self.s3_access_key_id, &self.s3_secret_access_key) {
712
            (Some(_), None) | (None, Some(_)) => {
713
                return Ok(vec![ValidationFailure::critical(
714
                    "Ducklake S3 Configuration Invalid",
715
                    "DuckLake S3 credentials must include both access key ID and secret access key.",
716
                )]);
717
            }
718
            _ => {}
719
        }
720

721
        let catalog_url = match parse_ducklake_url(&self.catalog_url) {
722
            Ok(url) => url,
723
            Err(error) => {
724
                return Ok(vec![ValidationFailure::critical(
725
                    "Ducklake Catalog Url Invalid",
726
                    error.to_string(),
727
                )]);
728
            }
729
        };
730

731
        let data_path = match parse_ducklake_url(&self.data_path) {
732
            Ok(url) => url,
733
            Err(error) => {
734
                return Ok(vec![ValidationFailure::critical(
735
                    "Ducklake Data Path Invalid",
736
                    error.to_string(),
737
                )]);
738
            }
739
        };
740

741
        let s3_config = self
742
            .s3_access_key_id
743
            .clone()
744
            .map(|access_key_id| DucklakeS3Config {
NEW
745
                access_key_id,
×
NEW
746
                secret_access_key: self
×
NEW
747
                    .s3_secret_access_key
×
NEW
748
                    .clone()
×
NEW
749
                    .expect("ducklake s3 secret access key should be present"),
×
NEW
750
                region: self
×
NEW
751
                    .s3_region
×
NEW
752
                    .clone()
×
NEW
753
                    .unwrap_or_else(|| "us-east-1".to_string()),
×
NEW
754
                endpoint: self.s3_endpoint.clone(),
×
NEW
755
                url_style: self
×
NEW
756
                    .s3_url_style
×
NEW
757
                    .clone()
×
NEW
758
                    .unwrap_or_else(|| "path".to_string()),
×
NEW
759
                use_ssl: self.s3_use_ssl.unwrap_or(false),
×
NEW
760
            });
×
761

762
        match DuckLakeDestination::new(
763
            catalog_url,
764
            data_path,
765
            self.pool_size,
766
            s3_config,
767
            self.metadata_schema.clone(),
768
            MemoryStore::new(),
769
        )
770
        .await
771
        {
772
            Ok(_) => Ok(vec![]),
773
            Err(_) => Ok(vec![ValidationFailure::critical(
774
                "Ducklake Connection Failed",
775
                "Unable to connect to DuckLake.\n\n\
776
                Please verify:\n\
777
                (1) The catalog URL and data path are valid and reachable\n\
778
                (2) DuckLake catalog credentials are embedded correctly in the catalog URL\n\
779
                (3) The S3-compatible credentials and endpoint are correct when using object storage",
780
            )]),
781
        }
NEW
782
    }
×
783
}
784

785
impl IcebergValidator {
786
    fn new(config: FullApiIcebergConfig) -> Self {
2✔
787
        Self { config }
2✔
788
    }
2✔
789
}
790

791
#[async_trait]
792
impl Validator for IcebergValidator {
793
    async fn validate(
794
        &self,
795
        ctx: &ValidationContext,
796
    ) -> Result<Vec<ValidationFailure>, ValidationError> {
2✔
797
        let client = match &self.config {
798
            FullApiIcebergConfig::Supabase {
799
                project_ref,
800
                warehouse_name,
801
                catalog_token,
802
                s3_access_key_id,
803
                s3_secret_access_key,
804
                s3_region,
805
                ..
806
            } => {
807
                IcebergClient::new_with_supabase_catalog(
808
                    project_ref,
809
                    ctx.environment.get_supabase_domain(),
810
                    catalog_token.expose_secret().to_string(),
811
                    warehouse_name.clone(),
812
                    s3_access_key_id.expose_secret().to_string(),
813
                    s3_secret_access_key.expose_secret().to_string(),
814
                    s3_region.clone(),
815
                )
816
                .await
817
            }
818
            FullApiIcebergConfig::Rest {
819
                catalog_uri,
820
                warehouse_name,
821
                s3_access_key_id,
822
                s3_secret_access_key,
823
                s3_endpoint,
824
                ..
825
            } => {
826
                let mut props = HashMap::new();
827
                props.insert(
828
                    S3_ACCESS_KEY_ID.to_string(),
829
                    s3_access_key_id.expose_secret().to_string(),
830
                );
831
                props.insert(
832
                    S3_SECRET_ACCESS_KEY.to_string(),
833
                    s3_secret_access_key.expose_secret().to_string(),
834
                );
835
                props.insert(S3_ENDPOINT.to_string(), s3_endpoint.clone());
836

837
                IcebergClient::new_with_rest_catalog(
838
                    catalog_uri.clone(),
839
                    warehouse_name.clone(),
840
                    props,
841
                )
842
                .await
843
            }
844
        };
845

846
        let client = match client {
847
            Ok(client) => client,
848
            Err(_) => {
849
                return Ok(vec![ValidationFailure::critical(
850
                    "Iceberg Authentication Failed",
851
                    "Unable to authenticate with Iceberg.\n\n\
852
                    Please verify:\n\
853
                    (1) The catalog token is valid and has not expired\n\
854
                    (2) The S3 access key and secret key are correct\n\
855
                    (3) The catalog URI is properly formatted",
856
                )]);
857
            }
858
        };
859

860
        match client.validate_connectivity().await {
861
            Ok(()) => Ok(vec![]),
862
            Err(_) => Ok(vec![ValidationFailure::critical(
863
                "Iceberg Connection Failed",
864
                "Unable to connect to Iceberg catalog.\n\n\
865
                Please verify:\n\
866
                (1) Network connectivity to the catalog and S3\n\
867
                (2) The warehouse name exists in the catalog\n\
868
                (3) You have the required permissions to access the warehouse\n\
869
                (4) The S3 endpoint is reachable",
870
            )]),
871
        }
872
    }
2✔
873
}
874

875
/// Composite validator for destination prerequisites.
876
#[derive(Debug)]
877
pub struct DestinationValidator {
878
    config: FullApiDestinationConfig,
879
}
880

881
impl DestinationValidator {
882
    pub fn new(config: FullApiDestinationConfig) -> Self {
6✔
883
        Self { config }
6✔
884
    }
6✔
885
}
886

887
#[async_trait]
888
impl Validator for DestinationValidator {
889
    async fn validate(
890
        &self,
891
        ctx: &ValidationContext,
892
    ) -> Result<Vec<ValidationFailure>, ValidationError> {
6✔
893
        match &self.config {
894
            FullApiDestinationConfig::BigQuery {
895
                project_id,
896
                dataset_id,
897
                service_account_key,
898
                ..
899
            } => {
900
                let validator = BigQueryValidator::new(
901
                    project_id.clone(),
902
                    dataset_id.clone(),
903
                    service_account_key.expose_secret().to_string(),
904
                );
905
                validator.validate(ctx).await
906
            }
907
            FullApiDestinationConfig::Iceberg { config } => {
908
                let validator = IcebergValidator::new(config.clone());
909
                validator.validate(ctx).await
910
            }
911
            FullApiDestinationConfig::Ducklake {
912
                catalog_url,
913
                data_path,
914
                pool_size,
915
                s3_access_key_id,
916
                s3_secret_access_key,
917
                s3_region,
918
                s3_endpoint,
919
                s3_url_style,
920
                s3_use_ssl,
921
                metadata_schema,
922
            } => {
923
                let validator = DucklakeValidator::new(
924
                    catalog_url.clone(),
925
                    data_path.clone(),
926
                    pool_size.unwrap_or(
927
                        etl_config::shared::DestinationConfig::DEFAULT_DUCKLAKE_POOL_SIZE,
928
                    ),
929
                    s3_access_key_id
930
                        .as_ref()
NEW
931
                        .map(|value| value.expose_secret().to_string()),
×
932
                    s3_secret_access_key
933
                        .as_ref()
NEW
934
                        .map(|value| value.expose_secret().to_string()),
×
935
                    s3_region.clone(),
936
                    s3_endpoint.clone(),
937
                    s3_url_style.clone(),
938
                    *s3_use_ssl,
939
                    metadata_schema.clone(),
940
                );
941
                validator.validate(ctx).await
942
            }
943
        }
944
    }
6✔
945
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc