• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

geo-engine / geoengine / 12415218959

19 Dec 2024 03:00PM UTC coverage: 90.359% (-0.2%) from 90.512%
12415218959

Pull #998

github

web-flow
Merge 70dc35aa7 into 34e12969f
Pull Request #998: quota logging wip

836 of 1213 new or added lines in 66 files covered. (68.92%)

212 existing lines in 18 files now uncovered.

133845 of 148125 relevant lines covered (90.36%)

54352.59 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

94.1
/services/src/pro/users/postgres_userdb.rs
1
use crate::contexts::SessionId;
2
use crate::error::{Error, Result};
3
use crate::pro::api::handlers::users::UsageSummaryGranularity;
4
use crate::pro::contexts::{ProApplicationContext, ProPostgresDb};
5
use crate::pro::permissions::postgres_permissiondb::TxPermissionDb;
6
use crate::pro::permissions::{Role, RoleDescription, RoleId};
7
use crate::pro::quota::{ComputationQuota, DataUsage, DataUsageSummary, OperatorQuota};
8
use crate::pro::users::oidc::{FlatMaybeEncryptedOidcTokens, OidcTokens, UserClaims};
9
use crate::pro::users::userdb::{
10
    CannotRevokeRoleThatIsNotAssignedRoleDbError, RoleIdDoesNotExistRoleDbError,
11
};
12
use crate::pro::users::{
13
    SessionTokenStore, StoredOidcTokens, User, UserCredentials, UserDb, UserId, UserInfo,
14
    UserRegistration, UserSession,
15
};
16
use crate::projects::{ProjectId, STRectangle};
17
use crate::util::postgres::PostgresErrorExt;
18
use crate::util::Identifier;
19
use crate::{error, pro::contexts::ProPostgresContext};
20
use async_trait::async_trait;
21
use geoengine_operators::meta::quota::ComputationUnit;
22

23
use crate::util::encryption::MaybeEncryptedBytes;
24
use bb8_postgres::{
25
    tokio_postgres::tls::MakeTlsConnect, tokio_postgres::tls::TlsConnect, tokio_postgres::Socket,
26
};
27
use oauth2::AccessToken;
28
use pwhash::bcrypt;
29
use snafu::{ensure, ResultExt};
30
use tokio_postgres::Transaction;
31
use uuid::Uuid;
32

33
use super::userdb::{
34
    Bb8RoleDbError, PermissionDbRoleDbError, PostgresRoleDbError, RoleDb, RoleDbError, UserAuth,
35
};
36

37
#[async_trait]
38
impl<Tls> UserAuth for ProPostgresContext<Tls>
39
where
40
    Tls: MakeTlsConnect<Socket> + Clone + Send + Sync + 'static + std::fmt::Debug,
41
    <Tls as MakeTlsConnect<Socket>>::Stream: Send + Sync,
42
    <Tls as MakeTlsConnect<Socket>>::TlsConnect: Send,
43
    <<Tls as MakeTlsConnect<Socket>>::TlsConnect as TlsConnect<Socket>>::Future: Send,
44
{
45
    // TODO: clean up expired sessions?
46

47
    async fn register_user(&self, user: UserRegistration) -> Result<UserId> {
45✔
48
        let mut conn = self.pool.get().await?;
45✔
49

50
        let tx = conn.build_transaction().start().await?;
45✔
51

52
        let user = User::from(user);
45✔
53

54
        let stmt = tx
45✔
55
            .prepare("INSERT INTO roles (id, name) VALUES ($1, $2);")
45✔
56
            .await?;
46✔
57
        tx.execute(&stmt, &[&user.id, &user.email])
45✔
58
            .await
40✔
59
            .map_unique_violation("roles", "name", || error::Error::Duplicate {
45✔
60
                reason: "E-mail already exists".to_string(),
15✔
61
            })?;
45✔
62

63
        let stmt = tx
30✔
64
            .prepare(
30✔
65
                "INSERT INTO users (id, email, password_hash, real_name, quota_available, active) VALUES ($1, $2, $3, $4, $5, $6);",
30✔
66
            )
30✔
67
            .await?;
27✔
68

69
        let quota_available =
30✔
70
            crate::util::config::get_config_element::<crate::pro::util::config::Quota>()?
30✔
71
                .initial_credits;
72

73
        tx.execute(
30✔
74
            &stmt,
30✔
75
            &[
30✔
76
                &user.id,
30✔
77
                &user.email,
30✔
78
                &user.password_hash,
30✔
79
                &user.real_name,
30✔
80
                &quota_available,
30✔
81
                &user.active,
30✔
82
            ],
30✔
83
        )
30✔
84
        .await?;
29✔
85

86
        let stmt = tx
30✔
87
            .prepare("INSERT INTO user_roles (user_id, role_id) VALUES ($1, $2);")
30✔
88
            .await?;
29✔
89
        tx.execute(&stmt, &[&user.id, &user.id]).await?;
30✔
90

91
        let stmt = tx
30✔
92
            .prepare("INSERT INTO user_roles (user_id, role_id) VALUES ($1, $2);")
30✔
93
            .await?;
28✔
94
        tx.execute(&stmt, &[&user.id, &Role::registered_user_role_id()])
30✔
95
            .await?;
29✔
96

97
        tx.commit().await?;
30✔
98

99
        Ok(user.id)
30✔
100
    }
90✔
101

102
    async fn create_anonymous_session(&self) -> Result<UserSession> {
41✔
103
        let mut conn = self.pool.get().await?;
41✔
104

105
        let tx = conn.build_transaction().start().await?;
41✔
106

107
        let user_id = UserId::new();
41✔
108

109
        let stmt = tx
41✔
110
            .prepare("INSERT INTO roles (id, name) VALUES ($1, $2);")
41✔
111
            .await?;
37✔
112
        tx.execute(&stmt, &[&user_id, &format!("anonymous_user_{user_id}")])
41✔
113
            .await?;
36✔
114

115
        let quota_available =
41✔
116
            crate::util::config::get_config_element::<crate::pro::util::config::Quota>()?
41✔
117
                .initial_credits;
118

119
        let stmt = tx
41✔
120
            .prepare("INSERT INTO users (id, quota_available, active) VALUES ($1, $2, TRUE);")
41✔
121
            .await?;
35✔
122

123
        tx.execute(&stmt, &[&user_id, &quota_available]).await?;
41✔
124

125
        let stmt = tx
41✔
126
            .prepare("INSERT INTO user_roles (user_id, role_id) VALUES ($1, $2);")
41✔
127
            .await?;
35✔
128
        tx.execute(&stmt, &[&user_id, &user_id]).await?;
41✔
129

130
        let stmt = tx
41✔
131
            .prepare("INSERT INTO user_roles (user_id, role_id) VALUES ($1, $2);")
41✔
132
            .await?;
35✔
133
        tx.execute(&stmt, &[&user_id, &Role::anonymous_role_id()])
41✔
134
            .await?;
35✔
135

136
        let session_id = SessionId::new();
41✔
137
        let stmt = tx
41✔
138
            .prepare(
41✔
139
                "
41✔
140
                INSERT INTO sessions (id)
41✔
141
                VALUES ($1);",
41✔
142
            )
41✔
143
            .await?;
34✔
144

145
        tx.execute(&stmt, &[&session_id]).await?;
41✔
146

147
        let stmt = tx
41✔
148
            .prepare(
41✔
149
                "
41✔
150
            INSERT INTO 
41✔
151
                user_sessions (user_id, session_id, created, valid_until) 
41✔
152
            VALUES 
41✔
153
                ($1, $2, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP + make_interval(secs:=$3))
41✔
154
            RETURNING 
41✔
155
                created, valid_until;",
41✔
156
            )
41✔
157
            .await?;
36✔
158

159
        // TODO: load from config
160
        let session_duration = chrono::Duration::days(30);
41✔
161
        let row = tx
41✔
162
            .query_one(
41✔
163
                &stmt,
41✔
164
                &[
41✔
165
                    &user_id,
41✔
166
                    &session_id,
41✔
167
                    &(session_duration.num_seconds() as f64),
41✔
168
                ],
41✔
169
            )
41✔
170
            .await?;
36✔
171

172
        tx.commit().await?;
41✔
173

174
        Ok(UserSession {
41✔
175
            id: session_id,
41✔
176
            user: UserInfo {
41✔
177
                id: user_id,
41✔
178
                email: None,
41✔
179
                real_name: None,
41✔
180
            },
41✔
181
            created: row.get(0),
41✔
182
            valid_until: row.get(1),
41✔
183
            project: None,
41✔
184
            view: None,
41✔
185
            roles: vec![user_id.into(), Role::anonymous_role_id()],
41✔
186
        })
41✔
187
    }
82✔
188

189
    async fn login(&self, user_credentials: UserCredentials) -> Result<UserSession> {
63✔
190
        let mut conn = self.pool.get().await?;
63✔
191

192
        let tx = conn.build_transaction().start().await?;
63✔
193

194
        let stmt = tx
63✔
195
            .prepare("SELECT id, password_hash, email, real_name FROM users WHERE email = $1;")
63✔
196
            .await?;
59✔
197

198
        let row = tx
63✔
199
            .query_one(&stmt, &[&user_credentials.email])
63✔
200
            .await
59✔
201
            .map_err(|_error| error::Error::LoginFailed)?;
63✔
202

203
        let user_id = UserId(row.get(0));
63✔
204
        let password_hash = row.get(1);
63✔
205
        let email = row.get(2);
63✔
206
        let real_name = row.get(3);
63✔
207

63✔
208
        if bcrypt::verify(user_credentials.password, password_hash) {
63✔
209
            let session_id = SessionId::new();
62✔
210
            let stmt = tx
62✔
211
                .prepare(
62✔
212
                    "
62✔
213
                INSERT INTO sessions (id)
62✔
214
                VALUES ($1);",
62✔
215
                )
62✔
216
                .await?;
58✔
217

218
            tx.execute(&stmt, &[&session_id]).await?;
62✔
219

220
            // TODO: load from config
221
            let session_duration = chrono::Duration::days(30);
62✔
222
            let stmt = tx
62✔
223
                .prepare(
62✔
224
                    "
62✔
225
                INSERT INTO 
62✔
226
                    user_sessions (user_id, session_id, created, valid_until) 
62✔
227
                VALUES 
62✔
228
                    ($1, $2, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP + make_interval(secs:=$3))
62✔
229
                RETURNING 
62✔
230
                    created, valid_until;",
62✔
231
                )
62✔
232
                .await?;
57✔
233
            let row = tx
62✔
234
                .query_one(
62✔
235
                    &stmt,
62✔
236
                    &[
62✔
237
                        &user_id,
62✔
238
                        &session_id,
62✔
239
                        &(session_duration.num_seconds() as f64),
62✔
240
                    ],
62✔
241
                )
62✔
242
                .await?;
58✔
243

244
            let stmt = tx
62✔
245
                .prepare("SELECT role_id FROM user_roles WHERE user_id = $1;")
62✔
246
                .await?;
58✔
247

248
            let rows = tx
62✔
249
                .query(&stmt, &[&user_id])
62✔
250
                .await
57✔
251
                .map_err(|_error| error::Error::LoginFailed)?;
62✔
252

253
            tx.commit().await?;
62✔
254

255
            let roles = rows.into_iter().map(|row| row.get(0)).collect();
96✔
256

62✔
257
            Ok(UserSession {
62✔
258
                id: session_id,
62✔
259
                user: UserInfo {
62✔
260
                    id: user_id,
62✔
261
                    email,
62✔
262
                    real_name,
62✔
263
                },
62✔
264
                created: row.get(0),
62✔
265
                valid_until: row.get(1),
62✔
266
                project: None,
62✔
267
                view: None,
62✔
268
                roles,
62✔
269
            })
62✔
270
        } else {
271
            Err(error::Error::LoginFailed)
1✔
272
        }
273
    }
126✔
274

275
    #[allow(clippy::too_many_lines)]
276
    async fn login_external(
277
        &self,
278
        user: UserClaims,
279
        oidc_tokens: OidcTokens,
280
    ) -> Result<UserSession> {
8✔
281
        let mut conn = self.pool.get().await?;
8✔
282
        let tx = conn.build_transaction().start().await?;
8✔
283

284
        let stmt = tx
8✔
285
            .prepare("SELECT id, external_id, email, real_name FROM external_users WHERE external_id = $1;")
8✔
286
            .await?;
8✔
287

288
        let row = tx
8✔
289
            .query_opt(&stmt, &[&user.external_id.to_string()])
8✔
290
            .await
8✔
291
            .map_err(|_error| error::Error::LoginFailed)?;
8✔
292

293
        let user_id = match row {
8✔
294
            Some(row) => UserId(row.get(0)),
2✔
295
            None => {
296
                let user_id = UserId::new();
6✔
297

298
                let stmt = tx
6✔
299
                    .prepare("INSERT INTO roles (id, name) VALUES ($1, $2);")
6✔
300
                    .await?;
6✔
301
                tx.execute(&stmt, &[&user_id, &user.email]).await?;
6✔
302

303
                let quota_available =
6✔
304
                    crate::util::config::get_config_element::<crate::pro::util::config::Quota>()?
6✔
305
                        .initial_credits;
306

307
                //TODO: A user might be able to login without external login using this (internal) id. Would be a problem with anonymous users as well.
308
                let stmt = tx
6✔
309
                    .prepare(
6✔
310
                        "INSERT INTO users (id, quota_available, active) VALUES ($1, $2, TRUE);",
6✔
311
                    )
6✔
312
                    .await?;
6✔
313
                tx.execute(&stmt, &[&user_id, &quota_available]).await?;
6✔
314

315
                let stmt = tx
6✔
316
                    .prepare(
6✔
317
                        "INSERT INTO external_users (id, external_id, email, real_name, active) VALUES ($1, $2, $3, $4, $5);",
6✔
318
                    )
6✔
319
                    .await?;
6✔
320

321
                tx.execute(
6✔
322
                    &stmt,
6✔
323
                    &[
6✔
324
                        &user_id,
6✔
325
                        &user.external_id.to_string(),
6✔
326
                        &user.email,
6✔
327
                        &user.real_name,
6✔
328
                        &true,
6✔
329
                    ],
6✔
330
                )
6✔
331
                .await?;
6✔
332

333
                let stmt = tx
6✔
334
                    .prepare("INSERT INTO user_roles (user_id, role_id) VALUES ($1, $2);")
6✔
335
                    .await?;
6✔
336
                tx.execute(&stmt, &[&user_id, &user_id]).await?;
6✔
337

338
                let stmt = tx
6✔
339
                    .prepare("INSERT INTO user_roles (user_id, role_id) VALUES ($1, $2);")
6✔
340
                    .await?;
6✔
341
                tx.execute(&stmt, &[&user_id, &Role::registered_user_role_id()])
6✔
342
                    .await?;
6✔
343

344
                user_id
6✔
345
            }
346
        };
347

348
        let session_id = SessionId::new();
8✔
349
        let stmt = tx
8✔
350
            .prepare(
8✔
351
                "
8✔
352
            INSERT INTO sessions (id)
8✔
353
            VALUES ($1);",
8✔
354
            )
8✔
355
            .await?;
8✔
356

357
        tx.execute(&stmt, &[&session_id]).await?;
8✔
358

359
        let stmt = tx
8✔
360
            .prepare(
8✔
361
                "
8✔
362
            INSERT INTO 
8✔
363
                user_sessions (user_id, session_id, created, valid_until) 
8✔
364
            VALUES 
8✔
365
                ($1, $2, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP + make_interval(secs:=$3))
8✔
366
            RETURNING 
8✔
367
                created, valid_until;",
8✔
368
            )
8✔
369
            .await?;
8✔
370
        let row = tx
8✔
371
            .query_one(
8✔
372
                &stmt,
8✔
373
                &[
8✔
374
                    &user_id,
8✔
375
                    &session_id,
8✔
376
                    &(oidc_tokens.expires_in.num_seconds() as f64),
8✔
377
                ],
8✔
378
            )
8✔
379
            .await?;
8✔
380

381
        self.store_oidc_session_tokens(session_id, oidc_tokens, &tx)
8✔
382
            .await?;
16✔
383

384
        let stmt = tx
8✔
385
            .prepare("SELECT role_id FROM user_roles WHERE user_id = $1;")
8✔
386
            .await?;
8✔
387

388
        let rows = tx
8✔
389
            .query(&stmt, &[&user_id])
8✔
390
            .await
8✔
391
            .map_err(|_error| error::Error::LoginFailed)?;
8✔
392

393
        let roles = rows.into_iter().map(|row| row.get(0)).collect();
16✔
394

8✔
395
        tx.commit().await?;
8✔
396

397
        Ok(UserSession {
8✔
398
            id: session_id,
8✔
399
            user: UserInfo {
8✔
400
                id: user_id,
8✔
401
                email: Some(user.email.clone()),
8✔
402
                real_name: Some(user.real_name.clone()),
8✔
403
            },
8✔
404
            created: row.get(0),
8✔
405
            valid_until: row.get(1),
8✔
406
            project: None,
8✔
407
            view: None,
8✔
408
            roles,
8✔
409
        })
8✔
410
    }
16✔
411

412
    async fn user_session_by_id(&self, session: SessionId) -> Result<UserSession> {
62✔
413
        let mut conn = self.pool.get().await?;
62✔
414

415
        let tx = conn.build_transaction().start().await?;
62✔
416

417
        let stmt = tx
62✔
418
            .prepare(
62✔
419
                "
62✔
420
            SELECT 
62✔
421
                u.id,   
62✔
422
                COALESCE(u.email, eu.email) AS email,
62✔
423
                COALESCE(u.real_name, eu.real_name) AS real_name,
62✔
424
                us.created, 
62✔
425
                us.valid_until, 
62✔
426
                s.project_id,
62✔
427
                s.view,
62✔
428
                CASE WHEN CURRENT_TIMESTAMP < us.valid_until THEN TRUE ELSE FALSE END AS valid_session
62✔
429
            FROM
62✔
430
                sessions s JOIN user_sessions us ON (s.id = us.session_id)
62✔
431
                    JOIN users u ON (us.user_id = u.id)
62✔
432
                    LEFT JOIN external_users eu ON (u.id = eu.id)
62✔
433
                    LEFT JOIN oidc_session_tokens t ON (s.id = t.session_id)
62✔
434
            WHERE s.id = $1 AND (CURRENT_TIMESTAMP < us.valid_until OR t.refresh_token IS NOT NULL);",
62✔
435
            )
62✔
436
            .await?;
346✔
437

438
        let row = tx
62✔
439
            .query_one(&stmt, &[&session])
62✔
440
            .await
56✔
441
            .map_err(|_error| error::Error::InvalidSession)?;
62✔
442

443
        let valid_session: bool = row.get(7);
55✔
444

445
        let valid_until = if valid_session {
55✔
446
            row.get(4)
54✔
447
        } else {
448
            log::debug!("Session expired, trying to extend");
1✔
449
            let refresh_result = self.refresh_oidc_session_tokens(session, &tx).await;
13✔
450

451
            if let Err(refresh_error) = refresh_result {
1✔
452
                log::debug!("Session extension failed {}", refresh_error);
×
453
                return Err(Error::InvalidSession);
×
454
            }
1✔
455
            log::debug!("Session extended");
1✔
456
            refresh_result
1✔
457
                .expect("Refresh result should exist")
1✔
458
                .db_valid_until
1✔
459
        };
460

461
        let mut session = UserSession {
55✔
462
            id: session,
55✔
463
            user: UserInfo {
55✔
464
                id: row.get(0),
55✔
465
                email: row.get(1),
55✔
466
                real_name: row.get(2),
55✔
467
            },
55✔
468
            created: row.get(3),
55✔
469
            valid_until,
55✔
470
            project: row.get::<usize, Option<Uuid>>(5).map(ProjectId),
55✔
471
            view: row.get(6),
55✔
472
            roles: vec![],
55✔
473
        };
55✔
474

475
        let stmt = tx
55✔
476
            .prepare(
55✔
477
                "
55✔
478
            SELECT role_id FROM user_roles WHERE user_id = $1;
55✔
479
            ",
55✔
480
            )
55✔
481
            .await?;
47✔
482

483
        let rows = tx.query(&stmt, &[&session.user.id]).await?;
55✔
484

485
        tx.commit().await?;
55✔
486

487
        session.roles = rows.into_iter().map(|row| row.get(0)).collect();
96✔
488

55✔
489
        Ok(session)
55✔
490
    }
124✔
491
}
492

493
#[async_trait]
494
impl<Tls> SessionTokenStore for ProPostgresContext<Tls>
495
where
496
    Tls: MakeTlsConnect<Socket> + Clone + Send + Sync + 'static + std::fmt::Debug,
497
    <Tls as MakeTlsConnect<Socket>>::Stream: Send + Sync,
498
    <Tls as MakeTlsConnect<Socket>>::TlsConnect: Send,
499
    <<Tls as MakeTlsConnect<Socket>>::TlsConnect as TlsConnect<Socket>>::Future: Send,
500
{
501
    async fn store_oidc_session_tokens(
502
        &self,
503
        session: SessionId,
504
        oidc_tokens: OidcTokens,
505
        tx: &Transaction<'_>,
506
    ) -> Result<StoredOidcTokens> {
8✔
507
        let flat_tokens: FlatMaybeEncryptedOidcTokens = self
8✔
508
            .oidc_manager()
8✔
509
            .maybe_encrypt_tokens(&oidc_tokens)?
8✔
510
            .into();
8✔
511

512
        let stmt = tx
8✔
513
            .prepare(
8✔
514
                "
8✔
515
                INSERT INTO oidc_session_tokens
8✔
516
                    (session_id, access_token, access_token_encryption_nonce, access_token_valid_until, refresh_token, refresh_token_encryption_nonce)
8✔
517
                VALUES
8✔
518
                    ($1, $2, $3, CURRENT_TIMESTAMP + make_interval(secs:=$4), $5, $6)
8✔
519
                RETURNING
8✔
520
                    access_token_valid_until;
8✔
521
                ;"
8✔
522
            )
8✔
523
            .await?;
8✔
524

525
        let db_valid_until = tx
8✔
526
            .query_one(
8✔
527
                &stmt,
8✔
528
                &[
8✔
529
                    &session,
8✔
530
                    &flat_tokens.access_token_value,
8✔
531
                    &flat_tokens.access_token_nonce,
8✔
532
                    &(oidc_tokens.expires_in.num_seconds() as f64),
8✔
533
                    &flat_tokens.refresh_token_value,
8✔
534
                    &flat_tokens.refresh_token_nonce,
8✔
535
                ],
8✔
536
            )
8✔
537
            .await?
8✔
538
            .get(0);
8✔
539

8✔
540
        Ok(StoredOidcTokens {
8✔
541
            oidc_tokens,
8✔
542
            db_valid_until,
8✔
543
        })
8✔
544
    }
16✔
545

546
    async fn refresh_oidc_session_tokens(
547
        &self,
548
        session: SessionId,
549
        tx: &Transaction<'_>,
550
    ) -> Result<StoredOidcTokens> {
3✔
551
        let stmt = tx
3✔
552
            .prepare(
3✔
553
                "
3✔
554
            SELECT
3✔
555
                refresh_token,
3✔
556
                refresh_token_encryption_nonce
3✔
557
            FROM
3✔
558
                oidc_session_tokens
3✔
559
            WHERE session_id = $1 AND refresh_token IS NOT NULL;",
3✔
560
            )
3✔
561
            .await?;
3✔
562

563
        let rows = tx.query_opt(&stmt, &[&session]).await?;
3✔
564

565
        if let Some(refresh_string) = rows {
3✔
566
            let string_field_and_nonce = MaybeEncryptedBytes {
3✔
567
                value: refresh_string.get(0),
3✔
568
                nonce: refresh_string.get(1),
3✔
569
            };
3✔
570

571
            let refresh_token = self
3✔
572
                .oidc_manager()
3✔
573
                .maybe_decrypt_refresh_token(string_field_and_nonce)?;
3✔
574

575
            let oidc_manager = self.oidc_manager();
3✔
576

577
            let oidc_tokens = oidc_manager
3✔
578
                .get_client()
3✔
579
                .await?
14✔
580
                .refresh_access_token(refresh_token)
3✔
581
                .await?;
2✔
582

583
            let flat_tokens: FlatMaybeEncryptedOidcTokens = self
3✔
584
                .oidc_manager()
3✔
585
                .maybe_encrypt_tokens(&oidc_tokens)?
3✔
586
                .into();
3✔
587

588
            let update_session_tokens = tx.prepare("
3✔
589
                UPDATE
3✔
590
                    oidc_session_tokens
3✔
591
                SET
3✔
592
                    access_token = $2, access_token_encryption_nonce = $3, access_token_valid_until = CURRENT_TIMESTAMP + make_interval(secs:=$4), refresh_token = $5, refresh_token_encryption_nonce = $6
3✔
593
                WHERE
3✔
594
                    session_id = $1;",
3✔
595

3✔
596
            ).await?;
3✔
597

598
            tx.execute(
3✔
599
                &update_session_tokens,
3✔
600
                &[
3✔
601
                    &session,
3✔
602
                    &flat_tokens.access_token_value,
3✔
603
                    &flat_tokens.access_token_nonce,
3✔
604
                    &(oidc_tokens.expires_in.num_seconds() as f64),
3✔
605
                    &flat_tokens.refresh_token_value,
3✔
606
                    &flat_tokens.refresh_token_nonce,
3✔
607
                ],
3✔
608
            )
3✔
609
            .await?;
1✔
610

611
            let stmt = tx
3✔
612
                .prepare(
3✔
613
                    "
3✔
614
                UPDATE
3✔
615
                    user_sessions
3✔
616
                SET
3✔
617
                    valid_until = CURRENT_TIMESTAMP + make_interval(secs:=$2)
3✔
618
                WHERE
3✔
619
                    session_id = $1
3✔
620
                RETURNING
3✔
621
                    valid_until;",
3✔
622
                )
3✔
623
                .await?;
2✔
624

625
            let expiration = tx
3✔
626
                .query_one(
3✔
627
                    &stmt,
3✔
628
                    &[&session, &(oidc_tokens.expires_in.num_seconds() as f64)],
3✔
629
                )
3✔
630
                .await?
2✔
631
                .get(0);
3✔
632

3✔
633
            return Ok(StoredOidcTokens {
3✔
634
                oidc_tokens,
3✔
635
                db_valid_until: expiration,
3✔
636
            });
3✔
637
        };
×
638

×
639
        Err(Error::InvalidSession)
×
640
    }
6✔
641

642
    async fn get_access_token(&self, session: SessionId) -> Result<AccessToken> {
4✔
643
        let mut conn = self.pool.get().await?;
4✔
644

645
        let tx = conn.build_transaction().start().await?;
4✔
646

647
        let stmt = tx
4✔
648
            .prepare(
4✔
649
                "
4✔
650
            SELECT
4✔
651
                access_token,
4✔
652
                access_token_encryption_nonce
4✔
653
            FROM
4✔
654
                oidc_session_tokens
4✔
655
            WHERE session_id = $1 AND (CURRENT_TIMESTAMP < access_token_valid_until);",
4✔
656
            )
4✔
657
            .await?;
4✔
658

659
        let rows = tx.query_opt(&stmt, &[&session]).await?;
4✔
660

661
        let access_token = if let Some(token_row) = rows {
4✔
662
            let string_field_and_nonce = MaybeEncryptedBytes {
2✔
663
                value: token_row.get(0),
2✔
664
                nonce: token_row.get(1),
2✔
665
            };
2✔
666
            self.oidc_manager()
2✔
667
                .maybe_decrypt_access_token(string_field_and_nonce)?
2✔
668
        } else {
669
            self.refresh_oidc_session_tokens(session, &tx)
2✔
670
                .await?
16✔
671
                .oidc_tokens
672
                .access
673
        };
674

675
        tx.commit().await?;
4✔
676

677
        Ok(access_token)
4✔
678
    }
8✔
679
}
680

681
#[async_trait]
682
impl<Tls> UserDb for ProPostgresDb<Tls>
683
where
684
    Tls: MakeTlsConnect<Socket> + Clone + Send + Sync + 'static + std::fmt::Debug,
685
    <Tls as MakeTlsConnect<Socket>>::Stream: Send + Sync,
686
    <Tls as MakeTlsConnect<Socket>>::TlsConnect: Send,
687
    <<Tls as MakeTlsConnect<Socket>>::TlsConnect as TlsConnect<Socket>>::Future: Send,
688
{
689
    // TODO: clean up expired sessions?
690

691
    async fn logout(&self) -> Result<()> {
6✔
692
        let conn = self.conn_pool.get().await?;
6✔
693
        let stmt = conn
6✔
694
            .prepare("DELETE FROM sessions WHERE id = $1;") // TODO: only invalidate session?
6✔
695
            .await?;
6✔
696

697
        conn.execute(&stmt, &[&self.session.id])
6✔
698
            .await
6✔
699
            .map_err(|_error| error::Error::LogoutFailed)?;
6✔
700
        Ok(())
6✔
701
    }
12✔
702

703
    async fn set_session_project(&self, project: ProjectId) -> Result<()> {
3✔
704
        // TODO: check permission
705

706
        let conn = self.conn_pool.get().await?;
3✔
707
        let stmt = conn
3✔
708
            .prepare("UPDATE sessions SET project_id = $1 WHERE id = $2;")
3✔
709
            .await?;
2✔
710

711
        conn.execute(&stmt, &[&project, &self.session.id]).await?;
3✔
712

713
        Ok(())
3✔
714
    }
6✔
715

716
    async fn set_session_view(&self, view: STRectangle) -> Result<()> {
3✔
717
        let conn = self.conn_pool.get().await?;
3✔
718
        let stmt = conn
3✔
719
            .prepare("UPDATE sessions SET view = $1 WHERE id = $2;")
3✔
720
            .await?;
3✔
721

722
        conn.execute(&stmt, &[&view, &self.session.id]).await?;
3✔
723

724
        Ok(())
3✔
725
    }
6✔
726

727
    async fn increment_quota_used(&self, user: &UserId, quota_used: u64) -> Result<()> {
1✔
728
        ensure!(self.session.is_admin(), error::PermissionDenied);
1✔
729

730
        let conn = self.conn_pool.get().await?;
1✔
731
        let stmt = conn
1✔
732
            .prepare(
1✔
733
                "
1✔
734
            UPDATE users SET 
1✔
735
                quota_available = quota_available - $1, 
1✔
736
                quota_used = quota_used + $1
1✔
737
            WHERE id = $2;",
1✔
738
            )
1✔
UNCOV
739
            .await?;
×
740

741
        conn.execute(&stmt, &[&(quota_used as i64), &user]).await?;
1✔
742

743
        Ok(())
1✔
744
    }
2✔
745

746
    async fn bulk_increment_quota_used<I: IntoIterator<Item = (UserId, u64)> + Send>(
747
        &self,
748
        quota_used_updates: I,
749
    ) -> Result<()> {
12✔
750
        ensure!(self.session.is_admin(), error::PermissionDenied);
12✔
751

752
        let conn = self.conn_pool.get().await?;
16✔
753

754
        // collect the user ids and quotas into separate vectors to pass them as parameters to the query
755
        let (users, quotas): (Vec<UserId>, Vec<i64>) = quota_used_updates
12✔
756
            .into_iter()
12✔
757
            .map(|(user, quota)| (user, quota as i64))
13✔
758
            .unzip();
12✔
759

12✔
760
        let query = "
12✔
761
            UPDATE users
12✔
762
            SET quota_available = quota_available - quota_changes.quota, 
12✔
763
                quota_used = quota_used + quota_changes.quota
12✔
764
            FROM 
12✔
765
                (SELECT * FROM UNNEST($1::uuid[], $2::bigint[]) AS t(id, quota)) AS quota_changes
12✔
766
            WHERE users.id = quota_changes.id;
12✔
767
        ";
12✔
768

12✔
769
        conn.execute(query, &[&users, &quotas]).await?;
24✔
770

771
        Ok(())
12✔
772
    }
24✔
773

774
    async fn quota_used(&self) -> Result<u64> {
9✔
775
        let conn = self.conn_pool.get().await?;
9✔
776
        let stmt = conn
9✔
777
            .prepare("SELECT quota_used FROM users WHERE id = $1;")
9✔
778
            .await?;
9✔
779

780
        let row = conn
9✔
781
            .query_one(&stmt, &[&self.session.user.id])
9✔
782
            .await
9✔
783
            .map_err(|_error| error::Error::InvalidSession)?;
9✔
784

785
        Ok(row.get::<usize, i64>(0) as u64)
9✔
786
    }
18✔
787

788
    async fn quota_used_by_user(&self, user: &UserId) -> Result<u64> {
7✔
789
        ensure!(
7✔
790
            self.session.user.id == *user || self.session.is_admin(),
7✔
791
            error::PermissionDenied
×
792
        );
793

794
        let conn = self.conn_pool.get().await?;
7✔
795
        let stmt = conn
7✔
796
            .prepare("SELECT quota_used FROM users WHERE id = $1;")
7✔
797
            .await?;
5✔
798

799
        let row = conn
7✔
800
            .query_one(&stmt, &[&user])
7✔
801
            .await
5✔
802
            .map_err(|_error| error::Error::InvalidSession)?;
7✔
803

804
        Ok(row.get::<usize, i64>(0) as u64)
7✔
805
    }
14✔
806

807
    async fn quota_available(&self) -> Result<i64> {
15✔
808
        let conn = self.conn_pool.get().await?;
15✔
809
        let stmt = conn
15✔
810
            .prepare("SELECT quota_available FROM users WHERE id = $1;")
15✔
811
            .await?;
14✔
812

813
        let row = conn
15✔
814
            .query_one(&stmt, &[&self.session.user.id])
15✔
815
            .await
13✔
816
            .map_err(|_error| error::Error::InvalidSession)?;
15✔
817

818
        Ok(row.get::<usize, i64>(0))
15✔
819
    }
30✔
820

821
    async fn quota_available_by_user(&self, user: &UserId) -> Result<i64> {
6✔
822
        ensure!(
6✔
823
            self.session.user.id == *user || self.session.is_admin(),
6✔
824
            error::PermissionDenied
×
825
        );
826

827
        let conn = self.conn_pool.get().await?;
6✔
828
        let stmt = conn
6✔
829
            .prepare("SELECT quota_available FROM users WHERE id = $1;")
6✔
830
            .await?;
4✔
831

832
        let row = conn
6✔
833
            .query_one(&stmt, &[&user])
6✔
834
            .await
4✔
835
            .map_err(|_error| error::Error::InvalidSession)?;
6✔
836

837
        Ok(row.get::<usize, i64>(0))
6✔
838
    }
12✔
839

840
    async fn log_quota_used<I: IntoIterator<Item = ComputationUnit> + Send>(
841
        &self,
842
        log: I,
843
    ) -> Result<()> {
15✔
844
        ensure!(self.session.is_admin(), error::PermissionDenied);
15✔
845

846
        let conn = self.conn_pool.get().await?;
15✔
847

848
        // collect the log into separate vectors to pass them as parameters to the query
849
        let mut users = Vec::new();
15✔
850
        let mut workflows = Vec::new();
15✔
851
        let mut computations = Vec::new();
15✔
852
        let mut operators_names = Vec::new();
15✔
853
        let mut operator_paths = Vec::new();
15✔
854
        let mut datas = Vec::new();
15✔
855

856
        for unit in log {
35✔
857
            users.push(unit.user);
20✔
858
            workflows.push(unit.workflow);
20✔
859
            computations.push(unit.computation);
20✔
860
            operators_names.push(unit.operator_name);
20✔
861
            operator_paths.push(unit.operator_path.to_string());
20✔
862
            datas.push(unit.data);
20✔
863
        }
20✔
864

865
        let query = "
15✔
866
            INSERT INTO quota_log (user_id, workflow_id, computation_id, operator_name, operator_path, data)
15✔
867
                (SELECT * FROM UNNEST($1::uuid[], $2::uuid[], $3::uuid[], $4::text[], $5::text[], $6::text[]))
15✔
868
        ";
15✔
869

15✔
870
        conn.execute(
15✔
871
            query,
15✔
872
            &[
15✔
873
                &users,
15✔
874
                &workflows,
15✔
875
                &computations,
15✔
876
                &operators_names,
15✔
877
                &operator_paths,
15✔
878
                &datas,
15✔
879
            ],
15✔
880
        )
15✔
881
        .await?;
30✔
882

883
        Ok(())
15✔
884
    }
30✔
885

886
    #[allow(clippy::too_many_lines)]
887
    async fn quota_used_by_computations(
888
        &self,
889
        offset: usize,
890
        limit: usize,
891
    ) -> Result<Vec<ComputationQuota>> {
1✔
892
        let limit = limit.min(10); // TODO: use list limit from config
1✔
893

894
        let conn = self.conn_pool.get().await?;
1✔
895

896
        let rows = conn
1✔
897
            .query(
1✔
898
                "
1✔
899
            SELECT
1✔
900
                computation_id,
1✔
901
                workflow_id,
1✔
902
                MIN(timestamp) AS timestamp,
1✔
903
                COUNT(*) AS count
1✔
904
            FROM
1✔
905
                quota_log
1✔
906
            WHERE
1✔
907
                user_id = $1
1✔
908
            GROUP BY
1✔
909
                computation_id,
1✔
910
                workflow_id
1✔
911
            ORDER BY 
1✔
912
                MIN(TIMESTAMP) DESC
1✔
913
            OFFSET $2
1✔
914
            LIMIT $3;",
1✔
915
                &[&self.session.user.id, &(offset as i64), &(limit as i64)],
1✔
916
            )
1✔
917
            .await?;
2✔
918

919
        Ok(rows
1✔
920
            .iter()
1✔
921
            .map(|row| ComputationQuota {
1✔
922
                computation_id: row.get(0),
1✔
923
                workflow_id: row.get(1),
1✔
924
                timestamp: row.get(2),
1✔
925
                count: row.get::<_, i64>(3) as u64,
1✔
926
            })
1✔
927
            .collect())
1✔
928
    }
2✔
929

930
    async fn quota_used_by_computation(&self, computation_id: Uuid) -> Result<Vec<OperatorQuota>> {
1✔
931
        let conn = self.conn_pool.get().await?;
1✔
932

933
        let rows = conn
1✔
934
            .query(
1✔
935
                "
1✔
936
            SELECT
1✔
937
                operator_name,
1✔
938
                operator_path,
1✔
939
                COUNT(*) AS count
1✔
940
            FROM
1✔
941
                quota_log
1✔
942
            WHERE
1✔
943
                user_id = $1 AND
1✔
944
                computation_id = $2
1✔
945
            GROUP BY
1✔
946
                operator_name, operator_path
1✔
947
            ORDER BY 
1✔
948
            operator_name DESC;",
1✔
949
                &[&self.session.user.id, &computation_id],
1✔
950
            )
1✔
951
            .await?;
2✔
952

953
        Ok(rows
1✔
954
            .iter()
1✔
955
            .map(|row| OperatorQuota {
1✔
956
                operator_name: row.get(0),
1✔
957
                operator_path: row.get(1),
1✔
958
                count: row.get::<_, i64>(2) as u64,
1✔
959
            })
1✔
960
            .collect())
1✔
961
    }
2✔
962

963
    async fn quota_used_on_data(&self, offset: u64, limit: u64) -> Result<Vec<DataUsage>> {
1✔
964
        ensure!(self.session.is_admin(), error::PermissionDenied);
1✔
965

966
        let conn = self.conn_pool.get().await?;
1✔
967

968
        let rows = conn
1✔
969
            .query(
1✔
970
                "
1✔
971
            SELECT
1✔
972
                user_id,
1✔
973
                computation_id,
1✔
974
                data,
1✔
975
                MIN(timestamp) AS timestamp,
1✔
976
                COUNT(*) AS count
1✔
977
            FROM
1✔
978
                quota_log
1✔
979
            WHERE 
1✔
980
                data IS NOT NULL
1✔
981
            GROUP BY
1✔
982
               user_id,
1✔
983
               computation_id,
1✔
984
               workflow_id,
1✔
985
               data
1✔
986
            ORDER BY
1✔
987
                MIN(timestamp) DESC, user_id ASC, computation_id ASC, workflow_id ASC, data ASC
1✔
988
            OFFSET
1✔
989
                $1
1✔
990
            LIMIT
1✔
991
                $2;",
1✔
992
                &[&(offset as i64), &(limit as i64)],
1✔
993
            )
1✔
994
            .await?;
2✔
995

996
        Ok(rows
1✔
997
            .iter()
1✔
998
            .map(|row| DataUsage {
3✔
999
                user_id: row.get(0),
3✔
1000
                computation_id: row.get(1),
3✔
1001
                data: row.get(2),
3✔
1002
                timestamp: row.get(3),
3✔
1003
                count: row.get::<_, i64>(4) as u64,
3✔
1004
            })
3✔
1005
            .collect())
1✔
1006
    }
2✔
1007

1008
    async fn quota_used_on_data_summary(
1009
        &self,
1010
        dataset: Option<String>,
1011
        granularity: UsageSummaryGranularity,
1012
        offset: u64,
1013
        limit: u64,
1014
    ) -> Result<Vec<DataUsageSummary>> {
1✔
1015
        ensure!(self.session.is_admin(), error::PermissionDenied);
1✔
1016

1017
        // TODO: check if user is Owner of dataset
1018

1019
        let conn = self.conn_pool.get().await?;
1✔
1020

1021
        let trunc = match granularity {
1✔
NEW
1022
            UsageSummaryGranularity::Minutes => "minute",
×
NEW
1023
            UsageSummaryGranularity::Hours => "hour",
×
NEW
1024
            UsageSummaryGranularity::Days => "day",
×
NEW
1025
            UsageSummaryGranularity::Months => "month",
×
1026
            UsageSummaryGranularity::Years => "year",
1✔
1027
        };
1028

1029
        let rows = if let Some(dataset) = dataset {
1✔
NEW
1030
            conn.query(
×
NEW
1031
                &format!(
×
NEW
1032
                    "
×
NEW
1033
            SELECT
×
NEW
1034
                date_trunc('{trunc}', timestamp) AS trunc,
×
NEW
1035
                data
×
NEW
1036
                COUNT(*) AS count
×
NEW
1037
            FROM
×
NEW
1038
                quota_log
×
NEW
1039
            WHERE 
×
NEW
1040
                data = $3
×
NEW
1041
            GROUP BY
×
NEW
1042
                trunc, data
×
NEW
1043
            ORDER BY
×
NEW
1044
                trunc DESC, data ASC
×
NEW
1045
            OFFSET
×
NEW
1046
                $1
×
NEW
1047
            LIMIT 
×
NEW
1048
                $2;",
×
NEW
1049
                ),
×
NEW
1050
                &[&(offset as i64), &(limit as i64), &dataset],
×
NEW
1051
            )
×
NEW
1052
            .await?
×
1053
        } else {
1054
            conn.query(
1✔
1055
                &format!(
1✔
1056
                    "
1✔
1057
            SELECT
1✔
1058
                date_trunc('{trunc}', timestamp) AS trunc,
1✔
1059
                data,
1✔
1060
                COUNT(*) AS count
1✔
1061
            FROM
1✔
1062
                quota_log
1✔
1063
            WHERE 
1✔
1064
                data IS NOT NULL
1✔
1065
            GROUP BY
1✔
1066
                trunc, data
1✔
1067
            ORDER BY
1✔
1068
                trunc DESC, data ASC
1✔
1069
            OFFSET
1✔
1070
                $1
1✔
1071
            LIMIT 
1✔
1072
                $2;",
1✔
1073
                ),
1✔
1074
                &[&(offset as i64), &(limit as i64)],
1✔
1075
            )
1✔
1076
            .await?
2✔
1077
        };
1078

1079
        Ok(rows
1✔
1080
            .iter()
1✔
1081
            .map(|row| DataUsageSummary {
2✔
1082
                timestamp: row.get(0),
2✔
1083
                dataset: row.get(1),
2✔
1084
                count: row.get::<_, i64>(2) as u64,
2✔
1085
            })
2✔
1086
            .collect())
1✔
1087
    }
2✔
1088

1089
    async fn update_quota_available_by_user(
1090
        &self,
1091
        user: &UserId,
1092
        new_available_quota: i64,
1093
    ) -> Result<()> {
5✔
1094
        ensure!(self.session.is_admin(), error::PermissionDenied);
5✔
1095

1096
        let conn = self.conn_pool.get().await?;
5✔
1097
        let stmt = conn
5✔
1098
            .prepare(
5✔
1099
                "
5✔
1100
            UPDATE users SET 
5✔
1101
                quota_available = $1
5✔
1102
            WHERE id = $2;",
5✔
1103
            )
5✔
1104
            .await?;
5✔
1105

1106
        conn.execute(&stmt, &[&(new_available_quota), &user])
5✔
1107
            .await?;
5✔
1108

1109
        Ok(())
5✔
1110
    }
10✔
1111
}
1112

1113
#[async_trait]
1114
impl<Tls> RoleDb for ProPostgresDb<Tls>
1115
where
1116
    Tls: MakeTlsConnect<Socket> + Clone + Send + Sync + 'static + std::fmt::Debug,
1117
    <Tls as MakeTlsConnect<Socket>>::Stream: Send + Sync,
1118
    <Tls as MakeTlsConnect<Socket>>::TlsConnect: Send,
1119
    <<Tls as MakeTlsConnect<Socket>>::TlsConnect as TlsConnect<Socket>>::Future: Send,
1120
{
1121
    async fn add_role(&self, role_name: &str) -> Result<RoleId, RoleDbError> {
4✔
1122
        let mut conn = self.conn_pool.get().await.context(Bb8RoleDbError)?;
4✔
1123

1124
        let tx = conn
4✔
1125
            .build_transaction()
4✔
1126
            .start()
4✔
1127
            .await
2✔
1128
            .context(PostgresRoleDbError)?;
4✔
1129

1130
        self.ensure_admin_in_tx(&tx)
4✔
1131
            .await
×
1132
            .context(PermissionDbRoleDbError)?;
4✔
1133

1134
        let id = RoleId::new();
4✔
1135

1136
        let res = tx
4✔
1137
            .execute(
4✔
1138
                "INSERT INTO roles (id, name) VALUES ($1, $2);",
4✔
1139
                &[&id, &role_name],
4✔
1140
            )
4✔
1141
            .await;
8✔
1142

1143
        if let Err(err) = res {
4✔
1144
            if err.code() == Some(&tokio_postgres::error::SqlState::UNIQUE_VIOLATION) {
×
1145
                return Err(RoleDbError::RoleAlreadyExists {
×
1146
                    role_name: role_name.to_string(),
×
1147
                });
×
1148
            }
×
1149
        }
4✔
1150

1151
        tx.commit().await.context(PostgresRoleDbError)?;
4✔
1152

1153
        Ok(id)
4✔
1154
    }
8✔
1155

1156
    async fn load_role_by_name(&self, role_name: &str) -> Result<RoleId, RoleDbError> {
×
1157
        let conn = self.conn_pool.get().await.context(Bb8RoleDbError)?;
×
1158

1159
        let row = conn
×
1160
            .query_opt("SELECT id FROM roles WHERE name = $1;", &[&role_name])
×
1161
            .await
×
1162
            .context(PostgresRoleDbError)?
×
1163
            .ok_or(RoleDbError::RoleNameDoesNotExist {
×
1164
                role_name: role_name.to_string(),
×
1165
            })?;
×
1166

1167
        Ok(RoleId(row.get(0)))
×
1168
    }
×
1169

1170
    async fn remove_role(&self, role_id: &RoleId) -> Result<(), RoleDbError> {
2✔
1171
        let mut conn = self.conn_pool.get().await.context(Bb8RoleDbError)?;
2✔
1172

1173
        let tx = conn
2✔
1174
            .build_transaction()
2✔
1175
            .start()
2✔
1176
            .await
2✔
1177
            .context(PostgresRoleDbError)?;
2✔
1178

1179
        self.ensure_admin_in_tx(&tx)
2✔
1180
            .await
×
1181
            .context(PermissionDbRoleDbError)?;
2✔
1182

1183
        let deleted = tx
2✔
1184
            .execute("DELETE FROM roles WHERE id = $1;", &[&role_id])
2✔
1185
            .await
4✔
1186
            .context(PostgresRoleDbError)?;
2✔
1187

1188
        tx.commit().await.context(PostgresRoleDbError)?;
2✔
1189

1190
        ensure!(
2✔
1191
            deleted > 0,
2✔
1192
            RoleIdDoesNotExistRoleDbError { role_id: *role_id }
×
1193
        );
1194

1195
        Ok(())
2✔
1196
    }
4✔
1197

1198
    async fn assign_role(&self, role_id: &RoleId, user_id: &UserId) -> Result<(), RoleDbError> {
4✔
1199
        let mut conn = self.conn_pool.get().await.context(Bb8RoleDbError)?;
4✔
1200

1201
        let tx = conn
4✔
1202
            .build_transaction()
4✔
1203
            .start()
4✔
1204
            .await
3✔
1205
            .context(PostgresRoleDbError)?;
4✔
1206

1207
        self.ensure_admin_in_tx(&tx)
4✔
1208
            .await
×
1209
            .context(PermissionDbRoleDbError)?;
4✔
1210

1211
        tx.execute(
4✔
1212
            "INSERT INTO user_roles (user_id, role_id) VALUES ($1, $2) ON CONFLICT DO NOTHING;",
4✔
1213
            &[&user_id, &role_id],
4✔
1214
        )
4✔
1215
        .await
6✔
1216
        .context(PostgresRoleDbError)?;
4✔
1217

1218
        tx.commit().await.context(PostgresRoleDbError)?;
4✔
1219

1220
        Ok(())
4✔
1221
    }
8✔
1222

1223
    async fn revoke_role(&self, role_id: &RoleId, user_id: &UserId) -> Result<(), RoleDbError> {
2✔
1224
        let mut conn = self.conn_pool.get().await.context(Bb8RoleDbError)?;
2✔
1225

1226
        let tx = conn
2✔
1227
            .build_transaction()
2✔
1228
            .start()
2✔
1229
            .await
2✔
1230
            .context(PostgresRoleDbError)?;
2✔
1231

1232
        self.ensure_admin_in_tx(&tx)
2✔
1233
            .await
×
1234
            .context(PermissionDbRoleDbError)?;
2✔
1235

1236
        let deleted = tx
2✔
1237
            .execute(
2✔
1238
                "DELETE FROM user_roles WHERE user_id= $1 AND role_id = $2;",
2✔
1239
                &[&user_id, &role_id],
2✔
1240
            )
2✔
1241
            .await
4✔
1242
            .context(PostgresRoleDbError)?;
2✔
1243

1244
        tx.commit().await.context(PostgresRoleDbError)?;
2✔
1245

1246
        ensure!(
2✔
1247
            deleted > 0,
2✔
1248
            CannotRevokeRoleThatIsNotAssignedRoleDbError { role_id: *role_id }
×
1249
        );
1250

1251
        Ok(())
2✔
1252
    }
4✔
1253

1254
    async fn get_role_descriptions(
1255
        &self,
1256
        user_id: &UserId,
1257
    ) -> Result<Vec<RoleDescription>, RoleDbError> {
5✔
1258
        let conn = self.conn_pool.get().await.context(Bb8RoleDbError)?;
5✔
1259

1260
        let stmt = conn
5✔
1261
            .prepare(
5✔
1262
                "SELECT roles.id, roles.name \
5✔
1263
                FROM roles JOIN user_roles ON (roles.id=user_roles.role_id) \
5✔
1264
                WHERE user_roles.user_id=$1 \
5✔
1265
                ORDER BY roles.name;",
5✔
1266
            )
5✔
1267
            .await
5✔
1268
            .context(PostgresRoleDbError)?;
5✔
1269

1270
        let results = conn
5✔
1271
            .query(&stmt, &[&user_id])
5✔
1272
            .await
5✔
1273
            .context(PostgresRoleDbError)?;
5✔
1274

1275
        let mut result_vec = Vec::new();
5✔
1276

1277
        for result in results {
17✔
1278
            let id = result.get(0);
12✔
1279
            let name = result.get(1);
12✔
1280
            let individual = UserId(id) == *user_id;
12✔
1281
            result_vec.push(RoleDescription {
12✔
1282
                role: Role {
12✔
1283
                    id: RoleId(id),
12✔
1284
                    name,
12✔
1285
                },
12✔
1286
                individual,
12✔
1287
            });
12✔
1288
        }
12✔
1289

1290
        Ok(result_vec)
5✔
1291
    }
10✔
1292
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc