• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

geo-engine / geoengine / 4205117943

pending completion
4205117943

push

github

GitHub
Merge #737

473 of 473 new or added lines in 6 files covered. (100.0%)

90778 of 103195 relevant lines covered (87.97%)

75809.53 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

96.52
/services/src/pro/users/postgres_userdb.rs
1
use crate::contexts::SessionId;
2
use crate::error::Result;
3
use crate::pro::datasets::Role;
4
use crate::pro::projects::ProjectPermission;
5
use crate::pro::users::oidc::ExternalUserClaims;
6
use crate::pro::users::{
7
    User, UserCredentials, UserDb, UserId, UserInfo, UserRegistration, UserSession,
8
};
9
use crate::projects::{ProjectId, STRectangle};
10
use crate::util::user_input::Validated;
11
use crate::util::Identifier;
12
use crate::{error, pro::contexts::PostgresContext};
13
use async_trait::async_trait;
14
use bb8_postgres::PostgresConnectionManager;
15
use bb8_postgres::{
16
    bb8::Pool, tokio_postgres::tls::MakeTlsConnect, tokio_postgres::tls::TlsConnect,
17
    tokio_postgres::Socket,
18
};
19
use geoengine_datatypes::primitives::Duration;
20
use pwhash::bcrypt;
21
use uuid::Uuid;
22

23
pub struct PostgresUserDb<Tls>
24
where
25
    Tls: MakeTlsConnect<Socket> + Clone + Send + Sync + 'static,
26
    <Tls as MakeTlsConnect<Socket>>::Stream: Send + Sync,
27
    <Tls as MakeTlsConnect<Socket>>::TlsConnect: Send,
28
    <<Tls as MakeTlsConnect<Socket>>::TlsConnect as TlsConnect<Socket>>::Future: Send,
29
{
30
    conn_pool: Pool<PostgresConnectionManager<Tls>>,
31
}
32

33
impl<Tls> PostgresUserDb<Tls>
34
where
35
    Tls: MakeTlsConnect<Socket> + Clone + Send + Sync + 'static,
36
    <Tls as MakeTlsConnect<Socket>>::Stream: Send + Sync,
37
    <Tls as MakeTlsConnect<Socket>>::TlsConnect: Send,
38
    <<Tls as MakeTlsConnect<Socket>>::TlsConnect as TlsConnect<Socket>>::Future: Send,
39
{
40
    pub fn new(conn_pool: Pool<PostgresConnectionManager<Tls>>) -> Self {
22✔
41
        Self { conn_pool }
22✔
42
    }
22✔
43
}
44

45
#[async_trait]
46
impl<Tls> UserDb for PostgresUserDb<Tls>
47
where
48
    Tls: MakeTlsConnect<Socket> + Clone + Send + Sync + 'static,
49
    <Tls as MakeTlsConnect<Socket>>::Stream: Send + Sync,
50
    <Tls as MakeTlsConnect<Socket>>::TlsConnect: Send,
51
    <<Tls as MakeTlsConnect<Socket>>::TlsConnect as TlsConnect<Socket>>::Future: Send,
52
{
53
    // TODO: clean up expired sessions?
54

55
    async fn register(&self, user: Validated<UserRegistration>) -> Result<UserId> {
6✔
56
        let mut conn = self.conn_pool.get().await?;
6✔
57

58
        let tx = conn.build_transaction().start().await?;
6✔
59

60
        let user = User::from(user.user_input);
6✔
61

62
        let stmt = tx
6✔
63
            .prepare("INSERT INTO roles (id, name) VALUES ($1, $2);")
6✔
64
            .await?;
4✔
65
        tx.execute(&stmt, &[&user.id, &user.email]).await?;
6✔
66

67
        let stmt = tx
6✔
68
            .prepare(
6✔
69
                "INSERT INTO users (id, email, password_hash, real_name, quota_available, active) VALUES ($1, $2, $3, $4, $5, $6);",
6✔
70
            )
6✔
71
            .await?;
4✔
72

73
        let quota_available =
6✔
74
            crate::util::config::get_config_element::<crate::pro::util::config::User>()?
6✔
75
                .default_available_quota;
76

77
        tx.execute(
6✔
78
            &stmt,
6✔
79
            &[
6✔
80
                &user.id,
6✔
81
                &user.email,
6✔
82
                &user.password_hash,
6✔
83
                &user.real_name,
6✔
84
                &quota_available,
6✔
85
                &user.active,
6✔
86
            ],
6✔
87
        )
6✔
88
        .await?;
3✔
89

90
        let stmt = tx
6✔
91
            .prepare("INSERT INTO user_roles (user_id, role_id) VALUES ($1, $2);")
6✔
92
            .await?;
4✔
93
        tx.execute(&stmt, &[&user.id, &user.id]).await?;
6✔
94

95
        let stmt = tx
6✔
96
            .prepare("INSERT INTO user_roles (user_id, role_id) VALUES ($1, $2);")
6✔
97
            .await?;
3✔
98
        tx.execute(&stmt, &[&user.id, &Role::user_role_id()])
6✔
99
            .await?;
3✔
100

101
        tx.commit().await?;
7✔
102

103
        Ok(user.id)
6✔
104
    }
12✔
105

106
    async fn anonymous(&self) -> Result<UserSession> {
17✔
107
        let mut conn = self.conn_pool.get().await?;
17✔
108

109
        let tx = conn.build_transaction().start().await?;
17✔
110

111
        let user_id = UserId::new();
17✔
112

113
        let stmt = tx
17✔
114
            .prepare("INSERT INTO roles (id, name) VALUES ($1, $2);")
17✔
115
            .await?;
17✔
116
        tx.execute(&stmt, &[&user_id, &"anonymous_user"]).await?;
17✔
117

118
        let quota_available =
17✔
119
            crate::util::config::get_config_element::<crate::pro::util::config::User>()?
17✔
120
                .default_available_quota;
121

122
        let stmt = tx
17✔
123
            .prepare("INSERT INTO users (id, quota_available, active) VALUES ($1, $2, TRUE);")
17✔
124
            .await?;
17✔
125

126
        tx.execute(&stmt, &[&user_id, &quota_available]).await?;
17✔
127

128
        let stmt = tx
17✔
129
            .prepare("INSERT INTO user_roles (user_id, role_id) VALUES ($1, $2);")
17✔
130
            .await?;
17✔
131
        tx.execute(&stmt, &[&user_id, &user_id]).await?;
17✔
132

133
        let stmt = tx
17✔
134
            .prepare("INSERT INTO user_roles (user_id, role_id) VALUES ($1, $2);")
17✔
135
            .await?;
17✔
136
        tx.execute(&stmt, &[&user_id, &Role::anonymous_role_id()])
17✔
137
            .await?;
17✔
138

139
        let session_id = SessionId::new();
17✔
140
        let stmt = tx
17✔
141
            .prepare(
17✔
142
                "
17✔
143
                INSERT INTO sessions (id, user_id, created, valid_until)
17✔
144
                VALUES ($1, $2, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP + make_interval(secs:=$3)) 
17✔
145
                RETURNING created, valid_until;",
17✔
146
            )
17✔
147
            .await?;
17✔
148

149
        // TODO: load from config
150
        let session_duration = chrono::Duration::days(30);
17✔
151
        let row = tx
17✔
152
            .query_one(
17✔
153
                &stmt,
17✔
154
                &[
17✔
155
                    &session_id,
17✔
156
                    &user_id,
17✔
157
                    &(session_duration.num_seconds() as f64),
17✔
158
                ],
17✔
159
            )
17✔
160
            .await?;
17✔
161

162
        tx.commit().await?;
17✔
163

164
        Ok(UserSession {
17✔
165
            id: session_id,
17✔
166
            user: UserInfo {
17✔
167
                id: user_id,
17✔
168
                email: None,
17✔
169
                real_name: None,
17✔
170
            },
17✔
171
            created: row.get(0),
17✔
172
            valid_until: row.get(1),
17✔
173
            project: None,
17✔
174
            view: None,
17✔
175
            roles: vec![user_id.into(), Role::anonymous_role_id()],
17✔
176
        })
17✔
177
    }
34✔
178

179
    async fn login(&self, user_credentials: UserCredentials) -> Result<UserSession> {
6✔
180
        let conn = self.conn_pool.get().await?;
6✔
181
        let stmt = conn
6✔
182
            .prepare("SELECT id, password_hash, email, real_name FROM users WHERE email = $1;")
6✔
183
            .await?;
3✔
184

185
        let row = conn
6✔
186
            .query_one(&stmt, &[&user_credentials.email])
6✔
187
            .await
3✔
188
            .map_err(|_error| error::Error::LoginFailed)?;
6✔
189

190
        let user_id = UserId(row.get(0));
6✔
191
        let password_hash = row.get(1);
6✔
192
        let email = row.get(2);
6✔
193
        let real_name = row.get(3);
6✔
194

6✔
195
        if bcrypt::verify(user_credentials.password, password_hash) {
6✔
196
            let session_id = SessionId::new();
6✔
197
            let stmt = conn
6✔
198
                .prepare(
6✔
199
                    "
6✔
200
                INSERT INTO sessions (id, user_id, created, valid_until)
6✔
201
                VALUES ($1, $2, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP + make_interval(secs:=$3)) 
6✔
202
                RETURNING created, valid_until;",
6✔
203
                )
6✔
204
                .await?;
5✔
205

206
            // TODO: load from config
207
            let session_duration = chrono::Duration::days(30);
6✔
208
            let row = conn
6✔
209
                .query_one(
6✔
210
                    &stmt,
6✔
211
                    &[
6✔
212
                        &session_id,
6✔
213
                        &user_id,
6✔
214
                        &(session_duration.num_seconds() as f64),
6✔
215
                    ],
6✔
216
                )
6✔
217
                .await?;
6✔
218

219
            let stmt = conn
6✔
220
                .prepare("SELECT role_id FROM user_roles WHERE user_id = $1;")
6✔
221
                .await?;
4✔
222

223
            let rows = conn
6✔
224
                .query(&stmt, &[&user_id])
6✔
225
                .await
3✔
226
                .map_err(|_error| error::Error::LoginFailed)?;
6✔
227

228
            let roles = rows.into_iter().map(|row| row.get(0)).collect();
12✔
229

6✔
230
            Ok(UserSession {
6✔
231
                id: session_id,
6✔
232
                user: UserInfo {
6✔
233
                    id: user_id,
6✔
234
                    email,
6✔
235
                    real_name,
6✔
236
                },
6✔
237
                created: row.get(0),
6✔
238
                valid_until: row.get(1),
6✔
239
                project: None,
6✔
240
                view: None,
6✔
241
                roles,
6✔
242
            })
6✔
243
        } else {
244
            Err(error::Error::LoginFailed)
×
245
        }
246
    }
12✔
247

248
    async fn login_external(
3✔
249
        &self,
3✔
250
        user: ExternalUserClaims,
3✔
251
        duration: Duration,
3✔
252
    ) -> Result<UserSession> {
3✔
253
        let mut conn = self.conn_pool.get().await?;
3✔
254
        let stmt = conn
3✔
255
            .prepare("SELECT id, external_id, email, real_name FROM external_users WHERE external_id = $1;")
3✔
256
            .await?;
3✔
257

258
        let row = conn
3✔
259
            .query_opt(&stmt, &[&user.external_id.to_string()])
3✔
260
            .await
3✔
261
            .map_err(|_error| error::Error::LoginFailed)?;
3✔
262

263
        let user_id = match row {
3✔
264
            Some(row) => UserId(row.get(0)),
2✔
265
            None => {
266
                let tx = conn.build_transaction().start().await?;
1✔
267

268
                let user_id = UserId::new();
1✔
269

270
                let stmt = tx
1✔
271
                    .prepare("INSERT INTO roles (id, name) VALUES ($1, $2);")
1✔
272
                    .await?;
1✔
273
                tx.execute(&stmt, &[&user_id, &user.email]).await?;
1✔
274

275
                let quota_available =
1✔
276
                    crate::util::config::get_config_element::<crate::pro::util::config::User>()?
1✔
277
                        .default_available_quota;
278

279
                //TODO: Inconsistent to hashmap implementation, where an external user is not part of the user database.
280
                //TODO: A user might be able to login without external login using this (internal) id. Would be a problem with anonymous users as well.
281
                let stmt = tx
1✔
282
                    .prepare(
1✔
283
                        "INSERT INTO users (id, quota_available, active) VALUES ($1, $2, TRUE);",
1✔
284
                    )
1✔
285
                    .await?;
1✔
286
                tx.execute(&stmt, &[&user_id, &quota_available]).await?;
1✔
287

288
                let stmt = tx
1✔
289
                    .prepare(
1✔
290
                        "INSERT INTO external_users (id, external_id, email, real_name, active) VALUES ($1, $2, $3, $4, $5);",
1✔
291
                    )
1✔
292
                    .await?;
1✔
293

294
                tx.execute(
1✔
295
                    &stmt,
1✔
296
                    &[
1✔
297
                        &user_id,
1✔
298
                        &user.external_id.to_string(),
1✔
299
                        &user.email,
1✔
300
                        &user.real_name,
1✔
301
                        &true,
1✔
302
                    ],
1✔
303
                )
1✔
304
                .await?;
1✔
305

306
                let stmt = tx
1✔
307
                    .prepare("INSERT INTO user_roles (user_id, role_id) VALUES ($1, $2);")
1✔
308
                    .await?;
1✔
309
                tx.execute(&stmt, &[&user_id, &user_id]).await?;
1✔
310

311
                let stmt = tx
1✔
312
                    .prepare("INSERT INTO user_roles (user_id, role_id) VALUES ($1, $2);")
1✔
313
                    .await?;
1✔
314
                tx.execute(&stmt, &[&user_id, &Role::user_role_id()])
1✔
315
                    .await?;
1✔
316

317
                tx.commit().await?;
1✔
318

319
                user_id
1✔
320
            }
321
        };
322

323
        let session_id = SessionId::new();
3✔
324
        let stmt = conn
3✔
325
            .prepare(
3✔
326
                "
3✔
327
            INSERT INTO sessions (id, user_id, created, valid_until)
3✔
328
            VALUES ($1, $2, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP + make_interval(secs:=$3))
3✔
329
            RETURNING created, valid_until;",
3✔
330
            )
3✔
331
            .await?; //TODO: Check documentation if inconsistent to hashmap implementation - would happen if CURRENT_TIMESTAMP is called twice in postgres for a single query. Worked in tests.
3✔
332

333
        let row = conn
3✔
334
            .query_one(
3✔
335
                &stmt,
3✔
336
                &[&session_id, &user_id, &(duration.num_seconds() as f64)],
3✔
337
            )
3✔
338
            .await?;
3✔
339

340
        let stmt = conn
3✔
341
            .prepare("SELECT role_id FROM user_roles WHERE user_id = $1;")
3✔
342
            .await?;
3✔
343

344
        let rows = conn
3✔
345
            .query(&stmt, &[&user_id])
3✔
346
            .await
3✔
347
            .map_err(|_error| error::Error::LoginFailed)?;
3✔
348

349
        let roles = rows.into_iter().map(|row| row.get(0)).collect();
6✔
350

3✔
351
        Ok(UserSession {
3✔
352
            id: session_id,
3✔
353
            user: UserInfo {
3✔
354
                id: user_id,
3✔
355
                email: Some(user.email.clone()),
3✔
356
                real_name: Some(user.real_name.clone()),
3✔
357
            },
3✔
358
            created: row.get(0),
3✔
359
            valid_until: row.get(1),
3✔
360
            project: None,
3✔
361
            view: None,
3✔
362
            roles,
3✔
363
        })
3✔
364
    }
6✔
365

366
    async fn logout(&self, session: SessionId) -> Result<()> {
4✔
367
        let conn = self.conn_pool.get().await?;
4✔
368
        let stmt = conn
4✔
369
            .prepare("DELETE FROM sessions WHERE id = $1;") // TODO: only invalidate session?
4✔
370
            .await?;
4✔
371

372
        conn.execute(&stmt, &[&session])
4✔
373
            .await
4✔
374
            .map_err(|_error| error::Error::LogoutFailed)?;
4✔
375
        Ok(())
4✔
376
    }
8✔
377

378
    async fn session(&self, session: SessionId) -> Result<UserSession> {
13✔
379
        let conn = self.conn_pool.get().await?;
13✔
380
        let stmt = conn
13✔
381
            .prepare(
13✔
382
                "
13✔
383
            SELECT 
13✔
384
                u.id,   
13✔
385
                u.email,
13✔
386
                u.real_name,             
13✔
387
                s.created, 
13✔
388
                s.valid_until, 
13✔
389
                s.project_id,
13✔
390
                s.view           
13✔
391
            FROM sessions s JOIN users u ON (s.user_id = u.id)
13✔
392
            WHERE s.id = $1 AND CURRENT_TIMESTAMP < s.valid_until;",
13✔
393
            )
13✔
394
            .await?;
73✔
395

396
        let row = conn
13✔
397
            .query_one(&stmt, &[&session])
13✔
398
            .await
13✔
399
            .map_err(|_error| error::Error::InvalidSession)?;
13✔
400

401
        Ok(UserSession {
9✔
402
            id: session,
9✔
403
            user: UserInfo {
9✔
404
                id: row.get(0),
9✔
405
                email: row.get(1),
9✔
406
                real_name: row.get(2),
9✔
407
            },
9✔
408
            created: row.get(3),
9✔
409
            valid_until: row.get(4),
9✔
410
            project: row.get::<usize, Option<Uuid>>(5).map(ProjectId),
9✔
411
            view: row.get(6),
9✔
412
            roles: vec![], // TODO
9✔
413
        })
9✔
414
    }
26✔
415

416
    async fn set_session_project(&self, session: &UserSession, project: ProjectId) -> Result<()> {
2✔
417
        let conn = self.conn_pool.get().await?;
2✔
418
        PostgresContext::check_user_project_permission(
2✔
419
            &conn,
2✔
420
            session.user.id,
2✔
421
            project,
2✔
422
            &[
2✔
423
                ProjectPermission::Read,
2✔
424
                ProjectPermission::Write,
2✔
425
                ProjectPermission::Owner,
2✔
426
            ],
2✔
427
        )
2✔
428
        .await?;
6✔
429

430
        let conn = self.conn_pool.get().await?;
2✔
431
        let stmt = conn
2✔
432
            .prepare("UPDATE sessions SET project_id = $1 WHERE id = $2;")
2✔
433
            .await?;
2✔
434

435
        conn.execute(&stmt, &[&project, &session.id]).await?;
2✔
436

437
        Ok(())
2✔
438
    }
4✔
439

440
    /// Stores `view` as the view of the given session.
    ///
    /// # Errors
    ///
    /// Returns an error if no pooled connection can be obtained or if the
    /// update cannot be prepared or executed.
    async fn set_session_view(&self, session: &UserSession, view: STRectangle) -> Result<()> {
        let conn = self.conn_pool.get().await?;

        let update_view = conn
            .prepare("UPDATE sessions SET view = $1 WHERE id = $2;")
            .await?;
        conn.execute(&update_view, &[&view, &session.id]).await?;

        Ok(())
    }
4✔
450

451
    /// Books `quota_used` units against a user: `quota_available` is decreased
    /// and `quota_used` is increased by the same amount in one `UPDATE`.
    ///
    /// The amount is bound as a signed 64-bit parameter. Values above
    /// `i64::MAX` are clamped to `i64::MAX`; a plain `as` cast would wrap them
    /// into negative numbers and thereby *increase* the available quota.
    ///
    /// # Errors
    ///
    /// Returns an error if no pooled connection can be obtained or if the
    /// update cannot be prepared or executed.
    async fn increment_quota_used(&self, user: &UserId, quota_used: u64) -> Result<()> {
        let conn = self.conn_pool.get().await?;
        let stmt = conn
            .prepare(
                "
            UPDATE users SET 
                quota_available = quota_available - $1, 
                quota_used = quota_used + $1
            WHERE id = $2;",
            )
            .await?;

        // After the clamp the value fits into `i64`, so the cast is lossless.
        let quota_delta = quota_used.min(i64::MAX as u64) as i64;

        conn.execute(&stmt, &[&quota_delta, &user]).await?;

        Ok(())
    }
8✔
467

468
    async fn quota_used(&self, session: &UserSession) -> Result<u64> {
2✔
469
        let conn = self.conn_pool.get().await?;
2✔
470
        let stmt = conn
2✔
471
            .prepare("SELECT quota_used FROM users WHERE id = $1;")
2✔
472
            .await?;
1✔
473

474
        let row = conn
2✔
475
            .query_one(&stmt, &[&session.user.id])
2✔
476
            .await
2✔
477
            .map_err(|_error| error::Error::InvalidSession)?;
2✔
478

479
        Ok(row.get::<usize, i64>(0) as u64)
2✔
480
    }
4✔
481

482
    /// Returns the `quota_used` value stored for the given user id.
    ///
    /// The column is read as `i64` and converted to `u64` with `as`.
    ///
    /// # Errors
    ///
    /// Returns `Error::InvalidSession` if the lookup query fails. Errors from
    /// obtaining a connection or preparing the statement are propagated unchanged.
    async fn quota_used_by_user(&self, user: &UserId) -> Result<u64> {
        let conn = self.conn_pool.get().await?;
        let select_used = conn
            .prepare("SELECT quota_used FROM users WHERE id = $1;")
            .await?;

        let user_row = conn
            .query_one(&select_used, &[&user])
            .await
            .map_err(|_error| error::Error::InvalidSession)?;

        let used: i64 = user_row.get(0);
        Ok(used as u64)
    }
×
495

496
    async fn quota_available(&self, session: &UserSession) -> Result<i64> {
4✔
497
        let conn = self.conn_pool.get().await?;
4✔
498
        let stmt = conn
4✔
499
            .prepare("SELECT quota_available FROM users WHERE id = $1;")
4✔
500
            .await?;
2✔
501

502
        let row = conn
4✔
503
            .query_one(&stmt, &[&session.user.id])
4✔
504
            .await
2✔
505
            .map_err(|_error| error::Error::InvalidSession)?;
4✔
506

507
        Ok(row.get::<usize, i64>(0))
4✔
508
    }
8✔
509

510
    async fn quota_available_by_user(&self, user: &UserId) -> Result<i64> {
2✔
511
        let conn = self.conn_pool.get().await?;
2✔
512
        let stmt = conn
2✔
513
            .prepare("SELECT quota_available FROM users WHERE id = $1;")
2✔
514
            .await?;
×
515

516
        let row = conn
2✔
517
            .query_one(&stmt, &[&user])
2✔
518
            .await
×
519
            .map_err(|_error| error::Error::InvalidSession)?;
2✔
520

521
        Ok(row.get::<usize, i64>(0))
2✔
522
    }
4✔
523

524
    async fn update_quota_available_by_user(
2✔
525
        &self,
2✔
526
        user: &UserId,
2✔
527
        new_available_quota: i64,
2✔
528
    ) -> Result<()> {
2✔
529
        let conn = self.conn_pool.get().await?;
2✔
530
        let stmt = conn
2✔
531
            .prepare(
2✔
532
                "
2✔
533
            UPDATE users SET 
2✔
534
                quota_available = $1
2✔
535
            WHERE id = $2;",
2✔
536
            )
2✔
537
            .await?;
1✔
538

539
        conn.execute(&stmt, &[&(new_available_quota), &user])
2✔
540
            .await?;
3✔
541

542
        Ok(())
2✔
543
    }
4✔
544
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc