• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

geo-engine / geoengine / 12243970346

09 Dec 2024 08:44PM UTC coverage: 90.501% (-0.01%) from 90.512%
12243970346

Pull #1003

github

web-flow
Merge 832663083 into 382f310f6
Pull Request #1003: layer workflows

138 of 147 new or added lines in 7 files covered. (93.88%)

28 existing lines in 13 files now uncovered.

133112 of 147083 relevant lines covered (90.5%)

54737.39 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

95.78
/services/src/pro/users/postgres_userdb.rs
1
use crate::contexts::SessionId;
2
use crate::error::{Error, Result};
3
use crate::pro::contexts::{ProApplicationContext, ProPostgresDb};
4
use crate::pro::permissions::postgres_permissiondb::TxPermissionDb;
5
use crate::pro::permissions::{Role, RoleDescription, RoleId};
6
use crate::pro::users::oidc::{FlatMaybeEncryptedOidcTokens, OidcTokens, UserClaims};
7
use crate::pro::users::userdb::{
8
    CannotRevokeRoleThatIsNotAssignedRoleDbError, RoleIdDoesNotExistRoleDbError,
9
};
10
use crate::pro::users::{
11
    SessionTokenStore, StoredOidcTokens, User, UserCredentials, UserDb, UserId, UserInfo,
12
    UserRegistration, UserSession,
13
};
14
use crate::projects::{ProjectId, STRectangle};
15
use crate::util::postgres::PostgresErrorExt;
16
use crate::util::Identifier;
17
use crate::{error, pro::contexts::ProPostgresContext};
18
use async_trait::async_trait;
19

20
use crate::util::encryption::MaybeEncryptedBytes;
21
use bb8_postgres::{
22
    tokio_postgres::tls::MakeTlsConnect, tokio_postgres::tls::TlsConnect, tokio_postgres::Socket,
23
};
24
use oauth2::AccessToken;
25
use pwhash::bcrypt;
26
use snafu::{ensure, ResultExt};
27
use tokio_postgres::Transaction;
28
use uuid::Uuid;
29

30
use super::userdb::{
31
    Bb8RoleDbError, PermissionDbRoleDbError, PostgresRoleDbError, RoleDb, RoleDbError, UserAuth,
32
};
33

34
#[async_trait]
35
impl<Tls> UserAuth for ProPostgresContext<Tls>
36
where
37
    Tls: MakeTlsConnect<Socket> + Clone + Send + Sync + 'static + std::fmt::Debug,
38
    <Tls as MakeTlsConnect<Socket>>::Stream: Send + Sync,
39
    <Tls as MakeTlsConnect<Socket>>::TlsConnect: Send,
40
    <<Tls as MakeTlsConnect<Socket>>::TlsConnect as TlsConnect<Socket>>::Future: Send,
41
{
42
    // TODO: clean up expired sessions?
43

44
    async fn register_user(&self, user: UserRegistration) -> Result<UserId> {
43✔
45
        let mut conn = self.pool.get().await?;
43✔
46

47
        let tx = conn.build_transaction().start().await?;
43✔
48

49
        let user = User::from(user);
43✔
50

51
        let stmt = tx
43✔
52
            .prepare("INSERT INTO roles (id, name) VALUES ($1, $2);")
43✔
53
            .await?;
42✔
54
        tx.execute(&stmt, &[&user.id, &user.email])
43✔
55
            .await
40✔
56
            .map_unique_violation("roles", "name", || error::Error::Duplicate {
43✔
57
                reason: "E-mail already exists".to_string(),
15✔
58
            })?;
43✔
59

60
        let stmt = tx
28✔
61
            .prepare(
28✔
62
                "INSERT INTO users (id, email, password_hash, real_name, quota_available, active) VALUES ($1, $2, $3, $4, $5, $6);",
28✔
63
            )
28✔
64
            .await?;
27✔
65

66
        let quota_available =
28✔
67
            crate::util::config::get_config_element::<crate::pro::util::config::Quota>()?
28✔
68
                .initial_credits;
69

70
        tx.execute(
28✔
71
            &stmt,
28✔
72
            &[
28✔
73
                &user.id,
28✔
74
                &user.email,
28✔
75
                &user.password_hash,
28✔
76
                &user.real_name,
28✔
77
                &quota_available,
28✔
78
                &user.active,
28✔
79
            ],
28✔
80
        )
28✔
81
        .await?;
25✔
82

83
        let stmt = tx
28✔
84
            .prepare("INSERT INTO user_roles (user_id, role_id) VALUES ($1, $2);")
28✔
85
            .await?;
26✔
86
        tx.execute(&stmt, &[&user.id, &user.id]).await?;
28✔
87

88
        let stmt = tx
28✔
89
            .prepare("INSERT INTO user_roles (user_id, role_id) VALUES ($1, $2);")
28✔
90
            .await?;
26✔
91
        tx.execute(&stmt, &[&user.id, &Role::registered_user_role_id()])
28✔
92
            .await?;
26✔
93

94
        tx.commit().await?;
28✔
95

96
        Ok(user.id)
28✔
97
    }
86✔
98

99
    async fn create_anonymous_session(&self) -> Result<UserSession> {
41✔
100
        let mut conn = self.pool.get().await?;
41✔
101

102
        let tx = conn.build_transaction().start().await?;
41✔
103

104
        let user_id = UserId::new();
41✔
105

106
        let stmt = tx
41✔
107
            .prepare("INSERT INTO roles (id, name) VALUES ($1, $2);")
41✔
108
            .await?;
36✔
109
        tx.execute(&stmt, &[&user_id, &format!("anonymous_user_{user_id}")])
41✔
110
            .await?;
37✔
111

112
        let quota_available =
41✔
113
            crate::util::config::get_config_element::<crate::pro::util::config::Quota>()?
41✔
114
                .initial_credits;
115

116
        let stmt = tx
41✔
117
            .prepare("INSERT INTO users (id, quota_available, active) VALUES ($1, $2, TRUE);")
41✔
118
            .await?;
38✔
119

120
        tx.execute(&stmt, &[&user_id, &quota_available]).await?;
41✔
121

122
        let stmt = tx
41✔
123
            .prepare("INSERT INTO user_roles (user_id, role_id) VALUES ($1, $2);")
41✔
124
            .await?;
38✔
125
        tx.execute(&stmt, &[&user_id, &user_id]).await?;
41✔
126

127
        let stmt = tx
41✔
128
            .prepare("INSERT INTO user_roles (user_id, role_id) VALUES ($1, $2);")
41✔
129
            .await?;
37✔
130
        tx.execute(&stmt, &[&user_id, &Role::anonymous_role_id()])
41✔
131
            .await?;
37✔
132

133
        let session_id = SessionId::new();
41✔
134
        let stmt = tx
41✔
135
            .prepare(
41✔
136
                "
41✔
137
                INSERT INTO sessions (id)
41✔
138
                VALUES ($1);",
41✔
139
            )
41✔
140
            .await?;
36✔
141

142
        tx.execute(&stmt, &[&session_id]).await?;
41✔
143

144
        let stmt = tx
41✔
145
            .prepare(
41✔
146
                "
41✔
147
            INSERT INTO 
41✔
148
                user_sessions (user_id, session_id, created, valid_until) 
41✔
149
            VALUES 
41✔
150
                ($1, $2, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP + make_interval(secs:=$3))
41✔
151
            RETURNING 
41✔
152
                created, valid_until;",
41✔
153
            )
41✔
154
            .await?;
37✔
155

156
        // TODO: load from config
157
        let session_duration = chrono::Duration::days(30);
41✔
158
        let row = tx
41✔
159
            .query_one(
41✔
160
                &stmt,
41✔
161
                &[
41✔
162
                    &user_id,
41✔
163
                    &session_id,
41✔
164
                    &(session_duration.num_seconds() as f64),
41✔
165
                ],
41✔
166
            )
41✔
167
            .await?;
36✔
168

169
        tx.commit().await?;
41✔
170

171
        Ok(UserSession {
41✔
172
            id: session_id,
41✔
173
            user: UserInfo {
41✔
174
                id: user_id,
41✔
175
                email: None,
41✔
176
                real_name: None,
41✔
177
            },
41✔
178
            created: row.get(0),
41✔
179
            valid_until: row.get(1),
41✔
180
            project: None,
41✔
181
            view: None,
41✔
182
            roles: vec![user_id.into(), Role::anonymous_role_id()],
41✔
183
        })
41✔
184
    }
82✔
185

186
    async fn login(&self, user_credentials: UserCredentials) -> Result<UserSession> {
59✔
187
        let mut conn = self.pool.get().await?;
59✔
188

189
        let tx = conn.build_transaction().start().await?;
59✔
190

191
        let stmt = tx
59✔
192
            .prepare("SELECT id, password_hash, email, real_name FROM users WHERE email = $1;")
59✔
193
            .await?;
53✔
194

195
        let row = tx
59✔
196
            .query_one(&stmt, &[&user_credentials.email])
59✔
197
            .await
51✔
198
            .map_err(|_error| error::Error::LoginFailed)?;
59✔
199

200
        let user_id = UserId(row.get(0));
59✔
201
        let password_hash = row.get(1);
59✔
202
        let email = row.get(2);
59✔
203
        let real_name = row.get(3);
59✔
204

59✔
205
        if bcrypt::verify(user_credentials.password, password_hash) {
59✔
206
            let session_id = SessionId::new();
58✔
207
            let stmt = tx
58✔
208
                .prepare(
58✔
209
                    "
58✔
210
                INSERT INTO sessions (id)
58✔
211
                VALUES ($1);",
58✔
212
                )
58✔
213
                .await?;
53✔
214

215
            tx.execute(&stmt, &[&session_id]).await?;
58✔
216

217
            // TODO: load from config
218
            let session_duration = chrono::Duration::days(30);
58✔
219
            let stmt = tx
58✔
220
                .prepare(
58✔
221
                    "
58✔
222
                INSERT INTO 
58✔
223
                    user_sessions (user_id, session_id, created, valid_until) 
58✔
224
                VALUES 
58✔
225
                    ($1, $2, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP + make_interval(secs:=$3))
58✔
226
                RETURNING 
58✔
227
                    created, valid_until;",
58✔
228
                )
58✔
229
                .await?;
50✔
230
            let row = tx
58✔
231
                .query_one(
58✔
232
                    &stmt,
58✔
233
                    &[
58✔
234
                        &user_id,
58✔
235
                        &session_id,
58✔
236
                        &(session_duration.num_seconds() as f64),
58✔
237
                    ],
58✔
238
                )
58✔
239
                .await?;
50✔
240

241
            let stmt = tx
58✔
242
                .prepare("SELECT role_id FROM user_roles WHERE user_id = $1;")
58✔
243
                .await?;
50✔
244

245
            let rows = tx
58✔
246
                .query(&stmt, &[&user_id])
58✔
247
                .await
49✔
248
                .map_err(|_error| error::Error::LoginFailed)?;
58✔
249

250
            tx.commit().await?;
58✔
251

252
            let roles = rows.into_iter().map(|row| row.get(0)).collect();
90✔
253

58✔
254
            Ok(UserSession {
58✔
255
                id: session_id,
58✔
256
                user: UserInfo {
58✔
257
                    id: user_id,
58✔
258
                    email,
58✔
259
                    real_name,
58✔
260
                },
58✔
261
                created: row.get(0),
58✔
262
                valid_until: row.get(1),
58✔
263
                project: None,
58✔
264
                view: None,
58✔
265
                roles,
58✔
266
            })
58✔
267
        } else {
268
            Err(error::Error::LoginFailed)
1✔
269
        }
270
    }
118✔
271

272
    #[allow(clippy::too_many_lines)]
273
    async fn login_external(
274
        &self,
275
        user: UserClaims,
276
        oidc_tokens: OidcTokens,
277
    ) -> Result<UserSession> {
8✔
278
        let mut conn = self.pool.get().await?;
8✔
279
        let tx = conn.build_transaction().start().await?;
8✔
280

281
        let stmt = tx
8✔
282
            .prepare("SELECT id, external_id, email, real_name FROM external_users WHERE external_id = $1;")
8✔
283
            .await?;
8✔
284

285
        let row = tx
8✔
286
            .query_opt(&stmt, &[&user.external_id.to_string()])
8✔
287
            .await
8✔
288
            .map_err(|_error| error::Error::LoginFailed)?;
8✔
289

290
        let user_id = match row {
8✔
291
            Some(row) => UserId(row.get(0)),
2✔
292
            None => {
293
                let user_id = UserId::new();
6✔
294

295
                let stmt = tx
6✔
296
                    .prepare("INSERT INTO roles (id, name) VALUES ($1, $2);")
6✔
297
                    .await?;
6✔
298
                tx.execute(&stmt, &[&user_id, &user.email]).await?;
6✔
299

300
                let quota_available =
6✔
301
                    crate::util::config::get_config_element::<crate::pro::util::config::Quota>()?
6✔
302
                        .initial_credits;
303

304
                //TODO: A user might be able to login without external login using this (internal) id. Would be a problem with anonymous users as well.
305
                let stmt = tx
6✔
306
                    .prepare(
6✔
307
                        "INSERT INTO users (id, quota_available, active) VALUES ($1, $2, TRUE);",
6✔
308
                    )
6✔
309
                    .await?;
6✔
310
                tx.execute(&stmt, &[&user_id, &quota_available]).await?;
6✔
311

312
                let stmt = tx
6✔
313
                    .prepare(
6✔
314
                        "INSERT INTO external_users (id, external_id, email, real_name, active) VALUES ($1, $2, $3, $4, $5);",
6✔
315
                    )
6✔
316
                    .await?;
6✔
317

318
                tx.execute(
6✔
319
                    &stmt,
6✔
320
                    &[
6✔
321
                        &user_id,
6✔
322
                        &user.external_id.to_string(),
6✔
323
                        &user.email,
6✔
324
                        &user.real_name,
6✔
325
                        &true,
6✔
326
                    ],
6✔
327
                )
6✔
328
                .await?;
6✔
329

330
                let stmt = tx
6✔
331
                    .prepare("INSERT INTO user_roles (user_id, role_id) VALUES ($1, $2);")
6✔
332
                    .await?;
6✔
333
                tx.execute(&stmt, &[&user_id, &user_id]).await?;
6✔
334

335
                let stmt = tx
6✔
336
                    .prepare("INSERT INTO user_roles (user_id, role_id) VALUES ($1, $2);")
6✔
337
                    .await?;
5✔
338
                tx.execute(&stmt, &[&user_id, &Role::registered_user_role_id()])
6✔
339
                    .await?;
6✔
340

341
                user_id
6✔
342
            }
343
        };
344

345
        let session_id = SessionId::new();
8✔
346
        let stmt = tx
8✔
347
            .prepare(
8✔
348
                "
8✔
349
            INSERT INTO sessions (id)
8✔
350
            VALUES ($1);",
8✔
351
            )
8✔
352
            .await?;
8✔
353

354
        tx.execute(&stmt, &[&session_id]).await?;
8✔
355

356
        let stmt = tx
8✔
357
            .prepare(
8✔
358
                "
8✔
359
            INSERT INTO 
8✔
360
                user_sessions (user_id, session_id, created, valid_until) 
8✔
361
            VALUES 
8✔
362
                ($1, $2, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP + make_interval(secs:=$3))
8✔
363
            RETURNING 
8✔
364
                created, valid_until;",
8✔
365
            )
8✔
366
            .await?;
8✔
367
        let row = tx
8✔
368
            .query_one(
8✔
369
                &stmt,
8✔
370
                &[
8✔
371
                    &user_id,
8✔
372
                    &session_id,
8✔
373
                    &(oidc_tokens.expires_in.num_seconds() as f64),
8✔
374
                ],
8✔
375
            )
8✔
376
            .await?;
8✔
377

378
        self.store_oidc_session_tokens(session_id, oidc_tokens, &tx)
8✔
379
            .await?;
16✔
380

381
        let stmt = tx
8✔
382
            .prepare("SELECT role_id FROM user_roles WHERE user_id = $1;")
8✔
383
            .await?;
8✔
384

385
        let rows = tx
8✔
386
            .query(&stmt, &[&user_id])
8✔
387
            .await
9✔
388
            .map_err(|_error| error::Error::LoginFailed)?;
8✔
389

390
        let roles = rows.into_iter().map(|row| row.get(0)).collect();
16✔
391

8✔
392
        tx.commit().await?;
8✔
393

394
        Ok(UserSession {
8✔
395
            id: session_id,
8✔
396
            user: UserInfo {
8✔
397
                id: user_id,
8✔
398
                email: Some(user.email.clone()),
8✔
399
                real_name: Some(user.real_name.clone()),
8✔
400
            },
8✔
401
            created: row.get(0),
8✔
402
            valid_until: row.get(1),
8✔
403
            project: None,
8✔
404
            view: None,
8✔
405
            roles,
8✔
406
        })
8✔
407
    }
16✔
408

409
    async fn user_session_by_id(&self, session: SessionId) -> Result<UserSession> {
58✔
410
        let mut conn = self.pool.get().await?;
58✔
411

412
        let tx = conn.build_transaction().start().await?;
58✔
413

414
        let stmt = tx
58✔
415
            .prepare(
58✔
416
                "
58✔
417
            SELECT 
58✔
418
                u.id,   
58✔
419
                COALESCE(u.email, eu.email) AS email,
58✔
420
                COALESCE(u.real_name, eu.real_name) AS real_name,
58✔
421
                us.created, 
58✔
422
                us.valid_until, 
58✔
423
                s.project_id,
58✔
424
                s.view,
58✔
425
                CASE WHEN CURRENT_TIMESTAMP < us.valid_until THEN TRUE ELSE FALSE END AS valid_session
58✔
426
            FROM
58✔
427
                sessions s JOIN user_sessions us ON (s.id = us.session_id)
58✔
428
                    JOIN users u ON (us.user_id = u.id)
58✔
429
                    LEFT JOIN external_users eu ON (u.id = eu.id)
58✔
430
                    LEFT JOIN oidc_session_tokens t ON (s.id = t.session_id)
58✔
431
            WHERE s.id = $1 AND (CURRENT_TIMESTAMP < us.valid_until OR t.refresh_token IS NOT NULL);",
58✔
432
            )
58✔
433
            .await?;
312✔
434

435
        let row = tx
58✔
436
            .query_one(&stmt, &[&session])
58✔
437
            .await
49✔
438
            .map_err(|_error| error::Error::InvalidSession)?;
58✔
439

440
        let valid_session: bool = row.get(7);
51✔
441

442
        let valid_until = if valid_session {
51✔
443
            row.get(4)
50✔
444
        } else {
445
            log::debug!("Session expired, trying to extend");
1✔
446
            let refresh_result = self.refresh_oidc_session_tokens(session, &tx).await;
11✔
447

448
            if let Err(refresh_error) = refresh_result {
1✔
449
                log::debug!("Session extension failed {}", refresh_error);
×
450
                return Err(Error::InvalidSession);
×
451
            }
1✔
452
            log::debug!("Session extended");
1✔
453
            refresh_result
1✔
454
                .expect("Refresh result should exist")
1✔
455
                .db_valid_until
1✔
456
        };
457

458
        let mut session = UserSession {
51✔
459
            id: session,
51✔
460
            user: UserInfo {
51✔
461
                id: row.get(0),
51✔
462
                email: row.get(1),
51✔
463
                real_name: row.get(2),
51✔
464
            },
51✔
465
            created: row.get(3),
51✔
466
            valid_until,
51✔
467
            project: row.get::<usize, Option<Uuid>>(5).map(ProjectId),
51✔
468
            view: row.get(6),
51✔
469
            roles: vec![],
51✔
470
        };
51✔
471

472
        let stmt = tx
51✔
473
            .prepare(
51✔
474
                "
51✔
475
            SELECT role_id FROM user_roles WHERE user_id = $1;
51✔
476
            ",
51✔
477
            )
51✔
478
            .await?;
42✔
479

480
        let rows = tx.query(&stmt, &[&session.user.id]).await?;
51✔
481

482
        tx.commit().await?;
51✔
483

484
        session.roles = rows.into_iter().map(|row| row.get(0)).collect();
90✔
485

51✔
486
        Ok(session)
51✔
487
    }
116✔
488
}
489

490
#[async_trait]
491
impl<Tls> SessionTokenStore for ProPostgresContext<Tls>
492
where
493
    Tls: MakeTlsConnect<Socket> + Clone + Send + Sync + 'static + std::fmt::Debug,
494
    <Tls as MakeTlsConnect<Socket>>::Stream: Send + Sync,
495
    <Tls as MakeTlsConnect<Socket>>::TlsConnect: Send,
496
    <<Tls as MakeTlsConnect<Socket>>::TlsConnect as TlsConnect<Socket>>::Future: Send,
497
{
498
    async fn store_oidc_session_tokens(
499
        &self,
500
        session: SessionId,
501
        oidc_tokens: OidcTokens,
502
        tx: &Transaction<'_>,
503
    ) -> Result<StoredOidcTokens> {
8✔
504
        let flat_tokens: FlatMaybeEncryptedOidcTokens = self
8✔
505
            .oidc_manager()
8✔
506
            .maybe_encrypt_tokens(&oidc_tokens)?
8✔
507
            .into();
8✔
508

509
        let stmt = tx
8✔
510
            .prepare(
8✔
511
                "
8✔
512
                INSERT INTO oidc_session_tokens
8✔
513
                    (session_id, access_token, access_token_encryption_nonce, access_token_valid_until, refresh_token, refresh_token_encryption_nonce)
8✔
514
                VALUES
8✔
515
                    ($1, $2, $3, CURRENT_TIMESTAMP + make_interval(secs:=$4), $5, $6)
8✔
516
                RETURNING
8✔
517
                    access_token_valid_until;
8✔
518
                ;"
8✔
519
            )
8✔
520
            .await?;
8✔
521

522
        let db_valid_until = tx
8✔
523
            .query_one(
8✔
524
                &stmt,
8✔
525
                &[
8✔
526
                    &session,
8✔
527
                    &flat_tokens.access_token_value,
8✔
528
                    &flat_tokens.access_token_nonce,
8✔
529
                    &(oidc_tokens.expires_in.num_seconds() as f64),
8✔
530
                    &flat_tokens.refresh_token_value,
8✔
531
                    &flat_tokens.refresh_token_nonce,
8✔
532
                ],
8✔
533
            )
8✔
534
            .await?
8✔
535
            .get(0);
8✔
536

8✔
537
        Ok(StoredOidcTokens {
8✔
538
            oidc_tokens,
8✔
539
            db_valid_until,
8✔
540
        })
8✔
541
    }
16✔
542

543
    async fn refresh_oidc_session_tokens(
544
        &self,
545
        session: SessionId,
546
        tx: &Transaction<'_>,
547
    ) -> Result<StoredOidcTokens> {
3✔
548
        let stmt = tx
3✔
549
            .prepare(
3✔
550
                "
3✔
551
            SELECT
3✔
552
                refresh_token,
3✔
553
                refresh_token_encryption_nonce
3✔
554
            FROM
3✔
555
                oidc_session_tokens
3✔
556
            WHERE session_id = $1 AND refresh_token IS NOT NULL;",
3✔
557
            )
3✔
558
            .await?;
3✔
559

560
        let rows = tx.query_opt(&stmt, &[&session]).await?;
3✔
561

562
        if let Some(refresh_string) = rows {
3✔
563
            let string_field_and_nonce = MaybeEncryptedBytes {
3✔
564
                value: refresh_string.get(0),
3✔
565
                nonce: refresh_string.get(1),
3✔
566
            };
3✔
567

568
            let refresh_token = self
3✔
569
                .oidc_manager()
3✔
570
                .maybe_decrypt_refresh_token(string_field_and_nonce)?;
3✔
571

572
            let oidc_manager = self.oidc_manager();
3✔
573

574
            let oidc_tokens = oidc_manager
3✔
575
                .get_client()
3✔
576
                .await?
5✔
577
                .refresh_access_token(refresh_token)
3✔
578
                .await?;
1✔
579

580
            let flat_tokens: FlatMaybeEncryptedOidcTokens = self
3✔
581
                .oidc_manager()
3✔
582
                .maybe_encrypt_tokens(&oidc_tokens)?
3✔
583
                .into();
3✔
584

585
            let update_session_tokens = tx.prepare("
3✔
586
                UPDATE
3✔
587
                    oidc_session_tokens
3✔
588
                SET
3✔
589
                    access_token = $2, access_token_encryption_nonce = $3, access_token_valid_until = CURRENT_TIMESTAMP + make_interval(secs:=$4), refresh_token = $5, refresh_token_encryption_nonce = $6
3✔
590
                WHERE
3✔
591
                    session_id = $1;",
3✔
592

3✔
593
            ).await?;
3✔
594

595
            tx.execute(
3✔
596
                &update_session_tokens,
3✔
597
                &[
3✔
598
                    &session,
3✔
599
                    &flat_tokens.access_token_value,
3✔
600
                    &flat_tokens.access_token_nonce,
3✔
601
                    &(oidc_tokens.expires_in.num_seconds() as f64),
3✔
602
                    &flat_tokens.refresh_token_value,
3✔
603
                    &flat_tokens.refresh_token_nonce,
3✔
604
                ],
3✔
605
            )
3✔
606
            .await?;
3✔
607

608
            let stmt = tx
3✔
609
                .prepare(
3✔
610
                    "
3✔
611
                UPDATE
3✔
612
                    user_sessions
3✔
613
                SET
3✔
614
                    valid_until = CURRENT_TIMESTAMP + make_interval(secs:=$2)
3✔
615
                WHERE
3✔
616
                    session_id = $1
3✔
617
                RETURNING
3✔
618
                    valid_until;",
3✔
619
                )
3✔
620
                .await?;
3✔
621

622
            let expiration = tx
3✔
623
                .query_one(
3✔
624
                    &stmt,
3✔
625
                    &[&session, &(oidc_tokens.expires_in.num_seconds() as f64)],
3✔
626
                )
3✔
627
                .await?
3✔
628
                .get(0);
3✔
629

3✔
630
            return Ok(StoredOidcTokens {
3✔
631
                oidc_tokens,
3✔
632
                db_valid_until: expiration,
3✔
633
            });
3✔
634
        };
×
635

×
636
        Err(Error::InvalidSession)
×
637
    }
6✔
638

639
    async fn get_access_token(&self, session: SessionId) -> Result<AccessToken> {
4✔
640
        let mut conn = self.pool.get().await?;
4✔
641

642
        let tx = conn.build_transaction().start().await?;
4✔
643

644
        let stmt = tx
4✔
645
            .prepare(
4✔
646
                "
4✔
647
            SELECT
4✔
648
                access_token,
4✔
649
                access_token_encryption_nonce
4✔
650
            FROM
4✔
651
                oidc_session_tokens
4✔
652
            WHERE session_id = $1 AND (CURRENT_TIMESTAMP < access_token_valid_until);",
4✔
653
            )
4✔
654
            .await?;
4✔
655

656
        let rows = tx.query_opt(&stmt, &[&session]).await?;
4✔
657

658
        let access_token = if let Some(token_row) = rows {
4✔
659
            let string_field_and_nonce = MaybeEncryptedBytes {
2✔
660
                value: token_row.get(0),
2✔
661
                nonce: token_row.get(1),
2✔
662
            };
2✔
663
            self.oidc_manager()
2✔
664
                .maybe_decrypt_access_token(string_field_and_nonce)?
2✔
665
        } else {
666
            self.refresh_oidc_session_tokens(session, &tx)
2✔
667
                .await?
12✔
668
                .oidc_tokens
669
                .access
670
        };
671

672
        tx.commit().await?;
4✔
673

674
        Ok(access_token)
4✔
675
    }
8✔
676
}
677

678
#[async_trait]
679
impl<Tls> UserDb for ProPostgresDb<Tls>
680
where
681
    Tls: MakeTlsConnect<Socket> + Clone + Send + Sync + 'static + std::fmt::Debug,
682
    <Tls as MakeTlsConnect<Socket>>::Stream: Send + Sync,
683
    <Tls as MakeTlsConnect<Socket>>::TlsConnect: Send,
684
    <<Tls as MakeTlsConnect<Socket>>::TlsConnect as TlsConnect<Socket>>::Future: Send,
685
{
686
    // TODO: clean up expired sessions?
687

688
    async fn logout(&self) -> Result<()> {
6✔
689
        let conn = self.conn_pool.get().await?;
6✔
690
        let stmt = conn
6✔
691
            .prepare("DELETE FROM sessions WHERE id = $1;") // TODO: only invalidate session?
6✔
692
            .await?;
6✔
693

694
        conn.execute(&stmt, &[&self.session.id])
6✔
695
            .await
6✔
696
            .map_err(|_error| error::Error::LogoutFailed)?;
6✔
697
        Ok(())
6✔
698
    }
12✔
699

700
    async fn set_session_project(&self, project: ProjectId) -> Result<()> {
3✔
701
        // TODO: check permission
702

703
        let conn = self.conn_pool.get().await?;
3✔
704
        let stmt = conn
3✔
705
            .prepare("UPDATE sessions SET project_id = $1 WHERE id = $2;")
3✔
706
            .await?;
3✔
707

708
        conn.execute(&stmt, &[&project, &self.session.id]).await?;
3✔
709

710
        Ok(())
3✔
711
    }
6✔
712

713
    async fn set_session_view(&self, view: STRectangle) -> Result<()> {
3✔
714
        let conn = self.conn_pool.get().await?;
3✔
715
        let stmt = conn
3✔
716
            .prepare("UPDATE sessions SET view = $1 WHERE id = $2;")
3✔
717
            .await?;
2✔
718

719
        conn.execute(&stmt, &[&view, &self.session.id]).await?;
3✔
720

721
        Ok(())
3✔
722
    }
6✔
723

724
    async fn increment_quota_used(&self, user: &UserId, quota_used: u64) -> Result<()> {
1✔
725
        ensure!(self.session.is_admin(), error::PermissionDenied);
1✔
726

727
        let conn = self.conn_pool.get().await?;
1✔
728
        let stmt = conn
1✔
729
            .prepare(
1✔
730
                "
1✔
731
            UPDATE users SET 
1✔
732
                quota_available = quota_available - $1, 
1✔
733
                quota_used = quota_used + $1
1✔
734
            WHERE id = $2;",
1✔
735
            )
1✔
736
            .await?;
1✔
737

738
        conn.execute(&stmt, &[&(quota_used as i64), &user]).await?;
1✔
739

740
        Ok(())
1✔
741
    }
2✔
742

743
    async fn bulk_increment_quota_used<I: IntoIterator<Item = (UserId, u64)> + Send>(
744
        &self,
745
        quota_used_updates: I,
746
    ) -> Result<()> {
14✔
747
        ensure!(self.session.is_admin(), error::PermissionDenied);
14✔
748

749
        let conn = self.conn_pool.get().await?;
20✔
750

751
        // collect the user ids and quotas into separate vectors to pass them as parameters to the query
752
        let (users, quotas): (Vec<UserId>, Vec<i64>) = quota_used_updates
14✔
753
            .into_iter()
14✔
754
            .map(|(user, quota)| (user, quota as i64))
14✔
755
            .unzip();
14✔
756

14✔
757
        let query = "
14✔
758
            UPDATE users
14✔
759
            SET quota_available = quota_available - quota_changes.quota, 
14✔
760
                quota_used = quota_used + quota_changes.quota
14✔
761
            FROM 
14✔
762
                (SELECT * FROM UNNEST($1::uuid[], $2::bigint[]) AS t(id, quota)) AS quota_changes
14✔
763
            WHERE users.id = quota_changes.id;
14✔
764
        ";
14✔
765

14✔
766
        conn.execute(query, &[&users, &quotas]).await?;
28✔
767

768
        Ok(())
14✔
769
    }
28✔
770

771
    async fn quota_used(&self) -> Result<u64> {
7✔
772
        let conn = self.conn_pool.get().await?;
10✔
773
        let stmt = conn
7✔
774
            .prepare("SELECT quota_used FROM users WHERE id = $1;")
7✔
775
            .await?;
7✔
776

777
        let row = conn
7✔
778
            .query_one(&stmt, &[&self.session.user.id])
7✔
779
            .await
7✔
780
            .map_err(|_error| error::Error::InvalidSession)?;
7✔
781

782
        Ok(row.get::<usize, i64>(0) as u64)
7✔
783
    }
14✔
784

785
    async fn quota_used_by_user(&self, user: &UserId) -> Result<u64> {
7✔
786
        ensure!(
7✔
787
            self.session.user.id == *user || self.session.is_admin(),
7✔
788
            error::PermissionDenied
×
789
        );
790

791
        let conn = self.conn_pool.get().await?;
7✔
792
        let stmt = conn
7✔
793
            .prepare("SELECT quota_used FROM users WHERE id = $1;")
7✔
794
            .await?;
7✔
795

796
        let row = conn
7✔
797
            .query_one(&stmt, &[&user])
7✔
798
            .await
7✔
799
            .map_err(|_error| error::Error::InvalidSession)?;
7✔
800

801
        Ok(row.get::<usize, i64>(0) as u64)
7✔
802
    }
14✔
803

804
    async fn quota_available(&self) -> Result<i64> {
14✔
805
        let conn = self.conn_pool.get().await?;
14✔
806
        let stmt = conn
14✔
807
            .prepare("SELECT quota_available FROM users WHERE id = $1;")
14✔
808
            .await?;
10✔
809

810
        let row = conn
14✔
811
            .query_one(&stmt, &[&self.session.user.id])
14✔
812
            .await
10✔
813
            .map_err(|_error| error::Error::InvalidSession)?;
14✔
814

815
        Ok(row.get::<usize, i64>(0))
14✔
816
    }
28✔
817

818
    async fn quota_available_by_user(&self, user: &UserId) -> Result<i64> {
6✔
819
        ensure!(
6✔
820
            self.session.user.id == *user || self.session.is_admin(),
6✔
821
            error::PermissionDenied
×
822
        );
823

824
        let conn = self.conn_pool.get().await?;
6✔
825
        let stmt = conn
6✔
826
            .prepare("SELECT quota_available FROM users WHERE id = $1;")
6✔
827
            .await?;
4✔
828

829
        let row = conn
6✔
830
            .query_one(&stmt, &[&user])
6✔
831
            .await
4✔
832
            .map_err(|_error| error::Error::InvalidSession)?;
6✔
833

834
        Ok(row.get::<usize, i64>(0))
6✔
835
    }
12✔
836

837
    async fn update_quota_available_by_user(
838
        &self,
839
        user: &UserId,
840
        new_available_quota: i64,
841
    ) -> Result<()> {
5✔
842
        ensure!(self.session.is_admin(), error::PermissionDenied);
5✔
843

844
        let conn = self.conn_pool.get().await?;
5✔
845
        let stmt = conn
5✔
846
            .prepare(
5✔
847
                "
5✔
848
            UPDATE users SET 
5✔
849
                quota_available = $1
5✔
850
            WHERE id = $2;",
5✔
851
            )
5✔
852
            .await?;
3✔
853

854
        conn.execute(&stmt, &[&(new_available_quota), &user])
5✔
855
            .await?;
5✔
856

857
        Ok(())
5✔
858
    }
10✔
859
}
860

861
#[async_trait]
862
impl<Tls> RoleDb for ProPostgresDb<Tls>
863
where
864
    Tls: MakeTlsConnect<Socket> + Clone + Send + Sync + 'static + std::fmt::Debug,
865
    <Tls as MakeTlsConnect<Socket>>::Stream: Send + Sync,
866
    <Tls as MakeTlsConnect<Socket>>::TlsConnect: Send,
867
    <<Tls as MakeTlsConnect<Socket>>::TlsConnect as TlsConnect<Socket>>::Future: Send,
868
{
869
    async fn add_role(&self, role_name: &str) -> Result<RoleId, RoleDbError> {
4✔
870
        let mut conn = self.conn_pool.get().await.context(Bb8RoleDbError)?;
4✔
871

872
        let tx = conn
4✔
873
            .build_transaction()
4✔
874
            .start()
4✔
875
            .await
4✔
876
            .context(PostgresRoleDbError)?;
4✔
877

878
        self.ensure_admin_in_tx(&tx)
4✔
879
            .await
×
880
            .context(PermissionDbRoleDbError)?;
4✔
881

882
        let id = RoleId::new();
4✔
883

884
        let res = tx
4✔
885
            .execute(
4✔
886
                "INSERT INTO roles (id, name) VALUES ($1, $2);",
4✔
887
                &[&id, &role_name],
4✔
888
            )
4✔
889
            .await;
8✔
890

891
        if let Err(err) = res {
4✔
892
            if err.code() == Some(&tokio_postgres::error::SqlState::UNIQUE_VIOLATION) {
×
893
                return Err(RoleDbError::RoleAlreadyExists {
×
894
                    role_name: role_name.to_string(),
×
895
                });
×
896
            }
×
897
        }
4✔
898

899
        tx.commit().await.context(PostgresRoleDbError)?;
4✔
900

901
        Ok(id)
4✔
902
    }
8✔
903

904
    async fn load_role_by_name(&self, role_name: &str) -> Result<RoleId, RoleDbError> {
×
905
        let conn = self.conn_pool.get().await.context(Bb8RoleDbError)?;
×
906

907
        let row = conn
×
908
            .query_opt("SELECT id FROM roles WHERE name = $1;", &[&role_name])
×
909
            .await
×
910
            .context(PostgresRoleDbError)?
×
911
            .ok_or(RoleDbError::RoleNameDoesNotExist {
×
912
                role_name: role_name.to_string(),
×
913
            })?;
×
914

915
        Ok(RoleId(row.get(0)))
×
916
    }
×
917

918
    async fn remove_role(&self, role_id: &RoleId) -> Result<(), RoleDbError> {
2✔
919
        let mut conn = self.conn_pool.get().await.context(Bb8RoleDbError)?;
2✔
920

921
        let tx = conn
2✔
922
            .build_transaction()
2✔
923
            .start()
2✔
UNCOV
924
            .await
×
925
            .context(PostgresRoleDbError)?;
2✔
926

927
        self.ensure_admin_in_tx(&tx)
2✔
928
            .await
×
929
            .context(PermissionDbRoleDbError)?;
2✔
930

931
        let deleted = tx
2✔
932
            .execute("DELETE FROM roles WHERE id = $1;", &[&role_id])
2✔
UNCOV
933
            .await
×
934
            .context(PostgresRoleDbError)?;
2✔
935

936
        tx.commit().await.context(PostgresRoleDbError)?;
2✔
937

938
        ensure!(
2✔
939
            deleted > 0,
2✔
940
            RoleIdDoesNotExistRoleDbError { role_id: *role_id }
×
941
        );
942

943
        Ok(())
2✔
944
    }
4✔
945

946
    async fn assign_role(&self, role_id: &RoleId, user_id: &UserId) -> Result<(), RoleDbError> {
4✔
947
        let mut conn = self.conn_pool.get().await.context(Bb8RoleDbError)?;
4✔
948

949
        let tx = conn
4✔
950
            .build_transaction()
4✔
951
            .start()
4✔
952
            .await
2✔
953
            .context(PostgresRoleDbError)?;
4✔
954

955
        self.ensure_admin_in_tx(&tx)
4✔
956
            .await
×
957
            .context(PermissionDbRoleDbError)?;
4✔
958

959
        tx.execute(
4✔
960
            "INSERT INTO user_roles (user_id, role_id) VALUES ($1, $2) ON CONFLICT DO NOTHING;",
4✔
961
            &[&user_id, &role_id],
4✔
962
        )
4✔
963
        .await
4✔
964
        .context(PostgresRoleDbError)?;
4✔
965

966
        tx.commit().await.context(PostgresRoleDbError)?;
4✔
967

968
        Ok(())
4✔
969
    }
8✔
970

971
    async fn revoke_role(&self, role_id: &RoleId, user_id: &UserId) -> Result<(), RoleDbError> {
2✔
972
        let mut conn = self.conn_pool.get().await.context(Bb8RoleDbError)?;
2✔
973

974
        let tx = conn
2✔
975
            .build_transaction()
2✔
976
            .start()
2✔
977
            .await
1✔
978
            .context(PostgresRoleDbError)?;
2✔
979

980
        self.ensure_admin_in_tx(&tx)
2✔
981
            .await
×
982
            .context(PermissionDbRoleDbError)?;
2✔
983

984
        let deleted = tx
2✔
985
            .execute(
2✔
986
                "DELETE FROM user_roles WHERE user_id= $1 AND role_id = $2;",
2✔
987
                &[&user_id, &role_id],
2✔
988
            )
2✔
UNCOV
989
            .await
×
990
            .context(PostgresRoleDbError)?;
2✔
991

992
        tx.commit().await.context(PostgresRoleDbError)?;
2✔
993

994
        ensure!(
2✔
995
            deleted > 0,
2✔
996
            CannotRevokeRoleThatIsNotAssignedRoleDbError { role_id: *role_id }
×
997
        );
998

999
        Ok(())
2✔
1000
    }
4✔
1001

1002
    async fn get_role_descriptions(
1003
        &self,
1004
        user_id: &UserId,
1005
    ) -> Result<Vec<RoleDescription>, RoleDbError> {
5✔
1006
        let conn = self.conn_pool.get().await.context(Bb8RoleDbError)?;
5✔
1007

1008
        let stmt = conn
5✔
1009
            .prepare(
5✔
1010
                "SELECT roles.id, roles.name \
5✔
1011
                FROM roles JOIN user_roles ON (roles.id=user_roles.role_id) \
5✔
1012
                WHERE user_roles.user_id=$1 \
5✔
1013
                ORDER BY roles.name;",
5✔
1014
            )
5✔
1015
            .await
1✔
1016
            .context(PostgresRoleDbError)?;
5✔
1017

1018
        let results = conn
5✔
1019
            .query(&stmt, &[&user_id])
5✔
1020
            .await
1✔
1021
            .context(PostgresRoleDbError)?;
5✔
1022

1023
        let mut result_vec = Vec::new();
5✔
1024

1025
        for result in results {
17✔
1026
            let id = result.get(0);
12✔
1027
            let name = result.get(1);
12✔
1028
            let individual = UserId(id) == *user_id;
12✔
1029
            result_vec.push(RoleDescription {
12✔
1030
                role: Role {
12✔
1031
                    id: RoleId(id),
12✔
1032
                    name,
12✔
1033
                },
12✔
1034
                individual,
12✔
1035
            });
12✔
1036
        }
12✔
1037

1038
        Ok(result_vec)
5✔
1039
    }
10✔
1040
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc