• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

geo-engine / geoengine / 14513006992

17 Apr 2025 09:53AM UTC coverage: 90.021%. First build
14513006992

Pull #1048

github

web-flow
Merge ff954054c into 1076a6163
Pull Request #1048: 1.86

32 of 77 new or added lines in 36 files covered. (41.56%)

126708 of 140754 relevant lines covered (90.02%)

57184.51 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

94.41
/services/src/users/postgres_userdb.rs
1
use crate::api::handlers::users::UsageSummaryGranularity;
2
use crate::contexts::PostgresDb;
3
use crate::contexts::{ApplicationContext, SessionId};
4
use crate::error::{Error, Result};
5
use crate::permissions::TxPermissionDb;
6
use crate::permissions::{Role, RoleDescription, RoleId};
7
use crate::projects::{ProjectId, STRectangle};
8
use crate::quota::{ComputationQuota, DataUsage, DataUsageSummary, OperatorQuota};
9
use crate::users::oidc::{FlatMaybeEncryptedOidcTokens, OidcTokens, UserClaims};
10
use crate::users::userdb::{
11
    CannotRevokeRoleThatIsNotAssignedRoleDbError, RoleIdDoesNotExistRoleDbError,
12
};
13
use crate::users::{
14
    SessionTokenStore, StoredOidcTokens, User, UserCredentials, UserDb, UserId, UserInfo,
15
    UserRegistration, UserSession,
16
};
17
use crate::util::Identifier;
18
use crate::util::postgres::PostgresErrorExt;
19
use crate::{contexts::PostgresContext, error};
20
use async_trait::async_trait;
21
use geoengine_operators::meta::quota::ComputationUnit;
22

23
use crate::util::encryption::MaybeEncryptedBytes;
24
use bb8_postgres::{
25
    tokio_postgres::Socket, tokio_postgres::tls::MakeTlsConnect, tokio_postgres::tls::TlsConnect,
26
};
27
use oauth2::AccessToken;
28
use pwhash::bcrypt;
29
use snafu::{ResultExt, ensure};
30
use tokio_postgres::Transaction;
31
use uuid::Uuid;
32

33
use super::userdb::{
34
    Bb8RoleDbError, PermissionDbRoleDbError, PostgresRoleDbError, RoleDb, RoleDbError, UserAuth,
35
};
36

37
#[async_trait]
38
impl<Tls> UserAuth for PostgresContext<Tls>
39
where
40
    Tls: MakeTlsConnect<Socket> + Clone + Send + Sync + 'static + std::fmt::Debug,
41
    <Tls as MakeTlsConnect<Socket>>::Stream: Send + Sync,
42
    <Tls as MakeTlsConnect<Socket>>::TlsConnect: Send,
43
    <<Tls as MakeTlsConnect<Socket>>::TlsConnect as TlsConnect<Socket>>::Future: Send,
44
{
45
    // TODO: clean up expired sessions?
46

47
    async fn register_user(&self, user: UserRegistration) -> Result<UserId> {
45✔
48
        let mut conn = self.pool.get().await?;
45✔
49

50
        let tx = conn.build_transaction().start().await?;
45✔
51

52
        let user = User::from(user);
45✔
53

54
        let stmt = tx
45✔
55
            .prepare("INSERT INTO roles (id, name) VALUES ($1, $2);")
45✔
56
            .await?;
45✔
57
        tx.execute(&stmt, &[&user.id, &user.email])
45✔
58
            .await
45✔
59
            .map_unique_violation("roles", "name", || error::Error::Duplicate {
45✔
60
                reason: "E-mail already exists".to_string(),
15✔
61
            })?;
45✔
62

63
        let stmt = tx
30✔
64
            .prepare(
30✔
65
                "INSERT INTO users (id, email, password_hash, real_name, quota_available, active) VALUES ($1, $2, $3, $4, $5, $6);",
30✔
66
            )
30✔
67
            .await?;
30✔
68

69
        let quota_available =
30✔
70
            crate::config::get_config_element::<crate::config::Quota>()?.initial_credits;
30✔
71

72
        tx.execute(
30✔
73
            &stmt,
30✔
74
            &[
30✔
75
                &user.id,
30✔
76
                &user.email,
30✔
77
                &user.password_hash,
30✔
78
                &user.real_name,
30✔
79
                &quota_available,
30✔
80
                &user.active,
30✔
81
            ],
30✔
82
        )
30✔
83
        .await?;
30✔
84

85
        let stmt = tx
30✔
86
            .prepare("INSERT INTO user_roles (user_id, role_id) VALUES ($1, $2);")
30✔
87
            .await?;
30✔
88
        tx.execute(&stmt, &[&user.id, &user.id]).await?;
30✔
89

90
        let stmt = tx
30✔
91
            .prepare("INSERT INTO user_roles (user_id, role_id) VALUES ($1, $2);")
30✔
92
            .await?;
30✔
93
        tx.execute(&stmt, &[&user.id, &Role::registered_user_role_id()])
30✔
94
            .await?;
30✔
95

96
        tx.commit().await?;
30✔
97

98
        Ok(user.id)
30✔
99
    }
90✔
100

101
    async fn create_anonymous_session(&self) -> Result<UserSession> {
268✔
102
        let mut conn = self.pool.get().await?;
268✔
103

104
        let tx = conn.build_transaction().start().await?;
268✔
105

106
        let user_id = UserId::new();
268✔
107

108
        let stmt = tx
268✔
109
            .prepare("INSERT INTO roles (id, name) VALUES ($1, $2);")
268✔
110
            .await?;
268✔
111
        tx.execute(&stmt, &[&user_id, &format!("anonymous_user_{user_id}")])
268✔
112
            .await?;
268✔
113

114
        let quota_available =
268✔
115
            crate::config::get_config_element::<crate::config::Quota>()?.initial_credits;
268✔
116

117
        let stmt = tx
268✔
118
            .prepare("INSERT INTO users (id, quota_available, active) VALUES ($1, $2, TRUE);")
268✔
119
            .await?;
268✔
120

121
        tx.execute(&stmt, &[&user_id, &quota_available]).await?;
268✔
122

123
        let stmt = tx
268✔
124
            .prepare("INSERT INTO user_roles (user_id, role_id) VALUES ($1, $2);")
268✔
125
            .await?;
268✔
126
        tx.execute(&stmt, &[&user_id, &user_id]).await?;
268✔
127

128
        let stmt = tx
268✔
129
            .prepare("INSERT INTO user_roles (user_id, role_id) VALUES ($1, $2);")
268✔
130
            .await?;
268✔
131
        tx.execute(&stmt, &[&user_id, &Role::anonymous_role_id()])
268✔
132
            .await?;
268✔
133

134
        let session_id = SessionId::new();
268✔
135

268✔
136
        // TODO: load from config
268✔
137
        let session_duration = chrono::Duration::days(30);
268✔
138
        let row = tx
268✔
139
            .query_one(
268✔
140
                "
268✔
141
                INSERT INTO 
268✔
142
                    sessions (id, user_id, created, valid_until) 
268✔
143
                VALUES 
268✔
144
                    ($1, $2, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP + make_interval(secs:=$3))
268✔
145
                RETURNING 
268✔
146
                    created, valid_until;
268✔
147
                ",
268✔
148
                &[
268✔
149
                    &session_id,
268✔
150
                    &user_id,
268✔
151
                    &(session_duration.num_seconds() as f64),
268✔
152
                ],
268✔
153
            )
268✔
154
            .await?;
268✔
155

156
        tx.commit().await?;
268✔
157

158
        Ok(UserSession {
268✔
159
            id: session_id,
268✔
160
            user: UserInfo {
268✔
161
                id: user_id,
268✔
162
                email: None,
268✔
163
                real_name: None,
268✔
164
            },
268✔
165
            created: row.get(0),
268✔
166
            valid_until: row.get(1),
268✔
167
            project: None,
268✔
168
            view: None,
268✔
169
            roles: vec![user_id.into(), Role::anonymous_role_id()],
268✔
170
        })
268✔
171
    }
536✔
172

173
    async fn login(&self, user_credentials: UserCredentials) -> Result<UserSession> {
224✔
174
        let mut conn = self.pool.get().await?;
224✔
175

176
        let tx = conn.build_transaction().start().await?;
224✔
177

178
        let stmt = tx
224✔
179
            .prepare("SELECT id, password_hash, email, real_name FROM users WHERE email = $1;")
224✔
180
            .await?;
224✔
181

182
        let row = tx
224✔
183
            .query_one(&stmt, &[&user_credentials.email])
224✔
184
            .await
224✔
185
            .map_err(|_error| error::Error::LoginFailed)?;
224✔
186

187
        let user_id = UserId(row.get(0));
224✔
188
        let password_hash = row.get(1);
224✔
189
        let email = row.get(2);
224✔
190
        let real_name = row.get(3);
224✔
191

224✔
192
        if bcrypt::verify(user_credentials.password, password_hash) {
224✔
193
            let session_id = SessionId::new();
223✔
194

223✔
195
            // TODO: load from config
223✔
196
            let session_duration = chrono::Duration::days(30);
223✔
197

198
            let row = tx
223✔
199
                .query_one(
223✔
200
                    "
223✔
201
                    INSERT INTO 
223✔
202
                        sessions (id, user_id, created, valid_until) 
223✔
203
                    VALUES 
223✔
204
                        ($1, $2, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP + make_interval(secs:=$3))
223✔
205
                    RETURNING 
223✔
206
                        created, valid_until;
223✔
207
                    ",
223✔
208
                    &[
223✔
209
                        &session_id,
223✔
210
                        &user_id,
223✔
211
                        &(session_duration.num_seconds() as f64),
223✔
212
                    ],
223✔
213
                )
223✔
214
                .await?;
223✔
215

216
            let stmt = tx
223✔
217
                .prepare("SELECT role_id FROM user_roles WHERE user_id = $1;")
223✔
218
                .await?;
223✔
219

220
            let rows = tx
223✔
221
                .query(&stmt, &[&user_id])
223✔
222
                .await
223✔
223
                .map_err(|_error| error::Error::LoginFailed)?;
223✔
224

225
            tx.commit().await?;
223✔
226

227
            let roles = rows.into_iter().map(|row| row.get(0)).collect();
257✔
228

223✔
229
            Ok(UserSession {
223✔
230
                id: session_id,
223✔
231
                user: UserInfo {
223✔
232
                    id: user_id,
223✔
233
                    email,
223✔
234
                    real_name,
223✔
235
                },
223✔
236
                created: row.get(0),
223✔
237
                valid_until: row.get(1),
223✔
238
                project: None,
223✔
239
                view: None,
223✔
240
                roles,
223✔
241
            })
223✔
242
        } else {
243
            Err(error::Error::LoginFailed)
1✔
244
        }
245
    }
448✔
246

247
    #[allow(clippy::too_many_lines)]
248
    async fn login_external(
249
        &self,
250
        user: UserClaims,
251
        oidc_tokens: OidcTokens,
252
    ) -> Result<UserSession> {
8✔
253
        let mut conn = self.pool.get().await?;
8✔
254
        let tx = conn.build_transaction().start().await?;
8✔
255

256
        let stmt = tx
8✔
257
            .prepare("SELECT id, external_id, email, real_name FROM external_users WHERE external_id = $1;")
8✔
258
            .await?;
8✔
259

260
        let row = tx
8✔
261
            .query_opt(&stmt, &[&user.external_id.to_string()])
8✔
262
            .await
8✔
263
            .map_err(|_error| error::Error::LoginFailed)?;
8✔
264

265
        let user_id = match row {
8✔
266
            Some(row) => UserId(row.get(0)),
2✔
267
            None => {
268
                let user_id = UserId::new();
6✔
269

270
                let stmt = tx
6✔
271
                    .prepare("INSERT INTO roles (id, name) VALUES ($1, $2);")
6✔
272
                    .await?;
6✔
273
                tx.execute(&stmt, &[&user_id, &user.email]).await?;
6✔
274

275
                let quota_available =
6✔
276
                    crate::config::get_config_element::<crate::config::Quota>()?.initial_credits;
6✔
277

278
                //TODO: A user might be able to login without external login using this (internal) id. Would be a problem with anonymous users as well.
279
                let stmt = tx
6✔
280
                    .prepare(
6✔
281
                        "INSERT INTO users (id, quota_available, active) VALUES ($1, $2, TRUE);",
6✔
282
                    )
6✔
283
                    .await?;
6✔
284
                tx.execute(&stmt, &[&user_id, &quota_available]).await?;
6✔
285

286
                let stmt = tx
6✔
287
                    .prepare(
6✔
288
                        "INSERT INTO external_users (id, external_id, email, real_name, active) VALUES ($1, $2, $3, $4, $5);",
6✔
289
                    )
6✔
290
                    .await?;
6✔
291

292
                tx.execute(
6✔
293
                    &stmt,
6✔
294
                    &[
6✔
295
                        &user_id,
6✔
296
                        &user.external_id.to_string(),
6✔
297
                        &user.email,
6✔
298
                        &user.real_name,
6✔
299
                        &true,
6✔
300
                    ],
6✔
301
                )
6✔
302
                .await?;
6✔
303

304
                let stmt = tx
6✔
305
                    .prepare("INSERT INTO user_roles (user_id, role_id) VALUES ($1, $2);")
6✔
306
                    .await?;
6✔
307
                tx.execute(&stmt, &[&user_id, &user_id]).await?;
6✔
308

309
                let stmt = tx
6✔
310
                    .prepare("INSERT INTO user_roles (user_id, role_id) VALUES ($1, $2);")
6✔
311
                    .await?;
6✔
312
                tx.execute(&stmt, &[&user_id, &Role::registered_user_role_id()])
6✔
313
                    .await?;
6✔
314

315
                user_id
6✔
316
            }
317
        };
318

319
        let session_id = SessionId::new();
8✔
320

321
        let row = tx
8✔
322
            .query_one(
8✔
323
                "
8✔
324
                INSERT INTO 
8✔
325
                    sessions (id, user_id, created, valid_until) 
8✔
326
                VALUES 
8✔
327
                    ($1, $2, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP + make_interval(secs:=$3))
8✔
328
                RETURNING 
8✔
329
                    created, valid_until;
8✔
330
                ",
8✔
331
                &[
8✔
332
                    &session_id,
8✔
333
                    &user_id,
8✔
334
                    &(oidc_tokens.expires_in.num_seconds() as f64),
8✔
335
                ],
8✔
336
            )
8✔
337
            .await?;
8✔
338

339
        self.store_oidc_session_tokens(session_id, oidc_tokens, &tx)
8✔
340
            .await?;
8✔
341

342
        let stmt = tx
8✔
343
            .prepare("SELECT role_id FROM user_roles WHERE user_id = $1;")
8✔
344
            .await?;
8✔
345

346
        let rows = tx
8✔
347
            .query(&stmt, &[&user_id])
8✔
348
            .await
8✔
349
            .map_err(|_error| error::Error::LoginFailed)?;
8✔
350

351
        let roles = rows.into_iter().map(|row| row.get(0)).collect();
16✔
352

8✔
353
        tx.commit().await?;
8✔
354

355
        Ok(UserSession {
8✔
356
            id: session_id,
8✔
357
            user: UserInfo {
8✔
358
                id: user_id,
8✔
359
                email: Some(user.email.clone()),
8✔
360
                real_name: Some(user.real_name.clone()),
8✔
361
            },
8✔
362
            created: row.get(0),
8✔
363
            valid_until: row.get(1),
8✔
364
            project: None,
8✔
365
            view: None,
8✔
366
            roles,
8✔
367
        })
8✔
368
    }
16✔
369

370
    async fn user_session_by_id(&self, session: SessionId) -> Result<UserSession> {
176✔
371
        let mut conn = self.pool.get().await?;
176✔
372

373
        let tx = conn.build_transaction().start().await?;
170✔
374

375
        let stmt = tx
170✔
376
            .prepare(
170✔
377
                "
170✔
378
            SELECT 
170✔
379
                u.id,   
170✔
380
                COALESCE(u.email, eu.email) AS email,
170✔
381
                COALESCE(u.real_name, eu.real_name) AS real_name,
170✔
382
                s.created, 
170✔
383
                s.valid_until, 
170✔
384
                s.project_id,
170✔
385
                s.view,
170✔
386
                CASE WHEN CURRENT_TIMESTAMP < s.valid_until THEN TRUE ELSE FALSE END AS valid_session
170✔
387
            FROM
170✔
388
                sessions s
170✔
389
                    JOIN users u ON (s.user_id = u.id)
170✔
390
                    LEFT JOIN external_users eu ON (u.id = eu.id)
170✔
391
                    LEFT JOIN oidc_session_tokens t ON (s.id = t.session_id)
170✔
392
            WHERE s.id = $1 AND (CURRENT_TIMESTAMP < s.valid_until OR t.refresh_token IS NOT NULL);",
170✔
393
            )
170✔
394
            .await?;
170✔
395

396
        let row = tx
170✔
397
            .query_one(&stmt, &[&session])
170✔
398
            .await
170✔
399
            .map_err(|_error| error::Error::InvalidSession)?;
170✔
400

401
        let valid_session: bool = row.get(7);
163✔
402

403
        let valid_until = if valid_session {
163✔
404
            row.get(4)
162✔
405
        } else {
406
            log::debug!("Session expired, trying to extend");
1✔
407
            let refresh_result = self.refresh_oidc_session_tokens(session, &tx).await;
1✔
408

409
            if let Err(refresh_error) = refresh_result {
1✔
NEW
410
                log::debug!("Session extension failed {refresh_error}");
×
411
                return Err(Error::InvalidSession);
×
412
            }
1✔
413
            log::debug!("Session extended");
1✔
414
            refresh_result
1✔
415
                .expect("Refresh result should exist")
1✔
416
                .db_valid_until
1✔
417
        };
418

419
        let mut session = UserSession {
163✔
420
            id: session,
163✔
421
            user: UserInfo {
163✔
422
                id: row.get(0),
163✔
423
                email: row.get(1),
163✔
424
                real_name: row.get(2),
163✔
425
            },
163✔
426
            created: row.get(3),
163✔
427
            valid_until,
163✔
428
            project: row.get::<usize, Option<Uuid>>(5).map(ProjectId),
163✔
429
            view: row.get(6),
163✔
430
            roles: vec![],
163✔
431
        };
163✔
432

433
        let stmt = tx
163✔
434
            .prepare(
163✔
435
                "
163✔
436
            SELECT role_id FROM user_roles WHERE user_id = $1;
163✔
437
            ",
163✔
438
            )
163✔
439
            .await?;
163✔
440

441
        let rows = tx.query(&stmt, &[&session.user.id]).await?;
163✔
442

443
        tx.commit().await?;
163✔
444

445
        session.roles = rows.into_iter().map(|row| row.get(0)).collect();
275✔
446

163✔
447
        Ok(session)
163✔
448
    }
346✔
449
}
450

451
#[async_trait]
452
impl<Tls> SessionTokenStore for PostgresContext<Tls>
453
where
454
    Tls: MakeTlsConnect<Socket> + Clone + Send + Sync + 'static + std::fmt::Debug,
455
    <Tls as MakeTlsConnect<Socket>>::Stream: Send + Sync,
456
    <Tls as MakeTlsConnect<Socket>>::TlsConnect: Send,
457
    <<Tls as MakeTlsConnect<Socket>>::TlsConnect as TlsConnect<Socket>>::Future: Send,
458
{
459
    async fn store_oidc_session_tokens(
460
        &self,
461
        session: SessionId,
462
        oidc_tokens: OidcTokens,
463
        tx: &Transaction<'_>,
464
    ) -> Result<StoredOidcTokens> {
8✔
465
        let flat_tokens: FlatMaybeEncryptedOidcTokens = self
8✔
466
            .oidc_manager()
8✔
467
            .maybe_encrypt_tokens(&oidc_tokens)?
8✔
468
            .into();
8✔
469

470
        let stmt = tx
8✔
471
            .prepare(
8✔
472
                "
8✔
473
                INSERT INTO oidc_session_tokens
8✔
474
                    (session_id, access_token, access_token_encryption_nonce, access_token_valid_until, refresh_token, refresh_token_encryption_nonce)
8✔
475
                VALUES
8✔
476
                    ($1, $2, $3, CURRENT_TIMESTAMP + make_interval(secs:=$4), $5, $6)
8✔
477
                RETURNING
8✔
478
                    access_token_valid_until;
8✔
479
                ;"
8✔
480
            )
8✔
481
            .await?;
8✔
482

483
        let db_valid_until = tx
8✔
484
            .query_one(
8✔
485
                &stmt,
8✔
486
                &[
8✔
487
                    &session,
8✔
488
                    &flat_tokens.access_token_value,
8✔
489
                    &flat_tokens.access_token_nonce,
8✔
490
                    &(oidc_tokens.expires_in.num_seconds() as f64),
8✔
491
                    &flat_tokens.refresh_token_value,
8✔
492
                    &flat_tokens.refresh_token_nonce,
8✔
493
                ],
8✔
494
            )
8✔
495
            .await?
8✔
496
            .get(0);
8✔
497

8✔
498
        Ok(StoredOidcTokens {
8✔
499
            oidc_tokens,
8✔
500
            db_valid_until,
8✔
501
        })
8✔
502
    }
16✔
503

504
    async fn refresh_oidc_session_tokens(
505
        &self,
506
        session: SessionId,
507
        tx: &Transaction<'_>,
508
    ) -> Result<StoredOidcTokens> {
3✔
509
        let stmt = tx
3✔
510
            .prepare(
3✔
511
                "
3✔
512
            SELECT
3✔
513
                refresh_token,
3✔
514
                refresh_token_encryption_nonce
3✔
515
            FROM
3✔
516
                oidc_session_tokens
3✔
517
            WHERE session_id = $1 AND refresh_token IS NOT NULL;",
3✔
518
            )
3✔
519
            .await?;
3✔
520

521
        let rows = tx.query_opt(&stmt, &[&session]).await?;
3✔
522

523
        if let Some(refresh_string) = rows {
3✔
524
            let string_field_and_nonce = MaybeEncryptedBytes {
3✔
525
                value: refresh_string.get(0),
3✔
526
                nonce: refresh_string.get(1),
3✔
527
            };
3✔
528

529
            let refresh_token = self
3✔
530
                .oidc_manager()
3✔
531
                .maybe_decrypt_refresh_token(string_field_and_nonce)?;
3✔
532

533
            let oidc_manager = self.oidc_manager();
3✔
534

535
            let oidc_tokens = oidc_manager
3✔
536
                .get_client()
3✔
537
                .await?
3✔
538
                .refresh_access_token(refresh_token)
3✔
539
                .await?;
3✔
540

541
            let flat_tokens: FlatMaybeEncryptedOidcTokens = self
3✔
542
                .oidc_manager()
3✔
543
                .maybe_encrypt_tokens(&oidc_tokens)?
3✔
544
                .into();
3✔
545

546
            let update_session_tokens = tx.prepare("
3✔
547
                UPDATE
3✔
548
                    oidc_session_tokens
3✔
549
                SET
3✔
550
                    access_token = $2, access_token_encryption_nonce = $3, access_token_valid_until = CURRENT_TIMESTAMP + make_interval(secs:=$4), refresh_token = $5, refresh_token_encryption_nonce = $6
3✔
551
                WHERE
3✔
552
                    session_id = $1;",
3✔
553

3✔
554
            ).await?;
3✔
555

556
            tx.execute(
3✔
557
                &update_session_tokens,
3✔
558
                &[
3✔
559
                    &session,
3✔
560
                    &flat_tokens.access_token_value,
3✔
561
                    &flat_tokens.access_token_nonce,
3✔
562
                    &(oidc_tokens.expires_in.num_seconds() as f64),
3✔
563
                    &flat_tokens.refresh_token_value,
3✔
564
                    &flat_tokens.refresh_token_nonce,
3✔
565
                ],
3✔
566
            )
3✔
567
            .await?;
3✔
568

569
            let expiration = tx
3✔
570
                .query_one(
3✔
571
                    "
3✔
572
                    UPDATE
3✔
573
                        sessions
3✔
574
                    SET
3✔
575
                        valid_until = CURRENT_TIMESTAMP + make_interval(secs:=$2)
3✔
576
                    WHERE
3✔
577
                        id = $1
3✔
578
                    RETURNING
3✔
579
                        valid_until;
3✔
580
                    ",
3✔
581
                    &[&session, &(oidc_tokens.expires_in.num_seconds() as f64)],
3✔
582
                )
3✔
583
                .await?
3✔
584
                .get(0);
3✔
585

3✔
586
            return Ok(StoredOidcTokens {
3✔
587
                oidc_tokens,
3✔
588
                db_valid_until: expiration,
3✔
589
            });
3✔
NEW
590
        }
×
591

×
592
        Err(Error::InvalidSession)
×
593
    }
6✔
594

595
    async fn get_access_token(&self, session: SessionId) -> Result<AccessToken> {
4✔
596
        let mut conn = self.pool.get().await?;
4✔
597

598
        let tx = conn.build_transaction().start().await?;
4✔
599

600
        let stmt = tx
4✔
601
            .prepare(
4✔
602
                "
4✔
603
            SELECT
4✔
604
                access_token,
4✔
605
                access_token_encryption_nonce
4✔
606
            FROM
4✔
607
                oidc_session_tokens
4✔
608
            WHERE session_id = $1 AND (CURRENT_TIMESTAMP < access_token_valid_until);",
4✔
609
            )
4✔
610
            .await?;
4✔
611

612
        let rows = tx.query_opt(&stmt, &[&session]).await?;
4✔
613

614
        let access_token = if let Some(token_row) = rows {
4✔
615
            let string_field_and_nonce = MaybeEncryptedBytes {
2✔
616
                value: token_row.get(0),
2✔
617
                nonce: token_row.get(1),
2✔
618
            };
2✔
619
            self.oidc_manager()
2✔
620
                .maybe_decrypt_access_token(string_field_and_nonce)?
2✔
621
        } else {
622
            self.refresh_oidc_session_tokens(session, &tx)
2✔
623
                .await?
2✔
624
                .oidc_tokens
625
                .access
626
        };
627

628
        tx.commit().await?;
4✔
629

630
        Ok(access_token)
4✔
631
    }
8✔
632
}
633

634
#[async_trait]
635
impl<Tls> UserDb for PostgresDb<Tls>
636
where
637
    Tls: MakeTlsConnect<Socket> + Clone + Send + Sync + 'static + std::fmt::Debug,
638
    <Tls as MakeTlsConnect<Socket>>::Stream: Send + Sync,
639
    <Tls as MakeTlsConnect<Socket>>::TlsConnect: Send,
640
    <<Tls as MakeTlsConnect<Socket>>::TlsConnect as TlsConnect<Socket>>::Future: Send,
641
{
642
    // TODO: clean up expired sessions?
643

644
    async fn logout(&self) -> Result<()> {
6✔
645
        let conn = self.conn_pool.get().await?;
6✔
646
        let stmt = conn
6✔
647
            .prepare("DELETE FROM sessions WHERE id = $1;") // TODO: only invalidate session?
6✔
648
            .await?;
6✔
649

650
        conn.execute(&stmt, &[&self.session.id])
6✔
651
            .await
6✔
652
            .map_err(|_error| error::Error::LogoutFailed)?;
6✔
653
        Ok(())
6✔
654
    }
12✔
655

656
    /// Stores the given project id on the current session.
    ///
    /// # Errors
    ///
    /// Propagates pool and database errors.
    async fn set_session_project(&self, project: ProjectId) -> Result<()> {
        // TODO: check permission

        let conn = self.conn_pool.get().await?;

        let update_project = conn
            .prepare("UPDATE sessions SET project_id = $1 WHERE id = $2;")
            .await?;

        let session_id = &self.session.id;
        conn.execute(&update_project, &[&project, session_id])
            .await?;

        Ok(())
    }
6✔
668

669
    /// Stores the given view rectangle on the current session.
    ///
    /// # Errors
    ///
    /// Propagates pool and database errors.
    async fn set_session_view(&self, view: STRectangle) -> Result<()> {
        let conn = self.conn_pool.get().await?;

        let update_view = conn
            .prepare("UPDATE sessions SET view = $1 WHERE id = $2;")
            .await?;

        let session_id = &self.session.id;
        conn.execute(&update_view, &[&view, session_id]).await?;

        Ok(())
    }
6✔
679

680
    async fn increment_quota_used(&self, user: &UserId, quota_used: u64) -> Result<()> {
1✔
681
        ensure!(self.session.is_admin(), error::PermissionDenied);
1✔
682

683
        let conn = self.conn_pool.get().await?;
1✔
684
        let stmt = conn
1✔
685
            .prepare(
1✔
686
                "
1✔
687
            UPDATE users SET 
1✔
688
                quota_available = quota_available - $1, 
1✔
689
                quota_used = quota_used + $1
1✔
690
            WHERE id = $2;",
1✔
691
            )
1✔
692
            .await?;
1✔
693

694
        conn.execute(&stmt, &[&(quota_used as i64), &user]).await?;
1✔
695

696
        Ok(())
1✔
697
    }
2✔
698

699
    async fn bulk_increment_quota_used<I: IntoIterator<Item = (UserId, u64)> + Send>(
700
        &self,
701
        quota_used_updates: I,
702
    ) -> Result<()> {
279✔
703
        ensure!(self.session.is_admin(), error::PermissionDenied);
279✔
704

705
        let conn = self.conn_pool.get().await?;
279✔
706

707
        // collect the user ids and quotas into separate vectors to pass them as parameters to the query
708
        let (users, quotas): (Vec<UserId>, Vec<i64>) = quota_used_updates
279✔
709
            .into_iter()
279✔
710
            .map(|(user, quota)| (user, quota as i64))
280✔
711
            .unzip();
279✔
712

279✔
713
        let query = "
279✔
714
            UPDATE users
279✔
715
            SET quota_available = quota_available - quota_changes.quota, 
279✔
716
                quota_used = quota_used + quota_changes.quota
279✔
717
            FROM 
279✔
718
                (SELECT * FROM UNNEST($1::uuid[], $2::bigint[]) AS t(id, quota)) AS quota_changes
279✔
719
            WHERE users.id = quota_changes.id;
279✔
720
        ";
279✔
721

279✔
722
        conn.execute(query, &[&users, &quotas]).await?;
279✔
723

724
        Ok(())
279✔
725
    }
558✔
726

727
    async fn quota_used(&self) -> Result<u64> {
9✔
728
        let conn = self.conn_pool.get().await?;
9✔
729
        let stmt = conn
9✔
730
            .prepare("SELECT quota_used FROM users WHERE id = $1;")
9✔
731
            .await?;
9✔
732

733
        let row = conn
9✔
734
            .query_one(&stmt, &[&self.session.user.id])
9✔
735
            .await
9✔
736
            .map_err(|_error| error::Error::InvalidSession)?;
9✔
737

738
        Ok(row.get::<usize, i64>(0) as u64)
9✔
739
    }
18✔
740

741
    async fn quota_used_by_user(&self, user: &UserId) -> Result<u64> {
7✔
742
        ensure!(
7✔
743
            self.session.user.id == *user || self.session.is_admin(),
7✔
744
            error::PermissionDenied
×
745
        );
746

747
        let conn = self.conn_pool.get().await?;
7✔
748
        let stmt = conn
7✔
749
            .prepare("SELECT quota_used FROM users WHERE id = $1;")
7✔
750
            .await?;
7✔
751

752
        let row = conn
7✔
753
            .query_one(&stmt, &[&user])
7✔
754
            .await
7✔
755
            .map_err(|_error| error::Error::InvalidSession)?;
7✔
756

757
        Ok(row.get::<usize, i64>(0) as u64)
7✔
758
    }
14✔
759

760
    async fn quota_available(&self) -> Result<i64> {
65✔
761
        let conn = self.conn_pool.get().await?;
65✔
762
        let stmt = conn
65✔
763
            .prepare("SELECT quota_available FROM users WHERE id = $1;")
65✔
764
            .await?;
65✔
765

766
        let row = conn
65✔
767
            .query_one(&stmt, &[&self.session.user.id])
65✔
768
            .await
65✔
769
            .map_err(|_error| error::Error::InvalidSession)?;
65✔
770

771
        Ok(row.get::<usize, i64>(0))
65✔
772
    }
130✔
773

774
    async fn quota_available_by_user(&self, user: &UserId) -> Result<i64> {
6✔
775
        ensure!(
6✔
776
            self.session.user.id == *user || self.session.is_admin(),
6✔
777
            error::PermissionDenied
×
778
        );
779

780
        let conn = self.conn_pool.get().await?;
6✔
781
        let stmt = conn
6✔
782
            .prepare("SELECT quota_available FROM users WHERE id = $1;")
6✔
783
            .await?;
6✔
784

785
        let row = conn
6✔
786
            .query_one(&stmt, &[&user])
6✔
787
            .await
6✔
788
            .map_err(|_error| error::Error::InvalidSession)?;
6✔
789

790
        Ok(row.get::<usize, i64>(0))
6✔
791
    }
12✔
792

793
    async fn log_quota_used<I: IntoIterator<Item = ComputationUnit> + Send>(
794
        &self,
795
        log: I,
796
    ) -> Result<()> {
282✔
797
        ensure!(self.session.is_admin(), error::PermissionDenied);
282✔
798

799
        let conn = self.conn_pool.get().await?;
282✔
800

801
        // collect the log into separate vectors to pass them as parameters to the query
802
        let mut users = Vec::new();
282✔
803
        let mut workflows = Vec::new();
282✔
804
        let mut computations = Vec::new();
282✔
805
        let mut operators_names = Vec::new();
282✔
806
        let mut operator_paths = Vec::new();
282✔
807
        let mut datas = Vec::new();
282✔
808

809
        for unit in log {
569✔
810
            users.push(unit.user);
287✔
811
            workflows.push(unit.workflow);
287✔
812
            computations.push(unit.computation);
287✔
813
            operators_names.push(unit.operator_name);
287✔
814
            operator_paths.push(unit.operator_path.to_string());
287✔
815
            datas.push(unit.data);
287✔
816
        }
287✔
817

818
        let query = "
282✔
819
            INSERT INTO quota_log (user_id, workflow_id, computation_id, operator_name, operator_path, data)
282✔
820
                (SELECT * FROM UNNEST($1::uuid[], $2::uuid[], $3::uuid[], $4::text[], $5::text[], $6::text[]))
282✔
821
        ";
282✔
822

282✔
823
        conn.execute(
282✔
824
            query,
282✔
825
            &[
282✔
826
                &users,
282✔
827
                &workflows,
282✔
828
                &computations,
282✔
829
                &operators_names,
282✔
830
                &operator_paths,
282✔
831
                &datas,
282✔
832
            ],
282✔
833
        )
282✔
834
        .await?;
282✔
835

836
        Ok(())
282✔
837
    }
564✔
838

839
    #[allow(clippy::too_many_lines)]
840
    async fn quota_used_by_computations(
841
        &self,
842
        offset: usize,
843
        limit: usize,
844
    ) -> Result<Vec<ComputationQuota>> {
1✔
845
        let limit = limit.min(10); // TODO: use list limit from config
1✔
846

847
        let conn = self.conn_pool.get().await?;
1✔
848

849
        let rows = conn
1✔
850
            .query(
1✔
851
                "
1✔
852
            SELECT
1✔
853
                computation_id,
1✔
854
                workflow_id,
1✔
855
                MIN(timestamp) AS timestamp,
1✔
856
                COUNT(*) AS count
1✔
857
            FROM
1✔
858
                quota_log
1✔
859
            WHERE
1✔
860
                user_id = $1
1✔
861
            GROUP BY
1✔
862
                computation_id,
1✔
863
                workflow_id
1✔
864
            ORDER BY 
1✔
865
                MIN(TIMESTAMP) DESC
1✔
866
            OFFSET $2
1✔
867
            LIMIT $3;",
1✔
868
                &[&self.session.user.id, &(offset as i64), &(limit as i64)],
1✔
869
            )
1✔
870
            .await?;
1✔
871

872
        Ok(rows
1✔
873
            .iter()
1✔
874
            .map(|row| ComputationQuota {
1✔
875
                computation_id: row.get(0),
1✔
876
                workflow_id: row.get(1),
1✔
877
                timestamp: row.get(2),
1✔
878
                count: row.get::<_, i64>(3) as u64,
1✔
879
            })
1✔
880
            .collect())
1✔
881
    }
2✔
882

883
    async fn quota_used_by_computation(&self, computation_id: Uuid) -> Result<Vec<OperatorQuota>> {
1✔
884
        let conn = self.conn_pool.get().await?;
1✔
885

886
        let rows = conn
1✔
887
            .query(
1✔
888
                "
1✔
889
            SELECT
1✔
890
                operator_name,
1✔
891
                operator_path,
1✔
892
                COUNT(*) AS count
1✔
893
            FROM
1✔
894
                quota_log
1✔
895
            WHERE
1✔
896
                user_id = $1 AND
1✔
897
                computation_id = $2
1✔
898
            GROUP BY
1✔
899
                operator_name, operator_path
1✔
900
            ORDER BY 
1✔
901
            operator_name DESC;",
1✔
902
                &[&self.session.user.id, &computation_id],
1✔
903
            )
1✔
904
            .await?;
1✔
905

906
        Ok(rows
1✔
907
            .iter()
1✔
908
            .map(|row| OperatorQuota {
1✔
909
                operator_name: row.get(0),
1✔
910
                operator_path: row.get(1),
1✔
911
                count: row.get::<_, i64>(2) as u64,
1✔
912
            })
1✔
913
            .collect())
1✔
914
    }
2✔
915

916
    async fn quota_used_on_data(&self, offset: u64, limit: u64) -> Result<Vec<DataUsage>> {
1✔
917
        ensure!(self.session.is_admin(), error::PermissionDenied);
1✔
918

919
        let conn = self.conn_pool.get().await?;
1✔
920

921
        let rows = conn
1✔
922
            .query(
1✔
923
                "
1✔
924
            SELECT
1✔
925
                user_id,
1✔
926
                computation_id,
1✔
927
                data,
1✔
928
                MIN(timestamp) AS timestamp,
1✔
929
                COUNT(*) AS count
1✔
930
            FROM
1✔
931
                quota_log
1✔
932
            WHERE 
1✔
933
                data IS NOT NULL
1✔
934
            GROUP BY
1✔
935
               user_id,
1✔
936
               computation_id,
1✔
937
               workflow_id,
1✔
938
               data
1✔
939
            ORDER BY
1✔
940
                MIN(timestamp) DESC, user_id ASC, computation_id ASC, workflow_id ASC, data ASC
1✔
941
            OFFSET
1✔
942
                $1
1✔
943
            LIMIT
1✔
944
                $2;",
1✔
945
                &[&(offset as i64), &(limit as i64)],
1✔
946
            )
1✔
947
            .await?;
1✔
948

949
        Ok(rows
1✔
950
            .iter()
1✔
951
            .map(|row| DataUsage {
3✔
952
                user_id: row.get(0),
3✔
953
                computation_id: row.get(1),
3✔
954
                data: row.get(2),
3✔
955
                timestamp: row.get(3),
3✔
956
                count: row.get::<_, i64>(4) as u64,
3✔
957
            })
3✔
958
            .collect())
1✔
959
    }
2✔
960

961
    async fn quota_used_on_data_summary(
962
        &self,
963
        dataset: Option<String>,
964
        granularity: UsageSummaryGranularity,
965
        offset: u64,
966
        limit: u64,
967
    ) -> Result<Vec<DataUsageSummary>> {
1✔
968
        ensure!(self.session.is_admin(), error::PermissionDenied);
1✔
969

970
        // TODO: check if user is Owner of dataset
971

972
        let conn = self.conn_pool.get().await?;
1✔
973

974
        let trunc = match granularity {
1✔
975
            UsageSummaryGranularity::Minutes => "minute",
×
976
            UsageSummaryGranularity::Hours => "hour",
×
977
            UsageSummaryGranularity::Days => "day",
×
978
            UsageSummaryGranularity::Months => "month",
×
979
            UsageSummaryGranularity::Years => "year",
1✔
980
        };
981

982
        let rows = if let Some(dataset) = dataset {
1✔
983
            conn.query(
×
984
                &format!(
×
985
                    "
×
986
            SELECT
×
987
                date_trunc('{trunc}', timestamp) AS trunc,
×
988
                data
×
989
                COUNT(*) AS count
×
990
            FROM
×
991
                quota_log
×
992
            WHERE 
×
993
                data = $3
×
994
            GROUP BY
×
995
                trunc, data
×
996
            ORDER BY
×
997
                trunc DESC, data ASC
×
998
            OFFSET
×
999
                $1
×
1000
            LIMIT 
×
1001
                $2;",
×
1002
                ),
×
1003
                &[&(offset as i64), &(limit as i64), &dataset],
×
1004
            )
×
1005
            .await?
×
1006
        } else {
1007
            conn.query(
1✔
1008
                &format!(
1✔
1009
                    "
1✔
1010
            SELECT
1✔
1011
                date_trunc('{trunc}', timestamp) AS trunc,
1✔
1012
                data,
1✔
1013
                COUNT(*) AS count
1✔
1014
            FROM
1✔
1015
                quota_log
1✔
1016
            WHERE 
1✔
1017
                data IS NOT NULL
1✔
1018
            GROUP BY
1✔
1019
                trunc, data
1✔
1020
            ORDER BY
1✔
1021
                trunc DESC, data ASC
1✔
1022
            OFFSET
1✔
1023
                $1
1✔
1024
            LIMIT 
1✔
1025
                $2;",
1✔
1026
                ),
1✔
1027
                &[&(offset as i64), &(limit as i64)],
1✔
1028
            )
1✔
1029
            .await?
1✔
1030
        };
1031

1032
        Ok(rows
1✔
1033
            .iter()
1✔
1034
            .map(|row| DataUsageSummary {
2✔
1035
                timestamp: row.get(0),
2✔
1036
                data: row.get(1),
2✔
1037
                count: row.get::<_, i64>(2) as u64,
2✔
1038
            })
2✔
1039
            .collect())
1✔
1040
    }
2✔
1041

1042
    async fn update_quota_available_by_user(
1043
        &self,
1044
        user: &UserId,
1045
        new_available_quota: i64,
1046
    ) -> Result<()> {
5✔
1047
        ensure!(self.session.is_admin(), error::PermissionDenied);
5✔
1048

1049
        let conn = self.conn_pool.get().await?;
5✔
1050
        let stmt = conn
5✔
1051
            .prepare(
5✔
1052
                "
5✔
1053
            UPDATE users SET 
5✔
1054
                quota_available = $1
5✔
1055
            WHERE id = $2;",
5✔
1056
            )
5✔
1057
            .await?;
5✔
1058

1059
        conn.execute(&stmt, &[&(new_available_quota), &user])
5✔
1060
            .await?;
5✔
1061

1062
        Ok(())
5✔
1063
    }
10✔
1064
}
1065

1066
#[async_trait]
1067
impl<Tls> RoleDb for PostgresDb<Tls>
1068
where
1069
    Tls: MakeTlsConnect<Socket> + Clone + Send + Sync + 'static + std::fmt::Debug,
1070
    <Tls as MakeTlsConnect<Socket>>::Stream: Send + Sync,
1071
    <Tls as MakeTlsConnect<Socket>>::TlsConnect: Send,
1072
    <<Tls as MakeTlsConnect<Socket>>::TlsConnect as TlsConnect<Socket>>::Future: Send,
1073
{
1074
    async fn add_role(&self, role_name: &str) -> Result<RoleId, RoleDbError> {
4✔
1075
        let mut conn = self.conn_pool.get().await.context(Bb8RoleDbError)?;
4✔
1076

1077
        let tx = conn
4✔
1078
            .build_transaction()
4✔
1079
            .start()
4✔
1080
            .await
4✔
1081
            .context(PostgresRoleDbError)?;
4✔
1082

1083
        self.ensure_admin_in_tx(&tx)
4✔
1084
            .await
4✔
1085
            .context(PermissionDbRoleDbError)?;
4✔
1086

1087
        let id = RoleId::new();
4✔
1088

1089
        let res = tx
4✔
1090
            .execute(
4✔
1091
                "INSERT INTO roles (id, name) VALUES ($1, $2);",
4✔
1092
                &[&id, &role_name],
4✔
1093
            )
4✔
1094
            .await;
4✔
1095

1096
        if let Err(err) = res {
4✔
1097
            if err.code() == Some(&tokio_postgres::error::SqlState::UNIQUE_VIOLATION) {
×
1098
                return Err(RoleDbError::RoleAlreadyExists {
×
1099
                    role_name: role_name.to_string(),
×
1100
                });
×
1101
            }
×
1102
        }
4✔
1103

1104
        tx.commit().await.context(PostgresRoleDbError)?;
4✔
1105

1106
        Ok(id)
4✔
1107
    }
8✔
1108

1109
    async fn load_role_by_name(&self, role_name: &str) -> Result<RoleId, RoleDbError> {
×
1110
        let conn = self.conn_pool.get().await.context(Bb8RoleDbError)?;
×
1111

1112
        let row = conn
×
1113
            .query_opt("SELECT id FROM roles WHERE name = $1;", &[&role_name])
×
1114
            .await
×
1115
            .context(PostgresRoleDbError)?
×
1116
            .ok_or(RoleDbError::RoleNameDoesNotExist {
×
1117
                role_name: role_name.to_string(),
×
1118
            })?;
×
1119

1120
        Ok(RoleId(row.get(0)))
×
1121
    }
×
1122

1123
    async fn remove_role(&self, role_id: &RoleId) -> Result<(), RoleDbError> {
2✔
1124
        let mut conn = self.conn_pool.get().await.context(Bb8RoleDbError)?;
2✔
1125

1126
        let tx = conn
2✔
1127
            .build_transaction()
2✔
1128
            .start()
2✔
1129
            .await
2✔
1130
            .context(PostgresRoleDbError)?;
2✔
1131

1132
        self.ensure_admin_in_tx(&tx)
2✔
1133
            .await
2✔
1134
            .context(PermissionDbRoleDbError)?;
2✔
1135

1136
        let deleted = tx
2✔
1137
            .execute("DELETE FROM roles WHERE id = $1;", &[&role_id])
2✔
1138
            .await
2✔
1139
            .context(PostgresRoleDbError)?;
2✔
1140

1141
        tx.commit().await.context(PostgresRoleDbError)?;
2✔
1142

1143
        ensure!(
2✔
1144
            deleted > 0,
2✔
1145
            RoleIdDoesNotExistRoleDbError { role_id: *role_id }
×
1146
        );
1147

1148
        Ok(())
2✔
1149
    }
4✔
1150

1151
    async fn assign_role(&self, role_id: &RoleId, user_id: &UserId) -> Result<(), RoleDbError> {
4✔
1152
        let mut conn = self.conn_pool.get().await.context(Bb8RoleDbError)?;
4✔
1153

1154
        let tx = conn
4✔
1155
            .build_transaction()
4✔
1156
            .start()
4✔
1157
            .await
4✔
1158
            .context(PostgresRoleDbError)?;
4✔
1159

1160
        self.ensure_admin_in_tx(&tx)
4✔
1161
            .await
4✔
1162
            .context(PermissionDbRoleDbError)?;
4✔
1163

1164
        tx.execute(
4✔
1165
            "INSERT INTO user_roles (user_id, role_id) VALUES ($1, $2) ON CONFLICT DO NOTHING;",
4✔
1166
            &[&user_id, &role_id],
4✔
1167
        )
4✔
1168
        .await
4✔
1169
        .context(PostgresRoleDbError)?;
4✔
1170

1171
        tx.commit().await.context(PostgresRoleDbError)?;
4✔
1172

1173
        Ok(())
4✔
1174
    }
8✔
1175

1176
    async fn revoke_role(&self, role_id: &RoleId, user_id: &UserId) -> Result<(), RoleDbError> {
2✔
1177
        let mut conn = self.conn_pool.get().await.context(Bb8RoleDbError)?;
2✔
1178

1179
        let tx = conn
2✔
1180
            .build_transaction()
2✔
1181
            .start()
2✔
1182
            .await
2✔
1183
            .context(PostgresRoleDbError)?;
2✔
1184

1185
        self.ensure_admin_in_tx(&tx)
2✔
1186
            .await
2✔
1187
            .context(PermissionDbRoleDbError)?;
2✔
1188

1189
        let deleted = tx
2✔
1190
            .execute(
2✔
1191
                "DELETE FROM user_roles WHERE user_id= $1 AND role_id = $2;",
2✔
1192
                &[&user_id, &role_id],
2✔
1193
            )
2✔
1194
            .await
2✔
1195
            .context(PostgresRoleDbError)?;
2✔
1196

1197
        tx.commit().await.context(PostgresRoleDbError)?;
2✔
1198

1199
        ensure!(
2✔
1200
            deleted > 0,
2✔
1201
            CannotRevokeRoleThatIsNotAssignedRoleDbError { role_id: *role_id }
×
1202
        );
1203

1204
        Ok(())
2✔
1205
    }
4✔
1206

1207
    async fn get_role_descriptions(
1208
        &self,
1209
        user_id: &UserId,
1210
    ) -> Result<Vec<RoleDescription>, RoleDbError> {
5✔
1211
        let conn = self.conn_pool.get().await.context(Bb8RoleDbError)?;
5✔
1212

1213
        let stmt = conn
5✔
1214
            .prepare(
5✔
1215
                "SELECT roles.id, roles.name \
5✔
1216
                FROM roles JOIN user_roles ON (roles.id=user_roles.role_id) \
5✔
1217
                WHERE user_roles.user_id=$1 \
5✔
1218
                ORDER BY roles.name;",
5✔
1219
            )
5✔
1220
            .await
5✔
1221
            .context(PostgresRoleDbError)?;
5✔
1222

1223
        let results = conn
5✔
1224
            .query(&stmt, &[&user_id])
5✔
1225
            .await
5✔
1226
            .context(PostgresRoleDbError)?;
5✔
1227

1228
        let mut result_vec = Vec::new();
5✔
1229

1230
        for result in results {
17✔
1231
            let id = result.get(0);
12✔
1232
            let name = result.get(1);
12✔
1233
            let individual = UserId(id) == *user_id;
12✔
1234
            result_vec.push(RoleDescription {
12✔
1235
                role: Role {
12✔
1236
                    id: RoleId(id),
12✔
1237
                    name,
12✔
1238
                },
12✔
1239
                individual,
12✔
1240
            });
12✔
1241
        }
12✔
1242

1243
        Ok(result_vec)
5✔
1244
    }
10✔
1245
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc