• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

geo-engine / geoengine / 9778142714

03 Jul 2024 12:47PM CUT coverage: 90.741% (+0.05%) from 90.687%
9778142714

Pull #970

github

web-flow
Merge 371bbde3c into 9a33a5b13
Pull Request #970: FAIR dataset deletion

1369 of 1424 new or added lines in 9 files covered. (96.14%)

22 existing lines in 12 files now uncovered.

134486 of 148208 relevant lines covered (90.74%)

52199.73 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

88.8
/services/src/datasets/postgres.rs
1
use super::listing::{OrderBy, Provenance};
2
use super::{AddDataset, DatasetIdAndName, DatasetName};
3
use crate::api::model::services::UpdateDataset;
4
use crate::contexts::PostgresDb;
5
use crate::datasets::listing::ProvenanceOutput;
6
use crate::datasets::listing::{DatasetListOptions, DatasetListing, DatasetProvider};
7
use crate::datasets::storage::{Dataset, DatasetDb, DatasetStore, MetaDataDefinition};
8
use crate::datasets::upload::FileId;
9
use crate::datasets::upload::{Upload, UploadDb, UploadId};
10
use crate::error::{self, Result};
11
use crate::projects::Symbology;
12
use crate::util::postgres::PostgresErrorExt;
13
use async_trait::async_trait;
14
use bb8_postgres::bb8::PooledConnection;
15
use bb8_postgres::tokio_postgres::tls::{MakeTlsConnect, TlsConnect};
16
use bb8_postgres::tokio_postgres::Socket;
17
use bb8_postgres::PostgresConnectionManager;
18
use geoengine_datatypes::dataset::{DataId, DatasetId};
19
use geoengine_datatypes::primitives::RasterQueryRectangle;
20
use geoengine_datatypes::primitives::VectorQueryRectangle;
21
use geoengine_datatypes::util::Identifier;
22
use geoengine_operators::engine::{
23
    MetaData, MetaDataProvider, RasterResultDescriptor, TypedResultDescriptor,
24
    VectorResultDescriptor,
25
};
26
use geoengine_operators::mock::MockDatasetDataSourceLoadingInfo;
27
use geoengine_operators::source::{GdalLoadingInfo, OgrSourceDataset};
28
use postgres_types::{FromSql, ToSql};
29

30
impl<Tls> DatasetDb for PostgresDb<Tls>
31
where
32
    Tls: MakeTlsConnect<Socket> + Clone + Send + Sync + 'static + std::fmt::Debug,
33
    <Tls as MakeTlsConnect<Socket>>::Stream: Send + Sync,
34
    <Tls as MakeTlsConnect<Socket>>::TlsConnect: Send,
35
    <<Tls as MakeTlsConnect<Socket>>::TlsConnect as TlsConnect<Socket>>::Future: Send,
36
{
37
}
38

39
pub async fn resolve_dataset_name_to_id<Tls>(
84✔
40
    conn: &PooledConnection<'_, PostgresConnectionManager<Tls>>,
84✔
41
    dataset_name: &DatasetName,
84✔
42
) -> Result<Option<DatasetId>>
84✔
43
where
84✔
44
    Tls: MakeTlsConnect<Socket> + Clone + Send + Sync + 'static,
84✔
45
    <Tls as MakeTlsConnect<Socket>>::Stream: Send + Sync,
84✔
46
    <Tls as MakeTlsConnect<Socket>>::TlsConnect: Send,
84✔
47
    <<Tls as MakeTlsConnect<Socket>>::TlsConnect as TlsConnect<Socket>>::Future: Send,
84✔
48
{
84✔
49
    let stmt = conn
84✔
50
        .prepare(
84✔
51
            "SELECT id
84✔
52
        FROM datasets
84✔
53
        WHERE name = $1::\"DatasetName\"",
84✔
54
        )
84✔
55
        .await?;
86✔
56

57
    let row_option = conn.query_opt(&stmt, &[&dataset_name]).await?;
84✔
58

59
    Ok(row_option.map(|row| row.get(0)))
84✔
60
}
84✔
61

62
#[allow(clippy::too_many_lines)]
63
#[async_trait]
64
impl<Tls> DatasetProvider for PostgresDb<Tls>
65
where
66
    Tls: MakeTlsConnect<Socket> + Clone + Send + Sync + 'static + std::fmt::Debug,
67
    <Tls as MakeTlsConnect<Socket>>::Stream: Send + Sync,
68
    <Tls as MakeTlsConnect<Socket>>::TlsConnect: Send,
69
    <<Tls as MakeTlsConnect<Socket>>::TlsConnect as TlsConnect<Socket>>::Future: Send,
70
{
71
    async fn list_datasets(&self, options: DatasetListOptions) -> Result<Vec<DatasetListing>> {
5✔
72
        let conn = self.conn_pool.get().await?;
5✔
73

74
        let order_sql = if options.order == OrderBy::NameAsc {
5✔
75
            "display_name ASC"
5✔
76
        } else {
77
            "display_name DESC"
×
78
        };
79

80
        let mut pos = 2;
5✔
81

82
        let filter_sql = if options.filter.is_some() {
5✔
83
            pos += 1;
3✔
84
            Some(format!("display_name ILIKE ${pos} ESCAPE '\\'"))
3✔
85
        } else {
86
            None
2✔
87
        };
88

89
        let (filter_tags_sql, filter_tags_list) = if let Some(filter_tags) = &options.tags {
5✔
90
            pos += 1;
1✔
91
            (Some(format!("tags @> ${pos}::text[]")), filter_tags.clone())
1✔
92
        } else {
93
            (None, vec![])
4✔
94
        };
95

96
        let where_clause_sql = match (filter_sql, filter_tags_sql) {
5✔
97
            (Some(filter_sql), Some(filter_tags_sql)) => {
1✔
98
                format!("WHERE {filter_sql} AND {filter_tags_sql}")
1✔
99
            }
100
            (Some(filter_sql), None) => format!("WHERE {filter_sql}"),
2✔
101
            (None, Some(filter_tags_sql)) => format!("WHERE {filter_tags_sql}"),
×
102
            (None, None) => String::new(),
2✔
103
        };
104

105
        let stmt = conn
5✔
106
            .prepare(&format!(
5✔
107
                "
5✔
108
            SELECT 
5✔
109
                id,
5✔
110
                name,
5✔
111
                display_name,
5✔
112
                description,
5✔
113
                tags,
5✔
114
                source_operator,
5✔
115
                result_descriptor,
5✔
116
                symbology
5✔
117
            FROM 
5✔
118
                datasets
5✔
119
            {where_clause_sql}
5✔
120
            ORDER BY {order_sql}
5✔
121
            LIMIT $1
5✔
122
            OFFSET $2;"
5✔
123
            ))
5✔
124
            .await?;
5✔
125

126
        let rows = match (options.filter, options.tags) {
5✔
127
            (Some(filter), Some(_)) => {
1✔
128
                conn.query(
1✔
129
                    &stmt,
1✔
130
                    &[
1✔
131
                        &i64::from(options.limit),
1✔
132
                        &i64::from(options.offset),
1✔
133
                        &format!("%{}%", filter.replace('%', "\\%").replace('_', "\\_")),
1✔
134
                        &filter_tags_list,
1✔
135
                    ],
1✔
136
                )
1✔
137
                .await?
1✔
138
            }
139
            (Some(filter), None) => {
2✔
140
                conn.query(
2✔
141
                    &stmt,
2✔
142
                    &[
2✔
143
                        &i64::from(options.limit),
2✔
144
                        &i64::from(options.offset),
2✔
145
                        &format!("%{}%", filter.replace('%', "\\%").replace('_', "\\_")),
2✔
146
                    ],
2✔
147
                )
2✔
148
                .await?
2✔
149
            }
150
            (None, Some(_)) => {
151
                conn.query(
×
152
                    &stmt,
×
153
                    &[
×
154
                        &i64::from(options.limit),
×
155
                        &i64::from(options.offset),
×
156
                        &filter_tags_list,
×
157
                    ],
×
158
                )
×
159
                .await?
×
160
            }
161
            (None, None) => {
162
                conn.query(
2✔
163
                    &stmt,
2✔
164
                    &[&i64::from(options.limit), &i64::from(options.offset)],
2✔
165
                )
2✔
166
                .await?
2✔
167
            }
168
        };
169

170
        Ok(rows
5✔
171
            .iter()
5✔
172
            .map(|row| {
7✔
173
                Result::<DatasetListing>::Ok(DatasetListing {
7✔
174
                    id: row.get(0),
7✔
175
                    name: row.get(1),
7✔
176
                    display_name: row.get(2),
7✔
177
                    description: row.get(3),
7✔
178
                    tags: row.get::<_, Option<_>>(4).unwrap_or_default(),
7✔
179
                    source_operator: row.get(5),
7✔
180
                    result_descriptor: row.get(6),
7✔
181
                    symbology: row.get(7),
7✔
182
                })
7✔
183
            })
7✔
184
            .filter_map(Result::ok)
5✔
185
            .collect())
5✔
186
    }
15✔
187

188
    async fn load_dataset(&self, dataset: &DatasetId) -> Result<Dataset> {
14✔
189
        let conn = self.conn_pool.get().await?;
14✔
190
        let stmt = conn
14✔
191
            .prepare(
14✔
192
                "
14✔
193
            SELECT
14✔
194
                id,
14✔
195
                name,
14✔
196
                display_name,
14✔
197
                description,
14✔
198
                result_descriptor,
14✔
199
                source_operator,
14✔
200
                symbology,
14✔
201
                provenance,
14✔
202
                tags
14✔
203
            FROM 
14✔
204
                datasets
14✔
205
            WHERE 
14✔
206
                id = $1
14✔
207
            LIMIT 
14✔
208
                1",
14✔
209
            )
14✔
210
            .await?;
96✔
211

212
        let row = conn.query_opt(&stmt, &[dataset]).await?;
14✔
213

214
        let row = row.ok_or(error::Error::UnknownDatasetId)?;
14✔
215

216
        Ok(Dataset {
10✔
217
            id: row.get(0),
10✔
218
            name: row.get(1),
10✔
219
            display_name: row.get(2),
10✔
220
            description: row.get(3),
10✔
221
            result_descriptor: row.get(4),
10✔
222
            source_operator: row.get(5),
10✔
223
            symbology: row.get(6),
10✔
224
            provenance: row.get(7),
10✔
225
            tags: row.get(8),
10✔
226
        })
10✔
227
    }
42✔
228

229
    async fn load_provenance(&self, dataset: &DatasetId) -> Result<ProvenanceOutput> {
3✔
230
        let conn = self.conn_pool.get().await?;
3✔
231

232
        let stmt = conn
3✔
233
            .prepare(
3✔
234
                "
3✔
235
            SELECT 
3✔
236
                provenance 
3✔
237
            FROM 
3✔
238
                datasets
3✔
239
            WHERE
3✔
240
                id = $1;",
3✔
241
            )
3✔
242
            .await?;
8✔
243

244
        let row = conn.query_one(&stmt, &[dataset]).await?;
3✔
245

246
        let provenances: Vec<Provenance> = row.get(0);
3✔
247

3✔
248
        Ok(ProvenanceOutput {
3✔
249
            data: (*dataset).into(),
3✔
250
            provenance: Some(provenances),
3✔
251
        })
3✔
252
    }
9✔
253

254
    /// Loads the stored meta data definition of the given dataset.
    ///
    /// # Errors
    ///
    /// Propagates connection pool and database errors; `query_one` also fails
    /// if there is not exactly one row with this id.
    async fn load_loading_info(&self, dataset: &DatasetId) -> Result<MetaDataDefinition> {
        let connection = self.conn_pool.get().await?;

        let statement = connection
            .prepare(
                "
            SELECT 
                meta_data 
            FROM 
                datasets
            WHERE
                id = $1;",
            )
            .await?;

        let row = connection.query_one(&statement, &[dataset]).await?;
        let definition: MetaDataDefinition = row.get(0);

        Ok(definition)
    }
3✔
273

274
    async fn resolve_dataset_name_to_id(
64✔
275
        &self,
64✔
276
        dataset_name: &DatasetName,
64✔
277
    ) -> Result<Option<DatasetId>> {
64✔
278
        let conn = self.conn_pool.get().await?;
64✔
279
        resolve_dataset_name_to_id(&conn, dataset_name).await
125✔
280
    }
192✔
281

282
    async fn dataset_autocomplete_search(
3✔
283
        &self,
3✔
284
        tags: Option<Vec<String>>,
3✔
285
        search_string: String,
3✔
286
        limit: u32,
3✔
287
        offset: u32,
3✔
288
    ) -> Result<Vec<String>> {
3✔
289
        let connection = self.conn_pool.get().await?;
3✔
290

291
        let limit = i64::from(limit);
3✔
292
        let offset = i64::from(offset);
3✔
293
        let search_string = format!(
3✔
294
            "%{}%",
3✔
295
            search_string.replace('%', "\\%").replace('_', "\\_")
3✔
296
        );
3✔
297

3✔
298
        let mut query_params: Vec<&(dyn ToSql + Sync)> = vec![&limit, &offset, &search_string];
3✔
299

300
        let tags_clause = if let Some(tags) = &tags {
3✔
301
            query_params.push(tags);
1✔
302
            " AND tags @> $4::text[]".to_string()
1✔
303
        } else {
304
            String::new()
2✔
305
        };
306

307
        let stmt = connection
3✔
308
            .prepare(&format!(
3✔
309
                "
3✔
310
            SELECT 
3✔
311
                display_name
3✔
312
            FROM 
3✔
313
                datasets
3✔
314
            WHERE
3✔
315
                display_name ILIKE $3 ESCAPE '\\'
3✔
316
                {tags_clause}
3✔
317
            ORDER BY display_name ASC
3✔
318
            LIMIT $1
3✔
319
            OFFSET $2;"
3✔
320
            ))
3✔
321
            .await?;
3✔
322

323
        let rows = connection.query(&stmt, &query_params).await?;
3✔
324

325
        Ok(rows.iter().map(|row| row.get(0)).collect())
4✔
326
    }
9✔
327
}
328

329
#[async_trait]
330
impl<Tls>
331
    MetaDataProvider<MockDatasetDataSourceLoadingInfo, VectorResultDescriptor, VectorQueryRectangle>
332
    for PostgresDb<Tls>
333
where
334
    Tls: MakeTlsConnect<Socket> + Clone + Send + Sync + 'static + std::fmt::Debug,
335
    <Tls as MakeTlsConnect<Socket>>::Stream: Send + Sync,
336
    <Tls as MakeTlsConnect<Socket>>::TlsConnect: Send,
337
    <<Tls as MakeTlsConnect<Socket>>::TlsConnect as TlsConnect<Socket>>::Future: Send,
338
{
339
    async fn meta_data(
×
340
        &self,
×
341
        _id: &DataId,
×
342
    ) -> geoengine_operators::util::Result<
×
343
        Box<
×
344
            dyn MetaData<
×
345
                MockDatasetDataSourceLoadingInfo,
×
346
                VectorResultDescriptor,
×
347
                VectorQueryRectangle,
×
348
            >,
×
349
        >,
×
350
    > {
×
351
        Err(geoengine_operators::error::Error::NotYetImplemented)
×
352
    }
×
353
}
354

355
#[async_trait]
356
impl<Tls> MetaDataProvider<OgrSourceDataset, VectorResultDescriptor, VectorQueryRectangle>
357
    for PostgresDb<Tls>
358
where
359
    Tls: MakeTlsConnect<Socket> + Clone + Send + Sync + 'static + std::fmt::Debug,
360
    <Tls as MakeTlsConnect<Socket>>::Stream: Send + Sync,
361
    <Tls as MakeTlsConnect<Socket>>::TlsConnect: Send,
362
    <<Tls as MakeTlsConnect<Socket>>::TlsConnect as TlsConnect<Socket>>::Future: Send,
363
{
364
    async fn meta_data(
4✔
365
        &self,
4✔
366
        id: &DataId,
4✔
367
    ) -> geoengine_operators::util::Result<
4✔
368
        Box<dyn MetaData<OgrSourceDataset, VectorResultDescriptor, VectorQueryRectangle>>,
4✔
369
    > {
4✔
370
        let id = id
4✔
371
            .internal()
4✔
372
            .ok_or(geoengine_operators::error::Error::DataIdTypeMissMatch)?;
4✔
373

374
        let conn = self.conn_pool.get().await.map_err(|e| {
4✔
375
            geoengine_operators::error::Error::MetaData {
×
376
                source: Box::new(e),
×
377
            }
×
378
        })?;
4✔
379
        let stmt = conn
4✔
380
            .prepare(
4✔
381
                "
4✔
382
        SELECT
4✔
383
            meta_data
4✔
384
        FROM
4✔
385
            datasets
4✔
386
        WHERE
4✔
387
            id = $1",
4✔
388
            )
4✔
389
            .await
4✔
390
            .map_err(|e| geoengine_operators::error::Error::MetaData {
4✔
391
                source: Box::new(e),
×
392
            })?;
4✔
393

394
        let row = conn.query_one(&stmt, &[&id]).await.map_err(|e| {
4✔
395
            geoengine_operators::error::Error::MetaData {
×
396
                source: Box::new(e),
×
397
            }
×
398
        })?;
4✔
399

400
        let meta_data: MetaDataDefinition = row.get("meta_data");
4✔
401

402
        let MetaDataDefinition::OgrMetaData(meta_data) = meta_data else {
4✔
403
            return Err(geoengine_operators::error::Error::MetaData {
×
404
                source: Box::new(geoengine_operators::error::Error::InvalidType {
×
405
                    expected: "OgrMetaData".to_string(),
×
406
                    found: meta_data.type_name().to_string(),
×
407
                }),
×
408
            });
×
409
        };
410

411
        Ok(Box::new(meta_data))
4✔
412
    }
12✔
413
}
414

415
#[async_trait]
416
impl<Tls> MetaDataProvider<GdalLoadingInfo, RasterResultDescriptor, RasterQueryRectangle>
417
    for PostgresDb<Tls>
418
where
419
    Tls: MakeTlsConnect<Socket> + Clone + Send + Sync + 'static + std::fmt::Debug,
420
    <Tls as MakeTlsConnect<Socket>>::Stream: Send + Sync,
421
    <Tls as MakeTlsConnect<Socket>>::TlsConnect: Send,
422
    <<Tls as MakeTlsConnect<Socket>>::TlsConnect as TlsConnect<Socket>>::Future: Send,
423
{
424
    async fn meta_data(
25✔
425
        &self,
25✔
426
        id: &DataId,
25✔
427
    ) -> geoengine_operators::util::Result<
25✔
428
        Box<dyn MetaData<GdalLoadingInfo, RasterResultDescriptor, RasterQueryRectangle>>,
25✔
429
    > {
25✔
430
        let id = id
25✔
431
            .internal()
25✔
432
            .ok_or(geoengine_operators::error::Error::DataIdTypeMissMatch)?;
25✔
433

434
        let conn = self.conn_pool.get().await.map_err(|e| {
25✔
435
            geoengine_operators::error::Error::MetaData {
×
436
                source: Box::new(e),
×
437
            }
×
438
        })?;
25✔
439
        let stmt = conn
25✔
440
            .prepare(
25✔
441
                "
25✔
442
            SELECT
25✔
443
                meta_data
25✔
444
            FROM
25✔
445
               datasets
25✔
446
            WHERE
25✔
447
                id = $1;",
25✔
448
            )
25✔
449
            .await
155✔
450
            .map_err(|e| geoengine_operators::error::Error::MetaData {
25✔
451
                source: Box::new(e),
×
452
            })?;
25✔
453

454
        let row = conn.query_one(&stmt, &[&id]).await.map_err(|e| {
25✔
455
            geoengine_operators::error::Error::MetaData {
×
456
                source: Box::new(e),
×
457
            }
×
458
        })?;
25✔
459

460
        let meta_data: MetaDataDefinition = row.get(0);
25✔
461

25✔
462
        Ok(match meta_data {
25✔
463
            MetaDataDefinition::GdalMetaDataRegular(m) => Box::new(m),
18✔
464
            MetaDataDefinition::GdalStatic(m) => Box::new(m),
3✔
465
            MetaDataDefinition::GdalMetaDataList(m) => Box::new(m),
3✔
466
            MetaDataDefinition::GdalMetadataNetCdfCf(m) => Box::new(m),
1✔
467
            _ => return Err(geoengine_operators::error::Error::DataIdTypeMissMatch),
×
468
        })
469
    }
75✔
470
}
471

472
#[async_trait]
473
pub trait PostgresStorable<Tls>: Send + Sync
474
where
475
    Tls: MakeTlsConnect<Socket> + Clone + Send + Sync + 'static,
476
    <Tls as MakeTlsConnect<Socket>>::Stream: Send + Sync,
477
    <Tls as MakeTlsConnect<Socket>>::TlsConnect: Send,
478
    <<Tls as MakeTlsConnect<Socket>>::TlsConnect as TlsConnect<Socket>>::Future: Send,
479
{
480
    fn to_typed_metadata(&self) -> Result<DatasetMetaData>;
481
}
482

483
pub struct DatasetMetaData<'m> {
484
    pub meta_data: &'m MetaDataDefinition,
485
    pub result_descriptor: TypedResultDescriptor,
486
}
487

488
#[async_trait]
489
impl<Tls> DatasetStore for PostgresDb<Tls>
490
where
491
    Tls: MakeTlsConnect<Socket> + Clone + Send + Sync + 'static + std::fmt::Debug,
492
    <Tls as MakeTlsConnect<Socket>>::Stream: Send + Sync,
493
    <Tls as MakeTlsConnect<Socket>>::TlsConnect: Send,
494
    <<Tls as MakeTlsConnect<Socket>>::TlsConnect as TlsConnect<Socket>>::Future: Send,
495
{
496
    async fn add_dataset(
76✔
497
        &self,
76✔
498
        dataset: AddDataset,
76✔
499
        meta_data: MetaDataDefinition,
76✔
500
    ) -> Result<DatasetIdAndName> {
76✔
501
        let id = DatasetId::new();
76✔
502
        let name = dataset.name.unwrap_or_else(|| DatasetName {
76✔
503
            namespace: None,
17✔
504
            name: id.to_string(),
17✔
505
        });
76✔
506

76✔
507
        Self::check_namespace(&name)?;
76✔
508

509
        let typed_meta_data = meta_data.to_typed_metadata();
76✔
510

511
        let mut conn = self.conn_pool.get().await?;
76✔
512

513
        let tx = conn.build_transaction().start().await?;
76✔
514

515
        tx.execute(
76✔
516
            "
76✔
517
                INSERT INTO datasets (
76✔
518
                    id,
76✔
519
                    name,
76✔
520
                    display_name,
76✔
521
                    description,
76✔
522
                    source_operator,
76✔
523
                    result_descriptor,
76✔
524
                    meta_data,
76✔
525
                    symbology,
76✔
526
                    provenance,
76✔
527
                    tags
76✔
528
                )
76✔
529
                VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10::text[])",
76✔
530
            &[
76✔
531
                &id,
76✔
532
                &name,
76✔
533
                &dataset.display_name,
76✔
534
                &dataset.description,
76✔
535
                &dataset.source_operator,
76✔
536
                &typed_meta_data.result_descriptor,
76✔
537
                typed_meta_data.meta_data,
76✔
538
                &dataset.symbology,
76✔
539
                &dataset.provenance,
76✔
540
                &dataset.tags,
76✔
541
            ],
76✔
542
        )
76✔
543
        .await
7,108✔
544
        .map_unique_violation("datasets", "name", || error::Error::InvalidDatasetName)?;
76✔
545

546
        tx.commit().await?;
76✔
547

548
        Ok(DatasetIdAndName { id, name })
76✔
549
    }
228✔
550

551
    /// Overwrites name, display name, description and tags of a dataset.
    ///
    /// The number of affected rows is not checked, so an unknown id is not
    /// reported as an error.
    ///
    /// # Errors
    ///
    /// Propagates connection pool and database errors.
    async fn update_dataset(&self, dataset: DatasetId, update: UpdateDataset) -> Result<()> {
        let connection = self.conn_pool.get().await?;

        let update_sql = "UPDATE datasets SET name = $2, display_name = $3, description = $4, tags = $5 WHERE id = $1;";

        connection
            .execute(
                update_sql,
                &[
                    &dataset,
                    &update.name,
                    &update.display_name,
                    &update.description,
                    &update.tags,
                ],
            )
            .await?;

        Ok(())
    }
3✔
568

569
    async fn update_dataset_symbology(
1✔
570
        &self,
1✔
571
        dataset: DatasetId,
1✔
572
        symbology: &Symbology,
1✔
573
    ) -> Result<()> {
1✔
574
        let conn = self.conn_pool.get().await?;
1✔
575

576
        conn.execute(
1✔
577
            "UPDATE datasets SET symbology = $2 WHERE id = $1;",
1✔
578
            &[&dataset, &symbology],
1✔
579
        )
1✔
580
        .await?;
2✔
581

582
        Ok(())
1✔
583
    }
3✔
584

585
    async fn update_dataset_provenance(
1✔
586
        &self,
1✔
587
        dataset: DatasetId,
1✔
588
        provenance: &[Provenance],
1✔
589
    ) -> Result<()> {
1✔
590
        let conn = self.conn_pool.get().await?;
1✔
591

592
        conn.execute(
1✔
593
            "UPDATE datasets SET provenance = $2 WHERE id = $1;",
1✔
594
            &[&dataset, &provenance],
1✔
595
        )
1✔
596
        .await?;
2✔
597

598
        Ok(())
1✔
599
    }
3✔
600

601
    /// Deletes the dataset row with the given id.
    ///
    /// The number of deleted rows is not checked, so deleting an unknown id
    /// still returns `Ok(())`.
    ///
    /// # Errors
    ///
    /// Propagates connection pool and database errors.
    async fn delete_dataset(&self, dataset_id: DatasetId) -> Result<()> {
        let connection = self.conn_pool.get().await?;

        let delete_statement = connection
            .prepare("DELETE FROM datasets WHERE id = $1;")
            .await?;

        connection
            .execute(&delete_statement, &[&dataset_id])
            .await?;

        Ok(())
    }
96✔
610
}
611

612
#[async_trait]
613
impl<Tls> UploadDb for PostgresDb<Tls>
614
where
615
    Tls: MakeTlsConnect<Socket> + Clone + Send + Sync + 'static + std::fmt::Debug,
616
    <Tls as MakeTlsConnect<Socket>>::Stream: Send + Sync,
617
    <Tls as MakeTlsConnect<Socket>>::TlsConnect: Send,
618
    <<Tls as MakeTlsConnect<Socket>>::TlsConnect as TlsConnect<Socket>>::Future: Send,
619
{
620
    async fn load_upload(&self, upload: UploadId) -> Result<Upload> {
4✔
621
        // TODO: check permissions
622

623
        let conn = self.conn_pool.get().await?;
4✔
624

625
        let stmt = conn
4✔
626
            .prepare("SELECT id, files FROM uploads WHERE id = $1;")
4✔
627
            .await?;
3✔
628

629
        let row = conn.query_one(&stmt, &[&upload]).await?;
4✔
630

631
        Ok(Upload {
4✔
632
            id: row.get(0),
4✔
633
            files: row
4✔
634
                .get::<_, Vec<FileUpload>>(1)
4✔
635
                .into_iter()
4✔
636
                .map(Into::into)
4✔
637
                .collect(),
4✔
638
        })
4✔
639
    }
12✔
640

641
    async fn create_upload(&self, upload: Upload) -> Result<()> {
6✔
642
        let conn = self.conn_pool.get().await?;
9✔
643

644
        let stmt = conn
6✔
645
            .prepare("INSERT INTO uploads (id, files) VALUES ($1, $2)")
6✔
646
            .await?;
19✔
647

648
        conn.execute(
6✔
649
            &stmt,
6✔
650
            &[
6✔
651
                &upload.id,
6✔
652
                &upload
6✔
653
                    .files
6✔
654
                    .iter()
6✔
655
                    .map(FileUpload::from)
6✔
656
                    .collect::<Vec<_>>(),
6✔
657
            ],
6✔
658
        )
6✔
659
        .await?;
6✔
660
        Ok(())
6✔
661
    }
18✔
662
}
663

664
#[derive(Debug, Clone, ToSql, FromSql)]
78✔
665
pub struct FileUpload {
666
    pub id: FileId,
667
    pub name: String,
668
    pub byte_size: i64,
669
}
670

671
/// Converts an owned upload-module file entry into its database representation.
impl From<crate::datasets::upload::FileUpload> for FileUpload {
    fn from(upload: crate::datasets::upload::FileUpload) -> Self {
        Self {
            id: upload.id,
            name: upload.name,
            // Saturate instead of wrapping: a plain `as i64` cast would turn
            // sizes above `i64::MAX` into negative numbers.
            byte_size: i64::try_from(upload.byte_size).unwrap_or(i64::MAX),
        }
    }
}
680

681
/// Converts a borrowed upload-module file entry into its database
/// representation, cloning the file name.
impl From<&crate::datasets::upload::FileUpload> for FileUpload {
    fn from(upload: &crate::datasets::upload::FileUpload) -> Self {
        Self {
            id: upload.id,
            name: upload.name.clone(),
            // Saturate instead of wrapping: a plain `as i64` cast would turn
            // sizes above `i64::MAX` into negative numbers.
            byte_size: i64::try_from(upload.byte_size).unwrap_or(i64::MAX),
        }
    }
}
690

691
/// Converts a database file entry back into the upload-module type.
impl From<FileUpload> for crate::datasets::upload::FileUpload {
    fn from(upload: FileUpload) -> Self {
        Self {
            id: upload.id,
            name: upload.name,
            // A negative stored size is invalid; clamp it to 0 instead of
            // letting an `as u64` cast wrap it into a huge value.
            byte_size: u64::try_from(upload.byte_size).unwrap_or(0),
        }
    }
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc