• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

supabase / pg_replicate / 12929014939

23 Jan 2025 12:19PM UTC coverage: 36.501% (-0.5%) from 36.994%
12929014939

push

github

imor
update sqlx metadata

2005 of 5493 relevant lines covered (36.5%)

15.83 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

65.04
/api/src/startup.rs
1
use std::{net::TcpListener, sync::Arc};
2

3
use actix_web::{dev::Server, web, App, HttpServer};
4
use actix_web_httpauth::middleware::HttpAuthentication;
5
use aws_lc_rs::aead::{RandomizedNonceKey, AES_256_GCM};
6
use base64::{prelude::BASE64_STANDARD, Engine};
7
use sqlx::{postgres::PgPoolOptions, PgPool};
8
use tracing_actix_web::TracingLogger;
9
use utoipa::OpenApi;
10
use utoipa_swagger_ui::SwaggerUi;
11

12
use crate::{
13
    authentication::auth_validator,
14
    configuration::{DatabaseSettings, Settings},
15
    db::publications::Publication,
16
    encryption,
17
    k8s_client::HttpK8sClient,
18
    routes::{
19
        health_check::health_check,
20
        images::{
21
            create_image, delete_image, read_all_images, read_image, update_image,
22
            GetImageResponse, PostImageRequest, PostImageResponse,
23
        },
24
        pipelines::{
25
            create_pipeline, delete_pipeline, get_pipeline_status, read_all_pipelines,
26
            read_pipeline, start_pipeline, stop_pipeline, update_pipeline, GetPipelineResponse,
27
            PostPipelineRequest, PostPipelineResponse,
28
        },
29
        sinks::{
30
            create_sink, delete_sink, read_all_sinks, read_sink, update_sink, GetSinkResponse,
31
            PostSinkRequest, PostSinkResponse,
32
        },
33
        sources::{
34
            create_source, delete_source,
35
            publications::{
36
                create_publication, delete_publication, read_all_publications, read_publication,
37
                update_publication, CreatePublicationRequest, UpdatePublicationRequest,
38
            },
39
            read_all_sources, read_source,
40
            tables::read_table_names,
41
            update_source, GetSourceResponse, PostSourceRequest, PostSourceResponse,
42
        },
43
        tenants::{
44
            create_or_update_tenant, create_tenant, delete_tenant, read_all_tenants, read_tenant,
45
            update_tenant, CreateTenantRequest, GetTenantResponse, PostTenantResponse,
46
        },
47
    },
48
};
49

50
pub struct Application {
51
    port: u16,
52
    server: Server,
53
}
54

55
impl Application {
56
    pub async fn build(configuration: Settings) -> Result<Self, anyhow::Error> {
×
57
        let connection_pool = get_connection_pool(&configuration.database);
×
58

×
59
        let address = format!(
×
60
            "{}:{}",
×
61
            configuration.application.host, configuration.application.port
×
62
        );
×
63
        let listener = TcpListener::bind(address)?;
×
64
        let port = listener.local_addr().unwrap().port();
×
65
        let key_bytes = BASE64_STANDARD.decode(&configuration.encryption_key.key)?;
×
66
        let key = RandomizedNonceKey::new(&AES_256_GCM, &key_bytes)?;
×
67
        let encryption_key = encryption::EncryptionKey {
×
68
            id: configuration.encryption_key.id,
×
69
            key,
×
70
        };
×
71
        let api_key = configuration.api_key;
×
72
        let k8s_client = HttpK8sClient::new().await?;
×
73
        let server = run(
×
74
            listener,
×
75
            connection_pool,
×
76
            encryption_key,
×
77
            api_key,
×
78
            Some(k8s_client),
×
79
        )
×
80
        .await?;
×
81

82
        Ok(Self { port, server })
×
83
    }
×
84

85
    pub async fn migrate_database(
×
86
        database_settings: DatabaseSettings,
×
87
    ) -> Result<(), anyhow::Error> {
×
88
        let connection_pool = get_connection_pool(&database_settings);
×
89

×
90
        sqlx::migrate!("./migrations").run(&connection_pool).await?;
×
91

92
        Ok(())
×
93
    }
×
94

95
    pub fn port(&self) -> u16 {
×
96
        self.port
×
97
    }
×
98

99
    pub async fn run_until_stopped(self) -> Result<(), std::io::Error> {
×
100
        self.server.await
×
101
    }
×
102
}
103

104
pub fn get_connection_pool(configuration: &DatabaseSettings) -> PgPool {
47✔
105
    PgPoolOptions::new().connect_lazy_with(configuration.with_db())
47✔
106
}
47✔
107

108
// HttpK8sClient is wrapped in an option because creating it
109
// in tests involves setting a default CryptoProvider and it
110
// interferes with parallel tasks because only one can be set.
111
pub async fn run(
47✔
112
    listener: TcpListener,
47✔
113
    connection_pool: PgPool,
47✔
114
    encryption_key: encryption::EncryptionKey,
47✔
115
    api_key: String,
47✔
116
    http_k8s_client: Option<HttpK8sClient>,
47✔
117
) -> Result<Server, anyhow::Error> {
47✔
118
    let connection_pool = web::Data::new(connection_pool);
47✔
119
    let encryption_key = web::Data::new(encryption_key);
47✔
120
    let api_key = web::Data::new(api_key);
47✔
121
    let k8s_client = http_k8s_client.map(|client| web::Data::new(Arc::new(client)));
47✔
122

123
    #[derive(OpenApi)]
×
124
    #[openapi(
125
        paths(
126
            crate::routes::health_check::health_check,
127
            crate::routes::images::create_image,
128
            crate::routes::images::read_image,
129
            crate::routes::images::update_image,
130
            crate::routes::images::delete_image,
131
            crate::routes::images::read_all_images,
132
            crate::routes::pipelines::create_pipeline,
133
            crate::routes::pipelines::read_pipeline,
134
            crate::routes::pipelines::update_pipeline,
135
            crate::routes::pipelines::delete_pipeline,
136
            crate::routes::pipelines::read_all_pipelines,
137
            crate::routes::pipelines::get_pipeline_status,
138
            crate::routes::tenants::create_tenant,
139
            crate::routes::tenants::create_or_update_tenant,
140
            crate::routes::tenants::read_tenant,
141
            crate::routes::tenants::update_tenant,
142
            crate::routes::tenants::delete_tenant,
143
            crate::routes::tenants::read_all_tenants,
144
            crate::routes::sources::create_source,
145
            crate::routes::sources::read_source,
146
            crate::routes::sources::update_source,
147
            crate::routes::sources::delete_source,
148
            crate::routes::sources::read_all_sources,
149
            crate::routes::sources::publications::create_publication,
150
            crate::routes::sources::publications::read_publication,
151
            crate::routes::sources::publications::update_publication,
152
            crate::routes::sources::publications::delete_publication,
153
            crate::routes::sources::publications::read_all_publications,
154
            crate::routes::sources::tables::read_table_names,
155
            crate::routes::sinks::create_sink,
156
            crate::routes::sinks::read_sink,
157
            crate::routes::sinks::update_sink,
158
            crate::routes::sinks::delete_sink,
159
            crate::routes::sinks::read_all_sinks,
160
        ),
161
        components(schemas(
162
            PostImageRequest,
163
            PostImageResponse,
164
            GetImageResponse,
165
            PostPipelineRequest,
166
            PostPipelineResponse,
167
            GetPipelineResponse,
168
            CreateTenantRequest,
169
            PostTenantResponse,
170
            GetTenantResponse,
171
            PostSourceRequest,
172
            PostSourceResponse,
173
            GetSourceResponse,
174
            CreatePublicationRequest,
175
            UpdatePublicationRequest,
176
            Publication,
177
            PostSinkRequest,
178
            PostSinkResponse,
179
            GetSinkResponse,
180
        ))
181
    )]
182
    struct ApiDoc;
183

184
    //TODO: replace all the context_path = v1 in route modules with the nest attribute
185
    //when it is available in utoipa 5.0.0: https://github.com/juhaku/utoipa/pull/930
186
    let openapi = ApiDoc::openapi();
47✔
187

188
    let server = HttpServer::new(move || {
188✔
189
        let authentication = HttpAuthentication::bearer(auth_validator);
188✔
190
        let app = App::new()
188✔
191
            .wrap(TracingLogger::default())
188✔
192
            .service(health_check)
188✔
193
            .service(
188✔
194
                SwaggerUi::new("/swagger-ui/{_:.*}").url("/api-docs/openapi.json", openapi.clone()),
188✔
195
            )
188✔
196
            .service(
188✔
197
                web::scope("v1")
188✔
198
                    .wrap(authentication)
188✔
199
                    //tenants
188✔
200
                    .service(create_tenant)
188✔
201
                    .service(create_or_update_tenant)
188✔
202
                    .service(read_tenant)
188✔
203
                    .service(update_tenant)
188✔
204
                    .service(delete_tenant)
188✔
205
                    .service(read_all_tenants)
188✔
206
                    //sources
188✔
207
                    .service(create_source)
188✔
208
                    .service(read_source)
188✔
209
                    .service(update_source)
188✔
210
                    .service(delete_source)
188✔
211
                    .service(read_all_sources)
188✔
212
                    //sinks
188✔
213
                    .service(create_sink)
188✔
214
                    .service(read_sink)
188✔
215
                    .service(update_sink)
188✔
216
                    .service(delete_sink)
188✔
217
                    .service(read_all_sinks)
188✔
218
                    //pipelines
188✔
219
                    .service(create_pipeline)
188✔
220
                    .service(read_pipeline)
188✔
221
                    .service(update_pipeline)
188✔
222
                    .service(delete_pipeline)
188✔
223
                    .service(read_all_pipelines)
188✔
224
                    .service(start_pipeline)
188✔
225
                    .service(stop_pipeline)
188✔
226
                    .service(get_pipeline_status)
188✔
227
                    //tables
188✔
228
                    .service(read_table_names)
188✔
229
                    //publications
188✔
230
                    .service(create_publication)
188✔
231
                    .service(read_publication)
188✔
232
                    .service(update_publication)
188✔
233
                    .service(delete_publication)
188✔
234
                    .service(read_all_publications)
188✔
235
                    //images
188✔
236
                    .service(create_image)
188✔
237
                    .service(read_image)
188✔
238
                    .service(update_image)
188✔
239
                    .service(delete_image)
188✔
240
                    .service(read_all_images),
188✔
241
            )
188✔
242
            .app_data(connection_pool.clone())
188✔
243
            .app_data(encryption_key.clone())
188✔
244
            .app_data(api_key.clone());
188✔
245
        if let Some(k8s_client) = k8s_client.clone() {
188✔
246
            app.app_data(k8s_client.clone())
×
247
        } else {
248
            app
188✔
249
        }
250
    })
188✔
251
    .listen(listener)?
47✔
252
    .run();
47✔
253

47✔
254
    Ok(server)
47✔
255
}
47✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc