• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

supabase / etl / 24713061833

21 Apr 2026 08:47AM UTC coverage: 78.316% (+0.07%) from 78.248%
24713061833

push

github

web-flow
refactor: enable extended clippy lint set (#675)

* refactor: explicit_iter_loop

* refactor: implicit_clone

* refactor: manual_let_else

* refactor: map_unwrap_or

* refactor: match_same_arms

* refactor: clippy::redundant_closure_for_method_calls

* refactor: redundant_clone

* refactor: redundant_test_prefix

* refactor: semicolon_if_nothing_returned

* refactor: uninlined_format_args

* refactor: unnested_or_patterns

* feat: enable extra lints

* fix: rebase

* fix: removed test

* fix: after rebase

* fix: fmt

* fix: http tests

* fix: rebase

* fix: fmt

* fix: fmt

414 of 503 new or added lines in 59 files covered. (82.31%)

3292 existing lines in 120 files now uncovered.

24379 of 31129 relevant lines covered (78.32%)

1069.73 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

61.76
/etl-api/src/validation/validators.rs
1
//! Built-in validators for ETL pipeline and destination prerequisites.
2

3
use std::collections::HashMap;
4

5
use async_trait::async_trait;
6
use etl::store::both::memory::MemoryStore;
7
use etl_config::parse_ducklake_url;
8
use etl_destinations::{
9
    bigquery::BigQueryClient,
10
    ducklake::{DuckLakeDestination, S3Config as DucklakeS3Config},
11
    iceberg::{IcebergClient, S3_ACCESS_KEY_ID, S3_ENDPOINT, S3_SECRET_ACCESS_KEY},
12
};
13
use secrecy::ExposeSecret;
14
use sqlx::FromRow;
15

16
use super::{ValidationContext, ValidationError, ValidationFailure, Validator};
17
use crate::configs::{
18
    destination::{FullApiDestinationConfig, FullApiIcebergConfig},
19
    pipeline::FullApiPipelineConfig,
20
};
21

22
const ETL_SCHEMA_NAME: &str = "etl";
23

24
/// Validates the connected source role profile for ETL.
25
#[derive(Debug)]
26
pub struct SourceValidator;
27

28
#[derive(Debug, FromRow)]
29
struct SourceRoleAudit {
30
    rolcanlogin: bool,
31
    rolreplication: bool,
32
    rolbypassrls: bool,
33
    rolcreaterole: bool,
34
    rolcreatedb: bool,
35
    rolinherit: bool,
36
    rolvaliduntil_is_null: bool,
37
    etl_schema_exists: bool,
38
    etl_schema_usage: Option<bool>,
39
    etl_schema_create: Option<bool>,
40
    controls_all_existing_etl_tables: Option<bool>,
41
    can_create_schema_if_missing: Option<bool>,
42
}
43

44
#[async_trait]
45
impl Validator for SourceValidator {
46
    async fn validate(
47
        &self,
48
        ctx: &ValidationContext,
49
    ) -> Result<Vec<ValidationFailure>, ValidationError> {
28✔
50
        let Some(expected_username) = ctx.trusted_username.as_ref() else {
51
            return Ok(vec![]);
52
        };
53

54
        let source_pool =
55
            ctx.source_pool.as_ref().expect("source pool required for source validation");
56

57
        let current_user: String =
58
            sqlx::query_scalar("select current_user").fetch_one(source_pool).await?;
59

60
        if current_user != *expected_username {
61
            return Ok(vec![ValidationFailure::critical(
62
                "Invalid source username",
63
                format!("connected as '{current_user}' but expected '{expected_username}'"),
64
            )]);
65
        }
66

67
        // This validation is best effort: it relies on catalog metadata and
68
        // privilege checks to confirm the trusted role profile without running
69
        // invasive probes against the customer database.
70
        let audit = sqlx::query_as::<_, SourceRoleAudit>(
71
            r#"
72
            with target as (
73
              -- Load the direct role attributes for the trusted ETL user.
74
              select
75
                oid,
76
                rolcanlogin,
77
                rolreplication,
78
                rolbypassrls,
79
                rolcreaterole,
80
                rolcreatedb,
81
                rolinherit,
82
                rolvaliduntil is null as rolvaliduntil_is_null
83
              from pg_roles
84
              where rolname = $1
85
            ),
86
            etl_schema as (
87
              -- Check whether the etl schema already exists.
88
              select oid
89
              from pg_namespace
90
              where nspname = $2
91
            ),
92
            etl_tables as (
93
              -- List the existing tables in the etl schema, if present.
94
              select
95
                c.relowner
96
              from pg_class c
97
              join etl_schema s on s.oid = c.relnamespace
98
              where c.relkind in ('r', 'p')
99
            ),
100
            etl_table_ownership as (
101
              -- Determine whether the trusted role controls every existing ETL table.
102
              -- pg_has_role(..., 'USAGE') means the owning role's privileges are
103
              -- immediately available without requiring SET ROLE, which matches
104
              -- how ETL connects and operates.
105
              select
106
                coalesce(bool_and(pg_has_role($1, relowner, 'USAGE')), true)
107
                  as controls_all_existing_etl_tables
108
              from etl_tables
109
            )
110
            select
111
              t.rolcanlogin,
112
              t.rolreplication,
113
              t.rolbypassrls,
114
              t.rolcreaterole,
115
              t.rolcreatedb,
116
              t.rolinherit,
117
              t.rolvaliduntil_is_null,
118
              exists(select 1 from etl_schema) as etl_schema_exists,
119
              case
120
                when exists(select 1 from etl_schema)
121
                then has_schema_privilege($1, $2, 'USAGE')
122
                else null
123
              end as etl_schema_usage,
124
              case
125
                when exists(select 1 from etl_schema)
126
                then has_schema_privilege($1, $2, 'CREATE')
127
                else null
128
              end as etl_schema_create,
129
              case
130
                when exists(select 1 from etl_schema)
131
                then (select controls_all_existing_etl_tables from etl_table_ownership)
132
                else null
133
              end as controls_all_existing_etl_tables,
134
              case
135
                when not exists(select 1 from etl_schema)
136
                then has_database_privilege($1, current_database(), 'CREATE')
137
                else null
138
              end as can_create_schema_if_missing
139
            from target t
140
            "#,
141
        )
142
        .bind(expected_username)
143
        .bind(ETL_SCHEMA_NAME)
144
        .fetch_optional(source_pool)
145
        .await?;
146

147
        let Some(audit) = audit else {
148
            return Ok(vec![ValidationFailure::critical(
149
                "Invalid source role attributes",
150
                "role not found",
151
            )]);
152
        };
153

154
        let has_required_role_attributes = audit.rolcanlogin
155
            && audit.rolreplication
156
            && audit.rolbypassrls
157
            && !audit.rolcreaterole
158
            && !audit.rolcreatedb
159
            && audit.rolinherit
160
            && audit.rolvaliduntil_is_null;
161

162
        let mut failures = Vec::new();
163
        if !has_required_role_attributes {
164
            failures.push(ValidationFailure::critical(
165
                "Invalid source role attributes",
166
                "The source database does not grant the trusted username role all permissions ETL \
167
                 needs to work properly.",
168
            ));
169
        }
170

171
        let has_required_etl_schema_permissions = if audit.etl_schema_exists {
172
            audit.etl_schema_usage == Some(true)
173
                && audit.etl_schema_create == Some(true)
174
                && audit.controls_all_existing_etl_tables == Some(true)
175
        } else {
176
            audit.can_create_schema_if_missing == Some(true)
177
        };
178

179
        if !has_required_etl_schema_permissions {
180
            failures.push(ValidationFailure::critical(
181
                "Invalid source etl schema permissions",
182
                format!(
183
                    "The source database does not grant the trusted username role all permissions \
184
                     ETL needs to manage schema {ETL_SCHEMA_NAME} properly."
185
                ),
186
            ));
187
        }
188

189
        Ok(failures)
190
    }
28✔
191
}
192

193
/// Validates that the required publication exists in the source database.
194
#[derive(Debug)]
195
pub struct PublicationExistsValidator {
196
    publication_name: String,
197
}
198

199
impl PublicationExistsValidator {
200
    pub fn new(publication_name: String) -> Self {
9✔
201
        Self { publication_name }
9✔
202
    }
9✔
203
}
204

205
#[async_trait]
206
impl Validator for PublicationExistsValidator {
207
    async fn validate(
208
        &self,
209
        ctx: &ValidationContext,
210
    ) -> Result<Vec<ValidationFailure>, ValidationError> {
9✔
211
        let source_pool =
212
            ctx.source_pool.as_ref().expect("source pool required for publication validation");
213

214
        let exists: bool =
215
            sqlx::query_scalar("select exists(select 1 from pg_publication where pubname = $1)")
216
                .bind(&self.publication_name)
217
                .fetch_one(source_pool)
218
                .await?;
219

220
        if exists {
221
            Ok(vec![])
222
        } else {
223
            Ok(vec![ValidationFailure::critical(
224
                "Publication Not Found",
225
                format!(
226
                    "Publication '{}' does not exist in the source database. Create it with: \
227
                     CREATE PUBLICATION {} FOR TABLE <table_name>, ...",
228
                    self.publication_name, self.publication_name
229
                ),
230
            )])
231
        }
232
    }
9✔
233
}
234

235
/// Validates that there are enough free replication slots for the pipeline.
236
#[derive(Debug)]
237
pub struct ReplicationSlotsValidator {
238
    max_table_sync_workers: u16,
239
}
240

241
impl ReplicationSlotsValidator {
242
    pub fn new(max_table_sync_workers: u16) -> Self {
9✔
243
        Self { max_table_sync_workers }
9✔
244
    }
9✔
245
}
246

247
#[async_trait]
248
impl Validator for ReplicationSlotsValidator {
249
    async fn validate(
250
        &self,
251
        ctx: &ValidationContext,
252
    ) -> Result<Vec<ValidationFailure>, ValidationError> {
9✔
253
        let source_pool = ctx
254
            .source_pool
255
            .as_ref()
256
            .expect("source pool required for replication slots validation");
257

258
        let max_slots: i32 = sqlx::query_scalar(
259
            "select setting::int from pg_settings where name = 'max_replication_slots'",
260
        )
261
        .fetch_one(source_pool)
262
        .await?;
263

264
        let used_slots: i64 = sqlx::query_scalar("select count(*) from pg_replication_slots")
265
            .fetch_one(source_pool)
266
            .await?;
267

268
        let free_slots = max_slots as i64 - used_slots;
269
        // We need 1 slot for the apply worker plus at most `max_table_sync_workers`
270
        // other slots for table sync workers.
271
        let required_slots = self.max_table_sync_workers as i64 + 1;
272

273
        if required_slots <= free_slots {
274
            Ok(vec![])
275
        } else {
276
            Ok(vec![ValidationFailure::critical(
277
                "Insufficient Replication Slots",
278
                format!(
279
                    "Not enough replication slots available.\nFound {free_slots} free slots, but \
280
                     {required_slots} are required at most during initial table copy \
281
                     ({used_slots}/{max_slots} currently in use).\nOnce all tables are copied, \
282
                     only 1 slot will be used.\n\nPlease verify:\n(1) max_replication_slots in \
283
                     postgresql.conf is sufficient\n(2) Unused replication slots can be \
284
                     removed\n(3) max_table_sync_workers can be reduced if needed",
285
                ),
286
            )])
287
        }
288
    }
9✔
289
}
290

291
/// Validates that the WAL level is set to 'logical' for replication.
292
#[derive(Debug)]
293
pub struct WalLevelValidator;
294

295
#[async_trait]
296
impl Validator for WalLevelValidator {
297
    async fn validate(
298
        &self,
299
        ctx: &ValidationContext,
300
    ) -> Result<Vec<ValidationFailure>, ValidationError> {
9✔
301
        let source_pool =
302
            ctx.source_pool.as_ref().expect("source pool required for WAL level validation");
303

304
        let wal_level: String = sqlx::query_scalar("select current_setting('wal_level')")
305
            .fetch_one(source_pool)
306
            .await?;
307

308
        if wal_level == "logical" {
309
            Ok(vec![])
310
        } else {
311
            Ok(vec![ValidationFailure::critical(
312
                "Invalid WAL Level",
313
                format!(
314
                    "WAL level is set to '{wal_level}', but must be 'logical' for replication. \
315
                     Update postgresql.conf with: wal_level = 'logical' and restart PostgreSQL"
316
                ),
317
            )])
318
        }
319
    }
9✔
320
}
321

322
/// Validates that the database user has replication permissions.
323
#[derive(Debug)]
324
pub struct ReplicationPermissionsValidator;
325

326
#[async_trait]
327
impl Validator for ReplicationPermissionsValidator {
328
    async fn validate(
329
        &self,
330
        ctx: &ValidationContext,
331
    ) -> Result<Vec<ValidationFailure>, ValidationError> {
9✔
332
        let source_pool = ctx
333
            .source_pool
334
            .as_ref()
335
            .expect("source pool required for replication permissions validation");
336

337
        // Check if user is superuser OR has replication privilege
338
        let has_permission: bool = sqlx::query_scalar(
339
            "select rolsuper or rolreplication from pg_roles where rolname = current_user",
340
        )
341
        .fetch_one(source_pool)
342
        .await?;
343

344
        if has_permission {
345
            Ok(vec![])
346
        } else {
347
            Ok(vec![ValidationFailure::critical(
348
                "Missing Replication Permission",
349
                "The database user does not have replication privileges",
350
            )])
351
        }
352
    }
9✔
353
}
354

355
/// Validates that a publication contains at least one table.
356
#[derive(Debug)]
357
pub struct PublicationHasTablesValidator {
358
    publication_name: String,
359
}
360

361
impl PublicationHasTablesValidator {
362
    pub fn new(publication_name: String) -> Self {
9✔
363
        Self { publication_name }
9✔
364
    }
9✔
365
}
366

367
#[async_trait]
368
impl Validator for PublicationHasTablesValidator {
369
    async fn validate(
370
        &self,
371
        ctx: &ValidationContext,
372
    ) -> Result<Vec<ValidationFailure>, ValidationError> {
9✔
373
        let source_pool = ctx
374
            .source_pool
375
            .as_ref()
376
            .expect("source pool required for publication tables validation");
377

378
        // Check if publication publishes all tables or has specific tables
379
        let result: Option<(bool, i64)> = sqlx::query_as(
380
            r#"
381
            select
382
                p.puballtables,
383
                (select count(*) from pg_publication_tables pt where pt.pubname = p.pubname)
384
            from pg_publication p
385
            where p.pubname = $1
386
            "#,
387
        )
388
        .bind(&self.publication_name)
389
        .fetch_optional(source_pool)
390
        .await?;
391

392
        // If publication doesn't exist, skip this check (PublicationExistsValidator
393
        // handles it)
394
        let Some((puballtables, table_count)) = result else {
395
            return Ok(vec![]);
396
        };
397

398
        if puballtables || table_count > 0 {
399
            Ok(vec![])
400
        } else {
401
            Ok(vec![ValidationFailure::critical(
402
                "Publication Empty",
403
                format!(
404
                    "Publication '{}' exists but contains no tables.\n\nAdd tables with: ALTER \
405
                     PUBLICATION {} ADD TABLE <table_name>",
406
                    self.publication_name, self.publication_name
407
                ),
408
            )])
409
        }
410
    }
9✔
411
}
412

413
/// Warns about tables listed in a publication that have no primary key.
414
#[derive(Debug)]
415
pub struct PrimaryKeysValidator {
416
    publication_name: String,
417
}
418

419
impl PrimaryKeysValidator {
420
    pub fn new(publication_name: String) -> Self {
9✔
421
        Self { publication_name }
9✔
422
    }
9✔
423
}
424

425
#[async_trait]
426
impl Validator for PrimaryKeysValidator {
427
    async fn validate(
428
        &self,
429
        ctx: &ValidationContext,
430
    ) -> Result<Vec<ValidationFailure>, ValidationError> {
9✔
431
        let source_pool =
432
            ctx.source_pool.as_ref().expect("source pool required for primary keys validation");
433

434
        // Find tables without primary keys using pg_publication_rel for direct OID
435
        // access
436
        let tables_without_pk: Vec<String> = sqlx::query_scalar(
437
            r#"
438
            select n.nspname || '.' || c.relname
439
            from pg_publication_rel pr
440
            join pg_publication p on p.oid = pr.prpubid
441
            join pg_class c on c.oid = pr.prrelid
442
            join pg_namespace n on n.oid = c.relnamespace
443
            where p.pubname = $1
444
              and not exists (
445
                select 1
446
                from pg_constraint con
447
                where con.conrelid = pr.prrelid
448
                  and con.contype = 'p'
449
              )
450
            order by n.nspname, c.relname
451
            limit 100
452
            "#,
453
        )
454
        .bind(&self.publication_name)
455
        .fetch_all(source_pool)
456
        .await?;
457

458
        if tables_without_pk.is_empty() {
459
            Ok(vec![])
460
        } else {
461
            Ok(vec![ValidationFailure::warning(
462
                "Tables Missing Primary Keys",
463
                format!(
464
                    "Tables without primary keys: {}\n\nPrimary keys are required for UPDATE and \
465
                     DELETE replication.",
466
                    tables_without_pk.join(", ")
467
                ),
468
            )])
469
        }
470
    }
9✔
471
}
472

473
/// Warns about tables listed in a publication that have generated columns.
474
#[derive(Debug)]
475
pub struct GeneratedColumnsValidator {
476
    publication_name: String,
477
}
478

479
impl GeneratedColumnsValidator {
480
    pub fn new(publication_name: String) -> Self {
9✔
481
        Self { publication_name }
9✔
482
    }
9✔
483
}
484

485
#[async_trait]
486
impl Validator for GeneratedColumnsValidator {
487
    async fn validate(
488
        &self,
489
        ctx: &ValidationContext,
490
    ) -> Result<Vec<ValidationFailure>, ValidationError> {
9✔
491
        let source_pool = ctx
492
            .source_pool
493
            .as_ref()
494
            .expect("source pool required for generated columns validation");
495

496
        // Find tables with generated columns using pg_publication_rel for direct OID
497
        // access
498
        let tables_with_generated: Vec<String> = sqlx::query_scalar(
499
            r#"
500
            select distinct n.nspname || '.' || c.relname
501
            from pg_publication_rel pr
502
            join pg_publication p on p.oid = pr.prpubid
503
            join pg_class c on c.oid = pr.prrelid
504
            join pg_namespace n on n.oid = c.relnamespace
505
            where p.pubname = $1
506
              and exists (
507
                select 1
508
                from pg_attribute a
509
                where a.attrelid = pr.prrelid
510
                  and a.attnum > 0
511
                  and not a.attisdropped
512
                  and a.attgenerated != ''
513
              )
514
            order by 1
515
            limit 100
516
            "#,
517
        )
518
        .bind(&self.publication_name)
519
        .fetch_all(source_pool)
520
        .await?;
521

522
        if tables_with_generated.is_empty() {
523
            Ok(vec![])
524
        } else {
525
            Ok(vec![ValidationFailure::warning(
526
                "Tables With Generated Columns",
527
                format!(
528
                    "Tables with generated columns: {}\n\nGenerated columns cannot be replicated \
529
                     and will be excluded from the destination.",
530
                    tables_with_generated.join(", ")
531
                ),
532
            )])
533
        }
534
    }
9✔
535
}
536

537
/// Composite validator for pipeline prerequisites.
538
#[derive(Debug)]
539
pub struct PipelineValidator {
540
    config: FullApiPipelineConfig,
541
}
542

543
impl PipelineValidator {
544
    pub fn new(config: FullApiPipelineConfig) -> Self {
9✔
545
        Self { config }
9✔
546
    }
9✔
547

548
    fn sub_validators(&self) -> Vec<Box<dyn Validator>> {
9✔
549
        let max_table_sync_workers = self.config.max_table_sync_workers.unwrap_or(4);
9✔
550
        let publication_name = self.config.publication_name.clone();
9✔
551

552
        vec![
9✔
553
            Box::new(WalLevelValidator),
9✔
554
            Box::new(ReplicationPermissionsValidator),
9✔
555
            Box::new(PublicationExistsValidator::new(publication_name.clone())),
9✔
556
            Box::new(PublicationHasTablesValidator::new(publication_name.clone())),
9✔
557
            Box::new(PrimaryKeysValidator::new(publication_name.clone())),
9✔
558
            Box::new(GeneratedColumnsValidator::new(publication_name)),
9✔
559
            Box::new(ReplicationSlotsValidator::new(max_table_sync_workers)),
9✔
560
        ]
561
    }
9✔
562
}
563

564
#[async_trait]
565
impl Validator for PipelineValidator {
566
    async fn validate(
567
        &self,
568
        ctx: &ValidationContext,
569
    ) -> Result<Vec<ValidationFailure>, ValidationError> {
9✔
570
        let mut failures = Vec::new();
571

572
        for validator in self.sub_validators() {
573
            failures.extend(validator.validate(ctx).await?);
574
        }
575

576
        Ok(failures)
577
    }
9✔
578
}
579

580
/// Validates BigQuery destination connectivity and dataset accessibility.
581
#[derive(Debug)]
582
struct BigQueryValidator {
583
    project_id: String,
584
    dataset_id: String,
585
    service_account_key: String,
586
}
587

588
impl BigQueryValidator {
589
    fn new(project_id: String, dataset_id: String, service_account_key: String) -> Self {
4✔
590
        Self { project_id, dataset_id, service_account_key }
4✔
591
    }
4✔
592
}
593

594
#[async_trait]
595
impl Validator for BigQueryValidator {
596
    async fn validate(
597
        &self,
598
        _ctx: &ValidationContext,
599
    ) -> Result<Vec<ValidationFailure>, ValidationError> {
4✔
600
        let Ok(client) =
601
            BigQueryClient::new_with_key(self.project_id.clone(), &self.service_account_key, 1)
602
                .await
603
        else {
604
            return Ok(vec![ValidationFailure::critical(
605
                "BigQuery Authentication Failed",
606
                "Unable to authenticate with BigQuery.\n\nPlease verify:\n(1) The service account \
607
                 key is valid JSON\n(2) The key has not expired or been revoked\n(3) The project \
608
                 ID is correct",
609
            )]);
610
        };
611

612
        match client.dataset_exists(&self.dataset_id).await {
613
            Ok(true) => Ok(vec![]),
614
            Ok(false) => Ok(vec![ValidationFailure::critical(
615
                "BigQuery Dataset Not Found",
616
                format!(
617
                    "Dataset '{}' does not exist in project '{}'.\n\nPlease verify:\n(1) The \
618
                     dataset name is correct\n(2) The dataset exists in the specified \
619
                     project\n(3) The service account has permission to access it",
620
                    self.dataset_id, self.project_id
621
                ),
622
            )]),
623
            Err(_) => Ok(vec![ValidationFailure::critical(
624
                "BigQuery Connection Failed",
625
                "Unable to connect to BigQuery.\n\nPlease verify:\n(1) Network connectivity to \
626
                 Google Cloud\n(2) The service account has the required permissions (BigQuery \
627
                 Data Editor, BigQuery Job User)\n(3) BigQuery API is enabled for your project",
628
            )]),
629
        }
630
    }
4✔
631
}
632

633
/// Validates Iceberg destination connectivity.
634
#[derive(Debug)]
635
struct IcebergValidator {
636
    config: FullApiIcebergConfig,
637
}
638

639
/// Validates DuckLake destination connectivity.
640
#[derive(Debug)]
641
struct DucklakeValidator {
642
    catalog_url: String,
643
    data_path: String,
644
    pool_size: u32,
645
    s3_access_key_id: Option<String>,
646
    s3_secret_access_key: Option<String>,
647
    s3_region: Option<String>,
648
    s3_endpoint: Option<String>,
649
    s3_url_style: Option<String>,
650
    s3_use_ssl: Option<bool>,
651
    metadata_schema: Option<String>,
652
}
653

654
impl DucklakeValidator {
655
    #[allow(clippy::too_many_arguments)]
UNCOV
656
    fn new(
×
UNCOV
657
        catalog_url: String,
×
UNCOV
658
        data_path: String,
×
UNCOV
659
        pool_size: u32,
×
UNCOV
660
        s3_access_key_id: Option<String>,
×
UNCOV
661
        s3_secret_access_key: Option<String>,
×
UNCOV
662
        s3_region: Option<String>,
×
UNCOV
663
        s3_endpoint: Option<String>,
×
UNCOV
664
        s3_url_style: Option<String>,
×
UNCOV
665
        s3_use_ssl: Option<bool>,
×
UNCOV
666
        metadata_schema: Option<String>,
×
UNCOV
667
    ) -> Self {
×
UNCOV
668
        Self {
×
UNCOV
669
            catalog_url,
×
UNCOV
670
            data_path,
×
UNCOV
671
            pool_size,
×
672
            s3_access_key_id,
×
673
            s3_secret_access_key,
×
674
            s3_region,
×
675
            s3_endpoint,
×
676
            s3_url_style,
×
677
            s3_use_ssl,
×
678
            metadata_schema,
×
679
        }
×
680
    }
×
681
}
682

683
#[async_trait]
684
impl Validator for DucklakeValidator {
685
    async fn validate(
686
        &self,
687
        _ctx: &ValidationContext,
688
    ) -> Result<Vec<ValidationFailure>, ValidationError> {
×
689
        match (&self.s3_access_key_id, &self.s3_secret_access_key) {
690
            (Some(_), None) | (None, Some(_)) => {
691
                return Ok(vec![ValidationFailure::critical(
692
                    "Ducklake S3 Configuration Invalid",
693
                    "DuckLake S3 credentials must include both access key ID and secret access \
694
                     key.",
695
                )]);
696
            }
697
            _ => {}
698
        }
699

700
        let catalog_url = match parse_ducklake_url(&self.catalog_url) {
701
            Ok(url) => url,
702
            Err(error) => {
703
                return Ok(vec![ValidationFailure::critical(
704
                    "Ducklake Catalog Url Invalid",
705
                    error.to_string(),
706
                )]);
707
            }
708
        };
709

710
        let data_path = match parse_ducklake_url(&self.data_path) {
711
            Ok(url) => url,
712
            Err(error) => {
713
                return Ok(vec![ValidationFailure::critical(
714
                    "Ducklake Data Path Invalid",
715
                    error.to_string(),
716
                )]);
717
            }
718
        };
719

720
        let s3_config = self.s3_access_key_id.clone().map(|access_key_id| DucklakeS3Config {
UNCOV
721
            access_key_id,
×
UNCOV
722
            secret_access_key: self
×
UNCOV
723
                .s3_secret_access_key
×
UNCOV
724
                .clone()
×
UNCOV
725
                .expect("ducklake s3 secret access key should be present"),
×
UNCOV
726
            region: self.s3_region.clone().unwrap_or_else(|| "us-east-1".to_string()),
×
UNCOV
727
            endpoint: self.s3_endpoint.clone(),
×
UNCOV
728
            url_style: self.s3_url_style.clone().unwrap_or_else(|| "path".to_string()),
×
UNCOV
729
            use_ssl: self.s3_use_ssl.unwrap_or(false),
×
UNCOV
730
        });
×
731

732
        match DuckLakeDestination::new(
733
            catalog_url,
734
            data_path,
735
            self.pool_size,
736
            s3_config,
737
            self.metadata_schema.clone(),
738
            MemoryStore::new(),
739
        )
740
        .await
741
        {
742
            Ok(_) => Ok(vec![]),
743
            Err(_) => Ok(vec![ValidationFailure::critical(
744
                "Ducklake Connection Failed",
745
                "Unable to connect to DuckLake.\n\nPlease verify:\n(1) The catalog URL and data \
746
                 path are valid and reachable\n(2) DuckLake catalog credentials are embedded \
747
                 correctly in the catalog URL\n(3) The S3-compatible credentials and endpoint are \
748
                 correct when using object storage",
749
            )]),
750
        }
751
    }
×
752
}
753

754
impl IcebergValidator {
755
    fn new(config: FullApiIcebergConfig) -> Self {
2✔
756
        Self { config }
2✔
757
    }
2✔
758
}
759

760
#[async_trait]
761
impl Validator for IcebergValidator {
762
    async fn validate(
763
        &self,
764
        ctx: &ValidationContext,
765
    ) -> Result<Vec<ValidationFailure>, ValidationError> {
2✔
766
        let client = match &self.config {
767
            FullApiIcebergConfig::Supabase {
768
                project_ref,
769
                warehouse_name,
770
                catalog_token,
771
                s3_access_key_id,
772
                s3_secret_access_key,
773
                s3_region,
774
                ..
775
            } => {
776
                IcebergClient::new_with_supabase_catalog(
777
                    project_ref,
778
                    ctx.environment.get_supabase_domain(),
779
                    catalog_token.expose_secret().to_string(),
780
                    warehouse_name.clone(),
781
                    s3_access_key_id.expose_secret().to_string(),
782
                    s3_secret_access_key.expose_secret().to_string(),
783
                    s3_region.clone(),
784
                )
785
                .await
786
            }
787
            FullApiIcebergConfig::Rest {
788
                catalog_uri,
789
                warehouse_name,
790
                s3_access_key_id,
791
                s3_secret_access_key,
792
                s3_endpoint,
793
                ..
794
            } => {
795
                let mut props = HashMap::new();
796
                props.insert(
797
                    S3_ACCESS_KEY_ID.to_string(),
798
                    s3_access_key_id.expose_secret().to_string(),
799
                );
800
                props.insert(
801
                    S3_SECRET_ACCESS_KEY.to_string(),
802
                    s3_secret_access_key.expose_secret().to_string(),
803
                );
804
                props.insert(S3_ENDPOINT.to_string(), s3_endpoint.clone());
805

806
                IcebergClient::new_with_rest_catalog(
807
                    catalog_uri.clone(),
808
                    warehouse_name.clone(),
809
                    props,
810
                )
811
                .await
812
            }
813
        };
814
        let Ok(client) = client else {
815
            return Ok(vec![ValidationFailure::critical(
816
                "Iceberg Authentication Failed",
817
                "Unable to authenticate with Iceberg.\n\nPlease verify:\n(1) The catalog token is \
818
                 valid and has not expired\n(2) The S3 access key and secret key are correct\n(3) \
819
                 The catalog URI is properly formatted",
820
            )]);
821
        };
822

823
        match client.validate_connectivity().await {
824
            Ok(()) => Ok(vec![]),
825
            Err(_) => Ok(vec![ValidationFailure::critical(
826
                "Iceberg Connection Failed",
827
                "Unable to connect to Iceberg catalog.\n\nPlease verify:\n(1) Network \
828
                 connectivity to the catalog and S3\n(2) The warehouse name exists in the \
829
                 catalog\n(3) You have the required permissions to access the warehouse\n(4) The \
830
                 S3 endpoint is reachable",
831
            )]),
832
        }
833
    }
2✔
834
}
835

836
/// Composite validator for destination prerequisites.
837
#[derive(Debug)]
838
pub struct DestinationValidator {
839
    config: FullApiDestinationConfig,
840
}
841

842
impl DestinationValidator {
843
    pub fn new(config: FullApiDestinationConfig) -> Self {
6✔
844
        Self { config }
6✔
845
    }
6✔
846
}
847

848
#[async_trait]
849
impl Validator for DestinationValidator {
850
    async fn validate(
851
        &self,
852
        ctx: &ValidationContext,
853
    ) -> Result<Vec<ValidationFailure>, ValidationError> {
6✔
854
        match &self.config {
855
            FullApiDestinationConfig::BigQuery {
856
                project_id,
857
                dataset_id,
858
                service_account_key,
859
                ..
860
            } => {
861
                let validator = BigQueryValidator::new(
862
                    project_id.clone(),
863
                    dataset_id.clone(),
864
                    service_account_key.expose_secret().to_string(),
865
                );
866
                validator.validate(ctx).await
867
            }
868
            FullApiDestinationConfig::Iceberg { config } => {
869
                let validator = IcebergValidator::new(config.clone());
870
                validator.validate(ctx).await
871
            }
872
            FullApiDestinationConfig::Ducklake {
873
                catalog_url,
874
                data_path,
875
                pool_size,
876
                s3_access_key_id,
877
                s3_secret_access_key,
878
                s3_region,
879
                s3_endpoint,
880
                s3_url_style,
881
                s3_use_ssl,
882
                metadata_schema,
883
            } => {
884
                let validator = DucklakeValidator::new(
885
                    catalog_url.clone(),
886
                    data_path.clone(),
887
                    pool_size.unwrap_or(
888
                        etl_config::shared::DestinationConfig::DEFAULT_DUCKLAKE_POOL_SIZE,
889
                    ),
UNCOV
890
                    s3_access_key_id.as_ref().map(|value| value.expose_secret().to_string()),
×
UNCOV
891
                    s3_secret_access_key.as_ref().map(|value| value.expose_secret().to_string()),
×
892
                    s3_region.clone(),
893
                    s3_endpoint.clone(),
894
                    s3_url_style.clone(),
895
                    *s3_use_ssl,
896
                    metadata_schema.clone(),
897
                );
898
                validator.validate(ctx).await
899
            }
900
        }
901
    }
6✔
902
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc