geo-engine / geoengine · build 11911118784 (push, via GitHub, committed by web-flow)
19 Nov 2024 10:06AM UTC · coverage: 90.448% (-0.2%) from 90.687%

Merge pull request #994 from geo-engine/workspace-dependencies
use workspace dependencies, update toolchain, use global lock in expression

9 of 11 new or added lines in 6 files covered (81.82%).
369 existing lines in 74 files now uncovered.
132871 of 146904 relevant lines covered (90.45%).
54798.62 hits per line.

Source file: /services/src/datasets/postgres.rs (file coverage: 90.65%)

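//! Postgres-backed dataset storage for the services crate: dataset listing and lookup,
//! meta data for vector (OGR) and raster (GDAL) sources, dataset CRUD, and upload bookkeeping.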
use super::listing::{OrderBy, Provenance};
use super::{AddDataset, DatasetIdAndName, DatasetName};
use crate::api::model::services::UpdateDataset;
use crate::contexts::PostgresDb;
use crate::datasets::listing::ProvenanceOutput;
use crate::datasets::listing::{DatasetListOptions, DatasetListing, DatasetProvider};
use crate::datasets::storage::{Dataset, DatasetDb, DatasetStore, MetaDataDefinition};
use crate::datasets::upload::FileId;
use crate::datasets::upload::{Upload, UploadDb, UploadId};
use crate::error::{self, Result};
use crate::projects::Symbology;
use crate::util::postgres::PostgresErrorExt;
use async_trait::async_trait;
use bb8_postgres::bb8::PooledConnection;
use bb8_postgres::tokio_postgres::tls::{MakeTlsConnect, TlsConnect};
use bb8_postgres::tokio_postgres::Socket;
use bb8_postgres::PostgresConnectionManager;
use geoengine_datatypes::dataset::{DataId, DatasetId};
use geoengine_datatypes::primitives::RasterQueryRectangle;
use geoengine_datatypes::primitives::VectorQueryRectangle;
use geoengine_datatypes::util::Identifier;
use geoengine_operators::engine::{
    MetaData, MetaDataProvider, RasterResultDescriptor, TypedResultDescriptor,
    VectorResultDescriptor,
};
use geoengine_operators::mock::MockDatasetDataSourceLoadingInfo;
use geoengine_operators::source::{GdalLoadingInfo, OgrSourceDataset};
use postgres_types::{FromSql, ToSql};

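// `DatasetDb` adds no methods of its own here; the actual functionality lives in the
// trait impls below.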
impl<Tls> DatasetDb for PostgresDb<Tls>
where
    Tls: MakeTlsConnect<Socket> + Clone + Send + Sync + 'static + std::fmt::Debug,
    <Tls as MakeTlsConnect<Socket>>::Stream: Send + Sync,
    <Tls as MakeTlsConnect<Socket>>::TlsConnect: Send,
    <<Tls as MakeTlsConnect<Socket>>::TlsConnect as TlsConnect<Socket>>::Future: Send,
{
}

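/// Looks up the id of the dataset with the given `DatasetName` in the `datasets` table.
/// Returns `Ok(None)` if no dataset with that name exists.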
pub async fn resolve_dataset_name_to_id<Tls>(
    conn: &PooledConnection<'_, PostgresConnectionManager<Tls>>,
    dataset_name: &DatasetName,
) -> Result<Option<DatasetId>>
where
    Tls: MakeTlsConnect<Socket> + Clone + Send + Sync + 'static,
    <Tls as MakeTlsConnect<Socket>>::Stream: Send + Sync,
    <Tls as MakeTlsConnect<Socket>>::TlsConnect: Send,
    <<Tls as MakeTlsConnect<Socket>>::TlsConnect as TlsConnect<Socket>>::Future: Send,
{
    let stmt = conn
        .prepare(
            "SELECT id
        FROM datasets
        WHERE name = $1::\"DatasetName\"",
        )
        .await?;

    let row_option = conn.query_opt(&stmt, &[&dataset_name]).await?;

    Ok(row_option.map(|row| row.get(0)))
}

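// Read side: listing, loading, provenance, loading info, name resolution and
// autocomplete for datasets stored in Postgres.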
#[allow(clippy::too_many_lines)]
#[async_trait]
impl<Tls> DatasetProvider for PostgresDb<Tls>
where
    Tls: MakeTlsConnect<Socket> + Clone + Send + Sync + 'static + std::fmt::Debug,
    <Tls as MakeTlsConnect<Socket>>::Stream: Send + Sync,
    <Tls as MakeTlsConnect<Socket>>::TlsConnect: Send,
    <<Tls as MakeTlsConnect<Socket>>::TlsConnect as TlsConnect<Socket>>::Future: Send,
{
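    /// Lists datasets with paging. The optional name filter (`ILIKE`) and tag filter are
    /// spliced into the statement as SQL fragments, while the filter values themselves are
    /// passed as bound parameters.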
    async fn list_datasets(&self, options: DatasetListOptions) -> Result<Vec<DatasetListing>> {
        let conn = self.conn_pool.get().await?;

        let order_sql = if options.order == OrderBy::NameAsc {
            "display_name ASC"
        } else {
            "display_name DESC"
        };

        let mut pos = 2;

        let filter_sql = if options.filter.is_some() {
            pos += 1;
            Some(format!("display_name ILIKE ${pos} ESCAPE '\\'"))
        } else {
            None
        };

        let (filter_tags_sql, filter_tags_list) = if let Some(filter_tags) = &options.tags {
            pos += 1;
            (Some(format!("tags @> ${pos}::text[]")), filter_tags.clone())
        } else {
            (None, vec![])
        };

        let where_clause_sql = match (filter_sql, filter_tags_sql) {
            (Some(filter_sql), Some(filter_tags_sql)) => {
                format!("WHERE {filter_sql} AND {filter_tags_sql}")
            }
            (Some(filter_sql), None) => format!("WHERE {filter_sql}"),
            (None, Some(filter_tags_sql)) => format!("WHERE {filter_tags_sql}"),
            (None, None) => String::new(),
        };

        let stmt = conn
            .prepare(&format!(
                "
            SELECT
                id,
                name,
                display_name,
                description,
                tags,
                source_operator,
                result_descriptor,
                symbology
            FROM
                datasets
            {where_clause_sql}
            ORDER BY {order_sql}
            LIMIT $1
            OFFSET $2;"
            ))
            .await?;

        let rows = match (options.filter, options.tags) {
            (Some(filter), Some(_)) => {
                conn.query(
                    &stmt,
                    &[
                        &i64::from(options.limit),
                        &i64::from(options.offset),
                        &format!("%{}%", filter.replace('%', "\\%").replace('_', "\\_")),
                        &filter_tags_list,
                    ],
                )
                .await?
            }
            (Some(filter), None) => {
                conn.query(
                    &stmt,
                    &[
                        &i64::from(options.limit),
                        &i64::from(options.offset),
                        &format!("%{}%", filter.replace('%', "\\%").replace('_', "\\_")),
                    ],
                )
                .await?
            }
            (None, Some(_)) => {
                conn.query(
                    &stmt,
                    &[
                        &i64::from(options.limit),
                        &i64::from(options.offset),
                        &filter_tags_list,
                    ],
                )
                .await?
            }
            (None, None) => {
                conn.query(
                    &stmt,
                    &[&i64::from(options.limit), &i64::from(options.offset)],
                )
                .await?
            }
        };

        Ok(rows
            .iter()
            .map(|row| {
                Result::<DatasetListing>::Ok(DatasetListing {
                    id: row.get(0),
                    name: row.get(1),
                    display_name: row.get(2),
                    description: row.get(3),
                    tags: row.get::<_, Option<_>>(4).unwrap_or_default(),
                    source_operator: row.get(5),
                    result_descriptor: row.get(6),
                    symbology: row.get(7),
                })
            })
            .filter_map(Result::ok)
            .collect())
    }

    async fn load_dataset(&self, dataset: &DatasetId) -> Result<Dataset> {
        let conn = self.conn_pool.get().await?;
        let stmt = conn
            .prepare(
                "
            SELECT
                id,
                name,
                display_name,
                description,
                result_descriptor,
                source_operator,
                symbology,
                provenance,
                tags
            FROM
                datasets
            WHERE
                id = $1
            LIMIT
                1",
            )
            .await?;

        let row = conn.query_opt(&stmt, &[dataset]).await?;

        let row = row.ok_or(error::Error::UnknownDatasetId)?;

        Ok(Dataset {
            id: row.get(0),
            name: row.get(1),
            display_name: row.get(2),
            description: row.get(3),
            result_descriptor: row.get(4),
            source_operator: row.get(5),
            symbology: row.get(6),
            provenance: row.get(7),
            tags: row.get(8),
        })
    }

    async fn load_provenance(&self, dataset: &DatasetId) -> Result<ProvenanceOutput> {
        let conn = self.conn_pool.get().await?;

        let stmt = conn
            .prepare(
                "
            SELECT
                provenance
            FROM
                datasets
            WHERE
                id = $1;",
            )
            .await?;

        let row = conn.query_one(&stmt, &[dataset]).await?;

        let provenances: Vec<Provenance> = row.get(0);

        Ok(ProvenanceOutput {
            data: (*dataset).into(),
            provenance: Some(provenances),
        })
    }

    async fn load_loading_info(&self, dataset: &DatasetId) -> Result<MetaDataDefinition> {
        let conn = self.conn_pool.get().await?;

        let stmt = conn
            .prepare(
                "
            SELECT
                meta_data
            FROM
                datasets
            WHERE
                id = $1;",
            )
            .await?;

        let row = conn.query_one(&stmt, &[dataset]).await?;

        Ok(row.get(0))
    }

    async fn resolve_dataset_name_to_id(
        &self,
        dataset_name: &DatasetName,
    ) -> Result<Option<DatasetId>> {
        let conn = self.conn_pool.get().await?;
        resolve_dataset_name_to_id(&conn, dataset_name).await
    }

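    /// Returns the display names of datasets matching `search_string` (case-insensitive),
    /// optionally restricted to datasets carrying all of the given `tags`.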
    async fn dataset_autocomplete_search(
        &self,
        tags: Option<Vec<String>>,
        search_string: String,
        limit: u32,
        offset: u32,
    ) -> Result<Vec<String>> {
        let connection = self.conn_pool.get().await?;

        let limit = i64::from(limit);
        let offset = i64::from(offset);
        let search_string = format!(
            "%{}%",
            search_string.replace('%', "\\%").replace('_', "\\_")
        );

        let mut query_params: Vec<&(dyn ToSql + Sync)> = vec![&limit, &offset, &search_string];

        let tags_clause = if let Some(tags) = &tags {
            query_params.push(tags);
            " AND tags @> $4::text[]".to_string()
        } else {
            String::new()
        };

        let stmt = connection
            .prepare(&format!(
                "
            SELECT
                display_name
            FROM
                datasets
            WHERE
                display_name ILIKE $3 ESCAPE '\\'
                {tags_clause}
            ORDER BY display_name ASC
            LIMIT $1
            OFFSET $2;"
            ))
            .await?;

        let rows = connection.query(&stmt, &query_params).await?;

        Ok(rows.iter().map(|row| row.get(0)).collect())
    }
}

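// There is no database-backed mock loading info; this provider always returns
// `NotYetImplemented`.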
#[async_trait]
impl<Tls>
    MetaDataProvider<MockDatasetDataSourceLoadingInfo, VectorResultDescriptor, VectorQueryRectangle>
    for PostgresDb<Tls>
where
    Tls: MakeTlsConnect<Socket> + Clone + Send + Sync + 'static + std::fmt::Debug,
    <Tls as MakeTlsConnect<Socket>>::Stream: Send + Sync,
    <Tls as MakeTlsConnect<Socket>>::TlsConnect: Send,
    <<Tls as MakeTlsConnect<Socket>>::TlsConnect as TlsConnect<Socket>>::Future: Send,
{
    async fn meta_data(
        &self,
        _id: &DataId,
    ) -> geoengine_operators::util::Result<
        Box<
            dyn MetaData<
                MockDatasetDataSourceLoadingInfo,
                VectorResultDescriptor,
                VectorQueryRectangle,
            >,
        >,
    > {
        Err(geoengine_operators::error::Error::NotYetImplemented)
    }
}

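// Vector (OGR) meta data: loads the `meta_data` column and requires it to be the
// `OgrMetaData` variant.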
#[async_trait]
impl<Tls> MetaDataProvider<OgrSourceDataset, VectorResultDescriptor, VectorQueryRectangle>
    for PostgresDb<Tls>
where
    Tls: MakeTlsConnect<Socket> + Clone + Send + Sync + 'static + std::fmt::Debug,
    <Tls as MakeTlsConnect<Socket>>::Stream: Send + Sync,
    <Tls as MakeTlsConnect<Socket>>::TlsConnect: Send,
    <<Tls as MakeTlsConnect<Socket>>::TlsConnect as TlsConnect<Socket>>::Future: Send,
{
    async fn meta_data(
        &self,
        id: &DataId,
    ) -> geoengine_operators::util::Result<
        Box<dyn MetaData<OgrSourceDataset, VectorResultDescriptor, VectorQueryRectangle>>,
    > {
        let id = id
            .internal()
            .ok_or(geoengine_operators::error::Error::DataIdTypeMissMatch)?;

        let conn = self.conn_pool.get().await.map_err(|e| {
            geoengine_operators::error::Error::MetaData {
                source: Box::new(e),
            }
        })?;
        let stmt = conn
            .prepare(
                "
        SELECT
            meta_data
        FROM
            datasets
        WHERE
            id = $1",
            )
            .await
            .map_err(|e| geoengine_operators::error::Error::MetaData {
                source: Box::new(e),
            })?;

        let row = conn.query_one(&stmt, &[&id]).await.map_err(|e| {
            geoengine_operators::error::Error::MetaData {
                source: Box::new(e),
            }
        })?;

        let meta_data: MetaDataDefinition = row.get("meta_data");

        let MetaDataDefinition::OgrMetaData(meta_data) = meta_data else {
            return Err(geoengine_operators::error::Error::MetaData {
                source: Box::new(geoengine_operators::error::Error::InvalidType {
                    expected: "OgrMetaData".to_string(),
                    found: meta_data.type_name().to_string(),
                }),
            });
        };

        Ok(Box::new(meta_data))
    }
}

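// Raster (GDAL) meta data: loads the `meta_data` column and accepts any of the GDAL variants.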
#[async_trait]
impl<Tls> MetaDataProvider<GdalLoadingInfo, RasterResultDescriptor, RasterQueryRectangle>
    for PostgresDb<Tls>
where
    Tls: MakeTlsConnect<Socket> + Clone + Send + Sync + 'static + std::fmt::Debug,
    <Tls as MakeTlsConnect<Socket>>::Stream: Send + Sync,
    <Tls as MakeTlsConnect<Socket>>::TlsConnect: Send,
    <<Tls as MakeTlsConnect<Socket>>::TlsConnect as TlsConnect<Socket>>::Future: Send,
{
    async fn meta_data(
        &self,
        id: &DataId,
    ) -> geoengine_operators::util::Result<
        Box<dyn MetaData<GdalLoadingInfo, RasterResultDescriptor, RasterQueryRectangle>>,
    > {
        let id = id
            .internal()
            .ok_or(geoengine_operators::error::Error::DataIdTypeMissMatch)?;

        let conn = self.conn_pool.get().await.map_err(|e| {
            geoengine_operators::error::Error::MetaData {
                source: Box::new(e),
            }
        })?;
        let stmt = conn
            .prepare(
                "
            SELECT
                meta_data
            FROM
                datasets
            WHERE
                id = $1;",
            )
            .await
            .map_err(|e| geoengine_operators::error::Error::MetaData {
                source: Box::new(e),
            })?;

        let row = conn.query_one(&stmt, &[&id]).await.map_err(|e| {
            geoengine_operators::error::Error::MetaData {
                source: Box::new(e),
            }
        })?;

        let meta_data: MetaDataDefinition = row.get(0);

        Ok(match meta_data {
            MetaDataDefinition::GdalMetaDataRegular(m) => Box::new(m),
            MetaDataDefinition::GdalStatic(m) => Box::new(m),
            MetaDataDefinition::GdalMetaDataList(m) => Box::new(m),
            MetaDataDefinition::GdalMetadataNetCdfCf(m) => Box::new(m),
            _ => return Err(geoengine_operators::error::Error::DataIdTypeMissMatch),
        })
    }
}

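/// Types that can be converted into the [`DatasetMetaData`] representation stored in
/// the `datasets` table.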
#[async_trait]
pub trait PostgresStorable<Tls>: Send + Sync
where
    Tls: MakeTlsConnect<Socket> + Clone + Send + Sync + 'static,
    <Tls as MakeTlsConnect<Socket>>::Stream: Send + Sync,
    <Tls as MakeTlsConnect<Socket>>::TlsConnect: Send,
    <<Tls as MakeTlsConnect<Socket>>::TlsConnect as TlsConnect<Socket>>::Future: Send,
{
    fn to_typed_metadata(&self) -> Result<DatasetMetaData>;
}

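/// Meta data together with its typed result descriptor, in the form written to the
/// `datasets` table.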
pub struct DatasetMetaData<'m> {
    pub meta_data: &'m MetaDataDefinition,
    pub result_descriptor: TypedResultDescriptor,
}

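// Write side: inserting, updating and deleting datasets.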
#[async_trait]
impl<Tls> DatasetStore for PostgresDb<Tls>
where
    Tls: MakeTlsConnect<Socket> + Clone + Send + Sync + 'static + std::fmt::Debug,
    <Tls as MakeTlsConnect<Socket>>::Stream: Send + Sync,
    <Tls as MakeTlsConnect<Socket>>::TlsConnect: Send,
    <<Tls as MakeTlsConnect<Socket>>::TlsConnect as TlsConnect<Socket>>::Future: Send,
{
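    /// Inserts a new dataset inside a transaction. If no name is given, the generated id is
    /// used as the name; a unique-constraint violation on `name` is mapped to
    /// `Error::InvalidDatasetName`.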
    async fn add_dataset(
        &self,
        dataset: AddDataset,
        meta_data: MetaDataDefinition,
    ) -> Result<DatasetIdAndName> {
        let id = DatasetId::new();
        let name = dataset.name.unwrap_or_else(|| DatasetName {
            namespace: None,
            name: id.to_string(),
        });

        Self::check_namespace(&name)?;

        let typed_meta_data = meta_data.to_typed_metadata();

        let mut conn = self.conn_pool.get().await?;

        let tx = conn.build_transaction().start().await?;

        tx.execute(
            "
                INSERT INTO datasets (
                    id,
                    name,
                    display_name,
                    description,
                    source_operator,
                    result_descriptor,
                    meta_data,
                    symbology,
                    provenance,
                    tags
                )
                VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10::text[])",
            &[
                &id,
                &name,
                &dataset.display_name,
                &dataset.description,
                &dataset.source_operator,
                &typed_meta_data.result_descriptor,
                typed_meta_data.meta_data,
                &dataset.symbology,
                &dataset.provenance,
                &dataset.tags,
            ],
        )
        .await
        .map_unique_violation("datasets", "name", || error::Error::InvalidDatasetName)?;

        tx.commit().await?;

        Ok(DatasetIdAndName { id, name })
    }

    async fn update_dataset(&self, dataset: DatasetId, update: UpdateDataset) -> Result<()> {
        let conn = self.conn_pool.get().await?;

        conn.execute(
            "UPDATE datasets SET name = $2, display_name = $3, description = $4, tags = $5 WHERE id = $1;",
            &[
                &dataset,
                &update.name,
                &update.display_name,
                &update.description,
                &update.tags,
            ],
        )
        .await?;

        Ok(())
    }

    async fn update_dataset_loading_info(
        &self,
        dataset: DatasetId,
        meta_data: &MetaDataDefinition,
    ) -> Result<()> {
        let conn = self.conn_pool.get().await?;

        conn.execute(
            "UPDATE datasets SET meta_data = $2 WHERE id = $1;",
            &[&dataset, &meta_data],
        )
        .await?;

        Ok(())
    }

    async fn update_dataset_symbology(
        &self,
        dataset: DatasetId,
        symbology: &Symbology,
    ) -> Result<()> {
        let conn = self.conn_pool.get().await?;

        conn.execute(
            "UPDATE datasets SET symbology = $2 WHERE id = $1;",
            &[&dataset, &symbology],
        )
        .await?;

        Ok(())
    }

    async fn update_dataset_provenance(
        &self,
        dataset: DatasetId,
        provenance: &[Provenance],
    ) -> Result<()> {
        let conn = self.conn_pool.get().await?;

        conn.execute(
            "UPDATE datasets SET provenance = $2 WHERE id = $1;",
            &[&dataset, &provenance],
        )
        .await?;

        Ok(())
    }

    async fn delete_dataset(&self, dataset_id: DatasetId) -> Result<()> {
        let conn = self.conn_pool.get().await?;

        let stmt = conn.prepare("DELETE FROM datasets WHERE id = $1;").await?;

        conn.execute(&stmt, &[&dataset_id]).await?;

        Ok(())
    }
}

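// Upload bookkeeping: an upload is stored as its id plus the list of uploaded files.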
#[async_trait]
impl<Tls> UploadDb for PostgresDb<Tls>
where
    Tls: MakeTlsConnect<Socket> + Clone + Send + Sync + 'static + std::fmt::Debug,
    <Tls as MakeTlsConnect<Socket>>::Stream: Send + Sync,
    <Tls as MakeTlsConnect<Socket>>::TlsConnect: Send,
    <<Tls as MakeTlsConnect<Socket>>::TlsConnect as TlsConnect<Socket>>::Future: Send,
{
    async fn load_upload(&self, upload: UploadId) -> Result<Upload> {
        // TODO: check permissions

        let conn = self.conn_pool.get().await?;

        let stmt = conn
            .prepare("SELECT id, files FROM uploads WHERE id = $1;")
            .await?;

        let row = conn.query_one(&stmt, &[&upload]).await?;

        Ok(Upload {
            id: row.get(0),
            files: row
                .get::<_, Vec<FileUpload>>(1)
                .into_iter()
                .map(Into::into)
                .collect(),
        })
    }

    async fn create_upload(&self, upload: Upload) -> Result<()> {
        let conn = self.conn_pool.get().await?;

        let stmt = conn
            .prepare("INSERT INTO uploads (id, files) VALUES ($1, $2)")
            .await?;

        conn.execute(
            &stmt,
            &[
                &upload.id,
                &upload
                    .files
                    .iter()
                    .map(FileUpload::from)
                    .collect::<Vec<_>>(),
            ],
        )
        .await?;
        Ok(())
    }
}

#[derive(Debug, Clone, ToSql, FromSql)]
pub struct FileUpload {
    pub id: FileId,
    pub name: String,
    pub byte_size: i64,
}

impl From<crate::datasets::upload::FileUpload> for FileUpload {
    fn from(upload: crate::datasets::upload::FileUpload) -> Self {
        Self {
            id: upload.id,
            name: upload.name,
            byte_size: upload.byte_size as i64,
        }
    }
}

impl From<&crate::datasets::upload::FileUpload> for FileUpload {
    fn from(upload: &crate::datasets::upload::FileUpload) -> Self {
        Self {
            id: upload.id,
            name: upload.name.clone(),
            byte_size: upload.byte_size as i64,
        }
    }
}

impl From<FileUpload> for crate::datasets::upload::FileUpload {
    fn from(upload: FileUpload) -> Self {
        Self {
            id: upload.id,
            name: upload.name,
            byte_size: upload.byte_size as u64,
        }
    }
}