• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

geo-engine / geoengine / 10182401222

31 Jul 2024 02:42PM CUT coverage: 91.14% (+0.07%) from 91.068%
10182401222

Pull #970

github

web-flow
Merge f0fcf6203 into c97f87c56
Pull Request #970: FAIR dataset deletion

1798 of 1863 new or added lines in 13 files covered. (96.51%)

16 existing lines in 9 files now uncovered.

132740 of 145644 relevant lines covered (91.14%)

52956.57 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

90.59
/services/src/pro/datasets/postgres.rs
1
use crate::api::model::services::UpdateDataset;
2
use crate::datasets::listing::Provenance;
3
use crate::datasets::listing::{DatasetListOptions, DatasetListing, DatasetProvider};
4
use crate::datasets::listing::{OrderBy, ProvenanceOutput};
5
use crate::datasets::postgres::resolve_dataset_name_to_id;
6
use crate::datasets::storage::{
7
    Dataset, DatasetDb, DatasetStore, MetaDataDefinition, ReservedTags,
8
};
9
use crate::datasets::upload::{delete_upload, FileId};
10
use crate::datasets::upload::{Upload, UploadDb, UploadId};
11
use crate::datasets::{AddDataset, DatasetIdAndName, DatasetName};
12
use crate::error::Error::{
13
    ExpirationTimestampInPast, IllegalDatasetStatus, IllegalExpirationUpdate, UnknownDatasetId,
14
};
15
use crate::error::{self, Error, Result};
16
use crate::pro::contexts::ProPostgresDb;
17
use crate::pro::datasets::storage::DatasetDeletionType::{DeleteData, DeleteRecordAndData};
18
use crate::pro::datasets::storage::InternalUploadedDatasetStatus::{Deleted, DeletedWithError};
19
use crate::pro::datasets::storage::{
20
    ChangeDatasetExpiration, DatasetAccessStatus, DatasetDeletionType, DatasetType,
21
    InternalUploadedDatasetStatus, TxUploadedUserDatasetStore, UploadedDatasetStatus,
22
    UploadedUserDatasetStore,
23
};
24
use crate::pro::datasets::{Expiration, ExpirationChange};
25
use crate::pro::permissions::postgres_permissiondb::TxPermissionDb;
26
use crate::pro::permissions::{Permission, RoleId};
27
use crate::pro::users::{UserId, UserSession};
28
use crate::projects::Symbology;
29
use crate::util::postgres::PostgresErrorExt;
30
use async_trait::async_trait;
31
use bb8_postgres::bb8::PooledConnection;
32
use bb8_postgres::tokio_postgres::tls::{MakeTlsConnect, TlsConnect};
33
use bb8_postgres::tokio_postgres::Socket;
34
use bb8_postgres::PostgresConnectionManager;
35
use geoengine_datatypes::dataset::{DataId, DatasetId};
36
use geoengine_datatypes::error::BoxedResultExt;
37
use geoengine_datatypes::primitives::RasterQueryRectangle;
38
use geoengine_datatypes::primitives::VectorQueryRectangle;
39
use geoengine_datatypes::util::Identifier;
40
use geoengine_operators::engine::{
41
    MetaData, MetaDataProvider, RasterResultDescriptor, VectorResultDescriptor,
42
};
43
use geoengine_operators::mock::MockDatasetDataSourceLoadingInfo;
44
use geoengine_operators::source::{GdalLoadingInfo, OgrSourceDataset};
45
use postgres_types::{FromSql, ToSql};
46
use snafu::ensure;
47
use tokio_postgres::Transaction;
48
use InternalUploadedDatasetStatus::{Available, Expired, Expires, UpdateExpired};
49

50
// Marker implementation: `DatasetDb` has no items of its own here; the actual
// behavior comes from the `DatasetProvider`, `DatasetStore`, and `UploadDb`
// implementations below.
impl<Tls> DatasetDb for ProPostgresDb<Tls>
where
    Tls: MakeTlsConnect<Socket> + Clone + Send + Sync + 'static + std::fmt::Debug,
    <Tls as MakeTlsConnect<Socket>>::Stream: Send + Sync,
    <Tls as MakeTlsConnect<Socket>>::TlsConnect: Send,
    <<Tls as MakeTlsConnect<Socket>>::TlsConnect as TlsConnect<Socket>>::Future: Send,
{
}
58

59
#[allow(clippy::too_many_lines)]
#[async_trait]
impl<Tls> DatasetProvider for ProPostgresDb<Tls>
where
    Tls: MakeTlsConnect<Socket> + Clone + Send + Sync + 'static + std::fmt::Debug,
    <Tls as MakeTlsConnect<Socket>>::Stream: Send + Sync,
    <Tls as MakeTlsConnect<Socket>>::TlsConnect: Send,
    <<Tls as MakeTlsConnect<Socket>>::TlsConnect as TlsConnect<Socket>>::Future: Send,
{
    /// Lists the datasets the session user is permitted to see.
    ///
    /// When no tag filter is given, datasets carrying the reserved
    /// `ReservedTags::Deleted` tag are excluded by default. An optional
    /// name filter is matched case-insensitively (`ILIKE`) with `%`/`_`
    /// escaped so they are treated literally.
    async fn list_datasets(&self, options: DatasetListOptions) -> Result<Vec<DatasetListing>> {
        let mut conn = self.conn_pool.get().await?;
        // NOTE(review): presumably applies pending expiration/deletion state
        // before reading — confirm against the storage module.
        self.lazy_dataset_store_updates(&mut conn, None).await?;

        // `pos` tracks the next SQL parameter index; $1..$3 are always
        // user id, limit, and offset, so optional clauses start at $4.
        let mut pos = 3;
        let order_sql = if options.order == OrderBy::NameAsc {
            "display_name ASC"
        } else {
            "display_name DESC"
        };

        let filter_sql = if options.filter.is_some() {
            pos += 1;
            format!("AND display_name ILIKE ${pos} ESCAPE '\\'")
        } else {
            String::new()
        };

        let (filter_tags_sql, filter_tags_list) = if let Some(filter_tags) = &options.tags {
            pos += 1;
            (format!("AND d.tags @> ${pos}::text[]"), filter_tags.clone())
        } else {
            // Default: hide datasets tagged as deleted (NULL tags pass).
            (
                format!(
                    "AND (d.tags IS NULL OR NOT d.tags @> '{{{}}}'::text[])",
                    ReservedTags::Deleted
                ),
                vec![],
            )
        };

        let stmt = conn
            .prepare(&format!(
                "
            SELECT 
                d.id,
                d.name,
                d.display_name,
                d.description,
                d.tags,
                d.source_operator,
                d.result_descriptor,
                d.symbology
            FROM 
                user_permitted_datasets p JOIN datasets d 
                    ON (p.dataset_id = d.id)
            WHERE 
                p.user_id = $1
                {filter_sql}
                {filter_tags_sql}
            ORDER BY {order_sql}
            LIMIT $2
            OFFSET $3;  
            ",
            ))
            .await?;

        // The parameter slice must match the placeholders assembled above,
        // so each filter/tags combination binds exactly the right params.
        let rows = match (options.filter, options.tags) {
            (Some(filter), Some(_)) => {
                conn.query(
                    &stmt,
                    &[
                        &self.session.user.id,
                        &i64::from(options.limit),
                        &i64::from(options.offset),
                        &format!("%{}%", filter.replace('%', "\\%").replace('_', "\\_")),
                        &filter_tags_list,
                    ],
                )
                .await?
            }
            (Some(filter), None) => {
                conn.query(
                    &stmt,
                    &[
                        &self.session.user.id,
                        &i64::from(options.limit),
                        &i64::from(options.offset),
                        &format!("%{}%", filter.replace('%', "\\%").replace('_', "\\_")),
                    ],
                )
                .await?
            }
            (None, Some(_)) => {
                conn.query(
                    &stmt,
                    &[
                        &self.session.user.id,
                        &i64::from(options.limit),
                        &i64::from(options.offset),
                        &filter_tags_list,
                    ],
                )
                .await?
            }
            (None, None) => {
                conn.query(
                    &stmt,
                    &[
                        &self.session.user.id,
                        &i64::from(options.limit),
                        &i64::from(options.offset),
                    ],
                )
                .await?
            }
        };

        Ok(rows
            .iter()
            .map(|row| {
                Result::<DatasetListing>::Ok(DatasetListing {
                    id: row.get(0),
                    name: row.get(1),
                    display_name: row.get(2),
                    description: row.get(3),
                    // tags column is nullable; missing tags become an empty list
                    tags: row.get::<_, Option<Vec<String>>>(4).unwrap_or_default(),
                    source_operator: row.get(5),
                    result_descriptor: row.get(6),
                    symbology: row.get(7),
                })
            })
            .filter_map(Result::ok)
            .collect())
    }

    /// Loads a single dataset the session user may read.
    ///
    /// # Errors
    /// Returns `UnknownDatasetId` when the dataset does not exist or the
    /// user lacks permission (the permission view filters it out).
    async fn load_dataset(&self, dataset: &DatasetId) -> Result<Dataset> {
        let mut conn = self.conn_pool.get().await?;
        self.lazy_dataset_store_updates(&mut conn, Some(dataset))
            .await?;

        let stmt = conn
            .prepare(
                "
            SELECT
                d.id,
                d.name,
                d.display_name,
                d.description,
                d.result_descriptor,
                d.source_operator,
                d.symbology,
                d.provenance,
                d.tags
            FROM 
                user_permitted_datasets p JOIN datasets d 
                    ON (p.dataset_id = d.id)
            WHERE 
                p.user_id = $1 AND d.id = $2
            LIMIT 
                1",
            )
            .await?;

        let row = conn
            .query_opt(&stmt, &[&self.session.user.id, dataset])
            .await?;

        let row = row.ok_or(error::Error::UnknownDatasetId)?;

        Ok(Dataset {
            id: row.get(0),
            name: row.get(1),
            display_name: row.get(2),
            description: row.get(3),
            result_descriptor: row.get(4),
            source_operator: row.get(5),
            symbology: row.get(6),
            provenance: row.get(7),
            tags: row.get(8),
        })
    }

    /// Loads only the provenance information of a dataset.
    ///
    /// # Errors
    /// `UnknownDatasetId` when the dataset is missing or not permitted.
    async fn load_provenance(&self, dataset: &DatasetId) -> Result<ProvenanceOutput> {
        let mut conn = self.conn_pool.get().await?;
        self.lazy_dataset_store_updates(&mut conn, Some(dataset))
            .await?;

        let stmt = conn
            .prepare(
                "
            SELECT 
                d.provenance 
            FROM 
                user_permitted_datasets p JOIN datasets d
                    ON(p.dataset_id = d.id)
            WHERE 
                p.user_id = $1 AND d.id = $2",
            )
            .await?;

        let row = conn
            .query_opt(&stmt, &[&self.session.user.id, dataset])
            .await?;

        let row = row.ok_or(error::Error::UnknownDatasetId)?;

        Ok(ProvenanceOutput {
            data: (*dataset).into(),
            provenance: row.get(0),
        })
    }

    /// Loads the raw `MetaDataDefinition` stored for a dataset.
    ///
    /// NOTE(review): uses `query_one`, so a missing/unpermitted dataset
    /// surfaces as a generic database error rather than `UnknownDatasetId`
    /// as in `load_dataset`/`load_provenance` — confirm whether callers
    /// depend on that distinction.
    async fn load_loading_info(&self, dataset: &DatasetId) -> Result<MetaDataDefinition> {
        let mut conn = self.conn_pool.get().await?;
        self.lazy_dataset_store_updates(&mut conn, Some(dataset))
            .await?;

        let stmt = conn
            .prepare(
                "
            SELECT 
                meta_data 
            FROM 
                user_permitted_datasets p JOIN datasets d
                    ON(p.dataset_id = d.id)
            WHERE 
                p.user_id = $1 AND d.id = $2",
            )
            .await?;

        let row = conn
            .query_one(&stmt, &[&self.session.user.id, dataset])
            .await?;

        Ok(row.get(0))
    }

    /// Resolves a dataset name to its id, delegating to the shared
    /// (non-pro) resolver after applying lazy store updates.
    async fn resolve_dataset_name_to_id(
        &self,
        dataset_name: &DatasetName,
    ) -> Result<Option<DatasetId>> {
        let mut conn = self.conn_pool.get().await?;
        self.lazy_dataset_store_updates(&mut conn, None).await?;
        resolve_dataset_name_to_id(&conn, dataset_name).await
    }

    /// Returns display names matching `search_string` (case-insensitive,
    /// `%`/`_` escaped), optionally restricted to datasets containing all
    /// given tags, ordered ascending with limit/offset paging.
    async fn dataset_autocomplete_search(
        &self,
        tags: Option<Vec<String>>,
        search_string: String,
        limit: u32,
        offset: u32,
    ) -> Result<Vec<String>> {
        let mut conn = self.conn_pool.get().await?;
        self.lazy_dataset_store_updates(&mut conn, None).await?;

        let limit = i64::from(limit);
        let offset = i64::from(offset);
        // Escape LIKE wildcards so the user input is matched literally.
        let search_string = format!(
            "%{}%",
            search_string.replace('%', "\\%").replace('_', "\\_")
        );

        let mut query_params: Vec<&(dyn ToSql + Sync)> =
            vec![&self.session.user.id, &limit, &offset, &search_string];

        // The tags parameter, when present, is always $5 (after the four
        // fixed parameters above).
        let tags_clause = if let Some(tags) = &tags {
            query_params.push(tags);
            " AND tags @> $5::text[]".to_string()
        } else {
            String::new()
        };

        let stmt = conn
            .prepare(&format!(
                "
            SELECT 
                display_name
            FROM 
                user_permitted_datasets p JOIN datasets d ON (p.dataset_id = d.id)
            WHERE 
                p.user_id = $1
                AND display_name ILIKE $4 ESCAPE '\\'
                {tags_clause}
            ORDER BY display_name ASC
            LIMIT $2
            OFFSET $3;"
            ))
            .await?;

        let rows = conn.query(&stmt, &query_params).await?;

        Ok(rows.iter().map(|row| row.get(0)).collect())
    }
}
354

355
#[async_trait]
impl<Tls>
    MetaDataProvider<MockDatasetDataSourceLoadingInfo, VectorResultDescriptor, VectorQueryRectangle>
    for ProPostgresDb<Tls>
where
    Tls: MakeTlsConnect<Socket> + Clone + Send + Sync + 'static + std::fmt::Debug,
    <Tls as MakeTlsConnect<Socket>>::Stream: Send + Sync,
    <Tls as MakeTlsConnect<Socket>>::TlsConnect: Send,
    <<Tls as MakeTlsConnect<Socket>>::TlsConnect as TlsConnect<Socket>>::Future: Send,
{
    /// Mock loading info is not backed by the database; this provider
    /// unconditionally fails with `NotYetImplemented`.
    async fn meta_data(
        &self,
        _id: &DataId,
    ) -> geoengine_operators::util::Result<
        Box<
            dyn MetaData<
                MockDatasetDataSourceLoadingInfo,
                VectorResultDescriptor,
                VectorQueryRectangle,
            >,
        >,
    > {
        Err(geoengine_operators::error::Error::NotYetImplemented)
    }
}
380

381
#[async_trait]
impl<Tls> MetaDataProvider<OgrSourceDataset, VectorResultDescriptor, VectorQueryRectangle>
    for ProPostgresDb<Tls>
where
    Tls: MakeTlsConnect<Socket> + Clone + Send + Sync + 'static + std::fmt::Debug,
    <Tls as MakeTlsConnect<Socket>>::Stream: Send + Sync,
    <Tls as MakeTlsConnect<Socket>>::TlsConnect: Send,
    <<Tls as MakeTlsConnect<Socket>>::TlsConnect as TlsConnect<Socket>>::Future: Send,
{
    /// Loads OGR (vector) loading info for `id` inside a transaction.
    ///
    /// Checks `Read` permission first, then rejects datasets whose
    /// uploaded data has been deleted (FAIR deletion), and finally reads
    /// and type-checks the stored metadata.
    ///
    /// # Errors
    /// - `DataIdTypeMissMatch` if `id` is not an internal dataset id
    /// - `PermissionDenied` without read permission
    /// - `DatasetDeleted` when the uploaded dataset was expired/deleted
    /// - `MetaData` for database errors or a non-OGR metadata type
    async fn meta_data(
        &self,
        id: &DataId,
    ) -> geoengine_operators::util::Result<
        Box<dyn MetaData<OgrSourceDataset, VectorResultDescriptor, VectorQueryRectangle>>,
    > {
        let id = id
            .internal()
            .ok_or(geoengine_operators::error::Error::DataIdTypeMissMatch)?;

        // All infrastructure failures are wrapped into the operators'
        // `MetaData` error so this trait's error contract is kept.
        let mut conn = self.conn_pool.get().await.map_err(|e| {
            geoengine_operators::error::Error::MetaData {
                source: Box::new(e),
            }
        })?;
        let tx = conn.build_transaction().start().await.map_err(|e| {
            geoengine_operators::error::Error::MetaData {
                source: Box::new(e),
            }
        })?;

        if !self
            .has_permission_in_tx(id, Permission::Read, &tx)
            .await
            .map_err(|e| geoengine_operators::error::Error::MetaData {
                source: Box::new(e),
            })?
        {
            return Err(geoengine_operators::error::Error::PermissionDenied);
        };

        // Best-effort status check: an Err here (e.g. not an uploaded
        // dataset) is ignored; only a confirmed Deleted status aborts.
        let uploaded_status = self.uploaded_dataset_status_in_tx(&id, &tx).await;
        if let Ok(status) = uploaded_status {
            if matches!(status, UploadedDatasetStatus::Deleted { .. }) {
                return Err(geoengine_operators::error::Error::DatasetDeleted { id });
            }
        }

        let stmt = tx
            .prepare(
                "
        SELECT
            d.meta_data
        FROM
            user_permitted_datasets p JOIN datasets d
                ON (p.dataset_id = d.id)
        WHERE
            d.id = $1 AND p.user_id = $2",
            )
            .await
            .map_err(|e| geoengine_operators::error::Error::MetaData {
                source: Box::new(e),
            })?;

        let row = tx
            .query_one(&stmt, &[&id, &self.session.user.id])
            .await
            .map_err(|e| geoengine_operators::error::Error::MetaData {
                source: Box::new(e),
            })?;

        let meta_data: MetaDataDefinition = row.get("meta_data");

        // Only OGR metadata is valid for this (vector) provider.
        let MetaDataDefinition::OgrMetaData(meta_data) = meta_data else {
            return Err(geoengine_operators::error::Error::MetaData {
                source: Box::new(geoengine_operators::error::Error::InvalidType {
                    expected: "OgrMetaData".to_string(),
                    found: meta_data.type_name().to_string(),
                }),
            });
        };

        tx.commit()
            .await
            .map_err(|e| geoengine_operators::error::Error::MetaData {
                source: Box::new(e),
            })?;

        Ok(Box::new(meta_data))
    }
}
471

472
#[async_trait]
473
impl<Tls> MetaDataProvider<GdalLoadingInfo, RasterResultDescriptor, RasterQueryRectangle>
474
    for ProPostgresDb<Tls>
475
where
476
    Tls: MakeTlsConnect<Socket> + Clone + Send + Sync + 'static + std::fmt::Debug,
477
    <Tls as MakeTlsConnect<Socket>>::Stream: Send + Sync,
478
    <Tls as MakeTlsConnect<Socket>>::TlsConnect: Send,
479
    <<Tls as MakeTlsConnect<Socket>>::TlsConnect as TlsConnect<Socket>>::Future: Send,
480
{
481
    async fn meta_data(
482
        &self,
483
        id: &DataId,
484
    ) -> geoengine_operators::util::Result<
485
        Box<dyn MetaData<GdalLoadingInfo, RasterResultDescriptor, RasterQueryRectangle>>,
486
    > {
8✔
487
        let id = id
8✔
488
            .internal()
8✔
489
            .ok_or(geoengine_operators::error::Error::DataIdTypeMissMatch)?;
8✔
490

8✔
491
        let mut conn = self.conn_pool.get().await.map_err(|e| {
8✔
492
            geoengine_operators::error::Error::MetaData {
×
493
                source: Box::new(e),
×
494
            }
×
495
        })?;
8✔
496
        let tx = conn.build_transaction().start().await.map_err(|e| {
8✔
497
            geoengine_operators::error::Error::MetaData {
×
498
                source: Box::new(e),
×
499
            }
×
500
        })?;
8✔
501

8✔
502
        if !self
8✔
503
            .has_permission_in_tx(id, Permission::Read, &tx)
8✔
504
            .await
17✔
505
            .map_err(|e| geoengine_operators::error::Error::MetaData {
8✔
506
                source: Box::new(e),
×
507
            })?
8✔
508
        {
8✔
509
            return Err(geoengine_operators::error::Error::PermissionDenied);
8✔
510
        };
8✔
511

8✔
512
        let uploaded_status = self.uploaded_dataset_status_in_tx(&id, &tx).await;
70✔
513
        if let Ok(status) = uploaded_status {
8✔
514
            if matches!(status, UploadedDatasetStatus::Deleted { .. }) {
8✔
515
                return Err(geoengine_operators::error::Error::DatasetDeleted { id });
8✔
516
            }
8✔
517
        }
8✔
518

8✔
519
        let stmt = tx
8✔
520
            .prepare(
7✔
521
                "
7✔
522
            SELECT
7✔
523
                d.meta_data
7✔
524
            FROM
7✔
525
                user_permitted_datasets p JOIN datasets d
7✔
526
                    ON (p.dataset_id = d.id)
7✔
527
            WHERE
7✔
528
                d.id = $1 AND p.user_id = $2",
7✔
529
            )
7✔
530
            .await
8✔
531
            .map_err(|e| geoengine_operators::error::Error::MetaData {
8✔
532
                source: Box::new(e),
×
533
            })?;
7✔
534

8✔
535
        let row = tx
8✔
536
            .query_one(&stmt, &[&id, &self.session.user.id])
7✔
537
            .await
8✔
538
            .map_err(|e| geoengine_operators::error::Error::MetaData {
8✔
539
                source: Box::new(e),
×
540
            })?;
7✔
541

8✔
542
        let meta_data: MetaDataDefinition = row.get(0);
8✔
543

7✔
544
        Ok(match meta_data {
7✔
545
            MetaDataDefinition::GdalMetaDataRegular(m) => Box::new(m),
8✔
546
            MetaDataDefinition::GdalStatic(m) => Box::new(m),
8✔
547
            MetaDataDefinition::GdalMetaDataList(m) => Box::new(m),
8✔
548
            MetaDataDefinition::GdalMetadataNetCdfCf(m) => Box::new(m),
8✔
549
            _ => return Err(geoengine_operators::error::Error::DataIdTypeMissMatch),
8✔
550
        })
8✔
551
    }
8✔
552
}
553

554
#[async_trait]
impl<Tls> DatasetStore for ProPostgresDb<Tls>
where
    Tls: MakeTlsConnect<Socket> + Clone + Send + Sync + 'static + std::fmt::Debug,
    <Tls as MakeTlsConnect<Socket>>::Stream: Send + Sync,
    <Tls as MakeTlsConnect<Socket>>::TlsConnect: Send,
    <<Tls as MakeTlsConnect<Socket>>::TlsConnect as TlsConnect<Socket>>::Future: Send,
{
    /// Adds a new dataset record plus an `Owner` permission for the session
    /// user in a single transaction.
    ///
    /// If no name is supplied, one is generated from the user id (as
    /// namespace) and the freshly generated dataset id.
    ///
    /// # Errors
    /// `InvalidDatasetName` when the `name` unique constraint is violated;
    /// namespace-check and database errors otherwise.
    async fn add_dataset(
        &self,
        dataset: AddDataset,
        meta_data: MetaDataDefinition,
    ) -> Result<DatasetIdAndName> {
        let id = DatasetId::new();
        let name = dataset.name.unwrap_or_else(|| DatasetName {
            namespace: Some(self.session.user.id.to_string()),
            name: id.to_string(),
        });

        log::info!(
            "Adding dataset with name: {:?}, tags: {:?}",
            name,
            dataset.tags
        );

        // Users may only create datasets within their own namespace.
        self.check_namespace(&name)?;

        let typed_meta_data = meta_data.to_typed_metadata();

        let mut conn = self.conn_pool.get().await?;

        let tx = conn.build_transaction().start().await?;

        tx.execute(
            "
                INSERT INTO datasets (
                    id,
                    name,
                    display_name,
                    description,
                    source_operator,
                    result_descriptor,
                    meta_data,
                    symbology,
                    provenance,
                    tags
                )
                VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10::text[])",
            &[
                &id,
                &name,
                &dataset.display_name,
                &dataset.description,
                &dataset.source_operator,
                &typed_meta_data.result_descriptor,
                typed_meta_data.meta_data,
                &dataset.symbology,
                &dataset.provenance,
                &dataset.tags,
            ],
        )
        .await
        // Map the unique-violation on `name` to a domain error.
        .map_unique_violation("datasets", "name", || error::Error::InvalidDatasetName)?;

        let stmt = tx
            .prepare(
                "
            INSERT INTO permissions (
                role_id,
                dataset_id,
                permission
            )
            VALUES ($1, $2, $3)",
            )
            .await?;

        // The creating user becomes the owner of the new dataset.
        tx.execute(
            &stmt,
            &[&RoleId::from(self.session.user.id), &id, &Permission::Owner],
        )
        .await?;

        tx.commit().await?;

        Ok(DatasetIdAndName { id, name })
    }

    /// Updates name, display name, description, and tags of a dataset.
    /// Requires `Owner` permission.
    async fn update_dataset(&self, dataset: DatasetId, update: UpdateDataset) -> Result<()> {
        let mut conn = self.conn_pool.get().await?;

        let tx = conn.build_transaction().start().await?;

        self.ensure_permission_in_tx(dataset.into(), Permission::Owner, &tx)
            .await
            .boxed_context(crate::error::PermissionDb)?;

        tx.execute(
            "UPDATE datasets SET name = $2, display_name = $3, description = $4, tags = $5 WHERE id = $1;",
            &[
                &dataset,
                &update.name,
                &update.display_name,
                &update.description,
                &update.tags,
            ],
        )
        .await?;

        tx.commit().await?;

        Ok(())
    }

    /// Replaces the stored loading info (`meta_data` column) of a dataset.
    /// Requires `Owner` permission.
    async fn update_dataset_loading_info(
        &self,
        dataset: DatasetId,
        meta_data: &MetaDataDefinition,
    ) -> Result<()> {
        let mut conn = self.conn_pool.get().await?;

        let tx = conn.build_transaction().start().await?;

        self.ensure_permission_in_tx(dataset.into(), Permission::Owner, &tx)
            .await
            .boxed_context(crate::error::PermissionDb)?;

        tx.execute(
            "UPDATE datasets SET meta_data = $2 WHERE id = $1;",
            &[&dataset, &meta_data],
        )
        .await?;

        tx.commit().await?;

        Ok(())
    }

    /// Replaces the symbology of a dataset. Requires `Owner` permission.
    async fn update_dataset_symbology(
        &self,
        dataset: DatasetId,
        symbology: &Symbology,
    ) -> Result<()> {
        let mut conn = self.conn_pool.get().await?;

        let tx = conn.build_transaction().start().await?;

        self.ensure_permission_in_tx(dataset.into(), Permission::Owner, &tx)
            .await
            .boxed_context(crate::error::PermissionDb)?;

        tx.execute(
            "UPDATE datasets SET symbology = $2 WHERE id = $1;",
            &[&dataset, &symbology],
        )
        .await?;

        tx.commit().await?;

        Ok(())
    }

    /// Replaces the provenance entries of a dataset. Requires `Owner`
    /// permission.
    async fn update_dataset_provenance(
        &self,
        dataset: DatasetId,
        provenance: &[Provenance],
    ) -> Result<()> {
        let mut conn = self.conn_pool.get().await?;

        let tx = conn.build_transaction().start().await?;

        self.ensure_permission_in_tx(dataset.into(), Permission::Owner, &tx)
            .await
            .boxed_context(crate::error::PermissionDb)?;

        tx.execute(
            "UPDATE datasets SET provenance = $2 WHERE id = $1;",
            &[&dataset, &provenance],
        )
        .await?;

        tx.commit().await?;

        Ok(())
    }

    /// Deletes a dataset.
    ///
    /// Non-uploaded datasets are removed immediately after verifying the
    /// session user owns them. Uploaded datasets are instead routed through
    /// the FAIR expiration mechanism (`expire_uploaded_dataset`), which also
    /// takes care of deleting the uploaded data.
    async fn delete_dataset(&self, dataset_id: DatasetId) -> Result<()> {
        let mut conn = self.conn_pool.get().await?;
        let tx = conn.build_transaction().start().await?;

        let is_user_upload = self.is_user_upload_in_tx(&dataset_id, &tx).await?;
        if !is_user_upload {
            self.ensure_permission_in_tx(dataset_id.into(), Permission::Owner, &tx)
                .await
                .boxed_context(crate::error::PermissionDb)?;

            // Double-check ownership via the permission view before the
            // destructive delete.
            let stmt = tx
                .prepare(
                    "
            SELECT
                TRUE
            FROM
                user_permitted_datasets p JOIN datasets d
                    ON (p.dataset_id = d.id)
            WHERE
                d.id = $1 AND p.user_id = $2 AND p.permission = 'Owner';",
                )
                .await?;

            let rows = tx
                .query(&stmt, &[&dataset_id, &self.session.user.id])
                .await?;

            if rows.is_empty() {
                return Err(Error::OperationRequiresOwnerPermission);
            }

            let stmt = tx.prepare("DELETE FROM datasets WHERE id = $1;").await?;

            tx.execute(&stmt, &[&dataset_id]).await?;

            tx.commit().await?;

            return Ok(());
        }

        // Uploaded datasets: delegate to expiration handling (full delete),
        // which runs in its own transaction; the read-only `tx` above is
        // simply dropped (rolled back).
        self.expire_uploaded_dataset(ChangeDatasetExpiration::delete_full(dataset_id))
            .await?;

        Ok(())
    }
}
785

786
#[async_trait]
impl<Tls> UploadDb for ProPostgresDb<Tls>
where
    Tls: MakeTlsConnect<Socket> + Clone + Send + Sync + 'static + std::fmt::Debug,
    <Tls as MakeTlsConnect<Socket>>::Stream: Send + Sync,
    <Tls as MakeTlsConnect<Socket>>::TlsConnect: Send,
    <<Tls as MakeTlsConnect<Socket>>::TlsConnect as TlsConnect<Socket>>::Future: Send,
{
    /// Loads an upload owned by the session user.
    ///
    /// NOTE(review): `query_one` means an unknown or foreign upload id
    /// surfaces as a generic database error rather than a dedicated
    /// "unknown upload" error — confirm whether callers rely on this.
    async fn load_upload(&self, upload: UploadId) -> Result<Upload> {
        let conn = self.conn_pool.get().await?;

        let stmt = conn
            .prepare(
                "
            SELECT u.id, u.files 
            FROM uploads u JOIN user_uploads uu ON(u.id = uu.upload_id)
            WHERE u.id = $1 AND uu.user_id = $2",
            )
            .await?;

        let row = conn
            .query_one(&stmt, &[&upload, &self.session.user.id])
            .await?;

        Ok(Upload {
            id: row.get(0),
            files: row
                // files are stored as the DB-side `FileUpload` composite type
                .get::<_, Vec<FileUpload>>(1)
                .into_iter()
                .map(Into::into)
                .collect(),
        })
    }

    /// Stores a new upload and associates it with the session user, in a
    /// single transaction.
    async fn create_upload(&self, upload: Upload) -> Result<()> {
        let mut conn = self.conn_pool.get().await?;
        let tx = conn.build_transaction().start().await?;

        let stmt = tx
            .prepare("INSERT INTO uploads (id, files) VALUES ($1, $2)")
            .await?;

        tx.execute(
            &stmt,
            &[
                &upload.id,
                &upload
                    .files
                    .iter()
                    .map(FileUpload::from)
                    .collect::<Vec<_>>(),
            ],
        )
        .await?;

        let stmt = tx
            .prepare("INSERT INTO user_uploads (user_id, upload_id) VALUES ($1, $2)")
            .await?;

        tx.execute(&stmt, &[&self.session.user.id, &upload.id])
            .await?;

        tx.commit().await?;

        Ok(())
    }
}
853

854
/// Database-side representation of an uploaded file, mapped to a Postgres
/// composite type via the `ToSql`/`FromSql` derives. Field order must match
/// the composite type's declaration in the schema.
#[derive(Debug, Clone, ToSql, FromSql)]
pub struct FileUpload {
    pub id: FileId,
    pub name: String,
    // i64 because Postgres has no unsigned 64-bit integer type
    pub byte_size: i64,
}
860

861
impl From<crate::datasets::upload::FileUpload> for FileUpload {
    /// Converts a service-layer file upload into its database representation.
    fn from(upload: crate::datasets::upload::FileUpload) -> Self {
        let crate::datasets::upload::FileUpload {
            id,
            name,
            byte_size,
            ..
        } = upload;
        Self {
            id,
            name,
            // Postgres `BIGINT` is signed, so the size is stored as `i64`.
            byte_size: byte_size as i64,
        }
    }
}
870

871
impl From<&crate::datasets::upload::FileUpload> for FileUpload {
    /// Borrowing conversion: copies the id, clones the name, and widens the size.
    fn from(upload: &crate::datasets::upload::FileUpload) -> Self {
        let name = upload.name.clone();
        // Postgres `BIGINT` is signed, so the size is stored as `i64`.
        let byte_size = upload.byte_size as i64;
        Self {
            id: upload.id,
            name,
            byte_size,
        }
    }
}
880

881
impl From<FileUpload> for crate::datasets::upload::FileUpload {
882
    fn from(upload: FileUpload) -> Self {
17✔
883
        Self {
17✔
884
            id: upload.id,
17✔
885
            name: upload.name,
17✔
886
            byte_size: upload.byte_size as u64,
17✔
887
        }
17✔
888
    }
17✔
889
}
890

891
#[async_trait]
892
impl<Tls> TxUploadedUserDatasetStore<PostgresConnectionManager<Tls>> for ProPostgresDb<Tls>
893
where
894
    Tls: MakeTlsConnect<Socket> + Clone + Send + Sync + 'static + std::fmt::Debug,
895
    <Tls as MakeTlsConnect<Socket>>::Stream: Send + Sync,
896
    <Tls as MakeTlsConnect<Socket>>::TlsConnect: Send,
897
    <<Tls as MakeTlsConnect<Socket>>::TlsConnect as TlsConnect<Socket>>::Future: Send,
898
{
899
    async fn is_user_upload_in_tx(&self, dataset_id: &DatasetId, tx: &Transaction) -> Result<bool> {
4✔
900
        self.ensure_permission_in_tx((*dataset_id).into(), Permission::Read, tx)
4✔
901
            .await
11✔
902
            .boxed_context(crate::error::PermissionDb)?;
4✔
903

4✔
904
        let stmt = tx
4✔
905
            .prepare(
4✔
906
                "
4✔
907
            SELECT
4✔
908
                TRUE
4✔
909
            FROM
4✔
910
                uploaded_user_datasets
4✔
911
            WHERE
4✔
912
                dataset_id = $1;",
4✔
913
            )
4✔
914
            .await?;
4✔
915

4✔
916
        let result = tx.query_opt(&stmt, &[&dataset_id]).await?;
4✔
917

4✔
918
        return Ok(result.is_some());
4✔
919
    }
4✔
920

921
    async fn get_dataset_access_status_in_tx(
922
        &self,
923
        dataset_id: &DatasetId,
924
        tx: &Transaction,
925
    ) -> Result<DatasetAccessStatus> {
7✔
926
        let permissions = self
7✔
927
            .get_user_permissions_in_tx(*dataset_id, tx)
7✔
928
            .await
14✔
929
            .boxed_context(crate::error::PermissionDb)?;
7✔
930
        let uploaded = self.uploaded_dataset_status_in_tx(dataset_id, tx).await;
63✔
931
        let access_status = if let Ok(user_upload) = uploaded {
7✔
932
            if let UploadedDatasetStatus::Deleted(expiration) = &user_upload {
7✔
933
                if matches!(expiration.deletion_type, DeleteRecordAndData) {
7✔
934
                    return Err(UnknownDatasetId);
7✔
935
                }
7✔
936
            }
7✔
937
            DatasetAccessStatus {
7✔
938
                id: *dataset_id,
5✔
939
                dataset_type: DatasetType::UserUpload(user_upload),
5✔
940
                permissions,
5✔
941
            }
5✔
942
        } else {
7✔
943
            let stmt = tx
7✔
944
                .prepare(
2✔
945
                    "
2✔
946
                SELECT
2✔
947
                    TRUE
2✔
948
                FROM
2✔
949
                    user_permitted_datasets p JOIN datasets d
2✔
950
                        ON (p.dataset_id = d.id)
2✔
951
                WHERE
2✔
952
                    d.id = $1 AND p.user_id = $2;",
2✔
953
                )
2✔
954
                .await?;
7✔
955

7✔
956
            let rows = tx
7✔
957
                .query(&stmt, &[&dataset_id, &self.session.user.id])
2✔
958
                .await?;
7✔
959

7✔
960
            if rows.is_empty() {
7✔
961
                return Err(UnknownDatasetId);
7✔
962
            }
7✔
963

1✔
964
            DatasetAccessStatus {
1✔
965
                id: *dataset_id,
1✔
966
                dataset_type: DatasetType::NonUserUpload,
1✔
967
                permissions,
1✔
968
            }
1✔
969
        };
7✔
970

7✔
971
        Ok(access_status)
7✔
972
    }
7✔
973

974
    async fn validate_expiration_request_in_tx(
975
        &self,
976
        dataset_id: &DatasetId,
977
        expiration: &Expiration,
978
        tx: &Transaction,
979
    ) -> Result<()> {
2✔
980
        let (status, deletion_type, legal_expiration): (
2✔
981
            InternalUploadedDatasetStatus,
2✔
982
            Option<DatasetDeletionType>,
2✔
983
            bool,
2✔
984
        ) = if let Some(timestamp) = expiration.deletion_timestamp {
2✔
985
            let stmt = tx
2✔
986
                .prepare(
2✔
987
                    "
2✔
988
                    SELECT
2✔
989
                        status,
2✔
990
                        deletion_type,
2✔
991
                        $2 >= CURRENT_TIMESTAMP as legal_expiration
2✔
992
                    FROM
2✔
993
                        uploaded_user_datasets
2✔
994
                    WHERE
2✔
995
                        dataset_id = $1;",
2✔
996
                )
2✔
997
                .await?;
2✔
998
            let row = tx
2✔
999
                .query_opt(&stmt, &[&dataset_id, &timestamp])
2✔
1000
                .await?
2✔
1001
                .ok_or(UnknownDatasetId)?;
2✔
1002
            (row.get(0), row.get(1), row.get::<usize, bool>(2))
2✔
1003
        } else {
2✔
1004
            let stmt = tx
2✔
NEW
1005
                .prepare(
×
NEW
1006
                    "
×
NEW
1007
                    SELECT
×
NEW
1008
                        status,
×
NEW
1009
                        deletion_type,
×
NEW
1010
                        TRUE as legal_expiration
×
NEW
1011
                    FROM
×
NEW
1012
                        uploaded_user_datasets
×
NEW
1013
                    WHERE
×
NEW
1014
                        dataset_id = $1;",
×
NEW
1015
                )
×
1016
                .await?;
2✔
1017
            let row = tx
2✔
NEW
1018
                .query_opt(&stmt, &[&dataset_id])
×
1019
                .await?
2✔
1020
                .ok_or(UnknownDatasetId)?;
2✔
1021
            (row.get(0), row.get(1), row.get::<usize, bool>(2))
2✔
1022
        };
2✔
1023

2✔
1024
        match status {
2✔
1025
            Available | Expires => {
2✔
1026
                if !legal_expiration {
2✔
1027
                    return Err(ExpirationTimestampInPast {
2✔
1028
                        dataset: (*dataset_id).into(),
1✔
1029
                    });
1✔
1030
                }
2✔
1031
            }
2✔
1032
            Expired | UpdateExpired | Deleted => {
2✔
1033
                if matches!(expiration.deletion_type, DeleteData)
2✔
1034
                    && matches!(deletion_type, Some(DeleteRecordAndData))
2✔
1035
                {
2✔
1036
                    return Err(IllegalExpirationUpdate {
2✔
NEW
1037
                        dataset: (*dataset_id).into(),
×
NEW
1038
                        reason: "Prior expiration already deleted data and record".to_string(),
×
NEW
1039
                    });
×
1040
                }
2✔
1041
                if expiration.deletion_timestamp.is_some() {
1✔
1042
                    return Err(IllegalExpirationUpdate {
2✔
1043
                        dataset: (*dataset_id).into(),
1✔
1044
                        reason: "Setting expiration after deletion".to_string(),
1✔
1045
                    });
1✔
1046
                }
2✔
1047
            }
2✔
1048
            DeletedWithError => {
2✔
1049
                return Err(IllegalDatasetStatus {
2✔
NEW
1050
                    dataset: (*dataset_id).into(),
×
NEW
1051
                    status: "Dataset was deleted, but an error occurred during deletion"
×
NEW
1052
                        .to_string(),
×
NEW
1053
                });
×
1054
            }
2✔
1055
        }
2✔
1056
        Ok(())
2✔
1057
    }
2✔
1058

1059
    async fn uploaded_dataset_status_in_tx(
1060
        &self,
1061
        dataset_id: &DatasetId,
1062
        tx: &Transaction,
1063
    ) -> Result<UploadedDatasetStatus> {
21✔
1064
        self.ensure_permission_in_tx((*dataset_id).into(), Permission::Read, tx)
21✔
1065
            .await
43✔
1066
            .boxed_context(crate::error::PermissionDb)?;
21✔
1067

21✔
1068
        self.update_uploaded_datasets_status_in_tx(Some(dataset_id), tx)
21✔
1069
            .await?;
128✔
1070

21✔
1071
        let stmt = tx
21✔
1072
            .prepare(
20✔
1073
                "
20✔
1074
            SELECT
20✔
1075
                status, expiration, deletion_type
20✔
1076
            FROM
20✔
1077
                uploaded_user_datasets
20✔
1078
            WHERE
20✔
1079
                dataset_id = $1;",
20✔
1080
            )
20✔
1081
            .await?;
21✔
1082

21✔
1083
        let result = tx
21✔
1084
            .query_opt(&stmt, &[&dataset_id])
20✔
1085
            .await?
21✔
1086
            .ok_or(error::Error::UnknownDatasetId)?;
21✔
1087

21✔
1088
        let internal_status: InternalUploadedDatasetStatus = result.get(0);
21✔
1089
        let expiration_timestamp = result.get(1);
7✔
1090
        let dataset_deletion_type = result.get(2);
7✔
1091

21✔
1092
        let status = internal_status.convert_to_uploaded_dataset_status(
21✔
1093
            dataset_id,
7✔
1094
            expiration_timestamp,
7✔
1095
            dataset_deletion_type,
7✔
1096
        )?;
7✔
1097

21✔
1098
        Ok(status)
21✔
1099
    }
21✔
1100

1101
    async fn lazy_dataset_store_updates(
1102
        &self,
1103
        conn: &mut PooledConnection<PostgresConnectionManager<Tls>>,
1104
        dataset_id: Option<&DatasetId>,
1105
    ) -> Result<()> {
75✔
1106
        let tx = conn.build_transaction().start().await?;
75✔
1107
        self.update_uploaded_datasets_status_in_tx(dataset_id, &tx)
75✔
1108
            .await?;
516✔
1109
        tx.commit().await?;
75✔
1110

75✔
1111
        Ok(())
75✔
1112
    }
75✔
1113

1114
    async fn expire_uploaded_dataset_in_tx(
1115
        &self,
1116
        expire_dataset: ChangeDatasetExpiration,
1117
        tx: &Transaction,
1118
    ) -> Result<()> {
20✔
1119
        self.ensure_permission_in_tx(expire_dataset.dataset_id.into(), Permission::Owner, tx)
20✔
1120
            .await
50✔
1121
            .boxed_context(error::PermissionDb)?;
20✔
1122

20✔
1123
        self.update_uploaded_datasets_status_in_tx(Some(&expire_dataset.dataset_id), tx)
20✔
1124
            .await?;
134✔
1125

20✔
1126
        match expire_dataset.expiration_change {
20✔
1127
            ExpirationChange::SetExpire(expiration) => {
20✔
1128
                self.set_expire_for_uploaded_dataset(&expire_dataset.dataset_id, &expiration, tx)
17✔
1129
                    .await?;
42✔
1130
            }
20✔
1131
            ExpirationChange::UnsetExpire => {
20✔
1132
                self.unset_expire_for_uploaded_dataset(&expire_dataset.dataset_id, tx)
20✔
1133
                    .await?;
20✔
1134
            }
20✔
1135
        }
20✔
1136
        self.update_uploaded_datasets_status_in_tx(Some(&expire_dataset.dataset_id), tx)
20✔
1137
            .await?;
97✔
1138

20✔
1139
        Ok(())
20✔
1140
    }
20✔
1141

1142
    #[allow(clippy::too_many_lines)]
1143
    async fn update_uploaded_datasets_status_in_tx(
1144
        &self,
1145
        dataset_id: Option<&DatasetId>,
1146
        tx: &Transaction,
1147
    ) -> Result<()> {
135✔
1148
        fn create_filter(
270✔
1149
            session: &UserSession,
270✔
1150
            dataset_id: Option<&DatasetId>,
270✔
1151
            mut param_size: usize,
270✔
1152
        ) -> (String, Option<UserId>, String, Option<DatasetId>) {
270✔
1153
            let (user_filter, user_param) = if session.is_admin() {
270✔
1154
                (String::new(), None)
135✔
1155
            } else {
135✔
1156
                param_size += 1;
240✔
1157
                let filter = format!("AND up.user_id = ${param_size}").to_string();
240✔
1158
                (filter, Some(session.user.id))
240✔
1159
            };
135✔
1160

135✔
1161
            let (dataset_filter, dataset_param) = if let Some(dataset_id) = dataset_id {
270✔
1162
                param_size += 1;
180✔
1163
                let filter = format!("AND up.dataset_id = ${param_size}").to_string();
180✔
1164
                (filter, Some(*dataset_id))
180✔
1165
            } else {
135✔
1166
                (String::new(), None)
135✔
1167
            };
135✔
1168

135✔
1169
            (user_filter, user_param, dataset_filter, dataset_param)
270✔
1170
        }
270✔
1171

135✔
1172
        fn create_filter_params<'a>(
405✔
1173
            filter_params: &'a mut Vec<&'a (dyn ToSql + Sync)>,
405✔
1174
            user_id: Option<&'a UserId>,
405✔
1175
            dataset_id: Option<&'a DatasetId>,
405✔
1176
        ) -> &'a [&'a (dyn ToSql + Sync)] {
405✔
1177
            if let Some(user_id) = user_id {
405✔
1178
                filter_params.push(user_id);
360✔
1179
            }
360✔
1180
            if let Some(dataset_id) = dataset_id {
405✔
1181
                filter_params.push(dataset_id);
270✔
1182
            }
270✔
1183
            filter_params.as_slice()
405✔
1184
        }
405✔
1185

135✔
1186
        let (user_filter, user_param, dataset_filter, dataset_param) =
135✔
1187
            create_filter(&self.session, dataset_id, 1);
135✔
1188
        let tag_deletion = tx
135✔
1189
            .prepare(
135✔
1190
                format!("
135✔
1191
                UPDATE
135✔
1192
                    datasets
135✔
1193
                SET
135✔
1194
                    tags = tags || '{{{}}}'
135✔
1195
                FROM
135✔
1196
                    updatable_uploaded_user_datasets up
135✔
1197
                WHERE
135✔
1198
                    datasets.id = up.dataset_id AND up.deletion_type = $1 {user_filter} {dataset_filter};",
135✔
1199
                    ReservedTags::Deleted
135✔
1200
                )
135✔
1201
                    .as_str(),
135✔
1202
            )
135✔
1203
            .await?;
190✔
1204
        let mut tag_deletion_params: Vec<&(dyn ToSql + Sync)> = vec![&DeleteData];
135✔
1205
        tx.execute(
135✔
1206
            &tag_deletion,
135✔
1207
            create_filter_params(
135✔
1208
                &mut tag_deletion_params,
135✔
1209
                user_param.as_ref(),
135✔
1210
                dataset_param.as_ref(),
135✔
1211
            ),
135✔
1212
        )
135✔
1213
        .await?;
135✔
1214

135✔
1215
        let mark_deletion = tx
135✔
1216
            .prepare(
135✔
1217
                format!("
135✔
1218
                UPDATE
135✔
1219
                    uploaded_user_datasets
135✔
1220
                SET
135✔
1221
                    status = $1
135✔
1222
                FROM
135✔
1223
                    updatable_uploaded_user_datasets up
135✔
1224
                WHERE
135✔
1225
                    uploaded_user_datasets.dataset_id = up.dataset_id {user_filter} {dataset_filter};"
135✔
1226
                )
135✔
1227
                .as_str(),
135✔
1228
            )
135✔
1229
            .await?;
188✔
1230
        let mut mark_deletion_params: Vec<&(dyn ToSql + Sync)> = vec![&Expired];
135✔
1231
        tx.execute(
135✔
1232
            &mark_deletion,
135✔
1233
            create_filter_params(
135✔
1234
                &mut mark_deletion_params,
135✔
1235
                user_param.as_ref(),
135✔
1236
                dataset_param.as_ref(),
135✔
1237
            ),
135✔
1238
        )
135✔
1239
        .await?;
135✔
1240

135✔
1241
        let (user_filter, user_param, dataset_filter, dataset_param) =
135✔
1242
            create_filter(&self.session, dataset_id, 2);
135✔
1243
        let delete_records = tx
135✔
1244
            .prepare(
135✔
1245
                format!("
135✔
1246
                DELETE FROM
135✔
1247
                    datasets
135✔
1248
                USING
135✔
1249
                    uploaded_user_datasets up
135✔
1250
                WHERE
135✔
1251
                    datasets.id = up.dataset_id AND up.status = $1 AND up.deletion_type = $2 {user_filter} {dataset_filter};").as_str(),
135✔
1252
            )
135✔
1253
            .await?;
135✔
1254
        let mut delete_records_params: Vec<&(dyn ToSql + Sync)> =
135✔
1255
            vec![&Expired, &DeleteRecordAndData];
135✔
1256
        tx.execute(
135✔
1257
            &delete_records,
135✔
1258
            create_filter_params(
135✔
1259
                &mut delete_records_params,
135✔
1260
                user_param.as_ref(),
135✔
1261
                dataset_param.as_ref(),
135✔
1262
            ),
135✔
1263
        )
135✔
1264
        .await?;
135✔
1265
        Ok(())
135✔
1266
    }
135✔
1267

1268
    async fn set_expire_for_uploaded_dataset(
1269
        &self,
1270
        dataset_id: &DatasetId,
1271
        expiration: &Expiration,
1272
        tx: &Transaction,
1273
    ) -> Result<()> {
17✔
1274
        let num_changes = if let Some(delete_timestamp) = expiration.deletion_timestamp {
17✔
1275
            let stmt = tx
17✔
1276
                .prepare("
10✔
1277
                UPDATE uploaded_user_datasets
10✔
1278
                SET status = $2, expiration = $3, deletion_type = $4
10✔
1279
                WHERE dataset_id = $1 AND $3 >= CURRENT_TIMESTAMP AND (status = $5 OR status = $6);",
10✔
1280
                ).await?;
10✔
1281
            tx.execute(
17✔
1282
                &stmt,
10✔
1283
                &[
10✔
1284
                    &dataset_id,
10✔
1285
                    &Expires,
10✔
1286
                    &delete_timestamp,
10✔
1287
                    &expiration.deletion_type,
10✔
1288
                    &Available,
10✔
1289
                    &Expires,
10✔
1290
                ],
10✔
1291
            )
10✔
1292
            .await?
17✔
1293
        } else {
17✔
1294
            let stmt = tx
17✔
1295
                .prepare(
7✔
1296
                    "
7✔
1297
                UPDATE uploaded_user_datasets
7✔
1298
                SET status = $2, expiration = CURRENT_TIMESTAMP, deletion_type = $3
7✔
1299
                WHERE dataset_id = $1 AND (status = $4 OR status = $5);",
7✔
1300
                )
7✔
1301
                .await?;
17✔
1302
            let num_expired = tx
17✔
1303
                .execute(
7✔
1304
                    &stmt,
7✔
1305
                    &[
7✔
1306
                        &dataset_id,
7✔
1307
                        &Expires,
7✔
1308
                        &expiration.deletion_type,
7✔
1309
                        &Available,
7✔
1310
                        &Expires,
7✔
1311
                    ],
7✔
1312
                )
7✔
1313
                .await?;
17✔
1314

17✔
1315
            if num_expired == 0 && matches!(expiration.deletion_type, DeleteRecordAndData) {
17✔
1316
                let stmt = tx
17✔
1317
                    .prepare(
2✔
1318
                        "
2✔
1319
                    UPDATE uploaded_user_datasets
2✔
1320
                    SET deletion_type = $2,
2✔
1321
                        status = $3
2✔
1322
                    WHERE dataset_id = $1 AND (status = $4 OR status = $5) AND deletion_type = $6;",
2✔
1323
                    )
2✔
1324
                    .await?;
17✔
1325
                tx.execute(
17✔
1326
                    &stmt,
2✔
1327
                    &[
2✔
1328
                        &dataset_id,
2✔
1329
                        &expiration.deletion_type,
2✔
1330
                        &UpdateExpired,
2✔
1331
                        &Expired,
2✔
1332
                        &Deleted,
2✔
1333
                        &DeleteData,
2✔
1334
                    ],
2✔
1335
                )
2✔
1336
                .await?
17✔
1337
            } else {
17✔
1338
                num_expired
17✔
1339
            }
17✔
1340
        };
17✔
1341

17✔
1342
        if num_changes == 0 {
17✔
1343
            self.validate_expiration_request_in_tx(dataset_id, expiration, tx)
17✔
1344
                .await?;
17✔
1345
        };
17✔
1346

17✔
1347
        Ok(())
17✔
1348
    }
17✔
1349

1350
    async fn unset_expire_for_uploaded_dataset(
1351
        &self,
1352
        dataset_id: &DatasetId,
1353
        tx: &Transaction,
1354
    ) -> Result<()> {
2✔
1355
        let stmt = tx
2✔
1356
            .prepare(
2✔
1357
                "
2✔
1358
                    UPDATE uploaded_user_datasets
2✔
1359
                    SET status = $2, expiration = NULL, deletion_type = NULL
2✔
1360
                    WHERE dataset_id = $1 AND status = $3;",
2✔
1361
            )
2✔
1362
            .await?;
2✔
1363
        let set_changes = tx
2✔
1364
            .execute(&stmt, &[&dataset_id, &Available, &Expires])
2✔
1365
            .await?;
2✔
1366
        if set_changes == 0 {
2✔
1367
            return Err(IllegalDatasetStatus {
2✔
1368
                dataset: (*dataset_id).into(),
1✔
1369
                status: "Requested dataset does not exist or does not have an expiration"
1✔
1370
                    .to_string(),
1✔
1371
            });
1✔
1372
        }
2✔
1373
        Ok(())
1✔
1374
    }
2✔
1375
}
1376

1377
#[async_trait]
1378
impl<Tls> UploadedUserDatasetStore for ProPostgresDb<Tls>
1379
where
1380
    Tls: MakeTlsConnect<Socket> + Clone + Send + Sync + 'static + std::fmt::Debug,
1381
    <Tls as MakeTlsConnect<Socket>>::Stream: Send + Sync,
1382
    <Tls as MakeTlsConnect<Socket>>::TlsConnect: Send,
1383
    <<Tls as MakeTlsConnect<Socket>>::TlsConnect as TlsConnect<Socket>>::Future: Send,
1384
{
1385
    async fn add_uploaded_dataset(
1386
        &self,
1387
        upload_id: UploadId,
1388
        dataset: AddDataset,
1389
        meta_data: MetaDataDefinition,
1390
    ) -> Result<DatasetIdAndName> {
13✔
1391
        let id = DatasetId::new();
13✔
1392
        let name = dataset.name.unwrap_or_else(|| DatasetName {
13✔
1393
            namespace: Some(self.session.user.id.to_string()),
3✔
1394
            name: id.to_string(),
3✔
1395
        });
13✔
1396

13✔
1397
        log::info!(
13✔
1398
            "Adding dataset with name: {:?}, tags: {:?}",
13✔
1399
            name,
13✔
1400
            dataset.tags
13✔
1401
        );
13✔
1402

13✔
1403
        self.check_namespace(&name)?;
13✔
1404

13✔
1405
        let typed_meta_data = meta_data.to_typed_metadata();
13✔
1406

13✔
1407
        let mut conn = self.conn_pool.get().await?;
13✔
1408

13✔
1409
        let tx = conn.build_transaction().start().await?;
13✔
1410

13✔
1411
        tx.execute(
13✔
1412
            "
13✔
1413
                INSERT INTO datasets (
13✔
1414
                    id,
13✔
1415
                    name,
13✔
1416
                    display_name,
13✔
1417
                    description,
13✔
1418
                    source_operator,
13✔
1419
                    result_descriptor,
13✔
1420
                    meta_data,
13✔
1421
                    symbology,
13✔
1422
                    provenance,
13✔
1423
                    tags
13✔
1424
                )
13✔
1425
                VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10::text[])",
13✔
1426
            &[
13✔
1427
                &id,
13✔
1428
                &name,
13✔
1429
                &dataset.display_name,
13✔
1430
                &dataset.description,
13✔
1431
                &dataset.source_operator,
13✔
1432
                &typed_meta_data.result_descriptor,
13✔
1433
                typed_meta_data.meta_data,
13✔
1434
                &dataset.symbology,
13✔
1435
                &dataset.provenance,
13✔
1436
                &dataset.tags,
13✔
1437
            ],
13✔
1438
        )
13✔
1439
        .await
1,138✔
1440
        .map_unique_violation("datasets", "name", || error::Error::InvalidDatasetName)?;
13✔
1441

13✔
1442
        let stmt = tx
13✔
1443
            .prepare(
13✔
1444
                "
13✔
1445
            INSERT INTO permissions (
13✔
1446
                role_id,
13✔
1447
                dataset_id,
13✔
1448
                permission
13✔
1449
            )
13✔
1450
            VALUES ($1, $2, $3)",
13✔
1451
            )
13✔
1452
            .await?;
29✔
1453

13✔
1454
        tx.execute(
13✔
1455
            &stmt,
13✔
1456
            &[&RoleId::from(self.session.user.id), &id, &Permission::Owner],
13✔
1457
        )
13✔
1458
        .await?;
13✔
1459

13✔
1460
        let stmt = tx
13✔
1461
            .prepare(
13✔
1462
                "
13✔
1463
            INSERT INTO uploaded_user_datasets (
13✔
1464
                user_id,
13✔
1465
                upload_id,
13✔
1466
                dataset_id,
13✔
1467
                status,
13✔
1468
                created,
13✔
1469
                expiration,
13✔
1470
                deleted,
13✔
1471
                deletion_type
13✔
1472
            )
13✔
1473
            VALUES ($1, $2, $3, 'Available', CURRENT_TIMESTAMP, NULL, NULL, NULL)",
13✔
1474
            )
13✔
1475
            .await?;
13✔
1476

13✔
1477
        tx.execute(
13✔
1478
            &stmt,
13✔
1479
            &[&RoleId::from(self.session.user.id), &upload_id, &id],
13✔
1480
        )
13✔
1481
        .await?;
13✔
1482

13✔
1483
        tx.commit().await?;
13✔
1484

13✔
1485
        Ok(DatasetIdAndName { id, name })
13✔
1486
    }
13✔
1487

1488
    async fn expire_uploaded_dataset(&self, expire_dataset: ChangeDatasetExpiration) -> Result<()> {
13✔
1489
        let mut conn = self.conn_pool.get().await?;
14✔
1490
        let tx = conn.build_transaction().start().await?;
13✔
1491

13✔
1492
        self.expire_uploaded_dataset_in_tx(expire_dataset, &tx)
13✔
1493
            .await?;
205✔
1494

13✔
1495
        tx.commit().await?;
13✔
1496

13✔
1497
        Ok(())
13✔
1498
    }
13✔
1499

1500
    async fn get_dataset_access_status(
1501
        &self,
1502
        dataset_id: &DatasetId,
1503
    ) -> Result<DatasetAccessStatus> {
7✔
1504
        let mut conn = self.conn_pool.get().await?;
7✔
1505
        self.lazy_dataset_store_updates(&mut conn, Some(dataset_id))
7✔
1506
            .await?;
56✔
1507

7✔
1508
        let tx = conn.build_transaction().start().await?;
7✔
1509

7✔
1510
        let result = self.get_dataset_access_status_in_tx(dataset_id, &tx).await;
81✔
1511

7✔
1512
        tx.commit().await?;
7✔
1513

7✔
1514
        result
7✔
1515
    }
7✔
1516

1517
    async fn clear_expired_datasets(&self) -> Result<u64> {
6✔
1518
        ensure!(self.session.is_admin(), error::PermissionDenied);
6✔
1519

6✔
1520
        let mut conn = self.conn_pool.get().await?;
6✔
1521
        let tx = conn.build_transaction().start().await?;
6✔
1522

6✔
1523
        self.update_uploaded_datasets_status_in_tx(None, &tx)
6✔
1524
            .await?;
30✔
1525

6✔
1526
        let update_expired = tx
6✔
1527
            .prepare(
5✔
1528
                "
5✔
1529
                UPDATE
5✔
1530
                    uploaded_user_datasets
5✔
1531
                SET
5✔
1532
                    status = $1
5✔
1533
                WHERE
5✔
1534
                    status = $2 AND deleted IS NOT NULL;",
5✔
1535
            )
5✔
1536
            .await?;
6✔
1537
        let mut updated = tx.execute(&update_expired, &[&Deleted, &Expired]).await?;
6✔
1538

6✔
1539
        let marked_datasets = tx
6✔
1540
            .prepare(
5✔
1541
                "
5✔
1542
                SELECT
5✔
1543
                    dataset_id, upload_id
5✔
1544
                FROM
5✔
1545
                    uploaded_user_datasets
5✔
1546
                WHERE
5✔
1547
                    status = $1 AND deleted IS NULL;",
5✔
1548
            )
5✔
1549
            .await?;
6✔
1550

6✔
1551
        let rows = tx.query(&marked_datasets, &[&Expired]).await?;
6✔
1552

6✔
1553
        let mut deleted = vec![];
6✔
1554
        let mut deleted_with_error = vec![];
5✔
1555

6✔
1556
        for row in rows {
8✔
1557
            let dataset_id: DatasetId = row.get(0);
6✔
1558
            let upload_id = row.get(1);
3✔
1559
            let res = delete_upload(upload_id).await;
6✔
1560
            if let Err(error) = res {
6✔
1561
                log::error!("Error during deletion of upload {upload_id} from dataset {dataset_id}: {error}, marking as DeletedWithError");
6✔
1562
                deleted_with_error.push(upload_id);
6✔
1563
            } else {
6✔
1564
                deleted.push(upload_id);
3✔
1565
            }
3✔
1566
            updated += 1; //Could hypothetically overflow
6✔
1567
        }
6✔
1568

6✔
1569
        let mark_deletion = tx
6✔
1570
            .prepare(
5✔
1571
                "
5✔
1572
                UPDATE
5✔
1573
                    uploaded_user_datasets
5✔
1574
                SET
5✔
1575
                    status = $1, deleted = CURRENT_TIMESTAMP
5✔
1576
                WHERE
5✔
1577
                    status = $2 AND upload_id = ANY($3);",
5✔
1578
            )
5✔
1579
            .await?;
6✔
1580

6✔
1581
        if !deleted.is_empty() {
6✔
1582
            tx.execute(&mark_deletion, &[&Deleted, &Expired, &deleted])
6✔
1583
                .await?;
6✔
1584
        }
6✔
1585

6✔
1586
        if !deleted_with_error.is_empty() {
6✔
1587
            tx.execute(
6✔
NEW
1588
                &mark_deletion,
×
NEW
1589
                &[&DeletedWithError, &Expired, &deleted_with_error],
×
NEW
1590
            )
×
1591
            .await?;
6✔
1592
        }
6✔
1593

6✔
1594
        tx.commit().await?;
6✔
1595

6✔
1596
        Ok(updated)
6✔
1597
    }
6✔
1598
}
1599

1600
#[cfg(test)]
1601
mod tests {
1602
    use std::fs;
1603
    use std::ops::{Add, Sub};
1604
    use std::path::PathBuf;
1605

1606
    use super::*;
1607
    use crate::api::model::responses::IdResponse;
1608
    use crate::contexts::SessionId;
1609
    use crate::datasets::upload::UploadRootPath;
1610
    use crate::error::Error::PermissionDenied;
1611
    use crate::pro::permissions::{PermissionDb, Role};
1612
    use crate::pro::users::{UserCredentials, UserRegistration};
1613
    use crate::pro::util::tests::{admin_login, send_pro_test_request};
1614
    use crate::pro::util::tests::{get_db_timestamp, get_db_timestamp_in_tx};
1615
    use crate::util::tests::{SetMultipartBody, TestDataUploads};
1616
    use crate::{
1617
        contexts::{ApplicationContext, SessionContext},
1618
        pro::{
1619
            contexts::ProPostgresContext,
1620
            ge_context,
1621
            users::{UserAuth, UserSession},
1622
        },
1623
    };
1624
    use actix_web::http::header;
1625
    use actix_web::test;
1626
    use actix_web_httpauth::headers::authorization::Bearer;
1627
    use geoengine_datatypes::primitives::{DateTime, Duration};
1628
    use geoengine_datatypes::{
1629
        collections::VectorDataType,
1630
        primitives::{CacheTtlSeconds, FeatureDataType, Measurement},
1631
        spatial_reference::SpatialReference,
1632
    };
1633
    use geoengine_operators::error::Error::DatasetDeleted;
1634
    use geoengine_operators::{
1635
        engine::{StaticMetaData, VectorColumnInfo},
1636
        source::{
1637
            CsvHeader, FormatSpecifics, OgrSourceColumnSpec, OgrSourceDatasetTimeType,
1638
            OgrSourceDurationSpec, OgrSourceErrorSpec, OgrSourceTimeFormat,
1639
        },
1640
    };
1641
    use tokio_postgres::NoTls;
1642

1643
    // Verifies that dataset autocomplete search returns matches only to the
    // user who owns the dataset: user A (who added the dataset) finds it both
    // with and without a tag filter, while user B gets empty results.
    #[ge_context::test]
    async fn it_autocompletes_datasets(app_ctx: ProPostgresContext<NoTls>) {
        let session_a = app_ctx.create_anonymous_session().await.unwrap();
        let session_b = app_ctx.create_anonymous_session().await.unwrap();

        let db_a = app_ctx.session_context(session_a.clone()).db();
        let db_b = app_ctx.session_context(session_b.clone()).db();

        // Dataset named "Ogr Test" is added on behalf of user A only.
        add_single_dataset(&db_a, &session_a).await;

        // Unfiltered prefix search finds the dataset for its owner.
        assert_eq!(
            db_a.dataset_autocomplete_search(None, "Ogr".to_owned(), 10, 0)
                .await
                .unwrap(),
            vec!["Ogr Test"]
        );
        // The same search restricted to the "upload" tag also finds it.
        assert_eq!(
            db_a.dataset_autocomplete_search(
                Some(vec!["upload".to_string()]),
                "Ogr".to_owned(),
                10,
                0
            )
            .await
            .unwrap(),
            vec!["Ogr Test"]
        );

        // check that other user B cannot access datasets of user A

        assert!(db_b
            .dataset_autocomplete_search(None, "Ogr".to_owned(), 10, 0)
            .await
            .unwrap()
            .is_empty());
        assert!(db_b
            .dataset_autocomplete_search(Some(vec!["upload".to_string()]), "Ogr".to_owned(), 10, 0)
            .await
            .unwrap()
            .is_empty());
    }
1684

1685
    /// Adds a single CSV-backed OGR point dataset ("Ogr Test", tags
    /// `upload`/`test`) for `session`'s user and returns its generated
    /// [`DatasetName`] (namespace = the user's id, name = "my_dataset").
    ///
    /// The loading info references `test.csv`, which is never read by these
    /// tests — only the catalog entry and metadata matter here.
    async fn add_single_dataset(db: &ProPostgresDb<NoTls>, session: &UserSession) -> DatasetName {
        // OGR loading info: multi-point CSV with a "start" time column and
        // x-only coordinates (y: None).
        let loading_info = OgrSourceDataset {
            file_name: PathBuf::from("test.csv"),
            layer_name: "test.csv".to_owned(),
            data_type: Some(VectorDataType::MultiPoint),
            time: OgrSourceDatasetTimeType::Start {
                start_field: "start".to_owned(),
                start_format: OgrSourceTimeFormat::Auto,
                duration: OgrSourceDurationSpec::Zero,
            },
            default_geometry: None,
            columns: Some(OgrSourceColumnSpec {
                format_specifics: Some(FormatSpecifics::Csv {
                    header: CsvHeader::Auto,
                }),
                x: "x".to_owned(),
                y: None,
                int: vec![],
                float: vec![],
                text: vec![],
                bool: vec![],
                datetime: vec![],
                rename: None,
            }),
            force_ogr_time_filter: false,
            force_ogr_spatial_filter: false,
            on_error: OgrSourceErrorSpec::Ignore,
            sql_query: None,
            attribute_query: None,
            cache_ttl: CacheTtlSeconds::default(),
        };

        // Static metadata with a single float column "foo" in EPSG:4326.
        let meta_data = MetaDataDefinition::OgrMetaData(StaticMetaData::<
            OgrSourceDataset,
            VectorResultDescriptor,
            VectorQueryRectangle,
        > {
            loading_info: loading_info.clone(),
            result_descriptor: VectorResultDescriptor {
                data_type: VectorDataType::MultiPoint,
                spatial_reference: SpatialReference::epsg_4326().into(),
                columns: [(
                    "foo".to_owned(),
                    VectorColumnInfo {
                        data_type: FeatureDataType::Float,
                        measurement: Measurement::Unitless,
                    },
                )]
                .into_iter()
                .collect(),
                time: None,
                bbox: None,
            },
            phantom: Default::default(),
        });

        // Namespacing by user id keeps names unique across test users.
        let dataset_name = DatasetName::new(Some(session.user.id.to_string()), "my_dataset");

        db.add_dataset(
            AddDataset {
                name: Some(dataset_name.clone()),
                display_name: "Ogr Test".to_owned(),
                description: "desc".to_owned(),
                source_operator: "OgrSource".to_owned(),
                symbology: None,
                provenance: None,
                tags: Some(vec!["upload".to_owned(), "test".to_owned()]),
            },
            meta_data,
        )
        .await
        .unwrap();

        dataset_name
    }
1760

1761
    /// Path (relative to the test-data root) of the point-collection file
    /// uploaded by these tests via the `test_data!` macro.
    const TEST_POINT_DATASET_SOURCE_PATH: &str = "vector/data/points.fgb";
1762

1763
    /// Definition of a test dataset that has not been registered yet.
    struct TestDatasetDefinition {
        /// Static OGR metadata to register the dataset with.
        meta_data: MetaDataDefinition,
        /// Name the dataset will be registered under.
        dataset_name: DatasetName,
    }
1767

1768
    /// Handles of a dataset that has been uploaded and registered, used by the
    /// deletion/expiration tests to address both the catalog entry and the
    /// upload directory on disk.
    struct UploadedTestDataset {
        /// Catalog name of the registered dataset.
        dataset_name: DatasetName,
        /// Id of the registered dataset.
        dataset_id: DatasetId,
        /// Id of the upload backing the dataset (determines its on-disk path).
        upload_id: UploadId,
    }
1773

1774
    /// Builds the [`TestDatasetDefinition`] for the point test file
    /// ([`TEST_POINT_DATASET_SOURCE_PATH`]): OGR multi-point metadata with an
    /// int column `num` and a text column `txt`, named `name` in the given
    /// optional `name_space`. Nothing is registered yet.
    fn test_point_dataset(name_space: Option<String>, name: &str) -> TestDatasetDefinition {
        // Only the bare file name is used in the loading info; after upload
        // the file lives inside the upload directory.
        let local_path = PathBuf::from(TEST_POINT_DATASET_SOURCE_PATH);
        let file_name = local_path.file_name().unwrap().to_str().unwrap();
        let loading_info = OgrSourceDataset {
            file_name: PathBuf::from(file_name),
            layer_name: file_name.to_owned(),
            data_type: Some(VectorDataType::MultiPoint),
            time: OgrSourceDatasetTimeType::None,
            default_geometry: None,
            columns: Some(OgrSourceColumnSpec {
                format_specifics: None,
                x: "x".to_owned(),
                y: Some("y".to_owned()),
                int: vec!["num".to_owned()],
                float: vec![],
                text: vec!["txt".to_owned()],
                bool: vec![],
                datetime: vec![],
                rename: None,
            }),
            force_ogr_time_filter: false,
            force_ogr_spatial_filter: false,
            on_error: OgrSourceErrorSpec::Ignore,
            sql_query: None,
            attribute_query: None,
            cache_ttl: CacheTtlSeconds::default(),
        };

        // Result descriptor mirrors the column spec above (num: Int, txt: Text)
        // in EPSG:4326.
        let meta_data = MetaDataDefinition::OgrMetaData(StaticMetaData::<
            OgrSourceDataset,
            VectorResultDescriptor,
            VectorQueryRectangle,
        > {
            loading_info: loading_info.clone(),
            result_descriptor: VectorResultDescriptor {
                data_type: VectorDataType::MultiPoint,
                spatial_reference: SpatialReference::epsg_4326().into(),
                columns: [
                    (
                        "num".to_owned(),
                        VectorColumnInfo {
                            data_type: FeatureDataType::Int,
                            measurement: Measurement::Unitless,
                        },
                    ),
                    (
                        "txt".to_owned(),
                        VectorColumnInfo {
                            data_type: FeatureDataType::Text,
                            measurement: Measurement::Unitless,
                        },
                    ),
                ]
                .into_iter()
                .collect(),
                time: None,
                bbox: None,
            },
            phantom: Default::default(),
        });

        let dataset_name = DatasetName::new(name_space, name);

        TestDatasetDefinition {
            meta_data,
            dataset_name,
        }
    }
1842

1843
    /// Uploads the point test file through the `/upload` HTTP endpoint using
    /// the given session's bearer token and returns the new [`UploadId`].
    ///
    /// Panics (via assert) if the endpoint does not answer with HTTP 200.
    async fn upload_point_dataset(
        app_ctx: &ProPostgresContext<NoTls>,
        session_id: SessionId,
    ) -> UploadId {
        let files =
            vec![geoengine_datatypes::test_data!(TEST_POINT_DATASET_SOURCE_PATH).to_path_buf()];

        // Multipart POST authenticated with the session id as bearer token.
        let req = actix_web::test::TestRequest::post()
            .uri("/upload")
            .append_header((header::AUTHORIZATION, Bearer::new(session_id.to_string())))
            .set_multipart_files(&files);

        let res = send_pro_test_request(req, app_ctx.clone()).await;
        assert_eq!(res.status(), 200);
        // The endpoint returns the upload id as a JSON id response.
        let upload: IdResponse<UploadId> = test::read_body_json(res).await;

        upload.id
    }
1861

1862
    /// Uploads the point test file and registers it as an uploaded dataset
    /// named `name` for `user_session`'s user.
    ///
    /// The upload id is recorded in `upload_dir` so the test harness can clean
    /// up the files afterwards. Returns the handles needed by the
    /// deletion/expiration tests.
    async fn upload_and_add_point_dataset(
        app_ctx: &ProPostgresContext<NoTls>,
        user_session: &UserSession,
        name: &str,
        upload_dir: &mut TestDataUploads,
    ) -> UploadedTestDataset {
        // Dataset is namespaced by the user's id to avoid name clashes.
        let test_dataset = test_point_dataset(Some(user_session.user.id.to_string()), name);
        let upload_id = upload_point_dataset(app_ctx, user_session.id).await;

        let res = app_ctx
            .session_context(user_session.clone())
            .db()
            .add_uploaded_dataset(
                upload_id,
                AddDataset {
                    name: Some(test_dataset.dataset_name.clone()),
                    display_name: "Ogr Test".to_owned(),
                    description: "desc".to_owned(),
                    source_operator: "OgrSource".to_owned(),
                    symbology: None,
                    provenance: None,
                    tags: Some(vec!["upload".to_owned(), "test".to_owned()]),
                },
                test_dataset.meta_data.clone(),
            )
            .await
            .unwrap();

        // Register for cleanup by the test harness.
        upload_dir.uploads.push(upload_id);

        UploadedTestDataset {
            dataset_name: test_dataset.dataset_name,
            dataset_id: res.id,
            upload_id,
        }
    }
1898

1899
    /// Adds a non-uploaded ("volume") dataset as admin and grants `Read`
    /// permission to both the registered-user and the anonymous role, so any
    /// session can see it. Returns the dataset's id.
    async fn add_test_volume_dataset(app_ctx: &ProPostgresContext<NoTls>) -> DatasetId {
        let admin_session = admin_login(app_ctx).await;
        let admin_ctx = app_ctx.session_context(admin_session.clone());
        let db = admin_ctx.db();
        let dataset_name = add_single_dataset(&db, &admin_session).await;
        // The name is resolved right after insertion, so both layers of
        // Option/Result are expected to be present.
        let dataset_id = db
            .resolve_dataset_name_to_id(&dataset_name)
            .await
            .unwrap()
            .unwrap();

        db.add_permission(
            Role::registered_user_role_id(),
            dataset_id,
            Permission::Read,
        )
        .await
        .unwrap();

        db.add_permission(Role::anonymous_role_id(), dataset_id, Permission::Read)
            .await
            .unwrap();

        dataset_id
    }
1924

1925
    fn listing_not_deleted(dataset: &DatasetListing, origin: &UploadedTestDataset) -> bool {
9✔
1926
        dataset.name == origin.dataset_name
9✔
1927
            && !dataset.tags.contains(&ReservedTags::Deleted.to_string())
9✔
1928
    }
9✔
1929

1930
    fn dataset_deleted(dataset: &Dataset, origin: &UploadedTestDataset) -> bool {
4✔
1931
        let tags = dataset.tags.clone().unwrap();
4✔
1932
        let mut num_deleted = 0;
4✔
1933
        for tag in tags {
16✔
1934
            if tag == ReservedTags::Deleted.to_string() {
12✔
1935
                num_deleted += 1;
4✔
1936
            }
8✔
1937
        }
1938
        dataset.name == origin.dataset_name && num_deleted == 1
4✔
1939
    }
4✔
1940

1941
    fn dir_exists(origin: &UploadedTestDataset) -> bool {
8✔
1942
        let path = origin.upload_id.root_path().unwrap();
8✔
1943
        fs::read_dir(path).is_ok()
8✔
1944
    }
8✔
1945

1946
    fn has_read_and_owner_permissions(permissions: &[Permission]) {
3✔
1947
        assert_eq!(permissions.len(), 2);
3✔
1948
        assert!(permissions.contains(&Permission::Read));
3✔
1949
        assert!(permissions.contains(&Permission::Owner));
3✔
1950
    }
3✔
1951

1952
    async fn register_test_user(app_ctx: &ProPostgresContext<NoTls>) -> UserSession {
6✔
1953
        let _user_id = app_ctx
6✔
1954
            .register_user(UserRegistration {
6✔
1955
                email: "test@localhost".to_string(),
6✔
1956
                real_name: "Foo Bar".to_string(),
6✔
1957
                password: "test".to_string(),
6✔
1958
            })
6✔
1959
            .await
63✔
1960
            .unwrap();
6✔
1961

6✔
1962
        app_ctx
6✔
1963
            .login(UserCredentials {
6✔
1964
                email: "test@localhost".to_string(),
6✔
1965
                password: "test".to_string(),
6✔
1966
            })
6✔
1967
            .await
64✔
1968
            .unwrap()
6✔
1969
    }
6✔
1970

1971
    /// Sets an expiration timestamp `duration` in the future for
    /// `dataset_id`, computed from the database clock *inside one
    /// transaction*, so the timestamp can never be "in the past" relative to
    /// the DB when it is applied.
    ///
    /// `fair` selects fair deletion (record kept, tagged deleted) vs. full
    /// deletion (record removed). Returns the expiration timestamp that was
    /// set.
    async fn expire_in_tx_time_duration(
        app_ctx: &ProPostgresContext<NoTls>,
        user_session: &UserSession,
        dataset_id: DatasetId,
        fair: bool,
        duration: Duration,
    ) -> DateTime {
        let mut conn = app_ctx.pool.get().await.unwrap();
        let tx = conn.build_transaction().start().await.unwrap();

        let db = app_ctx.session_context(user_session.clone()).db();

        // Use the DB's notion of "now" rather than the host clock to avoid
        // clock-skew flakiness.
        let current_time = get_db_timestamp_in_tx(&tx).await;
        let future_time = current_time.add(duration);

        let change_dataset_expiration = if fair {
            ChangeDatasetExpiration::expire_fair(dataset_id, future_time)
        } else {
            ChangeDatasetExpiration::expire_full(dataset_id, future_time)
        };

        db.expire_uploaded_dataset_in_tx(change_dataset_expiration, &tx)
            .await
            .unwrap();

        tx.commit().await.unwrap();

        future_time
    }
2000

2001
    /// A dataset added with `tags: None` must still appear in an untagged
    /// listing (regression guard for the tag handling in list queries).
    #[ge_context::test]
    async fn it_lists_datasets_without_tags(app_ctx: ProPostgresContext<NoTls>) {
        let admin_session = admin_login(&app_ctx).await;
        let admin_ctx = app_ctx.session_context(admin_session.clone());
        let db = admin_ctx.db();
        let test_dataset = test_point_dataset(None, "test_data");

        // Explicitly no tags on this dataset.
        let ds = AddDataset {
            name: None,
            display_name: "TestData".to_string(),
            description: "TestData without tags".to_string(),
            source_operator: "OgrSource".to_string(),
            symbology: None,
            provenance: None,
            tags: None,
        };

        db.add_dataset(ds, test_dataset.meta_data).await.unwrap();

        let default_list_options = DatasetListOptions {
            filter: None,
            order: OrderBy::NameAsc,
            offset: 0,
            limit: 10,
            tags: None,
        };

        let listing = db
            .list_datasets(default_list_options.clone())
            .await
            .unwrap();

        assert_eq!(listing.len(), 1);
    }
2035

2036
    /// End-to-end deletion test: fair deletion keeps the record (tagged
    /// `Deleted`) while full deletion removes it; files on disk are only
    /// removed once an admin runs `clear_expired_datasets`, and a second run
    /// clears nothing.
    #[ge_context::test]
    async fn it_deletes_datasets(app_ctx: ProPostgresContext<NoTls>) {
        let mut test_data = TestDataUploads::default();
        let user_session = register_test_user(&app_ctx).await;

        // Three uploads: one stays, one is fair-deleted, one is full-deleted.
        let available =
            upload_and_add_point_dataset(&app_ctx, &user_session, "available", &mut test_data)
                .await;
        let fair =
            upload_and_add_point_dataset(&app_ctx, &user_session, "fair", &mut test_data).await;
        let full =
            upload_and_add_point_dataset(&app_ctx, &user_session, "full", &mut test_data).await;

        let db = app_ctx.session_context(user_session.clone()).db();

        let default_list_options = DatasetListOptions {
            filter: None,
            order: OrderBy::NameAsc,
            offset: 0,
            limit: 10,
            tags: None,
        };

        // All three are listed initially (NameAsc: available, fair, full).
        let listing = db
            .list_datasets(default_list_options.clone())
            .await
            .unwrap();

        assert_eq!(listing.len(), 3);
        assert!(listing_not_deleted(listing.first().unwrap(), &available));
        assert!(listing_not_deleted(listing.get(1).unwrap(), &fair));
        assert!(listing_not_deleted(listing.get(2).unwrap(), &full));

        db.expire_uploaded_dataset(ChangeDatasetExpiration::delete_fair(fair.dataset_id))
            .await
            .unwrap();
        db.expire_uploaded_dataset(ChangeDatasetExpiration::delete_full(full.dataset_id))
            .await
            .unwrap();

        // Deleted datasets disappear from the listing; the fair-deleted one is
        // still loadable (tagged deleted), the full-deleted one is gone.
        let listing = db
            .list_datasets(default_list_options.clone())
            .await
            .unwrap();

        assert_eq!(listing.len(), 1);
        assert!(listing_not_deleted(listing.first().unwrap(), &available));
        assert!(dataset_deleted(
            &db.load_dataset(&fair.dataset_id).await.unwrap(),
            &fair
        ));
        assert!(matches!(
            db.load_dataset(&full.dataset_id).await.unwrap_err(),
            UnknownDatasetId
        ));

        // Files are untouched until an admin clears expired datasets.
        assert!(dir_exists(&available));
        assert!(dir_exists(&fair));
        assert!(dir_exists(&full));

        let admin_session = admin_login(&app_ctx).await;
        let admin_ctx = app_ctx.session_context(admin_session.clone());
        let deleted = admin_ctx.db().clear_expired_datasets().await.unwrap();

        assert_eq!(deleted, 2);
        assert!(dir_exists(&available));
        assert!(!dir_exists(&fair));
        assert!(!dir_exists(&full));

        // Clearing is idempotent: a second run finds nothing to delete.
        let deleted = admin_ctx.db().clear_expired_datasets().await.unwrap();
        assert_eq!(deleted, 0);
    }
2108

2109
    /// A dataset with a fair expiration a few seconds in the future is listed
    /// until the timestamp passes, after which it vanishes from listings but
    /// remains loadable with the `Deleted` tag.
    ///
    /// NOTE: relies on a real 5 s wall-clock sleep to outlast the 3 s
    /// expiration set against the DB clock.
    #[ge_context::test]
    async fn it_expires_dataset(app_ctx: ProPostgresContext<NoTls>) {
        let mut test_data = TestDataUploads::default();
        let user_session = register_test_user(&app_ctx).await;

        let fair =
            upload_and_add_point_dataset(&app_ctx, &user_session, "fair", &mut test_data).await;

        let db = app_ctx.session_context(user_session.clone()).db();

        let default_list_options = DatasetListOptions {
            filter: None,
            order: OrderBy::NameAsc,
            offset: 0,
            limit: 10,
            tags: None,
        };

        // Fair expiration 3 s in the future (relative to the DB clock).
        expire_in_tx_time_duration(
            &app_ctx,
            &user_session,
            fair.dataset_id,
            true,
            Duration::seconds(3),
        )
        .await;

        // Not yet expired: still listed.
        let listing = db
            .list_datasets(default_list_options.clone())
            .await
            .unwrap();

        assert_eq!(listing.len(), 1);
        assert!(listing_not_deleted(listing.first().unwrap(), &fair));

        tokio::time::sleep(std::time::Duration::from_secs(5)).await;

        // Expired: gone from listings, but still loadable and tagged deleted.
        let listing = db
            .list_datasets(default_list_options.clone())
            .await
            .unwrap();

        assert_eq!(listing.len(), 0);
        assert!(dataset_deleted(
            &db.load_dataset(&fair.dataset_id).await.unwrap(),
            &fair
        ));
    }
2157

2158
    /// Pending expirations can be updated before they trigger: `fair` is
    /// postponed from 5 s to 10 s, and `fair2full` is switched from fair to
    /// full deletion. After 5 s only `fair2full` is fully gone; after another
    /// 10 s `fair` is soft-deleted (record kept, tagged `Deleted`).
    ///
    /// NOTE: relies on real wall-clock sleeps against DB-clock expirations.
    #[ge_context::test]
    async fn it_updates_expiring_dataset(app_ctx: ProPostgresContext<NoTls>) {
        let mut test_data = TestDataUploads::default();
        let user_session = register_test_user(&app_ctx).await;

        let fair =
            upload_and_add_point_dataset(&app_ctx, &user_session, "fair", &mut test_data).await;
        let fair2full =
            upload_and_add_point_dataset(&app_ctx, &user_session, "fair2full", &mut test_data)
                .await;

        let db = app_ctx.session_context(user_session.clone()).db();

        let default_list_options = DatasetListOptions {
            filter: None,
            order: OrderBy::NameAsc,
            offset: 0,
            limit: 10,
            tags: None,
        };

        // fair: first 5 s fair expiration ...
        expire_in_tx_time_duration(
            &app_ctx,
            &user_session,
            fair.dataset_id,
            true,
            Duration::seconds(5),
        )
        .await;
        // ... then postponed to 10 s (update of a pending expiration).
        expire_in_tx_time_duration(
            &app_ctx,
            &user_session,
            fair.dataset_id,
            true,
            Duration::seconds(10),
        )
        .await;
        // fair2full: first fair in 5 s ...
        expire_in_tx_time_duration(
            &app_ctx,
            &user_session,
            fair2full.dataset_id,
            true,
            Duration::seconds(5),
        )
        .await;
        // ... then upgraded to full deletion in 5 s.
        expire_in_tx_time_duration(
            &app_ctx,
            &user_session,
            fair2full.dataset_id,
            false,
            Duration::seconds(5),
        )
        .await;

        // Nothing has triggered yet: both still listed.
        let listing = db
            .list_datasets(default_list_options.clone())
            .await
            .unwrap();

        assert_eq!(listing.len(), 2);
        assert!(listing_not_deleted(listing.first().unwrap(), &fair));
        assert!(listing_not_deleted(listing.get(1).unwrap(), &fair2full));

        tokio::time::sleep(std::time::Duration::from_secs(5)).await;

        // fair2full was fully deleted; fair's postponed expiration still holds.
        let listing = db
            .list_datasets(default_list_options.clone())
            .await
            .unwrap();

        assert_eq!(listing.len(), 1);
        assert!(listing_not_deleted(listing.first().unwrap(), &fair));
        assert!(matches!(
            db.load_dataset(&fair2full.dataset_id).await.unwrap_err(),
            UnknownDatasetId
        ));

        tokio::time::sleep(std::time::Duration::from_secs(10)).await;

        // fair's fair deletion has now triggered: delisted but loadable.
        let listing = db
            .list_datasets(default_list_options.clone())
            .await
            .unwrap();
        assert_eq!(listing.len(), 0);
        assert!(dataset_deleted(
            &db.load_dataset(&fair.dataset_id).await.unwrap(),
            &fair
        ));
    }
2247

2248
    /// Expiration states can still be changed after they triggered:
    /// a fair-deleted dataset can be upgraded to full deletion (each step
    /// clearing one dataset's files), a second fair deletion on the fully
    /// deleted dataset errors, and a pending expiration can be unset so the
    /// dataset survives.
    ///
    /// NOTE: relies on a real 5 s wall-clock sleep against a 3 s DB-clock
    /// expiration.
    #[allow(clippy::too_many_lines)]
    #[ge_context::test]
    async fn it_updates_expired_dataset(app_ctx: ProPostgresContext<NoTls>) {
        let mut test_data = TestDataUploads::default();
        let user_session = register_test_user(&app_ctx).await;

        let db = app_ctx.session_context(user_session.clone()).db();
        let default_list_options = DatasetListOptions {
            filter: None,
            order: OrderBy::NameAsc,
            offset: 0,
            limit: 10,
            tags: None,
        };

        // Fair-delete immediately: record kept and tagged deleted.
        let fair2full =
            upload_and_add_point_dataset(&app_ctx, &user_session, "fair2full", &mut test_data)
                .await;
        db.expire_uploaded_dataset(ChangeDatasetExpiration::delete_fair(fair2full.dataset_id))
            .await
            .unwrap();
        assert!(dataset_deleted(
            &db.load_dataset(&fair2full.dataset_id).await.unwrap(),
            &fair2full
        ));

        let admin_session = admin_login(&app_ctx).await;
        let admin_ctx = app_ctx.session_context(admin_session.clone());
        let deleted = admin_ctx.db().clear_expired_datasets().await.unwrap();
        assert_eq!(deleted, 1);

        // Upgrade the already fair-deleted dataset to full deletion.
        db.expire_uploaded_dataset(ChangeDatasetExpiration::delete_full(fair2full.dataset_id))
            .await
            .unwrap();
        assert!(matches!(
            db.load_dataset(&fair2full.dataset_id).await.unwrap_err(),
            UnknownDatasetId
        ));

        // The upgrade makes one more dataset clearable.
        let deleted = admin_ctx.db().clear_expired_datasets().await.unwrap();
        assert_eq!(deleted, 1);

        // Downgrading a fully deleted dataset back to fair must fail.
        assert!(db
            .expire_uploaded_dataset(ChangeDatasetExpiration::delete_fair(fair2full.dataset_id))
            .await
            .is_err());

        let fair2available =
            upload_and_add_point_dataset(&app_ctx, &user_session, "fair2available", &mut test_data)
                .await;

        // Set a 3 s expiration, then cancel it before it triggers.
        expire_in_tx_time_duration(
            &app_ctx,
            &user_session,
            fair2available.dataset_id,
            true,
            Duration::seconds(3),
        )
        .await;
        db.expire_uploaded_dataset(ChangeDatasetExpiration::unset_expire(
            fair2available.dataset_id,
        ))
        .await
        .unwrap();

        tokio::time::sleep(std::time::Duration::from_secs(5)).await;

        // The cancelled expiration must not delete anything.
        let deleted = admin_ctx.db().clear_expired_datasets().await.unwrap();
        assert_eq!(deleted, 0);

        let listing = db
            .list_datasets(default_list_options.clone())
            .await
            .unwrap();
        assert_eq!(listing.len(), 1);
        assert!(listing_not_deleted(
            listing.first().unwrap(),
            &fair2available
        ));

        assert!(dir_exists(&fair2available));
        assert!(!dir_exists(&fair2full));
    }
2331

2332
    /// Tracks `get_dataset_access_status` through a dataset's lifecycle:
    /// a volume dataset reports `NonUserUpload` with `Read` only; a user
    /// upload reports `Available` → `Expires(..)` → `Deleted(..)` (with
    /// Read+Owner throughout) and finally `UnknownDatasetId` once fully
    /// deleted.
    ///
    /// NOTE: relies on a real 5 s wall-clock sleep against a 3 s DB-clock
    /// expiration.
    #[ge_context::test]
    async fn it_handles_dataset_status(app_ctx: ProPostgresContext<NoTls>) {
        let mut test_data = TestDataUploads::default();

        let volume_dataset = add_test_volume_dataset(&app_ctx).await;

        let user_session = register_test_user(&app_ctx).await;
        let db = app_ctx.session_context(user_session.clone()).db();

        // Volume dataset: not a user upload, registered users only get Read.
        let access_status = db.get_dataset_access_status(&volume_dataset).await.unwrap();
        assert!(matches!(
            access_status.dataset_type,
            DatasetType::NonUserUpload
        ));
        assert_eq!(access_status.permissions, vec![Permission::Read]);

        let user_dataset =
            upload_and_add_point_dataset(&app_ctx, &user_session, "user_dataset", &mut test_data)
                .await
                .dataset_id;

        // Fresh upload: Available, owner has Read + Owner.
        let access_status = db.get_dataset_access_status(&user_dataset).await.unwrap();
        assert!(matches!(
            access_status.dataset_type,
            DatasetType::UserUpload(UploadedDatasetStatus::Available)
        ));
        has_read_and_owner_permissions(&access_status.permissions);

        // Pending expiration: status carries the exact deletion timestamp.
        let future_time = expire_in_tx_time_duration(
            &app_ctx,
            &user_session,
            user_dataset,
            true,
            Duration::seconds(3),
        )
        .await;
        let access_status = db.get_dataset_access_status(&user_dataset).await.unwrap();
        assert!(matches!(
            access_status.dataset_type,
            DatasetType::UserUpload(UploadedDatasetStatus::Expires(ex)) if ex.deletion_timestamp.unwrap() == future_time
        ));
        has_read_and_owner_permissions(&access_status.permissions);

        tokio::time::sleep(std::time::Duration::from_secs(5)).await;

        // After the timestamp passed: Deleted, same timestamp, same permissions.
        let access_status = db.get_dataset_access_status(&user_dataset).await.unwrap();
        assert!(matches!(
            access_status.dataset_type,
            DatasetType::UserUpload(UploadedDatasetStatus::Deleted(ex)) if ex.deletion_timestamp.unwrap() == future_time
        ));
        has_read_and_owner_permissions(&access_status.permissions);

        // Full deletion removes the record; status lookups now fail.
        db.expire_uploaded_dataset(ChangeDatasetExpiration::delete_full(user_dataset))
            .await
            .unwrap();

        let access_status = db.get_dataset_access_status(&user_dataset).await;
        assert!(matches!(access_status, Err(UnknownDatasetId)));
    }
2391

2392
    /// Exercises the error paths of the expiration API: timestamps in the
    /// past, unsetting a non-existing expiration, re-expiring an already
    /// deleted dataset, loading metadata of a deleted dataset, and clearing
    /// expired datasets without admin permission.
    #[ge_context::test]
    async fn it_handles_expiration_errors(app_ctx: ProPostgresContext<NoTls>) {
        let mut test_data = TestDataUploads::default();
        let user_session = register_test_user(&app_ctx).await;

        // Timestamps are taken from the DB clock to match server-side checks.
        let current_time = get_db_timestamp(&app_ctx).await;
        let future_time = current_time.add(Duration::hours(1));
        let past_time = current_time.sub(Duration::hours(1));

        let db = app_ctx.session_context(user_session.clone()).db();

        // Expiring before the current time is rejected.
        let test_dataset =
            upload_and_add_point_dataset(&app_ctx, &user_session, "fair2full", &mut test_data)
                .await;
        let err = db
            .expire_uploaded_dataset(ChangeDatasetExpiration::expire_fair(
                test_dataset.dataset_id,
                past_time,
            ))
            .await;
        assert!(err.is_err());
        assert!(matches!(err.unwrap_err(), ExpirationTimestampInPast { .. }));

        // Unsetting the expiration of a dataset that has none is rejected.
        let err = db
            .expire_uploaded_dataset(ChangeDatasetExpiration::unset_expire(
                test_dataset.dataset_id,
            ))
            .await;
        assert!(err.is_err());
        assert!(matches!(err.unwrap_err(), IllegalDatasetStatus { .. }));

        // Setting a new expiration on an already (fair-)deleted dataset is
        // rejected.
        db.expire_uploaded_dataset(ChangeDatasetExpiration::delete_fair(
            test_dataset.dataset_id,
        ))
        .await
        .unwrap();
        let err = db
            .expire_uploaded_dataset(ChangeDatasetExpiration::expire_fair(
                test_dataset.dataset_id,
                future_time,
            ))
            .await;
        assert!(err.is_err());
        assert!(matches!(err.unwrap_err(), IllegalExpirationUpdate { .. }));

        // Requesting meta data for a deleted dataset yields DatasetDeleted.
        let err: std::result::Result<
            Box<dyn MetaData<OgrSourceDataset, VectorResultDescriptor, VectorQueryRectangle>>,
            geoengine_operators::error::Error,
        > = db
            .meta_data(&DataId::Internal {
                dataset_id: test_dataset.dataset_id,
            })
            .await;
        assert!(err.is_err());
        assert!(matches!(err.unwrap_err(), DatasetDeleted { .. }));

        // Clearing expired datasets requires admin permissions.
        let err = db.clear_expired_datasets().await;
        assert!(err.is_err());
        assert!(matches!(err.unwrap_err(), PermissionDenied));
    }
2457
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc