• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

geo-engine / geoengine / 3931034422

pending completion
3931034422

Pull #714

github

GitHub
Merge 5f93dc588 into a5761b055
Pull Request #714: use nextest in test

86519 of 98458 relevant lines covered (87.87%)

78874.78 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

95.1
/services/src/pro/users/postgres_userdb.rs
1
use crate::contexts::SessionId;
2
use crate::error::Result;
3
use crate::pro::datasets::Role;
4
use crate::pro::projects::ProjectPermission;
5
use crate::pro::users::oidc::ExternalUserClaims;
6
use crate::pro::users::{
7
    User, UserCredentials, UserDb, UserId, UserInfo, UserRegistration, UserSession,
8
};
9
use crate::projects::{ProjectId, STRectangle};
10
use crate::util::user_input::Validated;
11
use crate::util::Identifier;
12
use crate::{error, pro::contexts::PostgresContext};
13
use async_trait::async_trait;
14
use bb8_postgres::PostgresConnectionManager;
15
use bb8_postgres::{
16
    bb8::Pool, tokio_postgres::tls::MakeTlsConnect, tokio_postgres::tls::TlsConnect,
17
    tokio_postgres::Socket,
18
};
19
use geoengine_datatypes::primitives::Duration;
20
use pwhash::bcrypt;
21
use uuid::Uuid;
22

23
pub struct PostgresUserDb<Tls>
24
where
25
    Tls: MakeTlsConnect<Socket> + Clone + Send + Sync + 'static,
26
    <Tls as MakeTlsConnect<Socket>>::Stream: Send + Sync,
27
    <Tls as MakeTlsConnect<Socket>>::TlsConnect: Send,
28
    <<Tls as MakeTlsConnect<Socket>>::TlsConnect as TlsConnect<Socket>>::Future: Send,
29
{
30
    conn_pool: Pool<PostgresConnectionManager<Tls>>,
31
}
32

33
impl<Tls> PostgresUserDb<Tls>
34
where
35
    Tls: MakeTlsConnect<Socket> + Clone + Send + Sync + 'static,
36
    <Tls as MakeTlsConnect<Socket>>::Stream: Send + Sync,
37
    <Tls as MakeTlsConnect<Socket>>::TlsConnect: Send,
38
    <<Tls as MakeTlsConnect<Socket>>::TlsConnect as TlsConnect<Socket>>::Future: Send,
39
{
40
    pub fn new(conn_pool: Pool<PostgresConnectionManager<Tls>>) -> Self {
18✔
41
        Self { conn_pool }
18✔
42
    }
18✔
43
}
44

45
#[async_trait]
46
impl<Tls> UserDb for PostgresUserDb<Tls>
47
where
48
    Tls: MakeTlsConnect<Socket> + Clone + Send + Sync + 'static,
49
    <Tls as MakeTlsConnect<Socket>>::Stream: Send + Sync,
50
    <Tls as MakeTlsConnect<Socket>>::TlsConnect: Send,
51
    <<Tls as MakeTlsConnect<Socket>>::TlsConnect as TlsConnect<Socket>>::Future: Send,
52
{
53
    // TODO: clean up expired sessions?
54

55
    async fn register(&self, user: Validated<UserRegistration>) -> Result<UserId> {
4✔
56
        let mut conn = self.conn_pool.get().await?;
4✔
57

58
        let tx = conn.build_transaction().start().await?;
4✔
59

60
        let user = User::from(user.user_input);
4✔
61

62
        let stmt = tx
4✔
63
            .prepare("INSERT INTO roles (id, name) VALUES ($1, $2);")
4✔
64
            .await?;
1✔
65
        tx.execute(&stmt, &[&user.id, &user.email]).await?;
4✔
66

67
        let stmt = tx
4✔
68
            .prepare(
4✔
69
                "INSERT INTO users (id, email, password_hash, real_name, active) VALUES ($1, $2, $3, $4, $5);",
4✔
70
            )
4✔
71
            .await?;
×
72

73
        tx.execute(
4✔
74
            &stmt,
4✔
75
            &[
4✔
76
                &user.id,
4✔
77
                &user.email,
4✔
78
                &user.password_hash,
4✔
79
                &user.real_name,
4✔
80
                &user.active,
4✔
81
            ],
4✔
82
        )
4✔
83
        .await?;
×
84

85
        let stmt = tx
4✔
86
            .prepare("INSERT INTO user_roles (user_id, role_id) VALUES ($1, $2);")
4✔
87
            .await?;
×
88
        tx.execute(&stmt, &[&user.id, &user.id]).await?;
4✔
89

90
        let stmt = tx
4✔
91
            .prepare("INSERT INTO user_roles (user_id, role_id) VALUES ($1, $2);")
4✔
92
            .await?;
×
93
        tx.execute(&stmt, &[&user.id, &Role::user_role_id()])
4✔
94
            .await?;
×
95

96
        tx.commit().await?;
4✔
97

98
        Ok(user.id)
4✔
99
    }
8✔
100

101
    async fn anonymous(&self) -> Result<UserSession> {
16✔
102
        let mut conn = self.conn_pool.get().await?;
16✔
103

104
        let tx = conn.build_transaction().start().await?;
16✔
105

106
        let user_id = UserId::new();
16✔
107

108
        let stmt = tx
16✔
109
            .prepare("INSERT INTO roles (id, name) VALUES ($1, $2);")
16✔
110
            .await?;
16✔
111
        tx.execute(&stmt, &[&user_id, &"anonymous_user"]).await?;
16✔
112

113
        let stmt = tx
16✔
114
            .prepare("INSERT INTO users (id, active) VALUES ($1, TRUE);")
16✔
115
            .await?;
16✔
116

117
        tx.execute(&stmt, &[&user_id]).await?;
16✔
118

119
        let stmt = tx
16✔
120
            .prepare("INSERT INTO user_roles (user_id, role_id) VALUES ($1, $2);")
16✔
121
            .await?;
16✔
122
        tx.execute(&stmt, &[&user_id, &user_id]).await?;
16✔
123

124
        let stmt = tx
16✔
125
            .prepare("INSERT INTO user_roles (user_id, role_id) VALUES ($1, $2);")
16✔
126
            .await?;
16✔
127
        tx.execute(&stmt, &[&user_id, &Role::anonymous_role_id()])
16✔
128
            .await?;
16✔
129

130
        let session_id = SessionId::new();
16✔
131
        let stmt = tx
16✔
132
            .prepare(
16✔
133
                "
16✔
134
                INSERT INTO sessions (id, user_id, created, valid_until)
16✔
135
                VALUES ($1, $2, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP + make_interval(secs:=$3)) 
16✔
136
                RETURNING created, valid_until;",
16✔
137
            )
16✔
138
            .await?;
16✔
139

140
        // TODO: load from config
141
        let session_duration = chrono::Duration::days(30);
16✔
142
        let row = tx
16✔
143
            .query_one(
16✔
144
                &stmt,
16✔
145
                &[
16✔
146
                    &session_id,
16✔
147
                    &user_id,
16✔
148
                    &(session_duration.num_seconds() as f64),
16✔
149
                ],
16✔
150
            )
16✔
151
            .await?;
16✔
152

153
        tx.commit().await?;
16✔
154

155
        Ok(UserSession {
16✔
156
            id: session_id,
16✔
157
            user: UserInfo {
16✔
158
                id: user_id,
16✔
159
                email: None,
16✔
160
                real_name: None,
16✔
161
            },
16✔
162
            created: row.get(0),
16✔
163
            valid_until: row.get(1),
16✔
164
            project: None,
16✔
165
            view: None,
16✔
166
            roles: vec![user_id.into(), Role::anonymous_role_id()],
16✔
167
        })
16✔
168
    }
32✔
169

170
    async fn login(&self, user_credentials: UserCredentials) -> Result<UserSession> {
4✔
171
        let conn = self.conn_pool.get().await?;
4✔
172
        let stmt = conn
4✔
173
            .prepare("SELECT id, password_hash, email, real_name FROM users WHERE email = $1;")
4✔
174
            .await?;
1✔
175

176
        let row = conn
4✔
177
            .query_one(&stmt, &[&user_credentials.email])
4✔
178
            .await
1✔
179
            .map_err(|_error| error::Error::LoginFailed)?;
4✔
180

181
        let user_id = UserId(row.get(0));
4✔
182
        let password_hash = row.get(1);
4✔
183
        let email = row.get(2);
4✔
184
        let real_name = row.get(3);
4✔
185

4✔
186
        if bcrypt::verify(user_credentials.password, password_hash) {
4✔
187
            let session_id = SessionId::new();
4✔
188
            let stmt = conn
4✔
189
                .prepare(
4✔
190
                    "
4✔
191
                INSERT INTO sessions (id, user_id, created, valid_until)
4✔
192
                VALUES ($1, $2, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP + make_interval(secs:=$3)) 
4✔
193
                RETURNING created, valid_until;",
4✔
194
                )
4✔
195
                .await?;
3✔
196

197
            // TODO: load from config
198
            let session_duration = chrono::Duration::days(30);
4✔
199
            let row = conn
4✔
200
                .query_one(
4✔
201
                    &stmt,
4✔
202
                    &[
4✔
203
                        &session_id,
4✔
204
                        &user_id,
4✔
205
                        &(session_duration.num_seconds() as f64),
4✔
206
                    ],
4✔
207
                )
4✔
208
                .await?;
5✔
209

210
            let stmt = conn
4✔
211
                .prepare("SELECT role_id FROM user_roles WHERE user_id = $1;")
4✔
212
                .await?;
1✔
213

214
            let rows = conn
4✔
215
                .query(&stmt, &[&user_id])
4✔
216
                .await
1✔
217
                .map_err(|_error| error::Error::LoginFailed)?;
4✔
218

219
            let roles = rows.into_iter().map(|row| row.get(0)).collect();
8✔
220

4✔
221
            Ok(UserSession {
4✔
222
                id: session_id,
4✔
223
                user: UserInfo {
4✔
224
                    id: user_id,
4✔
225
                    email,
4✔
226
                    real_name,
4✔
227
                },
4✔
228
                created: row.get(0),
4✔
229
                valid_until: row.get(1),
4✔
230
                project: None,
4✔
231
                view: None,
4✔
232
                roles,
4✔
233
            })
4✔
234
        } else {
235
            Err(error::Error::LoginFailed)
×
236
        }
237
    }
8✔
238

239
    async fn login_external(
3✔
240
        &self,
3✔
241
        user: ExternalUserClaims,
3✔
242
        duration: Duration,
3✔
243
    ) -> Result<UserSession> {
3✔
244
        let mut conn = self.conn_pool.get().await?;
3✔
245
        let stmt = conn
3✔
246
            .prepare("SELECT id, external_id, email, real_name FROM external_users WHERE external_id = $1;")
3✔
247
            .await?;
3✔
248

249
        let row = conn
3✔
250
            .query_opt(&stmt, &[&user.external_id.to_string()])
3✔
251
            .await
3✔
252
            .map_err(|_error| error::Error::LoginFailed)?;
3✔
253

254
        let user_id = match row {
3✔
255
            Some(row) => UserId(row.get(0)),
2✔
256
            None => {
257
                let tx = conn.build_transaction().start().await?;
1✔
258

259
                let user_id = UserId::new();
1✔
260

261
                let stmt = tx
1✔
262
                    .prepare("INSERT INTO roles (id, name) VALUES ($1, $2);")
1✔
263
                    .await?;
1✔
264
                tx.execute(&stmt, &[&user_id, &user.email]).await?;
1✔
265

266
                //TODO: Inconsistent to hashmap implementation, where an external user is not part of the user database.
267
                //TODO: A user might be able to login without external login using this (internal) id. Would be a problem with anonymous users as well.
268
                let stmt = tx
1✔
269
                    .prepare("INSERT INTO users (id, active) VALUES ($1, TRUE);")
1✔
270
                    .await?;
1✔
271
                tx.execute(&stmt, &[&user_id]).await?;
1✔
272

273
                let stmt = tx
1✔
274
                    .prepare(
1✔
275
                        "INSERT INTO external_users (id, external_id, email, real_name, active) VALUES ($1, $2, $3, $4, $5);",
1✔
276
                    )
1✔
277
                    .await?;
1✔
278

279
                tx.execute(
1✔
280
                    &stmt,
1✔
281
                    &[
1✔
282
                        &user_id,
1✔
283
                        &user.external_id.to_string(),
1✔
284
                        &user.email,
1✔
285
                        &user.real_name,
1✔
286
                        &true,
1✔
287
                    ],
1✔
288
                )
1✔
289
                .await?;
1✔
290

291
                let stmt = tx
1✔
292
                    .prepare("INSERT INTO user_roles (user_id, role_id) VALUES ($1, $2);")
1✔
293
                    .await?;
1✔
294
                tx.execute(&stmt, &[&user_id, &user_id]).await?;
1✔
295

296
                let stmt = tx
1✔
297
                    .prepare("INSERT INTO user_roles (user_id, role_id) VALUES ($1, $2);")
1✔
298
                    .await?;
1✔
299
                tx.execute(&stmt, &[&user_id, &Role::user_role_id()])
1✔
300
                    .await?;
1✔
301

302
                tx.commit().await?;
1✔
303

304
                user_id
1✔
305
            }
306
        };
307

308
        let session_id = SessionId::new();
3✔
309
        let stmt = conn
3✔
310
            .prepare(
3✔
311
                "
3✔
312
            INSERT INTO sessions (id, user_id, created, valid_until)
3✔
313
            VALUES ($1, $2, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP + make_interval(secs:=$3))
3✔
314
            RETURNING created, valid_until;",
3✔
315
            )
3✔
316
            .await?; //TODO: Check documentation if inconsistent to hashmap implementation - would happen if CURRENT_TIMESTAMP is called twice in postgres for a single query. Worked in tests.
3✔
317

318
        let row = conn
3✔
319
            .query_one(
3✔
320
                &stmt,
3✔
321
                &[&session_id, &user_id, &(duration.num_seconds() as f64)],
3✔
322
            )
3✔
323
            .await?;
3✔
324

325
        let stmt = conn
3✔
326
            .prepare("SELECT role_id FROM user_roles WHERE user_id = $1;")
3✔
327
            .await?;
3✔
328

329
        let rows = conn
3✔
330
            .query(&stmt, &[&user_id])
3✔
331
            .await
3✔
332
            .map_err(|_error| error::Error::LoginFailed)?;
3✔
333

334
        let roles = rows.into_iter().map(|row| row.get(0)).collect();
6✔
335

3✔
336
        Ok(UserSession {
3✔
337
            id: session_id,
3✔
338
            user: UserInfo {
3✔
339
                id: user_id,
3✔
340
                email: Some(user.email.clone()),
3✔
341
                real_name: Some(user.real_name.clone()),
3✔
342
            },
3✔
343
            created: row.get(0),
3✔
344
            valid_until: row.get(1),
3✔
345
            project: None,
3✔
346
            view: None,
3✔
347
            roles,
3✔
348
        })
3✔
349
    }
6✔
350

351
    async fn logout(&self, session: SessionId) -> Result<()> {
4✔
352
        let conn = self.conn_pool.get().await?;
4✔
353
        let stmt = conn
4✔
354
            .prepare("DELETE FROM sessions WHERE id = $1;") // TODO: only invalidate session?
4✔
355
            .await?;
3✔
356

357
        conn.execute(&stmt, &[&session])
4✔
358
            .await
4✔
359
            .map_err(|_error| error::Error::LogoutFailed)?;
4✔
360
        Ok(())
4✔
361
    }
8✔
362

363
    async fn session(&self, session: SessionId) -> Result<UserSession> {
13✔
364
        let conn = self.conn_pool.get().await?;
13✔
365
        let stmt = conn
13✔
366
            .prepare(
13✔
367
                "
13✔
368
            SELECT 
13✔
369
                u.id,   
13✔
370
                u.email,
13✔
371
                u.real_name,             
13✔
372
                s.created, 
13✔
373
                s.valid_until, 
13✔
374
                s.project_id,
13✔
375
                s.view           
13✔
376
            FROM sessions s JOIN users u ON (s.user_id = u.id)
13✔
377
            WHERE s.id = $1 AND CURRENT_TIMESTAMP < s.valid_until;",
13✔
378
            )
13✔
379
            .await?;
71✔
380

381
        let row = conn
13✔
382
            .query_one(&stmt, &[&session])
13✔
383
            .await
11✔
384
            .map_err(|_error| error::Error::InvalidSession)?;
13✔
385

386
        Ok(UserSession {
9✔
387
            id: session,
9✔
388
            user: UserInfo {
9✔
389
                id: row.get(0),
9✔
390
                email: row.get(1),
9✔
391
                real_name: row.get(2),
9✔
392
            },
9✔
393
            created: row.get(3),
9✔
394
            valid_until: row.get(4),
9✔
395
            project: row.get::<usize, Option<Uuid>>(5).map(ProjectId),
9✔
396
            view: row.get(6),
9✔
397
            roles: vec![], // TODO
9✔
398
        })
9✔
399
    }
26✔
400

401
    async fn set_session_project(&self, session: &UserSession, project: ProjectId) -> Result<()> {
2✔
402
        let conn = self.conn_pool.get().await?;
2✔
403
        PostgresContext::check_user_project_permission(
2✔
404
            &conn,
2✔
405
            session.user.id,
2✔
406
            project,
2✔
407
            &[
2✔
408
                ProjectPermission::Read,
2✔
409
                ProjectPermission::Write,
2✔
410
                ProjectPermission::Owner,
2✔
411
            ],
2✔
412
        )
2✔
413
        .await?;
3✔
414

415
        let conn = self.conn_pool.get().await?;
2✔
416
        let stmt = conn
2✔
417
            .prepare("UPDATE sessions SET project_id = $1 WHERE id = $2;")
2✔
418
            .await?;
2✔
419

420
        conn.execute(&stmt, &[&project, &session.id]).await?;
2✔
421

422
        Ok(())
2✔
423
    }
4✔
424

425
    /// Stores `view` as the current view of the given session.
    ///
    /// # Errors
    ///
    /// Returns an error if no pooled connection can be obtained or if
    /// preparing or executing the update fails.
    async fn set_session_view(&self, session: &UserSession, view: STRectangle) -> Result<()> {
        let pooled = self.conn_pool.get().await?;

        let update_view = pooled
            .prepare("UPDATE sessions SET view = $1 WHERE id = $2;")
            .await?;

        // The affected-row count is intentionally not inspected.
        let _rows_updated = pooled
            .execute(&update_view, &[&view, &session.id])
            .await?;

        Ok(())
    }
4✔
435

436
    /// Adds `quota_used` to the stored `quota_used` counter of `user`.
    ///
    /// The amount is converted to `i64` with an `as` cast before it is sent to
    /// the database, so values above `i64::MAX` wrap.
    ///
    /// # Errors
    ///
    /// Returns an error if no pooled connection can be obtained or if
    /// preparing or executing the update fails.
    async fn increment_quota_used(&self, user: &UserId, quota_used: u64) -> Result<()> {
        let pooled = self.conn_pool.get().await?;

        let add_quota = pooled
            .prepare("UPDATE users SET quota_used = quota_used + $1 WHERE id = $2;")
            .await?;

        let amount = quota_used as i64;
        let _rows_updated = pooled.execute(&add_quota, &[&amount, &user]).await?;

        Ok(())
    }
4✔
446

447
    /// Returns the quota consumed so far by the user that owns `session`.
    ///
    /// This is a thin wrapper around [`Self::quota_used_by_user`], which runs
    /// the same query with the same error mapping. Delegating keeps the two
    /// methods from drifting apart.
    ///
    /// # Errors
    ///
    /// * `Error::InvalidSession` if the user's row cannot be fetched.
    /// * Errors from obtaining a connection or preparing the statement are
    ///   propagated unchanged.
    async fn quota_used(&self, session: &UserSession) -> Result<u64> {
        self.quota_used_by_user(&session.user.id).await
    }
4✔
460

461
    async fn quota_used_by_user(&self, user: &UserId) -> Result<u64> {
×
462
        let conn = self.conn_pool.get().await?;
×
463
        let stmt = conn
×
464
            .prepare("SELECT quota_used FROM users WHERE id = $1;")
×
465
            .await?;
×
466

467
        let row = conn
×
468
            .query_one(&stmt, &[&user])
×
469
            .await
×
470
            .map_err(|_error| error::Error::InvalidSession)?;
×
471

472
        Ok(row.get::<usize, i64>(0) as u64)
×
473
    }
×
474
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc