• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

geo-engine / geoengine / 5660774425

25 Jul 2023 06:15PM UTC coverage: 89.193% (+0.004%) from 89.189%
5660774425

push

github

web-flow
Merge pull request #836 from geo-engine/batch_quota

add bulk updates for quotas

72 of 72 new or added lines in 4 files covered. (100.0%)

105733 of 118544 relevant lines covered (89.19%)

61072.75 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

96.44
/services/src/pro/users/postgres_userdb.rs
1
use crate::contexts::SessionId;
2
use crate::error::Result;
3
use crate::pro::contexts::ProPostgresDb;
4
use crate::pro::permissions::{Role, RoleId};
5
use crate::pro::users::oidc::ExternalUserClaims;
6
use crate::pro::users::{
7
    User, UserCredentials, UserDb, UserId, UserInfo, UserRegistration, UserSession,
8
};
9
use crate::projects::{ProjectId, STRectangle};
10
use crate::util::Identifier;
11
use crate::{error, pro::contexts::ProPostgresContext};
12
use async_trait::async_trait;
13

14
use bb8_postgres::{
15
    tokio_postgres::tls::MakeTlsConnect, tokio_postgres::tls::TlsConnect, tokio_postgres::Socket,
16
};
17
use geoengine_datatypes::primitives::Duration;
18
use pwhash::bcrypt;
19
use snafu::ensure;
20
use uuid::Uuid;
21

22
use super::userdb::{RoleDb, UserAuth};
23

24
#[async_trait]
25
impl<Tls> UserAuth for ProPostgresContext<Tls>
26
where
27
    Tls: MakeTlsConnect<Socket> + Clone + Send + Sync + 'static,
28
    <Tls as MakeTlsConnect<Socket>>::Stream: Send + Sync,
29
    <Tls as MakeTlsConnect<Socket>>::TlsConnect: Send,
30
    <<Tls as MakeTlsConnect<Socket>>::TlsConnect as TlsConnect<Socket>>::Future: Send,
31
{
32
    // TODO: clean up expired sessions?
33

34
    async fn register_user(&self, user: UserRegistration) -> Result<UserId> {
9✔
35
        let mut conn = self.pool.get().await?;
9✔
36

37
        let tx = conn.build_transaction().start().await?;
9✔
38

39
        let user = User::from(user);
9✔
40

41
        let stmt = tx
9✔
42
            .prepare("INSERT INTO roles (id, name) VALUES ($1, $2);")
9✔
43
            .await?;
8✔
44
        tx.execute(&stmt, &[&user.id, &user.email]).await?;
9✔
45

46
        let stmt = tx
9✔
47
            .prepare(
9✔
48
                "INSERT INTO users (id, email, password_hash, real_name, quota_available, active) VALUES ($1, $2, $3, $4, $5, $6);",
9✔
49
            )
9✔
50
            .await?;
7✔
51

52
        let quota_available =
9✔
53
            crate::util::config::get_config_element::<crate::pro::util::config::Quota>()?
9✔
54
                .default_available_quota;
55

56
        tx.execute(
9✔
57
            &stmt,
9✔
58
            &[
9✔
59
                &user.id,
9✔
60
                &user.email,
9✔
61
                &user.password_hash,
9✔
62
                &user.real_name,
9✔
63
                &quota_available,
9✔
64
                &user.active,
9✔
65
            ],
9✔
66
        )
9✔
67
        .await?;
7✔
68

69
        let stmt = tx
9✔
70
            .prepare("INSERT INTO user_roles (user_id, role_id) VALUES ($1, $2);")
9✔
71
            .await?;
6✔
72
        tx.execute(&stmt, &[&user.id, &user.id]).await?;
9✔
73

74
        let stmt = tx
9✔
75
            .prepare("INSERT INTO user_roles (user_id, role_id) VALUES ($1, $2);")
9✔
76
            .await?;
6✔
77
        tx.execute(&stmt, &[&user.id, &Role::registered_user_role_id()])
9✔
78
            .await?;
7✔
79

80
        tx.commit().await?;
9✔
81

82
        Ok(user.id)
9✔
83
    }
18✔
84

85
    async fn create_anonymous_session(&self) -> Result<UserSession> {
21✔
86
        let mut conn = self.pool.get().await?;
21✔
87

88
        let tx = conn.build_transaction().start().await?;
21✔
89

90
        let user_id = UserId::new();
21✔
91

92
        let stmt = tx
21✔
93
            .prepare("INSERT INTO roles (id, name) VALUES ($1, $2);")
21✔
94
            .await?;
17✔
95
        tx.execute(&stmt, &[&user_id, &format!("anonymous_user_{user_id}")])
21✔
96
            .await?;
17✔
97

98
        let quota_available =
21✔
99
            crate::util::config::get_config_element::<crate::pro::util::config::Quota>()?
21✔
100
                .default_available_quota;
101

102
        let stmt = tx
21✔
103
            .prepare("INSERT INTO users (id, quota_available, active) VALUES ($1, $2, TRUE);")
21✔
104
            .await?;
17✔
105

106
        tx.execute(&stmt, &[&user_id, &quota_available]).await?;
21✔
107

108
        let stmt = tx
21✔
109
            .prepare("INSERT INTO user_roles (user_id, role_id) VALUES ($1, $2);")
21✔
110
            .await?;
17✔
111
        tx.execute(&stmt, &[&user_id, &user_id]).await?;
21✔
112

113
        let stmt = tx
21✔
114
            .prepare("INSERT INTO user_roles (user_id, role_id) VALUES ($1, $2);")
21✔
115
            .await?;
17✔
116
        tx.execute(&stmt, &[&user_id, &Role::anonymous_role_id()])
21✔
117
            .await?;
17✔
118

119
        let session_id = SessionId::new();
21✔
120
        let stmt = tx
21✔
121
            .prepare(
21✔
122
                "
21✔
123
                INSERT INTO sessions (id)
21✔
124
                VALUES ($1);",
21✔
125
            )
21✔
126
            .await?;
17✔
127

128
        tx.execute(&stmt, &[&session_id]).await?;
21✔
129

130
        let stmt = tx
21✔
131
            .prepare(
21✔
132
                "
21✔
133
            INSERT INTO 
21✔
134
                user_sessions (user_id, session_id, created, valid_until) 
21✔
135
            VALUES 
21✔
136
                ($1, $2, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP + make_interval(secs:=$3))
21✔
137
            RETURNING 
21✔
138
                created, valid_until;",
21✔
139
            )
21✔
140
            .await?;
17✔
141

142
        // TODO: load from config
143
        let session_duration = chrono::Duration::days(30);
21✔
144
        let row = tx
21✔
145
            .query_one(
21✔
146
                &stmt,
21✔
147
                &[
21✔
148
                    &user_id,
21✔
149
                    &session_id,
21✔
150
                    &(session_duration.num_seconds() as f64),
21✔
151
                ],
21✔
152
            )
21✔
153
            .await?;
18✔
154

155
        tx.commit().await?;
21✔
156

157
        Ok(UserSession {
21✔
158
            id: session_id,
21✔
159
            user: UserInfo {
21✔
160
                id: user_id,
21✔
161
                email: None,
21✔
162
                real_name: None,
21✔
163
            },
21✔
164
            created: row.get(0),
21✔
165
            valid_until: row.get(1),
21✔
166
            project: None,
21✔
167
            view: None,
21✔
168
            roles: vec![user_id.into(), Role::anonymous_role_id()],
21✔
169
        })
21✔
170
    }
42✔
171

172
    async fn login(&self, user_credentials: UserCredentials) -> Result<UserSession> {
22✔
173
        let mut conn = self.pool.get().await?;
22✔
174

175
        let tx = conn.build_transaction().start().await?;
22✔
176

177
        let stmt = tx
22✔
178
            .prepare("SELECT id, password_hash, email, real_name FROM users WHERE email = $1;")
22✔
179
            .await?;
18✔
180

181
        let row = tx
22✔
182
            .query_one(&stmt, &[&user_credentials.email])
22✔
183
            .await
19✔
184
            .map_err(|_error| error::Error::LoginFailed)?;
22✔
185

186
        let user_id = UserId(row.get(0));
22✔
187
        let password_hash = row.get(1);
22✔
188
        let email = row.get(2);
22✔
189
        let real_name = row.get(3);
22✔
190

22✔
191
        if bcrypt::verify(user_credentials.password, password_hash) {
22✔
192
            let session_id = SessionId::new();
22✔
193
            let stmt = tx
22✔
194
                .prepare(
22✔
195
                    "
22✔
196
                INSERT INTO sessions (id)
22✔
197
                VALUES ($1);",
22✔
198
                )
22✔
199
                .await?;
14✔
200

201
            tx.execute(&stmt, &[&session_id]).await?;
22✔
202

203
            // TODO: load from config
204
            let session_duration = chrono::Duration::days(30);
22✔
205
            let stmt = tx
22✔
206
                .prepare(
22✔
207
                    "
22✔
208
                INSERT INTO 
22✔
209
                    user_sessions (user_id, session_id, created, valid_until) 
22✔
210
                VALUES 
22✔
211
                    ($1, $2, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP + make_interval(secs:=$3))
22✔
212
                RETURNING 
22✔
213
                    created, valid_until;",
22✔
214
                )
22✔
215
                .await?;
17✔
216
            let row = tx
22✔
217
                .query_one(
22✔
218
                    &stmt,
22✔
219
                    &[
22✔
220
                        &user_id,
22✔
221
                        &session_id,
22✔
222
                        &(session_duration.num_seconds() as f64),
22✔
223
                    ],
22✔
224
                )
22✔
225
                .await?;
16✔
226

227
            let stmt = tx
22✔
228
                .prepare("SELECT role_id FROM user_roles WHERE user_id = $1;")
22✔
229
                .await?;
17✔
230

231
            let rows = tx
22✔
232
                .query(&stmt, &[&user_id])
22✔
233
                .await
17✔
234
                .map_err(|_error| error::Error::LoginFailed)?;
22✔
235

236
            tx.commit().await?;
22✔
237

238
            let roles = rows.into_iter().map(|row| row.get(0)).collect();
35✔
239

22✔
240
            Ok(UserSession {
22✔
241
                id: session_id,
22✔
242
                user: UserInfo {
22✔
243
                    id: user_id,
22✔
244
                    email,
22✔
245
                    real_name,
22✔
246
                },
22✔
247
                created: row.get(0),
22✔
248
                valid_until: row.get(1),
22✔
249
                project: None,
22✔
250
                view: None,
22✔
251
                roles,
22✔
252
            })
22✔
253
        } else {
254
            Err(error::Error::LoginFailed)
×
255
        }
256
    }
44✔
257

258
    #[allow(clippy::too_many_lines)]
259
    async fn login_external(
3✔
260
        &self,
3✔
261
        user: ExternalUserClaims,
3✔
262
        duration: Duration,
3✔
263
    ) -> Result<UserSession> {
3✔
264
        let mut conn = self.pool.get().await?;
3✔
265
        let tx = conn.build_transaction().start().await?;
3✔
266

267
        let stmt = tx
3✔
268
            .prepare("SELECT id, external_id, email, real_name FROM external_users WHERE external_id = $1;")
3✔
269
            .await?;
3✔
270

271
        let row = tx
3✔
272
            .query_opt(&stmt, &[&user.external_id.to_string()])
3✔
273
            .await
3✔
274
            .map_err(|_error| error::Error::LoginFailed)?;
3✔
275

276
        let user_id = match row {
3✔
277
            Some(row) => UserId(row.get(0)),
2✔
278
            None => {
279
                let user_id = UserId::new();
1✔
280

281
                let stmt = tx
1✔
282
                    .prepare("INSERT INTO roles (id, name) VALUES ($1, $2);")
1✔
283
                    .await?;
1✔
284
                tx.execute(&stmt, &[&user_id, &user.email]).await?;
1✔
285

286
                let quota_available =
1✔
287
                    crate::util::config::get_config_element::<crate::pro::util::config::Quota>()?
1✔
288
                        .default_available_quota;
289

290
                //TODO: Inconsistent to hashmap implementation, where an external user is not part of the user database.
291
                //TODO: A user might be able to login without external login using this (internal) id. Would be a problem with anonymous users as well.
292
                let stmt = tx
1✔
293
                    .prepare(
1✔
294
                        "INSERT INTO users (id, quota_available, active) VALUES ($1, $2, TRUE);",
1✔
295
                    )
1✔
296
                    .await?;
1✔
297
                tx.execute(&stmt, &[&user_id, &quota_available]).await?;
1✔
298

299
                let stmt = tx
1✔
300
                    .prepare(
1✔
301
                        "INSERT INTO external_users (id, external_id, email, real_name, active) VALUES ($1, $2, $3, $4, $5);",
1✔
302
                    )
1✔
303
                    .await?;
1✔
304

305
                tx.execute(
1✔
306
                    &stmt,
1✔
307
                    &[
1✔
308
                        &user_id,
1✔
309
                        &user.external_id.to_string(),
1✔
310
                        &user.email,
1✔
311
                        &user.real_name,
1✔
312
                        &true,
1✔
313
                    ],
1✔
314
                )
1✔
315
                .await?;
1✔
316

317
                let stmt = tx
1✔
318
                    .prepare("INSERT INTO user_roles (user_id, role_id) VALUES ($1, $2);")
1✔
319
                    .await?;
1✔
320
                tx.execute(&stmt, &[&user_id, &user_id]).await?;
1✔
321

322
                let stmt = tx
1✔
323
                    .prepare("INSERT INTO user_roles (user_id, role_id) VALUES ($1, $2);")
1✔
324
                    .await?;
1✔
325
                tx.execute(&stmt, &[&user_id, &Role::registered_user_role_id()])
1✔
326
                    .await?;
1✔
327

328
                user_id
1✔
329
            }
330
        };
331

332
        let session_id = SessionId::new();
3✔
333
        let stmt = tx
3✔
334
            .prepare(
3✔
335
                "
3✔
336
            INSERT INTO sessions (id)
3✔
337
            VALUES ($1);",
3✔
338
            )
3✔
339
            .await?; //TODO: Check documentation if inconsistent to hashmap implementation - would happen if CURRENT_TIMESTAMP is called twice in postgres for a single query. Worked in tests.
3✔
340

341
        tx.execute(&stmt, &[&session_id]).await?;
3✔
342

343
        let stmt = tx
3✔
344
            .prepare(
3✔
345
                "
3✔
346
            INSERT INTO 
3✔
347
                user_sessions (user_id, session_id, created, valid_until) 
3✔
348
            VALUES 
3✔
349
                ($1, $2, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP + make_interval(secs:=$3))
3✔
350
            RETURNING 
3✔
351
                created, valid_until;",
3✔
352
            )
3✔
353
            .await?;
3✔
354
        let row = tx
3✔
355
            .query_one(
3✔
356
                &stmt,
3✔
357
                &[&user_id, &session_id, &(duration.num_seconds() as f64)],
3✔
358
            )
3✔
359
            .await?;
3✔
360

361
        let stmt = tx
3✔
362
            .prepare("SELECT role_id FROM user_roles WHERE user_id = $1;")
3✔
363
            .await?;
3✔
364

365
        let rows = tx
3✔
366
            .query(&stmt, &[&user_id])
3✔
367
            .await
3✔
368
            .map_err(|_error| error::Error::LoginFailed)?;
3✔
369

370
        let roles = rows.into_iter().map(|row| row.get(0)).collect();
6✔
371

3✔
372
        tx.commit().await?;
3✔
373

374
        Ok(UserSession {
3✔
375
            id: session_id,
3✔
376
            user: UserInfo {
3✔
377
                id: user_id,
3✔
378
                email: Some(user.email.clone()),
3✔
379
                real_name: Some(user.real_name.clone()),
3✔
380
            },
3✔
381
            created: row.get(0),
3✔
382
            valid_until: row.get(1),
3✔
383
            project: None,
3✔
384
            view: None,
3✔
385
            roles,
3✔
386
        })
3✔
387
    }
6✔
388

389
    async fn user_session_by_id(&self, session: SessionId) -> Result<UserSession> {
13✔
390
        let mut conn = self.pool.get().await?;
13✔
391

392
        let tx = conn.build_transaction().start().await?;
13✔
393

394
        let stmt = tx
13✔
395
            .prepare(
13✔
396
                "
13✔
397
            SELECT 
13✔
398
                u.id,   
13✔
399
                u.email,
13✔
400
                u.real_name,             
13✔
401
                us.created, 
13✔
402
                us.valid_until, 
13✔
403
                s.project_id,
13✔
404
                s.view
13✔
405
            FROM 
13✔
406
                sessions s JOIN user_sessions us ON (s.id = us.session_id) 
13✔
407
                    JOIN users u ON (us.user_id = u.id)
13✔
408
            WHERE s.id = $1 AND CURRENT_TIMESTAMP < us.valid_until;",
13✔
409
            )
13✔
410
            .await?;
43✔
411

412
        let row = tx
13✔
413
            .query_one(&stmt, &[&session])
13✔
414
            .await
13✔
415
            .map_err(|_error| error::Error::InvalidSession)?;
13✔
416

417
        let mut session = UserSession {
9✔
418
            id: session,
9✔
419
            user: UserInfo {
9✔
420
                id: row.get(0),
9✔
421
                email: row.get(1),
9✔
422
                real_name: row.get(2),
9✔
423
            },
9✔
424
            created: row.get(3),
9✔
425
            valid_until: row.get(4),
9✔
426
            project: row.get::<usize, Option<Uuid>>(5).map(ProjectId),
9✔
427
            view: row.get(6),
9✔
428
            roles: vec![],
9✔
429
        };
9✔
430

431
        let stmt = tx
9✔
432
            .prepare(
9✔
433
                "
9✔
434
            SELECT role_id FROM user_roles WHERE user_id = $1;
9✔
435
            ",
9✔
436
            )
9✔
437
            .await?;
9✔
438

439
        let rows = tx.query(&stmt, &[&session.user.id]).await?;
9✔
440

441
        session.roles = rows.into_iter().map(|row| row.get(0)).collect();
18✔
442

9✔
443
        Ok(session)
9✔
444
    }
26✔
445
}
446

447
#[async_trait]
448
impl<Tls> UserDb for ProPostgresDb<Tls>
449
where
450
    Tls: MakeTlsConnect<Socket> + Clone + Send + Sync + 'static,
451
    <Tls as MakeTlsConnect<Socket>>::Stream: Send + Sync,
452
    <Tls as MakeTlsConnect<Socket>>::TlsConnect: Send,
453
    <<Tls as MakeTlsConnect<Socket>>::TlsConnect as TlsConnect<Socket>>::Future: Send,
454
{
455
    // TODO: clean up expired sessions?
456

457
    async fn logout(&self) -> Result<()> {
4✔
458
        let conn = self.conn_pool.get().await?;
4✔
459
        let stmt = conn
4✔
460
            .prepare("DELETE FROM sessions WHERE id = $1;") // TODO: only invalidate session?
4✔
461
            .await?;
4✔
462

463
        conn.execute(&stmt, &[&self.session.id])
4✔
464
            .await
4✔
465
            .map_err(|_error| error::Error::LogoutFailed)?;
4✔
466
        Ok(())
4✔
467
    }
8✔
468

469
    /// Stores `project` as the project of the current session.
    ///
    /// # Errors
    ///
    /// Fails if no connection can be acquired or the update cannot be executed.
    async fn set_session_project(&self, project: ProjectId) -> Result<()> {
        // TODO: check permission

        let connection = self.conn_pool.get().await?;

        // passing the query text lets the client prepare the statement itself
        connection
            .execute(
                "UPDATE sessions SET project_id = $1 WHERE id = $2;",
                &[&project, &self.session.id],
            )
            .await?;

        Ok(())
    }
4✔
481

482
    /// Stores `view` as the view of the current session.
    ///
    /// # Errors
    ///
    /// Fails if no connection can be acquired or the update cannot be executed.
    async fn set_session_view(&self, view: STRectangle) -> Result<()> {
        let connection = self.conn_pool.get().await?;

        // passing the query text lets the client prepare the statement itself
        connection
            .execute(
                "UPDATE sessions SET view = $1 WHERE id = $2;",
                &[&view, &self.session.id],
            )
            .await?;

        Ok(())
    }
4✔
492

493
    /// Books `quota_used` units onto the given user: the amount is subtracted from
    /// `quota_available` and added to `quota_used`.
    ///
    /// Only admin sessions may call this.
    ///
    /// # Errors
    ///
    /// Returns `PermissionDenied` if the session is not an admin session. Fails if no
    /// connection can be acquired or the update cannot be prepared or executed.
    async fn increment_quota_used(&self, user: &UserId, quota_used: u64) -> Result<()> {
        ensure!(self.session.is_admin(), error::PermissionDenied);

        // The value is passed to the database as a signed 64 bit integer. A plain `as i64`
        // would wrap values above `i64::MAX` to negative numbers and thereby *increase*
        // the available quota, so the value is clamped first.
        let quota_used = quota_used.min(i64::MAX as u64) as i64;

        let conn = self.conn_pool.get().await?;
        let stmt = conn
            .prepare(
                "
            UPDATE users SET 
                quota_available = quota_available - $1, 
                quota_used = quota_used + $1
            WHERE id = $2;",
            )
            .await?;

        conn.execute(&stmt, &[&quota_used, &user]).await?;

        Ok(())
    }
×
511

512
    async fn bulk_increment_quota_used<I: IntoIterator<Item = (UserId, u64)> + Send>(
6✔
513
        &self,
6✔
514
        quota_used_updates: I,
6✔
515
    ) -> Result<()> {
6✔
516
        ensure!(self.session.is_admin(), error::PermissionDenied);
6✔
517

518
        let conn = self.conn_pool.get().await?;
6✔
519

520
        // collect the user ids and quotas into separate vectors to pass them as parameters to the query
521
        let (users, quotas): (Vec<UserId>, Vec<i64>) = quota_used_updates
6✔
522
            .into_iter()
6✔
523
            .map(|(user, quota)| (user, quota as i64))
7✔
524
            .unzip();
6✔
525

6✔
526
        let query = "
6✔
527
            UPDATE users
6✔
528
            SET quota_available = quota_available - quota_changes.quota, 
6✔
529
                quota_used = quota_used + quota_changes.quota
6✔
530
            FROM 
6✔
531
                (SELECT * FROM UNNEST($1::uuid[], $2::bigint[]) AS t(id, quota)) AS quota_changes
6✔
532
            WHERE users.id = quota_changes.id;
6✔
533
        ";
6✔
534

6✔
535
        conn.execute(query, &[&users, &quotas]).await?;
12✔
536

537
        Ok(())
6✔
538
    }
12✔
539

540
    /// Returns the quota used so far by the user of the current session.
    ///
    /// # Errors
    ///
    /// Returns `InvalidSession` if the user's row cannot be queried.
    async fn quota_used(&self) -> Result<u64> {
        // Asking for one's own user always passes the permission check of
        // `quota_used_by_user`, which then runs the very same query.
        self.quota_used_by_user(&self.session.user.id).await
    }
6✔
553

554
    async fn quota_used_by_user(&self, user: &UserId) -> Result<u64> {
3✔
555
        ensure!(
3✔
556
            self.session.user.id == *user || self.session.is_admin(),
3✔
557
            error::PermissionDenied
×
558
        );
559

560
        let conn = self.conn_pool.get().await?;
3✔
561
        let stmt = conn
3✔
562
            .prepare("SELECT quota_used FROM users WHERE id = $1;")
3✔
563
            .await?;
3✔
564

565
        let row = conn
3✔
566
            .query_one(&stmt, &[&user])
3✔
567
            .await
3✔
568
            .map_err(|_error| error::Error::InvalidSession)?;
3✔
569

570
        Ok(row.get::<usize, i64>(0) as u64)
3✔
571
    }
6✔
572

573
    /// Returns the quota that is still available to the user of the current session.
    ///
    /// # Errors
    ///
    /// Returns `InvalidSession` if the user's row cannot be queried.
    async fn quota_available(&self) -> Result<i64> {
        // Asking for one's own user always passes the permission check of
        // `quota_available_by_user`, which then runs the very same query.
        self.quota_available_by_user(&self.session.user.id).await
    }
12✔
586

587
    async fn quota_available_by_user(&self, user: &UserId) -> Result<i64> {
2✔
588
        ensure!(
2✔
589
            self.session.user.id == *user || self.session.is_admin(),
2✔
590
            error::PermissionDenied
×
591
        );
592

593
        let conn = self.conn_pool.get().await?;
2✔
594
        let stmt = conn
2✔
595
            .prepare("SELECT quota_available FROM users WHERE id = $1;")
2✔
596
            .await?;
2✔
597

598
        let row = conn
2✔
599
            .query_one(&stmt, &[&user])
2✔
600
            .await
2✔
601
            .map_err(|_error| error::Error::InvalidSession)?;
2✔
602

603
        Ok(row.get::<usize, i64>(0))
2✔
604
    }
4✔
605

606
    async fn update_quota_available_by_user(
2✔
607
        &self,
2✔
608
        user: &UserId,
2✔
609
        new_available_quota: i64,
2✔
610
    ) -> Result<()> {
2✔
611
        ensure!(self.session.is_admin(), error::PermissionDenied);
2✔
612

613
        let conn = self.conn_pool.get().await?;
2✔
614
        let stmt = conn
2✔
615
            .prepare(
2✔
616
                "
2✔
617
            UPDATE users SET 
2✔
618
                quota_available = $1
2✔
619
            WHERE id = $2;",
2✔
620
            )
2✔
621
            .await?;
2✔
622

623
        conn.execute(&stmt, &[&(new_available_quota), &user])
2✔
624
            .await?;
2✔
625

626
        Ok(())
2✔
627
    }
4✔
628
}
629

630
#[async_trait]
631
impl<Tls> RoleDb for ProPostgresDb<Tls>
632
where
633
    Tls: MakeTlsConnect<Socket> + Clone + Send + Sync + 'static,
634
    <Tls as MakeTlsConnect<Socket>>::Stream: Send + Sync,
635
    <Tls as MakeTlsConnect<Socket>>::TlsConnect: Send,
636
    <<Tls as MakeTlsConnect<Socket>>::TlsConnect as TlsConnect<Socket>>::Future: Send,
637
{
638
    async fn add_role(&self, role_name: &str) -> Result<RoleId> {
1✔
639
        ensure!(self.session.is_admin(), error::PermissionDenied);
1✔
640

641
        let conn = self.conn_pool.get().await?;
1✔
642

643
        let id = RoleId::new();
1✔
644

645
        let stmt = conn
1✔
646
            .prepare("INSERT INTO roles (id, name) VALUES ($1, $2);")
1✔
647
            .await?;
1✔
648

649
        // TODO: map postgres error code to error::Error::RoleAlreadyExists
650

651
        conn.execute(&stmt, &[&id, &role_name]).await?;
1✔
652

653
        Ok(id)
1✔
654
    }
2✔
655

656
    async fn remove_role(&self, role_id: &RoleId) -> Result<()> {
1✔
657
        ensure!(self.session.is_admin(), error::PermissionDenied);
1✔
658

659
        let conn = self.conn_pool.get().await?;
1✔
660

661
        let stmt = conn.prepare("DELETE FROM roles WHERE id = $1;").await?;
1✔
662

663
        let deleted = conn.execute(&stmt, &[&role_id]).await?;
1✔
664

665
        ensure!(deleted > 0, error::RoleDoesNotExist);
1✔
666

667
        Ok(())
1✔
668
    }
2✔
669

670
    async fn assign_role(&self, role_id: &RoleId, user_id: &UserId) -> Result<()> {
2✔
671
        ensure!(self.session.is_admin(), error::PermissionDenied);
2✔
672

673
        let conn = self.conn_pool.get().await?;
2✔
674

675
        let stmt = conn
2✔
676
            .prepare("INSERT INTO user_roles (user_id, role_id) VALUES ($1, $2);")
2✔
677
            .await?;
2✔
678

679
        // TODO: map postgres error code to error::Error::RoleAlreadyAssigned, RoleDoesNotExist
680

681
        conn.execute(&stmt, &[&user_id, &role_id]).await?;
2✔
682

683
        Ok(())
2✔
684
    }
4✔
685

686
    async fn revoke_role(&self, role_id: &RoleId, user_id: &UserId) -> Result<()> {
1✔
687
        ensure!(self.session.is_admin(), error::PermissionDenied);
1✔
688

689
        let conn = self.conn_pool.get().await?;
1✔
690

691
        let stmt = conn
1✔
692
            .prepare("DELETE FROM user_roles WHERE user_id= $1 AND role_id = $2;")
1✔
693
            .await?;
1✔
694

695
        let deleted = conn.execute(&stmt, &[&user_id, &role_id]).await?;
1✔
696

697
        ensure!(deleted > 0, error::RoleNotAssigned);
1✔
698

699
        Ok(())
1✔
700
    }
2✔
701
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc