• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

supabase / etl / 15678617192

16 Jun 2025 10:41AM UTC coverage: 59.818% (+0.009%) from 59.809%
15678617192

push

github

web-flow
ref(postgres): Rename `PgDatabaseOptions` structs (#145)

24 of 31 new or added lines in 9 files covered. (77.42%)

2 existing lines in 2 files now uncovered.

5005 of 8367 relevant lines covered (59.82%)

235.01 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

65.63
/api/src/startup.rs
1
use std::{net::TcpListener, sync::Arc};
2

3
use actix_web::{dev::Server, web, App, HttpServer};
4
use actix_web_httpauth::middleware::HttpAuthentication;
5
use aws_lc_rs::aead::{RandomizedNonceKey, AES_256_GCM};
6
use base64::{prelude::BASE64_STANDARD, Engine};
7
use postgres::sqlx::config::PgConnectionConfig;
8
use sqlx::{postgres::PgPoolOptions, PgPool};
9
use tracing_actix_web::TracingLogger;
10
use utoipa::OpenApi;
11
use utoipa_swagger_ui::SwaggerUi;
12

13
use crate::{
14
    authentication::auth_validator,
15
    configuration::Settings,
16
    db::publications::Publication,
17
    encryption,
18
    k8s_client::HttpK8sClient,
19
    routes::{
20
        destinations::{
21
            create_destination, delete_destination, read_all_destinations, read_destination,
22
            update_destination, GetDestinationResponse, PostDestinationRequest,
23
            PostDestinationResponse,
24
        },
25
        destinations_pipelines::{
26
            create_destinations_and_pipelines, update_destinations_and_pipelines,
27
            PostDestinationPipelineRequest, PostDestinationPipelineResponse,
28
        },
29
        health_check::health_check,
30
        images::{
31
            create_image, delete_image, read_all_images, read_image, update_image,
32
            GetImageResponse, PostImageRequest, PostImageResponse,
33
        },
34
        pipelines::{
35
            create_pipeline, delete_pipeline, get_pipeline_status, read_all_pipelines,
36
            read_pipeline, start_pipeline, stop_all_pipelines, stop_pipeline, update_pipeline,
37
            GetPipelineResponse, PostPipelineRequest, PostPipelineResponse,
38
        },
39
        sources::{
40
            create_source, delete_source,
41
            publications::{
42
                create_publication, delete_publication, read_all_publications, read_publication,
43
                update_publication, CreatePublicationRequest, UpdatePublicationRequest,
44
            },
45
            read_all_sources, read_source,
46
            tables::read_table_names,
47
            update_source, GetSourceResponse, PostSourceRequest, PostSourceResponse,
48
        },
49
        tenants::{
50
            create_or_update_tenant, create_tenant, delete_tenant, read_all_tenants, read_tenant,
51
            update_tenant, CreateTenantRequest, GetTenantResponse, PostTenantResponse,
52
        },
53
        tenants_sources::{
54
            create_tenant_and_source, CreateTenantSourceRequest, PostTenantSourceResponse,
55
        },
56
    },
57
    span_builder::ApiRootSpanBuilder,
58
};
59

60
pub struct Application {
61
    port: u16,
62
    server: Server,
63
}
64

65
impl Application {
66
    pub async fn build(configuration: Settings) -> Result<Self, anyhow::Error> {
×
67
        let connection_pool = get_connection_pool(&configuration.database);
×
68

×
69
        let address = format!(
×
70
            "{}:{}",
×
71
            configuration.application.host, configuration.application.port
×
72
        );
×
73
        let listener = TcpListener::bind(address)?;
×
74
        let port = listener.local_addr()?.port();
×
75
        let key_bytes = BASE64_STANDARD.decode(&configuration.encryption_key.key)?;
×
76
        let key = RandomizedNonceKey::new(&AES_256_GCM, &key_bytes)?;
×
77
        let encryption_key = encryption::EncryptionKey {
×
78
            id: configuration.encryption_key.id,
×
79
            key,
×
80
        };
×
81
        let api_key = configuration.api_key;
×
82
        let k8s_client = HttpK8sClient::new().await?;
×
83
        let server = run(
×
84
            listener,
×
85
            connection_pool,
×
86
            encryption_key,
×
87
            api_key,
×
88
            Some(k8s_client),
×
89
        )
×
90
        .await?;
×
91

92
        Ok(Self { port, server })
×
93
    }
×
94

NEW
95
    pub async fn migrate_database(options: PgConnectionConfig) -> Result<(), anyhow::Error> {
×
96
        let connection_pool = get_connection_pool(&options);
×
97

×
98
        sqlx::migrate!("./migrations").run(&connection_pool).await?;
×
99

100
        Ok(())
×
101
    }
×
102

103
    pub fn port(&self) -> u16 {
×
104
        self.port
×
105
    }
×
106

107
    pub async fn run_until_stopped(self) -> Result<(), std::io::Error> {
×
108
        self.server.await
×
109
    }
×
110
}
111

NEW
112
pub fn get_connection_pool(config: &PgConnectionConfig) -> PgPool {
×
NEW
113
    PgPoolOptions::new().connect_lazy_with(config.with_db())
×
UNCOV
114
}
×
115

116
// HttpK8sClient is wrapped in an option because creating it
117
// in tests involves setting a default CryptoProvider and it
118
// interferes with parallel tasks because only one can be set.
119
pub async fn run(
59✔
120
    listener: TcpListener,
59✔
121
    connection_pool: PgPool,
59✔
122
    encryption_key: encryption::EncryptionKey,
59✔
123
    api_key: String,
59✔
124
    http_k8s_client: Option<HttpK8sClient>,
59✔
125
) -> Result<Server, anyhow::Error> {
59✔
126
    let connection_pool = web::Data::new(connection_pool);
59✔
127
    let encryption_key = web::Data::new(encryption_key);
59✔
128
    let api_key = web::Data::new(api_key);
59✔
129
    let k8s_client = http_k8s_client.map(|client| web::Data::new(Arc::new(client)));
59✔
130

131
    #[derive(OpenApi)]
×
132
    #[openapi(
133
        paths(
134
            crate::routes::health_check::health_check,
135
            crate::routes::images::create_image,
136
            crate::routes::images::read_image,
137
            crate::routes::images::update_image,
138
            crate::routes::images::delete_image,
139
            crate::routes::images::read_all_images,
140
            crate::routes::pipelines::create_pipeline,
141
            crate::routes::pipelines::read_pipeline,
142
            crate::routes::pipelines::update_pipeline,
143
            crate::routes::pipelines::delete_pipeline,
144
            crate::routes::pipelines::read_all_pipelines,
145
            crate::routes::pipelines::get_pipeline_status,
146
            crate::routes::tenants::create_tenant,
147
            crate::routes::tenants::create_or_update_tenant,
148
            crate::routes::tenants::read_tenant,
149
            crate::routes::tenants::update_tenant,
150
            crate::routes::tenants::delete_tenant,
151
            crate::routes::tenants::read_all_tenants,
152
            crate::routes::sources::create_source,
153
            crate::routes::sources::read_source,
154
            crate::routes::sources::update_source,
155
            crate::routes::sources::delete_source,
156
            crate::routes::sources::read_all_sources,
157
            crate::routes::sources::publications::create_publication,
158
            crate::routes::sources::publications::read_publication,
159
            crate::routes::sources::publications::update_publication,
160
            crate::routes::sources::publications::delete_publication,
161
            crate::routes::sources::publications::read_all_publications,
162
            crate::routes::sources::tables::read_table_names,
163
            crate::routes::destinations::create_destination,
164
            crate::routes::destinations::read_destination,
165
            crate::routes::destinations::update_destination,
166
            crate::routes::destinations::delete_destination,
167
            crate::routes::destinations::read_all_destinations,
168
            crate::routes::tenants_sources::create_tenant_and_source,
169
            crate::routes::destinations_pipelines::create_destinations_and_pipelines,
170
            crate::routes::destinations_pipelines::update_destinations_and_pipelines,
171
        ),
172
        components(schemas(
173
            PostImageRequest,
174
            PostImageResponse,
175
            GetImageResponse,
176
            PostPipelineRequest,
177
            PostPipelineResponse,
178
            GetPipelineResponse,
179
            CreateTenantRequest,
180
            PostTenantResponse,
181
            GetTenantResponse,
182
            PostSourceRequest,
183
            PostSourceResponse,
184
            GetSourceResponse,
185
            CreatePublicationRequest,
186
            UpdatePublicationRequest,
187
            Publication,
188
            PostDestinationRequest,
189
            PostDestinationResponse,
190
            GetDestinationResponse,
191
            CreateTenantSourceRequest,
192
            PostTenantSourceResponse,
193
            PostDestinationPipelineRequest,
194
            PostDestinationPipelineResponse,
195
        ))
196
    )]
197
    struct ApiDoc;
198

199
    //TODO: replace all the context_path = v1 in route modules with the nest attribute
200
    //when it is available in utoipa 5.0.0: https://github.com/juhaku/utoipa/pull/930
201
    let openapi = ApiDoc::openapi();
59✔
202

203
    let server = HttpServer::new(move || {
236✔
204
        let tracing_middleware = TracingLogger::<ApiRootSpanBuilder>::new();
236✔
205
        let authentication = HttpAuthentication::bearer(auth_validator);
236✔
206
        let app = App::new()
236✔
207
            .wrap(tracing_middleware)
236✔
208
            .service(health_check)
236✔
209
            .service(
236✔
210
                SwaggerUi::new("/swagger-ui/{_:.*}").url("/api-docs/openapi.json", openapi.clone()),
236✔
211
            )
236✔
212
            .service(
236✔
213
                web::scope("v1")
236✔
214
                    .wrap(authentication)
236✔
215
                    //tenants
236✔
216
                    .service(create_tenant)
236✔
217
                    .service(create_or_update_tenant)
236✔
218
                    .service(read_tenant)
236✔
219
                    .service(update_tenant)
236✔
220
                    .service(delete_tenant)
236✔
221
                    .service(read_all_tenants)
236✔
222
                    //sources
236✔
223
                    .service(create_source)
236✔
224
                    .service(read_source)
236✔
225
                    .service(update_source)
236✔
226
                    .service(delete_source)
236✔
227
                    .service(read_all_sources)
236✔
228
                    //destinations
236✔
229
                    .service(create_destination)
236✔
230
                    .service(read_destination)
236✔
231
                    .service(update_destination)
236✔
232
                    .service(delete_destination)
236✔
233
                    .service(read_all_destinations)
236✔
234
                    //pipelines
236✔
235
                    .service(create_pipeline)
236✔
236
                    .service(read_pipeline)
236✔
237
                    .service(update_pipeline)
236✔
238
                    .service(delete_pipeline)
236✔
239
                    .service(read_all_pipelines)
236✔
240
                    .service(start_pipeline)
236✔
241
                    .service(stop_pipeline)
236✔
242
                    .service(stop_all_pipelines)
236✔
243
                    .service(get_pipeline_status)
236✔
244
                    //tables
236✔
245
                    .service(read_table_names)
236✔
246
                    //publications
236✔
247
                    .service(create_publication)
236✔
248
                    .service(read_publication)
236✔
249
                    .service(update_publication)
236✔
250
                    .service(delete_publication)
236✔
251
                    .service(read_all_publications)
236✔
252
                    //images
236✔
253
                    .service(create_image)
236✔
254
                    .service(read_image)
236✔
255
                    .service(update_image)
236✔
256
                    .service(delete_image)
236✔
257
                    .service(read_all_images)
236✔
258
                    //tenants_sources
236✔
259
                    .service(create_tenant_and_source)
236✔
260
                    // destinations-pipelines
236✔
261
                    .service(create_destinations_and_pipelines)
236✔
262
                    .service(update_destinations_and_pipelines),
236✔
263
            )
236✔
264
            .app_data(connection_pool.clone())
236✔
265
            .app_data(encryption_key.clone())
236✔
266
            .app_data(api_key.clone());
236✔
267
        if let Some(k8s_client) = k8s_client.clone() {
236✔
268
            app.app_data(k8s_client.clone())
×
269
        } else {
270
            app
236✔
271
        }
272
    })
236✔
273
    .listen(listener)?
59✔
274
    .run();
59✔
275

59✔
276
    Ok(server)
59✔
277
}
59✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc