geo-engine / geoengine · build 12633507883
06 Jan 2025 01:36PM UTC · coverage: 90.602% (+0.04%) from 90.56%

Pull Request #1006: Migrate-pro-api (github · web-flow)
Merge a30730161 into 071ba4e63

537 of 577 new or added lines in 10 files covered (93.07%).
221 existing lines in 12 files now uncovered.
133569 of 147424 relevant lines covered (90.6%).
54601.07 hits per line.

Source File: /services/src/datasets/postgres.rs (77.73% covered)
use super::listing::{OrderBy, Provenance};
use super::{AddDataset, DatasetIdAndName, DatasetName};
use crate::api::model::services::UpdateDataset;
use crate::contexts::PostgresDb;
use crate::datasets::listing::ProvenanceOutput;
use crate::datasets::listing::{DatasetListOptions, DatasetListing, DatasetProvider};
use crate::datasets::storage::{Dataset, DatasetDb, DatasetStore, MetaDataDefinition};
use crate::datasets::upload::FileId;
use crate::datasets::upload::{Upload, UploadDb, UploadId};
use crate::error::{self, Result};
use crate::projects::Symbology;
use crate::util::postgres::PostgresErrorExt;
use async_trait::async_trait;
use bb8_postgres::bb8::PooledConnection;
use bb8_postgres::tokio_postgres::tls::{MakeTlsConnect, TlsConnect};
use bb8_postgres::tokio_postgres::Socket;
use bb8_postgres::PostgresConnectionManager;
use geoengine_datatypes::dataset::{DataId, DatasetId};
use geoengine_datatypes::primitives::RasterQueryRectangle;
use geoengine_datatypes::primitives::VectorQueryRectangle;
use geoengine_datatypes::util::Identifier;
use geoengine_operators::engine::{
    MetaData, MetaDataProvider, RasterResultDescriptor, TypedResultDescriptor,
    VectorResultDescriptor,
};
use geoengine_operators::mock::MockDatasetDataSourceLoadingInfo;
use geoengine_operators::source::{GdalLoadingInfo, OgrSourceDataset};
use postgres_types::{FromSql, ToSql};

impl<Tls> DatasetDb for PostgresDb<Tls>
where
    Tls: MakeTlsConnect<Socket> + Clone + Send + Sync + 'static + std::fmt::Debug,
    <Tls as MakeTlsConnect<Socket>>::Stream: Send + Sync,
    <Tls as MakeTlsConnect<Socket>>::TlsConnect: Send,
    <<Tls as MakeTlsConnect<Socket>>::TlsConnect as TlsConnect<Socket>>::Future: Send,
{
}

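/// Looks up the `DatasetId` for a `DatasetName` in the `datasets` table.
/// Returns `Ok(None)` if no dataset with that name exists.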
pub async fn resolve_dataset_name_to_id<Tls>(
    conn: &PooledConnection<'_, PostgresConnectionManager<Tls>>,
    dataset_name: &DatasetName,
) -> Result<Option<DatasetId>>
where
    Tls: MakeTlsConnect<Socket> + Clone + Send + Sync + 'static,
    <Tls as MakeTlsConnect<Socket>>::Stream: Send + Sync,
    <Tls as MakeTlsConnect<Socket>>::TlsConnect: Send,
    <<Tls as MakeTlsConnect<Socket>>::TlsConnect as TlsConnect<Socket>>::Future: Send,
{
    let stmt = conn
        .prepare(
            "SELECT id
        FROM datasets
        WHERE name = $1::\"DatasetName\"",
        )
        .await?;

    let row_option = conn.query_opt(&stmt, &[&dataset_name]).await?;

    Ok(row_option.map(|row| row.get(0)))
}

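/// Read access to stored datasets: listing, loading, provenance, name resolution and autocomplete search.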
#[allow(clippy::too_many_lines)]
#[async_trait]
impl<Tls> DatasetProvider for PostgresDb<Tls>
where
    Tls: MakeTlsConnect<Socket> + Clone + Send + Sync + 'static + std::fmt::Debug,
    <Tls as MakeTlsConnect<Socket>>::Stream: Send + Sync,
    <Tls as MakeTlsConnect<Socket>>::TlsConnect: Send,
    <<Tls as MakeTlsConnect<Socket>>::TlsConnect as TlsConnect<Socket>>::Future: Send,
{
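    /// Lists datasets ordered by display name. The WHERE clause and the
    /// parameter positions are assembled dynamically, depending on whether a
    /// display-name filter and/or a tag filter is present in `options`.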
    async fn list_datasets(&self, options: DatasetListOptions) -> Result<Vec<DatasetListing>> {
        let conn = self.conn_pool.get().await?;

        let order_sql = if options.order == OrderBy::NameAsc {
            "display_name ASC"
        } else {
            "display_name DESC"
        };

        let mut pos = 2;

        let filter_sql = if options.filter.is_some() {
            pos += 1;
            Some(format!("display_name ILIKE ${pos} ESCAPE '\\'"))
        } else {
            None
        };

        let (filter_tags_sql, filter_tags_list) = if let Some(filter_tags) = &options.tags {
            pos += 1;
            (Some(format!("tags @> ${pos}::text[]")), filter_tags.clone())
        } else {
            (None, vec![])
        };

        let where_clause_sql = match (filter_sql, filter_tags_sql) {
            (Some(filter_sql), Some(filter_tags_sql)) => {
                format!("WHERE {filter_sql} AND {filter_tags_sql}")
            }
            (Some(filter_sql), None) => format!("WHERE {filter_sql}"),
            (None, Some(filter_tags_sql)) => format!("WHERE {filter_tags_sql}"),
            (None, None) => String::new(),
        };

        let stmt = conn
            .prepare(&format!(
                "
            SELECT
                id,
                name,
                display_name,
                description,
                tags,
                source_operator,
                result_descriptor,
                symbology
            FROM
                datasets
            {where_clause_sql}
            ORDER BY {order_sql}
            LIMIT $1
            OFFSET $2;"
            ))
            .await?;

        let rows = match (options.filter, options.tags) {
            (Some(filter), Some(_)) => {
                conn.query(
                    &stmt,
                    &[
                        &i64::from(options.limit),
                        &i64::from(options.offset),
                        &format!("%{}%", filter.replace('%', "\\%").replace('_', "\\_")),
                        &filter_tags_list,
                    ],
                )
                .await?
            }
            (Some(filter), None) => {
                conn.query(
                    &stmt,
                    &[
                        &i64::from(options.limit),
                        &i64::from(options.offset),
                        &format!("%{}%", filter.replace('%', "\\%").replace('_', "\\_")),
                    ],
                )
                .await?
            }
            (None, Some(_)) => {
                conn.query(
                    &stmt,
                    &[
                        &i64::from(options.limit),
                        &i64::from(options.offset),
                        &filter_tags_list,
                    ],
                )
                .await?
            }
            (None, None) => {
                conn.query(
                    &stmt,
                    &[&i64::from(options.limit), &i64::from(options.offset)],
                )
                .await?
            }
        };

        Ok(rows
            .iter()
            .map(|row| {
                Result::<DatasetListing>::Ok(DatasetListing {
                    id: row.get(0),
                    name: row.get(1),
                    display_name: row.get(2),
                    description: row.get(3),
                    tags: row.get::<_, Option<_>>(4).unwrap_or_default(),
                    source_operator: row.get(5),
                    result_descriptor: row.get(6),
                    symbology: row.get(7),
                })
            })
            .filter_map(Result::ok)
            .collect())
    }

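    /// Loads a single dataset by id, failing with `UnknownDatasetId` if it does not exist.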
    async fn load_dataset(&self, dataset: &DatasetId) -> Result<Dataset> {
        let conn = self.conn_pool.get().await?;
        let stmt = conn
            .prepare(
                "
            SELECT
                id,
                name,
                display_name,
                description,
                result_descriptor,
                source_operator,
                symbology,
                provenance,
                tags
            FROM
                datasets
            WHERE
                id = $1
            LIMIT
                1",
            )
            .await?;

        let row = conn.query_opt(&stmt, &[dataset]).await?;

        let row = row.ok_or(error::Error::UnknownDatasetId)?;

        Ok(Dataset {
            id: row.get(0),
            name: row.get(1),
            display_name: row.get(2),
            description: row.get(3),
            result_descriptor: row.get(4),
            source_operator: row.get(5),
            symbology: row.get(6),
            provenance: row.get(7),
            tags: row.get(8),
        })
    }

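    /// Loads the provenance entries stored for the given dataset.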
    async fn load_provenance(&self, dataset: &DatasetId) -> Result<ProvenanceOutput> {
        let conn = self.conn_pool.get().await?;

        let stmt = conn
            .prepare(
                "
            SELECT
                provenance
            FROM
                datasets
            WHERE
                id = $1;",
            )
            .await?;

        let row = conn.query_one(&stmt, &[dataset]).await?;

        let provenances: Vec<Provenance> = row.get(0);

        Ok(ProvenanceOutput {
            data: (*dataset).into(),
            provenance: Some(provenances),
        })
    }

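    /// Loads the stored `meta_data` (loading-info) definition for the given dataset.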
    async fn load_loading_info(&self, dataset: &DatasetId) -> Result<MetaDataDefinition> {
        let conn = self.conn_pool.get().await?;

        let stmt = conn
            .prepare(
                "
            SELECT
                meta_data
            FROM
                datasets
            WHERE
                id = $1;",
            )
            .await?;

        let row = conn.query_one(&stmt, &[dataset]).await?;

        Ok(row.get(0))
    }

    async fn resolve_dataset_name_to_id(
        &self,
        dataset_name: &DatasetName,
    ) -> Result<Option<DatasetId>> {
        let conn = self.conn_pool.get().await?;
        resolve_dataset_name_to_id(&conn, dataset_name).await
    }

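    /// Returns up to `limit` display names matching `search_string` (case-insensitive,
    /// with `%` and `_` escaped), optionally restricted to datasets carrying all given tags.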
    async fn dataset_autocomplete_search(
        &self,
        tags: Option<Vec<String>>,
        search_string: String,
        limit: u32,
        offset: u32,
    ) -> Result<Vec<String>> {
        let connection = self.conn_pool.get().await?;

        let limit = i64::from(limit);
        let offset = i64::from(offset);
        let search_string = format!(
            "%{}%",
            search_string.replace('%', "\\%").replace('_', "\\_")
        );

        let mut query_params: Vec<&(dyn ToSql + Sync)> = vec![&limit, &offset, &search_string];

        let tags_clause = if let Some(tags) = &tags {
            query_params.push(tags);
            " AND tags @> $4::text[]".to_string()
        } else {
            String::new()
        };

        let stmt = connection
            .prepare(&format!(
                "
            SELECT
                display_name
            FROM
                datasets
            WHERE
                display_name ILIKE $3 ESCAPE '\\'
                {tags_clause}
            ORDER BY display_name ASC
            LIMIT $1
            OFFSET $2;"
            ))
            .await?;

        let rows = connection.query(&stmt, &query_params).await?;

        Ok(rows.iter().map(|row| row.get(0)).collect())
    }
}

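// Meta data for the mock data source is not served from the database; this provider always
// returns `NotYetImplemented`.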
#[async_trait]
impl<Tls>
    MetaDataProvider<MockDatasetDataSourceLoadingInfo, VectorResultDescriptor, VectorQueryRectangle>
    for PostgresDb<Tls>
where
    Tls: MakeTlsConnect<Socket> + Clone + Send + Sync + 'static + std::fmt::Debug,
    <Tls as MakeTlsConnect<Socket>>::Stream: Send + Sync,
    <Tls as MakeTlsConnect<Socket>>::TlsConnect: Send,
    <<Tls as MakeTlsConnect<Socket>>::TlsConnect as TlsConnect<Socket>>::Future: Send,
{
    async fn meta_data(
        &self,
        _id: &DataId,
    ) -> geoengine_operators::util::Result<
        Box<
            dyn MetaData<
                MockDatasetDataSourceLoadingInfo,
                VectorResultDescriptor,
                VectorQueryRectangle,
            >,
        >,
    > {
        Err(geoengine_operators::error::Error::NotYetImplemented)
    }
}

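/// Vector (OGR) meta data: reads the `meta_data` column and rejects definitions that are not `OgrMetaData`.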
#[async_trait]
impl<Tls> MetaDataProvider<OgrSourceDataset, VectorResultDescriptor, VectorQueryRectangle>
    for PostgresDb<Tls>
where
    Tls: MakeTlsConnect<Socket> + Clone + Send + Sync + 'static + std::fmt::Debug,
    <Tls as MakeTlsConnect<Socket>>::Stream: Send + Sync,
    <Tls as MakeTlsConnect<Socket>>::TlsConnect: Send,
    <<Tls as MakeTlsConnect<Socket>>::TlsConnect as TlsConnect<Socket>>::Future: Send,
{
    async fn meta_data(
        &self,
        id: &DataId,
    ) -> geoengine_operators::util::Result<
        Box<dyn MetaData<OgrSourceDataset, VectorResultDescriptor, VectorQueryRectangle>>,
    > {
        let id = id
            .internal()
            .ok_or(geoengine_operators::error::Error::DataIdTypeMissMatch)?;

        let conn = self.conn_pool.get().await.map_err(|e| {
            geoengine_operators::error::Error::MetaData {
                source: Box::new(e),
            }
        })?;
        let stmt = conn
            .prepare(
                "
        SELECT
            meta_data
        FROM
            datasets
        WHERE
            id = $1",
            )
            .await
            .map_err(|e| geoengine_operators::error::Error::MetaData {
                source: Box::new(e),
            })?;

        let row = conn.query_one(&stmt, &[&id]).await.map_err(|e| {
            geoengine_operators::error::Error::MetaData {
                source: Box::new(e),
            }
        })?;

        let meta_data: MetaDataDefinition = row.get("meta_data");

        let MetaDataDefinition::OgrMetaData(meta_data) = meta_data else {
            return Err(geoengine_operators::error::Error::MetaData {
                source: Box::new(geoengine_operators::error::Error::InvalidType {
                    expected: "OgrMetaData".to_string(),
                    found: meta_data.type_name().to_string(),
                }),
            });
        };

        Ok(Box::new(meta_data))
    }
}

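/// Raster (GDAL) meta data: reads the `meta_data` column and accepts all GDAL meta data variants.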
#[async_trait]
impl<Tls> MetaDataProvider<GdalLoadingInfo, RasterResultDescriptor, RasterQueryRectangle>
    for PostgresDb<Tls>
where
    Tls: MakeTlsConnect<Socket> + Clone + Send + Sync + 'static + std::fmt::Debug,
    <Tls as MakeTlsConnect<Socket>>::Stream: Send + Sync,
    <Tls as MakeTlsConnect<Socket>>::TlsConnect: Send,
    <<Tls as MakeTlsConnect<Socket>>::TlsConnect as TlsConnect<Socket>>::Future: Send,
{
    async fn meta_data(
        &self,
        id: &DataId,
    ) -> geoengine_operators::util::Result<
        Box<dyn MetaData<GdalLoadingInfo, RasterResultDescriptor, RasterQueryRectangle>>,
    > {
        let id = id
            .internal()
            .ok_or(geoengine_operators::error::Error::DataIdTypeMissMatch)?;

        let conn = self.conn_pool.get().await.map_err(|e| {
            geoengine_operators::error::Error::MetaData {
                source: Box::new(e),
            }
        })?;
        let stmt = conn
            .prepare(
                "
            SELECT
                meta_data
            FROM
                datasets
            WHERE
                id = $1;",
            )
            .await
            .map_err(|e| geoengine_operators::error::Error::MetaData {
                source: Box::new(e),
            })?;

        let row = conn.query_one(&stmt, &[&id]).await.map_err(|e| {
            geoengine_operators::error::Error::MetaData {
                source: Box::new(e),
            }
        })?;

        let meta_data: MetaDataDefinition = row.get(0);

        Ok(match meta_data {
            MetaDataDefinition::GdalMetaDataRegular(m) => Box::new(m),
            MetaDataDefinition::GdalStatic(m) => Box::new(m),
            MetaDataDefinition::GdalMetaDataList(m) => Box::new(m),
            MetaDataDefinition::GdalMetadataNetCdfCf(m) => Box::new(m),
            _ => return Err(geoengine_operators::error::Error::DataIdTypeMissMatch),
        })
    }
}

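/// Conversion into the pieces written to the `datasets` table: the meta data definition and its typed result descriptor.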
#[async_trait]
pub trait PostgresStorable<Tls>: Send + Sync
where
    Tls: MakeTlsConnect<Socket> + Clone + Send + Sync + 'static,
    <Tls as MakeTlsConnect<Socket>>::Stream: Send + Sync,
    <Tls as MakeTlsConnect<Socket>>::TlsConnect: Send,
    <<Tls as MakeTlsConnect<Socket>>::TlsConnect as TlsConnect<Socket>>::Future: Send,
{
    fn to_typed_metadata(&self) -> Result<DatasetMetaData>;
}

pub struct DatasetMetaData<'m> {
    pub meta_data: &'m MetaDataDefinition,
    pub result_descriptor: TypedResultDescriptor,
}

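/// Write access to stored datasets: insert, update and delete rows of the `datasets` table.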
#[async_trait]
impl<Tls> DatasetStore for PostgresDb<Tls>
where
    Tls: MakeTlsConnect<Socket> + Clone + Send + Sync + 'static + std::fmt::Debug,
    <Tls as MakeTlsConnect<Socket>>::Stream: Send + Sync,
    <Tls as MakeTlsConnect<Socket>>::TlsConnect: Send,
    <<Tls as MakeTlsConnect<Socket>>::TlsConnect as TlsConnect<Socket>>::Future: Send,
{
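    /// Inserts a new dataset within a transaction. If no name is given, the generated id
    /// is used as the name; a unique violation on `name` maps to `InvalidDatasetName`.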
    async fn add_dataset(
        &self,
        dataset: AddDataset,
        meta_data: MetaDataDefinition,
    ) -> Result<DatasetIdAndName> {
        let id = DatasetId::new();
        let name = dataset.name.unwrap_or_else(|| DatasetName {
            namespace: None,
            name: id.to_string(),
        });

        Self::check_namespace(&name)?;

        let typed_meta_data = meta_data.to_typed_metadata();

        let mut conn = self.conn_pool.get().await?;

        let tx = conn.build_transaction().start().await?;

        tx.execute(
            "
                INSERT INTO datasets (
                    id,
                    name,
                    display_name,
                    description,
                    source_operator,
                    result_descriptor,
                    meta_data,
                    symbology,
                    provenance,
                    tags
                )
                VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10::text[])",
            &[
                &id,
                &name,
                &dataset.display_name,
                &dataset.description,
                &dataset.source_operator,
                &typed_meta_data.result_descriptor,
                typed_meta_data.meta_data,
                &dataset.symbology,
                &dataset.provenance,
                &dataset.tags,
            ],
        )
        .await
        .map_unique_violation("datasets", "name", || error::Error::InvalidDatasetName)?;

        tx.commit().await?;

        Ok(DatasetIdAndName { id, name })
    }

    async fn update_dataset(&self, dataset: DatasetId, update: UpdateDataset) -> Result<()> {
        let conn = self.conn_pool.get().await?;

        conn.execute(
            "UPDATE datasets SET name = $2, display_name = $3, description = $4, tags = $5 WHERE id = $1;",
            &[
                &dataset,
                &update.name,
                &update.display_name,
                &update.description,
                &update.tags,
            ],
        )
        .await?;

        Ok(())
    }

    async fn update_dataset_loading_info(
        &self,
        dataset: DatasetId,
        meta_data: &MetaDataDefinition,
    ) -> Result<()> {
        let conn = self.conn_pool.get().await?;

        conn.execute(
            "UPDATE datasets SET meta_data = $2 WHERE id = $1;",
            &[&dataset, &meta_data],
        )
        .await?;

        Ok(())
    }

    async fn update_dataset_symbology(
        &self,
        dataset: DatasetId,
        symbology: &Symbology,
    ) -> Result<()> {
        let conn = self.conn_pool.get().await?;

        conn.execute(
            "UPDATE datasets SET symbology = $2 WHERE id = $1;",
            &[&dataset, &symbology],
        )
        .await?;

        Ok(())
    }

    async fn update_dataset_provenance(
        &self,
        dataset: DatasetId,
        provenance: &[Provenance],
    ) -> Result<()> {
        let conn = self.conn_pool.get().await?;

        conn.execute(
            "UPDATE datasets SET provenance = $2 WHERE id = $1;",
            &[&dataset, &provenance],
        )
        .await?;

        Ok(())
    }

    async fn delete_dataset(&self, dataset_id: DatasetId) -> Result<()> {
        let conn = self.conn_pool.get().await?;

        let stmt = conn.prepare("DELETE FROM datasets WHERE id = $1;").await?;

        conn.execute(&stmt, &[&dataset_id]).await?;

        Ok(())
    }
}

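/// Stores uploads and their per-file metadata in the `uploads` table.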
#[async_trait]
impl<Tls> UploadDb for PostgresDb<Tls>
where
    Tls: MakeTlsConnect<Socket> + Clone + Send + Sync + 'static + std::fmt::Debug,
    <Tls as MakeTlsConnect<Socket>>::Stream: Send + Sync,
    <Tls as MakeTlsConnect<Socket>>::TlsConnect: Send,
    <<Tls as MakeTlsConnect<Socket>>::TlsConnect as TlsConnect<Socket>>::Future: Send,
{
    async fn load_upload(&self, upload: UploadId) -> Result<Upload> {
        // TODO: check permissions

        let conn = self.conn_pool.get().await?;

        let stmt = conn
            .prepare("SELECT id, files FROM uploads WHERE id = $1;")
            .await?;

        let row = conn.query_one(&stmt, &[&upload]).await?;

        Ok(Upload {
            id: row.get(0),
            files: row
                .get::<_, Vec<FileUpload>>(1)
                .into_iter()
                .map(Into::into)
                .collect(),
        })
    }

    async fn create_upload(&self, upload: Upload) -> Result<()> {
        let conn = self.conn_pool.get().await?;

        let stmt = conn
            .prepare("INSERT INTO uploads (id, files) VALUES ($1, $2)")
            .await?;

        conn.execute(
            &stmt,
            &[
                &upload.id,
                &upload
                    .files
                    .iter()
                    .map(FileUpload::from)
                    .collect::<Vec<_>>(),
            ],
        )
        .await?;
        Ok(())
    }
}

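/// Postgres row representation of an uploaded file; mirrors `crate::datasets::upload::FileUpload`
/// with the byte size stored as `i64`.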
#[derive(Debug, Clone, ToSql, FromSql)]
pub struct FileUpload {
    pub id: FileId,
    pub name: String,
    pub byte_size: i64,
}

impl From<crate::datasets::upload::FileUpload> for FileUpload {
    fn from(upload: crate::datasets::upload::FileUpload) -> Self {
        Self {
            id: upload.id,
            name: upload.name,
            byte_size: upload.byte_size as i64,
        }
    }
}

impl From<&crate::datasets::upload::FileUpload> for FileUpload {
    fn from(upload: &crate::datasets::upload::FileUpload) -> Self {
        Self {
            id: upload.id,
            name: upload.name.clone(),
            byte_size: upload.byte_size as i64,
        }
    }
}

impl From<FileUpload> for crate::datasets::upload::FileUpload {
    fn from(upload: FileUpload) -> Self {
        Self {
            id: upload.id,
            name: upload.name,
            byte_size: upload.byte_size as u64,
        }
    }
}