• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

geo-engine / geoengine / 10178074589

31 Jul 2024 09:34AM UTC coverage: 91.068% (+0.4%) from 90.682%
10178074589

push

github

web-flow
Merge pull request #973 from geo-engine/remove-XGB-update-toolchain

Remove-XGB-update-toolchain

81 of 88 new or added lines in 29 files covered. (92.05%)

456 existing lines in 119 files now uncovered.

131088 of 143945 relevant lines covered (91.07%)

53581.03 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

98.15
/services/src/pro/users/postgres_userdb.rs
1
use crate::contexts::SessionId;
2
use crate::error::{Error, Result};
3
use crate::pro::contexts::{ProApplicationContext, ProPostgresDb};
4
use crate::pro::permissions::postgres_permissiondb::TxPermissionDb;
5
use crate::pro::permissions::{Role, RoleDescription, RoleId};
6
use crate::pro::users::oidc::{FlatMaybeEncryptedOidcTokens, OidcTokens, UserClaims};
7
use crate::pro::users::userdb::{
8
    CannotRevokeRoleThatIsNotAssignedRoleDbError, RoleIdDoesNotExistRoleDbError,
9
};
10
use crate::pro::users::{
11
    SessionTokenStore, StoredOidcTokens, User, UserCredentials, UserDb, UserId, UserInfo,
12
    UserRegistration, UserSession,
13
};
14
use crate::projects::{ProjectId, STRectangle};
15
use crate::util::postgres::PostgresErrorExt;
16
use crate::util::Identifier;
17
use crate::{error, pro::contexts::ProPostgresContext};
18
use async_trait::async_trait;
19

20
use crate::util::encryption::MaybeEncryptedBytes;
21
use bb8_postgres::{
22
    tokio_postgres::tls::MakeTlsConnect, tokio_postgres::tls::TlsConnect, tokio_postgres::Socket,
23
};
24
use oauth2::AccessToken;
25
use pwhash::bcrypt;
26
use snafu::{ensure, ResultExt};
27
use tokio_postgres::Transaction;
28
use uuid::Uuid;
29

30
use super::userdb::{
31
    Bb8RoleDbError, PermissionDbRoleDbError, PostgresRoleDbError, RoleDb, RoleDbError, UserAuth,
32
};
33

34
#[async_trait]
35
impl<Tls> UserAuth for ProPostgresContext<Tls>
36
where
37
    Tls: MakeTlsConnect<Socket> + Clone + Send + Sync + 'static + std::fmt::Debug,
38
    <Tls as MakeTlsConnect<Socket>>::Stream: Send + Sync,
39
    <Tls as MakeTlsConnect<Socket>>::TlsConnect: Send,
40
    <<Tls as MakeTlsConnect<Socket>>::TlsConnect as TlsConnect<Socket>>::Future: Send,
41
{
42
    // TODO: clean up expired sessions?
43

44
    async fn register_user(&self, user: UserRegistration) -> Result<UserId> {
43✔
45
        let mut conn = self.pool.get().await?;
43✔
46

43✔
47
        let tx = conn.build_transaction().start().await?;
43✔
48

43✔
49
        let user = User::from(user);
43✔
50

43✔
51
        let stmt = tx
43✔
52
            .prepare("INSERT INTO roles (id, name) VALUES ($1, $2);")
43✔
53
            .await?;
43✔
54
        tx.execute(&stmt, &[&user.id, &user.email])
43✔
55
            .await
43✔
56
            .map_unique_violation("roles", "name", || error::Error::Duplicate {
43✔
57
                reason: "E-mail already exists".to_string(),
15✔
58
            })?;
43✔
59

43✔
60
        let stmt = tx
43✔
61
            .prepare(
28✔
62
                "INSERT INTO users (id, email, password_hash, real_name, quota_available, active) VALUES ($1, $2, $3, $4, $5, $6);",
28✔
63
            )
28✔
64
            .await?;
43✔
65

43✔
66
        let quota_available =
43✔
67
            crate::util::config::get_config_element::<crate::pro::util::config::Quota>()?
43✔
68
                .initial_credits;
43✔
69

43✔
70
        tx.execute(
43✔
71
            &stmt,
28✔
72
            &[
28✔
73
                &user.id,
28✔
74
                &user.email,
28✔
75
                &user.password_hash,
28✔
76
                &user.real_name,
28✔
77
                &quota_available,
28✔
78
                &user.active,
28✔
79
            ],
28✔
80
        )
28✔
81
        .await?;
43✔
82

43✔
83
        let stmt = tx
43✔
84
            .prepare("INSERT INTO user_roles (user_id, role_id) VALUES ($1, $2);")
28✔
85
            .await?;
43✔
86
        tx.execute(&stmt, &[&user.id, &user.id]).await?;
43✔
87

43✔
88
        let stmt = tx
43✔
89
            .prepare("INSERT INTO user_roles (user_id, role_id) VALUES ($1, $2);")
28✔
90
            .await?;
43✔
91
        tx.execute(&stmt, &[&user.id, &Role::registered_user_role_id()])
43✔
92
            .await?;
43✔
93

43✔
94
        tx.commit().await?;
43✔
95

43✔
96
        Ok(user.id)
43✔
97
    }
43✔
98

99
    async fn create_anonymous_session(&self) -> Result<UserSession> {
40✔
100
        let mut conn = self.pool.get().await?;
40✔
101

40✔
102
        let tx = conn.build_transaction().start().await?;
40✔
103

40✔
104
        let user_id = UserId::new();
40✔
105

40✔
106
        let stmt = tx
40✔
107
            .prepare("INSERT INTO roles (id, name) VALUES ($1, $2);")
40✔
108
            .await?;
40✔
109
        tx.execute(&stmt, &[&user_id, &format!("anonymous_user_{user_id}")])
40✔
110
            .await?;
40✔
111

40✔
112
        let quota_available =
40✔
113
            crate::util::config::get_config_element::<crate::pro::util::config::Quota>()?
40✔
114
                .initial_credits;
40✔
115

40✔
116
        let stmt = tx
40✔
117
            .prepare("INSERT INTO users (id, quota_available, active) VALUES ($1, $2, TRUE);")
40✔
118
            .await?;
40✔
119

40✔
120
        tx.execute(&stmt, &[&user_id, &quota_available]).await?;
40✔
121

40✔
122
        let stmt = tx
40✔
123
            .prepare("INSERT INTO user_roles (user_id, role_id) VALUES ($1, $2);")
40✔
124
            .await?;
40✔
125
        tx.execute(&stmt, &[&user_id, &user_id]).await?;
40✔
126

40✔
127
        let stmt = tx
40✔
128
            .prepare("INSERT INTO user_roles (user_id, role_id) VALUES ($1, $2);")
40✔
129
            .await?;
40✔
130
        tx.execute(&stmt, &[&user_id, &Role::anonymous_role_id()])
40✔
131
            .await?;
40✔
132

40✔
133
        let session_id = SessionId::new();
40✔
134
        let stmt = tx
40✔
135
            .prepare(
40✔
136
                "
40✔
137
                INSERT INTO sessions (id)
40✔
138
                VALUES ($1);",
40✔
139
            )
40✔
140
            .await?;
40✔
141

40✔
142
        tx.execute(&stmt, &[&session_id]).await?;
40✔
143

40✔
144
        let stmt = tx
40✔
145
            .prepare(
40✔
146
                "
40✔
147
            INSERT INTO 
40✔
148
                user_sessions (user_id, session_id, created, valid_until) 
40✔
149
            VALUES 
40✔
150
                ($1, $2, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP + make_interval(secs:=$3))
40✔
151
            RETURNING 
40✔
152
                created, valid_until;",
40✔
153
            )
40✔
154
            .await?;
40✔
155

40✔
156
        // TODO: load from config
40✔
157
        let session_duration = chrono::Duration::days(30);
40✔
158
        let row = tx
40✔
159
            .query_one(
40✔
160
                &stmt,
40✔
161
                &[
40✔
162
                    &user_id,
40✔
163
                    &session_id,
40✔
164
                    &(session_duration.num_seconds() as f64),
40✔
165
                ],
40✔
166
            )
40✔
167
            .await?;
40✔
168

40✔
169
        tx.commit().await?;
40✔
170

40✔
171
        Ok(UserSession {
40✔
172
            id: session_id,
40✔
173
            user: UserInfo {
40✔
174
                id: user_id,
40✔
175
                email: None,
40✔
176
                real_name: None,
40✔
177
            },
40✔
178
            created: row.get(0),
40✔
179
            valid_until: row.get(1),
40✔
180
            project: None,
40✔
181
            view: None,
40✔
182
            roles: vec![user_id.into(), Role::anonymous_role_id()],
40✔
183
        })
40✔
184
    }
40✔
185

186
    async fn login(&self, user_credentials: UserCredentials) -> Result<UserSession> {
59✔
187
        let mut conn = self.pool.get().await?;
59✔
188

59✔
189
        let tx = conn.build_transaction().start().await?;
59✔
190

59✔
191
        let stmt = tx
59✔
192
            .prepare("SELECT id, password_hash, email, real_name FROM users WHERE email = $1;")
59✔
193
            .await?;
59✔
194

59✔
195
        let row = tx
59✔
196
            .query_one(&stmt, &[&user_credentials.email])
59✔
197
            .await
59✔
198
            .map_err(|_error| error::Error::LoginFailed)?;
59✔
199

59✔
200
        let user_id = UserId(row.get(0));
59✔
201
        let password_hash = row.get(1);
59✔
202
        let email = row.get(2);
59✔
203
        let real_name = row.get(3);
59✔
204

59✔
205
        if bcrypt::verify(user_credentials.password, password_hash) {
59✔
206
            let session_id = SessionId::new();
59✔
207
            let stmt = tx
59✔
208
                .prepare(
58✔
209
                    "
58✔
210
                INSERT INTO sessions (id)
58✔
211
                VALUES ($1);",
58✔
212
                )
58✔
213
                .await?;
59✔
214

59✔
215
            tx.execute(&stmt, &[&session_id]).await?;
59✔
216

59✔
217
            // TODO: load from config
59✔
218
            let session_duration = chrono::Duration::days(30);
59✔
219
            let stmt = tx
59✔
220
                .prepare(
58✔
221
                    "
58✔
222
                INSERT INTO 
58✔
223
                    user_sessions (user_id, session_id, created, valid_until) 
58✔
224
                VALUES 
58✔
225
                    ($1, $2, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP + make_interval(secs:=$3))
58✔
226
                RETURNING 
58✔
227
                    created, valid_until;",
58✔
228
                )
58✔
229
                .await?;
59✔
230
            let row = tx
59✔
231
                .query_one(
58✔
232
                    &stmt,
58✔
233
                    &[
58✔
234
                        &user_id,
58✔
235
                        &session_id,
58✔
236
                        &(session_duration.num_seconds() as f64),
58✔
237
                    ],
58✔
238
                )
58✔
239
                .await?;
59✔
240

59✔
241
            let stmt = tx
59✔
242
                .prepare("SELECT role_id FROM user_roles WHERE user_id = $1;")
58✔
243
                .await?;
59✔
244

59✔
245
            let rows = tx
59✔
246
                .query(&stmt, &[&user_id])
58✔
247
                .await
59✔
248
                .map_err(|_error| error::Error::LoginFailed)?;
59✔
249

59✔
250
            tx.commit().await?;
59✔
251

59✔
252
            let roles = rows.into_iter().map(|row| row.get(0)).collect();
90✔
253

58✔
254
            Ok(UserSession {
58✔
255
                id: session_id,
58✔
256
                user: UserInfo {
58✔
257
                    id: user_id,
58✔
258
                    email,
58✔
259
                    real_name,
58✔
260
                },
58✔
261
                created: row.get(0),
58✔
262
                valid_until: row.get(1),
58✔
263
                project: None,
58✔
264
                view: None,
58✔
265
                roles,
58✔
266
            })
58✔
267
        } else {
59✔
268
            Err(error::Error::LoginFailed)
59✔
269
        }
59✔
270
    }
59✔
271

272
    #[allow(clippy::too_many_lines)]
273
    async fn login_external(
274
        &self,
275
        user: UserClaims,
276
        oidc_tokens: OidcTokens,
277
    ) -> Result<UserSession> {
8✔
278
        let mut conn = self.pool.get().await?;
8✔
279
        let tx = conn.build_transaction().start().await?;
8✔
280

8✔
281
        let stmt = tx
8✔
282
            .prepare("SELECT id, external_id, email, real_name FROM external_users WHERE external_id = $1;")
8✔
283
            .await?;
8✔
284

8✔
285
        let row = tx
8✔
286
            .query_opt(&stmt, &[&user.external_id.to_string()])
8✔
287
            .await
8✔
288
            .map_err(|_error| error::Error::LoginFailed)?;
8✔
289

8✔
290
        let user_id = match row {
8✔
291
            Some(row) => UserId(row.get(0)),
8✔
292
            None => {
8✔
293
                let user_id = UserId::new();
8✔
294

8✔
295
                let stmt = tx
8✔
296
                    .prepare("INSERT INTO roles (id, name) VALUES ($1, $2);")
6✔
297
                    .await?;
8✔
298
                tx.execute(&stmt, &[&user_id, &user.email]).await?;
8✔
299

8✔
300
                let quota_available =
8✔
301
                    crate::util::config::get_config_element::<crate::pro::util::config::Quota>()?
8✔
302
                        .initial_credits;
8✔
303

8✔
304
                //TODO: A user might be able to login without external login using this (internal) id. Would be a problem with anonymous users as well.
8✔
305
                let stmt = tx
8✔
306
                    .prepare(
6✔
307
                        "INSERT INTO users (id, quota_available, active) VALUES ($1, $2, TRUE);",
6✔
308
                    )
6✔
309
                    .await?;
8✔
310
                tx.execute(&stmt, &[&user_id, &quota_available]).await?;
8✔
311

8✔
312
                let stmt = tx
8✔
313
                    .prepare(
6✔
314
                        "INSERT INTO external_users (id, external_id, email, real_name, active) VALUES ($1, $2, $3, $4, $5);",
6✔
315
                    )
6✔
316
                    .await?;
8✔
317

8✔
318
                tx.execute(
8✔
319
                    &stmt,
6✔
320
                    &[
6✔
321
                        &user_id,
6✔
322
                        &user.external_id.to_string(),
6✔
323
                        &user.email,
6✔
324
                        &user.real_name,
6✔
325
                        &true,
6✔
326
                    ],
6✔
327
                )
6✔
328
                .await?;
8✔
329

8✔
330
                let stmt = tx
8✔
331
                    .prepare("INSERT INTO user_roles (user_id, role_id) VALUES ($1, $2);")
6✔
332
                    .await?;
8✔
333
                tx.execute(&stmt, &[&user_id, &user_id]).await?;
8✔
334

8✔
335
                let stmt = tx
8✔
336
                    .prepare("INSERT INTO user_roles (user_id, role_id) VALUES ($1, $2);")
6✔
337
                    .await?;
8✔
338
                tx.execute(&stmt, &[&user_id, &Role::registered_user_role_id()])
8✔
339
                    .await?;
8✔
340

8✔
341
                user_id
8✔
342
            }
8✔
343
        };
8✔
344

8✔
345
        let session_id = SessionId::new();
8✔
346
        let stmt = tx
8✔
347
            .prepare(
8✔
348
                "
8✔
349
            INSERT INTO sessions (id)
8✔
350
            VALUES ($1);",
8✔
351
            )
8✔
352
            .await?;
8✔
353

8✔
354
        tx.execute(&stmt, &[&session_id]).await?;
8✔
355

8✔
356
        let stmt = tx
8✔
357
            .prepare(
8✔
358
                "
8✔
359
            INSERT INTO 
8✔
360
                user_sessions (user_id, session_id, created, valid_until) 
8✔
361
            VALUES 
8✔
362
                ($1, $2, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP + make_interval(secs:=$3))
8✔
363
            RETURNING 
8✔
364
                created, valid_until;",
8✔
365
            )
8✔
366
            .await?;
8✔
367
        let row = tx
8✔
368
            .query_one(
8✔
369
                &stmt,
8✔
370
                &[
8✔
371
                    &user_id,
8✔
372
                    &session_id,
8✔
373
                    &(oidc_tokens.expires_in.num_seconds() as f64),
8✔
374
                ],
8✔
375
            )
8✔
376
            .await?;
8✔
377

8✔
378
        self.store_oidc_session_tokens(session_id, oidc_tokens, &tx)
8✔
379
            .await?;
16✔
380

8✔
381
        let stmt = tx
8✔
382
            .prepare("SELECT role_id FROM user_roles WHERE user_id = $1;")
8✔
383
            .await?;
8✔
384

8✔
385
        let rows = tx
8✔
386
            .query(&stmt, &[&user_id])
8✔
387
            .await
8✔
388
            .map_err(|_error| error::Error::LoginFailed)?;
8✔
389

8✔
390
        let roles = rows.into_iter().map(|row| row.get(0)).collect();
16✔
391

8✔
392
        tx.commit().await?;
8✔
393

8✔
394
        Ok(UserSession {
8✔
395
            id: session_id,
8✔
396
            user: UserInfo {
8✔
397
                id: user_id,
8✔
398
                email: Some(user.email.clone()),
8✔
399
                real_name: Some(user.real_name.clone()),
8✔
400
            },
8✔
401
            created: row.get(0),
8✔
402
            valid_until: row.get(1),
8✔
403
            project: None,
8✔
404
            view: None,
8✔
405
            roles,
8✔
406
        })
8✔
407
    }
8✔
408

409
    async fn user_session_by_id(&self, session: SessionId) -> Result<UserSession> {
54✔
410
        let mut conn = self.pool.get().await?;
54✔
411

54✔
412
        let tx = conn.build_transaction().start().await?;
54✔
413

54✔
414
        let stmt = tx
54✔
415
            .prepare(
54✔
416
                "
54✔
417
            SELECT 
54✔
418
                u.id,   
54✔
419
                COALESCE(u.email, eu.email) AS email,
54✔
420
                COALESCE(u.real_name, eu.real_name) AS real_name,
54✔
421
                us.created, 
54✔
422
                us.valid_until, 
54✔
423
                s.project_id,
54✔
424
                s.view,
54✔
425
                CASE WHEN CURRENT_TIMESTAMP < us.valid_until THEN TRUE ELSE FALSE END AS valid_session
54✔
426
            FROM
54✔
427
                sessions s JOIN user_sessions us ON (s.id = us.session_id)
54✔
428
                    JOIN users u ON (us.user_id = u.id)
54✔
429
                    LEFT JOIN external_users eu ON (u.id = eu.id)
54✔
430
                    LEFT JOIN oidc_session_tokens t ON (s.id = t.session_id)
54✔
431
            WHERE s.id = $1 AND (CURRENT_TIMESTAMP < us.valid_until OR t.refresh_token IS NOT NULL);",
54✔
432
            )
54✔
433
            .await?;
283✔
434

54✔
435
        let row = tx
54✔
436
            .query_one(&stmt, &[&session])
54✔
437
            .await
54✔
438
            .map_err(|_error| error::Error::InvalidSession)?;
54✔
439

54✔
440
        let valid_session: bool = row.get(7);
54✔
441

54✔
442
        let valid_until = if valid_session {
54✔
443
            row.get(4)
54✔
444
        } else {
54✔
445
            log::debug!("Session expired, trying to extend");
54✔
446
            let refresh_result = self.refresh_oidc_session_tokens(session, &tx).await;
54✔
447

54✔
448
            if let Err(refresh_error) = refresh_result {
54✔
449
                log::debug!("Session extension failed {}", refresh_error);
54✔
450
                return Err(Error::InvalidSession);
54✔
451
            }
54✔
452
            log::debug!("Session extended");
1✔
453
            refresh_result
54✔
454
                .expect("Refresh result should exist")
1✔
455
                .db_valid_until
1✔
456
        };
54✔
457

54✔
458
        let mut session = UserSession {
54✔
459
            id: session,
47✔
460
            user: UserInfo {
47✔
461
                id: row.get(0),
47✔
462
                email: row.get(1),
47✔
463
                real_name: row.get(2),
47✔
464
            },
47✔
465
            created: row.get(3),
47✔
466
            valid_until,
47✔
467
            project: row.get::<usize, Option<Uuid>>(5).map(ProjectId),
47✔
468
            view: row.get(6),
47✔
469
            roles: vec![],
47✔
470
        };
47✔
471

54✔
472
        let stmt = tx
54✔
473
            .prepare(
47✔
474
                "
47✔
475
            SELECT role_id FROM user_roles WHERE user_id = $1;
47✔
476
            ",
47✔
477
            )
47✔
478
            .await?;
54✔
479

54✔
480
        let rows = tx.query(&stmt, &[&session.user.id]).await?;
54✔
481

54✔
482
        tx.commit().await?;
54✔
483

54✔
484
        session.roles = rows.into_iter().map(|row| row.get(0)).collect();
82✔
485

47✔
486
        Ok(session)
47✔
487
    }
54✔
488
}
489

490
#[async_trait]
491
impl<Tls> SessionTokenStore for ProPostgresContext<Tls>
492
where
493
    Tls: MakeTlsConnect<Socket> + Clone + Send + Sync + 'static + std::fmt::Debug,
494
    <Tls as MakeTlsConnect<Socket>>::Stream: Send + Sync,
495
    <Tls as MakeTlsConnect<Socket>>::TlsConnect: Send,
496
    <<Tls as MakeTlsConnect<Socket>>::TlsConnect as TlsConnect<Socket>>::Future: Send,
497
{
498
    async fn store_oidc_session_tokens(
499
        &self,
500
        session: SessionId,
501
        oidc_tokens: OidcTokens,
502
        tx: &Transaction<'_>,
503
    ) -> Result<StoredOidcTokens> {
8✔
504
        let flat_tokens: FlatMaybeEncryptedOidcTokens = self
8✔
505
            .oidc_manager()
8✔
506
            .maybe_encrypt_tokens(&oidc_tokens)?
8✔
507
            .into();
8✔
508

8✔
509
        let stmt = tx
8✔
510
            .prepare(
8✔
511
                "
8✔
512
                INSERT INTO oidc_session_tokens
8✔
513
                    (session_id, access_token, access_token_encryption_nonce, access_token_valid_until, refresh_token, refresh_token_encryption_nonce)
8✔
514
                VALUES
8✔
515
                    ($1, $2, $3, CURRENT_TIMESTAMP + make_interval(secs:=$4), $5, $6)
8✔
516
                RETURNING
8✔
517
                    access_token_valid_until;
8✔
518
                ;"
8✔
519
            )
8✔
520
            .await?;
8✔
521

8✔
522
        let db_valid_until = tx
8✔
523
            .query_one(
8✔
524
                &stmt,
8✔
525
                &[
8✔
526
                    &session,
8✔
527
                    &flat_tokens.access_token_value,
8✔
528
                    &flat_tokens.access_token_nonce,
8✔
529
                    &(oidc_tokens.expires_in.num_seconds() as f64),
8✔
530
                    &flat_tokens.refresh_token_value,
8✔
531
                    &flat_tokens.refresh_token_nonce,
8✔
532
                ],
8✔
533
            )
8✔
534
            .await?
8✔
535
            .get(0);
8✔
536

8✔
537
        Ok(StoredOidcTokens {
8✔
538
            oidc_tokens,
8✔
539
            db_valid_until,
8✔
540
        })
8✔
541
    }
8✔
542

543
    async fn refresh_oidc_session_tokens(
544
        &self,
545
        session: SessionId,
546
        tx: &Transaction<'_>,
547
    ) -> Result<StoredOidcTokens> {
3✔
548
        let stmt = tx
3✔
549
            .prepare(
3✔
550
                "
3✔
551
            SELECT
3✔
552
                refresh_token,
3✔
553
                refresh_token_encryption_nonce
3✔
554
            FROM
3✔
555
                oidc_session_tokens
3✔
556
            WHERE session_id = $1 AND refresh_token IS NOT NULL;",
3✔
557
            )
3✔
558
            .await?;
3✔
559

3✔
560
        let rows = tx.query_opt(&stmt, &[&session]).await?;
3✔
561

3✔
562
        if let Some(refresh_string) = rows {
3✔
563
            let string_field_and_nonce = MaybeEncryptedBytes {
3✔
564
                value: refresh_string.get(0),
3✔
565
                nonce: refresh_string.get(1),
3✔
566
            };
3✔
567

3✔
568
            let refresh_token = self
3✔
569
                .oidc_manager()
3✔
570
                .maybe_decrypt_refresh_token(string_field_and_nonce)?;
3✔
571

3✔
572
            let oidc_manager = self.oidc_manager();
3✔
573

3✔
574
            let oidc_tokens = oidc_manager
3✔
575
                .get_client()
3✔
576
                .await?
10✔
577
                .refresh_access_token(refresh_token)
3✔
578
                .await?;
3✔
579

3✔
580
            let flat_tokens: FlatMaybeEncryptedOidcTokens = self
3✔
581
                .oidc_manager()
3✔
582
                .maybe_encrypt_tokens(&oidc_tokens)?
3✔
583
                .into();
3✔
584

3✔
585
            let update_session_tokens = tx.prepare("
3✔
586
                UPDATE
3✔
587
                    oidc_session_tokens
3✔
588
                SET
3✔
589
                    access_token = $2, access_token_encryption_nonce = $3, access_token_valid_until = CURRENT_TIMESTAMP + make_interval(secs:=$4), refresh_token = $5, refresh_token_encryption_nonce = $6
3✔
590
                WHERE
3✔
591
                    session_id = $1;",
3✔
592

3✔
593
            ).await?;
3✔
594

3✔
595
            tx.execute(
3✔
596
                &update_session_tokens,
3✔
597
                &[
3✔
598
                    &session,
3✔
599
                    &flat_tokens.access_token_value,
3✔
600
                    &flat_tokens.access_token_nonce,
3✔
601
                    &(oidc_tokens.expires_in.num_seconds() as f64),
3✔
602
                    &flat_tokens.refresh_token_value,
3✔
603
                    &flat_tokens.refresh_token_nonce,
3✔
604
                ],
3✔
605
            )
3✔
606
            .await?;
3✔
607

3✔
608
            let stmt = tx
3✔
609
                .prepare(
3✔
610
                    "
3✔
611
                UPDATE
3✔
612
                    user_sessions
3✔
613
                SET
3✔
614
                    valid_until = CURRENT_TIMESTAMP + make_interval(secs:=$2)
3✔
615
                WHERE
3✔
616
                    session_id = $1
3✔
617
                RETURNING
3✔
618
                    valid_until;",
3✔
619
                )
3✔
620
                .await?;
3✔
621

3✔
622
            let expiration = tx
3✔
623
                .query_one(
3✔
624
                    &stmt,
3✔
625
                    &[&session, &(oidc_tokens.expires_in.num_seconds() as f64)],
3✔
626
                )
3✔
627
                .await?
3✔
628
                .get(0);
3✔
629

3✔
630
            return Ok(StoredOidcTokens {
3✔
631
                oidc_tokens,
3✔
632
                db_valid_until: expiration,
3✔
633
            });
3✔
634
        };
3✔
635

×
636
        Err(Error::InvalidSession)
×
637
    }
3✔
638

639
    async fn get_access_token(&self, session: SessionId) -> Result<AccessToken> {
4✔
640
        let mut conn = self.pool.get().await?;
4✔
641

4✔
642
        let tx = conn.build_transaction().start().await?;
4✔
643

4✔
644
        let stmt = tx
4✔
645
            .prepare(
4✔
646
                "
4✔
647
            SELECT
4✔
648
                access_token,
4✔
649
                access_token_encryption_nonce
4✔
650
            FROM
4✔
651
                oidc_session_tokens
4✔
652
            WHERE session_id = $1 AND (CURRENT_TIMESTAMP < access_token_valid_until);",
4✔
653
            )
4✔
654
            .await?;
4✔
655

4✔
656
        let rows = tx.query_opt(&stmt, &[&session]).await?;
4✔
657

4✔
658
        let access_token = if let Some(token_row) = rows {
4✔
659
            let string_field_and_nonce = MaybeEncryptedBytes {
4✔
660
                value: token_row.get(0),
2✔
661
                nonce: token_row.get(1),
2✔
662
            };
2✔
663
            self.oidc_manager()
2✔
664
                .maybe_decrypt_access_token(string_field_and_nonce)?
2✔
665
        } else {
4✔
666
            self.refresh_oidc_session_tokens(session, &tx)
4✔
667
                .await?
17✔
668
                .oidc_tokens
4✔
669
                .access
4✔
670
        };
4✔
671

4✔
672
        tx.commit().await?;
4✔
673

4✔
674
        Ok(access_token)
4✔
675
    }
4✔
676
}
677

678
#[async_trait]
679
impl<Tls> UserDb for ProPostgresDb<Tls>
680
where
681
    Tls: MakeTlsConnect<Socket> + Clone + Send + Sync + 'static + std::fmt::Debug,
682
    <Tls as MakeTlsConnect<Socket>>::Stream: Send + Sync,
683
    <Tls as MakeTlsConnect<Socket>>::TlsConnect: Send,
684
    <<Tls as MakeTlsConnect<Socket>>::TlsConnect as TlsConnect<Socket>>::Future: Send,
685
{
686
    // TODO: clean up expired sessions?
687

688
    async fn logout(&self) -> Result<()> {
6✔
689
        let conn = self.conn_pool.get().await?;
6✔
690
        let stmt = conn
6✔
691
            .prepare("DELETE FROM sessions WHERE id = $1;") // TODO: only invalidate session?
6✔
692
            .await?;
6✔
693

6✔
694
        conn.execute(&stmt, &[&self.session.id])
6✔
695
            .await
6✔
696
            .map_err(|_error| error::Error::LogoutFailed)?;
6✔
697
        Ok(())
6✔
698
    }
6✔
699

700
    /// Stores `project` as the `project_id` of the current session.
    ///
    /// # Errors
    ///
    /// Pool and database errors are propagated.
    async fn set_session_project(&self, project: ProjectId) -> Result<()> {
        // TODO: check permission

        let sql = "UPDATE sessions SET project_id = $1 WHERE id = $2;";

        let connection = self.conn_pool.get().await?;
        let update_project = connection.prepare(sql).await?;

        connection
            .execute(&update_project, &[&project, &self.session.id])
            .await?;

        Ok(())
    }
3✔
712

713
    /// Stores `view` as the `view` of the current session.
    ///
    /// # Errors
    ///
    /// Pool and database errors are propagated.
    async fn set_session_view(&self, view: STRectangle) -> Result<()> {
        let sql = "UPDATE sessions SET view = $1 WHERE id = $2;";

        let connection = self.conn_pool.get().await?;
        let update_view = connection.prepare(sql).await?;

        connection
            .execute(&update_view, &[&view, &self.session.id])
            .await?;

        Ok(())
    }
3✔
723

724
    async fn increment_quota_used(&self, user: &UserId, quota_used: u64) -> Result<()> {
1✔
725
        ensure!(self.session.is_admin(), error::PermissionDenied);
1✔
726

1✔
727
        let conn = self.conn_pool.get().await?;
1✔
728
        let stmt = conn
1✔
729
            .prepare(
1✔
730
                "
1✔
731
            UPDATE users SET 
1✔
732
                quota_available = quota_available - $1, 
1✔
733
                quota_used = quota_used + $1
1✔
734
            WHERE id = $2;",
1✔
735
            )
1✔
736
            .await?;
1✔
737

1✔
738
        conn.execute(&stmt, &[&(quota_used as i64), &user]).await?;
1✔
739

1✔
740
        Ok(())
1✔
741
    }
1✔
742

743
    async fn bulk_increment_quota_used<I: IntoIterator<Item = (UserId, u64)> + Send>(
744
        &self,
745
        quota_used_updates: I,
746
    ) -> Result<()> {
14✔
747
        ensure!(self.session.is_admin(), error::PermissionDenied);
14✔
748

14✔
749
        let conn = self.conn_pool.get().await?;
16✔
750

14✔
751
        // collect the user ids and quotas into separate vectors to pass them as parameters to the query
14✔
752
        let (users, quotas): (Vec<UserId>, Vec<i64>) = quota_used_updates
14✔
753
            .into_iter()
14✔
754
            .map(|(user, quota)| (user, quota as i64))
14✔
755
            .unzip();
14✔
756

14✔
757
        let query = "
14✔
758
            UPDATE users
14✔
759
            SET quota_available = quota_available - quota_changes.quota, 
14✔
760
                quota_used = quota_used + quota_changes.quota
14✔
761
            FROM 
14✔
762
                (SELECT * FROM UNNEST($1::uuid[], $2::bigint[]) AS t(id, quota)) AS quota_changes
14✔
763
            WHERE users.id = quota_changes.id;
14✔
764
        ";
14✔
765

14✔
766
        conn.execute(query, &[&users, &quotas]).await?;
28✔
767

14✔
768
        Ok(())
14✔
769
    }
14✔
770

771
    async fn quota_used(&self) -> Result<u64> {
6✔
772
        let conn = self.conn_pool.get().await?;
9✔
773
        let stmt = conn
6✔
774
            .prepare("SELECT quota_used FROM users WHERE id = $1;")
6✔
775
            .await?;
6✔
776

6✔
777
        let row = conn
6✔
778
            .query_one(&stmt, &[&self.session.user.id])
6✔
779
            .await
7✔
780
            .map_err(|_error| error::Error::InvalidSession)?;
6✔
781

6✔
782
        Ok(row.get::<usize, i64>(0) as u64)
6✔
783
    }
6✔
784

785
    async fn quota_used_by_user(&self, user: &UserId) -> Result<u64> {
7✔
786
        ensure!(
7✔
787
            self.session.user.id == *user || self.session.is_admin(),
7✔
788
            error::PermissionDenied
7✔
789
        );
7✔
790

7✔
791
        let conn = self.conn_pool.get().await?;
7✔
792
        let stmt = conn
7✔
793
            .prepare("SELECT quota_used FROM users WHERE id = $1;")
7✔
794
            .await?;
7✔
795

7✔
796
        let row = conn
7✔
797
            .query_one(&stmt, &[&user])
7✔
798
            .await
7✔
799
            .map_err(|_error| error::Error::InvalidSession)?;
7✔
800

7✔
801
        Ok(row.get::<usize, i64>(0) as u64)
7✔
802
    }
7✔
803

804
    async fn quota_available(&self) -> Result<i64> {
11✔
805
        let conn = self.conn_pool.get().await?;
12✔
806
        let stmt = conn
11✔
807
            .prepare("SELECT quota_available FROM users WHERE id = $1;")
11✔
808
            .await?;
11✔
809

11✔
810
        let row = conn
11✔
811
            .query_one(&stmt, &[&self.session.user.id])
11✔
812
            .await
11✔
813
            .map_err(|_error| error::Error::InvalidSession)?;
11✔
814

11✔
815
        Ok(row.get::<usize, i64>(0))
11✔
816
    }
11✔
817

818
    async fn quota_available_by_user(&self, user: &UserId) -> Result<i64> {
6✔
819
        ensure!(
6✔
820
            self.session.user.id == *user || self.session.is_admin(),
6✔
821
            error::PermissionDenied
6✔
822
        );
6✔
823

6✔
824
        let conn = self.conn_pool.get().await?;
6✔
825
        let stmt = conn
6✔
826
            .prepare("SELECT quota_available FROM users WHERE id = $1;")
6✔
827
            .await?;
6✔
828

6✔
829
        let row = conn
6✔
830
            .query_one(&stmt, &[&user])
6✔
831
            .await
6✔
832
            .map_err(|_error| error::Error::InvalidSession)?;
6✔
833

6✔
834
        Ok(row.get::<usize, i64>(0))
6✔
835
    }
6✔
836

837
    async fn update_quota_available_by_user(
838
        &self,
839
        user: &UserId,
840
        new_available_quota: i64,
841
    ) -> Result<()> {
5✔
842
        ensure!(self.session.is_admin(), error::PermissionDenied);
5✔
843

5✔
844
        let conn = self.conn_pool.get().await?;
5✔
845
        let stmt = conn
5✔
846
            .prepare(
5✔
847
                "
5✔
848
            UPDATE users SET 
5✔
849
                quota_available = $1
5✔
850
            WHERE id = $2;",
5✔
851
            )
5✔
852
            .await?;
5✔
853

5✔
854
        conn.execute(&stmt, &[&(new_available_quota), &user])
5✔
855
            .await?;
5✔
856

5✔
857
        Ok(())
5✔
858
    }
5✔
859
}
860

861
#[async_trait]
862
impl<Tls> RoleDb for ProPostgresDb<Tls>
863
where
864
    Tls: MakeTlsConnect<Socket> + Clone + Send + Sync + 'static + std::fmt::Debug,
865
    <Tls as MakeTlsConnect<Socket>>::Stream: Send + Sync,
866
    <Tls as MakeTlsConnect<Socket>>::TlsConnect: Send,
867
    <<Tls as MakeTlsConnect<Socket>>::TlsConnect as TlsConnect<Socket>>::Future: Send,
868
{
869
    async fn add_role(&self, role_name: &str) -> Result<RoleId, RoleDbError> {
4✔
870
        let mut conn = self.conn_pool.get().await.context(Bb8RoleDbError)?;
4✔
871

4✔
872
        let tx = conn
4✔
873
            .build_transaction()
4✔
874
            .start()
4✔
875
            .await
4✔
876
            .context(PostgresRoleDbError)?;
4✔
877

4✔
878
        self.ensure_admin_in_tx(&tx)
4✔
879
            .await
4✔
880
            .context(PermissionDbRoleDbError)?;
4✔
881

4✔
882
        let id = RoleId::new();
4✔
883

4✔
884
        let res = tx
4✔
885
            .execute(
4✔
886
                "INSERT INTO roles (id, name) VALUES ($1, $2);",
4✔
887
                &[&id, &role_name],
4✔
888
            )
4✔
889
            .await;
10✔
890

4✔
891
        if let Err(err) = res {
4✔
892
            if err.code() == Some(&tokio_postgres::error::SqlState::UNIQUE_VIOLATION) {
4✔
893
                return Err(RoleDbError::RoleAlreadyExists {
4✔
894
                    role_name: role_name.to_string(),
×
895
                });
×
896
            }
4✔
897
        }
4✔
898

4✔
899
        tx.commit().await.context(PostgresRoleDbError)?;
4✔
900

4✔
901
        Ok(id)
4✔
902
    }
4✔
903

904
    async fn load_role_by_name(&self, role_name: &str) -> Result<RoleId, RoleDbError> {
×
905
        let conn = self.conn_pool.get().await.context(Bb8RoleDbError)?;
×
UNCOV
906

×
907
        let row = conn
×
908
            .query_opt("SELECT id FROM roles WHERE name = $1;", &[&role_name])
×
909
            .await
×
910
            .context(PostgresRoleDbError)?
×
911
            .ok_or(RoleDbError::RoleNameDoesNotExist {
×
912
                role_name: role_name.to_string(),
×
913
            })?;
×
UNCOV
914

×
915
        Ok(RoleId(row.get(0)))
×
916
    }
×
917

918
    async fn remove_role(&self, role_id: &RoleId) -> Result<(), RoleDbError> {
2✔
919
        let mut conn = self.conn_pool.get().await.context(Bb8RoleDbError)?;
2✔
920

2✔
921
        let tx = conn
2✔
922
            .build_transaction()
2✔
923
            .start()
2✔
924
            .await
2✔
925
            .context(PostgresRoleDbError)?;
2✔
926

2✔
927
        self.ensure_admin_in_tx(&tx)
2✔
928
            .await
2✔
929
            .context(PermissionDbRoleDbError)?;
2✔
930

2✔
931
        let deleted = tx
2✔
932
            .execute("DELETE FROM roles WHERE id = $1;", &[&role_id])
2✔
933
            .await
3✔
934
            .context(PostgresRoleDbError)?;
2✔
935

2✔
936
        tx.commit().await.context(PostgresRoleDbError)?;
2✔
937

2✔
938
        ensure!(
2✔
939
            deleted > 0,
2✔
940
            RoleIdDoesNotExistRoleDbError { role_id: *role_id }
2✔
941
        );
2✔
942

2✔
943
        Ok(())
2✔
944
    }
2✔
945

946
    async fn assign_role(&self, role_id: &RoleId, user_id: &UserId) -> Result<(), RoleDbError> {
4✔
947
        let mut conn = self.conn_pool.get().await.context(Bb8RoleDbError)?;
4✔
948

4✔
949
        let tx = conn
4✔
950
            .build_transaction()
4✔
951
            .start()
4✔
952
            .await
4✔
953
            .context(PostgresRoleDbError)?;
4✔
954

4✔
955
        self.ensure_admin_in_tx(&tx)
4✔
956
            .await
4✔
957
            .context(PermissionDbRoleDbError)?;
4✔
958

4✔
959
        tx.execute(
4✔
960
            "INSERT INTO user_roles (user_id, role_id) VALUES ($1, $2) ON CONFLICT DO NOTHING;",
4✔
961
            &[&user_id, &role_id],
4✔
962
        )
4✔
963
        .await
8✔
964
        .context(PostgresRoleDbError)?;
4✔
965

4✔
966
        tx.commit().await.context(PostgresRoleDbError)?;
4✔
967

4✔
968
        Ok(())
4✔
969
    }
4✔
970

971
    async fn revoke_role(&self, role_id: &RoleId, user_id: &UserId) -> Result<(), RoleDbError> {
2✔
972
        let mut conn = self.conn_pool.get().await.context(Bb8RoleDbError)?;
2✔
973

2✔
974
        let tx = conn
2✔
975
            .build_transaction()
2✔
976
            .start()
2✔
977
            .await
2✔
978
            .context(PostgresRoleDbError)?;
2✔
979

2✔
980
        self.ensure_admin_in_tx(&tx)
2✔
981
            .await
2✔
982
            .context(PermissionDbRoleDbError)?;
2✔
983

2✔
984
        let deleted = tx
2✔
985
            .execute(
2✔
986
                "DELETE FROM user_roles WHERE user_id= $1 AND role_id = $2;",
2✔
987
                &[&user_id, &role_id],
2✔
988
            )
2✔
989
            .await
4✔
990
            .context(PostgresRoleDbError)?;
2✔
991

2✔
992
        tx.commit().await.context(PostgresRoleDbError)?;
2✔
993

2✔
994
        ensure!(
2✔
995
            deleted > 0,
2✔
996
            CannotRevokeRoleThatIsNotAssignedRoleDbError { role_id: *role_id }
2✔
997
        );
2✔
998

2✔
999
        Ok(())
2✔
1000
    }
2✔
1001

1002
    async fn get_role_descriptions(
1003
        &self,
1004
        user_id: &UserId,
1005
    ) -> Result<Vec<RoleDescription>, RoleDbError> {
5✔
1006
        let conn = self.conn_pool.get().await.context(Bb8RoleDbError)?;
5✔
1007

5✔
1008
        let stmt = conn
5✔
1009
            .prepare(
5✔
1010
                "SELECT roles.id, roles.name \
5✔
1011
                FROM roles JOIN user_roles ON (roles.id=user_roles.role_id) \
5✔
1012
                WHERE user_roles.user_id=$1 \
5✔
1013
                ORDER BY roles.name;",
5✔
1014
            )
5✔
1015
            .await
5✔
1016
            .context(PostgresRoleDbError)?;
5✔
1017

5✔
1018
        let results = conn
5✔
1019
            .query(&stmt, &[&user_id])
5✔
1020
            .await
5✔
1021
            .context(PostgresRoleDbError)?;
5✔
1022

5✔
1023
        let mut result_vec = Vec::new();
5✔
1024

5✔
1025
        for result in results {
17✔
1026
            let id = result.get(0);
12✔
1027
            let name = result.get(1);
12✔
1028
            let individual = UserId(id) == *user_id;
12✔
1029
            result_vec.push(RoleDescription {
12✔
1030
                role: Role {
12✔
1031
                    id: RoleId(id),
12✔
1032
                    name,
12✔
1033
                },
12✔
1034
                individual,
12✔
1035
            });
12✔
1036
        }
12✔
1037

5✔
1038
        Ok(result_vec)
5✔
1039
    }
5✔
1040
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc