• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

geo-engine / geoengine / 5869096654

15 Aug 2023 02:59PM UTC coverage: 89.79% (+0.3%) from 89.481%
5869096654

push

github

web-flow
Merge pull request #851 from geo-engine/pg-dataset-metadata-mapping

Pg dataset metadata mapping

1982 of 1982 new or added lines in 9 files covered. (100.0%)

106148 of 118218 relevant lines covered (89.79%)

61246.75 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

99.72
/services/src/projects/postgres_projectdb.rs
1
use crate::contexts::PostgresDb;
2
use crate::error;
3
use crate::error::Result;
4
use crate::projects::Plot;
5
use crate::projects::ProjectLayer;
6
use crate::projects::{
7
    CreateProject, Project, ProjectDb, ProjectId, ProjectListOptions, ProjectListing,
8
    ProjectVersion, ProjectVersionId, UpdateProject,
9
};
10

11
use super::LoadVersion;
12
use crate::util::Identifier;
13
use crate::workflows::workflow::WorkflowId;
14
use async_trait::async_trait;
15
use bb8_postgres::bb8::PooledConnection;
16
use bb8_postgres::tokio_postgres::Transaction;
17
use bb8_postgres::PostgresConnectionManager;
18
use bb8_postgres::{
19
    tokio_postgres::tls::MakeTlsConnect, tokio_postgres::tls::TlsConnect, tokio_postgres::Socket,
20
};
21
use snafu::ensure;
22

23
async fn list_plots<Tls>(
3✔
24
    conn: &PooledConnection<'_, PostgresConnectionManager<Tls>>,
3✔
25
    project_version_id: &ProjectVersionId,
3✔
26
) -> Result<Vec<String>>
3✔
27
where
3✔
28
    Tls: MakeTlsConnect<Socket> + Clone + Send + Sync + 'static,
3✔
29
    <Tls as MakeTlsConnect<Socket>>::Stream: Send + Sync,
3✔
30
    <Tls as MakeTlsConnect<Socket>>::TlsConnect: Send,
3✔
31
    <<Tls as MakeTlsConnect<Socket>>::TlsConnect as TlsConnect<Socket>>::Future: Send,
3✔
32
{
3✔
33
    let stmt = conn
3✔
34
        .prepare(
3✔
35
            "
3✔
36
                    SELECT name
3✔
37
                    FROM project_version_plots
3✔
38
                    WHERE project_version_id = $1;
3✔
39
                ",
3✔
40
        )
3✔
41
        .await?;
2✔
42

43
    let plot_rows = conn.query(&stmt, &[project_version_id]).await?;
3✔
44
    let plot_names = plot_rows.iter().map(|row| row.get(0)).collect();
3✔
45

3✔
46
    Ok(plot_names)
3✔
47
}
3✔
48

49
async fn load_plots<Tls>(
32✔
50
    conn: &PooledConnection<'_, PostgresConnectionManager<Tls>>,
32✔
51
    project_version_id: &ProjectVersionId,
32✔
52
) -> Result<Vec<Plot>>
32✔
53
where
32✔
54
    Tls: MakeTlsConnect<Socket> + Clone + Send + Sync + 'static,
32✔
55
    <Tls as MakeTlsConnect<Socket>>::Stream: Send + Sync,
32✔
56
    <Tls as MakeTlsConnect<Socket>>::TlsConnect: Send,
32✔
57
    <<Tls as MakeTlsConnect<Socket>>::TlsConnect as TlsConnect<Socket>>::Future: Send,
32✔
58
{
32✔
59
    let stmt = conn
32✔
60
        .prepare(
32✔
61
            "
32✔
62
                SELECT  
32✔
63
                    name, workflow_id
32✔
64
                FROM project_version_plots
32✔
65
                WHERE project_version_id = $1
32✔
66
                ORDER BY plot_index ASC
32✔
67
                ",
32✔
68
        )
32✔
69
        .await?;
25✔
70

71
    let rows = conn.query(&stmt, &[project_version_id]).await?;
32✔
72

73
    let plots = rows
32✔
74
        .into_iter()
32✔
75
        .map(|row| Plot {
32✔
76
            workflow: WorkflowId(row.get(1)),
11✔
77
            name: row.get(0),
11✔
78
        })
32✔
79
        .collect();
32✔
80

32✔
81
    Ok(plots)
32✔
82
}
32✔
83

84
async fn update_plots(
19✔
85
    trans: &Transaction<'_>,
19✔
86
    project_id: &ProjectId,
19✔
87
    project_version_id: &ProjectVersionId,
19✔
88
    plots: &[Plot],
19✔
89
) -> Result<()> {
19✔
90
    for (idx, plot) in plots.iter().enumerate() {
19✔
91
        let stmt = trans
7✔
92
            .prepare(
7✔
93
                "
7✔
94
                    INSERT INTO project_version_plots (
7✔
95
                        project_id,
7✔
96
                        project_version_id,
7✔
97
                        plot_index,
7✔
98
                        name,
7✔
99
                        workflow_id)
7✔
100
                    VALUES ($1, $2, $3, $4, $5);
7✔
101
                    ",
7✔
102
            )
7✔
103
            .await?;
7✔
104

105
        trans
7✔
106
            .execute(
7✔
107
                &stmt,
7✔
108
                &[
7✔
109
                    project_id,
7✔
110
                    project_version_id,
7✔
111
                    &(idx as i32),
7✔
112
                    &plot.name,
7✔
113
                    &plot.workflow,
7✔
114
                ],
7✔
115
            )
7✔
116
            .await?;
7✔
117
    }
118

119
    Ok(())
19✔
120
}
19✔
121

122
#[async_trait]
123
impl<Tls> ProjectDb for PostgresDb<Tls>
124
where
125
    Tls: MakeTlsConnect<Socket> + Clone + Send + Sync + 'static,
126
    <Tls as MakeTlsConnect<Socket>>::Stream: Send + Sync,
127
    <Tls as MakeTlsConnect<Socket>>::TlsConnect: Send,
128
    <<Tls as MakeTlsConnect<Socket>>::TlsConnect as TlsConnect<Socket>>::Future: Send,
129
{
130
    async fn list_projects(&self, options: ProjectListOptions) -> Result<Vec<ProjectListing>> {
2✔
131
        // TODO: project filters
132

133
        let conn = self.conn_pool.get().await?;
2✔
134

135
        let stmt = conn
2✔
136
            .prepare(&format!(
2✔
137
                "
2✔
138
        SELECT p.id, p.project_id, p.name, p.description, p.changed
2✔
139
        FROM project_versions p
2✔
140
        WHERE
2✔
141
            p.changed >= ALL (SELECT changed FROM project_versions WHERE project_id = p.project_id)
2✔
142
        ORDER BY p.{}
2✔
143
        LIMIT $1
2✔
144
        OFFSET $2;",
2✔
145
                options.order.to_sql_string()
2✔
146
            ))
2✔
147
            .await?;
1✔
148

149
        let project_rows = conn
2✔
150
            .query(
2✔
151
                &stmt,
2✔
152
                &[&i64::from(options.limit), &i64::from(options.offset)],
2✔
153
            )
2✔
154
            .await?;
1✔
155

156
        let mut project_listings = vec![];
2✔
157
        for project_row in project_rows {
5✔
158
            let project_version_id = ProjectVersionId(project_row.get(0));
3✔
159
            let project_id = ProjectId(project_row.get(1));
3✔
160
            let name = project_row.get(2);
3✔
161
            let description = project_row.get(3);
3✔
162
            let changed = project_row.get(4);
3✔
163

164
            let stmt = conn
3✔
165
                .prepare(
3✔
166
                    "
3✔
167
                    SELECT name
3✔
168
                    FROM project_version_layers
3✔
169
                    WHERE project_version_id = $1;",
3✔
170
                )
3✔
171
                .await?;
2✔
172

173
            let layer_rows = conn.query(&stmt, &[&project_version_id]).await?;
3✔
174
            let layer_names = layer_rows.iter().map(|row| row.get(0)).collect();
3✔
175

3✔
176
            project_listings.push(ProjectListing {
3✔
177
                id: project_id,
3✔
178
                name,
3✔
179
                description,
3✔
180
                layer_names,
3✔
181
                plot_names: list_plots(&conn, &project_version_id).await?,
4✔
182
                changed,
3✔
183
            });
184
        }
185
        Ok(project_listings)
2✔
186
    }
4✔
187

188
    async fn create_project(&self, create: CreateProject) -> Result<ProjectId> {
40✔
189
        let mut conn = self.conn_pool.get().await?;
40✔
190

191
        let project: Project = Project::from_create_project(create);
40✔
192

193
        let trans = conn.build_transaction().start().await?;
40✔
194

195
        let stmt = trans
40✔
196
            .prepare("INSERT INTO projects (id) VALUES ($1);")
40✔
197
            .await?;
39✔
198

199
        trans.execute(&stmt, &[&project.id]).await?;
40✔
200

201
        let stmt = trans
40✔
202
            .prepare(
40✔
203
                "INSERT INTO project_versions (
40✔
204
                    id,
40✔
205
                    project_id,
40✔
206
                    name,
40✔
207
                    description,
40✔
208
                    bounds,
40✔
209
                    time_step,
40✔
210
                    changed)
40✔
211
                    VALUES ($1, $2, $3, $4, $5, $6, CURRENT_TIMESTAMP);",
40✔
212
            )
40✔
213
            .await?;
155✔
214

215
        let version_id = ProjectVersionId::new();
40✔
216

40✔
217
        trans
40✔
218
            .execute(
40✔
219
                &stmt,
40✔
220
                &[
40✔
221
                    &version_id,
40✔
222
                    &project.id,
40✔
223
                    &project.name,
40✔
224
                    &project.description,
40✔
225
                    &project.bounds,
40✔
226
                    &project.time_step,
40✔
227
                ],
40✔
228
            )
40✔
229
            .await?;
39✔
230

231
        trans.commit().await?;
40✔
232

233
        Ok(project.id)
40✔
234
    }
80✔
235

236
    async fn load_project(&self, project: ProjectId) -> Result<Project> {
30✔
237
        self.load_project_version(project, LoadVersion::Latest)
30✔
238
            .await
716✔
239
    }
60✔
240

241
    #[allow(clippy::too_many_lines)]
242
    async fn update_project(&self, update: UpdateProject) -> Result<()> {
19✔
243
        let update = update;
19✔
244

245
        let mut conn = self.conn_pool.get().await?;
19✔
246

247
        let trans = conn.build_transaction().start().await?;
19✔
248

249
        let project = self.load_project(update.id).await?; // TODO: move inside transaction?
677✔
250

251
        let project = project.update_project(update)?;
19✔
252

253
        let stmt = trans
19✔
254
            .prepare(
19✔
255
                "
19✔
256
                INSERT INTO project_versions (
19✔
257
                    id,
19✔
258
                    project_id,
19✔
259
                    name,
19✔
260
                    description,
19✔
261
                    bounds,
19✔
262
                    time_step,
19✔
263
                    changed)
19✔
264
                VALUES ($1, $2, $3, $4, $5, $6, CURRENT_TIMESTAMP);",
19✔
265
            )
19✔
266
            .await?;
20✔
267

268
        trans
19✔
269
            .execute(
19✔
270
                &stmt,
19✔
271
                &[
19✔
272
                    &project.version.id,
19✔
273
                    &project.id,
19✔
274
                    &project.name,
19✔
275
                    &project.description,
19✔
276
                    &project.bounds,
19✔
277
                    &project.time_step,
19✔
278
                ],
19✔
279
            )
19✔
280
            .await?;
19✔
281

282
        for (idx, layer) in project.layers.iter().enumerate() {
19✔
283
            let stmt = trans
15✔
284
                .prepare(
15✔
285
                    "
15✔
286
                INSERT INTO project_version_layers (
15✔
287
                    project_id,
15✔
288
                    project_version_id,
15✔
289
                    layer_index,
15✔
290
                    name,
15✔
291
                    workflow_id,
15✔
292
                    symbology,
15✔
293
                    visibility)
15✔
294
                VALUES ($1, $2, $3, $4, $5, $6, $7);",
15✔
295
                )
15✔
296
                .await?;
140✔
297

298
            trans
15✔
299
                .execute(
15✔
300
                    &stmt,
15✔
301
                    &[
15✔
302
                        &project.id,
15✔
303
                        &project.version.id,
15✔
304
                        &(idx as i32),
15✔
305
                        &layer.name,
15✔
306
                        &layer.workflow,
15✔
307
                        &layer.symbology,
15✔
308
                        &layer.visibility,
15✔
309
                    ],
15✔
310
                )
15✔
311
                .await?;
15✔
312
        }
313

314
        update_plots(&trans, &project.id, &project.version.id, &project.plots).await?;
19✔
315

316
        trans.commit().await?;
20✔
317

318
        Ok(())
19✔
319
    }
38✔
320

321
    async fn delete_project(&self, project: ProjectId) -> Result<()> {
3✔
322
        let conn = self.conn_pool.get().await?;
3✔
323

324
        let stmt = conn.prepare("DELETE FROM projects WHERE id = $1;").await?;
3✔
325

326
        let rows_affected = conn.execute(&stmt, &[&project]).await?;
3✔
327

328
        ensure!(rows_affected == 1, error::ProjectDeleteFailed);
3✔
329

330
        Ok(())
2✔
331
    }
6✔
332

333
    #[allow(clippy::too_many_lines)]
334
    async fn load_project_version(
35✔
335
        &self,
35✔
336
        project: ProjectId,
35✔
337
        version: LoadVersion,
35✔
338
    ) -> Result<Project> {
35✔
339
        let conn = self.conn_pool.get().await?;
35✔
340

341
        let row = if let LoadVersion::Version(version) = version {
35✔
342
            let stmt = conn
2✔
343
                .prepare(
2✔
344
                    "
2✔
345
            SELECT 
2✔
346
                p.project_id, 
2✔
347
                p.id, 
2✔
348
                p.name, 
2✔
349
                p.description,
2✔
350
                p.bounds,
2✔
351
                p.time_step,
2✔
352
                p.changed
2✔
353
            FROM 
2✔
354
                project_versions p
2✔
355
            WHERE p.project_id = $1 AND p.id = $2",
2✔
356
                )
2✔
357
                .await?;
×
358

359
            conn.query_one(&stmt, &[&project, &version]).await?
2✔
360
        } else {
361
            let stmt = conn
33✔
362
                .prepare(
33✔
363
                    "
33✔
364
            SELECT  
33✔
365
                p.project_id, 
33✔
366
                p.id, 
33✔
367
                p.name, 
33✔
368
                p.description,
33✔
369
                p.bounds,
33✔
370
                p.time_step,
33✔
371
                p.changed
33✔
372
            FROM 
33✔
373
                project_versions p
33✔
374
            WHERE project_id = $1 AND p.changed >= ALL(
33✔
375
                SELECT changed FROM project_versions WHERE project_id = $1
33✔
376
            )",
33✔
377
                )
33✔
378
                .await?;
234✔
379

380
            conn.query_one(&stmt, &[&project]).await?
33✔
381
        };
382

383
        let project_id = ProjectId(row.get(0));
32✔
384
        let version_id = ProjectVersionId(row.get(1));
32✔
385
        let name = row.get(2);
32✔
386
        let description = row.get(3);
32✔
387
        let bounds = row.get(4);
32✔
388
        let time_step = row.get(5);
32✔
389
        let changed = row.get(6);
32✔
390

391
        let stmt = conn
32✔
392
            .prepare(
32✔
393
                "
32✔
394
        SELECT  
32✔
395
            name, workflow_id, symbology, visibility
32✔
396
        FROM project_version_layers
32✔
397
        WHERE project_version_id = $1
32✔
398
        ORDER BY layer_index ASC",
32✔
399
            )
32✔
400
            .await?;
435✔
401

402
        let rows = conn.query(&stmt, &[&version_id]).await?;
32✔
403

404
        let mut layers = vec![];
32✔
405
        for row in rows {
49✔
406
            layers.push(ProjectLayer {
17✔
407
                workflow: WorkflowId(row.get(1)),
17✔
408
                name: row.get(0),
17✔
409
                symbology: row.get(2),
17✔
410
                visibility: row.get(3),
17✔
411
            });
17✔
412
        }
17✔
413

414
        Ok(Project {
415
            id: project_id,
32✔
416
            version: ProjectVersion {
32✔
417
                id: version_id,
32✔
418
                changed,
32✔
419
            },
32✔
420
            name,
32✔
421
            description,
32✔
422
            layers,
32✔
423
            plots: load_plots(&conn, &version_id).await?,
50✔
424
            bounds,
32✔
425
            time_step,
32✔
426
        })
427
    }
70✔
428

429
    async fn list_project_versions(&self, project: ProjectId) -> Result<Vec<ProjectVersion>> {
5✔
430
        let conn = self.conn_pool.get().await?;
5✔
431

432
        let stmt = conn
5✔
433
            .prepare(
5✔
434
                "
5✔
435
                SELECT 
5✔
436
                    id, changed
5✔
437
                FROM 
5✔
438
                    project_versions
5✔
439
                WHERE 
5✔
440
                    project_id = $1 
5✔
441
                ORDER BY 
5✔
442
                    changed DESC",
5✔
443
            )
5✔
444
            .await?;
5✔
445

446
        let rows = conn.query(&stmt, &[&project]).await?;
5✔
447

448
        Ok(rows
5✔
449
            .iter()
5✔
450
            .map(|row| ProjectVersion {
13✔
451
                id: ProjectVersionId(row.get(0)),
13✔
452
                changed: row.get(1),
13✔
453
            })
13✔
454
            .collect())
5✔
455
    }
10✔
456
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc