• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

supabase / etl / 24802572044

22 Apr 2026 09:03PM UTC coverage: 77.377% (-0.5%) from 77.89%
24802572044

push

github

web-flow
Fixes for ducklake destination (#656)

* add logs to debug ducklake in staging

Signed-off-by: Benjamin <5719034+bnjjj@users.noreply.github.com>

* add more metrics

Signed-off-by: Benjamin <5719034+bnjjj@users.noreply.github.com>

* make ducklake destination truly async, improve duckdb extensions loading, add metrics

Signed-off-by: Benjamin <5719034+bnjjj@users.noreply.github.com>

* update batch marker strategy and add target_file_size to 10MB for maintenances

Signed-off-by: Benjamin <5719034+bnjjj@users.noreply.github.com>

* fix(ducklake): fix after retry

Signed-off-by: Benjamin <5719034+bnjjj@users.noreply.github.com>

* add more metrics

Signed-off-by: Benjamin <5719034+bnjjj@users.noreply.github.com>

* use update in ducklake

Signed-off-by: Benjamin <5719034+bnjjj@users.noreply.github.com>

* create maintenance connection pool when starting but asynchronously

Signed-off-by: Benjamin <5719034+bnjjj@users.noreply.github.com>

* add mutual exclusive access for maintenance task

Signed-off-by: Benjamin <5719034+bnjjj@users.noreply.github.com>

* add optional maintenance just to test behavior on staging

Signed-off-by: Benjamin <5719034+bnjjj@users.noreply.github.com>

* Revert "add optional maintenance just to test behavior on staging"

This reverts commit d48530d76.

* Revert "add more metrics"

This reverts commit b628046f4.

* revert network probes

Signed-off-by: Benjamin <5719034+bnjjj@users.noreply.github.com>

* Revert "add more metrics"

This reverts commit e98a633b8.

* Revert "add logs to debug ducklake in staging"

This reverts commit f4b510f55.

* remove lint errors

Signed-off-by: Benjamin <5719034+bnjjj@users.noreply.github.com>

* upgrade duckdb to 1.5.2

Signed-off-by: Benjamin <5719034+bnjjj@users.noreply.github.com>

* fix test

Signed-off-by: Benjamin <5719034+bnjjj@users.noreply.github.com>

* try to n... (continued)

1350 of 1802 new or added lines in 17 files covered. (74.92%)

143 existing lines in 10 files now uncovered.

25533 of 32998 relevant lines covered (77.38%)

1019.8 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

59.43
/etl-api/src/validation/validators.rs
1
//! Built-in validators for ETL pipeline and destination prerequisites.
2

3
use std::collections::HashMap;
4

5
use async_trait::async_trait;
6
use etl::store::both::memory::MemoryStore;
7
use etl_config::parse_ducklake_url;
8
use etl_destinations::{
9
    bigquery::BigQueryClient,
10
    ducklake::{DuckLakeDestination, S3Config as DucklakeS3Config},
11
    iceberg::{IcebergClient, S3_ACCESS_KEY_ID, S3_ENDPOINT, S3_SECRET_ACCESS_KEY},
12
};
13
use secrecy::ExposeSecret;
14
use sqlx::FromRow;
15

16
use super::{ValidationContext, ValidationError, ValidationFailure, Validator};
17
use crate::configs::{
18
    destination::{FullApiDestinationConfig, FullApiIcebergConfig},
19
    pipeline::FullApiPipelineConfig,
20
};
21

22
const ETL_SCHEMA_NAME: &str = "etl";
23

24
/// Validates the connected source role profile for ETL.
///
/// Stateless marker type; all inputs come from the [`ValidationContext`]
/// passed to [`Validator::validate`].
#[derive(Debug)]
pub struct SourceValidator;
27

28
/// One row of the source role audit query, decoded by column name.
///
/// The `rol*` fields mirror the same-named `pg_roles` columns for the trusted
/// role. The `Option` fields are SQL `NULL` when the check does not apply
/// (see each field).
#[derive(Debug, FromRow)]
struct SourceRoleAudit {
    // `pg_roles.rolcanlogin` of the trusted role.
    rolcanlogin: bool,
    // `pg_roles.rolreplication` of the trusted role.
    rolreplication: bool,
    // `pg_roles.rolbypassrls` of the trusted role.
    rolbypassrls: bool,
    // `pg_roles.rolcreaterole` of the trusted role.
    rolcreaterole: bool,
    // `pg_roles.rolcreatedb` of the trusted role.
    rolcreatedb: bool,
    // `pg_roles.rolinherit` of the trusted role.
    rolinherit: bool,
    // True when `pg_roles.rolvaliduntil` is NULL for the trusted role.
    rolvaliduntil_is_null: bool,
    // True when the etl schema is present in `pg_namespace`.
    etl_schema_exists: bool,
    // `has_schema_privilege(.., 'USAGE')` on the etl schema; `None` when the
    // schema does not exist.
    etl_schema_usage: Option<bool>,
    // `has_schema_privilege(.., 'CREATE')` on the etl schema; `None` when the
    // schema does not exist.
    etl_schema_create: Option<bool>,
    // Whether `pg_has_role(.., relowner, 'USAGE')` holds for every table
    // (relkind 'r' or 'p') in the etl schema; `None` when the schema does not
    // exist.
    controls_all_existing_etl_tables: Option<bool>,
    // `has_database_privilege(.., current_database(), 'CREATE')`; only
    // evaluated when the etl schema is missing, `None` otherwise.
    can_create_schema_if_missing: Option<bool>,
}
43

44
#[async_trait]
45
impl Validator for SourceValidator {
46
    async fn validate(
47
        &self,
48
        ctx: &ValidationContext,
49
    ) -> Result<Vec<ValidationFailure>, ValidationError> {
28✔
50
        let Some(expected_username) = ctx.trusted_username.as_ref() else {
51
            return Ok(vec![]);
52
        };
53

54
        let source_pool =
55
            ctx.source_pool.as_ref().expect("source pool required for source validation");
56

57
        let current_user: String =
58
            sqlx::query_scalar("select current_user").fetch_one(source_pool).await?;
59

60
        if current_user != *expected_username {
61
            return Ok(vec![ValidationFailure::critical(
62
                "Invalid source username",
63
                format!("connected as '{current_user}' but expected '{expected_username}'"),
64
            )]);
65
        }
66

67
        // This validation is best effort: it relies on catalog metadata and
68
        // privilege checks to confirm the trusted role profile without running
69
        // invasive probes against the customer database.
70
        let audit = sqlx::query_as::<_, SourceRoleAudit>(
71
            r#"
72
            with target as (
73
              -- Load the direct role attributes for the trusted ETL user.
74
              select
75
                oid,
76
                rolcanlogin,
77
                rolreplication,
78
                rolbypassrls,
79
                rolcreaterole,
80
                rolcreatedb,
81
                rolinherit,
82
                rolvaliduntil is null as rolvaliduntil_is_null
83
              from pg_roles
84
              where rolname = $1
85
            ),
86
            etl_schema as (
87
              -- Check whether the etl schema already exists.
88
              select oid
89
              from pg_namespace
90
              where nspname = $2
91
            ),
92
            etl_tables as (
93
              -- List the existing tables in the etl schema, if present.
94
              select
95
                c.relowner
96
              from pg_class c
97
              join etl_schema s on s.oid = c.relnamespace
98
              where c.relkind in ('r', 'p')
99
            ),
100
            etl_table_ownership as (
101
              -- Determine whether the trusted role controls every existing ETL table.
102
              -- pg_has_role(..., 'USAGE') means the owning role's privileges are
103
              -- immediately available without requiring SET ROLE, which matches
104
              -- how ETL connects and operates.
105
              select
106
                coalesce(bool_and(pg_has_role($1, relowner, 'USAGE')), true)
107
                  as controls_all_existing_etl_tables
108
              from etl_tables
109
            )
110
            select
111
              t.rolcanlogin,
112
              t.rolreplication,
113
              t.rolbypassrls,
114
              t.rolcreaterole,
115
              t.rolcreatedb,
116
              t.rolinherit,
117
              t.rolvaliduntil_is_null,
118
              exists(select 1 from etl_schema) as etl_schema_exists,
119
              case
120
                when exists(select 1 from etl_schema)
121
                then has_schema_privilege($1, $2, 'USAGE')
122
                else null
123
              end as etl_schema_usage,
124
              case
125
                when exists(select 1 from etl_schema)
126
                then has_schema_privilege($1, $2, 'CREATE')
127
                else null
128
              end as etl_schema_create,
129
              case
130
                when exists(select 1 from etl_schema)
131
                then (select controls_all_existing_etl_tables from etl_table_ownership)
132
                else null
133
              end as controls_all_existing_etl_tables,
134
              case
135
                when not exists(select 1 from etl_schema)
136
                then has_database_privilege($1, current_database(), 'CREATE')
137
                else null
138
              end as can_create_schema_if_missing
139
            from target t
140
            "#,
141
        )
142
        .bind(expected_username)
143
        .bind(ETL_SCHEMA_NAME)
144
        .fetch_optional(source_pool)
145
        .await?;
146

147
        let Some(audit) = audit else {
148
            return Ok(vec![ValidationFailure::critical(
149
                "Invalid source role attributes",
150
                "role not found",
151
            )]);
152
        };
153

154
        let has_required_role_attributes = audit.rolcanlogin
155
            && audit.rolreplication
156
            && audit.rolbypassrls
157
            && !audit.rolcreaterole
158
            && !audit.rolcreatedb
159
            && audit.rolinherit
160
            && audit.rolvaliduntil_is_null;
161

162
        let mut failures = Vec::new();
163
        if !has_required_role_attributes {
164
            failures.push(ValidationFailure::critical(
165
                "Invalid source role attributes",
166
                "The source database does not grant the trusted username role all permissions ETL \
167
                 needs to work properly.",
168
            ));
169
        }
170

171
        let has_required_etl_schema_permissions = if audit.etl_schema_exists {
172
            audit.etl_schema_usage == Some(true)
173
                && audit.etl_schema_create == Some(true)
174
                && audit.controls_all_existing_etl_tables == Some(true)
175
        } else {
176
            audit.can_create_schema_if_missing == Some(true)
177
        };
178

179
        if !has_required_etl_schema_permissions {
180
            failures.push(ValidationFailure::critical(
181
                "Invalid source etl schema permissions",
182
                format!(
183
                    "The source database does not grant the trusted username role all permissions \
184
                     ETL needs to manage schema {ETL_SCHEMA_NAME} properly."
185
                ),
186
            ));
187
        }
188

189
        Ok(failures)
190
    }
28✔
191
}
192

193
/// Checks that the configured publication is present in the source database.
#[derive(Debug)]
pub struct PublicationExistsValidator {
    // Name looked up in `pg_publication.pubname`.
    publication_name: String,
}

impl PublicationExistsValidator {
    /// Builds a validator for the publication called `publication_name`.
    pub fn new(publication_name: String) -> Self {
        PublicationExistsValidator { publication_name }
    }
}
204

205
#[async_trait]
206
impl Validator for PublicationExistsValidator {
207
    async fn validate(
208
        &self,
209
        ctx: &ValidationContext,
210
    ) -> Result<Vec<ValidationFailure>, ValidationError> {
9✔
211
        let source_pool =
212
            ctx.source_pool.as_ref().expect("source pool required for publication validation");
213

214
        let exists: bool =
215
            sqlx::query_scalar("select exists(select 1 from pg_publication where pubname = $1)")
216
                .bind(&self.publication_name)
217
                .fetch_one(source_pool)
218
                .await?;
219

220
        if exists {
221
            Ok(vec![])
222
        } else {
223
            Ok(vec![ValidationFailure::critical(
224
                "Publication Not Found",
225
                format!(
226
                    "Publication '{}' does not exist in the source database. Create it with: \
227
                     CREATE PUBLICATION {} FOR TABLE <table_name>, ...",
228
                    self.publication_name, self.publication_name
229
                ),
230
            )])
231
        }
232
    }
9✔
233
}
234

235
/// Checks that the source has enough unused replication slots for the pipeline.
#[derive(Debug)]
pub struct ReplicationSlotsValidator {
    // Upper bound on concurrently running table sync workers.
    max_table_sync_workers: u16,
}

impl ReplicationSlotsValidator {
    /// Builds a validator sized for `max_table_sync_workers` table sync workers.
    pub fn new(max_table_sync_workers: u16) -> Self {
        ReplicationSlotsValidator { max_table_sync_workers }
    }
}
246

247
#[async_trait]
248
impl Validator for ReplicationSlotsValidator {
249
    async fn validate(
250
        &self,
251
        ctx: &ValidationContext,
252
    ) -> Result<Vec<ValidationFailure>, ValidationError> {
9✔
253
        let source_pool = ctx
254
            .source_pool
255
            .as_ref()
256
            .expect("source pool required for replication slots validation");
257

258
        let max_slots: i32 = sqlx::query_scalar(
259
            "select setting::int from pg_settings where name = 'max_replication_slots'",
260
        )
261
        .fetch_one(source_pool)
262
        .await?;
263

264
        let used_slots: i64 = sqlx::query_scalar("select count(*) from pg_replication_slots")
265
            .fetch_one(source_pool)
266
            .await?;
267

268
        let free_slots = max_slots as i64 - used_slots;
269
        // We need 1 slot for the apply worker plus at most `max_table_sync_workers`
270
        // other slots for table sync workers.
271
        let required_slots = self.max_table_sync_workers as i64 + 1;
272

273
        if required_slots <= free_slots {
274
            Ok(vec![])
275
        } else {
276
            Ok(vec![ValidationFailure::critical(
277
                "Insufficient Replication Slots",
278
                format!(
279
                    "Not enough replication slots available.\nFound {free_slots} free slots, but \
280
                     {required_slots} are required at most during initial table copy \
281
                     ({used_slots}/{max_slots} currently in use).\nOnce all tables are copied, \
282
                     only 1 slot will be used.\n\nPlease verify:\n(1) max_replication_slots in \
283
                     postgresql.conf is sufficient\n(2) Unused replication slots can be \
284
                     removed\n(3) max_table_sync_workers can be reduced if needed",
285
                ),
286
            )])
287
        }
288
    }
9✔
289
}
290

291
/// Validates that the WAL level is set to 'logical' for replication.
292
#[derive(Debug)]
293
pub struct WalLevelValidator;
294

295
#[async_trait]
296
impl Validator for WalLevelValidator {
297
    async fn validate(
298
        &self,
299
        ctx: &ValidationContext,
300
    ) -> Result<Vec<ValidationFailure>, ValidationError> {
9✔
301
        let source_pool =
302
            ctx.source_pool.as_ref().expect("source pool required for WAL level validation");
303

304
        let wal_level: String = sqlx::query_scalar("select current_setting('wal_level')")
305
            .fetch_one(source_pool)
306
            .await?;
307

308
        if wal_level == "logical" {
309
            Ok(vec![])
310
        } else {
311
            Ok(vec![ValidationFailure::critical(
312
                "Invalid WAL Level",
313
                format!(
314
                    "WAL level is set to '{wal_level}', but must be 'logical' for replication. \
315
                     Update postgresql.conf with: wal_level = 'logical' and restart PostgreSQL"
316
                ),
317
            )])
318
        }
319
    }
9✔
320
}
321

322
/// Validates that the database user has replication permissions.
323
#[derive(Debug)]
324
pub struct ReplicationPermissionsValidator;
325

326
#[async_trait]
327
impl Validator for ReplicationPermissionsValidator {
328
    async fn validate(
329
        &self,
330
        ctx: &ValidationContext,
331
    ) -> Result<Vec<ValidationFailure>, ValidationError> {
9✔
332
        let source_pool = ctx
333
            .source_pool
334
            .as_ref()
335
            .expect("source pool required for replication permissions validation");
336

337
        // Check if user is superuser OR has replication privilege
338
        let has_permission: bool = sqlx::query_scalar(
339
            "select rolsuper or rolreplication from pg_roles where rolname = current_user",
340
        )
341
        .fetch_one(source_pool)
342
        .await?;
343

344
        if has_permission {
345
            Ok(vec![])
346
        } else {
347
            Ok(vec![ValidationFailure::critical(
348
                "Missing Replication Permission",
349
                "The database user does not have replication privileges",
350
            )])
351
        }
352
    }
9✔
353
}
354

355
/// Checks that the configured publication publishes at least one table.
#[derive(Debug)]
pub struct PublicationHasTablesValidator {
    // Name of the publication to inspect.
    publication_name: String,
}

impl PublicationHasTablesValidator {
    /// Builds a validator for the publication called `publication_name`.
    pub fn new(publication_name: String) -> Self {
        PublicationHasTablesValidator { publication_name }
    }
}
366

367
#[async_trait]
368
impl Validator for PublicationHasTablesValidator {
369
    async fn validate(
370
        &self,
371
        ctx: &ValidationContext,
372
    ) -> Result<Vec<ValidationFailure>, ValidationError> {
9✔
373
        let source_pool = ctx
374
            .source_pool
375
            .as_ref()
376
            .expect("source pool required for publication tables validation");
377

378
        // Check if publication publishes all tables or has specific tables
379
        let result: Option<(bool, i64)> = sqlx::query_as(
380
            r#"
381
            select
382
                p.puballtables,
383
                (select count(*) from pg_publication_tables pt where pt.pubname = p.pubname)
384
            from pg_publication p
385
            where p.pubname = $1
386
            "#,
387
        )
388
        .bind(&self.publication_name)
389
        .fetch_optional(source_pool)
390
        .await?;
391

392
        // If publication doesn't exist, skip this check (PublicationExistsValidator
393
        // handles it)
394
        let Some((puballtables, table_count)) = result else {
395
            return Ok(vec![]);
396
        };
397

398
        if puballtables || table_count > 0 {
399
            Ok(vec![])
400
        } else {
401
            Ok(vec![ValidationFailure::critical(
402
                "Publication Empty",
403
                format!(
404
                    "Publication '{}' exists but contains no tables.\n\nAdd tables with: ALTER \
405
                     PUBLICATION {} ADD TABLE <table_name>",
406
                    self.publication_name, self.publication_name
407
                ),
408
            )])
409
        }
410
    }
9✔
411
}
412

413
/// Checks that every table in the configured publication has a primary key.
#[derive(Debug)]
pub struct PrimaryKeysValidator {
    // Name of the publication whose tables are inspected.
    publication_name: String,
}

impl PrimaryKeysValidator {
    /// Builds a validator for the publication called `publication_name`.
    pub fn new(publication_name: String) -> Self {
        PrimaryKeysValidator { publication_name }
    }
}
424

425
#[async_trait]
426
impl Validator for PrimaryKeysValidator {
427
    async fn validate(
428
        &self,
429
        ctx: &ValidationContext,
430
    ) -> Result<Vec<ValidationFailure>, ValidationError> {
9✔
431
        let source_pool =
432
            ctx.source_pool.as_ref().expect("source pool required for primary keys validation");
433

434
        // Find tables without primary keys using pg_publication_rel for direct OID
435
        // access
436
        let tables_without_pk: Vec<String> = sqlx::query_scalar(
437
            r#"
438
            select n.nspname || '.' || c.relname
439
            from pg_publication_rel pr
440
            join pg_publication p on p.oid = pr.prpubid
441
            join pg_class c on c.oid = pr.prrelid
442
            join pg_namespace n on n.oid = c.relnamespace
443
            where p.pubname = $1
444
              and not exists (
445
                select 1
446
                from pg_constraint con
447
                where con.conrelid = pr.prrelid
448
                  and con.contype = 'p'
449
              )
450
            order by n.nspname, c.relname
451
            limit 100
452
            "#,
453
        )
454
        .bind(&self.publication_name)
455
        .fetch_all(source_pool)
456
        .await?;
457

458
        if tables_without_pk.is_empty() {
459
            Ok(vec![])
460
        } else {
461
            Ok(vec![ValidationFailure::warning(
462
                "Tables Missing Primary Keys",
463
                format!(
464
                    "Tables without primary keys: {}\n\nPrimary keys are required for UPDATE and \
465
                     DELETE replication.",
466
                    tables_without_pk.join(", ")
467
                ),
468
            )])
469
        }
470
    }
9✔
471
}
472

473
/// Checks the configured publication for tables that have generated columns.
#[derive(Debug)]
pub struct GeneratedColumnsValidator {
    // Name of the publication whose tables are inspected.
    publication_name: String,
}

impl GeneratedColumnsValidator {
    /// Builds a validator for the publication called `publication_name`.
    pub fn new(publication_name: String) -> Self {
        GeneratedColumnsValidator { publication_name }
    }
}
484

485
#[async_trait]
486
impl Validator for GeneratedColumnsValidator {
487
    async fn validate(
488
        &self,
489
        ctx: &ValidationContext,
490
    ) -> Result<Vec<ValidationFailure>, ValidationError> {
9✔
491
        let source_pool = ctx
492
            .source_pool
493
            .as_ref()
494
            .expect("source pool required for generated columns validation");
495

496
        // Find tables with generated columns using pg_publication_rel for direct OID
497
        // access
498
        let tables_with_generated: Vec<String> = sqlx::query_scalar(
499
            r#"
500
            select distinct n.nspname || '.' || c.relname
501
            from pg_publication_rel pr
502
            join pg_publication p on p.oid = pr.prpubid
503
            join pg_class c on c.oid = pr.prrelid
504
            join pg_namespace n on n.oid = c.relnamespace
505
            where p.pubname = $1
506
              and exists (
507
                select 1
508
                from pg_attribute a
509
                where a.attrelid = pr.prrelid
510
                  and a.attnum > 0
511
                  and not a.attisdropped
512
                  and a.attgenerated != ''
513
              )
514
            order by 1
515
            limit 100
516
            "#,
517
        )
518
        .bind(&self.publication_name)
519
        .fetch_all(source_pool)
520
        .await?;
521

522
        if tables_with_generated.is_empty() {
523
            Ok(vec![])
524
        } else {
525
            Ok(vec![ValidationFailure::warning(
526
                "Tables With Generated Columns",
527
                format!(
528
                    "Tables with generated columns: {}\n\nGenerated columns cannot be replicated \
529
                     and will be excluded from the destination.",
530
                    tables_with_generated.join(", ")
531
                ),
532
            )])
533
        }
534
    }
9✔
535
}
536

537
/// Composite validator for pipeline prerequisites.
538
#[derive(Debug)]
539
pub struct PipelineValidator {
540
    config: FullApiPipelineConfig,
541
}
542

543
impl PipelineValidator {
544
    pub fn new(config: FullApiPipelineConfig) -> Self {
9✔
545
        Self { config }
9✔
546
    }
9✔
547

548
    fn sub_validators(&self) -> Vec<Box<dyn Validator>> {
9✔
549
        let max_table_sync_workers = self.config.max_table_sync_workers.unwrap_or(4);
9✔
550
        let publication_name = self.config.publication_name.clone();
9✔
551

552
        vec![
9✔
553
            Box::new(WalLevelValidator),
9✔
554
            Box::new(ReplicationPermissionsValidator),
9✔
555
            Box::new(PublicationExistsValidator::new(publication_name.clone())),
9✔
556
            Box::new(PublicationHasTablesValidator::new(publication_name.clone())),
9✔
557
            Box::new(PrimaryKeysValidator::new(publication_name.clone())),
9✔
558
            Box::new(GeneratedColumnsValidator::new(publication_name)),
9✔
559
            Box::new(ReplicationSlotsValidator::new(max_table_sync_workers)),
9✔
560
        ]
561
    }
9✔
562
}
563

564
#[async_trait]
565
impl Validator for PipelineValidator {
566
    async fn validate(
567
        &self,
568
        ctx: &ValidationContext,
569
    ) -> Result<Vec<ValidationFailure>, ValidationError> {
9✔
570
        let mut failures = Vec::new();
571

572
        for validator in self.sub_validators() {
573
            failures.extend(validator.validate(ctx).await?);
574
        }
575

576
        Ok(failures)
577
    }
9✔
578
}
579

580
/// Checks that a BigQuery destination can be reached and its dataset exists.
#[derive(Debug)]
struct BigQueryValidator {
    // Project that owns the dataset.
    project_id: String,
    // Dataset whose existence is verified.
    dataset_id: String,
    // Service account key used to authenticate the client.
    service_account_key: String,
}

impl BigQueryValidator {
    /// Builds a validator for `dataset_id` in `project_id`, authenticating
    /// with `service_account_key`.
    fn new(project_id: String, dataset_id: String, service_account_key: String) -> Self {
        BigQueryValidator { project_id, dataset_id, service_account_key }
    }
}
593

594
#[async_trait]
595
impl Validator for BigQueryValidator {
596
    async fn validate(
597
        &self,
598
        _ctx: &ValidationContext,
599
    ) -> Result<Vec<ValidationFailure>, ValidationError> {
4✔
600
        let Ok(client) =
601
            BigQueryClient::new_with_key(self.project_id.clone(), &self.service_account_key, 1)
602
                .await
603
        else {
604
            return Ok(vec![ValidationFailure::critical(
605
                "BigQuery Authentication Failed",
606
                "Unable to authenticate with BigQuery.\n\nPlease verify:\n(1) The service account \
607
                 key is valid JSON\n(2) The key has not expired or been revoked\n(3) The project \
608
                 ID is correct",
609
            )]);
610
        };
611

612
        match client.dataset_exists(&self.dataset_id).await {
613
            Ok(true) => Ok(vec![]),
614
            Ok(false) => Ok(vec![ValidationFailure::critical(
615
                "BigQuery Dataset Not Found",
616
                format!(
617
                    "Dataset '{}' does not exist in project '{}'.\n\nPlease verify:\n(1) The \
618
                     dataset name is correct\n(2) The dataset exists in the specified \
619
                     project\n(3) The service account has permission to access it",
620
                    self.dataset_id, self.project_id
621
                ),
622
            )]),
623
            Err(_) => Ok(vec![ValidationFailure::critical(
624
                "BigQuery Connection Failed",
625
                "Unable to connect to BigQuery.\n\nPlease verify:\n(1) Network connectivity to \
626
                 Google Cloud\n(2) The service account has the required permissions (BigQuery \
627
                 Data Editor, BigQuery Job User)\n(3) BigQuery API is enabled for your project",
628
            )]),
629
        }
630
    }
4✔
631
}
632

633
/// Validates Iceberg destination connectivity.
#[derive(Debug)]
struct IcebergValidator {
    // Iceberg destination configuration; the validator builds its client
    // from the `Supabase` or `Rest` variant.
    config: FullApiIcebergConfig,
}
638

639
/// Checks that a DuckLake destination can be constructed from its settings.
#[derive(Debug)]
struct DucklakeValidator {
    // Catalog location, parsed with `parse_ducklake_url` during validation.
    catalog_url: String,
    // Data location, parsed with `parse_ducklake_url` during validation.
    data_path: String,
    // Pool size handed to the destination constructor.
    pool_size: u32,
    // S3 access key ID; must be provided together with the secret key.
    s3_access_key_id: Option<String>,
    // S3 secret access key; must be provided together with the key ID.
    s3_secret_access_key: Option<String>,
    // S3 region; validation falls back to "us-east-1" when unset.
    s3_region: Option<String>,
    // Optional S3 endpoint override.
    s3_endpoint: Option<String>,
    // S3 URL style; validation falls back to "path" when unset.
    s3_url_style: Option<String>,
    // Whether to use SSL for S3; validation falls back to `false` when unset.
    s3_use_ssl: Option<bool>,
    // Optional metadata schema forwarded to the destination.
    metadata_schema: Option<String>,
    // Optional DuckDB memory cache limit forwarded to the destination.
    duckdb_memory_cache_limit: Option<String>,
    // Optional maintenance target file size forwarded to the destination.
    maintenance_target_file_size: Option<String>,
}

impl DucklakeValidator {
    /// Builds a validator from the individual DuckLake settings.
    // One positional argument per setting keeps existing call sites intact,
    // hence the lint exemption.
    #[allow(clippy::too_many_arguments)]
    fn new(
        catalog_url: String,
        data_path: String,
        pool_size: u32,
        s3_access_key_id: Option<String>,
        s3_secret_access_key: Option<String>,
        s3_region: Option<String>,
        s3_endpoint: Option<String>,
        s3_url_style: Option<String>,
        s3_use_ssl: Option<bool>,
        metadata_schema: Option<String>,
        duckdb_memory_cache_limit: Option<String>,
        maintenance_target_file_size: Option<String>,
    ) -> Self {
        DucklakeValidator {
            catalog_url,
            data_path,
            pool_size,
            s3_access_key_id,
            s3_secret_access_key,
            s3_region,
            s3_endpoint,
            s3_url_style,
            s3_use_ssl,
            metadata_schema,
            duckdb_memory_cache_limit,
            maintenance_target_file_size,
        }
    }
}
688

689
#[async_trait]
690
impl Validator for DucklakeValidator {
691
    async fn validate(
692
        &self,
693
        _ctx: &ValidationContext,
694
    ) -> Result<Vec<ValidationFailure>, ValidationError> {
×
695
        match (&self.s3_access_key_id, &self.s3_secret_access_key) {
696
            (Some(_), None) | (None, Some(_)) => {
697
                return Ok(vec![ValidationFailure::critical(
698
                    "Ducklake S3 Configuration Invalid",
699
                    "DuckLake S3 credentials must include both access key ID and secret access \
700
                     key.",
701
                )]);
702
            }
703
            _ => {}
704
        }
705

706
        let catalog_url = match parse_ducklake_url(&self.catalog_url) {
707
            Ok(url) => url,
708
            Err(error) => {
709
                return Ok(vec![ValidationFailure::critical(
710
                    "Ducklake Catalog Url Invalid",
711
                    error.to_string(),
712
                )]);
713
            }
714
        };
715

716
        let data_path = match parse_ducklake_url(&self.data_path) {
717
            Ok(url) => url,
718
            Err(error) => {
719
                return Ok(vec![ValidationFailure::critical(
720
                    "Ducklake Data Path Invalid",
721
                    error.to_string(),
722
                )]);
723
            }
724
        };
725

726
        let s3_config = self.s3_access_key_id.clone().map(|access_key_id| DucklakeS3Config {
727
            access_key_id,
×
728
            secret_access_key: self
×
729
                .s3_secret_access_key
×
730
                .clone()
×
731
                .expect("ducklake s3 secret access key should be present"),
×
732
            region: self.s3_region.clone().unwrap_or_else(|| "us-east-1".to_string()),
×
733
            endpoint: self.s3_endpoint.clone(),
×
734
            url_style: self.s3_url_style.clone().unwrap_or_else(|| "path".to_string()),
×
735
            use_ssl: self.s3_use_ssl.unwrap_or(false),
×
736
        });
×
737

738
        match DuckLakeDestination::new(
739
            catalog_url,
740
            data_path,
741
            self.pool_size,
742
            s3_config,
743
            self.metadata_schema.clone(),
744
            self.duckdb_memory_cache_limit.clone(),
745
            self.maintenance_target_file_size.clone(),
746
            MemoryStore::new(),
747
        )
748
        .await
749
        {
750
            Ok(_) => Ok(vec![]),
751
            Err(_) => Ok(vec![ValidationFailure::critical(
752
                "Ducklake Connection Failed",
753
                "Unable to connect to DuckLake.\n\nPlease verify:\n(1) The catalog URL and data \
754
                 path are valid and reachable\n(2) DuckLake catalog credentials are embedded \
755
                 correctly in the catalog URL\n(3) The S3-compatible credentials and endpoint are \
756
                 correct when using object storage",
757
            )]),
758
        }
759
    }
×
760
}
761

762
impl IcebergValidator {
763
    fn new(config: FullApiIcebergConfig) -> Self {
2✔
764
        Self { config }
2✔
765
    }
2✔
766
}
767

768
#[async_trait]
769
impl Validator for IcebergValidator {
770
    async fn validate(
771
        &self,
772
        ctx: &ValidationContext,
773
    ) -> Result<Vec<ValidationFailure>, ValidationError> {
2✔
774
        let client = match &self.config {
775
            FullApiIcebergConfig::Supabase {
776
                project_ref,
777
                warehouse_name,
778
                catalog_token,
779
                s3_access_key_id,
780
                s3_secret_access_key,
781
                s3_region,
782
                ..
783
            } => {
784
                IcebergClient::new_with_supabase_catalog(
785
                    project_ref,
786
                    ctx.environment.get_supabase_domain(),
787
                    catalog_token.expose_secret().to_string(),
788
                    warehouse_name.clone(),
789
                    s3_access_key_id.expose_secret().to_string(),
790
                    s3_secret_access_key.expose_secret().to_string(),
791
                    s3_region.clone(),
792
                )
793
                .await
794
            }
795
            FullApiIcebergConfig::Rest {
796
                catalog_uri,
797
                warehouse_name,
798
                s3_access_key_id,
799
                s3_secret_access_key,
800
                s3_endpoint,
801
                ..
802
            } => {
803
                let mut props = HashMap::new();
804
                props.insert(
805
                    S3_ACCESS_KEY_ID.to_string(),
806
                    s3_access_key_id.expose_secret().to_string(),
807
                );
808
                props.insert(
809
                    S3_SECRET_ACCESS_KEY.to_string(),
810
                    s3_secret_access_key.expose_secret().to_string(),
811
                );
812
                props.insert(S3_ENDPOINT.to_string(), s3_endpoint.clone());
813

814
                IcebergClient::new_with_rest_catalog(
815
                    catalog_uri.clone(),
816
                    warehouse_name.clone(),
817
                    props,
818
                )
819
                .await
820
            }
821
        };
822
        let Ok(client) = client else {
823
            return Ok(vec![ValidationFailure::critical(
824
                "Iceberg Authentication Failed",
825
                "Unable to authenticate with Iceberg.\n\nPlease verify:\n(1) The catalog token is \
826
                 valid and has not expired\n(2) The S3 access key and secret key are correct\n(3) \
827
                 The catalog URI is properly formatted",
828
            )]);
829
        };
830

831
        match client.validate_connectivity().await {
832
            Ok(()) => Ok(vec![]),
833
            Err(_) => Ok(vec![ValidationFailure::critical(
834
                "Iceberg Connection Failed",
835
                "Unable to connect to Iceberg catalog.\n\nPlease verify:\n(1) Network \
836
                 connectivity to the catalog and S3\n(2) The warehouse name exists in the \
837
                 catalog\n(3) You have the required permissions to access the warehouse\n(4) The \
838
                 S3 endpoint is reachable",
839
            )]),
840
        }
841
    }
2✔
842
}
843

844
/// Composite validator for destination prerequisites.
845
#[derive(Debug)]
846
pub struct DestinationValidator {
847
    config: FullApiDestinationConfig,
848
}
849

850
impl DestinationValidator {
851
    pub fn new(config: FullApiDestinationConfig) -> Self {
6✔
852
        Self { config }
6✔
853
    }
6✔
854
}
855

856
#[async_trait]
857
impl Validator for DestinationValidator {
858
    async fn validate(
859
        &self,
860
        ctx: &ValidationContext,
861
    ) -> Result<Vec<ValidationFailure>, ValidationError> {
6✔
862
        match &self.config {
863
            FullApiDestinationConfig::BigQuery {
864
                project_id,
865
                dataset_id,
866
                service_account_key,
867
                ..
868
            } => {
869
                let validator = BigQueryValidator::new(
870
                    project_id.clone(),
871
                    dataset_id.clone(),
872
                    service_account_key.expose_secret().to_string(),
873
                );
874
                validator.validate(ctx).await
875
            }
876
            FullApiDestinationConfig::Iceberg { config } => {
877
                let validator = IcebergValidator::new(config.clone());
878
                validator.validate(ctx).await
879
            }
880
            FullApiDestinationConfig::Ducklake {
881
                catalog_url,
882
                data_path,
883
                pool_size,
884
                s3_access_key_id,
885
                s3_secret_access_key,
886
                s3_region,
887
                s3_endpoint,
888
                s3_url_style,
889
                s3_use_ssl,
890
                metadata_schema,
891
                duckdb_memory_cache_limit,
892
                maintenance_target_file_size,
893
            } => {
894
                let validator = DucklakeValidator::new(
895
                    catalog_url.clone(),
896
                    data_path.clone(),
897
                    pool_size.unwrap_or(
898
                        etl_config::shared::DestinationConfig::DEFAULT_DUCKLAKE_POOL_SIZE,
899
                    ),
900
                    s3_access_key_id.as_ref().map(|value| value.expose_secret().to_string()),
×
901
                    s3_secret_access_key.as_ref().map(|value| value.expose_secret().to_string()),
×
902
                    s3_region.clone(),
903
                    s3_endpoint.clone(),
904
                    s3_url_style.clone(),
905
                    *s3_use_ssl,
906
                    metadata_schema.clone(),
907
                    duckdb_memory_cache_limit.clone(),
908
                    maintenance_target_file_size.clone(),
909
                );
910
                validator.validate(ctx).await
911
            }
912
        }
913
    }
6✔
914
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc