• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

geo-engine / geoengine / 12912023152

22 Jan 2025 03:54PM UTC coverage: 90.098% (+0.04%) from 90.061%
12912023152

Pull #1009

github

web-flow
Merge b697ee8a8 into df8c694c8
Pull Request #1009: merge data providers and migrations

1340 of 1514 new or added lines in 22 files covered. (88.51%)

74 existing lines in 5 files now uncovered.

125729 of 139547 relevant lines covered (90.1%)

57683.37 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

94.62
/services/src/users/postgres_userdb.rs
1
use crate::api::handlers::users::UsageSummaryGranularity;
2
use crate::contexts::PostgresDb;
3
use crate::contexts::{ApplicationContext, SessionId};
4
use crate::error::{Error, Result};
5
use crate::permissions::TxPermissionDb;
6
use crate::permissions::{Role, RoleDescription, RoleId};
7
use crate::projects::{ProjectId, STRectangle};
8
use crate::quota::{ComputationQuota, DataUsage, DataUsageSummary, OperatorQuota};
9
use crate::users::oidc::{FlatMaybeEncryptedOidcTokens, OidcTokens, UserClaims};
10
use crate::users::userdb::{
11
    CannotRevokeRoleThatIsNotAssignedRoleDbError, RoleIdDoesNotExistRoleDbError,
12
};
13
use crate::users::{
14
    SessionTokenStore, StoredOidcTokens, User, UserCredentials, UserDb, UserId, UserInfo,
15
    UserRegistration, UserSession,
16
};
17
use crate::util::postgres::PostgresErrorExt;
18
use crate::util::Identifier;
19
use crate::{contexts::PostgresContext, error};
20
use async_trait::async_trait;
21
use geoengine_operators::meta::quota::ComputationUnit;
22

23
use crate::util::encryption::MaybeEncryptedBytes;
24
use bb8_postgres::{
25
    tokio_postgres::tls::MakeTlsConnect, tokio_postgres::tls::TlsConnect, tokio_postgres::Socket,
26
};
27
use oauth2::AccessToken;
28
use pwhash::bcrypt;
29
use snafu::{ensure, ResultExt};
30
use tokio_postgres::Transaction;
31
use uuid::Uuid;
32

33
use super::userdb::{
34
    Bb8RoleDbError, PermissionDbRoleDbError, PostgresRoleDbError, RoleDb, RoleDbError, UserAuth,
35
};
36

37
#[async_trait]
38
impl<Tls> UserAuth for PostgresContext<Tls>
39
where
40
    Tls: MakeTlsConnect<Socket> + Clone + Send + Sync + 'static + std::fmt::Debug,
41
    <Tls as MakeTlsConnect<Socket>>::Stream: Send + Sync,
42
    <Tls as MakeTlsConnect<Socket>>::TlsConnect: Send,
43
    <<Tls as MakeTlsConnect<Socket>>::TlsConnect as TlsConnect<Socket>>::Future: Send,
44
{
45
    // TODO: clean up expired sessions?
46

47
    async fn register_user(&self, user: UserRegistration) -> Result<UserId> {
45✔
48
        let mut conn = self.pool.get().await?;
45✔
49

50
        let tx = conn.build_transaction().start().await?;
45✔
51

52
        let user = User::from(user);
45✔
53

54
        let stmt = tx
45✔
55
            .prepare("INSERT INTO roles (id, name) VALUES ($1, $2);")
45✔
56
            .await?;
45✔
57
        tx.execute(&stmt, &[&user.id, &user.email])
45✔
58
            .await
45✔
59
            .map_unique_violation("roles", "name", || error::Error::Duplicate {
45✔
60
                reason: "E-mail already exists".to_string(),
15✔
61
            })?;
45✔
62

63
        let stmt = tx
30✔
64
            .prepare(
30✔
65
                "INSERT INTO users (id, email, password_hash, real_name, quota_available, active) VALUES ($1, $2, $3, $4, $5, $6);",
30✔
66
            )
30✔
67
            .await?;
30✔
68

69
        let quota_available =
30✔
70
            crate::config::get_config_element::<crate::config::Quota>()?.initial_credits;
30✔
71

72
        tx.execute(
30✔
73
            &stmt,
30✔
74
            &[
30✔
75
                &user.id,
30✔
76
                &user.email,
30✔
77
                &user.password_hash,
30✔
78
                &user.real_name,
30✔
79
                &quota_available,
30✔
80
                &user.active,
30✔
81
            ],
30✔
82
        )
30✔
83
        .await?;
30✔
84

85
        let stmt = tx
30✔
86
            .prepare("INSERT INTO user_roles (user_id, role_id) VALUES ($1, $2);")
30✔
87
            .await?;
30✔
88
        tx.execute(&stmt, &[&user.id, &user.id]).await?;
30✔
89

90
        let stmt = tx
30✔
91
            .prepare("INSERT INTO user_roles (user_id, role_id) VALUES ($1, $2);")
30✔
92
            .await?;
30✔
93
        tx.execute(&stmt, &[&user.id, &Role::registered_user_role_id()])
30✔
94
            .await?;
30✔
95

96
        tx.commit().await?;
30✔
97

98
        Ok(user.id)
30✔
99
    }
90✔
100

101
    async fn create_anonymous_session(&self) -> Result<UserSession> {
268✔
102
        let mut conn = self.pool.get().await?;
268✔
103

104
        let tx = conn.build_transaction().start().await?;
268✔
105

106
        let user_id = UserId::new();
268✔
107

108
        let stmt = tx
268✔
109
            .prepare("INSERT INTO roles (id, name) VALUES ($1, $2);")
268✔
110
            .await?;
268✔
111
        tx.execute(&stmt, &[&user_id, &format!("anonymous_user_{user_id}")])
268✔
112
            .await?;
268✔
113

114
        let quota_available =
268✔
115
            crate::config::get_config_element::<crate::config::Quota>()?.initial_credits;
268✔
116

117
        let stmt = tx
268✔
118
            .prepare("INSERT INTO users (id, quota_available, active) VALUES ($1, $2, TRUE);")
268✔
119
            .await?;
268✔
120

121
        tx.execute(&stmt, &[&user_id, &quota_available]).await?;
268✔
122

123
        let stmt = tx
268✔
124
            .prepare("INSERT INTO user_roles (user_id, role_id) VALUES ($1, $2);")
268✔
125
            .await?;
268✔
126
        tx.execute(&stmt, &[&user_id, &user_id]).await?;
268✔
127

128
        let stmt = tx
268✔
129
            .prepare("INSERT INTO user_roles (user_id, role_id) VALUES ($1, $2);")
268✔
130
            .await?;
268✔
131
        tx.execute(&stmt, &[&user_id, &Role::anonymous_role_id()])
268✔
132
            .await?;
268✔
133

134
        let session_id = SessionId::new();
268✔
135
        let stmt = tx
268✔
136
            .prepare(
268✔
137
                "
268✔
138
                INSERT INTO sessions (id)
268✔
139
                VALUES ($1);",
268✔
140
            )
268✔
141
            .await?;
268✔
142

143
        tx.execute(&stmt, &[&session_id]).await?;
268✔
144

145
        let stmt = tx
268✔
146
            .prepare(
268✔
147
                "
268✔
148
            INSERT INTO 
268✔
149
                user_sessions (user_id, session_id, created, valid_until) 
268✔
150
            VALUES 
268✔
151
                ($1, $2, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP + make_interval(secs:=$3))
268✔
152
            RETURNING 
268✔
153
                created, valid_until;",
268✔
154
            )
268✔
155
            .await?;
268✔
156

157
        // TODO: load from config
158
        let session_duration = chrono::Duration::days(30);
268✔
159
        let row = tx
268✔
160
            .query_one(
268✔
161
                &stmt,
268✔
162
                &[
268✔
163
                    &user_id,
268✔
164
                    &session_id,
268✔
165
                    &(session_duration.num_seconds() as f64),
268✔
166
                ],
268✔
167
            )
268✔
168
            .await?;
268✔
169

170
        tx.commit().await?;
268✔
171

172
        Ok(UserSession {
268✔
173
            id: session_id,
268✔
174
            user: UserInfo {
268✔
175
                id: user_id,
268✔
176
                email: None,
268✔
177
                real_name: None,
268✔
178
            },
268✔
179
            created: row.get(0),
268✔
180
            valid_until: row.get(1),
268✔
181
            project: None,
268✔
182
            view: None,
268✔
183
            roles: vec![user_id.into(), Role::anonymous_role_id()],
268✔
184
        })
268✔
185
    }
536✔
186

187
    async fn login(&self, user_credentials: UserCredentials) -> Result<UserSession> {
221✔
188
        let mut conn = self.pool.get().await?;
221✔
189

190
        let tx = conn.build_transaction().start().await?;
221✔
191

192
        let stmt = tx
221✔
193
            .prepare("SELECT id, password_hash, email, real_name FROM users WHERE email = $1;")
221✔
194
            .await?;
221✔
195

196
        let row = tx
221✔
197
            .query_one(&stmt, &[&user_credentials.email])
221✔
198
            .await
221✔
199
            .map_err(|_error| error::Error::LoginFailed)?;
221✔
200

201
        let user_id = UserId(row.get(0));
221✔
202
        let password_hash = row.get(1);
221✔
203
        let email = row.get(2);
221✔
204
        let real_name = row.get(3);
221✔
205

221✔
206
        if bcrypt::verify(user_credentials.password, password_hash) {
221✔
207
            let session_id = SessionId::new();
220✔
208
            let stmt = tx
220✔
209
                .prepare(
220✔
210
                    "
220✔
211
                INSERT INTO sessions (id)
220✔
212
                VALUES ($1);",
220✔
213
                )
220✔
214
                .await?;
220✔
215

216
            tx.execute(&stmt, &[&session_id]).await?;
220✔
217

218
            // TODO: load from config
219
            let session_duration = chrono::Duration::days(30);
220✔
220
            let stmt = tx
220✔
221
                .prepare(
220✔
222
                    "
220✔
223
                INSERT INTO 
220✔
224
                    user_sessions (user_id, session_id, created, valid_until) 
220✔
225
                VALUES 
220✔
226
                    ($1, $2, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP + make_interval(secs:=$3))
220✔
227
                RETURNING 
220✔
228
                    created, valid_until;",
220✔
229
                )
220✔
230
                .await?;
220✔
231
            let row = tx
220✔
232
                .query_one(
220✔
233
                    &stmt,
220✔
234
                    &[
220✔
235
                        &user_id,
220✔
236
                        &session_id,
220✔
237
                        &(session_duration.num_seconds() as f64),
220✔
238
                    ],
220✔
239
                )
220✔
240
                .await?;
220✔
241

242
            let stmt = tx
220✔
243
                .prepare("SELECT role_id FROM user_roles WHERE user_id = $1;")
220✔
244
                .await?;
220✔
245

246
            let rows = tx
220✔
247
                .query(&stmt, &[&user_id])
220✔
248
                .await
220✔
249
                .map_err(|_error| error::Error::LoginFailed)?;
220✔
250

251
            tx.commit().await?;
220✔
252

253
            let roles = rows.into_iter().map(|row| row.get(0)).collect();
254✔
254

220✔
255
            Ok(UserSession {
220✔
256
                id: session_id,
220✔
257
                user: UserInfo {
220✔
258
                    id: user_id,
220✔
259
                    email,
220✔
260
                    real_name,
220✔
261
                },
220✔
262
                created: row.get(0),
220✔
263
                valid_until: row.get(1),
220✔
264
                project: None,
220✔
265
                view: None,
220✔
266
                roles,
220✔
267
            })
220✔
268
        } else {
269
            Err(error::Error::LoginFailed)
1✔
270
        }
271
    }
442✔
272

273
    #[allow(clippy::too_many_lines)]
274
    async fn login_external(
275
        &self,
276
        user: UserClaims,
277
        oidc_tokens: OidcTokens,
278
    ) -> Result<UserSession> {
8✔
279
        let mut conn = self.pool.get().await?;
8✔
280
        let tx = conn.build_transaction().start().await?;
8✔
281

282
        let stmt = tx
8✔
283
            .prepare("SELECT id, external_id, email, real_name FROM external_users WHERE external_id = $1;")
8✔
284
            .await?;
8✔
285

286
        let row = tx
8✔
287
            .query_opt(&stmt, &[&user.external_id.to_string()])
8✔
288
            .await
8✔
289
            .map_err(|_error| error::Error::LoginFailed)?;
8✔
290

291
        let user_id = match row {
8✔
292
            Some(row) => UserId(row.get(0)),
2✔
293
            None => {
294
                let user_id = UserId::new();
6✔
295

296
                let stmt = tx
6✔
297
                    .prepare("INSERT INTO roles (id, name) VALUES ($1, $2);")
6✔
298
                    .await?;
6✔
299
                tx.execute(&stmt, &[&user_id, &user.email]).await?;
6✔
300

301
                let quota_available =
6✔
302
                    crate::config::get_config_element::<crate::config::Quota>()?.initial_credits;
6✔
303

304
                //TODO: A user might be able to login without external login using this (internal) id. Would be a problem with anonymous users as well.
305
                let stmt = tx
6✔
306
                    .prepare(
6✔
307
                        "INSERT INTO users (id, quota_available, active) VALUES ($1, $2, TRUE);",
6✔
308
                    )
6✔
309
                    .await?;
6✔
310
                tx.execute(&stmt, &[&user_id, &quota_available]).await?;
6✔
311

312
                let stmt = tx
6✔
313
                    .prepare(
6✔
314
                        "INSERT INTO external_users (id, external_id, email, real_name, active) VALUES ($1, $2, $3, $4, $5);",
6✔
315
                    )
6✔
316
                    .await?;
6✔
317

318
                tx.execute(
6✔
319
                    &stmt,
6✔
320
                    &[
6✔
321
                        &user_id,
6✔
322
                        &user.external_id.to_string(),
6✔
323
                        &user.email,
6✔
324
                        &user.real_name,
6✔
325
                        &true,
6✔
326
                    ],
6✔
327
                )
6✔
328
                .await?;
6✔
329

330
                let stmt = tx
6✔
331
                    .prepare("INSERT INTO user_roles (user_id, role_id) VALUES ($1, $2);")
6✔
332
                    .await?;
6✔
333
                tx.execute(&stmt, &[&user_id, &user_id]).await?;
6✔
334

335
                let stmt = tx
6✔
336
                    .prepare("INSERT INTO user_roles (user_id, role_id) VALUES ($1, $2);")
6✔
337
                    .await?;
6✔
338
                tx.execute(&stmt, &[&user_id, &Role::registered_user_role_id()])
6✔
339
                    .await?;
6✔
340

341
                user_id
6✔
342
            }
343
        };
344

345
        let session_id = SessionId::new();
8✔
346
        let stmt = tx
8✔
347
            .prepare(
8✔
348
                "
8✔
349
            INSERT INTO sessions (id)
8✔
350
            VALUES ($1);",
8✔
351
            )
8✔
352
            .await?;
8✔
353

354
        tx.execute(&stmt, &[&session_id]).await?;
8✔
355

356
        let stmt = tx
8✔
357
            .prepare(
8✔
358
                "
8✔
359
            INSERT INTO 
8✔
360
                user_sessions (user_id, session_id, created, valid_until) 
8✔
361
            VALUES 
8✔
362
                ($1, $2, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP + make_interval(secs:=$3))
8✔
363
            RETURNING 
8✔
364
                created, valid_until;",
8✔
365
            )
8✔
366
            .await?;
8✔
367
        let row = tx
8✔
368
            .query_one(
8✔
369
                &stmt,
8✔
370
                &[
8✔
371
                    &user_id,
8✔
372
                    &session_id,
8✔
373
                    &(oidc_tokens.expires_in.num_seconds() as f64),
8✔
374
                ],
8✔
375
            )
8✔
376
            .await?;
8✔
377

378
        self.store_oidc_session_tokens(session_id, oidc_tokens, &tx)
8✔
379
            .await?;
8✔
380

381
        let stmt = tx
8✔
382
            .prepare("SELECT role_id FROM user_roles WHERE user_id = $1;")
8✔
383
            .await?;
8✔
384

385
        let rows = tx
8✔
386
            .query(&stmt, &[&user_id])
8✔
387
            .await
8✔
388
            .map_err(|_error| error::Error::LoginFailed)?;
8✔
389

390
        let roles = rows.into_iter().map(|row| row.get(0)).collect();
16✔
391

8✔
392
        tx.commit().await?;
8✔
393

394
        Ok(UserSession {
8✔
395
            id: session_id,
8✔
396
            user: UserInfo {
8✔
397
                id: user_id,
8✔
398
                email: Some(user.email.clone()),
8✔
399
                real_name: Some(user.real_name.clone()),
8✔
400
            },
8✔
401
            created: row.get(0),
8✔
402
            valid_until: row.get(1),
8✔
403
            project: None,
8✔
404
            view: None,
8✔
405
            roles,
8✔
406
        })
8✔
407
    }
16✔
408

409
    async fn user_session_by_id(&self, session: SessionId) -> Result<UserSession> {
173✔
410
        let mut conn = self.pool.get().await?;
173✔
411

412
        let tx = conn.build_transaction().start().await?;
167✔
413

414
        let stmt = tx
167✔
415
            .prepare(
167✔
416
                "
167✔
417
            SELECT 
167✔
418
                u.id,   
167✔
419
                COALESCE(u.email, eu.email) AS email,
167✔
420
                COALESCE(u.real_name, eu.real_name) AS real_name,
167✔
421
                us.created, 
167✔
422
                us.valid_until, 
167✔
423
                s.project_id,
167✔
424
                s.view,
167✔
425
                CASE WHEN CURRENT_TIMESTAMP < us.valid_until THEN TRUE ELSE FALSE END AS valid_session
167✔
426
            FROM
167✔
427
                sessions s JOIN user_sessions us ON (s.id = us.session_id)
167✔
428
                    JOIN users u ON (us.user_id = u.id)
167✔
429
                    LEFT JOIN external_users eu ON (u.id = eu.id)
167✔
430
                    LEFT JOIN oidc_session_tokens t ON (s.id = t.session_id)
167✔
431
            WHERE s.id = $1 AND (CURRENT_TIMESTAMP < us.valid_until OR t.refresh_token IS NOT NULL);",
167✔
432
            )
167✔
433
            .await?;
167✔
434

435
        let row = tx
167✔
436
            .query_one(&stmt, &[&session])
167✔
437
            .await
167✔
438
            .map_err(|_error| error::Error::InvalidSession)?;
167✔
439

440
        let valid_session: bool = row.get(7);
160✔
441

442
        let valid_until = if valid_session {
160✔
443
            row.get(4)
159✔
444
        } else {
445
            log::debug!("Session expired, trying to extend");
1✔
446
            let refresh_result = self.refresh_oidc_session_tokens(session, &tx).await;
1✔
447

448
            if let Err(refresh_error) = refresh_result {
1✔
UNCOV
449
                log::debug!("Session extension failed {}", refresh_error);
×
UNCOV
450
                return Err(Error::InvalidSession);
×
451
            }
1✔
452
            log::debug!("Session extended");
1✔
453
            refresh_result
1✔
454
                .expect("Refresh result should exist")
1✔
455
                .db_valid_until
1✔
456
        };
457

458
        let mut session = UserSession {
160✔
459
            id: session,
160✔
460
            user: UserInfo {
160✔
461
                id: row.get(0),
160✔
462
                email: row.get(1),
160✔
463
                real_name: row.get(2),
160✔
464
            },
160✔
465
            created: row.get(3),
160✔
466
            valid_until,
160✔
467
            project: row.get::<usize, Option<Uuid>>(5).map(ProjectId),
160✔
468
            view: row.get(6),
160✔
469
            roles: vec![],
160✔
470
        };
160✔
471

472
        let stmt = tx
160✔
473
            .prepare(
160✔
474
                "
160✔
475
            SELECT role_id FROM user_roles WHERE user_id = $1;
160✔
476
            ",
160✔
477
            )
160✔
478
            .await?;
160✔
479

480
        let rows = tx.query(&stmt, &[&session.user.id]).await?;
160✔
481

482
        tx.commit().await?;
160✔
483

484
        session.roles = rows.into_iter().map(|row| row.get(0)).collect();
272✔
485

160✔
486
        Ok(session)
160✔
487
    }
340✔
488
}
489

490
#[async_trait]
491
impl<Tls> SessionTokenStore for PostgresContext<Tls>
492
where
493
    Tls: MakeTlsConnect<Socket> + Clone + Send + Sync + 'static + std::fmt::Debug,
494
    <Tls as MakeTlsConnect<Socket>>::Stream: Send + Sync,
495
    <Tls as MakeTlsConnect<Socket>>::TlsConnect: Send,
496
    <<Tls as MakeTlsConnect<Socket>>::TlsConnect as TlsConnect<Socket>>::Future: Send,
497
{
498
    async fn store_oidc_session_tokens(
499
        &self,
500
        session: SessionId,
501
        oidc_tokens: OidcTokens,
502
        tx: &Transaction<'_>,
503
    ) -> Result<StoredOidcTokens> {
8✔
504
        let flat_tokens: FlatMaybeEncryptedOidcTokens = self
8✔
505
            .oidc_manager()
8✔
506
            .maybe_encrypt_tokens(&oidc_tokens)?
8✔
507
            .into();
8✔
508

509
        let stmt = tx
8✔
510
            .prepare(
8✔
511
                "
8✔
512
                INSERT INTO oidc_session_tokens
8✔
513
                    (session_id, access_token, access_token_encryption_nonce, access_token_valid_until, refresh_token, refresh_token_encryption_nonce)
8✔
514
                VALUES
8✔
515
                    ($1, $2, $3, CURRENT_TIMESTAMP + make_interval(secs:=$4), $5, $6)
8✔
516
                RETURNING
8✔
517
                    access_token_valid_until;
8✔
518
                ;"
8✔
519
            )
8✔
520
            .await?;
8✔
521

522
        let db_valid_until = tx
8✔
523
            .query_one(
8✔
524
                &stmt,
8✔
525
                &[
8✔
526
                    &session,
8✔
527
                    &flat_tokens.access_token_value,
8✔
528
                    &flat_tokens.access_token_nonce,
8✔
529
                    &(oidc_tokens.expires_in.num_seconds() as f64),
8✔
530
                    &flat_tokens.refresh_token_value,
8✔
531
                    &flat_tokens.refresh_token_nonce,
8✔
532
                ],
8✔
533
            )
8✔
534
            .await?
8✔
535
            .get(0);
8✔
536

8✔
537
        Ok(StoredOidcTokens {
8✔
538
            oidc_tokens,
8✔
539
            db_valid_until,
8✔
540
        })
8✔
541
    }
16✔
542

543
    async fn refresh_oidc_session_tokens(
544
        &self,
545
        session: SessionId,
546
        tx: &Transaction<'_>,
547
    ) -> Result<StoredOidcTokens> {
3✔
548
        let stmt = tx
3✔
549
            .prepare(
3✔
550
                "
3✔
551
            SELECT
3✔
552
                refresh_token,
3✔
553
                refresh_token_encryption_nonce
3✔
554
            FROM
3✔
555
                oidc_session_tokens
3✔
556
            WHERE session_id = $1 AND refresh_token IS NOT NULL;",
3✔
557
            )
3✔
558
            .await?;
3✔
559

560
        let rows = tx.query_opt(&stmt, &[&session]).await?;
3✔
561

562
        if let Some(refresh_string) = rows {
3✔
563
            let string_field_and_nonce = MaybeEncryptedBytes {
3✔
564
                value: refresh_string.get(0),
3✔
565
                nonce: refresh_string.get(1),
3✔
566
            };
3✔
567

568
            let refresh_token = self
3✔
569
                .oidc_manager()
3✔
570
                .maybe_decrypt_refresh_token(string_field_and_nonce)?;
3✔
571

572
            let oidc_manager = self.oidc_manager();
3✔
573

574
            let oidc_tokens = oidc_manager
3✔
575
                .get_client()
3✔
576
                .await?
3✔
577
                .refresh_access_token(refresh_token)
3✔
578
                .await?;
3✔
579

580
            let flat_tokens: FlatMaybeEncryptedOidcTokens = self
3✔
581
                .oidc_manager()
3✔
582
                .maybe_encrypt_tokens(&oidc_tokens)?
3✔
583
                .into();
3✔
584

585
            let update_session_tokens = tx.prepare("
3✔
586
                UPDATE
3✔
587
                    oidc_session_tokens
3✔
588
                SET
3✔
589
                    access_token = $2, access_token_encryption_nonce = $3, access_token_valid_until = CURRENT_TIMESTAMP + make_interval(secs:=$4), refresh_token = $5, refresh_token_encryption_nonce = $6
3✔
590
                WHERE
3✔
591
                    session_id = $1;",
3✔
592

3✔
593
            ).await?;
3✔
594

595
            tx.execute(
3✔
596
                &update_session_tokens,
3✔
597
                &[
3✔
598
                    &session,
3✔
599
                    &flat_tokens.access_token_value,
3✔
600
                    &flat_tokens.access_token_nonce,
3✔
601
                    &(oidc_tokens.expires_in.num_seconds() as f64),
3✔
602
                    &flat_tokens.refresh_token_value,
3✔
603
                    &flat_tokens.refresh_token_nonce,
3✔
604
                ],
3✔
605
            )
3✔
606
            .await?;
3✔
607

608
            let stmt = tx
3✔
609
                .prepare(
3✔
610
                    "
3✔
611
                UPDATE
3✔
612
                    user_sessions
3✔
613
                SET
3✔
614
                    valid_until = CURRENT_TIMESTAMP + make_interval(secs:=$2)
3✔
615
                WHERE
3✔
616
                    session_id = $1
3✔
617
                RETURNING
3✔
618
                    valid_until;",
3✔
619
                )
3✔
620
                .await?;
3✔
621

622
            let expiration = tx
3✔
623
                .query_one(
3✔
624
                    &stmt,
3✔
625
                    &[&session, &(oidc_tokens.expires_in.num_seconds() as f64)],
3✔
626
                )
3✔
627
                .await?
3✔
628
                .get(0);
3✔
629

3✔
630
            return Ok(StoredOidcTokens {
3✔
631
                oidc_tokens,
3✔
632
                db_valid_until: expiration,
3✔
633
            });
3✔
UNCOV
634
        };
×
UNCOV
635

×
UNCOV
636
        Err(Error::InvalidSession)
×
637
    }
6✔
638

639
    async fn get_access_token(&self, session: SessionId) -> Result<AccessToken> {
4✔
640
        let mut conn = self.pool.get().await?;
4✔
641

642
        let tx = conn.build_transaction().start().await?;
4✔
643

644
        let stmt = tx
4✔
645
            .prepare(
4✔
646
                "
4✔
647
            SELECT
4✔
648
                access_token,
4✔
649
                access_token_encryption_nonce
4✔
650
            FROM
4✔
651
                oidc_session_tokens
4✔
652
            WHERE session_id = $1 AND (CURRENT_TIMESTAMP < access_token_valid_until);",
4✔
653
            )
4✔
654
            .await?;
4✔
655

656
        let rows = tx.query_opt(&stmt, &[&session]).await?;
4✔
657

658
        let access_token = if let Some(token_row) = rows {
4✔
659
            let string_field_and_nonce = MaybeEncryptedBytes {
2✔
660
                value: token_row.get(0),
2✔
661
                nonce: token_row.get(1),
2✔
662
            };
2✔
663
            self.oidc_manager()
2✔
664
                .maybe_decrypt_access_token(string_field_and_nonce)?
2✔
665
        } else {
666
            self.refresh_oidc_session_tokens(session, &tx)
2✔
667
                .await?
2✔
668
                .oidc_tokens
669
                .access
670
        };
671

672
        tx.commit().await?;
4✔
673

674
        Ok(access_token)
4✔
675
    }
8✔
676
}
677

678
#[async_trait]
679
impl<Tls> UserDb for PostgresDb<Tls>
680
where
681
    Tls: MakeTlsConnect<Socket> + Clone + Send + Sync + 'static + std::fmt::Debug,
682
    <Tls as MakeTlsConnect<Socket>>::Stream: Send + Sync,
683
    <Tls as MakeTlsConnect<Socket>>::TlsConnect: Send,
684
    <<Tls as MakeTlsConnect<Socket>>::TlsConnect as TlsConnect<Socket>>::Future: Send,
685
{
686
    // TODO: clean up expired sessions?
687

688
    async fn logout(&self) -> Result<()> {
6✔
689
        let conn = self.conn_pool.get().await?;
6✔
690
        let stmt = conn
6✔
691
            .prepare("DELETE FROM sessions WHERE id = $1;") // TODO: only invalidate session?
6✔
692
            .await?;
6✔
693

694
        conn.execute(&stmt, &[&self.session.id])
6✔
695
            .await
6✔
696
            .map_err(|_error| error::Error::LogoutFailed)?;
6✔
697
        Ok(())
6✔
698
    }
12✔
699

700
    async fn set_session_project(&self, project: ProjectId) -> Result<()> {
3✔
701
        // TODO: check permission
702

703
        let conn = self.conn_pool.get().await?;
3✔
704
        let stmt = conn
3✔
705
            .prepare("UPDATE sessions SET project_id = $1 WHERE id = $2;")
3✔
706
            .await?;
3✔
707

708
        conn.execute(&stmt, &[&project, &self.session.id]).await?;
3✔
709

710
        Ok(())
3✔
711
    }
6✔
712

713
    async fn set_session_view(&self, view: STRectangle) -> Result<()> {
3✔
714
        let conn = self.conn_pool.get().await?;
3✔
715
        let stmt = conn
3✔
716
            .prepare("UPDATE sessions SET view = $1 WHERE id = $2;")
3✔
717
            .await?;
3✔
718

719
        conn.execute(&stmt, &[&view, &self.session.id]).await?;
3✔
720

721
        Ok(())
3✔
722
    }
6✔
723

724
    async fn increment_quota_used(&self, user: &UserId, quota_used: u64) -> Result<()> {
1✔
725
        ensure!(self.session.is_admin(), error::PermissionDenied);
1✔
726

727
        let conn = self.conn_pool.get().await?;
1✔
728
        let stmt = conn
1✔
729
            .prepare(
1✔
730
                "
1✔
731
            UPDATE users SET 
1✔
732
                quota_available = quota_available - $1, 
1✔
733
                quota_used = quota_used + $1
1✔
734
            WHERE id = $2;",
1✔
735
            )
1✔
736
            .await?;
1✔
737

738
        conn.execute(&stmt, &[&(quota_used as i64), &user]).await?;
1✔
739

740
        Ok(())
1✔
741
    }
2✔
742

743
    async fn bulk_increment_quota_used<I: IntoIterator<Item = (UserId, u64)> + Send>(
744
        &self,
745
        quota_used_updates: I,
746
    ) -> Result<()> {
279✔
747
        ensure!(self.session.is_admin(), error::PermissionDenied);
279✔
748

749
        let conn = self.conn_pool.get().await?;
279✔
750

751
        // collect the user ids and quotas into separate vectors to pass them as parameters to the query
752
        let (users, quotas): (Vec<UserId>, Vec<i64>) = quota_used_updates
279✔
753
            .into_iter()
279✔
754
            .map(|(user, quota)| (user, quota as i64))
280✔
755
            .unzip();
279✔
756

279✔
757
        let query = "
279✔
758
            UPDATE users
279✔
759
            SET quota_available = quota_available - quota_changes.quota, 
279✔
760
                quota_used = quota_used + quota_changes.quota
279✔
761
            FROM 
279✔
762
                (SELECT * FROM UNNEST($1::uuid[], $2::bigint[]) AS t(id, quota)) AS quota_changes
279✔
763
            WHERE users.id = quota_changes.id;
279✔
764
        ";
279✔
765

279✔
766
        conn.execute(query, &[&users, &quotas]).await?;
279✔
767

768
        Ok(())
279✔
769
    }
558✔
770

771
    async fn quota_used(&self) -> Result<u64> {
7✔
772
        let conn = self.conn_pool.get().await?;
7✔
773
        let stmt = conn
7✔
774
            .prepare("SELECT quota_used FROM users WHERE id = $1;")
7✔
775
            .await?;
7✔
776

777
        let row = conn
7✔
778
            .query_one(&stmt, &[&self.session.user.id])
7✔
779
            .await
7✔
780
            .map_err(|_error| error::Error::InvalidSession)?;
7✔
781

782
        Ok(row.get::<usize, i64>(0) as u64)
7✔
783
    }
14✔
784

785
    async fn quota_used_by_user(&self, user: &UserId) -> Result<u64> {
7✔
786
        ensure!(
7✔
787
            self.session.user.id == *user || self.session.is_admin(),
7✔
UNCOV
788
            error::PermissionDenied
×
789
        );
790

791
        let conn = self.conn_pool.get().await?;
7✔
792
        let stmt = conn
7✔
793
            .prepare("SELECT quota_used FROM users WHERE id = $1;")
7✔
794
            .await?;
7✔
795

796
        let row = conn
7✔
797
            .query_one(&stmt, &[&user])
7✔
798
            .await
7✔
799
            .map_err(|_error| error::Error::InvalidSession)?;
7✔
800

801
        Ok(row.get::<usize, i64>(0) as u64)
7✔
802
    }
14✔
803

804
    async fn quota_available(&self) -> Result<i64> {
63✔
805
        let conn = self.conn_pool.get().await?;
63✔
806
        let stmt = conn
63✔
807
            .prepare("SELECT quota_available FROM users WHERE id = $1;")
63✔
808
            .await?;
63✔
809

810
        let row = conn
63✔
811
            .query_one(&stmt, &[&self.session.user.id])
63✔
812
            .await
63✔
813
            .map_err(|_error| error::Error::InvalidSession)?;
63✔
814

815
        Ok(row.get::<usize, i64>(0))
63✔
816
    }
126✔
817

818
    async fn quota_available_by_user(&self, user: &UserId) -> Result<i64> {
6✔
819
        ensure!(
6✔
820
            self.session.user.id == *user || self.session.is_admin(),
6✔
UNCOV
821
            error::PermissionDenied
×
822
        );
823

824
        let conn = self.conn_pool.get().await?;
6✔
825
        let stmt = conn
6✔
826
            .prepare("SELECT quota_available FROM users WHERE id = $1;")
6✔
827
            .await?;
6✔
828

829
        let row = conn
6✔
830
            .query_one(&stmt, &[&user])
6✔
831
            .await
6✔
832
            .map_err(|_error| error::Error::InvalidSession)?;
6✔
833

834
        Ok(row.get::<usize, i64>(0))
6✔
835
    }
12✔
836

837
    async fn log_quota_used<I: IntoIterator<Item = ComputationUnit> + Send>(
838
        &self,
839
        log: I,
840
    ) -> Result<()> {
282✔
841
        ensure!(self.session.is_admin(), error::PermissionDenied);
282✔
842

843
        let conn = self.conn_pool.get().await?;
282✔
844

845
        // collect the log into separate vectors to pass them as parameters to the query
846
        let mut users = Vec::new();
282✔
847
        let mut workflows = Vec::new();
282✔
848
        let mut computations = Vec::new();
282✔
849
        let mut operators_names = Vec::new();
282✔
850
        let mut operator_paths = Vec::new();
282✔
851
        let mut datas = Vec::new();
282✔
852

853
        for unit in log {
569✔
854
            users.push(unit.user);
287✔
855
            workflows.push(unit.workflow);
287✔
856
            computations.push(unit.computation);
287✔
857
            operators_names.push(unit.operator_name);
287✔
858
            operator_paths.push(unit.operator_path.to_string());
287✔
859
            datas.push(unit.data);
287✔
860
        }
287✔
861

862
        let query = "
282✔
863
            INSERT INTO quota_log (user_id, workflow_id, computation_id, operator_name, operator_path, data)
282✔
864
                (SELECT * FROM UNNEST($1::uuid[], $2::uuid[], $3::uuid[], $4::text[], $5::text[], $6::text[]))
282✔
865
        ";
282✔
866

282✔
867
        conn.execute(
282✔
868
            query,
282✔
869
            &[
282✔
870
                &users,
282✔
871
                &workflows,
282✔
872
                &computations,
282✔
873
                &operators_names,
282✔
874
                &operator_paths,
282✔
875
                &datas,
282✔
876
            ],
282✔
877
        )
282✔
878
        .await?;
282✔
879

880
        Ok(())
282✔
881
    }
564✔
882

883
    #[allow(clippy::too_many_lines)]
884
    async fn quota_used_by_computations(
885
        &self,
886
        offset: usize,
887
        limit: usize,
888
    ) -> Result<Vec<ComputationQuota>> {
1✔
889
        let limit = limit.min(10); // TODO: use list limit from config
1✔
890

891
        let conn = self.conn_pool.get().await?;
1✔
892

893
        let rows = conn
1✔
894
            .query(
1✔
895
                "
1✔
896
            SELECT
1✔
897
                computation_id,
1✔
898
                workflow_id,
1✔
899
                MIN(timestamp) AS timestamp,
1✔
900
                COUNT(*) AS count
1✔
901
            FROM
1✔
902
                quota_log
1✔
903
            WHERE
1✔
904
                user_id = $1
1✔
905
            GROUP BY
1✔
906
                computation_id,
1✔
907
                workflow_id
1✔
908
            ORDER BY 
1✔
909
                MIN(TIMESTAMP) DESC
1✔
910
            OFFSET $2
1✔
911
            LIMIT $3;",
1✔
912
                &[&self.session.user.id, &(offset as i64), &(limit as i64)],
1✔
913
            )
1✔
914
            .await?;
1✔
915

916
        Ok(rows
1✔
917
            .iter()
1✔
918
            .map(|row| ComputationQuota {
1✔
919
                computation_id: row.get(0),
1✔
920
                workflow_id: row.get(1),
1✔
921
                timestamp: row.get(2),
1✔
922
                count: row.get::<_, i64>(3) as u64,
1✔
923
            })
1✔
924
            .collect())
1✔
925
    }
2✔
926

927
    async fn quota_used_by_computation(&self, computation_id: Uuid) -> Result<Vec<OperatorQuota>> {
1✔
928
        let conn = self.conn_pool.get().await?;
1✔
929

930
        let rows = conn
1✔
931
            .query(
1✔
932
                "
1✔
933
            SELECT
1✔
934
                operator_name,
1✔
935
                operator_path,
1✔
936
                COUNT(*) AS count
1✔
937
            FROM
1✔
938
                quota_log
1✔
939
            WHERE
1✔
940
                user_id = $1 AND
1✔
941
                computation_id = $2
1✔
942
            GROUP BY
1✔
943
                operator_name, operator_path
1✔
944
            ORDER BY 
1✔
945
            operator_name DESC;",
1✔
946
                &[&self.session.user.id, &computation_id],
1✔
947
            )
1✔
948
            .await?;
1✔
949

950
        Ok(rows
1✔
951
            .iter()
1✔
952
            .map(|row| OperatorQuota {
1✔
953
                operator_name: row.get(0),
1✔
954
                operator_path: row.get(1),
1✔
955
                count: row.get::<_, i64>(2) as u64,
1✔
956
            })
1✔
957
            .collect())
1✔
958
    }
2✔
959

960
    async fn quota_used_on_data(&self, offset: u64, limit: u64) -> Result<Vec<DataUsage>> {
1✔
961
        ensure!(self.session.is_admin(), error::PermissionDenied);
1✔
962

963
        let conn = self.conn_pool.get().await?;
1✔
964

965
        let rows = conn
1✔
966
            .query(
1✔
967
                "
1✔
968
            SELECT
1✔
969
                user_id,
1✔
970
                computation_id,
1✔
971
                data,
1✔
972
                MIN(timestamp) AS timestamp,
1✔
973
                COUNT(*) AS count
1✔
974
            FROM
1✔
975
                quota_log
1✔
976
            WHERE 
1✔
977
                data IS NOT NULL
1✔
978
            GROUP BY
1✔
979
               user_id,
1✔
980
               computation_id,
1✔
981
               workflow_id,
1✔
982
               data
1✔
983
            ORDER BY
1✔
984
                MIN(timestamp) DESC, user_id ASC, computation_id ASC, workflow_id ASC, data ASC
1✔
985
            OFFSET
1✔
986
                $1
1✔
987
            LIMIT
1✔
988
                $2;",
1✔
989
                &[&(offset as i64), &(limit as i64)],
1✔
990
            )
1✔
991
            .await?;
1✔
992

993
        Ok(rows
1✔
994
            .iter()
1✔
995
            .map(|row| DataUsage {
3✔
996
                user_id: row.get(0),
3✔
997
                computation_id: row.get(1),
3✔
998
                data: row.get(2),
3✔
999
                timestamp: row.get(3),
3✔
1000
                count: row.get::<_, i64>(4) as u64,
3✔
1001
            })
3✔
1002
            .collect())
1✔
1003
    }
2✔
1004

1005
    async fn quota_used_on_data_summary(
1006
        &self,
1007
        dataset: Option<String>,
1008
        granularity: UsageSummaryGranularity,
1009
        offset: u64,
1010
        limit: u64,
1011
    ) -> Result<Vec<DataUsageSummary>> {
1✔
1012
        ensure!(self.session.is_admin(), error::PermissionDenied);
1✔
1013

1014
        // TODO: check if user is Owner of dataset
1015

1016
        let conn = self.conn_pool.get().await?;
1✔
1017

1018
        let trunc = match granularity {
1✔
UNCOV
1019
            UsageSummaryGranularity::Minutes => "minute",
×
UNCOV
1020
            UsageSummaryGranularity::Hours => "hour",
×
UNCOV
1021
            UsageSummaryGranularity::Days => "day",
×
UNCOV
1022
            UsageSummaryGranularity::Months => "month",
×
1023
            UsageSummaryGranularity::Years => "year",
1✔
1024
        };
1025

1026
        let rows = if let Some(dataset) = dataset {
1✔
UNCOV
1027
            conn.query(
×
UNCOV
1028
                &format!(
×
UNCOV
1029
                    "
×
UNCOV
1030
            SELECT
×
UNCOV
1031
                date_trunc('{trunc}', timestamp) AS trunc,
×
UNCOV
1032
                data
×
UNCOV
1033
                COUNT(*) AS count
×
UNCOV
1034
            FROM
×
UNCOV
1035
                quota_log
×
UNCOV
1036
            WHERE 
×
UNCOV
1037
                data = $3
×
UNCOV
1038
            GROUP BY
×
UNCOV
1039
                trunc, data
×
UNCOV
1040
            ORDER BY
×
UNCOV
1041
                trunc DESC, data ASC
×
UNCOV
1042
            OFFSET
×
UNCOV
1043
                $1
×
UNCOV
1044
            LIMIT 
×
UNCOV
1045
                $2;",
×
UNCOV
1046
                ),
×
UNCOV
1047
                &[&(offset as i64), &(limit as i64), &dataset],
×
UNCOV
1048
            )
×
UNCOV
1049
            .await?
×
1050
        } else {
1051
            conn.query(
1✔
1052
                &format!(
1✔
1053
                    "
1✔
1054
            SELECT
1✔
1055
                date_trunc('{trunc}', timestamp) AS trunc,
1✔
1056
                data,
1✔
1057
                COUNT(*) AS count
1✔
1058
            FROM
1✔
1059
                quota_log
1✔
1060
            WHERE 
1✔
1061
                data IS NOT NULL
1✔
1062
            GROUP BY
1✔
1063
                trunc, data
1✔
1064
            ORDER BY
1✔
1065
                trunc DESC, data ASC
1✔
1066
            OFFSET
1✔
1067
                $1
1✔
1068
            LIMIT 
1✔
1069
                $2;",
1✔
1070
                ),
1✔
1071
                &[&(offset as i64), &(limit as i64)],
1✔
1072
            )
1✔
1073
            .await?
1✔
1074
        };
1075

1076
        Ok(rows
1✔
1077
            .iter()
1✔
1078
            .map(|row| DataUsageSummary {
2✔
1079
                timestamp: row.get(0),
2✔
1080
                data: row.get(1),
2✔
1081
                count: row.get::<_, i64>(2) as u64,
2✔
1082
            })
2✔
1083
            .collect())
1✔
1084
    }
2✔
1085

1086
    async fn update_quota_available_by_user(
1087
        &self,
1088
        user: &UserId,
1089
        new_available_quota: i64,
1090
    ) -> Result<()> {
5✔
1091
        ensure!(self.session.is_admin(), error::PermissionDenied);
5✔
1092

1093
        let conn = self.conn_pool.get().await?;
5✔
1094
        let stmt = conn
5✔
1095
            .prepare(
5✔
1096
                "
5✔
1097
            UPDATE users SET 
5✔
1098
                quota_available = $1
5✔
1099
            WHERE id = $2;",
5✔
1100
            )
5✔
1101
            .await?;
5✔
1102

1103
        conn.execute(&stmt, &[&(new_available_quota), &user])
5✔
1104
            .await?;
5✔
1105

1106
        Ok(())
5✔
1107
    }
10✔
1108
}
1109

1110
#[async_trait]
1111
impl<Tls> RoleDb for PostgresDb<Tls>
1112
where
1113
    Tls: MakeTlsConnect<Socket> + Clone + Send + Sync + 'static + std::fmt::Debug,
1114
    <Tls as MakeTlsConnect<Socket>>::Stream: Send + Sync,
1115
    <Tls as MakeTlsConnect<Socket>>::TlsConnect: Send,
1116
    <<Tls as MakeTlsConnect<Socket>>::TlsConnect as TlsConnect<Socket>>::Future: Send,
1117
{
1118
    async fn add_role(&self, role_name: &str) -> Result<RoleId, RoleDbError> {
4✔
1119
        let mut conn = self.conn_pool.get().await.context(Bb8RoleDbError)?;
4✔
1120

1121
        let tx = conn
4✔
1122
            .build_transaction()
4✔
1123
            .start()
4✔
1124
            .await
4✔
1125
            .context(PostgresRoleDbError)?;
4✔
1126

1127
        self.ensure_admin_in_tx(&tx)
4✔
1128
            .await
4✔
1129
            .context(PermissionDbRoleDbError)?;
4✔
1130

1131
        let id = RoleId::new();
4✔
1132

1133
        let res = tx
4✔
1134
            .execute(
4✔
1135
                "INSERT INTO roles (id, name) VALUES ($1, $2);",
4✔
1136
                &[&id, &role_name],
4✔
1137
            )
4✔
1138
            .await;
4✔
1139

1140
        if let Err(err) = res {
4✔
UNCOV
1141
            if err.code() == Some(&tokio_postgres::error::SqlState::UNIQUE_VIOLATION) {
×
UNCOV
1142
                return Err(RoleDbError::RoleAlreadyExists {
×
UNCOV
1143
                    role_name: role_name.to_string(),
×
UNCOV
1144
                });
×
1145
            }
×
1146
        }
4✔
1147

1148
        tx.commit().await.context(PostgresRoleDbError)?;
4✔
1149

1150
        Ok(id)
4✔
1151
    }
8✔
1152

UNCOV
1153
    async fn load_role_by_name(&self, role_name: &str) -> Result<RoleId, RoleDbError> {
×
UNCOV
1154
        let conn = self.conn_pool.get().await.context(Bb8RoleDbError)?;
×
1155

UNCOV
1156
        let row = conn
×
UNCOV
1157
            .query_opt("SELECT id FROM roles WHERE name = $1;", &[&role_name])
×
UNCOV
1158
            .await
×
UNCOV
1159
            .context(PostgresRoleDbError)?
×
UNCOV
1160
            .ok_or(RoleDbError::RoleNameDoesNotExist {
×
UNCOV
1161
                role_name: role_name.to_string(),
×
UNCOV
1162
            })?;
×
1163

UNCOV
1164
        Ok(RoleId(row.get(0)))
×
UNCOV
1165
    }
×
1166

1167
    async fn remove_role(&self, role_id: &RoleId) -> Result<(), RoleDbError> {
2✔
1168
        let mut conn = self.conn_pool.get().await.context(Bb8RoleDbError)?;
2✔
1169

1170
        let tx = conn
2✔
1171
            .build_transaction()
2✔
1172
            .start()
2✔
1173
            .await
2✔
1174
            .context(PostgresRoleDbError)?;
2✔
1175

1176
        self.ensure_admin_in_tx(&tx)
2✔
1177
            .await
2✔
1178
            .context(PermissionDbRoleDbError)?;
2✔
1179

1180
        let deleted = tx
2✔
1181
            .execute("DELETE FROM roles WHERE id = $1;", &[&role_id])
2✔
1182
            .await
2✔
1183
            .context(PostgresRoleDbError)?;
2✔
1184

1185
        tx.commit().await.context(PostgresRoleDbError)?;
2✔
1186

1187
        ensure!(
2✔
1188
            deleted > 0,
2✔
UNCOV
1189
            RoleIdDoesNotExistRoleDbError { role_id: *role_id }
×
1190
        );
1191

1192
        Ok(())
2✔
1193
    }
4✔
1194

1195
    async fn assign_role(&self, role_id: &RoleId, user_id: &UserId) -> Result<(), RoleDbError> {
4✔
1196
        let mut conn = self.conn_pool.get().await.context(Bb8RoleDbError)?;
4✔
1197

1198
        let tx = conn
4✔
1199
            .build_transaction()
4✔
1200
            .start()
4✔
1201
            .await
4✔
1202
            .context(PostgresRoleDbError)?;
4✔
1203

1204
        self.ensure_admin_in_tx(&tx)
4✔
1205
            .await
4✔
1206
            .context(PermissionDbRoleDbError)?;
4✔
1207

1208
        tx.execute(
4✔
1209
            "INSERT INTO user_roles (user_id, role_id) VALUES ($1, $2) ON CONFLICT DO NOTHING;",
4✔
1210
            &[&user_id, &role_id],
4✔
1211
        )
4✔
1212
        .await
4✔
1213
        .context(PostgresRoleDbError)?;
4✔
1214

1215
        tx.commit().await.context(PostgresRoleDbError)?;
4✔
1216

1217
        Ok(())
4✔
1218
    }
8✔
1219

1220
    async fn revoke_role(&self, role_id: &RoleId, user_id: &UserId) -> Result<(), RoleDbError> {
2✔
1221
        let mut conn = self.conn_pool.get().await.context(Bb8RoleDbError)?;
2✔
1222

1223
        let tx = conn
2✔
1224
            .build_transaction()
2✔
1225
            .start()
2✔
1226
            .await
2✔
1227
            .context(PostgresRoleDbError)?;
2✔
1228

1229
        self.ensure_admin_in_tx(&tx)
2✔
1230
            .await
2✔
1231
            .context(PermissionDbRoleDbError)?;
2✔
1232

1233
        let deleted = tx
2✔
1234
            .execute(
2✔
1235
                "DELETE FROM user_roles WHERE user_id= $1 AND role_id = $2;",
2✔
1236
                &[&user_id, &role_id],
2✔
1237
            )
2✔
1238
            .await
2✔
1239
            .context(PostgresRoleDbError)?;
2✔
1240

1241
        tx.commit().await.context(PostgresRoleDbError)?;
2✔
1242

1243
        ensure!(
2✔
1244
            deleted > 0,
2✔
UNCOV
1245
            CannotRevokeRoleThatIsNotAssignedRoleDbError { role_id: *role_id }
×
1246
        );
1247

1248
        Ok(())
2✔
1249
    }
4✔
1250

1251
    async fn get_role_descriptions(
1252
        &self,
1253
        user_id: &UserId,
1254
    ) -> Result<Vec<RoleDescription>, RoleDbError> {
5✔
1255
        let conn = self.conn_pool.get().await.context(Bb8RoleDbError)?;
5✔
1256

1257
        let stmt = conn
5✔
1258
            .prepare(
5✔
1259
                "SELECT roles.id, roles.name \
5✔
1260
                FROM roles JOIN user_roles ON (roles.id=user_roles.role_id) \
5✔
1261
                WHERE user_roles.user_id=$1 \
5✔
1262
                ORDER BY roles.name;",
5✔
1263
            )
5✔
1264
            .await
5✔
1265
            .context(PostgresRoleDbError)?;
5✔
1266

1267
        let results = conn
5✔
1268
            .query(&stmt, &[&user_id])
5✔
1269
            .await
5✔
1270
            .context(PostgresRoleDbError)?;
5✔
1271

1272
        let mut result_vec = Vec::new();
5✔
1273

1274
        for result in results {
17✔
1275
            let id = result.get(0);
12✔
1276
            let name = result.get(1);
12✔
1277
            let individual = UserId(id) == *user_id;
12✔
1278
            result_vec.push(RoleDescription {
12✔
1279
                role: Role {
12✔
1280
                    id: RoleId(id),
12✔
1281
                    name,
12✔
1282
                },
12✔
1283
                individual,
12✔
1284
            });
12✔
1285
        }
12✔
1286

1287
        Ok(result_vec)
5✔
1288
    }
10✔
1289
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc