• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

geo-engine / geoengine / 12768016656

14 Jan 2025 12:26PM UTC coverage: 90.64% (+0.06%) from 90.576%
12768016656

push

github

web-flow
Merge pull request #1006 from geo-engine/migrate-pro-api

Migrate-pro-api

1106 of 1152 new or added lines in 24 files covered. (96.01%)

248 existing lines in 13 files now uncovered.

133501 of 147287 relevant lines covered (90.64%)

54652.65 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

94.01
/services/src/projects/postgres_projectdb.rs
1
use crate::contexts::PostgresDb;
2
use crate::projects::error::ProjectNotFoundProjectDbError;
3
use crate::projects::Plot;
4
use crate::projects::ProjectLayer;
5
use crate::projects::{
6
    CreateProject, Project, ProjectDb, ProjectId, ProjectListOptions, ProjectListing,
7
    ProjectVersion, ProjectVersionId, UpdateProject,
8
};
9

10
use super::error::Bb8ProjectDbError;
11
use super::error::PostgresProjectDbError;
12
use super::error::ProjectDbError;
13
use super::LoadVersion;
14
use crate::util::Identifier;
15
use crate::workflows::workflow::WorkflowId;
16
use async_trait::async_trait;
17
use bb8_postgres::tokio_postgres::Transaction;
18
use bb8_postgres::{
19
    tokio_postgres::tls::MakeTlsConnect, tokio_postgres::tls::TlsConnect, tokio_postgres::Socket,
20
};
21
use snafu::ensure;
22
use snafu::ResultExt;
23
use tokio_postgres::Row;
24

25
pub async fn list_plots(
9✔
26
    trans: &Transaction<'_>,
9✔
27
    project_version_id: &ProjectVersionId,
9✔
28
) -> Result<Vec<String>, ProjectDbError> {
9✔
29
    let stmt: tokio_postgres::Statement = trans
9✔
30
        .prepare(
9✔
31
            "
9✔
32
                    SELECT name
9✔
33
                    FROM project_version_plots
9✔
34
                    WHERE project_version_id = $1;
9✔
35
                ",
9✔
36
        )
9✔
37
        .await
9✔
38
        .context(PostgresProjectDbError)?;
9✔
39

40
    let plot_rows = trans
9✔
41
        .query(&stmt, &[project_version_id])
9✔
42
        .await
9✔
43
        .context(PostgresProjectDbError)?;
9✔
44
    let plot_names = plot_rows.iter().map(|row| row.get(0)).collect();
9✔
45

9✔
46
    Ok(plot_names)
9✔
47
}
9✔
48

49
pub async fn load_plots(
45✔
50
    trans: &Transaction<'_>,
45✔
51
    project_version_id: &ProjectVersionId,
45✔
52
) -> Result<Vec<Plot>, ProjectDbError> {
45✔
53
    let stmt = trans
45✔
54
        .prepare(
45✔
55
            "
45✔
56
                SELECT  
45✔
57
                    name, workflow_id
45✔
58
                FROM project_version_plots
45✔
59
                WHERE project_version_id = $1
45✔
60
                ORDER BY plot_index ASC
45✔
61
                ",
45✔
62
        )
45✔
63
        .await
45✔
64
        .context(PostgresProjectDbError)?;
45✔
65

66
    let rows = trans
45✔
67
        .query(&stmt, &[project_version_id])
45✔
68
        .await
45✔
69
        .context(PostgresProjectDbError)?;
45✔
70

71
    let plots = rows
45✔
72
        .into_iter()
45✔
73
        .map(|row| Plot {
45✔
74
            workflow: WorkflowId(row.get(1)),
17✔
75
            name: row.get(0),
17✔
76
        })
45✔
77
        .collect();
45✔
78

45✔
79
    Ok(plots)
45✔
80
}
45✔
81

82
pub async fn update_plots(
30✔
83
    trans: &Transaction<'_>,
30✔
84
    project_id: &ProjectId,
30✔
85
    project_version_id: &ProjectVersionId,
30✔
86
    plots: &[Plot],
30✔
87
) -> Result<(), ProjectDbError> {
30✔
88
    for (idx, plot) in plots.iter().enumerate() {
30✔
89
        let stmt = trans
13✔
90
            .prepare(
13✔
91
                "
13✔
92
                    INSERT INTO project_version_plots (
13✔
93
                        project_id,
13✔
94
                        project_version_id,
13✔
95
                        plot_index,
13✔
96
                        name,
13✔
97
                        workflow_id)
13✔
98
                    VALUES ($1, $2, $3, $4, $5);
13✔
99
                    ",
13✔
100
            )
13✔
101
            .await
13✔
102
            .context(PostgresProjectDbError)?;
13✔
103

104
        trans
13✔
105
            .execute(
13✔
106
                &stmt,
13✔
107
                &[
13✔
108
                    project_id,
13✔
109
                    project_version_id,
13✔
110
                    &(idx as i32),
13✔
111
                    &plot.name,
13✔
112
                    &plot.workflow,
13✔
113
                ],
13✔
114
            )
13✔
115
            .await
13✔
116
            .context(PostgresProjectDbError)?;
13✔
117
    }
118

119
    Ok(())
30✔
120
}
30✔
121

122
pub async fn project_listings_from_rows(
6✔
123
    tx: &tokio_postgres::Transaction<'_>,
6✔
124
    project_rows: Vec<Row>,
6✔
125
) -> Result<Vec<ProjectListing>, ProjectDbError> {
6✔
126
    let mut project_listings = vec![];
6✔
127
    for project_row in project_rows {
15✔
128
        let project_version_id = ProjectVersionId(project_row.get(0));
9✔
129
        let project_id = ProjectId(project_row.get(1));
9✔
130
        let name = project_row.get(2);
9✔
131
        let description = project_row.get(3);
9✔
132
        let changed = project_row.get(4);
9✔
133

134
        let stmt = tx
9✔
135
            .prepare(
9✔
136
                "
9✔
137
                SELECT name
9✔
138
                FROM project_version_layers
9✔
139
                WHERE project_version_id = $1;",
9✔
140
            )
9✔
141
            .await
9✔
142
            .context(PostgresProjectDbError)?;
9✔
143

144
        let layer_rows = tx
9✔
145
            .query(&stmt, &[&project_version_id])
9✔
146
            .await
9✔
147
            .context(PostgresProjectDbError)?;
9✔
148
        let layer_names = layer_rows.iter().map(|row| row.get(0)).collect();
9✔
149

9✔
150
        project_listings.push(ProjectListing {
9✔
151
            id: project_id,
9✔
152
            name,
9✔
153
            description,
9✔
154
            layer_names,
9✔
155
            plot_names: list_plots(tx, &project_version_id).await?,
9✔
156
            changed,
9✔
157
        });
158
    }
159
    Ok(project_listings)
6✔
160
}
6✔
161

162
pub async fn insert_project(
61✔
163
    trans: &Transaction<'_>,
61✔
164
    project: &Project,
61✔
165
) -> Result<ProjectVersionId, ProjectDbError> {
61✔
166
    let stmt = trans
61✔
167
        .prepare("INSERT INTO projects (id) VALUES ($1);")
61✔
168
        .await
61✔
169
        .context(PostgresProjectDbError)?;
61✔
170

171
    trans
61✔
172
        .execute(&stmt, &[&project.id])
61✔
173
        .await
61✔
174
        .context(PostgresProjectDbError)?;
61✔
175

176
    let stmt = trans
61✔
177
        .prepare(
61✔
178
            "INSERT INTO project_versions (
61✔
179
                    id,
61✔
180
                    project_id,
61✔
181
                    name,
61✔
182
                    description,
61✔
183
                    bounds,
61✔
184
                    time_step,
61✔
185
                    changed)
61✔
186
                    VALUES ($1, $2, $3, $4, $5, $6, CURRENT_TIMESTAMP);",
61✔
187
        )
61✔
188
        .await
61✔
189
        .context(PostgresProjectDbError)?;
61✔
190

191
    let version_id = ProjectVersionId::new();
61✔
192

61✔
193
    trans
61✔
194
        .execute(
61✔
195
            &stmt,
61✔
196
            &[
61✔
197
                &version_id,
61✔
198
                &project.id,
61✔
199
                &project.name,
61✔
200
                &project.description,
61✔
201
                &project.bounds,
61✔
202
                &project.time_step,
61✔
203
            ],
61✔
204
        )
61✔
205
        .await
61✔
206
        .context(PostgresProjectDbError)?;
61✔
207

208
    Ok(version_id)
61✔
209
}
61✔
210

211
pub async fn update_project(
30✔
212
    trans: &Transaction<'_>,
30✔
213
    project: &Project,
30✔
214
    update: UpdateProject,
30✔
215
) -> Result<Project, ProjectDbError> {
30✔
216
    let project = project.update_project(update)?;
30✔
217

218
    let stmt = trans
30✔
219
        .prepare(
30✔
220
            "
30✔
221
            INSERT INTO project_versions (
30✔
222
                id,
30✔
223
                project_id,
30✔
224
                name,
30✔
225
                description,
30✔
226
                bounds,
30✔
227
                time_step,
30✔
228
                changed)
30✔
229
            VALUES ($1, $2, $3, $4, $5, $6, CURRENT_TIMESTAMP);",
30✔
230
        )
30✔
231
        .await
30✔
232
        .context(PostgresProjectDbError)?;
30✔
233

234
    trans
30✔
235
        .execute(
30✔
236
            &stmt,
30✔
237
            &[
30✔
238
                &project.version.id,
30✔
239
                &project.id,
30✔
240
                &project.name,
30✔
241
                &project.description,
30✔
242
                &project.bounds,
30✔
243
                &project.time_step,
30✔
244
            ],
30✔
245
        )
30✔
246
        .await
30✔
247
        .context(PostgresProjectDbError)?;
30✔
248

249
    for (idx, layer) in project.layers.iter().enumerate() {
30✔
250
        let stmt = trans
26✔
251
            .prepare(
26✔
252
                "
26✔
253
            INSERT INTO project_version_layers (
26✔
254
                project_id,
26✔
255
                project_version_id,
26✔
256
                layer_index,
26✔
257
                name,
26✔
258
                workflow_id,
26✔
259
                symbology,
26✔
260
                visibility)
26✔
261
            VALUES ($1, $2, $3, $4, $5, $6, $7);",
26✔
262
            )
26✔
263
            .await
26✔
264
            .context(PostgresProjectDbError)?;
26✔
265

266
        trans
26✔
267
            .execute(
26✔
268
                &stmt,
26✔
269
                &[
26✔
270
                    &project.id,
26✔
271
                    &project.version.id,
26✔
272
                    &(idx as i32),
26✔
273
                    &layer.name,
26✔
274
                    &layer.workflow,
26✔
275
                    &layer.symbology,
26✔
276
                    &layer.visibility,
26✔
277
                ],
26✔
278
            )
26✔
279
            .await
26✔
280
            .context(PostgresProjectDbError)?;
26✔
281
    }
282

283
    update_plots(trans, &project.id, &project.version.id, &project.plots).await?;
30✔
284

285
    Ok(project)
30✔
286
}
30✔
287

288
#[async_trait]
289
impl<Tls> ProjectDb for PostgresDb<Tls>
290
where
291
    Tls: MakeTlsConnect<Socket> + Clone + Send + Sync + 'static + std::fmt::Debug,
292
    <Tls as MakeTlsConnect<Socket>>::Stream: Send + Sync,
293
    <Tls as MakeTlsConnect<Socket>>::TlsConnect: Send,
294
    <<Tls as MakeTlsConnect<Socket>>::TlsConnect as TlsConnect<Socket>>::Future: Send,
295
{
296
    async fn list_projects(
297
        &self,
298
        options: ProjectListOptions,
299
    ) -> Result<Vec<ProjectListing>, ProjectDbError> {
2✔
300
        // TODO: project filters
301

302
        let mut conn = self.conn_pool.get().await.context(Bb8ProjectDbError)?;
2✔
303
        let trans = conn
2✔
304
            .build_transaction()
2✔
305
            .start()
2✔
306
            .await
2✔
307
            .context(PostgresProjectDbError)?;
2✔
308

309
        let stmt = trans
2✔
310
            .prepare(&format!(
2✔
311
                "
2✔
312
        SELECT p.id, p.project_id, p.name, p.description, p.changed
2✔
313
        FROM project_versions p
2✔
314
        WHERE
2✔
315
            p.changed >= ALL (SELECT changed FROM project_versions WHERE project_id = p.project_id)
2✔
316
        ORDER BY p.{}
2✔
317
        LIMIT $1
2✔
318
        OFFSET $2;",
2✔
319
                options.order.to_sql_string()
2✔
320
            ))
2✔
321
            .await
2✔
322
            .context(PostgresProjectDbError)?;
2✔
323

324
        let project_rows = trans
2✔
325
            .query(
2✔
326
                &stmt,
2✔
327
                &[&i64::from(options.limit), &i64::from(options.offset)],
2✔
328
            )
2✔
329
            .await
2✔
330
            .context(PostgresProjectDbError)?;
2✔
331

332
        let project_listings = project_listings_from_rows(&trans, project_rows).await?;
2✔
333

334
        trans.commit().await.context(PostgresProjectDbError)?;
2✔
335

336
        Ok(project_listings)
2✔
337
    }
4✔
338

339
    async fn create_project(&self, create: CreateProject) -> Result<ProjectId, ProjectDbError> {
10✔
340
        let mut conn = self.conn_pool.get().await.context(Bb8ProjectDbError)?;
10✔
341

342
        let project: Project = Project::from_create_project(create);
10✔
343

344
        let trans = conn
10✔
345
            .build_transaction()
10✔
346
            .start()
10✔
347
            .await
10✔
348
            .context(PostgresProjectDbError)?;
10✔
349

350
        insert_project(&trans, &project).await?;
10✔
351

352
        trans.commit().await.context(PostgresProjectDbError)?;
10✔
353

354
        Ok(project.id)
10✔
355
    }
20✔
356

357
    async fn load_project(&self, project: ProjectId) -> Result<Project, ProjectDbError> {
4✔
358
        self.load_project_version(project, LoadVersion::Latest)
4✔
359
            .await
4✔
360
    }
8✔
361

362
    #[allow(clippy::too_many_lines)]
363
    async fn update_project(&self, update: UpdateProject) -> Result<(), ProjectDbError> {
3✔
364
        let mut conn = self.conn_pool.get().await.context(Bb8ProjectDbError)?;
3✔
365

366
        let trans = conn
3✔
367
            .build_transaction()
3✔
368
            .start()
3✔
369
            .await
3✔
370
            .context(PostgresProjectDbError)?;
3✔
371

372
        let project = self.load_project(update.id).await?; // TODO: move inside transaction?
3✔
373

374
        update_project(&trans, &project, update).await?;
3✔
375

376
        trans.commit().await.context(PostgresProjectDbError)?;
3✔
377

378
        Ok(())
3✔
379
    }
6✔
380

381
    async fn delete_project(&self, project: ProjectId) -> Result<(), ProjectDbError> {
1✔
382
        let conn = self.conn_pool.get().await.context(Bb8ProjectDbError)?;
1✔
383

384
        let stmt = conn
1✔
385
            .prepare("DELETE FROM projects WHERE id = $1;")
1✔
386
            .await
1✔
387
            .context(PostgresProjectDbError)?;
1✔
388

389
        let rows_affected = conn
1✔
390
            .execute(&stmt, &[&project])
1✔
391
            .await
1✔
392
            .context(PostgresProjectDbError)?;
1✔
393

394
        ensure!(
1✔
395
            rows_affected == 1,
1✔
UNCOV
396
            ProjectNotFoundProjectDbError { project }
×
397
        );
398

399
        Ok(())
1✔
400
    }
2✔
401

402
    #[allow(clippy::too_many_lines)]
403
    async fn load_project_version(
404
        &self,
405
        project: ProjectId,
406
        version: LoadVersion,
407
    ) -> Result<Project, ProjectDbError> {
5✔
408
        let mut conn = self.conn_pool.get().await.context(Bb8ProjectDbError)?;
5✔
409

410
        let trans = conn
5✔
411
            .build_transaction()
5✔
412
            .start()
5✔
413
            .await
5✔
414
            .context(PostgresProjectDbError)?;
5✔
415

416
        let rows = if let LoadVersion::Version(version) = version {
5✔
UNCOV
417
            let stmt = trans
×
UNCOV
418
                .prepare(
×
UNCOV
419
                    "
×
UNCOV
420
            SELECT 
×
UNCOV
421
                p.project_id, 
×
UNCOV
422
                p.id, 
×
UNCOV
423
                p.name, 
×
UNCOV
424
                p.description,
×
UNCOV
425
                p.bounds,
×
UNCOV
426
                p.time_step,
×
UNCOV
427
                p.changed
×
UNCOV
428
            FROM 
×
UNCOV
429
                project_versions p
×
UNCOV
430
            WHERE p.project_id = $1 AND p.id = $2",
×
UNCOV
431
                )
×
UNCOV
432
                .await
×
UNCOV
433
                .context(PostgresProjectDbError)?;
×
434

UNCOV
435
            let rows = trans
×
UNCOV
436
                .query(&stmt, &[&project, &version])
×
UNCOV
437
                .await
×
UNCOV
438
                .context(PostgresProjectDbError)?;
×
439

UNCOV
440
            if rows.is_empty() {
×
UNCOV
441
                return Err(ProjectDbError::ProjectVersionNotFound { project, version });
×
UNCOV
442
            }
×
UNCOV
443

×
UNCOV
444
            rows
×
445
        } else {
446
            let stmt = trans
5✔
447
                .prepare(
5✔
448
                    "
5✔
449
            SELECT  
5✔
450
                p.project_id, 
5✔
451
                p.id, 
5✔
452
                p.name, 
5✔
453
                p.description,
5✔
454
                p.bounds,
5✔
455
                p.time_step,
5✔
456
                p.changed
5✔
457
            FROM 
5✔
458
                project_versions p
5✔
459
            WHERE project_id = $1 AND p.changed >= ALL(
5✔
460
                SELECT changed FROM project_versions WHERE project_id = $1
5✔
461
            )",
5✔
462
                )
5✔
463
                .await
5✔
464
                .context(PostgresProjectDbError)?;
5✔
465

466
            let rows = trans
5✔
467
                .query(&stmt, &[&project])
5✔
468
                .await
5✔
469
                .context(PostgresProjectDbError)?;
5✔
470

471
            if rows.is_empty() {
5✔
472
                return Err(ProjectDbError::ProjectNotFound { project });
1✔
473
            }
4✔
474

4✔
475
            rows
4✔
476
        };
477

478
        let row = &rows[0];
4✔
479

4✔
480
        let project_id = ProjectId(row.get(0));
4✔
481
        let version_id = ProjectVersionId(row.get(1));
4✔
482
        let name = row.get(2);
4✔
483
        let description = row.get(3);
4✔
484
        let bounds = row.get(4);
4✔
485
        let time_step = row.get(5);
4✔
486
        let changed = row.get(6);
4✔
487

488
        let stmt = trans
4✔
489
            .prepare(
4✔
490
                "
4✔
491
        SELECT  
4✔
492
            name, workflow_id, symbology, visibility
4✔
493
        FROM project_version_layers
4✔
494
        WHERE project_version_id = $1
4✔
495
        ORDER BY layer_index ASC",
4✔
496
            )
4✔
497
            .await
4✔
498
            .context(PostgresProjectDbError)?;
4✔
499

500
        let rows = trans
4✔
501
            .query(&stmt, &[&version_id])
4✔
502
            .await
4✔
503
            .context(PostgresProjectDbError)?;
4✔
504

505
        let mut layers = vec![];
4✔
506
        for row in rows {
6✔
507
            layers.push(ProjectLayer {
2✔
508
                workflow: WorkflowId(row.get(1)),
2✔
509
                name: row.get(0),
2✔
510
                symbology: row.get(2),
2✔
511
                visibility: row.get(3),
2✔
512
            });
2✔
513
        }
2✔
514

515
        let project = Project {
4✔
516
            id: project_id,
4✔
517
            version: ProjectVersion {
4✔
518
                id: version_id,
4✔
519
                changed,
4✔
520
            },
4✔
521
            name,
4✔
522
            description,
4✔
523
            layers,
4✔
524
            plots: load_plots(&trans, &version_id).await?,
4✔
525
            bounds,
4✔
526
            time_step,
4✔
527
        };
4✔
528

4✔
529
        trans.commit().await.context(PostgresProjectDbError)?;
4✔
530

531
        Ok(project)
4✔
532
    }
10✔
533

534
    async fn list_project_versions(
535
        &self,
536
        project: ProjectId,
537
    ) -> Result<Vec<ProjectVersion>, ProjectDbError> {
3✔
538
        let conn = self.conn_pool.get().await.context(Bb8ProjectDbError)?;
3✔
539

540
        let stmt = conn
3✔
541
            .prepare(
3✔
542
                "
3✔
543
                SELECT 
3✔
544
                    id, changed
3✔
545
                FROM 
3✔
546
                    project_versions
3✔
547
                WHERE 
3✔
548
                    project_id = $1 
3✔
549
                ORDER BY 
3✔
550
                    changed DESC",
3✔
551
            )
3✔
552
            .await
3✔
553
            .context(PostgresProjectDbError)?;
3✔
554

555
        let rows = conn
3✔
556
            .query(&stmt, &[&project])
3✔
557
            .await
3✔
558
            .context(PostgresProjectDbError)?;
3✔
559

560
        Ok(rows
3✔
561
            .iter()
3✔
562
            .map(|row| ProjectVersion {
9✔
563
                id: ProjectVersionId(row.get(0)),
9✔
564
                changed: row.get(1),
9✔
565
            })
9✔
566
            .collect())
3✔
567
    }
6✔
568
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc