• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

geo-engine / geoengine / 8167499361

06 Mar 2024 05:40AM UTC coverage: 90.514% (-0.07%) from 90.58%
8167499361

push

github

web-flow
Merge pull request #936 from geo-engine/manage_permissions

Manage permissions

261 of 404 new or added lines in 16 files covered. (64.6%)

25 existing lines in 8 files now uncovered.

128619 of 142098 relevant lines covered (90.51%)

54399.54 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

95.57
/services/src/pro/users/postgres_userdb.rs
1
use crate::contexts::SessionId;
2
use crate::error::Result;
3
use crate::pro::contexts::ProPostgresDb;
4
use crate::pro::permissions::postgres_permissiondb::TxPermissionDb;
5
use crate::pro::permissions::{Role, RoleDescription, RoleId};
6
use crate::pro::users::oidc::ExternalUserClaims;
7
use crate::pro::users::userdb::{
8
    CannotRevokeRoleThatIsNotAssignedRoleDbError, RoleIdDoesNotExistRoleDbError,
9
};
10
use crate::pro::users::{
11
    User, UserCredentials, UserDb, UserId, UserInfo, UserRegistration, UserSession,
12
};
13
use crate::projects::{ProjectId, STRectangle};
14
use crate::util::Identifier;
15
use crate::{error, pro::contexts::ProPostgresContext};
16
use async_trait::async_trait;
17

18
use bb8_postgres::{
19
    tokio_postgres::tls::MakeTlsConnect, tokio_postgres::tls::TlsConnect, tokio_postgres::Socket,
20
};
21
use geoengine_datatypes::primitives::Duration;
22
use pwhash::bcrypt;
23
use snafu::{ensure, ResultExt};
24
use uuid::Uuid;
25

26
use super::userdb::{
27
    Bb8RoleDbError, PermissionDbRoleDbError, PostgresRoleDbError, RoleDb, RoleDbError, UserAuth,
28
};
29

30
#[async_trait]
31
impl<Tls> UserAuth for ProPostgresContext<Tls>
32
where
33
    Tls: MakeTlsConnect<Socket> + Clone + Send + Sync + 'static,
34
    <Tls as MakeTlsConnect<Socket>>::Stream: Send + Sync,
35
    <Tls as MakeTlsConnect<Socket>>::TlsConnect: Send,
36
    <<Tls as MakeTlsConnect<Socket>>::TlsConnect as TlsConnect<Socket>>::Future: Send,
37
{
38
    // TODO: clean up expired sessions?
39

40
    async fn register_user(&self, user: UserRegistration) -> Result<UserId> {
43✔
41
        let mut conn = self.pool.get().await?;
43✔
42

43
        let tx = conn.build_transaction().start().await?;
43✔
44

45
        let user = User::from(user);
43✔
46

47
        let stmt = tx
43✔
48
            .prepare("INSERT INTO roles (id, name) VALUES ($1, $2);")
43✔
49
            .await?;
40✔
50
        let res = tx.execute(&stmt, &[&user.id, &user.email]).await;
43✔
51

52
        if let Err(e) = res {
43✔
53
            if e.code() == Some(&tokio_postgres::error::SqlState::UNIQUE_VIOLATION) {
15✔
54
                return Err(error::Error::Duplicate {
15✔
55
                    reason: "E-mail already exists".to_string(),
15✔
56
                });
15✔
57
            }
×
58
            return Err(e.into());
×
59
        }
28✔
60

61
        let stmt = tx
28✔
62
            .prepare(
28✔
63
                "INSERT INTO users (id, email, password_hash, real_name, quota_available, active) VALUES ($1, $2, $3, $4, $5, $6);",
28✔
64
            )
28✔
65
            .await?;
25✔
66

67
        let quota_available =
28✔
68
            crate::util::config::get_config_element::<crate::pro::util::config::Quota>()?
28✔
69
                .initial_credits;
70

71
        tx.execute(
28✔
72
            &stmt,
28✔
73
            &[
28✔
74
                &user.id,
28✔
75
                &user.email,
28✔
76
                &user.password_hash,
28✔
77
                &user.real_name,
28✔
78
                &quota_available,
28✔
79
                &user.active,
28✔
80
            ],
28✔
81
        )
28✔
82
        .await?;
24✔
83

84
        let stmt = tx
28✔
85
            .prepare("INSERT INTO user_roles (user_id, role_id) VALUES ($1, $2);")
28✔
86
            .await?;
24✔
87
        tx.execute(&stmt, &[&user.id, &user.id]).await?;
28✔
88

89
        let stmt = tx
28✔
90
            .prepare("INSERT INTO user_roles (user_id, role_id) VALUES ($1, $2);")
28✔
91
            .await?;
24✔
92
        tx.execute(&stmt, &[&user.id, &Role::registered_user_role_id()])
28✔
93
            .await?;
24✔
94

95
        tx.commit().await?;
28✔
96

97
        Ok(user.id)
28✔
98
    }
129✔
99

100
    async fn create_anonymous_session(&self) -> Result<UserSession> {
42✔
101
        let mut conn = self.pool.get().await?;
42✔
102

103
        let tx = conn.build_transaction().start().await?;
42✔
104

105
        let user_id = UserId::new();
42✔
106

107
        let stmt = tx
42✔
108
            .prepare("INSERT INTO roles (id, name) VALUES ($1, $2);")
42✔
109
            .await?;
29✔
110
        tx.execute(&stmt, &[&user_id, &format!("anonymous_user_{user_id}")])
42✔
111
            .await?;
29✔
112

113
        let quota_available =
42✔
114
            crate::util::config::get_config_element::<crate::pro::util::config::Quota>()?
42✔
115
                .initial_credits;
116

117
        let stmt = tx
42✔
118
            .prepare("INSERT INTO users (id, quota_available, active) VALUES ($1, $2, TRUE);")
42✔
119
            .await?;
29✔
120

121
        tx.execute(&stmt, &[&user_id, &quota_available]).await?;
42✔
122

123
        let stmt = tx
42✔
124
            .prepare("INSERT INTO user_roles (user_id, role_id) VALUES ($1, $2);")
42✔
125
            .await?;
28✔
126
        tx.execute(&stmt, &[&user_id, &user_id]).await?;
42✔
127

128
        let stmt = tx
42✔
129
            .prepare("INSERT INTO user_roles (user_id, role_id) VALUES ($1, $2);")
42✔
130
            .await?;
28✔
131
        tx.execute(&stmt, &[&user_id, &Role::anonymous_role_id()])
42✔
132
            .await?;
28✔
133

134
        let session_id = SessionId::new();
42✔
135
        let stmt = tx
42✔
136
            .prepare(
42✔
137
                "
42✔
138
                INSERT INTO sessions (id)
42✔
139
                VALUES ($1);",
42✔
140
            )
42✔
141
            .await?;
28✔
142

143
        tx.execute(&stmt, &[&session_id]).await?;
42✔
144

145
        let stmt = tx
42✔
146
            .prepare(
42✔
147
                "
42✔
148
            INSERT INTO 
42✔
149
                user_sessions (user_id, session_id, created, valid_until) 
42✔
150
            VALUES 
42✔
151
                ($1, $2, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP + make_interval(secs:=$3))
42✔
152
            RETURNING 
42✔
153
                created, valid_until;",
42✔
154
            )
42✔
155
            .await?;
28✔
156

157
        // TODO: load from config
158
        let session_duration = chrono::Duration::days(30);
42✔
159
        let row = tx
42✔
160
            .query_one(
42✔
161
                &stmt,
42✔
162
                &[
42✔
163
                    &user_id,
42✔
164
                    &session_id,
42✔
165
                    &(session_duration.num_seconds() as f64),
42✔
166
                ],
42✔
167
            )
42✔
168
            .await?;
29✔
169

170
        tx.commit().await?;
42✔
171

172
        Ok(UserSession {
42✔
173
            id: session_id,
42✔
174
            user: UserInfo {
42✔
175
                id: user_id,
42✔
176
                email: None,
42✔
177
                real_name: None,
42✔
178
            },
42✔
179
            created: row.get(0),
42✔
180
            valid_until: row.get(1),
42✔
181
            project: None,
42✔
182
            view: None,
42✔
183
            roles: vec![user_id.into(), Role::anonymous_role_id()],
42✔
184
        })
42✔
185
    }
126✔
186

187
    async fn login(&self, user_credentials: UserCredentials) -> Result<UserSession> {
61✔
188
        let mut conn = self.pool.get().await?;
61✔
189

190
        let tx = conn.build_transaction().start().await?;
61✔
191

192
        let stmt = tx
61✔
193
            .prepare("SELECT id, password_hash, email, real_name FROM users WHERE email = $1;")
61✔
194
            .await?;
48✔
195

196
        let row = tx
61✔
197
            .query_one(&stmt, &[&user_credentials.email])
61✔
198
            .await
47✔
199
            .map_err(|_error| error::Error::LoginFailed)?;
61✔
200

201
        let user_id = UserId(row.get(0));
61✔
202
        let password_hash = row.get(1);
61✔
203
        let email = row.get(2);
61✔
204
        let real_name = row.get(3);
61✔
205

61✔
206
        if bcrypt::verify(user_credentials.password, password_hash) {
61✔
207
            let session_id = SessionId::new();
60✔
208
            let stmt = tx
60✔
209
                .prepare(
60✔
210
                    "
60✔
211
                INSERT INTO sessions (id)
60✔
212
                VALUES ($1);",
60✔
213
                )
60✔
214
                .await?;
58✔
215

216
            tx.execute(&stmt, &[&session_id]).await?;
60✔
217

218
            // TODO: load from config
219
            let session_duration = chrono::Duration::days(30);
60✔
220
            let stmt = tx
60✔
221
                .prepare(
60✔
222
                    "
60✔
223
                INSERT INTO 
60✔
224
                    user_sessions (user_id, session_id, created, valid_until) 
60✔
225
                VALUES 
60✔
226
                    ($1, $2, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP + make_interval(secs:=$3))
60✔
227
                RETURNING 
60✔
228
                    created, valid_until;",
60✔
229
                )
60✔
230
                .await?;
53✔
231
            let row = tx
60✔
232
                .query_one(
60✔
233
                    &stmt,
60✔
234
                    &[
60✔
235
                        &user_id,
60✔
236
                        &session_id,
60✔
237
                        &(session_duration.num_seconds() as f64),
60✔
238
                    ],
60✔
239
                )
60✔
240
                .await?;
52✔
241

242
            let stmt = tx
60✔
243
                .prepare("SELECT role_id FROM user_roles WHERE user_id = $1;")
60✔
244
                .await?;
53✔
245

246
            let rows = tx
60✔
247
                .query(&stmt, &[&user_id])
60✔
248
                .await
51✔
249
                .map_err(|_error| error::Error::LoginFailed)?;
60✔
250

251
            tx.commit().await?;
60✔
252

253
            let roles = rows.into_iter().map(|row| row.get(0)).collect();
92✔
254

60✔
255
            Ok(UserSession {
60✔
256
                id: session_id,
60✔
257
                user: UserInfo {
60✔
258
                    id: user_id,
60✔
259
                    email,
60✔
260
                    real_name,
60✔
261
                },
60✔
262
                created: row.get(0),
60✔
263
                valid_until: row.get(1),
60✔
264
                project: None,
60✔
265
                view: None,
60✔
266
                roles,
60✔
267
            })
60✔
268
        } else {
269
            Err(error::Error::LoginFailed)
1✔
270
        }
271
    }
183✔
272

273
    #[allow(clippy::too_many_lines)]
274
    async fn login_external(
4✔
275
        &self,
4✔
276
        user: ExternalUserClaims,
4✔
277
        duration: Duration,
4✔
278
    ) -> Result<UserSession> {
4✔
279
        let mut conn = self.pool.get().await?;
4✔
280
        let tx = conn.build_transaction().start().await?;
4✔
281

282
        let stmt = tx
4✔
283
            .prepare("SELECT id, external_id, email, real_name FROM external_users WHERE external_id = $1;")
4✔
284
            .await?;
3✔
285

286
        let row = tx
4✔
287
            .query_opt(&stmt, &[&user.external_id.to_string()])
4✔
288
            .await
4✔
289
            .map_err(|_error| error::Error::LoginFailed)?;
4✔
290

291
        let user_id = match row {
4✔
292
            Some(row) => UserId(row.get(0)),
2✔
293
            None => {
294
                let user_id = UserId::new();
2✔
295

296
                let stmt = tx
2✔
297
                    .prepare("INSERT INTO roles (id, name) VALUES ($1, $2);")
2✔
298
                    .await?;
2✔
299
                tx.execute(&stmt, &[&user_id, &user.email]).await?;
2✔
300

301
                let quota_available =
2✔
302
                    crate::util::config::get_config_element::<crate::pro::util::config::Quota>()?
2✔
303
                        .initial_credits;
304

305
                //TODO: Inconsistent to hashmap implementation, where an external user is not part of the user database.
306
                //TODO: A user might be able to login without external login using this (internal) id. Would be a problem with anonymous users as well.
307
                let stmt = tx
2✔
308
                    .prepare(
2✔
309
                        "INSERT INTO users (id, quota_available, active) VALUES ($1, $2, TRUE);",
2✔
310
                    )
2✔
311
                    .await?;
2✔
312
                tx.execute(&stmt, &[&user_id, &quota_available]).await?;
2✔
313

314
                let stmt = tx
2✔
315
                    .prepare(
2✔
316
                        "INSERT INTO external_users (id, external_id, email, real_name, active) VALUES ($1, $2, $3, $4, $5);",
2✔
317
                    )
2✔
318
                    .await?;
2✔
319

320
                tx.execute(
2✔
321
                    &stmt,
2✔
322
                    &[
2✔
323
                        &user_id,
2✔
324
                        &user.external_id.to_string(),
2✔
325
                        &user.email,
2✔
326
                        &user.real_name,
2✔
327
                        &true,
2✔
328
                    ],
2✔
329
                )
2✔
330
                .await?;
2✔
331

332
                let stmt = tx
2✔
333
                    .prepare("INSERT INTO user_roles (user_id, role_id) VALUES ($1, $2);")
2✔
334
                    .await?;
2✔
335
                tx.execute(&stmt, &[&user_id, &user_id]).await?;
2✔
336

337
                let stmt = tx
2✔
338
                    .prepare("INSERT INTO user_roles (user_id, role_id) VALUES ($1, $2);")
2✔
339
                    .await?;
2✔
340
                tx.execute(&stmt, &[&user_id, &Role::registered_user_role_id()])
2✔
341
                    .await?;
2✔
342

343
                user_id
2✔
344
            }
345
        };
346

347
        let session_id = SessionId::new();
4✔
348
        let stmt = tx
4✔
349
            .prepare(
4✔
350
                "
4✔
351
            INSERT INTO sessions (id)
4✔
352
            VALUES ($1);",
4✔
353
            )
4✔
354
            .await?; //TODO: Check documentation if inconsistent to hashmap implementation - would happen if CURRENT_TIMESTAMP is called twice in postgres for a single query. Worked in tests.
4✔
355

356
        tx.execute(&stmt, &[&session_id]).await?;
4✔
357

358
        let stmt = tx
4✔
359
            .prepare(
4✔
360
                "
4✔
361
            INSERT INTO 
4✔
362
                user_sessions (user_id, session_id, created, valid_until) 
4✔
363
            VALUES 
4✔
364
                ($1, $2, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP + make_interval(secs:=$3))
4✔
365
            RETURNING 
4✔
366
                created, valid_until;",
4✔
367
            )
4✔
368
            .await?;
4✔
369
        let row = tx
4✔
370
            .query_one(
4✔
371
                &stmt,
4✔
372
                &[&user_id, &session_id, &(duration.num_seconds() as f64)],
4✔
373
            )
4✔
374
            .await?;
4✔
375

376
        let stmt = tx
4✔
377
            .prepare("SELECT role_id FROM user_roles WHERE user_id = $1;")
4✔
378
            .await?;
4✔
379

380
        let rows = tx
4✔
381
            .query(&stmt, &[&user_id])
4✔
382
            .await
4✔
383
            .map_err(|_error| error::Error::LoginFailed)?;
4✔
384

385
        let roles = rows.into_iter().map(|row| row.get(0)).collect();
8✔
386

4✔
387
        tx.commit().await?;
4✔
388

389
        Ok(UserSession {
4✔
390
            id: session_id,
4✔
391
            user: UserInfo {
4✔
392
                id: user_id,
4✔
393
                email: Some(user.email.clone()),
4✔
394
                real_name: Some(user.real_name.clone()),
4✔
395
            },
4✔
396
            created: row.get(0),
4✔
397
            valid_until: row.get(1),
4✔
398
            project: None,
4✔
399
            view: None,
4✔
400
            roles,
4✔
401
        })
4✔
402
    }
12✔
403

404
    async fn user_session_by_id(&self, session: SessionId) -> Result<UserSession> {
54✔
405
        let mut conn = self.pool.get().await?;
54✔
406

407
        let tx = conn.build_transaction().start().await?;
54✔
408

409
        let stmt = tx
54✔
410
            .prepare(
54✔
411
                "
54✔
412
            SELECT 
54✔
413
                u.id,   
54✔
414
                u.email,
54✔
415
                u.real_name,             
54✔
416
                us.created, 
54✔
417
                us.valid_until, 
54✔
418
                s.project_id,
54✔
419
                s.view
54✔
420
            FROM 
54✔
421
                sessions s JOIN user_sessions us ON (s.id = us.session_id) 
54✔
422
                    JOIN users u ON (us.user_id = u.id)
54✔
423
            WHERE s.id = $1 AND CURRENT_TIMESTAMP < us.valid_until;",
54✔
424
            )
54✔
425
            .await?;
273✔
426

427
        let row = tx
54✔
428
            .query_one(&stmt, &[&session])
54✔
429
            .await
47✔
430
            .map_err(|_error| error::Error::InvalidSession)?;
54✔
431

432
        let mut session = UserSession {
48✔
433
            id: session,
48✔
434
            user: UserInfo {
48✔
435
                id: row.get(0),
48✔
436
                email: row.get(1),
48✔
437
                real_name: row.get(2),
48✔
438
            },
48✔
439
            created: row.get(3),
48✔
440
            valid_until: row.get(4),
48✔
441
            project: row.get::<usize, Option<Uuid>>(5).map(ProjectId),
48✔
442
            view: row.get(6),
48✔
443
            roles: vec![],
48✔
444
        };
48✔
445

446
        let stmt = tx
48✔
447
            .prepare(
48✔
448
                "
48✔
449
            SELECT role_id FROM user_roles WHERE user_id = $1;
48✔
450
            ",
48✔
451
            )
48✔
452
            .await?;
40✔
453

454
        let rows = tx.query(&stmt, &[&session.user.id]).await?;
48✔
455

456
        session.roles = rows.into_iter().map(|row| row.get(0)).collect();
82✔
457

48✔
458
        Ok(session)
48✔
459
    }
162✔
460
}
461

462
#[async_trait]
463
impl<Tls> UserDb for ProPostgresDb<Tls>
464
where
465
    Tls: MakeTlsConnect<Socket> + Clone + Send + Sync + 'static,
466
    <Tls as MakeTlsConnect<Socket>>::Stream: Send + Sync,
467
    <Tls as MakeTlsConnect<Socket>>::TlsConnect: Send,
468
    <<Tls as MakeTlsConnect<Socket>>::TlsConnect as TlsConnect<Socket>>::Future: Send,
469
{
470
    // TODO: clean up expired sessions?
471

472
    async fn logout(&self) -> Result<()> {
6✔
473
        let conn = self.conn_pool.get().await?;
6✔
474
        let stmt = conn
6✔
475
            .prepare("DELETE FROM sessions WHERE id = $1;") // TODO: only invalidate session?
6✔
476
            .await?;
6✔
477

478
        conn.execute(&stmt, &[&self.session.id])
6✔
479
            .await
6✔
480
            .map_err(|_error| error::Error::LogoutFailed)?;
6✔
481
        Ok(())
6✔
482
    }
18✔
483

484
    /// Stores `project` as the project of the current session.
    ///
    /// # Errors
    ///
    /// Pool and database errors are propagated.
    async fn set_session_project(&self, project: ProjectId) -> Result<()> {
        // TODO: check permission

        let connection = self.conn_pool.get().await?;
        let update_project = connection
            .prepare("UPDATE sessions SET project_id = $1 WHERE id = $2;")
            .await?;

        let session_id = &self.session.id;
        connection
            .execute(&update_project, &[&project, session_id])
            .await?;

        Ok(())
    }
9✔
496

497
    /// Stores `view` as the view of the current session.
    ///
    /// # Errors
    ///
    /// Pool and database errors are propagated.
    async fn set_session_view(&self, view: STRectangle) -> Result<()> {
        let connection = self.conn_pool.get().await?;
        let update_view = connection
            .prepare("UPDATE sessions SET view = $1 WHERE id = $2;")
            .await?;

        let session_id = &self.session.id;
        connection
            .execute(&update_view, &[&view, session_id])
            .await?;

        Ok(())
    }
9✔
507

508
    async fn increment_quota_used(&self, user: &UserId, quota_used: u64) -> Result<()> {
1✔
509
        ensure!(self.session.is_admin(), error::PermissionDenied);
1✔
510

511
        let conn = self.conn_pool.get().await?;
1✔
512
        let stmt = conn
1✔
513
            .prepare(
1✔
514
                "
1✔
515
            UPDATE users SET 
1✔
516
                quota_available = quota_available - $1, 
1✔
517
                quota_used = quota_used + $1
1✔
518
            WHERE id = $2;",
1✔
519
            )
1✔
520
            .await?;
1✔
521

522
        conn.execute(&stmt, &[&(quota_used as i64), &user]).await?;
1✔
523

524
        Ok(())
1✔
525
    }
3✔
526

527
    async fn bulk_increment_quota_used<I: IntoIterator<Item = (UserId, u64)> + Send>(
12,915✔
528
        &self,
12,915✔
529
        quota_used_updates: I,
12,915✔
530
    ) -> Result<()> {
12,915✔
531
        ensure!(self.session.is_admin(), error::PermissionDenied);
12,915✔
532

533
        let conn = self.conn_pool.get().await?;
12,917✔
534

535
        // collect the user ids and quotas into separate vectors to pass them as parameters to the query
536
        let (users, quotas): (Vec<UserId>, Vec<i64>) = quota_used_updates
12,914✔
537
            .into_iter()
12,914✔
538
            .map(|(user, quota)| (user, quota as i64))
12,914✔
539
            .unzip();
12,914✔
540

12,914✔
541
        let query = "
12,914✔
542
            UPDATE users
12,914✔
543
            SET quota_available = quota_available - quota_changes.quota, 
12,914✔
544
                quota_used = quota_used + quota_changes.quota
12,914✔
545
            FROM 
12,914✔
546
                (SELECT * FROM UNNEST($1::uuid[], $2::bigint[]) AS t(id, quota)) AS quota_changes
12,914✔
547
            WHERE users.id = quota_changes.id;
12,914✔
548
        ";
12,914✔
549

12,914✔
550
        conn.execute(query, &[&users, &quotas]).await?;
25,812✔
551

552
        Ok(())
12,900✔
553
    }
38,743✔
554

555
    async fn quota_used(&self) -> Result<u64> {
8✔
556
        let conn = self.conn_pool.get().await?;
10✔
557
        let stmt = conn
8✔
558
            .prepare("SELECT quota_used FROM users WHERE id = $1;")
8✔
559
            .await?;
8✔
560

561
        let row = conn
8✔
562
            .query_one(&stmt, &[&self.session.user.id])
8✔
563
            .await
7✔
564
            .map_err(|_error| error::Error::InvalidSession)?;
8✔
565

566
        Ok(row.get::<usize, i64>(0) as u64)
8✔
567
    }
24✔
568

569
    async fn quota_used_by_user(&self, user: &UserId) -> Result<u64> {
7✔
570
        ensure!(
7✔
571
            self.session.user.id == *user || self.session.is_admin(),
7✔
572
            error::PermissionDenied
×
573
        );
574

575
        let conn = self.conn_pool.get().await?;
7✔
576
        let stmt = conn
7✔
577
            .prepare("SELECT quota_used FROM users WHERE id = $1;")
7✔
578
            .await?;
4✔
579

580
        let row = conn
7✔
581
            .query_one(&stmt, &[&user])
7✔
582
            .await
4✔
583
            .map_err(|_error| error::Error::InvalidSession)?;
7✔
584

585
        Ok(row.get::<usize, i64>(0) as u64)
7✔
586
    }
21✔
587

588
    async fn quota_available(&self) -> Result<i64> {
27✔
589
        let conn = self.conn_pool.get().await?;
33✔
590
        let stmt = conn
27✔
591
            .prepare("SELECT quota_available FROM users WHERE id = $1;")
27✔
592
            .await?;
27✔
593

594
        let row = conn
27✔
595
            .query_one(&stmt, &[&self.session.user.id])
27✔
596
            .await
26✔
597
            .map_err(|_error| error::Error::InvalidSession)?;
27✔
598

599
        Ok(row.get::<usize, i64>(0))
27✔
600
    }
81✔
601

602
    async fn quota_available_by_user(&self, user: &UserId) -> Result<i64> {
6✔
603
        ensure!(
6✔
604
            self.session.user.id == *user || self.session.is_admin(),
6✔
605
            error::PermissionDenied
×
606
        );
607

608
        let conn = self.conn_pool.get().await?;
6✔
609
        let stmt = conn
6✔
610
            .prepare("SELECT quota_available FROM users WHERE id = $1;")
6✔
611
            .await?;
6✔
612

613
        let row = conn
6✔
614
            .query_one(&stmt, &[&user])
6✔
615
            .await
6✔
616
            .map_err(|_error| error::Error::InvalidSession)?;
6✔
617

618
        Ok(row.get::<usize, i64>(0))
6✔
619
    }
18✔
620

621
    async fn update_quota_available_by_user(
5✔
622
        &self,
5✔
623
        user: &UserId,
5✔
624
        new_available_quota: i64,
5✔
625
    ) -> Result<()> {
5✔
626
        ensure!(self.session.is_admin(), error::PermissionDenied);
5✔
627

628
        let conn = self.conn_pool.get().await?;
5✔
629
        let stmt = conn
5✔
630
            .prepare(
5✔
631
                "
5✔
632
            UPDATE users SET 
5✔
633
                quota_available = $1
5✔
634
            WHERE id = $2;",
5✔
635
            )
5✔
636
            .await?;
5✔
637

638
        conn.execute(&stmt, &[&(new_available_quota), &user])
5✔
639
            .await?;
5✔
640

641
        Ok(())
5✔
642
    }
15✔
643
}
644

645
#[async_trait]
646
impl<Tls> RoleDb for ProPostgresDb<Tls>
647
where
648
    Tls: MakeTlsConnect<Socket> + Clone + Send + Sync + 'static,
649
    <Tls as MakeTlsConnect<Socket>>::Stream: Send + Sync,
650
    <Tls as MakeTlsConnect<Socket>>::TlsConnect: Send,
651
    <<Tls as MakeTlsConnect<Socket>>::TlsConnect as TlsConnect<Socket>>::Future: Send,
652
{
653
    async fn add_role(&self, role_name: &str) -> Result<RoleId, RoleDbError> {
4✔
654
        let mut conn = self.conn_pool.get().await.context(Bb8RoleDbError)?;
4✔
655

656
        let tx = conn
4✔
657
            .build_transaction()
4✔
658
            .start()
4✔
659
            .await
3✔
660
            .context(PostgresRoleDbError)?;
4✔
661

662
        self.ensure_admin_in_tx(&tx)
4✔
NEW
663
            .await
×
664
            .context(PermissionDbRoleDbError)?;
4✔
665

666
        let id = RoleId::new();
4✔
667

668
        let res = tx
4✔
669
            .execute(
4✔
670
                "INSERT INTO roles (id, name) VALUES ($1, $2);",
4✔
671
                &[&id, &role_name],
4✔
672
            )
4✔
673
            .await;
6✔
674

675
        if let Err(err) = res {
4✔
NEW
676
            if err.code() == Some(&tokio_postgres::error::SqlState::UNIQUE_VIOLATION) {
×
NEW
677
                return Err(RoleDbError::RoleAlreadyExists {
×
NEW
678
                    role_name: role_name.to_string(),
×
NEW
679
                });
×
NEW
680
            }
×
681
        }
4✔
682

683
        tx.commit().await.context(PostgresRoleDbError)?;
4✔
684

685
        Ok(id)
4✔
686
    }
12✔
687

NEW
688
    async fn load_role_by_name(&self, role_name: &str) -> Result<RoleId, RoleDbError> {
×
NEW
689
        let conn = self.conn_pool.get().await.context(Bb8RoleDbError)?;
×
690

NEW
691
        let row = conn
×
NEW
692
            .query_opt("SELECT id FROM roles WHERE name = $1;", &[&role_name])
×
NEW
693
            .await
×
NEW
694
            .context(PostgresRoleDbError)?
×
NEW
695
            .ok_or(RoleDbError::RoleNameDoesNotExist {
×
NEW
696
                role_name: role_name.to_string(),
×
NEW
697
            })?;
×
698

NEW
699
        Ok(RoleId(row.get(0)))
×
NEW
700
    }
×
701

702
    async fn remove_role(&self, role_id: &RoleId) -> Result<(), RoleDbError> {
2✔
703
        let mut conn = self.conn_pool.get().await.context(Bb8RoleDbError)?;
2✔
704

705
        let tx = conn
2✔
706
            .build_transaction()
2✔
707
            .start()
2✔
708
            .await
2✔
709
            .context(PostgresRoleDbError)?;
2✔
710

711
        self.ensure_admin_in_tx(&tx)
2✔
NEW
712
            .await
×
713
            .context(PermissionDbRoleDbError)?;
2✔
714

715
        let deleted = tx
2✔
716
            .execute("DELETE FROM roles WHERE id = $1;", &[&role_id])
2✔
717
            .await
4✔
718
            .context(PostgresRoleDbError)?;
2✔
719

720
        tx.commit().await.context(PostgresRoleDbError)?;
2✔
721

722
        ensure!(
2✔
723
            deleted > 0,
2✔
NEW
724
            RoleIdDoesNotExistRoleDbError { role_id: *role_id }
×
725
        );
726

727
        Ok(())
2✔
728
    }
6✔
729

730
    async fn assign_role(&self, role_id: &RoleId, user_id: &UserId) -> Result<(), RoleDbError> {
4✔
731
        let mut conn = self.conn_pool.get().await.context(Bb8RoleDbError)?;
4✔
732

733
        let tx = conn
4✔
734
            .build_transaction()
4✔
735
            .start()
4✔
736
            .await
3✔
737
            .context(PostgresRoleDbError)?;
4✔
738

739
        self.ensure_admin_in_tx(&tx)
4✔
NEW
740
            .await
×
741
            .context(PermissionDbRoleDbError)?;
4✔
742

743
        tx.execute(
4✔
744
            "INSERT INTO user_roles (user_id, role_id) VALUES ($1, $2) ON CONFLICT DO NOTHING;",
4✔
745
            &[&user_id, &role_id],
4✔
746
        )
4✔
747
        .await
6✔
748
        .context(PostgresRoleDbError)?;
4✔
749

750
        tx.commit().await.context(PostgresRoleDbError)?;
4✔
751

752
        Ok(())
4✔
753
    }
12✔
754

755
    async fn revoke_role(&self, role_id: &RoleId, user_id: &UserId) -> Result<(), RoleDbError> {
2✔
756
        let mut conn = self.conn_pool.get().await.context(Bb8RoleDbError)?;
2✔
757

758
        let tx = conn
2✔
759
            .build_transaction()
2✔
760
            .start()
2✔
NEW
761
            .await
×
762
            .context(PostgresRoleDbError)?;
2✔
763

764
        self.ensure_admin_in_tx(&tx)
2✔
NEW
765
            .await
×
766
            .context(PermissionDbRoleDbError)?;
2✔
767

768
        let deleted = tx
2✔
769
            .execute(
2✔
770
                "DELETE FROM user_roles WHERE user_id= $1 AND role_id = $2;",
2✔
771
                &[&user_id, &role_id],
2✔
772
            )
2✔
773
            .await
2✔
774
            .context(PostgresRoleDbError)?;
2✔
775

776
        tx.commit().await.context(PostgresRoleDbError)?;
2✔
777

778
        ensure!(
2✔
779
            deleted > 0,
2✔
NEW
780
            CannotRevokeRoleThatIsNotAssignedRoleDbError { role_id: *role_id }
×
781
        );
782

783
        Ok(())
2✔
784
    }
6✔
785

786
    async fn get_role_descriptions(
5✔
787
        &self,
5✔
788
        user_id: &UserId,
5✔
789
    ) -> Result<Vec<RoleDescription>, RoleDbError> {
5✔
790
        let conn = self.conn_pool.get().await.context(Bb8RoleDbError)?;
5✔
791

792
        let stmt = conn
5✔
793
            .prepare(
5✔
794
                "SELECT roles.id, roles.name \
5✔
795
                FROM roles JOIN user_roles ON (roles.id=user_roles.role_id) \
5✔
796
                WHERE user_roles.user_id=$1 \
5✔
797
                ORDER BY roles.name;",
5✔
798
            )
5✔
799
            .await
3✔
800
            .context(PostgresRoleDbError)?;
5✔
801

802
        let results = conn
5✔
803
            .query(&stmt, &[&user_id])
5✔
804
            .await
3✔
805
            .context(PostgresRoleDbError)?;
5✔
806

807
        let mut result_vec = Vec::new();
5✔
808

809
        for result in results {
17✔
810
            let id = result.get(0);
12✔
811
            let name = result.get(1);
12✔
812
            let individual = UserId(id) == *user_id;
12✔
813
            result_vec.push(RoleDescription {
12✔
814
                role: Role {
12✔
815
                    id: RoleId(id),
12✔
816
                    name,
12✔
817
                },
12✔
818
                individual,
12✔
819
            });
12✔
820
        }
12✔
821

822
        Ok(result_vec)
5✔
823
    }
15✔
824
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc