• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

supabase / etl / 20740294215

06 Jan 2026 06:33AM UTC coverage: 81.415% (-0.3%) from 81.691%
20740294215

push

github

web-flow
feat(api): Implement validation endpoints (#520)

304 of 453 new or added lines in 17 files covered. (67.11%)

140 existing lines in 13 files now uncovered.

17129 of 21039 relevant lines covered (81.42%)

197.9 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

84.62
/etl-api/src/validation/mod.rs
1
//! Validation framework for ETL destinations and pipelines.
2
//!
3
//! Provides a trait-based validation framework for checking configuration
4
//! and runtime requirements before creating destinations or pipelines.
5

6
mod validators;
7

8
use std::fmt;
9

10
use async_trait::async_trait;
11
use etl_config::Environment;
12
use serde::{Deserialize, Serialize};
13
use sqlx::PgPool;
14
use thiserror::Error;
15
use utoipa::ToSchema;
16

17
use crate::configs::destination::FullApiDestinationConfig;
18
use crate::configs::pipeline::FullApiPipelineConfig;
19
use crate::validation::validators::{DestinationValidator, PipelineValidator};
20

21
/// Shared context provided to validators during validation.
22
pub struct ValidationContext {
23
    /// Runtime environment for environment-specific configuration.
24
    pub environment: Environment,
25
    /// Connection pool to the source PostgreSQL database.
26
    /// Required for pipeline validation, optional for destination validation.
27
    pub source_pool: Option<PgPool>,
28
}
29

30
impl ValidationContext {
31
    /// Creates a new validation context builder.
32
    pub fn builder(environment: Environment) -> ValidationContextBuilder {
56✔
33
        ValidationContextBuilder {
56✔
34
            environment,
56✔
35
            source_pool: None,
56✔
36
        }
56✔
37
    }
56✔
38
}
39

40
/// Builder for constructing a [`ValidationContext`].
41
pub struct ValidationContextBuilder {
42
    environment: Environment,
43
    source_pool: Option<PgPool>,
44
}
45

46
impl ValidationContextBuilder {
47
    /// Sets the source database connection pool.
48
    pub fn source_pool(mut self, pool: PgPool) -> Self {
32✔
49
        self.source_pool = Some(pool);
32✔
50
        self
32✔
51
    }
32✔
52

53
    /// Builds the [`ValidationContext`].
54
    pub fn build(self) -> ValidationContext {
56✔
55
        ValidationContext {
56✔
56
            environment: self.environment,
56✔
57
            source_pool: self.source_pool,
56✔
58
        }
56✔
59
    }
56✔
60
}
61

62
/// Severity level of a validation failure.
63
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, ToSchema)]
64
#[serde(rename_all = "snake_case")]
65
pub enum FailureType {
66
    /// Critical failures that prevent the pipeline from running correctly.
67
    Critical,
68
    /// Warnings that don't prevent operation but indicate potential issues.
69
    Warning,
70
}
71

72
/// A validation failure with details about what failed.
73
#[derive(Debug, Clone)]
74
pub struct ValidationFailure {
75
    /// Name identifying what failed.
76
    pub name: String,
77
    /// Human-readable reason for the failure.
78
    pub reason: String,
79
    /// Severity of the failure.
80
    pub failure_type: FailureType,
81
}
82

83
impl ValidationFailure {
84
    /// Creates a new critical validation failure.
85
    pub fn critical(name: impl Into<String>, reason: impl Into<String>) -> Self {
24✔
86
        Self {
24✔
87
            name: name.into(),
24✔
88
            reason: reason.into(),
24✔
89
            failure_type: FailureType::Critical,
24✔
90
        }
24✔
91
    }
24✔
92

93
    /// Creates a new warning validation failure.
94
    pub fn warning(name: impl Into<String>, reason: impl Into<String>) -> Self {
12✔
95
        Self {
12✔
96
            name: name.into(),
12✔
97
            reason: reason.into(),
12✔
98
            failure_type: FailureType::Warning,
12✔
99
        }
12✔
100
    }
12✔
101
}
102

103
impl fmt::Display for ValidationFailure {
104
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
8✔
105
        write!(f, "{}: {}", self.name, self.reason)
8✔
106
    }
8✔
107
}
108

109
/// Errors that can occur during validation execution.
110
#[derive(Debug, Error)]
111
pub enum ValidationError {
112
    /// Failed to execute a database query.
113
    #[error("database query failed: {0}")]
114
    Database(#[from] sqlx::Error),
115

116
    /// Failed to connect to BigQuery.
117
    #[error("bigquery connection failed: {0}")]
118
    BigQuery(String),
119

120
    /// Failed to connect to Iceberg catalog.
121
    #[error("iceberg connection failed: {0}")]
122
    Iceberg(String),
123
}
124

125
/// Trait for implementing validation checks.
126
#[async_trait]
127
pub trait Validator: Send + Sync {
128
    /// Executes the validation check and returns a list of failures.
129
    /// An empty list means validation passed. Returns an error if validation
130
    /// could not be completed due to connection or configuration issues.
131
    async fn validate(
132
        &self,
133
        ctx: &ValidationContext,
134
    ) -> Result<Vec<ValidationFailure>, ValidationError>;
135
}
136

137
/// Validates destination configuration.
138
///
139
/// Returns a list of validation failures. Empty list means validation passed.
140
/// Returns an error if validation could not be completed.
141
///
142
/// Checks that the destination is accessible and properly configured:
143
/// - **BigQuery**: Validates dataset exists and is accessible.
144
/// - **Iceberg**: Validates catalog connectivity.
145
/// - **Memory**: Always passes.
146
pub async fn validate_destination(
24✔
147
    ctx: &ValidationContext,
24✔
148
    destination_config: &FullApiDestinationConfig,
24✔
149
) -> Result<Vec<ValidationFailure>, ValidationError> {
24✔
150
    let validator = DestinationValidator::new(destination_config.clone());
24✔
151
    validator.validate(ctx).await
24✔
152
}
24✔
153

154
/// Validates pipeline configuration against the source database.
155
///
156
/// Returns a list of validation failures. Empty list means validation passed.
157
/// Returns an error if validation could not be completed.
158
///
159
/// Checks pipeline prerequisites:
160
/// - Publication exists in the source database.
161
/// - Sufficient replication slots are available.
162
pub async fn validate_pipeline(
32✔
163
    ctx: &ValidationContext,
32✔
164
    pipeline_config: &FullApiPipelineConfig,
32✔
165
) -> Result<Vec<ValidationFailure>, ValidationError> {
32✔
166
    let validator = PipelineValidator::new(pipeline_config.clone());
32✔
167
    validator.validate(ctx).await
32✔
168
}
32✔
169

170
/// Validates both destination and pipeline configuration.
171
///
172
/// Returns a list of validation failures. Empty list means validation passed.
173
/// Returns an error if validation could not be completed.
174
///
175
/// Runs all validators and collects all failures, returning them together
176
/// for comprehensive error reporting.
NEW
177
pub async fn validate_destination_pipeline(
×
NEW
178
    ctx: &ValidationContext,
×
NEW
179
    destination_config: &FullApiDestinationConfig,
×
NEW
180
    pipeline_config: &FullApiPipelineConfig,
×
NEW
181
) -> Result<Vec<ValidationFailure>, ValidationError> {
×
NEW
182
    let mut failures = Vec::new();
×
183

NEW
184
    failures.extend(validate_destination(ctx, destination_config).await?);
×
NEW
185
    failures.extend(validate_pipeline(ctx, pipeline_config).await?);
×
186

NEW
187
    Ok(failures)
×
NEW
188
}
×
189

190
#[cfg(test)]
191
mod tests {
192
    use super::*;
193

194
    #[tokio::test]
195
    async fn test_validation_failure_display() {
4✔
196
        let critical = ValidationFailure::critical("test_error", "Something wrong");
4✔
197
        assert_eq!(critical.to_string(), "test_error: Something wrong");
4✔
198
        assert_eq!(critical.failure_type, FailureType::Critical);
4✔
199

200
        let warning = ValidationFailure::warning("test_warning", "Something to note");
4✔
201
        assert_eq!(warning.to_string(), "test_warning: Something to note");
4✔
202
        assert_eq!(warning.failure_type, FailureType::Warning);
4✔
203
    }
4✔
204
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc