• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

geo-engine / geoengine / 12767614094

14 Jan 2025 12:26PM UTC coverage: 90.64% (+0.06%) from 90.576%
12767614094

push

github

web-flow
Merge pull request #1006 from geo-engine/migrate-pro-api

Migrate-pro-api

1106 of 1152 new or added lines in 24 files covered. (96.01%)

248 existing lines in 13 files now uncovered.

133501 of 147287 relevant lines covered (90.64%)

54652.85 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

77.73
/services/src/datasets/postgres.rs
1
use super::listing::{OrderBy, Provenance};
2
use super::{AddDataset, DatasetIdAndName, DatasetName};
3
use crate::api::model::services::UpdateDataset;
4
use crate::contexts::PostgresDb;
5
use crate::datasets::listing::ProvenanceOutput;
6
use crate::datasets::listing::{DatasetListOptions, DatasetListing, DatasetProvider};
7
use crate::datasets::storage::{Dataset, DatasetDb, DatasetStore, MetaDataDefinition};
8
use crate::datasets::upload::FileId;
9
use crate::datasets::upload::{Upload, UploadDb, UploadId};
10
use crate::error::{self, Result};
11
use crate::projects::Symbology;
12
use crate::util::postgres::PostgresErrorExt;
13
use async_trait::async_trait;
14
use bb8_postgres::bb8::PooledConnection;
15
use bb8_postgres::tokio_postgres::tls::{MakeTlsConnect, TlsConnect};
16
use bb8_postgres::tokio_postgres::Socket;
17
use bb8_postgres::PostgresConnectionManager;
18
use geoengine_datatypes::dataset::{DataId, DatasetId};
19
use geoengine_datatypes::primitives::RasterQueryRectangle;
20
use geoengine_datatypes::primitives::VectorQueryRectangle;
21
use geoengine_datatypes::util::Identifier;
22
use geoengine_operators::engine::{
23
    MetaData, MetaDataProvider, RasterResultDescriptor, TypedResultDescriptor,
24
    VectorResultDescriptor,
25
};
26
use geoengine_operators::mock::MockDatasetDataSourceLoadingInfo;
27
use geoengine_operators::source::{GdalLoadingInfo, OgrSourceDataset};
28
use postgres_types::{FromSql, ToSql};
29

30
impl<Tls> DatasetDb for PostgresDb<Tls>
31
where
32
    Tls: MakeTlsConnect<Socket> + Clone + Send + Sync + 'static + std::fmt::Debug,
33
    <Tls as MakeTlsConnect<Socket>>::Stream: Send + Sync,
34
    <Tls as MakeTlsConnect<Socket>>::TlsConnect: Send,
35
    <<Tls as MakeTlsConnect<Socket>>::TlsConnect as TlsConnect<Socket>>::Future: Send,
36
{
37
}
38

39
pub async fn resolve_dataset_name_to_id<Tls>(
87✔
40
    conn: &PooledConnection<'_, PostgresConnectionManager<Tls>>,
87✔
41
    dataset_name: &DatasetName,
87✔
42
) -> Result<Option<DatasetId>>
87✔
43
where
87✔
44
    Tls: MakeTlsConnect<Socket> + Clone + Send + Sync + 'static,
87✔
45
    <Tls as MakeTlsConnect<Socket>>::Stream: Send + Sync,
87✔
46
    <Tls as MakeTlsConnect<Socket>>::TlsConnect: Send,
87✔
47
    <<Tls as MakeTlsConnect<Socket>>::TlsConnect as TlsConnect<Socket>>::Future: Send,
87✔
48
{
87✔
49
    let stmt = conn
87✔
50
        .prepare(
87✔
51
            "SELECT id
87✔
52
        FROM datasets
87✔
53
        WHERE name = $1::\"DatasetName\"",
87✔
54
        )
87✔
55
        .await?;
87✔
56

57
    let row_option = conn.query_opt(&stmt, &[&dataset_name]).await?;
87✔
58

59
    Ok(row_option.map(|row| row.get(0)))
87✔
60
}
87✔
61

62
#[allow(clippy::too_many_lines)]
63
#[async_trait]
64
impl<Tls> DatasetProvider for PostgresDb<Tls>
65
where
66
    Tls: MakeTlsConnect<Socket> + Clone + Send + Sync + 'static + std::fmt::Debug,
67
    <Tls as MakeTlsConnect<Socket>>::Stream: Send + Sync,
68
    <Tls as MakeTlsConnect<Socket>>::TlsConnect: Send,
69
    <<Tls as MakeTlsConnect<Socket>>::TlsConnect as TlsConnect<Socket>>::Future: Send,
70
{
71
    async fn list_datasets(&self, options: DatasetListOptions) -> Result<Vec<DatasetListing>> {
4✔
72
        let conn = self.conn_pool.get().await?;
4✔
73

74
        let order_sql = if options.order == OrderBy::NameAsc {
4✔
75
            "display_name ASC"
4✔
76
        } else {
77
            "display_name DESC"
×
78
        };
79

80
        let mut pos = 2;
4✔
81

82
        let filter_sql = if options.filter.is_some() {
4✔
83
            pos += 1;
3✔
84
            Some(format!("display_name ILIKE ${pos} ESCAPE '\\'"))
3✔
85
        } else {
86
            None
1✔
87
        };
88

89
        let (filter_tags_sql, filter_tags_list) = if let Some(filter_tags) = &options.tags {
4✔
90
            pos += 1;
1✔
91
            (Some(format!("tags @> ${pos}::text[]")), filter_tags.clone())
1✔
92
        } else {
93
            (None, vec![])
3✔
94
        };
95

96
        let where_clause_sql = match (filter_sql, filter_tags_sql) {
4✔
97
            (Some(filter_sql), Some(filter_tags_sql)) => {
1✔
98
                format!("WHERE {filter_sql} AND {filter_tags_sql}")
1✔
99
            }
100
            (Some(filter_sql), None) => format!("WHERE {filter_sql}"),
2✔
101
            (None, Some(filter_tags_sql)) => format!("WHERE {filter_tags_sql}"),
×
102
            (None, None) => String::new(),
1✔
103
        };
104

105
        let stmt = conn
4✔
106
            .prepare(&format!(
4✔
107
                "
4✔
108
            SELECT 
4✔
109
                id,
4✔
110
                name,
4✔
111
                display_name,
4✔
112
                description,
4✔
113
                tags,
4✔
114
                source_operator,
4✔
115
                result_descriptor,
4✔
116
                symbology
4✔
117
            FROM 
4✔
118
                datasets
4✔
119
            {where_clause_sql}
4✔
120
            ORDER BY {order_sql}
4✔
121
            LIMIT $1
4✔
122
            OFFSET $2;"
4✔
123
            ))
4✔
124
            .await?;
4✔
125

126
        let rows = match (options.filter, options.tags) {
4✔
127
            (Some(filter), Some(_)) => {
1✔
128
                conn.query(
1✔
129
                    &stmt,
1✔
130
                    &[
1✔
131
                        &i64::from(options.limit),
1✔
132
                        &i64::from(options.offset),
1✔
133
                        &format!("%{}%", filter.replace('%', "\\%").replace('_', "\\_")),
1✔
134
                        &filter_tags_list,
1✔
135
                    ],
1✔
136
                )
1✔
137
                .await?
1✔
138
            }
139
            (Some(filter), None) => {
2✔
140
                conn.query(
2✔
141
                    &stmt,
2✔
142
                    &[
2✔
143
                        &i64::from(options.limit),
2✔
144
                        &i64::from(options.offset),
2✔
145
                        &format!("%{}%", filter.replace('%', "\\%").replace('_', "\\_")),
2✔
146
                    ],
2✔
147
                )
2✔
148
                .await?
2✔
149
            }
150
            (None, Some(_)) => {
151
                conn.query(
×
152
                    &stmt,
×
153
                    &[
×
154
                        &i64::from(options.limit),
×
155
                        &i64::from(options.offset),
×
156
                        &filter_tags_list,
×
157
                    ],
×
158
                )
×
159
                .await?
×
160
            }
161
            (None, None) => {
162
                conn.query(
1✔
163
                    &stmt,
1✔
164
                    &[&i64::from(options.limit), &i64::from(options.offset)],
1✔
165
                )
1✔
166
                .await?
1✔
167
            }
168
        };
169

170
        Ok(rows
4✔
171
            .iter()
4✔
172
            .map(|row| {
5✔
173
                Result::<DatasetListing>::Ok(DatasetListing {
5✔
174
                    id: row.get(0),
5✔
175
                    name: row.get(1),
5✔
176
                    display_name: row.get(2),
5✔
177
                    description: row.get(3),
5✔
178
                    tags: row.get::<_, Option<_>>(4).unwrap_or_default(),
5✔
179
                    source_operator: row.get(5),
5✔
180
                    result_descriptor: row.get(6),
5✔
181
                    symbology: row.get(7),
5✔
182
                })
5✔
183
            })
5✔
184
            .filter_map(Result::ok)
4✔
185
            .collect())
4✔
186
    }
8✔
187

188
    async fn load_dataset(&self, dataset: &DatasetId) -> Result<Dataset> {
4✔
189
        let conn = self.conn_pool.get().await?;
4✔
190
        let stmt = conn
4✔
191
            .prepare(
4✔
192
                "
4✔
193
            SELECT
4✔
194
                id,
4✔
195
                name,
4✔
196
                display_name,
4✔
197
                description,
4✔
198
                result_descriptor,
4✔
199
                source_operator,
4✔
200
                symbology,
4✔
201
                provenance,
4✔
202
                tags
4✔
203
            FROM 
4✔
204
                datasets
4✔
205
            WHERE 
4✔
206
                id = $1
4✔
207
            LIMIT 
4✔
208
                1",
4✔
209
            )
4✔
210
            .await?;
4✔
211

212
        let row = conn.query_opt(&stmt, &[dataset]).await?;
4✔
213

214
        let row = row.ok_or(error::Error::UnknownDatasetId)?;
4✔
215

216
        Ok(Dataset {
2✔
217
            id: row.get(0),
2✔
218
            name: row.get(1),
2✔
219
            display_name: row.get(2),
2✔
220
            description: row.get(3),
2✔
221
            result_descriptor: row.get(4),
2✔
222
            source_operator: row.get(5),
2✔
223
            symbology: row.get(6),
2✔
224
            provenance: row.get(7),
2✔
225
            tags: row.get(8),
2✔
226
        })
2✔
227
    }
8✔
228

229
    async fn load_provenance(&self, dataset: &DatasetId) -> Result<ProvenanceOutput> {
1✔
230
        let conn = self.conn_pool.get().await?;
1✔
231

232
        let stmt = conn
1✔
233
            .prepare(
1✔
234
                "
1✔
235
            SELECT 
1✔
236
                provenance 
1✔
237
            FROM 
1✔
238
                datasets
1✔
239
            WHERE
1✔
240
                id = $1;",
1✔
241
            )
1✔
242
            .await?;
1✔
243

244
        let row = conn.query_one(&stmt, &[dataset]).await?;
1✔
245

246
        let provenances: Vec<Provenance> = row.get(0);
1✔
247

1✔
248
        Ok(ProvenanceOutput {
1✔
249
            data: (*dataset).into(),
1✔
250
            provenance: Some(provenances),
1✔
251
        })
1✔
252
    }
2✔
253

UNCOV
254
    /// Reads the `meta_data` column of the dataset with the given id.
    ///
    /// # Errors
    ///
    /// Propagates connection and query errors; `query_one` fails when the id
    /// does not match exactly one row.
    async fn load_loading_info(&self, dataset: &DatasetId) -> Result<MetaDataDefinition> {
        let conn = self.conn_pool.get().await?;

        let select = conn
            .prepare(
                "
            SELECT 
                meta_data 
            FROM 
                datasets
            WHERE
                id = $1;",
            )
            .await?;

        let record = conn.query_one(&select, &[dataset]).await?;
        let loading_info: MetaDataDefinition = record.get(0);

        Ok(loading_info)
    }
×
273

274
    async fn resolve_dataset_name_to_id(
275
        &self,
276
        dataset_name: &DatasetName,
277
    ) -> Result<Option<DatasetId>> {
2✔
278
        let conn = self.conn_pool.get().await?;
2✔
279
        resolve_dataset_name_to_id(&conn, dataset_name).await
2✔
280
    }
4✔
281

282
    async fn dataset_autocomplete_search(
283
        &self,
284
        tags: Option<Vec<String>>,
285
        search_string: String,
286
        limit: u32,
287
        offset: u32,
288
    ) -> Result<Vec<String>> {
3✔
289
        let connection = self.conn_pool.get().await?;
3✔
290

291
        let limit = i64::from(limit);
3✔
292
        let offset = i64::from(offset);
3✔
293
        let search_string = format!(
3✔
294
            "%{}%",
3✔
295
            search_string.replace('%', "\\%").replace('_', "\\_")
3✔
296
        );
3✔
297

3✔
298
        let mut query_params: Vec<&(dyn ToSql + Sync)> = vec![&limit, &offset, &search_string];
3✔
299

300
        let tags_clause = if let Some(tags) = &tags {
3✔
301
            query_params.push(tags);
1✔
302
            " AND tags @> $4::text[]".to_string()
1✔
303
        } else {
304
            String::new()
2✔
305
        };
306

307
        let stmt = connection
3✔
308
            .prepare(&format!(
3✔
309
                "
3✔
310
            SELECT 
3✔
311
                display_name
3✔
312
            FROM 
3✔
313
                datasets
3✔
314
            WHERE
3✔
315
                display_name ILIKE $3 ESCAPE '\\'
3✔
316
                {tags_clause}
3✔
317
            ORDER BY display_name ASC
3✔
318
            LIMIT $1
3✔
319
            OFFSET $2;"
3✔
320
            ))
3✔
321
            .await?;
3✔
322

323
        let rows = connection.query(&stmt, &query_params).await?;
3✔
324

325
        Ok(rows.iter().map(|row| row.get(0)).collect())
4✔
326
    }
6✔
327
}
328

329
#[async_trait]
330
impl<Tls>
331
    MetaDataProvider<MockDatasetDataSourceLoadingInfo, VectorResultDescriptor, VectorQueryRectangle>
332
    for PostgresDb<Tls>
333
where
334
    Tls: MakeTlsConnect<Socket> + Clone + Send + Sync + 'static + std::fmt::Debug,
335
    <Tls as MakeTlsConnect<Socket>>::Stream: Send + Sync,
336
    <Tls as MakeTlsConnect<Socket>>::TlsConnect: Send,
337
    <<Tls as MakeTlsConnect<Socket>>::TlsConnect as TlsConnect<Socket>>::Future: Send,
338
{
339
    async fn meta_data(
340
        &self,
341
        _id: &DataId,
342
    ) -> geoengine_operators::util::Result<
343
        Box<
344
            dyn MetaData<
345
                MockDatasetDataSourceLoadingInfo,
346
                VectorResultDescriptor,
347
                VectorQueryRectangle,
348
            >,
349
        >,
350
    > {
×
351
        Err(geoengine_operators::error::Error::NotYetImplemented)
×
352
    }
×
353
}
354

355
#[async_trait]
356
impl<Tls> MetaDataProvider<OgrSourceDataset, VectorResultDescriptor, VectorQueryRectangle>
357
    for PostgresDb<Tls>
358
where
359
    Tls: MakeTlsConnect<Socket> + Clone + Send + Sync + 'static + std::fmt::Debug,
360
    <Tls as MakeTlsConnect<Socket>>::Stream: Send + Sync,
361
    <Tls as MakeTlsConnect<Socket>>::TlsConnect: Send,
362
    <<Tls as MakeTlsConnect<Socket>>::TlsConnect as TlsConnect<Socket>>::Future: Send,
363
{
364
    async fn meta_data(
365
        &self,
366
        id: &DataId,
367
    ) -> geoengine_operators::util::Result<
368
        Box<dyn MetaData<OgrSourceDataset, VectorResultDescriptor, VectorQueryRectangle>>,
369
    > {
2✔
370
        let id = id
2✔
371
            .internal()
2✔
372
            .ok_or(geoengine_operators::error::Error::DataIdTypeMissMatch)?;
2✔
373

374
        let conn = self.conn_pool.get().await.map_err(|e| {
2✔
375
            geoengine_operators::error::Error::MetaData {
×
376
                source: Box::new(e),
×
377
            }
×
378
        })?;
2✔
379
        let stmt = conn
2✔
380
            .prepare(
2✔
381
                "
2✔
382
        SELECT
2✔
383
            meta_data
2✔
384
        FROM
2✔
385
            datasets
2✔
386
        WHERE
2✔
387
            id = $1",
2✔
388
            )
2✔
389
            .await
2✔
390
            .map_err(|e| geoengine_operators::error::Error::MetaData {
2✔
391
                source: Box::new(e),
×
392
            })?;
2✔
393

394
        let row = conn.query_one(&stmt, &[&id]).await.map_err(|e| {
2✔
395
            geoengine_operators::error::Error::MetaData {
×
396
                source: Box::new(e),
×
397
            }
×
398
        })?;
2✔
399

400
        let meta_data: MetaDataDefinition = row.get("meta_data");
2✔
401

402
        let MetaDataDefinition::OgrMetaData(meta_data) = meta_data else {
2✔
403
            return Err(geoengine_operators::error::Error::MetaData {
×
404
                source: Box::new(geoengine_operators::error::Error::InvalidType {
×
405
                    expected: "OgrMetaData".to_string(),
×
406
                    found: meta_data.type_name().to_string(),
×
407
                }),
×
408
            });
×
409
        };
410

411
        Ok(Box::new(meta_data))
2✔
412
    }
4✔
413
}
414

415
#[async_trait]
416
impl<Tls> MetaDataProvider<GdalLoadingInfo, RasterResultDescriptor, RasterQueryRectangle>
417
    for PostgresDb<Tls>
418
where
419
    Tls: MakeTlsConnect<Socket> + Clone + Send + Sync + 'static + std::fmt::Debug,
420
    <Tls as MakeTlsConnect<Socket>>::Stream: Send + Sync,
421
    <Tls as MakeTlsConnect<Socket>>::TlsConnect: Send,
422
    <<Tls as MakeTlsConnect<Socket>>::TlsConnect as TlsConnect<Socket>>::Future: Send,
423
{
424
    async fn meta_data(
425
        &self,
426
        id: &DataId,
427
    ) -> geoengine_operators::util::Result<
428
        Box<dyn MetaData<GdalLoadingInfo, RasterResultDescriptor, RasterQueryRectangle>>,
429
    > {
5✔
430
        let id = id
5✔
431
            .internal()
5✔
432
            .ok_or(geoengine_operators::error::Error::DataIdTypeMissMatch)?;
5✔
433

434
        let conn = self.conn_pool.get().await.map_err(|e| {
5✔
435
            geoengine_operators::error::Error::MetaData {
×
436
                source: Box::new(e),
×
437
            }
×
438
        })?;
5✔
439
        let stmt = conn
5✔
440
            .prepare(
5✔
441
                "
5✔
442
            SELECT
5✔
443
                meta_data
5✔
444
            FROM
5✔
445
               datasets
5✔
446
            WHERE
5✔
447
                id = $1;",
5✔
448
            )
5✔
449
            .await
5✔
450
            .map_err(|e| geoengine_operators::error::Error::MetaData {
5✔
451
                source: Box::new(e),
×
452
            })?;
5✔
453

454
        let row = conn.query_one(&stmt, &[&id]).await.map_err(|e| {
5✔
455
            geoengine_operators::error::Error::MetaData {
×
456
                source: Box::new(e),
×
457
            }
×
458
        })?;
5✔
459

460
        let meta_data: MetaDataDefinition = row.get(0);
5✔
461

5✔
462
        Ok(match meta_data {
5✔
463
            MetaDataDefinition::GdalMetaDataRegular(m) => Box::new(m),
1✔
464
            MetaDataDefinition::GdalStatic(m) => Box::new(m),
2✔
465
            MetaDataDefinition::GdalMetaDataList(m) => Box::new(m),
1✔
466
            MetaDataDefinition::GdalMetadataNetCdfCf(m) => Box::new(m),
1✔
467
            _ => return Err(geoengine_operators::error::Error::DataIdTypeMissMatch),
×
468
        })
469
    }
10✔
470
}
471

472
#[async_trait]
473
pub trait PostgresStorable<Tls>: Send + Sync
474
where
475
    Tls: MakeTlsConnect<Socket> + Clone + Send + Sync + 'static,
476
    <Tls as MakeTlsConnect<Socket>>::Stream: Send + Sync,
477
    <Tls as MakeTlsConnect<Socket>>::TlsConnect: Send,
478
    <<Tls as MakeTlsConnect<Socket>>::TlsConnect as TlsConnect<Socket>>::Future: Send,
479
{
480
    fn to_typed_metadata(&self) -> Result<DatasetMetaData>;
481
}
482

483
pub struct DatasetMetaData<'m> {
484
    pub meta_data: &'m MetaDataDefinition,
485
    pub result_descriptor: TypedResultDescriptor,
486
}
487

488
#[async_trait]
489
impl<Tls> DatasetStore for PostgresDb<Tls>
490
where
491
    Tls: MakeTlsConnect<Socket> + Clone + Send + Sync + 'static + std::fmt::Debug,
492
    <Tls as MakeTlsConnect<Socket>>::Stream: Send + Sync,
493
    <Tls as MakeTlsConnect<Socket>>::TlsConnect: Send,
494
    <<Tls as MakeTlsConnect<Socket>>::TlsConnect as TlsConnect<Socket>>::Future: Send,
495
{
496
    async fn add_dataset(
497
        &self,
498
        dataset: AddDataset,
499
        meta_data: MetaDataDefinition,
500
    ) -> Result<DatasetIdAndName> {
14✔
501
        let id = DatasetId::new();
14✔
502
        let name = dataset.name.unwrap_or_else(|| DatasetName {
14✔
503
            namespace: None,
10✔
504
            name: id.to_string(),
10✔
505
        });
14✔
506

14✔
507
        Self::check_namespace(&name)?;
14✔
508

509
        let typed_meta_data = meta_data.to_typed_metadata();
14✔
510

511
        let mut conn = self.conn_pool.get().await?;
14✔
512

513
        let tx = conn.build_transaction().start().await?;
14✔
514

515
        tx.execute(
14✔
516
            "
14✔
517
                INSERT INTO datasets (
14✔
518
                    id,
14✔
519
                    name,
14✔
520
                    display_name,
14✔
521
                    description,
14✔
522
                    source_operator,
14✔
523
                    result_descriptor,
14✔
524
                    meta_data,
14✔
525
                    symbology,
14✔
526
                    provenance,
14✔
527
                    tags
14✔
528
                )
14✔
529
                VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10::text[])",
14✔
530
            &[
14✔
531
                &id,
14✔
532
                &name,
14✔
533
                &dataset.display_name,
14✔
534
                &dataset.description,
14✔
535
                &dataset.source_operator,
14✔
536
                &typed_meta_data.result_descriptor,
14✔
537
                typed_meta_data.meta_data,
14✔
538
                &dataset.symbology,
14✔
539
                &dataset.provenance,
14✔
540
                &dataset.tags,
14✔
541
            ],
14✔
542
        )
14✔
543
        .await
14✔
544
        .map_unique_violation("datasets", "name", || error::Error::InvalidDatasetName)?;
14✔
545

546
        tx.commit().await?;
14✔
547

548
        Ok(DatasetIdAndName { id, name })
14✔
549
    }
28✔
550

UNCOV
551
    async fn update_dataset(&self, dataset: DatasetId, update: UpdateDataset) -> Result<()> {
×
UNCOV
552
        let conn = self.conn_pool.get().await?;
×
553

UNCOV
554
        conn.execute(
×
UNCOV
555
            "UPDATE datasets SET name = $2, display_name = $3, description = $4, tags = $5 WHERE id = $1;",
×
UNCOV
556
            &[
×
UNCOV
557
                &dataset,
×
UNCOV
558
                &update.name,
×
UNCOV
559
                &update.display_name,
×
UNCOV
560
                &update.description,
×
UNCOV
561
                &update.tags,
×
UNCOV
562
            ],
×
UNCOV
563
        )
×
UNCOV
564
        .await?;
×
565

UNCOV
566
        Ok(())
×
UNCOV
567
    }
×
568

569
    async fn update_dataset_loading_info(
570
        &self,
571
        dataset: DatasetId,
572
        meta_data: &MetaDataDefinition,
UNCOV
573
    ) -> Result<()> {
×
UNCOV
574
        let conn = self.conn_pool.get().await?;
×
575

UNCOV
576
        conn.execute(
×
UNCOV
577
            "UPDATE datasets SET meta_data = $2 WHERE id = $1;",
×
UNCOV
578
            &[&dataset, &meta_data],
×
UNCOV
579
        )
×
UNCOV
580
        .await?;
×
581

UNCOV
582
        Ok(())
×
UNCOV
583
    }
×
584

585
    async fn update_dataset_symbology(
586
        &self,
587
        dataset: DatasetId,
588
        symbology: &Symbology,
UNCOV
589
    ) -> Result<()> {
×
UNCOV
590
        let conn = self.conn_pool.get().await?;
×
591

UNCOV
592
        conn.execute(
×
UNCOV
593
            "UPDATE datasets SET symbology = $2 WHERE id = $1;",
×
UNCOV
594
            &[&dataset, &symbology],
×
UNCOV
595
        )
×
UNCOV
596
        .await?;
×
597

UNCOV
598
        Ok(())
×
UNCOV
599
    }
×
600

601
    async fn update_dataset_provenance(
602
        &self,
603
        dataset: DatasetId,
604
        provenance: &[Provenance],
UNCOV
605
    ) -> Result<()> {
×
UNCOV
606
        let conn = self.conn_pool.get().await?;
×
607

UNCOV
608
        conn.execute(
×
UNCOV
609
            "UPDATE datasets SET provenance = $2 WHERE id = $1;",
×
UNCOV
610
            &[&dataset, &provenance],
×
UNCOV
611
        )
×
UNCOV
612
        .await?;
×
613

UNCOV
614
        Ok(())
×
UNCOV
615
    }
×
616

617
    async fn delete_dataset(&self, dataset_id: DatasetId) -> Result<()> {
2✔
618
        let conn = self.conn_pool.get().await?;
2✔
619

620
        let stmt = conn.prepare("DELETE FROM datasets WHERE id = $1;").await?;
2✔
621

622
        conn.execute(&stmt, &[&dataset_id]).await?;
2✔
623

624
        Ok(())
2✔
625
    }
4✔
626
}
627

628
#[async_trait]
629
impl<Tls> UploadDb for PostgresDb<Tls>
630
where
631
    Tls: MakeTlsConnect<Socket> + Clone + Send + Sync + 'static + std::fmt::Debug,
632
    <Tls as MakeTlsConnect<Socket>>::Stream: Send + Sync,
633
    <Tls as MakeTlsConnect<Socket>>::TlsConnect: Send,
634
    <<Tls as MakeTlsConnect<Socket>>::TlsConnect as TlsConnect<Socket>>::Future: Send,
635
{
636
    async fn load_upload(&self, upload: UploadId) -> Result<Upload> {
1✔
637
        // TODO: check permissions
638

639
        let conn = self.conn_pool.get().await?;
1✔
640

641
        let stmt = conn
1✔
642
            .prepare("SELECT id, files FROM uploads WHERE id = $1;")
1✔
643
            .await?;
1✔
644

645
        let row = conn.query_one(&stmt, &[&upload]).await?;
1✔
646

647
        Ok(Upload {
1✔
648
            id: row.get(0),
1✔
649
            files: row
1✔
650
                .get::<_, Vec<FileUpload>>(1)
1✔
651
                .into_iter()
1✔
652
                .map(Into::into)
1✔
653
                .collect(),
1✔
654
        })
1✔
655
    }
2✔
656

657
    async fn create_upload(&self, upload: Upload) -> Result<()> {
1✔
658
        let conn = self.conn_pool.get().await?;
1✔
659

660
        let stmt = conn
1✔
661
            .prepare("INSERT INTO uploads (id, files) VALUES ($1, $2)")
1✔
662
            .await?;
1✔
663

664
        conn.execute(
1✔
665
            &stmt,
1✔
666
            &[
1✔
667
                &upload.id,
1✔
668
                &upload
1✔
669
                    .files
1✔
670
                    .iter()
1✔
671
                    .map(FileUpload::from)
1✔
672
                    .collect::<Vec<_>>(),
1✔
673
            ],
1✔
674
        )
1✔
675
        .await?;
1✔
676
        Ok(())
1✔
677
    }
2✔
678
}
679

680
#[derive(Debug, Clone, ToSql, FromSql)]
5✔
681
pub struct FileUpload {
682
    pub id: FileId,
683
    pub name: String,
684
    pub byte_size: i64,
685
}
686

687
/// Converts an owned upload-module file entry into its database representation.
impl From<crate::datasets::upload::FileUpload> for FileUpload {
    fn from(upload: crate::datasets::upload::FileUpload) -> Self {
        let crate::datasets::upload::FileUpload {
            id,
            name,
            byte_size,
        } = upload;

        Self {
            id,
            name,
            // NOTE(review): `as` wraps for sizes above `i64::MAX`.
            byte_size: byte_size as i64,
        }
    }
}
696

697
/// Converts a borrowed upload-module file entry into its database
/// representation, cloning the file name.
impl From<&crate::datasets::upload::FileUpload> for FileUpload {
    fn from(upload: &crate::datasets::upload::FileUpload) -> Self {
        let crate::datasets::upload::FileUpload {
            id,
            name,
            byte_size,
        } = upload;

        Self {
            id: *id,
            name: name.clone(),
            // NOTE(review): `as` wraps for sizes above `i64::MAX`.
            byte_size: *byte_size as i64,
        }
    }
}
706

707
impl From<FileUpload> for crate::datasets::upload::FileUpload {
708
    fn from(upload: FileUpload) -> Self {
1✔
709
        Self {
1✔
710
            id: upload.id,
1✔
711
            name: upload.name,
1✔
712
            byte_size: upload.byte_size as u64,
1✔
713
        }
1✔
714
    }
1✔
715
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc