• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

geo-engine / geoengine / 11911118784

19 Nov 2024 10:06AM UTC coverage: 90.448% (-0.2%) from 90.687%
11911118784

push

github

web-flow
Merge pull request #994 from geo-engine/workspace-dependencies

use workspace dependencies, update toolchain, use global lock in expression

9 of 11 new or added lines in 6 files covered. (81.82%)

369 existing lines in 74 files now uncovered.

132871 of 146904 relevant lines covered (90.45%)

54798.62 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

76.32
/services/src/pro/datasets/postgres.rs
1
use crate::api::model::services::UpdateDataset;
2
use crate::datasets::listing::Provenance;
3
use crate::datasets::listing::{DatasetListOptions, DatasetListing, DatasetProvider};
4
use crate::datasets::listing::{OrderBy, ProvenanceOutput};
5
use crate::datasets::postgres::resolve_dataset_name_to_id;
6
use crate::datasets::storage::{Dataset, DatasetDb, DatasetStore, MetaDataDefinition};
7
use crate::datasets::upload::FileId;
8
use crate::datasets::upload::{Upload, UploadDb, UploadId};
9
use crate::datasets::{AddDataset, DatasetIdAndName, DatasetName};
10
use crate::error::{self, Error, Result};
11
use crate::pro::contexts::ProPostgresDb;
12
use crate::pro::permissions::postgres_permissiondb::TxPermissionDb;
13
use crate::pro::permissions::{Permission, RoleId};
14
use crate::projects::Symbology;
15
use crate::util::postgres::PostgresErrorExt;
16
use async_trait::async_trait;
17
use bb8_postgres::tokio_postgres::tls::{MakeTlsConnect, TlsConnect};
18
use bb8_postgres::tokio_postgres::Socket;
19
use geoengine_datatypes::dataset::{DataId, DatasetId};
20
use geoengine_datatypes::error::BoxedResultExt;
21
use geoengine_datatypes::primitives::RasterQueryRectangle;
22
use geoengine_datatypes::primitives::VectorQueryRectangle;
23
use geoengine_datatypes::util::Identifier;
24
use geoengine_operators::engine::{
25
    MetaData, MetaDataProvider, RasterResultDescriptor, VectorResultDescriptor,
26
};
27
use geoengine_operators::mock::MockDatasetDataSourceLoadingInfo;
28
use geoengine_operators::source::{GdalLoadingInfo, OgrSourceDataset};
29
use postgres_types::{FromSql, ToSql};
30

31
impl<Tls> DatasetDb for ProPostgresDb<Tls>
32
where
33
    Tls: MakeTlsConnect<Socket> + Clone + Send + Sync + 'static + std::fmt::Debug,
34
    <Tls as MakeTlsConnect<Socket>>::Stream: Send + Sync,
35
    <Tls as MakeTlsConnect<Socket>>::TlsConnect: Send,
36
    <<Tls as MakeTlsConnect<Socket>>::TlsConnect as TlsConnect<Socket>>::Future: Send,
37
{
38
}
39

40
#[allow(clippy::too_many_lines)]
41
#[async_trait]
42
impl<Tls> DatasetProvider for ProPostgresDb<Tls>
43
where
44
    Tls: MakeTlsConnect<Socket> + Clone + Send + Sync + 'static + std::fmt::Debug,
45
    <Tls as MakeTlsConnect<Socket>>::Stream: Send + Sync,
46
    <Tls as MakeTlsConnect<Socket>>::TlsConnect: Send,
47
    <<Tls as MakeTlsConnect<Socket>>::TlsConnect as TlsConnect<Socket>>::Future: Send,
48
{
49
    async fn list_datasets(&self, options: DatasetListOptions) -> Result<Vec<DatasetListing>> {
3✔
50
        let conn = self.conn_pool.get().await?;
3✔
51

52
        let mut pos = 3;
3✔
53
        let order_sql = if options.order == OrderBy::NameAsc {
3✔
54
            "display_name ASC"
3✔
55
        } else {
UNCOV
56
            "display_name DESC"
×
57
        };
58

59
        let filter_sql = if options.filter.is_some() {
3✔
UNCOV
60
            pos += 1;
×
61
            format!("AND display_name ILIKE ${pos} ESCAPE '\\'")
×
62
        } else {
63
            String::new()
3✔
64
        };
65

66
        let (filter_tags_sql, filter_tags_list) = if let Some(filter_tags) = &options.tags {
3✔
UNCOV
67
            pos += 1;
×
68
            (format!("AND d.tags @> ${pos}::text[]"), filter_tags.clone())
×
69
        } else {
70
            (String::new(), vec![])
3✔
71
        };
72

73
        let stmt = conn
3✔
74
            .prepare(&format!(
3✔
75
                "
3✔
76
            SELECT 
3✔
77
                d.id,
3✔
78
                d.name,
3✔
79
                d.display_name,
3✔
80
                d.description,
3✔
81
                d.tags,
3✔
82
                d.source_operator,
3✔
83
                d.result_descriptor,
3✔
84
                d.symbology
3✔
85
            FROM 
3✔
86
                user_permitted_datasets p JOIN datasets d 
3✔
87
                    ON (p.dataset_id = d.id)
3✔
88
            WHERE 
3✔
89
                p.user_id = $1
3✔
90
                {filter_sql}
3✔
91
                {filter_tags_sql}
3✔
92
            ORDER BY {order_sql}
3✔
93
            LIMIT $2
3✔
94
            OFFSET $3;  
3✔
95
            ",
3✔
96
            ))
3✔
97
            .await?;
3✔
98

99
        let rows = match (options.filter, options.tags) {
3✔
UNCOV
100
            (Some(filter), Some(_)) => {
×
101
                conn.query(
×
102
                    &stmt,
×
103
                    &[
×
104
                        &self.session.user.id,
×
105
                        &i64::from(options.limit),
×
106
                        &i64::from(options.offset),
×
107
                        &format!("%{}%", filter.replace('%', "\\%").replace('_', "\\_")),
×
108
                        &filter_tags_list,
×
109
                    ],
×
110
                )
×
UNCOV
111
                .await?
×
112
            }
UNCOV
113
            (Some(filter), None) => {
×
114
                conn.query(
×
115
                    &stmt,
×
116
                    &[
×
117
                        &self.session.user.id,
×
118
                        &i64::from(options.limit),
×
119
                        &i64::from(options.offset),
×
120
                        &format!("%{}%", filter.replace('%', "\\%").replace('_', "\\_")),
×
121
                    ],
×
122
                )
×
UNCOV
123
                .await?
×
124
            }
125
            (None, Some(_)) => {
UNCOV
126
                conn.query(
×
127
                    &stmt,
×
128
                    &[
×
129
                        &self.session.user.id,
×
130
                        &i64::from(options.limit),
×
131
                        &i64::from(options.offset),
×
132
                        &filter_tags_list,
×
133
                    ],
×
134
                )
×
UNCOV
135
                .await?
×
136
            }
137
            (None, None) => {
138
                conn.query(
3✔
139
                    &stmt,
3✔
140
                    &[
3✔
141
                        &self.session.user.id,
3✔
142
                        &i64::from(options.limit),
3✔
143
                        &i64::from(options.offset),
3✔
144
                    ],
3✔
145
                )
3✔
146
                .await?
3✔
147
            }
148
        };
149

150
        Ok(rows
3✔
151
            .iter()
3✔
152
            .map(|row| {
3✔
153
                Result::<DatasetListing>::Ok(DatasetListing {
2✔
154
                    id: row.get(0),
2✔
155
                    name: row.get(1),
2✔
156
                    display_name: row.get(2),
2✔
157
                    description: row.get(3),
2✔
158
                    tags: row.get::<_, Option<Vec<String>>>(4).unwrap_or_default(),
2✔
159
                    source_operator: row.get(5),
2✔
160
                    result_descriptor: row.get(6),
2✔
161
                    symbology: row.get(7),
2✔
162
                })
2✔
163
            })
3✔
164
            .filter_map(Result::ok)
3✔
165
            .collect())
3✔
166
    }
6✔
167

168
    async fn load_dataset(&self, dataset: &DatasetId) -> Result<Dataset> {
15✔
169
        let conn = self.conn_pool.get().await?;
15✔
170
        let stmt = conn
15✔
171
            .prepare(
15✔
172
                "
15✔
173
            SELECT
15✔
174
                d.id,
15✔
175
                d.name,
15✔
176
                d.display_name,
15✔
177
                d.description,
15✔
178
                d.result_descriptor,
15✔
179
                d.source_operator,
15✔
180
                d.symbology,
15✔
181
                d.provenance,
15✔
182
                d.tags
15✔
183
            FROM 
15✔
184
                user_permitted_datasets p JOIN datasets d 
15✔
185
                    ON (p.dataset_id = d.id)
15✔
186
            WHERE 
15✔
187
                p.user_id = $1 AND d.id = $2
15✔
188
            LIMIT 
15✔
189
                1",
15✔
190
            )
15✔
191
            .await?;
15✔
192

193
        let row = conn
15✔
194
            .query_opt(&stmt, &[&self.session.user.id, dataset])
15✔
195
            .await?;
15✔
196

197
        let row = row.ok_or(error::Error::UnknownDatasetId)?;
15✔
198

199
        Ok(Dataset {
9✔
200
            id: row.get(0),
9✔
201
            name: row.get(1),
9✔
202
            display_name: row.get(2),
9✔
203
            description: row.get(3),
9✔
204
            result_descriptor: row.get(4),
9✔
205
            source_operator: row.get(5),
9✔
206
            symbology: row.get(6),
9✔
207
            provenance: row.get(7),
9✔
208
            tags: row.get(8),
9✔
209
        })
9✔
210
    }
30✔
211

212
    async fn load_provenance(&self, dataset: &DatasetId) -> Result<ProvenanceOutput> {
3✔
213
        let conn = self.conn_pool.get().await?;
3✔
214

215
        let stmt = conn
3✔
216
            .prepare(
3✔
217
                "
3✔
218
            SELECT 
3✔
219
                d.provenance 
3✔
220
            FROM 
3✔
221
                user_permitted_datasets p JOIN datasets d
3✔
222
                    ON(p.dataset_id = d.id)
3✔
223
            WHERE 
3✔
224
                p.user_id = $1 AND d.id = $2",
3✔
225
            )
3✔
226
            .await?;
3✔
227

228
        let row = conn
3✔
229
            .query_opt(&stmt, &[&self.session.user.id, dataset])
3✔
230
            .await?;
3✔
231

232
        let row = row.ok_or(error::Error::UnknownDatasetId)?;
3✔
233

234
        Ok(ProvenanceOutput {
2✔
235
            data: (*dataset).into(),
2✔
236
            provenance: row.get(0),
2✔
237
        })
2✔
238
    }
6✔
239

240
    async fn load_loading_info(&self, dataset: &DatasetId) -> Result<MetaDataDefinition> {
×
241
        let conn = self.conn_pool.get().await?;
×
242

243
        let stmt = conn
×
244
            .prepare(
×
245
                "
×
246
            SELECT 
×
247
                meta_data 
×
248
            FROM 
×
249
                user_permitted_datasets p JOIN datasets d
×
250
                    ON(p.dataset_id = d.id)
×
251
            WHERE 
×
252
                p.user_id = $1 AND d.id = $2",
×
253
            )
×
254
            .await?;
×
255

256
        let row = conn
×
257
            .query_one(&stmt, &[&self.session.user.id, dataset])
×
258
            .await?;
×
259

260
        Ok(row.get(0))
×
261
    }
×
262

263
    async fn resolve_dataset_name_to_id(
264
        &self,
265
        dataset_name: &DatasetName,
266
    ) -> Result<Option<DatasetId>> {
15✔
267
        let conn = self.conn_pool.get().await?;
15✔
268
        resolve_dataset_name_to_id(&conn, dataset_name).await
32✔
269
    }
30✔
270

271
    async fn dataset_autocomplete_search(
272
        &self,
273
        tags: Option<Vec<String>>,
274
        search_string: String,
275
        limit: u32,
276
        offset: u32,
277
    ) -> Result<Vec<String>> {
4✔
278
        let connection = self.conn_pool.get().await?;
4✔
279

280
        let limit = i64::from(limit);
4✔
281
        let offset = i64::from(offset);
4✔
282
        let search_string = format!(
4✔
283
            "%{}%",
4✔
284
            search_string.replace('%', "\\%").replace('_', "\\_")
4✔
285
        );
4✔
286

4✔
287
        let mut query_params: Vec<&(dyn ToSql + Sync)> =
4✔
288
            vec![&self.session.user.id, &limit, &offset, &search_string];
4✔
289

290
        let tags_clause = if let Some(tags) = &tags {
4✔
291
            query_params.push(tags);
2✔
292
            " AND tags @> $5::text[]".to_string()
2✔
293
        } else {
294
            String::new()
2✔
295
        };
296

297
        let stmt = connection
4✔
298
            .prepare(&format!(
4✔
299
                "
4✔
300
            SELECT 
4✔
301
                display_name
4✔
302
            FROM 
4✔
303
                user_permitted_datasets p JOIN datasets d ON (p.dataset_id = d.id)
4✔
304
            WHERE 
4✔
305
                p.user_id = $1
4✔
306
                AND display_name ILIKE $4 ESCAPE '\\'
4✔
307
                {tags_clause}
4✔
308
            ORDER BY display_name ASC
4✔
309
            LIMIT $2
4✔
310
            OFFSET $3;"
4✔
311
            ))
4✔
312
            .await?;
4✔
313

314
        let rows = connection.query(&stmt, &query_params).await?;
4✔
315

316
        Ok(rows.iter().map(|row| row.get(0)).collect())
4✔
317
    }
8✔
318
}
319

320
#[async_trait]
321
impl<Tls>
322
    MetaDataProvider<MockDatasetDataSourceLoadingInfo, VectorResultDescriptor, VectorQueryRectangle>
323
    for ProPostgresDb<Tls>
324
where
325
    Tls: MakeTlsConnect<Socket> + Clone + Send + Sync + 'static + std::fmt::Debug,
326
    <Tls as MakeTlsConnect<Socket>>::Stream: Send + Sync,
327
    <Tls as MakeTlsConnect<Socket>>::TlsConnect: Send,
328
    <<Tls as MakeTlsConnect<Socket>>::TlsConnect as TlsConnect<Socket>>::Future: Send,
329
{
330
    async fn meta_data(
331
        &self,
332
        _id: &DataId,
333
    ) -> geoengine_operators::util::Result<
334
        Box<
335
            dyn MetaData<
336
                MockDatasetDataSourceLoadingInfo,
337
                VectorResultDescriptor,
338
                VectorQueryRectangle,
339
            >,
340
        >,
341
    > {
×
342
        Err(geoengine_operators::error::Error::NotYetImplemented)
×
343
    }
×
344
}
345

346
#[async_trait]
347
impl<Tls> MetaDataProvider<OgrSourceDataset, VectorResultDescriptor, VectorQueryRectangle>
348
    for ProPostgresDb<Tls>
349
where
350
    Tls: MakeTlsConnect<Socket> + Clone + Send + Sync + 'static + std::fmt::Debug,
351
    <Tls as MakeTlsConnect<Socket>>::Stream: Send + Sync,
352
    <Tls as MakeTlsConnect<Socket>>::TlsConnect: Send,
353
    <<Tls as MakeTlsConnect<Socket>>::TlsConnect as TlsConnect<Socket>>::Future: Send,
354
{
355
    async fn meta_data(
356
        &self,
357
        id: &DataId,
358
    ) -> geoengine_operators::util::Result<
359
        Box<dyn MetaData<OgrSourceDataset, VectorResultDescriptor, VectorQueryRectangle>>,
360
    > {
8✔
361
        let id = id
8✔
362
            .internal()
8✔
363
            .ok_or(geoengine_operators::error::Error::DataIdTypeMissMatch)?;
8✔
364

365
        let mut conn = self.conn_pool.get().await.map_err(|e| {
8✔
366
            geoengine_operators::error::Error::MetaData {
×
367
                source: Box::new(e),
×
368
            }
×
369
        })?;
8✔
370
        let tx = conn.build_transaction().start().await.map_err(|e| {
8✔
371
            geoengine_operators::error::Error::MetaData {
×
372
                source: Box::new(e),
×
373
            }
×
374
        })?;
8✔
375

376
        if !self
8✔
377
            .has_permission_in_tx(id, Permission::Read, &tx)
8✔
378
            .await
20✔
379
            .map_err(|e| geoengine_operators::error::Error::MetaData {
8✔
380
                source: Box::new(e),
×
381
            })?
8✔
382
        {
383
            return Err(geoengine_operators::error::Error::PermissionDenied);
2✔
384
        };
6✔
385

386
        let stmt = tx
6✔
387
            .prepare(
6✔
388
                "
6✔
389
        SELECT
6✔
390
            d.meta_data
6✔
391
        FROM
6✔
392
            user_permitted_datasets p JOIN datasets d
6✔
393
                ON (p.dataset_id = d.id)
6✔
394
        WHERE
6✔
395
            d.id = $1 AND p.user_id = $2",
6✔
396
            )
6✔
397
            .await
6✔
398
            .map_err(|e| geoengine_operators::error::Error::MetaData {
6✔
399
                source: Box::new(e),
×
400
            })?;
6✔
401

402
        let row = tx
6✔
403
            .query_one(&stmt, &[&id, &self.session.user.id])
6✔
404
            .await
6✔
405
            .map_err(|e| geoengine_operators::error::Error::MetaData {
6✔
406
                source: Box::new(e),
×
407
            })?;
6✔
408

409
        let meta_data: MetaDataDefinition = row.get("meta_data");
6✔
410

411
        let MetaDataDefinition::OgrMetaData(meta_data) = meta_data else {
6✔
UNCOV
412
            return Err(geoengine_operators::error::Error::MetaData {
×
413
                source: Box::new(geoengine_operators::error::Error::InvalidType {
×
414
                    expected: "OgrMetaData".to_string(),
×
415
                    found: meta_data.type_name().to_string(),
×
416
                }),
×
417
            });
×
418
        };
419

420
        tx.commit()
6✔
421
            .await
6✔
422
            .map_err(|e| geoengine_operators::error::Error::MetaData {
6✔
423
                source: Box::new(e),
×
424
            })?;
6✔
425

426
        Ok(Box::new(meta_data))
6✔
427
    }
16✔
428
}
429

430
#[async_trait]
431
impl<Tls> MetaDataProvider<GdalLoadingInfo, RasterResultDescriptor, RasterQueryRectangle>
432
    for ProPostgresDb<Tls>
433
where
434
    Tls: MakeTlsConnect<Socket> + Clone + Send + Sync + 'static + std::fmt::Debug,
435
    <Tls as MakeTlsConnect<Socket>>::Stream: Send + Sync,
436
    <Tls as MakeTlsConnect<Socket>>::TlsConnect: Send,
437
    <<Tls as MakeTlsConnect<Socket>>::TlsConnect as TlsConnect<Socket>>::Future: Send,
438
{
439
    async fn meta_data(
440
        &self,
441
        id: &DataId,
442
    ) -> geoengine_operators::util::Result<
443
        Box<dyn MetaData<GdalLoadingInfo, RasterResultDescriptor, RasterQueryRectangle>>,
444
    > {
8✔
445
        let id = id
8✔
446
            .internal()
8✔
447
            .ok_or(geoengine_operators::error::Error::DataIdTypeMissMatch)?;
8✔
448

449
        let mut conn = self.conn_pool.get().await.map_err(|e| {
8✔
450
            geoengine_operators::error::Error::MetaData {
×
451
                source: Box::new(e),
×
452
            }
×
453
        })?;
8✔
454
        let tx = conn.build_transaction().start().await.map_err(|e| {
8✔
455
            geoengine_operators::error::Error::MetaData {
×
456
                source: Box::new(e),
×
457
            }
×
458
        })?;
8✔
459

460
        if !self
8✔
461
            .has_permission_in_tx(id, Permission::Read, &tx)
8✔
462
            .await
17✔
463
            .map_err(|e| geoengine_operators::error::Error::MetaData {
8✔
464
                source: Box::new(e),
×
465
            })?
8✔
466
        {
467
            return Err(geoengine_operators::error::Error::PermissionDenied);
1✔
468
        };
7✔
469

470
        let stmt = tx
7✔
471
            .prepare(
7✔
472
                "
7✔
473
            SELECT
7✔
474
                d.meta_data
7✔
475
            FROM
7✔
476
                user_permitted_datasets p JOIN datasets d
7✔
477
                    ON (p.dataset_id = d.id)
7✔
478
            WHERE
7✔
479
                d.id = $1 AND p.user_id = $2",
7✔
480
            )
7✔
481
            .await
7✔
482
            .map_err(|e| geoengine_operators::error::Error::MetaData {
7✔
483
                source: Box::new(e),
×
484
            })?;
7✔
485

486
        let row = tx
7✔
487
            .query_one(&stmt, &[&id, &self.session.user.id])
7✔
488
            .await
7✔
489
            .map_err(|e| geoengine_operators::error::Error::MetaData {
7✔
490
                source: Box::new(e),
×
491
            })?;
7✔
492

493
        let meta_data: MetaDataDefinition = row.get(0);
7✔
494

7✔
495
        Ok(match meta_data {
7✔
496
            MetaDataDefinition::GdalMetaDataRegular(m) => Box::new(m),
4✔
497
            MetaDataDefinition::GdalStatic(m) => Box::new(m),
1✔
498
            MetaDataDefinition::GdalMetaDataList(m) => Box::new(m),
1✔
499
            MetaDataDefinition::GdalMetadataNetCdfCf(m) => Box::new(m),
1✔
UNCOV
500
            _ => return Err(geoengine_operators::error::Error::DataIdTypeMissMatch),
×
501
        })
502
    }
16✔
503
}
504

505
#[async_trait]
506
impl<Tls> DatasetStore for ProPostgresDb<Tls>
507
where
508
    Tls: MakeTlsConnect<Socket> + Clone + Send + Sync + 'static + std::fmt::Debug,
509
    <Tls as MakeTlsConnect<Socket>>::Stream: Send + Sync,
510
    <Tls as MakeTlsConnect<Socket>>::TlsConnect: Send,
511
    <<Tls as MakeTlsConnect<Socket>>::TlsConnect as TlsConnect<Socket>>::Future: Send,
512
{
513
    async fn add_dataset(
514
        &self,
515
        dataset: AddDataset,
516
        meta_data: MetaDataDefinition,
517
    ) -> Result<DatasetIdAndName> {
25✔
518
        let id = DatasetId::new();
25✔
519
        let name = dataset.name.unwrap_or_else(|| DatasetName {
25✔
520
            namespace: Some(self.session.user.id.to_string()),
14✔
521
            name: id.to_string(),
14✔
522
        });
25✔
523

25✔
524
        log::info!(
25✔
UNCOV
525
            "Adding dataset with name: {:?}, tags: {:?}",
×
526
            name,
527
            dataset.tags
528
        );
529

530
        self.check_dataset_namespace(&name)?;
25✔
531

532
        let typed_meta_data = meta_data.to_typed_metadata();
25✔
533

534
        let mut conn = self.conn_pool.get().await?;
25✔
535

536
        let tx = conn.build_transaction().start().await?;
25✔
537

538
        tx.execute(
25✔
539
            "
25✔
540
                INSERT INTO datasets (
25✔
541
                    id,
25✔
542
                    name,
25✔
543
                    display_name,
25✔
544
                    description,
25✔
545
                    source_operator,
25✔
546
                    result_descriptor,
25✔
547
                    meta_data,
25✔
548
                    symbology,
25✔
549
                    provenance,
25✔
550
                    tags
25✔
551
                )
25✔
552
                VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10::text[])",
25✔
553
            &[
25✔
554
                &id,
25✔
555
                &name,
25✔
556
                &dataset.display_name,
25✔
557
                &dataset.description,
25✔
558
                &dataset.source_operator,
25✔
559
                &typed_meta_data.result_descriptor,
25✔
560
                typed_meta_data.meta_data,
25✔
561
                &dataset.symbology,
25✔
562
                &dataset.provenance,
25✔
563
                &dataset.tags,
25✔
564
            ],
25✔
565
        )
25✔
566
        .await
3,113✔
567
        .map_unique_violation("datasets", "name", || error::Error::InvalidDatasetName)?;
25✔
568

569
        let stmt = tx
25✔
570
            .prepare(
25✔
571
                "
25✔
572
            INSERT INTO permissions (
25✔
573
                role_id,
25✔
574
                dataset_id,
25✔
575
                permission
25✔
576
            )
25✔
577
            VALUES ($1, $2, $3)",
25✔
578
            )
25✔
579
            .await?;
63✔
580

581
        tx.execute(
25✔
582
            &stmt,
25✔
583
            &[&RoleId::from(self.session.user.id), &id, &Permission::Owner],
25✔
584
        )
25✔
585
        .await?;
25✔
586

587
        tx.commit().await?;
25✔
588

589
        Ok(DatasetIdAndName { id, name })
25✔
590
    }
50✔
591

592
    async fn update_dataset(&self, dataset: DatasetId, update: UpdateDataset) -> Result<()> {
×
593
        let mut conn = self.conn_pool.get().await?;
×
594

595
        let tx = conn.build_transaction().start().await?;
×
596

597
        self.ensure_permission_in_tx(dataset.into(), Permission::Owner, &tx)
×
598
            .await
×
599
            .boxed_context(crate::error::PermissionDb)?;
×
600

601
        tx.execute(
×
602
            "UPDATE datasets SET name = $2, display_name = $3, description = $4, tags = $5 WHERE id = $1;",
×
603
            &[
×
604
                &dataset,
×
605
                &update.name,
×
606
                &update.display_name,
×
607
                &update.description,
×
608
                &update.tags,
×
609
            ],
×
610
        )
×
611
        .await?;
×
612

613
        tx.commit().await?;
×
614

615
        Ok(())
×
616
    }
×
617

618
    async fn update_dataset_loading_info(
619
        &self,
620
        dataset: DatasetId,
621
        meta_data: &MetaDataDefinition,
622
    ) -> Result<()> {
×
623
        let mut conn = self.conn_pool.get().await?;
×
624

625
        let tx = conn.build_transaction().start().await?;
×
626

627
        self.ensure_permission_in_tx(dataset.into(), Permission::Owner, &tx)
×
628
            .await
×
629
            .boxed_context(crate::error::PermissionDb)?;
×
630

631
        tx.execute(
×
632
            "UPDATE datasets SET meta_data = $2 WHERE id = $1;",
×
633
            &[&dataset, &meta_data],
×
634
        )
×
635
        .await?;
×
636

637
        tx.commit().await?;
×
638

639
        Ok(())
×
640
    }
×
641

642
    async fn update_dataset_symbology(
643
        &self,
644
        dataset: DatasetId,
645
        symbology: &Symbology,
646
    ) -> Result<()> {
×
647
        let mut conn = self.conn_pool.get().await?;
×
648

649
        let tx = conn.build_transaction().start().await?;
×
650

651
        self.ensure_permission_in_tx(dataset.into(), Permission::Owner, &tx)
×
652
            .await
×
653
            .boxed_context(crate::error::PermissionDb)?;
×
654

655
        tx.execute(
×
656
            "UPDATE datasets SET symbology = $2 WHERE id = $1;",
×
657
            &[&dataset, &symbology],
×
658
        )
×
659
        .await?;
×
660

661
        tx.commit().await?;
×
662

663
        Ok(())
×
664
    }
×
665

666
    async fn update_dataset_provenance(
667
        &self,
668
        dataset: DatasetId,
669
        provenance: &[Provenance],
670
    ) -> Result<()> {
×
671
        let mut conn = self.conn_pool.get().await?;
×
672

673
        let tx = conn.build_transaction().start().await?;
×
674

675
        self.ensure_permission_in_tx(dataset.into(), Permission::Owner, &tx)
×
676
            .await
×
677
            .boxed_context(crate::error::PermissionDb)?;
×
678

679
        tx.execute(
×
680
            "UPDATE datasets SET provenance = $2 WHERE id = $1;",
×
681
            &[&dataset, &provenance],
×
682
        )
×
683
        .await?;
×
684

685
        tx.commit().await?;
×
686

687
        Ok(())
×
688
    }
×
689

690
    async fn delete_dataset(&self, dataset_id: DatasetId) -> Result<()> {
4✔
691
        let mut conn = self.conn_pool.get().await?;
4✔
692
        let tx = conn.build_transaction().start().await?;
4✔
693

694
        self.ensure_permission_in_tx(dataset_id.into(), Permission::Owner, &tx)
4✔
695
            .await
11✔
696
            .boxed_context(crate::error::PermissionDb)?;
4✔
697

698
        let stmt = tx
4✔
699
            .prepare(
4✔
700
                "
4✔
701
        SELECT 
4✔
702
            TRUE
4✔
703
        FROM 
4✔
704
            user_permitted_datasets p JOIN datasets d 
4✔
705
                ON (p.dataset_id = d.id)
4✔
706
        WHERE 
4✔
707
            d.id = $1 AND p.user_id = $2 AND p.permission = 'Owner';",
4✔
708
            )
4✔
709
            .await?;
4✔
710

711
        let rows = tx
4✔
712
            .query(&stmt, &[&dataset_id, &self.session.user.id])
4✔
713
            .await?;
4✔
714

715
        if rows.is_empty() {
4✔
UNCOV
716
            return Err(Error::OperationRequiresOwnerPermission);
×
717
        }
4✔
718

719
        let stmt = tx.prepare("DELETE FROM datasets WHERE id = $1;").await?;
4✔
720

721
        tx.execute(&stmt, &[&dataset_id]).await?;
4✔
722

723
        tx.commit().await?;
4✔
724

725
        Ok(())
4✔
726
    }
8✔
727
}
728

729
#[async_trait]
730
impl<Tls> UploadDb for ProPostgresDb<Tls>
731
where
732
    Tls: MakeTlsConnect<Socket> + Clone + Send + Sync + 'static + std::fmt::Debug,
733
    <Tls as MakeTlsConnect<Socket>>::Stream: Send + Sync,
734
    <Tls as MakeTlsConnect<Socket>>::TlsConnect: Send,
735
    <<Tls as MakeTlsConnect<Socket>>::TlsConnect as TlsConnect<Socket>>::Future: Send,
736
{
737
    async fn load_upload(&self, upload: UploadId) -> Result<Upload> {
7✔
738
        let conn = self.conn_pool.get().await?;
7✔
739

740
        let stmt = conn
7✔
741
            .prepare(
7✔
742
                "
7✔
743
            SELECT u.id, u.files 
7✔
744
            FROM uploads u JOIN user_uploads uu ON(u.id = uu.upload_id)
7✔
745
            WHERE u.id = $1 AND uu.user_id = $2",
7✔
746
            )
7✔
747
            .await?;
9✔
748

749
        let row = conn
7✔
750
            .query_one(&stmt, &[&upload, &self.session.user.id])
7✔
751
            .await?;
5✔
752

753
        Ok(Upload {
4✔
754
            id: row.get(0),
4✔
755
            files: row
4✔
756
                .get::<_, Vec<FileUpload>>(1)
4✔
757
                .into_iter()
4✔
758
                .map(Into::into)
4✔
759
                .collect(),
4✔
760
        })
4✔
761
    }
14✔
762

763
    async fn create_upload(&self, upload: Upload) -> Result<()> {
5✔
764
        let mut conn = self.conn_pool.get().await?;
6✔
765
        let tx = conn.build_transaction().start().await?;
5✔
766

767
        let stmt = tx
5✔
768
            .prepare("INSERT INTO uploads (id, files) VALUES ($1, $2)")
5✔
769
            .await?;
20✔
770

771
        tx.execute(
5✔
772
            &stmt,
5✔
773
            &[
5✔
774
                &upload.id,
5✔
775
                &upload
5✔
776
                    .files
5✔
777
                    .iter()
5✔
778
                    .map(FileUpload::from)
5✔
779
                    .collect::<Vec<_>>(),
5✔
780
            ],
5✔
781
        )
5✔
782
        .await?;
4✔
783

784
        let stmt = tx
5✔
785
            .prepare("INSERT INTO user_uploads (user_id, upload_id) VALUES ($1, $2)")
5✔
786
            .await?;
4✔
787

788
        tx.execute(&stmt, &[&self.session.user.id, &upload.id])
5✔
789
            .await?;
4✔
790

791
        tx.commit().await?;
6✔
792

793
        Ok(())
5✔
794
    }
10✔
795
}
796

797
#[derive(Debug, Clone, ToSql, FromSql)]
33✔
798
pub struct FileUpload {
799
    pub id: FileId,
800
    pub name: String,
801
    pub byte_size: i64,
802
}
803

804
/// Converts an owned upload-model file into its database-side representation.
impl From<crate::datasets::upload::FileUpload> for FileUpload {
    fn from(upload: crate::datasets::upload::FileUpload) -> Self {
        // plain `as` cast: the value is reinterpreted, not range-checked
        let byte_size = upload.byte_size as i64;

        Self {
            id: upload.id,
            name: upload.name,
            byte_size,
        }
    }
}
813

814
/// Converts a borrowed upload-model file into its database-side
/// representation, cloning the file name.
impl From<&crate::datasets::upload::FileUpload> for FileUpload {
    fn from(upload: &crate::datasets::upload::FileUpload) -> Self {
        // plain `as` cast: the value is reinterpreted, not range-checked
        let byte_size = upload.byte_size as i64;
        let name = upload.name.clone();

        Self {
            id: upload.id,
            name,
            byte_size,
        }
    }
}
823

824
impl From<FileUpload> for crate::datasets::upload::FileUpload {
825
    fn from(upload: FileUpload) -> Self {
12✔
826
        Self {
12✔
827
            id: upload.id,
12✔
828
            name: upload.name,
12✔
829
            byte_size: upload.byte_size as u64,
12✔
830
        }
12✔
831
    }
12✔
832
}
833

834
#[cfg(test)]
mod tests {
    use std::path::PathBuf;

    use super::*;
    use crate::{
        contexts::{ApplicationContext, SessionContext},
        pro::{
            contexts::ProPostgresContext,
            ge_context,
            users::{UserAuth, UserSession},
        },
    };
    use geoengine_datatypes::{
        collections::VectorDataType,
        primitives::{CacheTtlSeconds, FeatureDataType, Measurement},
        spatial_reference::SpatialReference,
    };
    use geoengine_operators::{
        engine::{StaticMetaData, VectorColumnInfo},
        source::{
            CsvHeader, FormatSpecifics, OgrSourceColumnSpec, OgrSourceDatasetTimeType,
            OgrSourceDurationSpec, OgrSourceErrorSpec, OgrSourceTimeFormat,
        },
    };
    use tokio_postgres::NoTls;

    /// Autocomplete returns the owner's dataset (with and without a tag filter)
    /// and returns nothing for a different anonymous user.
    #[ge_context::test]
    async fn it_autocompletes_datasets(app_ctx: ProPostgresContext<NoTls>) {
        let owner_session = app_ctx.create_anonymous_session().await.unwrap();
        let other_session = app_ctx.create_anonymous_session().await.unwrap();

        let owner_db = app_ctx.session_context(owner_session.clone()).db();
        let other_db = app_ctx.session_context(other_session).db();

        add_single_dataset(&owner_db, &owner_session).await;

        // the owner finds the dataset without any tag filter ...
        let untagged_hits = owner_db
            .dataset_autocomplete_search(None, "Ogr".to_owned(), 10, 0)
            .await
            .unwrap();
        assert_eq!(untagged_hits, vec!["Ogr Test"]);

        // ... and also when filtering by one of its tags
        let tagged_hits = owner_db
            .dataset_autocomplete_search(Some(vec!["upload".to_string()]), "Ogr".to_owned(), 10, 0)
            .await
            .unwrap();
        assert_eq!(tagged_hits, vec!["Ogr Test"]);

        // check that other user B cannot access datasets of user A

        let foreign_untagged_hits = other_db
            .dataset_autocomplete_search(None, "Ogr".to_owned(), 10, 0)
            .await
            .unwrap();
        assert!(foreign_untagged_hits.is_empty());

        let foreign_tagged_hits = other_db
            .dataset_autocomplete_search(Some(vec!["upload".to_string()]), "Ogr".to_owned(), 10, 0)
            .await
            .unwrap();
        assert!(foreign_tagged_hits.is_empty());
    }

    /// Adds one OGR (CSV) dataset with display name `Ogr Test` and the tags
    /// `upload` and `test` via `db`; its name is built from the id of the
    /// session's user and `my_dataset`.
    async fn add_single_dataset(db: &ProPostgresDb<NoTls>, session: &UserSession) {
        let time = OgrSourceDatasetTimeType::Start {
            start_field: "start".to_owned(),
            start_format: OgrSourceTimeFormat::Auto,
            duration: OgrSourceDurationSpec::Zero,
        };

        let column_spec = OgrSourceColumnSpec {
            format_specifics: Some(FormatSpecifics::Csv {
                header: CsvHeader::Auto,
            }),
            x: "x".to_owned(),
            y: None,
            int: vec![],
            float: vec![],
            text: vec![],
            bool: vec![],
            datetime: vec![],
            rename: None,
        };

        let loading_info = OgrSourceDataset {
            file_name: PathBuf::from("test.csv"),
            layer_name: "test.csv".to_owned(),
            data_type: Some(VectorDataType::MultiPoint),
            time,
            default_geometry: None,
            columns: Some(column_spec),
            force_ogr_time_filter: false,
            force_ogr_spatial_filter: false,
            on_error: OgrSourceErrorSpec::Ignore,
            sql_query: None,
            attribute_query: None,
            cache_ttl: CacheTtlSeconds::default(),
        };

        let foo_column = (
            "foo".to_owned(),
            VectorColumnInfo {
                data_type: FeatureDataType::Float,
                measurement: Measurement::Unitless,
            },
        );

        let result_descriptor = VectorResultDescriptor {
            data_type: VectorDataType::MultiPoint,
            spatial_reference: SpatialReference::epsg_4326().into(),
            columns: [foo_column].into_iter().collect(),
            time: None,
            bbox: None,
        };

        let static_meta_data: StaticMetaData<
            OgrSourceDataset,
            VectorResultDescriptor,
            VectorQueryRectangle,
        > = StaticMetaData {
            loading_info,
            result_descriptor,
            phantom: Default::default(),
        };
        let meta_data = MetaDataDefinition::OgrMetaData(static_meta_data);

        let dataset_name = DatasetName::new(Some(session.user.id.to_string()), "my_dataset");

        let dataset = AddDataset {
            name: Some(dataset_name),
            display_name: "Ogr Test".to_owned(),
            description: "desc".to_owned(),
            source_operator: "OgrSource".to_owned(),
            symbology: None,
            provenance: None,
            tags: Some(vec!["upload".to_owned(), "test".to_owned()]),
        };

        db.add_dataset(dataset, meta_data).await.unwrap();
    }
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc