• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

geo-engine / geoengine / 6147514757

11 Sep 2023 02:05PM UTC coverage: 89.892% (-0.09%) from 89.985%
6147514757

Pull #873

github

web-flow
Merge 3a80eaf16 into 664a57cd7
Pull Request #873: Edr provider (TEST)

1389 of 1389 new or added lines in 3 files covered. (100.0%)

107997 of 120141 relevant lines covered (89.89%)

60267.52 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

98.7
/services/src/pro/users/postgres_userdb.rs
1
use crate::contexts::SessionId;
2
use crate::error::Result;
3
use crate::pro::contexts::ProPostgresDb;
4
use crate::pro::permissions::{Role, RoleDescription, RoleId};
5
use crate::pro::users::oidc::ExternalUserClaims;
6
use crate::pro::users::{
7
    User, UserCredentials, UserDb, UserId, UserInfo, UserRegistration, UserSession,
8
};
9
use crate::projects::{ProjectId, STRectangle};
10
use crate::util::Identifier;
11
use crate::{error, pro::contexts::ProPostgresContext};
12
use async_trait::async_trait;
13

14
use bb8_postgres::{
15
    tokio_postgres::tls::MakeTlsConnect, tokio_postgres::tls::TlsConnect, tokio_postgres::Socket,
16
};
17
use geoengine_datatypes::primitives::Duration;
18
use pwhash::bcrypt;
19
use snafu::ensure;
20
use uuid::Uuid;
21

22
use super::userdb::{RoleDb, UserAuth};
23

24
#[async_trait]
25
impl<Tls> UserAuth for ProPostgresContext<Tls>
26
where
27
    Tls: MakeTlsConnect<Socket> + Clone + Send + Sync + 'static,
28
    <Tls as MakeTlsConnect<Socket>>::Stream: Send + Sync,
29
    <Tls as MakeTlsConnect<Socket>>::TlsConnect: Send,
30
    <<Tls as MakeTlsConnect<Socket>>::TlsConnect as TlsConnect<Socket>>::Future: Send,
31
{
32
    // TODO: clean up expired sessions?
33

34
    async fn register_user(&self, user: UserRegistration) -> Result<UserId> {
43✔
35
        let mut conn = self.pool.get().await?;
43✔
36

37
        let tx = conn.build_transaction().start().await?;
43✔
38

39
        let user = User::from(user);
43✔
40

41
        let stmt = tx
43✔
42
            .prepare("INSERT INTO roles (id, name) VALUES ($1, $2);")
43✔
43
            .await?;
28✔
44
        let res = tx.execute(&stmt, &[&user.id, &user.email]).await;
43✔
45

46
        if let Err(e) = res {
43✔
47
            if e.code() == Some(&tokio_postgres::error::SqlState::UNIQUE_VIOLATION) {
15✔
48
                return Err(error::Error::Duplicate {
15✔
49
                    reason: "E-mail already exists".to_string(),
15✔
50
                });
15✔
51
            }
×
52
            return Err(e.into());
×
53
        }
28✔
54

55
        let stmt = tx
28✔
56
            .prepare(
28✔
57
                "INSERT INTO users (id, email, password_hash, real_name, quota_available, active) VALUES ($1, $2, $3, $4, $5, $6);",
28✔
58
            )
28✔
59
            .await?;
16✔
60

61
        let quota_available =
28✔
62
            crate::util::config::get_config_element::<crate::pro::util::config::Quota>()?
28✔
63
                .default_available_quota;
64

65
        tx.execute(
28✔
66
            &stmt,
28✔
67
            &[
28✔
68
                &user.id,
28✔
69
                &user.email,
28✔
70
                &user.password_hash,
28✔
71
                &user.real_name,
28✔
72
                &quota_available,
28✔
73
                &user.active,
28✔
74
            ],
28✔
75
        )
28✔
76
        .await?;
15✔
77

78
        let stmt = tx
28✔
79
            .prepare("INSERT INTO user_roles (user_id, role_id) VALUES ($1, $2);")
28✔
80
            .await?;
15✔
81
        tx.execute(&stmt, &[&user.id, &user.id]).await?;
28✔
82

83
        let stmt = tx
28✔
84
            .prepare("INSERT INTO user_roles (user_id, role_id) VALUES ($1, $2);")
28✔
85
            .await?;
16✔
86
        tx.execute(&stmt, &[&user.id, &Role::registered_user_role_id()])
28✔
87
            .await?;
16✔
88

89
        tx.commit().await?;
28✔
90

91
        Ok(user.id)
28✔
92
    }
86✔
93

94
    async fn create_anonymous_session(&self) -> Result<UserSession> {
33✔
95
        let mut conn = self.pool.get().await?;
33✔
96

97
        let tx = conn.build_transaction().start().await?;
33✔
98

99
        let user_id = UserId::new();
33✔
100

101
        let stmt = tx
33✔
102
            .prepare("INSERT INTO roles (id, name) VALUES ($1, $2);")
33✔
103
            .await?;
26✔
104
        tx.execute(&stmt, &[&user_id, &format!("anonymous_user_{user_id}")])
33✔
105
            .await?;
26✔
106

107
        let quota_available =
33✔
108
            crate::util::config::get_config_element::<crate::pro::util::config::Quota>()?
33✔
109
                .default_available_quota;
110

111
        let stmt = tx
33✔
112
            .prepare("INSERT INTO users (id, quota_available, active) VALUES ($1, $2, TRUE);")
33✔
113
            .await?;
26✔
114

115
        tx.execute(&stmt, &[&user_id, &quota_available]).await?;
33✔
116

117
        let stmt = tx
33✔
118
            .prepare("INSERT INTO user_roles (user_id, role_id) VALUES ($1, $2);")
33✔
119
            .await?;
26✔
120
        tx.execute(&stmt, &[&user_id, &user_id]).await?;
33✔
121

122
        let stmt = tx
33✔
123
            .prepare("INSERT INTO user_roles (user_id, role_id) VALUES ($1, $2);")
33✔
124
            .await?;
26✔
125
        tx.execute(&stmt, &[&user_id, &Role::anonymous_role_id()])
33✔
126
            .await?;
26✔
127

128
        let session_id = SessionId::new();
33✔
129
        let stmt = tx
33✔
130
            .prepare(
33✔
131
                "
33✔
132
                INSERT INTO sessions (id)
33✔
133
                VALUES ($1);",
33✔
134
            )
33✔
135
            .await?;
26✔
136

137
        tx.execute(&stmt, &[&session_id]).await?;
33✔
138

139
        let stmt = tx
33✔
140
            .prepare(
33✔
141
                "
33✔
142
            INSERT INTO 
33✔
143
                user_sessions (user_id, session_id, created, valid_until) 
33✔
144
            VALUES 
33✔
145
                ($1, $2, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP + make_interval(secs:=$3))
33✔
146
            RETURNING 
33✔
147
                created, valid_until;",
33✔
148
            )
33✔
149
            .await?;
26✔
150

151
        // TODO: load from config
152
        let session_duration = chrono::Duration::days(30);
33✔
153
        let row = tx
33✔
154
            .query_one(
33✔
155
                &stmt,
33✔
156
                &[
33✔
157
                    &user_id,
33✔
158
                    &session_id,
33✔
159
                    &(session_duration.num_seconds() as f64),
33✔
160
                ],
33✔
161
            )
33✔
162
            .await?;
26✔
163

164
        tx.commit().await?;
33✔
165

166
        Ok(UserSession {
33✔
167
            id: session_id,
33✔
168
            user: UserInfo {
33✔
169
                id: user_id,
33✔
170
                email: None,
33✔
171
                real_name: None,
33✔
172
            },
33✔
173
            created: row.get(0),
33✔
174
            valid_until: row.get(1),
33✔
175
            project: None,
33✔
176
            view: None,
33✔
177
            roles: vec![user_id.into(), Role::anonymous_role_id()],
33✔
178
        })
33✔
179
    }
66✔
180

181
    async fn login(&self, user_credentials: UserCredentials) -> Result<UserSession> {
53✔
182
        let mut conn = self.pool.get().await?;
53✔
183

184
        let tx = conn.build_transaction().start().await?;
53✔
185

186
        let stmt = tx
53✔
187
            .prepare("SELECT id, password_hash, email, real_name FROM users WHERE email = $1;")
53✔
188
            .await?;
23✔
189

190
        let row = tx
53✔
191
            .query_one(&stmt, &[&user_credentials.email])
53✔
192
            .await
23✔
193
            .map_err(|_error| error::Error::LoginFailed)?;
53✔
194

195
        let user_id = UserId(row.get(0));
53✔
196
        let password_hash = row.get(1);
53✔
197
        let email = row.get(2);
53✔
198
        let real_name = row.get(3);
53✔
199

53✔
200
        if bcrypt::verify(user_credentials.password, password_hash) {
53✔
201
            let session_id = SessionId::new();
52✔
202
            let stmt = tx
52✔
203
                .prepare(
52✔
204
                    "
52✔
205
                INSERT INTO sessions (id)
52✔
206
                VALUES ($1);",
52✔
207
                )
52✔
208
                .await?;
24✔
209

210
            tx.execute(&stmt, &[&session_id]).await?;
52✔
211

212
            // TODO: load from config
213
            let session_duration = chrono::Duration::days(30);
52✔
214
            let stmt = tx
52✔
215
                .prepare(
52✔
216
                    "
52✔
217
                INSERT INTO 
52✔
218
                    user_sessions (user_id, session_id, created, valid_until) 
52✔
219
                VALUES 
52✔
220
                    ($1, $2, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP + make_interval(secs:=$3))
52✔
221
                RETURNING 
52✔
222
                    created, valid_until;",
52✔
223
                )
52✔
224
                .await?;
23✔
225
            let row = tx
52✔
226
                .query_one(
52✔
227
                    &stmt,
52✔
228
                    &[
52✔
229
                        &user_id,
52✔
230
                        &session_id,
52✔
231
                        &(session_duration.num_seconds() as f64),
52✔
232
                    ],
52✔
233
                )
52✔
234
                .await?;
20✔
235

236
            let stmt = tx
52✔
237
                .prepare("SELECT role_id FROM user_roles WHERE user_id = $1;")
52✔
238
                .await?;
24✔
239

240
            let rows = tx
52✔
241
                .query(&stmt, &[&user_id])
52✔
242
                .await
20✔
243
                .map_err(|_error| error::Error::LoginFailed)?;
52✔
244

245
            tx.commit().await?;
55✔
246

247
            let roles = rows.into_iter().map(|row| row.get(0)).collect();
84✔
248

52✔
249
            Ok(UserSession {
52✔
250
                id: session_id,
52✔
251
                user: UserInfo {
52✔
252
                    id: user_id,
52✔
253
                    email,
52✔
254
                    real_name,
52✔
255
                },
52✔
256
                created: row.get(0),
52✔
257
                valid_until: row.get(1),
52✔
258
                project: None,
52✔
259
                view: None,
52✔
260
                roles,
52✔
261
            })
52✔
262
        } else {
263
            Err(error::Error::LoginFailed)
1✔
264
        }
265
    }
106✔
266

267
    #[allow(clippy::too_many_lines)]
268
    async fn login_external(
4✔
269
        &self,
4✔
270
        user: ExternalUserClaims,
4✔
271
        duration: Duration,
4✔
272
    ) -> Result<UserSession> {
4✔
273
        let mut conn = self.pool.get().await?;
4✔
274
        let tx = conn.build_transaction().start().await?;
4✔
275

276
        let stmt = tx
4✔
277
            .prepare("SELECT id, external_id, email, real_name FROM external_users WHERE external_id = $1;")
4✔
278
            .await?;
4✔
279

280
        let row = tx
4✔
281
            .query_opt(&stmt, &[&user.external_id.to_string()])
4✔
282
            .await
4✔
283
            .map_err(|_error| error::Error::LoginFailed)?;
4✔
284

285
        let user_id = match row {
4✔
286
            Some(row) => UserId(row.get(0)),
2✔
287
            None => {
288
                let user_id = UserId::new();
2✔
289

290
                let stmt = tx
2✔
291
                    .prepare("INSERT INTO roles (id, name) VALUES ($1, $2);")
2✔
292
                    .await?;
2✔
293
                tx.execute(&stmt, &[&user_id, &user.email]).await?;
2✔
294

295
                let quota_available =
2✔
296
                    crate::util::config::get_config_element::<crate::pro::util::config::Quota>()?
2✔
297
                        .default_available_quota;
298

299
                //TODO: Inconsistent to hashmap implementation, where an external user is not part of the user database.
300
                //TODO: A user might be able to login without external login using this (internal) id. Would be a problem with anonymous users as well.
301
                let stmt = tx
2✔
302
                    .prepare(
2✔
303
                        "INSERT INTO users (id, quota_available, active) VALUES ($1, $2, TRUE);",
2✔
304
                    )
2✔
305
                    .await?;
2✔
306
                tx.execute(&stmt, &[&user_id, &quota_available]).await?;
2✔
307

308
                let stmt = tx
2✔
309
                    .prepare(
2✔
310
                        "INSERT INTO external_users (id, external_id, email, real_name, active) VALUES ($1, $2, $3, $4, $5);",
2✔
311
                    )
2✔
312
                    .await?;
2✔
313

314
                tx.execute(
2✔
315
                    &stmt,
2✔
316
                    &[
2✔
317
                        &user_id,
2✔
318
                        &user.external_id.to_string(),
2✔
319
                        &user.email,
2✔
320
                        &user.real_name,
2✔
321
                        &true,
2✔
322
                    ],
2✔
323
                )
2✔
324
                .await?;
2✔
325

326
                let stmt = tx
2✔
327
                    .prepare("INSERT INTO user_roles (user_id, role_id) VALUES ($1, $2);")
2✔
328
                    .await?;
2✔
329
                tx.execute(&stmt, &[&user_id, &user_id]).await?;
2✔
330

331
                let stmt = tx
2✔
332
                    .prepare("INSERT INTO user_roles (user_id, role_id) VALUES ($1, $2);")
2✔
333
                    .await?;
2✔
334
                tx.execute(&stmt, &[&user_id, &Role::registered_user_role_id()])
2✔
335
                    .await?;
2✔
336

337
                user_id
2✔
338
            }
339
        };
340

341
        let session_id = SessionId::new();
4✔
342
        let stmt = tx
4✔
343
            .prepare(
4✔
344
                "
4✔
345
            INSERT INTO sessions (id)
4✔
346
            VALUES ($1);",
4✔
347
            )
4✔
348
            .await?; //TODO: Check documentation if inconsistent to hashmap implementation - would happen if CURRENT_TIMESTAMP is called twice in postgres for a single query. Worked in tests.
4✔
349

350
        tx.execute(&stmt, &[&session_id]).await?;
4✔
351

352
        let stmt = tx
4✔
353
            .prepare(
4✔
354
                "
4✔
355
            INSERT INTO 
4✔
356
                user_sessions (user_id, session_id, created, valid_until) 
4✔
357
            VALUES 
4✔
358
                ($1, $2, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP + make_interval(secs:=$3))
4✔
359
            RETURNING 
4✔
360
                created, valid_until;",
4✔
361
            )
4✔
362
            .await?;
4✔
363
        let row = tx
4✔
364
            .query_one(
4✔
365
                &stmt,
4✔
366
                &[&user_id, &session_id, &(duration.num_seconds() as f64)],
4✔
367
            )
4✔
368
            .await?;
4✔
369

370
        let stmt = tx
4✔
371
            .prepare("SELECT role_id FROM user_roles WHERE user_id = $1;")
4✔
372
            .await?;
4✔
373

374
        let rows = tx
4✔
375
            .query(&stmt, &[&user_id])
4✔
376
            .await
4✔
377
            .map_err(|_error| error::Error::LoginFailed)?;
4✔
378

379
        let roles = rows.into_iter().map(|row| row.get(0)).collect();
8✔
380

4✔
381
        tx.commit().await?;
4✔
382

383
        Ok(UserSession {
4✔
384
            id: session_id,
4✔
385
            user: UserInfo {
4✔
386
                id: user_id,
4✔
387
                email: Some(user.email.clone()),
4✔
388
                real_name: Some(user.real_name.clone()),
4✔
389
            },
4✔
390
            created: row.get(0),
4✔
391
            valid_until: row.get(1),
4✔
392
            project: None,
4✔
393
            view: None,
4✔
394
            roles,
4✔
395
        })
4✔
396
    }
8✔
397

398
    async fn user_session_by_id(&self, session: SessionId) -> Result<UserSession> {
51✔
399
        let mut conn = self.pool.get().await?;
51✔
400

401
        let tx = conn.build_transaction().start().await?;
51✔
402

403
        let stmt = tx
51✔
404
            .prepare(
51✔
405
                "
51✔
406
            SELECT 
51✔
407
                u.id,   
51✔
408
                u.email,
51✔
409
                u.real_name,             
51✔
410
                us.created, 
51✔
411
                us.valid_until, 
51✔
412
                s.project_id,
51✔
413
                s.view
51✔
414
            FROM 
51✔
415
                sessions s JOIN user_sessions us ON (s.id = us.session_id) 
51✔
416
                    JOIN users u ON (us.user_id = u.id)
51✔
417
            WHERE s.id = $1 AND CURRENT_TIMESTAMP < us.valid_until;",
51✔
418
            )
51✔
419
            .await?;
120✔
420

421
        let row = tx
51✔
422
            .query_one(&stmt, &[&session])
51✔
423
            .await
29✔
424
            .map_err(|_error| error::Error::InvalidSession)?;
51✔
425

426
        let mut session = UserSession {
45✔
427
            id: session,
45✔
428
            user: UserInfo {
45✔
429
                id: row.get(0),
45✔
430
                email: row.get(1),
45✔
431
                real_name: row.get(2),
45✔
432
            },
45✔
433
            created: row.get(3),
45✔
434
            valid_until: row.get(4),
45✔
435
            project: row.get::<usize, Option<Uuid>>(5).map(ProjectId),
45✔
436
            view: row.get(6),
45✔
437
            roles: vec![],
45✔
438
        };
45✔
439

440
        let stmt = tx
45✔
441
            .prepare(
45✔
442
                "
45✔
443
            SELECT role_id FROM user_roles WHERE user_id = $1;
45✔
444
            ",
45✔
445
            )
45✔
446
            .await?;
22✔
447

448
        let rows = tx.query(&stmt, &[&session.user.id]).await?;
45✔
449

450
        session.roles = rows.into_iter().map(|row| row.get(0)).collect();
79✔
451

45✔
452
        Ok(session)
45✔
453
    }
102✔
454
}
455

456
#[async_trait]
457
impl<Tls> UserDb for ProPostgresDb<Tls>
458
where
459
    Tls: MakeTlsConnect<Socket> + Clone + Send + Sync + 'static,
460
    <Tls as MakeTlsConnect<Socket>>::Stream: Send + Sync,
461
    <Tls as MakeTlsConnect<Socket>>::TlsConnect: Send,
462
    <<Tls as MakeTlsConnect<Socket>>::TlsConnect as TlsConnect<Socket>>::Future: Send,
463
{
464
    // TODO: clean up expired sessions?
465

466
    async fn logout(&self) -> Result<()> {
6✔
467
        let conn = self.conn_pool.get().await?;
6✔
468
        let stmt = conn
6✔
469
            .prepare("DELETE FROM sessions WHERE id = $1;") // TODO: only invalidate session?
6✔
470
            .await?;
7✔
471

472
        conn.execute(&stmt, &[&self.session.id])
6✔
473
            .await
6✔
474
            .map_err(|_error| error::Error::LogoutFailed)?;
6✔
475
        Ok(())
6✔
476
    }
12✔
477

478
    /// Stores `project` as the project of the current session.
    ///
    /// # Errors
    ///
    /// Pool and database errors are propagated.
    async fn set_session_project(&self, project: ProjectId) -> Result<()> {
        // TODO: check permission

        let connection = self.conn_pool.get().await?;
        let update_project = connection
            .prepare("UPDATE sessions SET project_id = $1 WHERE id = $2;")
            .await?;

        connection
            .execute(&update_project, &[&project, &self.session.id])
            .await?;

        Ok(())
    }
6✔
490

491
    /// Stores `view` as the view of the current session.
    ///
    /// # Errors
    ///
    /// Pool and database errors are propagated.
    async fn set_session_view(&self, view: STRectangle) -> Result<()> {
        let connection = self.conn_pool.get().await?;
        let update_view = connection
            .prepare("UPDATE sessions SET view = $1 WHERE id = $2;")
            .await?;

        connection
            .execute(&update_view, &[&view, &self.session.id])
            .await?;

        Ok(())
    }
6✔
501

502
    async fn increment_quota_used(&self, user: &UserId, quota_used: u64) -> Result<()> {
1✔
503
        ensure!(self.session.is_admin(), error::PermissionDenied);
1✔
504

505
        let conn = self.conn_pool.get().await?;
1✔
506
        let stmt = conn
1✔
507
            .prepare(
1✔
508
                "
1✔
509
            UPDATE users SET 
1✔
510
                quota_available = quota_available - $1, 
1✔
511
                quota_used = quota_used + $1
1✔
512
            WHERE id = $2;",
1✔
513
            )
1✔
514
            .await?;
×
515

516
        conn.execute(&stmt, &[&(quota_used as i64), &user]).await?;
1✔
517

518
        Ok(())
1✔
519
    }
2✔
520

521
    async fn bulk_increment_quota_used<I: IntoIterator<Item = (UserId, u64)> + Send>(
13✔
522
        &self,
13✔
523
        quota_used_updates: I,
13✔
524
    ) -> Result<()> {
13✔
525
        ensure!(self.session.is_admin(), error::PermissionDenied);
13✔
526

527
        let conn = self.conn_pool.get().await?;
13✔
528

529
        // collect the user ids and quotas into separate vectors to pass them as parameters to the query
530
        let (users, quotas): (Vec<UserId>, Vec<i64>) = quota_used_updates
13✔
531
            .into_iter()
13✔
532
            .map(|(user, quota)| (user, quota as i64))
13✔
533
            .unzip();
13✔
534

13✔
535
        let query = "
13✔
536
            UPDATE users
13✔
537
            SET quota_available = quota_available - quota_changes.quota, 
13✔
538
                quota_used = quota_used + quota_changes.quota
13✔
539
            FROM 
13✔
540
                (SELECT * FROM UNNEST($1::uuid[], $2::bigint[]) AS t(id, quota)) AS quota_changes
13✔
541
            WHERE users.id = quota_changes.id;
13✔
542
        ";
13✔
543

13✔
544
        conn.execute(query, &[&users, &quotas]).await?;
26✔
545

546
        Ok(())
13✔
547
    }
26✔
548

549
    async fn quota_used(&self) -> Result<u64> {
6✔
550
        let conn = self.conn_pool.get().await?;
6✔
551
        let stmt = conn
6✔
552
            .prepare("SELECT quota_used FROM users WHERE id = $1;")
6✔
553
            .await?;
4✔
554

555
        let row = conn
6✔
556
            .query_one(&stmt, &[&self.session.user.id])
6✔
557
            .await
4✔
558
            .map_err(|_error| error::Error::InvalidSession)?;
6✔
559

560
        Ok(row.get::<usize, i64>(0) as u64)
6✔
561
    }
12✔
562

563
    async fn quota_used_by_user(&self, user: &UserId) -> Result<u64> {
7✔
564
        ensure!(
7✔
565
            self.session.user.id == *user || self.session.is_admin(),
7✔
566
            error::PermissionDenied
×
567
        );
568

569
        let conn = self.conn_pool.get().await?;
7✔
570
        let stmt = conn
7✔
571
            .prepare("SELECT quota_used FROM users WHERE id = $1;")
7✔
572
            .await?;
3✔
573

574
        let row = conn
7✔
575
            .query_one(&stmt, &[&user])
7✔
576
            .await
3✔
577
            .map_err(|_error| error::Error::InvalidSession)?;
7✔
578

579
        Ok(row.get::<usize, i64>(0) as u64)
7✔
580
    }
14✔
581

582
    async fn quota_available(&self) -> Result<i64> {
11✔
583
        let conn = self.conn_pool.get().await?;
11✔
584
        let stmt = conn
11✔
585
            .prepare("SELECT quota_available FROM users WHERE id = $1;")
11✔
586
            .await?;
7✔
587

588
        let row = conn
11✔
589
            .query_one(&stmt, &[&self.session.user.id])
11✔
590
            .await
8✔
591
            .map_err(|_error| error::Error::InvalidSession)?;
11✔
592

593
        Ok(row.get::<usize, i64>(0))
11✔
594
    }
22✔
595

596
    async fn quota_available_by_user(&self, user: &UserId) -> Result<i64> {
6✔
597
        ensure!(
6✔
598
            self.session.user.id == *user || self.session.is_admin(),
6✔
599
            error::PermissionDenied
×
600
        );
601

602
        let conn = self.conn_pool.get().await?;
6✔
603
        let stmt = conn
6✔
604
            .prepare("SELECT quota_available FROM users WHERE id = $1;")
6✔
605
            .await?;
2✔
606

607
        let row = conn
6✔
608
            .query_one(&stmt, &[&user])
6✔
609
            .await
2✔
610
            .map_err(|_error| error::Error::InvalidSession)?;
6✔
611

612
        Ok(row.get::<usize, i64>(0))
6✔
613
    }
12✔
614

615
    async fn update_quota_available_by_user(
5✔
616
        &self,
5✔
617
        user: &UserId,
5✔
618
        new_available_quota: i64,
5✔
619
    ) -> Result<()> {
5✔
620
        ensure!(self.session.is_admin(), error::PermissionDenied);
5✔
621

622
        let conn = self.conn_pool.get().await?;
5✔
623
        let stmt = conn
5✔
624
            .prepare(
5✔
625
                "
5✔
626
            UPDATE users SET 
5✔
627
                quota_available = $1
5✔
628
            WHERE id = $2;",
5✔
629
            )
5✔
630
            .await?;
4✔
631

632
        conn.execute(&stmt, &[&(new_available_quota), &user])
5✔
633
            .await?;
5✔
634

635
        Ok(())
5✔
636
    }
10✔
637
}
638

639
#[async_trait]
640
impl<Tls> RoleDb for ProPostgresDb<Tls>
641
where
642
    Tls: MakeTlsConnect<Socket> + Clone + Send + Sync + 'static,
643
    <Tls as MakeTlsConnect<Socket>>::Stream: Send + Sync,
644
    <Tls as MakeTlsConnect<Socket>>::TlsConnect: Send,
645
    <<Tls as MakeTlsConnect<Socket>>::TlsConnect as TlsConnect<Socket>>::Future: Send,
646
{
647
    async fn add_role(&self, role_name: &str) -> Result<RoleId> {
4✔
648
        ensure!(self.session.is_admin(), error::PermissionDenied);
4✔
649

650
        let conn = self.conn_pool.get().await?;
4✔
651

652
        let id = RoleId::new();
4✔
653

654
        let stmt = conn
4✔
655
            .prepare("INSERT INTO roles (id, name) VALUES ($1, $2);")
4✔
656
            .await?;
×
657

658
        // TODO: map postgres error code to error::Error::RoleAlreadyExists
659

660
        conn.execute(&stmt, &[&id, &role_name]).await?;
4✔
661

662
        Ok(id)
4✔
663
    }
8✔
664

665
    async fn remove_role(&self, role_id: &RoleId) -> Result<()> {
2✔
666
        ensure!(self.session.is_admin(), error::PermissionDenied);
2✔
667

668
        let conn = self.conn_pool.get().await?;
2✔
669

670
        let stmt = conn.prepare("DELETE FROM roles WHERE id = $1;").await?;
2✔
671

672
        let deleted = conn.execute(&stmt, &[&role_id]).await?;
2✔
673

674
        ensure!(deleted > 0, error::RoleDoesNotExist);
2✔
675

676
        Ok(())
2✔
677
    }
4✔
678

679
    async fn assign_role(&self, role_id: &RoleId, user_id: &UserId) -> Result<()> {
4✔
680
        ensure!(self.session.is_admin(), error::PermissionDenied);
4✔
681

682
        let conn = self.conn_pool.get().await?;
4✔
683

684
        let stmt = conn
4✔
685
            .prepare("INSERT INTO user_roles (user_id, role_id) VALUES ($1, $2);")
4✔
686
            .await?;
2✔
687

688
        // TODO: map postgres error code to error::Error::RoleAlreadyAssigned, RoleDoesNotExist
689

690
        conn.execute(&stmt, &[&user_id, &role_id]).await?;
4✔
691

692
        Ok(())
4✔
693
    }
8✔
694

695
    async fn revoke_role(&self, role_id: &RoleId, user_id: &UserId) -> Result<()> {
2✔
696
        ensure!(self.session.is_admin(), error::PermissionDenied);
2✔
697

698
        let conn = self.conn_pool.get().await?;
2✔
699

700
        let stmt = conn
2✔
701
            .prepare("DELETE FROM user_roles WHERE user_id= $1 AND role_id = $2;")
2✔
702
            .await?;
×
703

704
        let deleted = conn.execute(&stmt, &[&user_id, &role_id]).await?;
2✔
705

706
        ensure!(deleted > 0, error::RoleNotAssigned);
2✔
707

708
        Ok(())
2✔
709
    }
4✔
710

711
    async fn get_role_descriptions(&self, user_id: &UserId) -> Result<Vec<RoleDescription>> {
5✔
712
        let conn = self.conn_pool.get().await?;
5✔
713

714
        let stmt = conn
5✔
715
            .prepare(
5✔
716
                "SELECT roles.id, roles.name \
5✔
717
                FROM roles JOIN user_roles ON (roles.id=user_roles.role_id) \
5✔
718
                WHERE user_roles.user_id=$1 \
5✔
719
                ORDER BY roles.name;",
5✔
720
            )
5✔
721
            .await?;
3✔
722

723
        let results = conn.query(&stmt, &[&user_id]).await?;
5✔
724

725
        let mut result_vec = Vec::new();
5✔
726

727
        for result in results {
17✔
728
            let id = result.get(0);
12✔
729
            let name = result.get(1);
12✔
730
            let individual = UserId(id) == *user_id;
12✔
731
            result_vec.push(RoleDescription {
12✔
732
                role: Role {
12✔
733
                    id: RoleId(id),
12✔
734
                    name,
12✔
735
                },
12✔
736
                individual,
12✔
737
            });
12✔
738
        }
12✔
739

740
        Ok(result_vec)
5✔
741
    }
10✔
742
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc