• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

geo-engine / geoengine / 5939894039

22 Aug 2023 02:03PM UTC coverage: 89.809% (+0.002%) from 89.807%
5939894039

push

github

web-flow
Merge pull request #861 from geo-engine/admin_session_token_remains

remove remains of admin_session_token

105571 of 117551 relevant lines covered (89.81%)

61593.93 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

98.25
/services/src/pro/users/postgres_userdb.rs
1
use crate::contexts::SessionId;
2
use crate::error::Result;
3
use crate::pro::contexts::ProPostgresDb;
4
use crate::pro::permissions::{Role, RoleId};
5
use crate::pro::users::oidc::ExternalUserClaims;
6
use crate::pro::users::{
7
    User, UserCredentials, UserDb, UserId, UserInfo, UserRegistration, UserSession,
8
};
9
use crate::projects::{ProjectId, STRectangle};
10
use crate::util::Identifier;
11
use crate::{error, pro::contexts::ProPostgresContext};
12
use async_trait::async_trait;
13

14
use bb8_postgres::{
15
    tokio_postgres::tls::MakeTlsConnect, tokio_postgres::tls::TlsConnect, tokio_postgres::Socket,
16
};
17
use geoengine_datatypes::primitives::Duration;
18
use pwhash::bcrypt;
19
use snafu::ensure;
20
use uuid::Uuid;
21

22
use super::userdb::{RoleDb, UserAuth};
23

24
#[async_trait]
25
impl<Tls> UserAuth for ProPostgresContext<Tls>
26
where
27
    Tls: MakeTlsConnect<Socket> + Clone + Send + Sync + 'static,
28
    <Tls as MakeTlsConnect<Socket>>::Stream: Send + Sync,
29
    <Tls as MakeTlsConnect<Socket>>::TlsConnect: Send,
30
    <<Tls as MakeTlsConnect<Socket>>::TlsConnect as TlsConnect<Socket>>::Future: Send,
31
{
32
    // TODO: clean up expired sessions?
33

34
    async fn register_user(&self, user: UserRegistration) -> Result<UserId> {
42✔
35
        let mut conn = self.pool.get().await?;
42✔
36

37
        let tx = conn.build_transaction().start().await?;
42✔
38

39
        let user = User::from(user);
42✔
40

41
        let stmt = tx
42✔
42
            .prepare("INSERT INTO roles (id, name) VALUES ($1, $2);")
42✔
43
            .await?;
28✔
44
        let res = tx.execute(&stmt, &[&user.id, &user.email]).await;
42✔
45

46
        if let Err(e) = res {
42✔
47
            if e.code() == Some(&tokio_postgres::error::SqlState::UNIQUE_VIOLATION) {
1✔
48
                return Err(error::Error::Duplicate {
1✔
49
                    reason: "E-mail already exists".to_string(),
1✔
50
                });
1✔
51
            }
×
52
            return Err(e.into());
×
53
        }
41✔
54

55
        let stmt = tx
41✔
56
            .prepare(
41✔
57
                "INSERT INTO users (id, email, password_hash, real_name, quota_available, active) VALUES ($1, $2, $3, $4, $5, $6);",
41✔
58
            )
41✔
59
            .await?;
24✔
60

61
        let quota_available =
41✔
62
            crate::util::config::get_config_element::<crate::pro::util::config::Quota>()?
41✔
63
                .default_available_quota;
64

65
        tx.execute(
41✔
66
            &stmt,
41✔
67
            &[
41✔
68
                &user.id,
41✔
69
                &user.email,
41✔
70
                &user.password_hash,
41✔
71
                &user.real_name,
41✔
72
                &quota_available,
41✔
73
                &user.active,
41✔
74
            ],
41✔
75
        )
41✔
76
        .await?;
22✔
77

78
        let stmt = tx
41✔
79
            .prepare("INSERT INTO user_roles (user_id, role_id) VALUES ($1, $2);")
41✔
80
            .await?;
25✔
81
        tx.execute(&stmt, &[&user.id, &user.id]).await?;
41✔
82

83
        let stmt = tx
41✔
84
            .prepare("INSERT INTO user_roles (user_id, role_id) VALUES ($1, $2);")
41✔
85
            .await?;
25✔
86
        tx.execute(&stmt, &[&user.id, &Role::registered_user_role_id()])
41✔
87
            .await?;
24✔
88

89
        tx.commit().await?;
41✔
90

91
        Ok(user.id)
41✔
92
    }
84✔
93

94
    async fn create_anonymous_session(&self) -> Result<UserSession> {
27✔
95
        let mut conn = self.pool.get().await?;
27✔
96

97
        let tx = conn.build_transaction().start().await?;
27✔
98

99
        let user_id = UserId::new();
27✔
100

101
        let stmt = tx
27✔
102
            .prepare("INSERT INTO roles (id, name) VALUES ($1, $2);")
27✔
103
            .await?;
18✔
104
        tx.execute(&stmt, &[&user_id, &format!("anonymous_user_{user_id}")])
27✔
105
            .await?;
17✔
106

107
        let quota_available =
27✔
108
            crate::util::config::get_config_element::<crate::pro::util::config::Quota>()?
27✔
109
                .default_available_quota;
110

111
        let stmt = tx
27✔
112
            .prepare("INSERT INTO users (id, quota_available, active) VALUES ($1, $2, TRUE);")
27✔
113
            .await?;
17✔
114

115
        tx.execute(&stmt, &[&user_id, &quota_available]).await?;
27✔
116

117
        let stmt = tx
27✔
118
            .prepare("INSERT INTO user_roles (user_id, role_id) VALUES ($1, $2);")
27✔
119
            .await?;
17✔
120
        tx.execute(&stmt, &[&user_id, &user_id]).await?;
27✔
121

122
        let stmt = tx
27✔
123
            .prepare("INSERT INTO user_roles (user_id, role_id) VALUES ($1, $2);")
27✔
124
            .await?;
17✔
125
        tx.execute(&stmt, &[&user_id, &Role::anonymous_role_id()])
27✔
126
            .await?;
17✔
127

128
        let session_id = SessionId::new();
27✔
129
        let stmt = tx
27✔
130
            .prepare(
27✔
131
                "
27✔
132
                INSERT INTO sessions (id)
27✔
133
                VALUES ($1);",
27✔
134
            )
27✔
135
            .await?;
17✔
136

137
        tx.execute(&stmt, &[&session_id]).await?;
27✔
138

139
        let stmt = tx
27✔
140
            .prepare(
27✔
141
                "
27✔
142
            INSERT INTO 
27✔
143
                user_sessions (user_id, session_id, created, valid_until) 
27✔
144
            VALUES 
27✔
145
                ($1, $2, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP + make_interval(secs:=$3))
27✔
146
            RETURNING 
27✔
147
                created, valid_until;",
27✔
148
            )
27✔
149
            .await?;
17✔
150

151
        // TODO: load from config
152
        let session_duration = chrono::Duration::days(30);
27✔
153
        let row = tx
27✔
154
            .query_one(
27✔
155
                &stmt,
27✔
156
                &[
27✔
157
                    &user_id,
27✔
158
                    &session_id,
27✔
159
                    &(session_duration.num_seconds() as f64),
27✔
160
                ],
27✔
161
            )
27✔
162
            .await?;
18✔
163

164
        tx.commit().await?;
27✔
165

166
        Ok(UserSession {
27✔
167
            id: session_id,
27✔
168
            user: UserInfo {
27✔
169
                id: user_id,
27✔
170
                email: None,
27✔
171
                real_name: None,
27✔
172
            },
27✔
173
            created: row.get(0),
27✔
174
            valid_until: row.get(1),
27✔
175
            project: None,
27✔
176
            view: None,
27✔
177
            roles: vec![user_id.into(), Role::anonymous_role_id()],
27✔
178
        })
27✔
179
    }
54✔
180

181
    async fn login(&self, user_credentials: UserCredentials) -> Result<UserSession> {
50✔
182
        let mut conn = self.pool.get().await?;
50✔
183

184
        let tx = conn.build_transaction().start().await?;
50✔
185

186
        let stmt = tx
50✔
187
            .prepare("SELECT id, password_hash, email, real_name FROM users WHERE email = $1;")
50✔
188
            .await?;
34✔
189

190
        let row = tx
50✔
191
            .query_one(&stmt, &[&user_credentials.email])
50✔
192
            .await
34✔
193
            .map_err(|_error| error::Error::LoginFailed)?;
50✔
194

195
        let user_id = UserId(row.get(0));
50✔
196
        let password_hash = row.get(1);
50✔
197
        let email = row.get(2);
50✔
198
        let real_name = row.get(3);
50✔
199

50✔
200
        if bcrypt::verify(user_credentials.password, password_hash) {
50✔
201
            let session_id = SessionId::new();
49✔
202
            let stmt = tx
49✔
203
                .prepare(
49✔
204
                    "
49✔
205
                INSERT INTO sessions (id)
49✔
206
                VALUES ($1);",
49✔
207
                )
49✔
208
                .await?;
32✔
209

210
            tx.execute(&stmt, &[&session_id]).await?;
49✔
211

212
            // TODO: load from config
213
            let session_duration = chrono::Duration::days(30);
49✔
214
            let stmt = tx
49✔
215
                .prepare(
49✔
216
                    "
49✔
217
                INSERT INTO 
49✔
218
                    user_sessions (user_id, session_id, created, valid_until) 
49✔
219
                VALUES 
49✔
220
                    ($1, $2, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP + make_interval(secs:=$3))
49✔
221
                RETURNING 
49✔
222
                    created, valid_until;",
49✔
223
                )
49✔
224
                .await?;
30✔
225
            let row = tx
49✔
226
                .query_one(
49✔
227
                    &stmt,
49✔
228
                    &[
49✔
229
                        &user_id,
49✔
230
                        &session_id,
49✔
231
                        &(session_duration.num_seconds() as f64),
49✔
232
                    ],
49✔
233
                )
49✔
234
                .await?;
30✔
235

236
            let stmt = tx
49✔
237
                .prepare("SELECT role_id FROM user_roles WHERE user_id = $1;")
49✔
238
                .await?;
30✔
239

240
            let rows = tx
49✔
241
                .query(&stmt, &[&user_id])
49✔
242
                .await
30✔
243
                .map_err(|_error| error::Error::LoginFailed)?;
49✔
244

245
            tx.commit().await?;
49✔
246

247
            let roles = rows.into_iter().map(|row| row.get(0)).collect();
79✔
248

49✔
249
            Ok(UserSession {
49✔
250
                id: session_id,
49✔
251
                user: UserInfo {
49✔
252
                    id: user_id,
49✔
253
                    email,
49✔
254
                    real_name,
49✔
255
                },
49✔
256
                created: row.get(0),
49✔
257
                valid_until: row.get(1),
49✔
258
                project: None,
49✔
259
                view: None,
49✔
260
                roles,
49✔
261
            })
49✔
262
        } else {
263
            Err(error::Error::LoginFailed)
1✔
264
        }
265
    }
100✔
266

267
    #[allow(clippy::too_many_lines)]
268
    async fn login_external(
4✔
269
        &self,
4✔
270
        user: ExternalUserClaims,
4✔
271
        duration: Duration,
4✔
272
    ) -> Result<UserSession> {
4✔
273
        let mut conn = self.pool.get().await?;
4✔
274
        let tx = conn.build_transaction().start().await?;
4✔
275

276
        let stmt = tx
4✔
277
            .prepare("SELECT id, external_id, email, real_name FROM external_users WHERE external_id = $1;")
4✔
278
            .await?;
4✔
279

280
        let row = tx
4✔
281
            .query_opt(&stmt, &[&user.external_id.to_string()])
4✔
282
            .await
4✔
283
            .map_err(|_error| error::Error::LoginFailed)?;
4✔
284

285
        let user_id = match row {
4✔
286
            Some(row) => UserId(row.get(0)),
2✔
287
            None => {
288
                let user_id = UserId::new();
2✔
289

290
                let stmt = tx
2✔
291
                    .prepare("INSERT INTO roles (id, name) VALUES ($1, $2);")
2✔
292
                    .await?;
1✔
293
                tx.execute(&stmt, &[&user_id, &user.email]).await?;
2✔
294

295
                let quota_available =
2✔
296
                    crate::util::config::get_config_element::<crate::pro::util::config::Quota>()?
2✔
297
                        .default_available_quota;
298

299
                //TODO: Inconsistent to hashmap implementation, where an external user is not part of the user database.
300
                //TODO: A user might be able to login without external login using this (internal) id. Would be a problem with anonymous users as well.
301
                let stmt = tx
2✔
302
                    .prepare(
2✔
303
                        "INSERT INTO users (id, quota_available, active) VALUES ($1, $2, TRUE);",
2✔
304
                    )
2✔
305
                    .await?;
1✔
306
                tx.execute(&stmt, &[&user_id, &quota_available]).await?;
2✔
307

308
                let stmt = tx
2✔
309
                    .prepare(
2✔
310
                        "INSERT INTO external_users (id, external_id, email, real_name, active) VALUES ($1, $2, $3, $4, $5);",
2✔
311
                    )
2✔
312
                    .await?;
1✔
313

314
                tx.execute(
2✔
315
                    &stmt,
2✔
316
                    &[
2✔
317
                        &user_id,
2✔
318
                        &user.external_id.to_string(),
2✔
319
                        &user.email,
2✔
320
                        &user.real_name,
2✔
321
                        &true,
2✔
322
                    ],
2✔
323
                )
2✔
324
                .await?;
1✔
325

326
                let stmt = tx
2✔
327
                    .prepare("INSERT INTO user_roles (user_id, role_id) VALUES ($1, $2);")
2✔
328
                    .await?;
1✔
329
                tx.execute(&stmt, &[&user_id, &user_id]).await?;
2✔
330

331
                let stmt = tx
2✔
332
                    .prepare("INSERT INTO user_roles (user_id, role_id) VALUES ($1, $2);")
2✔
333
                    .await?;
2✔
334
                tx.execute(&stmt, &[&user_id, &Role::registered_user_role_id()])
2✔
335
                    .await?;
2✔
336

337
                user_id
2✔
338
            }
339
        };
340

341
        let session_id = SessionId::new();
4✔
342
        let stmt = tx
4✔
343
            .prepare(
4✔
344
                "
4✔
345
            INSERT INTO sessions (id)
4✔
346
            VALUES ($1);",
4✔
347
            )
4✔
348
            .await?; //TODO: Check documentation if inconsistent to hashmap implementation - would happen if CURRENT_TIMESTAMP is called twice in postgres for a single query. Worked in tests.
4✔
349

350
        tx.execute(&stmt, &[&session_id]).await?;
4✔
351

352
        let stmt = tx
4✔
353
            .prepare(
4✔
354
                "
4✔
355
            INSERT INTO 
4✔
356
                user_sessions (user_id, session_id, created, valid_until) 
4✔
357
            VALUES 
4✔
358
                ($1, $2, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP + make_interval(secs:=$3))
4✔
359
            RETURNING 
4✔
360
                created, valid_until;",
4✔
361
            )
4✔
362
            .await?;
4✔
363
        let row = tx
4✔
364
            .query_one(
4✔
365
                &stmt,
4✔
366
                &[&user_id, &session_id, &(duration.num_seconds() as f64)],
4✔
367
            )
4✔
368
            .await?;
3✔
369

370
        let stmt = tx
4✔
371
            .prepare("SELECT role_id FROM user_roles WHERE user_id = $1;")
4✔
372
            .await?;
4✔
373

374
        let rows = tx
4✔
375
            .query(&stmt, &[&user_id])
4✔
376
            .await
4✔
377
            .map_err(|_error| error::Error::LoginFailed)?;
4✔
378

379
        let roles = rows.into_iter().map(|row| row.get(0)).collect();
8✔
380

4✔
381
        tx.commit().await?;
4✔
382

383
        Ok(UserSession {
4✔
384
            id: session_id,
4✔
385
            user: UserInfo {
4✔
386
                id: user_id,
4✔
387
                email: Some(user.email.clone()),
4✔
388
                real_name: Some(user.real_name.clone()),
4✔
389
            },
4✔
390
            created: row.get(0),
4✔
391
            valid_until: row.get(1),
4✔
392
            project: None,
4✔
393
            view: None,
4✔
394
            roles,
4✔
395
        })
4✔
396
    }
8✔
397

398
    async fn user_session_by_id(&self, session: SessionId) -> Result<UserSession> {
44✔
399
        let mut conn = self.pool.get().await?;
44✔
400

401
        let tx = conn.build_transaction().start().await?;
44✔
402

403
        let stmt = tx
44✔
404
            .prepare(
44✔
405
                "
44✔
406
            SELECT 
44✔
407
                u.id,   
44✔
408
                u.email,
44✔
409
                u.real_name,             
44✔
410
                us.created, 
44✔
411
                us.valid_until, 
44✔
412
                s.project_id,
44✔
413
                s.view
44✔
414
            FROM 
44✔
415
                sessions s JOIN user_sessions us ON (s.id = us.session_id) 
44✔
416
                    JOIN users u ON (us.user_id = u.id)
44✔
417
            WHERE s.id = $1 AND CURRENT_TIMESTAMP < us.valid_until;",
44✔
418
            )
44✔
419
            .await?;
102✔
420

421
        let row = tx
44✔
422
            .query_one(&stmt, &[&session])
44✔
423
            .await
25✔
424
            .map_err(|_error| error::Error::InvalidSession)?;
44✔
425

426
        let mut session = UserSession {
38✔
427
            id: session,
38✔
428
            user: UserInfo {
38✔
429
                id: row.get(0),
38✔
430
                email: row.get(1),
38✔
431
                real_name: row.get(2),
38✔
432
            },
38✔
433
            created: row.get(3),
38✔
434
            valid_until: row.get(4),
38✔
435
            project: row.get::<usize, Option<Uuid>>(5).map(ProjectId),
38✔
436
            view: row.get(6),
38✔
437
            roles: vec![],
38✔
438
        };
38✔
439

440
        let stmt = tx
38✔
441
            .prepare(
38✔
442
                "
38✔
443
            SELECT role_id FROM user_roles WHERE user_id = $1;
38✔
444
            ",
38✔
445
            )
38✔
446
            .await?;
19✔
447

448
        let rows = tx.query(&stmt, &[&session.user.id]).await?;
38✔
449

450
        session.roles = rows.into_iter().map(|row| row.get(0)).collect();
64✔
451

38✔
452
        Ok(session)
38✔
453
    }
88✔
454
}
455

456
#[async_trait]
457
impl<Tls> UserDb for ProPostgresDb<Tls>
458
where
459
    Tls: MakeTlsConnect<Socket> + Clone + Send + Sync + 'static,
460
    <Tls as MakeTlsConnect<Socket>>::Stream: Send + Sync,
461
    <Tls as MakeTlsConnect<Socket>>::TlsConnect: Send,
462
    <<Tls as MakeTlsConnect<Socket>>::TlsConnect as TlsConnect<Socket>>::Future: Send,
463
{
464
    // TODO: clean up expired sessions?
465

466
    async fn logout(&self) -> Result<()> {
6✔
467
        let conn = self.conn_pool.get().await?;
6✔
468
        let stmt = conn
6✔
469
            .prepare("DELETE FROM sessions WHERE id = $1;") // TODO: only invalidate session?
6✔
470
            .await?;
5✔
471

472
        conn.execute(&stmt, &[&self.session.id])
6✔
473
            .await
6✔
474
            .map_err(|_error| error::Error::LogoutFailed)?;
6✔
475
        Ok(())
6✔
476
    }
12✔
477

478
    async fn set_session_project(&self, project: ProjectId) -> Result<()> {
3✔
479
        // TODO: check permission
480

481
        let conn = self.conn_pool.get().await?;
3✔
482
        let stmt = conn
3✔
483
            .prepare("UPDATE sessions SET project_id = $1 WHERE id = $2;")
3✔
484
            .await?;
3✔
485

486
        conn.execute(&stmt, &[&project, &self.session.id]).await?;
3✔
487

488
        Ok(())
3✔
489
    }
6✔
490

491
    async fn set_session_view(&self, view: STRectangle) -> Result<()> {
3✔
492
        let conn = self.conn_pool.get().await?;
3✔
493
        let stmt = conn
3✔
494
            .prepare("UPDATE sessions SET view = $1 WHERE id = $2;")
3✔
495
            .await?;
3✔
496

497
        conn.execute(&stmt, &[&view, &self.session.id]).await?;
3✔
498

499
        Ok(())
3✔
500
    }
6✔
501

502
    async fn increment_quota_used(&self, user: &UserId, quota_used: u64) -> Result<()> {
1✔
503
        ensure!(self.session.is_admin(), error::PermissionDenied);
1✔
504

505
        let conn = self.conn_pool.get().await?;
1✔
506
        let stmt = conn
1✔
507
            .prepare(
1✔
508
                "
1✔
509
            UPDATE users SET 
1✔
510
                quota_available = quota_available - $1, 
1✔
511
                quota_used = quota_used + $1
1✔
512
            WHERE id = $2;",
1✔
513
            )
1✔
514
            .await?;
×
515

516
        conn.execute(&stmt, &[&(quota_used as i64), &user]).await?;
1✔
517

518
        Ok(())
1✔
519
    }
2✔
520

521
    async fn bulk_increment_quota_used<I: IntoIterator<Item = (UserId, u64)> + Send>(
13✔
522
        &self,
13✔
523
        quota_used_updates: I,
13✔
524
    ) -> Result<()> {
13✔
525
        ensure!(self.session.is_admin(), error::PermissionDenied);
13✔
526

527
        let conn = self.conn_pool.get().await?;
13✔
528

529
        // collect the user ids and quotas into separate vectors to pass them as parameters to the query
530
        let (users, quotas): (Vec<UserId>, Vec<i64>) = quota_used_updates
13✔
531
            .into_iter()
13✔
532
            .map(|(user, quota)| (user, quota as i64))
13✔
533
            .unzip();
13✔
534

13✔
535
        let query = "
13✔
536
            UPDATE users
13✔
537
            SET quota_available = quota_available - quota_changes.quota, 
13✔
538
                quota_used = quota_used + quota_changes.quota
13✔
539
            FROM 
13✔
540
                (SELECT * FROM UNNEST($1::uuid[], $2::bigint[]) AS t(id, quota)) AS quota_changes
13✔
541
            WHERE users.id = quota_changes.id;
13✔
542
        ";
13✔
543

13✔
544
        conn.execute(query, &[&users, &quotas]).await?;
25✔
545

546
        Ok(())
13✔
547
    }
26✔
548

549
    async fn quota_used(&self) -> Result<u64> {
7✔
550
        let conn = self.conn_pool.get().await?;
7✔
551
        let stmt = conn
7✔
552
            .prepare("SELECT quota_used FROM users WHERE id = $1;")
7✔
553
            .await?;
6✔
554

555
        let row = conn
7✔
556
            .query_one(&stmt, &[&self.session.user.id])
7✔
557
            .await
6✔
558
            .map_err(|_error| error::Error::InvalidSession)?;
7✔
559

560
        Ok(row.get::<usize, i64>(0) as u64)
7✔
561
    }
14✔
562

563
    async fn quota_used_by_user(&self, user: &UserId) -> Result<u64> {
7✔
564
        ensure!(
7✔
565
            self.session.user.id == *user || self.session.is_admin(),
7✔
566
            error::PermissionDenied
×
567
        );
568

569
        let conn = self.conn_pool.get().await?;
7✔
570
        let stmt = conn
7✔
571
            .prepare("SELECT quota_used FROM users WHERE id = $1;")
7✔
572
            .await?;
×
573

574
        let row = conn
7✔
575
            .query_one(&stmt, &[&user])
7✔
576
            .await
×
577
            .map_err(|_error| error::Error::InvalidSession)?;
7✔
578

579
        Ok(row.get::<usize, i64>(0) as u64)
7✔
580
    }
14✔
581

582
    async fn quota_available(&self) -> Result<i64> {
12✔
583
        let conn = self.conn_pool.get().await?;
12✔
584
        let stmt = conn
12✔
585
            .prepare("SELECT quota_available FROM users WHERE id = $1;")
12✔
586
            .await?;
9✔
587

588
        let row = conn
12✔
589
            .query_one(&stmt, &[&self.session.user.id])
12✔
590
            .await
9✔
591
            .map_err(|_error| error::Error::InvalidSession)?;
12✔
592

593
        Ok(row.get::<usize, i64>(0))
12✔
594
    }
24✔
595

596
    async fn quota_available_by_user(&self, user: &UserId) -> Result<i64> {
6✔
597
        ensure!(
6✔
598
            self.session.user.id == *user || self.session.is_admin(),
6✔
599
            error::PermissionDenied
×
600
        );
601

602
        let conn = self.conn_pool.get().await?;
6✔
603
        let stmt = conn
6✔
604
            .prepare("SELECT quota_available FROM users WHERE id = $1;")
6✔
605
            .await?;
×
606

607
        let row = conn
6✔
608
            .query_one(&stmt, &[&user])
6✔
609
            .await
×
610
            .map_err(|_error| error::Error::InvalidSession)?;
6✔
611

612
        Ok(row.get::<usize, i64>(0))
6✔
613
    }
12✔
614

615
    async fn update_quota_available_by_user(
5✔
616
        &self,
5✔
617
        user: &UserId,
5✔
618
        new_available_quota: i64,
5✔
619
    ) -> Result<()> {
5✔
620
        ensure!(self.session.is_admin(), error::PermissionDenied);
5✔
621

622
        let conn = self.conn_pool.get().await?;
5✔
623
        let stmt = conn
5✔
624
            .prepare(
5✔
625
                "
5✔
626
            UPDATE users SET 
5✔
627
                quota_available = $1
5✔
628
            WHERE id = $2;",
5✔
629
            )
5✔
630
            .await?;
3✔
631

632
        conn.execute(&stmt, &[&(new_available_quota), &user])
5✔
633
            .await?;
5✔
634

635
        Ok(())
5✔
636
    }
10✔
637
}
638

639
#[async_trait]
640
impl<Tls> RoleDb for ProPostgresDb<Tls>
641
where
642
    Tls: MakeTlsConnect<Socket> + Clone + Send + Sync + 'static,
643
    <Tls as MakeTlsConnect<Socket>>::Stream: Send + Sync,
644
    <Tls as MakeTlsConnect<Socket>>::TlsConnect: Send,
645
    <<Tls as MakeTlsConnect<Socket>>::TlsConnect as TlsConnect<Socket>>::Future: Send,
646
{
647
    async fn add_role(&self, role_name: &str) -> Result<RoleId> {
3✔
648
        ensure!(self.session.is_admin(), error::PermissionDenied);
3✔
649

650
        let conn = self.conn_pool.get().await?;
3✔
651

652
        let id = RoleId::new();
3✔
653

654
        let stmt = conn
3✔
655
            .prepare("INSERT INTO roles (id, name) VALUES ($1, $2);")
3✔
656
            .await?;
1✔
657

658
        // TODO: map postgres error code to error::Error::RoleAlreadyExists
659

660
        conn.execute(&stmt, &[&id, &role_name]).await?;
3✔
661

662
        Ok(id)
3✔
663
    }
6✔
664

665
    async fn remove_role(&self, role_id: &RoleId) -> Result<()> {
2✔
666
        ensure!(self.session.is_admin(), error::PermissionDenied);
2✔
667

668
        let conn = self.conn_pool.get().await?;
2✔
669

670
        let stmt = conn.prepare("DELETE FROM roles WHERE id = $1;").await?;
2✔
671

672
        let deleted = conn.execute(&stmt, &[&role_id]).await?;
2✔
673

674
        ensure!(deleted > 0, error::RoleDoesNotExist);
2✔
675

676
        Ok(())
2✔
677
    }
4✔
678

679
    async fn assign_role(&self, role_id: &RoleId, user_id: &UserId) -> Result<()> {
3✔
680
        ensure!(self.session.is_admin(), error::PermissionDenied);
3✔
681

682
        let conn = self.conn_pool.get().await?;
3✔
683

684
        let stmt = conn
3✔
685
            .prepare("INSERT INTO user_roles (user_id, role_id) VALUES ($1, $2);")
3✔
686
            .await?;
1✔
687

688
        // TODO: map postgres error code to error::Error::RoleAlreadyAssigned, RoleDoesNotExist
689

690
        conn.execute(&stmt, &[&user_id, &role_id]).await?;
3✔
691

692
        Ok(())
3✔
693
    }
6✔
694

695
    async fn revoke_role(&self, role_id: &RoleId, user_id: &UserId) -> Result<()> {
2✔
696
        ensure!(self.session.is_admin(), error::PermissionDenied);
2✔
697

698
        let conn = self.conn_pool.get().await?;
2✔
699

700
        let stmt = conn
2✔
701
            .prepare("DELETE FROM user_roles WHERE user_id= $1 AND role_id = $2;")
2✔
702
            .await?;
1✔
703

704
        let deleted = conn.execute(&stmt, &[&user_id, &role_id]).await?;
2✔
705

706
        ensure!(deleted > 0, error::RoleNotAssigned);
2✔
707

708
        Ok(())
2✔
709
    }
4✔
710
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc