• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

geo-engine / geoengine / 9778355898

03 Jul 2024 01:01PM CUT coverage: 90.738% (+0.05%) from 90.687%
9778355898

Pull #970

github

web-flow
Merge 40c130305 into 9a33a5b13
Pull Request #970: FAIR dataset deletion

1374 of 1429 new or added lines in 10 files covered. (96.15%)

28 existing lines in 16 files now uncovered.

134479 of 148206 relevant lines covered (90.74%)

52200.61 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

87.55
/services/src/pro/datasets/postgres.rs
1
use crate::api::model::services::UpdateDataset;
2
use crate::datasets::listing::Provenance;
3
use crate::datasets::listing::{DatasetListOptions, DatasetListing, DatasetProvider};
4
use crate::datasets::listing::{OrderBy, ProvenanceOutput};
5
use crate::datasets::postgres::resolve_dataset_name_to_id;
6
use crate::datasets::storage::{Dataset, DatasetDb, DatasetStore, MetaDataDefinition};
7
use crate::datasets::upload::{delete_upload, FileId};
8
use crate::datasets::upload::{Upload, UploadDb, UploadId};
9
use crate::datasets::{AddDataset, DatasetIdAndName, DatasetName};
10
use crate::error::Error::{
11
    ExpirationTimestampInPast, IllegalDatasetStatus, IllegalExpirationUpdate, UnknownDatasetId,
12
};
13
use crate::error::{self, Error, Result};
14
use crate::pro::contexts::ProPostgresDb;
15
use crate::pro::datasets::storage::{
16
    ChangeDatasetExpiration, InternalUploadedDatasetStatus, UploadedDatasetStatus,
17
    UploadedUserDatasetStore,
18
};
19
use crate::pro::datasets::{Expiration, ExpirationChange};
20
use crate::pro::permissions::postgres_permissiondb::TxPermissionDb;
21
use crate::pro::permissions::{Permission, RoleId};
22
use crate::projects::Symbology;
23
use crate::util::postgres::PostgresErrorExt;
24
use async_trait::async_trait;
25
use bb8_postgres::tokio_postgres::tls::{MakeTlsConnect, TlsConnect};
26
use bb8_postgres::tokio_postgres::Socket;
27
use geoengine_datatypes::dataset::{DataId, DatasetId};
28
use geoengine_datatypes::error::BoxedResultExt;
29
use geoengine_datatypes::primitives::RasterQueryRectangle;
30
use geoengine_datatypes::primitives::VectorQueryRectangle;
31
use geoengine_datatypes::util::Identifier;
32
use geoengine_operators::engine::{
33
    MetaData, MetaDataProvider, RasterResultDescriptor, TypedResultDescriptor,
34
    VectorResultDescriptor,
35
};
36
use geoengine_operators::mock::MockDatasetDataSourceLoadingInfo;
37
use geoengine_operators::source::{GdalLoadingInfo, OgrSourceDataset};
38
use postgres_types::{FromSql, ToSql};
39
use snafu::ensure;
40
use tokio_postgres::Transaction;
41

42
// Marker impl: `DatasetDb` presumably only bundles the dataset/upload storage
// super-traits (implemented further below in this file); it requires no
// methods of its own — TODO confirm against the trait definition.
impl<Tls> DatasetDb for ProPostgresDb<Tls>
where
    Tls: MakeTlsConnect<Socket> + Clone + Send + Sync + 'static + std::fmt::Debug,
    <Tls as MakeTlsConnect<Socket>>::Stream: Send + Sync,
    <Tls as MakeTlsConnect<Socket>>::TlsConnect: Send,
    <<Tls as MakeTlsConnect<Socket>>::TlsConnect as TlsConnect<Socket>>::Future: Send,
{
}
50

51
#[allow(clippy::too_many_lines)]
52
#[async_trait]
53
impl<Tls> DatasetProvider for ProPostgresDb<Tls>
54
where
55
    Tls: MakeTlsConnect<Socket> + Clone + Send + Sync + 'static + std::fmt::Debug,
56
    <Tls as MakeTlsConnect<Socket>>::Stream: Send + Sync,
57
    <Tls as MakeTlsConnect<Socket>>::TlsConnect: Send,
58
    <<Tls as MakeTlsConnect<Socket>>::TlsConnect as TlsConnect<Socket>>::Future: Send,
59
{
60
    /// Lists datasets visible to the session user, honoring the filter,
    /// tag, ordering and paging settings in `options`.
    ///
    /// The SQL is assembled dynamically: `$1`..`$3` are always user id,
    /// limit and offset; `$4`/`$5` are appended depending on which of
    /// `options.filter` / `options.tags` are present. `pos` tracks the next
    /// free placeholder index, so the clause strings and the parameter list
    /// built in the `match` below must stay in sync.
    async fn list_datasets(&self, options: DatasetListOptions) -> Result<Vec<DatasetListing>> {
        // Apply pending dataset expirations before listing so expired
        // datasets do not show up.
        self.update_datasets_status().await?;
        let conn = self.conn_pool.get().await?;

        // Placeholders $1..$3 are fixed; optional clauses continue from $4.
        let mut pos = 3;
        let order_sql = if options.order == OrderBy::NameAsc {
            "display_name ASC"
        } else {
            "display_name DESC"
        };

        // Case-insensitive substring filter on the display name; '\' is the
        // escape character for the ILIKE pattern.
        let filter_sql = if options.filter.is_some() {
            pos += 1;
            format!("AND display_name ILIKE ${pos} ESCAPE '\\'")
        } else {
            String::new()
        };

        // With an explicit tag filter, match via array containment (@>).
        // Without one, hide datasets tagged as 'deleted' by default.
        let (filter_tags_sql, filter_tags_list) = if let Some(filter_tags) = &options.tags {
            pos += 1;
            (format!("AND d.tags @> ${pos}::text[]"), filter_tags.clone())
        } else {
            ("AND NOT d.tags @> '{deleted}'::text[]".to_string(), vec![])
        };

        let stmt = conn
            .prepare(&format!(
                "
            SELECT 
                d.id,
                d.name,
                d.display_name,
                d.description,
                d.tags,
                d.source_operator,
                d.result_descriptor,
                d.symbology
            FROM 
                user_permitted_datasets p JOIN datasets d 
                    ON (p.dataset_id = d.id)
            WHERE 
                p.user_id = $1
                {filter_sql}
                {filter_tags_sql}
            ORDER BY {order_sql}
            LIMIT $2
            OFFSET $3;  
            ",
            ))
            .await?;

        // Parameter lists must mirror the placeholder order chosen above:
        // filter pattern (if any) comes before the tag array (if any).
        let rows = match (options.filter, options.tags) {
            (Some(filter), Some(_)) => {
                conn.query(
                    &stmt,
                    &[
                        &self.session.user.id,
                        &i64::from(options.limit),
                        &i64::from(options.offset),
                        // Escape LIKE metacharacters in the user-supplied filter.
                        &format!("%{}%", filter.replace('%', "\\%").replace('_', "\\_")),
                        &filter_tags_list,
                    ],
                )
                .await?
            }
            (Some(filter), None) => {
                conn.query(
                    &stmt,
                    &[
                        &self.session.user.id,
                        &i64::from(options.limit),
                        &i64::from(options.offset),
                        &format!("%{}%", filter.replace('%', "\\%").replace('_', "\\_")),
                    ],
                )
                .await?
            }
            (None, Some(_)) => {
                conn.query(
                    &stmt,
                    &[
                        &self.session.user.id,
                        &i64::from(options.limit),
                        &i64::from(options.offset),
                        &filter_tags_list,
                    ],
                )
                .await?
            }
            (None, None) => {
                conn.query(
                    &stmt,
                    &[
                        &self.session.user.id,
                        &i64::from(options.limit),
                        &i64::from(options.offset),
                    ],
                )
                .await?
            }
        };

        // NOTE(review): the closure always returns Ok, so `filter_map(Result::ok)`
        // currently never drops a row; it only unwraps.
        Ok(rows
            .iter()
            .map(|row| {
                Result::<DatasetListing>::Ok(DatasetListing {
                    id: row.get(0),
                    name: row.get(1),
                    display_name: row.get(2),
                    description: row.get(3),
                    // tags column may be NULL; treat NULL as "no tags".
                    tags: row.get::<_, Option<_>>(4).unwrap_or_default(),
                    source_operator: row.get(5),
                    result_descriptor: row.get(6),
                    symbology: row.get(7),
                })
            })
            .filter_map(Result::ok)
            .collect())
    }
179

180
    async fn load_dataset(&self, dataset: &DatasetId) -> Result<Dataset> {
30✔
181
        self.update_dataset_status(dataset).await?;
196✔
182
        let conn = self.conn_pool.get().await?;
30✔
183
        let stmt = conn
30✔
184
            .prepare(
30✔
185
                "
30✔
186
            SELECT
30✔
187
                d.id,
30✔
188
                d.name,
30✔
189
                d.display_name,
30✔
190
                d.description,
30✔
191
                d.result_descriptor,
30✔
192
                d.source_operator,
30✔
193
                d.symbology,
30✔
194
                d.provenance,
30✔
195
                d.tags
30✔
196
            FROM 
30✔
197
                user_permitted_datasets p JOIN datasets d 
30✔
198
                    ON (p.dataset_id = d.id)
30✔
199
            WHERE 
30✔
200
                p.user_id = $1 AND d.id = $2
30✔
201
            LIMIT 
30✔
202
                1",
30✔
203
            )
30✔
204
            .await?;
108✔
205

206
        let row = conn
30✔
207
            .query_opt(&stmt, &[&self.session.user.id, dataset])
30✔
208
            .await?;
28✔
209

210
        let row = row.ok_or(error::Error::UnknownDatasetId)?;
30✔
211

212
        Ok(Dataset {
19✔
213
            id: row.get(0),
19✔
214
            name: row.get(1),
19✔
215
            display_name: row.get(2),
19✔
216
            description: row.get(3),
19✔
217
            result_descriptor: row.get(4),
19✔
218
            source_operator: row.get(5),
19✔
219
            symbology: row.get(6),
19✔
220
            provenance: row.get(7),
19✔
221
            tags: row.get(8),
19✔
222
        })
19✔
223
    }
90✔
224

225
    async fn load_provenance(&self, dataset: &DatasetId) -> Result<ProvenanceOutput> {
3✔
226
        self.update_dataset_status(dataset).await?;
21✔
227
        let conn = self.conn_pool.get().await?;
3✔
228

229
        let stmt = conn
3✔
230
            .prepare(
3✔
231
                "
3✔
232
            SELECT 
3✔
233
                d.provenance 
3✔
234
            FROM 
3✔
235
                user_permitted_datasets p JOIN datasets d
3✔
236
                    ON(p.dataset_id = d.id)
3✔
237
            WHERE 
3✔
238
                p.user_id = $1 AND d.id = $2",
3✔
239
            )
3✔
240
            .await?;
3✔
241

242
        let row = conn
3✔
243
            .query_opt(&stmt, &[&self.session.user.id, dataset])
3✔
244
            .await?;
3✔
245

246
        let row = row.ok_or(error::Error::UnknownDatasetId)?;
3✔
247

248
        Ok(ProvenanceOutput {
2✔
249
            data: (*dataset).into(),
2✔
250
            provenance: row.get(0),
2✔
251
        })
2✔
252
    }
9✔
253

254
    async fn load_loading_info(&self, dataset: &DatasetId) -> Result<MetaDataDefinition> {
×
NEW
255
        self.update_dataset_status(dataset).await?;
×
UNCOV
256
        let conn = self.conn_pool.get().await?;
×
257

258
        let stmt = conn
×
259
            .prepare(
×
260
                "
×
261
            SELECT 
×
262
                meta_data 
×
263
            FROM 
×
264
                user_permitted_datasets p JOIN datasets d
×
265
                    ON(p.dataset_id = d.id)
×
266
            WHERE 
×
267
                p.user_id = $1 AND d.id = $2",
×
268
            )
×
269
            .await?;
×
270

271
        let row = conn
×
272
            .query_one(&stmt, &[&self.session.user.id, dataset])
×
273
            .await?;
×
274

275
        Ok(row.get(0))
×
276
    }
×
277

278
    async fn resolve_dataset_name_to_id(
20✔
279
        &self,
20✔
280
        dataset_name: &DatasetName,
20✔
281
    ) -> Result<Option<DatasetId>> {
20✔
282
        self.update_datasets_status().await?;
156✔
283
        let conn = self.conn_pool.get().await?;
20✔
284
        resolve_dataset_name_to_id(&conn, dataset_name).await
36✔
285
    }
60✔
286

287
    async fn dataset_autocomplete_search(
4✔
288
        &self,
4✔
289
        tags: Option<Vec<String>>,
4✔
290
        search_string: String,
4✔
291
        limit: u32,
4✔
292
        offset: u32,
4✔
293
    ) -> Result<Vec<String>> {
4✔
294
        self.update_datasets_status().await?;
27✔
295
        let connection = self.conn_pool.get().await?;
4✔
296

297
        let limit = i64::from(limit);
4✔
298
        let offset = i64::from(offset);
4✔
299
        let search_string = format!(
4✔
300
            "%{}%",
4✔
301
            search_string.replace('%', "\\%").replace('_', "\\_")
4✔
302
        );
4✔
303

4✔
304
        let mut query_params: Vec<&(dyn ToSql + Sync)> =
4✔
305
            vec![&self.session.user.id, &limit, &offset, &search_string];
4✔
306

307
        let tags_clause = if let Some(tags) = &tags {
4✔
308
            query_params.push(tags);
2✔
309
            " AND tags @> $5::text[]".to_string()
2✔
310
        } else {
311
            String::new()
2✔
312
        };
313

314
        let stmt = connection
4✔
315
            .prepare(&format!(
4✔
316
                "
4✔
317
            SELECT 
4✔
318
                display_name
4✔
319
            FROM 
4✔
320
                user_permitted_datasets p JOIN datasets d ON (p.dataset_id = d.id)
4✔
321
            WHERE 
4✔
322
                p.user_id = $1
4✔
323
                AND display_name ILIKE $4 ESCAPE '\\'
4✔
324
                {tags_clause}
4✔
325
            ORDER BY display_name ASC
4✔
326
            LIMIT $2
4✔
327
            OFFSET $3;"
4✔
328
            ))
4✔
329
            .await?;
2✔
330

331
        let rows = connection.query(&stmt, &query_params).await?;
4✔
332

333
        Ok(rows.iter().map(|row| row.get(0)).collect())
4✔
334
    }
12✔
335
}
336

337
#[async_trait]
impl<Tls>
    MetaDataProvider<MockDatasetDataSourceLoadingInfo, VectorResultDescriptor, VectorQueryRectangle>
    for ProPostgresDb<Tls>
where
    Tls: MakeTlsConnect<Socket> + Clone + Send + Sync + 'static + std::fmt::Debug,
    <Tls as MakeTlsConnect<Socket>>::Stream: Send + Sync,
    <Tls as MakeTlsConnect<Socket>>::TlsConnect: Send,
    <<Tls as MakeTlsConnect<Socket>>::TlsConnect as TlsConnect<Socket>>::Future: Send,
{
    /// Mock dataset loading info is not supported by this provider; this
    /// always fails with `NotYetImplemented`.
    async fn meta_data(
        &self,
        _id: &DataId,
    ) -> geoengine_operators::util::Result<
        Box<
            dyn MetaData<
                MockDatasetDataSourceLoadingInfo,
                VectorResultDescriptor,
                VectorQueryRectangle,
            >,
        >,
    > {
        Err(geoengine_operators::error::Error::NotYetImplemented)
    }
}
362

363
#[async_trait]
364
impl<Tls> MetaDataProvider<OgrSourceDataset, VectorResultDescriptor, VectorQueryRectangle>
365
    for ProPostgresDb<Tls>
366
where
367
    Tls: MakeTlsConnect<Socket> + Clone + Send + Sync + 'static + std::fmt::Debug,
368
    <Tls as MakeTlsConnect<Socket>>::Stream: Send + Sync,
369
    <Tls as MakeTlsConnect<Socket>>::TlsConnect: Send,
370
    <<Tls as MakeTlsConnect<Socket>>::TlsConnect as TlsConnect<Socket>>::Future: Send,
371
{
372
    /// Loads OGR (vector) loading info for `id`.
    ///
    /// Inside one transaction: checks the user's read permission, rejects
    /// datasets whose uploaded data was deleted, then fetches the stored
    /// `MetaDataDefinition` and requires it to be the `OgrMetaData` variant.
    /// All database errors are wrapped into the operators' `MetaData` error.
    async fn meta_data(
        &self,
        id: &DataId,
    ) -> geoengine_operators::util::Result<
        Box<dyn MetaData<OgrSourceDataset, VectorResultDescriptor, VectorQueryRectangle>>,
    > {
        // Only internal (dataset) ids can be resolved here.
        let id = id
            .internal()
            .ok_or(geoengine_operators::error::Error::DataIdTypeMissMatch)?;

        let mut conn = self.conn_pool.get().await.map_err(|e| {
            geoengine_operators::error::Error::MetaData {
                source: Box::new(e),
            }
        })?;
        let tx = conn.build_transaction().start().await.map_err(|e| {
            geoengine_operators::error::Error::MetaData {
                source: Box::new(e),
            }
        })?;

        // Permission check happens in the same transaction as the read.
        if !self
            .has_permission_in_tx(id, Permission::Read, &tx)
            .await
            .map_err(|e| geoengine_operators::error::Error::MetaData {
                source: Box::new(e),
            })?
        {
            return Err(geoengine_operators::error::Error::PermissionDenied);
        };

        // An uploaded dataset whose data was expired/deleted must not be
        // loadable even if its record still exists. A lookup error here
        // (e.g. not an uploaded dataset) is deliberately ignored.
        let uploaded_status = self.uploaded_dataset_status_in_tx(&id, &tx).await;
        if let Ok(status) = uploaded_status {
            if matches!(status, UploadedDatasetStatus::Deleted) {
                return Err(geoengine_operators::error::Error::DatasetDeleted {
                    id: id.to_string(),
                });
            }
        }

        let stmt = tx
            .prepare(
                "
        SELECT
            d.meta_data
        FROM
            user_permitted_datasets p JOIN datasets d
                ON (p.dataset_id = d.id)
        WHERE
            d.id = $1 AND p.user_id = $2",
            )
            .await
            .map_err(|e| geoengine_operators::error::Error::MetaData {
                source: Box::new(e),
            })?;

        let row = tx
            .query_one(&stmt, &[&id, &self.session.user.id])
            .await
            .map_err(|e| geoengine_operators::error::Error::MetaData {
                source: Box::new(e),
            })?;

        let meta_data: MetaDataDefinition = row.get("meta_data");

        // Only vector (OGR) meta data is valid for this provider.
        let MetaDataDefinition::OgrMetaData(meta_data) = meta_data else {
            return Err(geoengine_operators::error::Error::MetaData {
                source: Box::new(geoengine_operators::error::Error::InvalidType {
                    expected: "OgrMetaData".to_string(),
                    found: meta_data.type_name().to_string(),
                }),
            });
        };

        // Close the (read) transaction cleanly.
        tx.commit()
            .await
            .map_err(|e| geoengine_operators::error::Error::MetaData {
                source: Box::new(e),
            })?;

        Ok(Box::new(meta_data))
    }
454
}
455

456
#[async_trait]
457
impl<Tls> MetaDataProvider<GdalLoadingInfo, RasterResultDescriptor, RasterQueryRectangle>
458
    for ProPostgresDb<Tls>
459
where
460
    Tls: MakeTlsConnect<Socket> + Clone + Send + Sync + 'static + std::fmt::Debug,
461
    <Tls as MakeTlsConnect<Socket>>::Stream: Send + Sync,
462
    <Tls as MakeTlsConnect<Socket>>::TlsConnect: Send,
463
    <<Tls as MakeTlsConnect<Socket>>::TlsConnect as TlsConnect<Socket>>::Future: Send,
464
{
465
    async fn meta_data(
8✔
466
        &self,
8✔
467
        id: &DataId,
8✔
468
    ) -> geoengine_operators::util::Result<
8✔
469
        Box<dyn MetaData<GdalLoadingInfo, RasterResultDescriptor, RasterQueryRectangle>>,
8✔
470
    > {
8✔
471
        let id = id
8✔
472
            .internal()
8✔
473
            .ok_or(geoengine_operators::error::Error::DataIdTypeMissMatch)?;
8✔
474

475
        let mut conn = self.conn_pool.get().await.map_err(|e| {
8✔
476
            geoengine_operators::error::Error::MetaData {
×
477
                source: Box::new(e),
×
478
            }
×
479
        })?;
8✔
480
        let tx = conn.build_transaction().start().await.map_err(|e| {
8✔
481
            geoengine_operators::error::Error::MetaData {
×
482
                source: Box::new(e),
×
483
            }
×
484
        })?;
8✔
485

486
        if !self
8✔
487
            .has_permission_in_tx(id, Permission::Read, &tx)
8✔
488
            .await
17✔
489
            .map_err(|e| geoengine_operators::error::Error::MetaData {
8✔
490
                source: Box::new(e),
×
491
            })?
8✔
492
        {
493
            return Err(geoengine_operators::error::Error::PermissionDenied);
1✔
494
        };
7✔
495

496
        let uploaded_status = self.uploaded_dataset_status_in_tx(&id, &tx).await;
60✔
497
        if let Ok(status) = uploaded_status {
7✔
NEW
498
            if matches!(status, UploadedDatasetStatus::Deleted) {
×
NEW
499
                return Err(geoengine_operators::error::Error::DatasetDeleted {
×
NEW
500
                    id: id.to_string(),
×
NEW
501
                });
×
NEW
502
            }
×
503
        }
7✔
504

505
        let stmt = tx
7✔
506
            .prepare(
7✔
507
                "
7✔
508
            SELECT
7✔
509
                d.meta_data
7✔
510
            FROM
7✔
511
                user_permitted_datasets p JOIN datasets d
7✔
512
                    ON (p.dataset_id = d.id)
7✔
513
            WHERE
7✔
514
                d.id = $1 AND p.user_id = $2",
7✔
515
            )
7✔
516
            .await
7✔
517
            .map_err(|e| geoengine_operators::error::Error::MetaData {
7✔
518
                source: Box::new(e),
×
519
            })?;
7✔
520

521
        let row = tx
7✔
522
            .query_one(&stmt, &[&id, &self.session.user.id])
7✔
523
            .await
7✔
524
            .map_err(|e| geoengine_operators::error::Error::MetaData {
7✔
525
                source: Box::new(e),
×
526
            })?;
7✔
527

528
        let meta_data: MetaDataDefinition = row.get(0);
7✔
529

7✔
530
        Ok(match meta_data {
7✔
531
            MetaDataDefinition::GdalMetaDataRegular(m) => Box::new(m),
4✔
532
            MetaDataDefinition::GdalStatic(m) => Box::new(m),
1✔
533
            MetaDataDefinition::GdalMetaDataList(m) => Box::new(m),
1✔
534
            MetaDataDefinition::GdalMetadataNetCdfCf(m) => Box::new(m),
1✔
535
            _ => return Err(geoengine_operators::error::Error::DataIdTypeMissMatch),
×
536
        })
537
    }
24✔
538
}
539

540
#[async_trait]
/// Conversion of stored meta data into the form written to Postgres:
/// the meta data itself plus its type-erased result descriptor.
pub trait PostgresStorable<Tls>: Send + Sync
where
    Tls: MakeTlsConnect<Socket> + Clone + Send + Sync + 'static,
    <Tls as MakeTlsConnect<Socket>>::Stream: Send + Sync,
    <Tls as MakeTlsConnect<Socket>>::TlsConnect: Send,
    <<Tls as MakeTlsConnect<Socket>>::TlsConnect as TlsConnect<Socket>>::Future: Send,
{
    /// Pairs the meta data with its `TypedResultDescriptor`.
    fn to_typed_metadata(&self) -> Result<DatasetMetaData>;
}
550

551
/// A dataset's meta data together with its type-erased result descriptor,
/// as used when inserting into the `datasets` table.
pub struct DatasetMetaData<'m> {
    // Borrowed to avoid cloning the (potentially large) meta data.
    pub meta_data: &'m MetaDataDefinition,
    pub result_descriptor: TypedResultDescriptor,
}
555

556
impl<Tls> PostgresStorable<Tls> for MetaDataDefinition
557
where
558
    Tls: MakeTlsConnect<Socket> + Clone + Send + Sync + 'static,
559
    <Tls as MakeTlsConnect<Socket>>::Stream: Send + Sync,
560
    <Tls as MakeTlsConnect<Socket>>::TlsConnect: Send,
561
    <<Tls as MakeTlsConnect<Socket>>::TlsConnect as TlsConnect<Socket>>::Future: Send,
562
{
563
    fn to_typed_metadata(&self) -> Result<DatasetMetaData> {
×
564
        match self {
×
565
            MetaDataDefinition::MockMetaData(d) => Ok(DatasetMetaData {
×
566
                meta_data: self,
×
567
                result_descriptor: TypedResultDescriptor::from(d.result_descriptor.clone()),
×
568
            }),
×
569
            MetaDataDefinition::OgrMetaData(d) => Ok(DatasetMetaData {
×
570
                meta_data: self,
×
571
                result_descriptor: TypedResultDescriptor::from(d.result_descriptor.clone()),
×
572
            }),
×
573
            MetaDataDefinition::GdalMetaDataRegular(d) => Ok(DatasetMetaData {
×
574
                meta_data: self,
×
575
                result_descriptor: TypedResultDescriptor::from(d.result_descriptor.clone()),
×
576
            }),
×
577
            MetaDataDefinition::GdalStatic(d) => Ok(DatasetMetaData {
×
578
                meta_data: self,
×
579
                result_descriptor: TypedResultDescriptor::from(d.result_descriptor.clone()),
×
580
            }),
×
581
            MetaDataDefinition::GdalMetadataNetCdfCf(d) => Ok(DatasetMetaData {
×
582
                meta_data: self,
×
583
                result_descriptor: TypedResultDescriptor::from(d.result_descriptor.clone()),
×
584
            }),
×
585
            MetaDataDefinition::GdalMetaDataList(d) => Ok(DatasetMetaData {
×
586
                meta_data: self,
×
587
                result_descriptor: TypedResultDescriptor::from(d.result_descriptor.clone()),
×
588
            }),
×
589
        }
590
    }
×
591
}
592

593
#[async_trait]
594
impl<Tls> DatasetStore for ProPostgresDb<Tls>
595
where
596
    Tls: MakeTlsConnect<Socket> + Clone + Send + Sync + 'static + std::fmt::Debug,
597
    <Tls as MakeTlsConnect<Socket>>::Stream: Send + Sync,
598
    <Tls as MakeTlsConnect<Socket>>::TlsConnect: Send,
599
    <<Tls as MakeTlsConnect<Socket>>::TlsConnect as TlsConnect<Socket>>::Future: Send,
600
{
601
    /// Inserts a new dataset and grants the session user `Owner` permission
    /// on it, both within one transaction.
    ///
    /// When no name is supplied, a name is generated from the user's id
    /// (namespace) and the fresh dataset id.
    async fn add_dataset(
        &self,
        dataset: AddDataset,
        meta_data: MetaDataDefinition,
    ) -> Result<DatasetIdAndName> {
        let id = DatasetId::new();
        // Default name: "<user-id>:<dataset-id>".
        let name = dataset.name.unwrap_or_else(|| DatasetName {
            namespace: Some(self.session.user.id.to_string()),
            name: id.to_string(),
        });

        log::info!(
            "Adding dataset with name: {:?}, tags: {:?}",
            name,
            dataset.tags
        );

        // Users may only create datasets in their own namespace.
        self.check_namespace(&name)?;

        // NOTE(review): `to_typed_metadata()` returns a `Result`, yet its
        // fields are accessed below without `?` — confirm a `?` was not lost
        // in this view of the file.
        let typed_meta_data = meta_data.to_typed_metadata();

        let mut conn = self.conn_pool.get().await?;

        let tx = conn.build_transaction().start().await?;

        tx.execute(
            "
                INSERT INTO datasets (
                    id,
                    name,
                    display_name,
                    description,
                    source_operator,
                    result_descriptor,
                    meta_data,
                    symbology,
                    provenance,
                    tags
                )
                VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10::text[])",
            &[
                &id,
                &name,
                &dataset.display_name,
                &dataset.description,
                &dataset.source_operator,
                &typed_meta_data.result_descriptor,
                typed_meta_data.meta_data,
                &dataset.symbology,
                &dataset.provenance,
                &dataset.tags,
            ],
        )
        .await
        // A unique violation on `name` means the name is already taken.
        .map_unique_violation("datasets", "name", || error::Error::InvalidDatasetName)?;

        // Grant the creating user ownership of the new dataset.
        let stmt = tx
            .prepare(
                "
            INSERT INTO permissions (
                role_id,
                dataset_id,
                permission
            )
            VALUES ($1, $2, $3)",
            )
            .await?;

        tx.execute(
            &stmt,
            &[&RoleId::from(self.session.user.id), &id, &Permission::Owner],
        )
        .await?;

        tx.commit().await?;

        Ok(DatasetIdAndName { id, name })
    }
679

680
    async fn update_dataset(&self, dataset: DatasetId, update: UpdateDataset) -> Result<()> {
×
681
        let mut conn = self.conn_pool.get().await?;
×
682

683
        let tx = conn.build_transaction().start().await?;
×
684

685
        self.ensure_permission_in_tx(dataset.into(), Permission::Owner, &tx)
×
686
            .await
×
687
            .boxed_context(crate::error::PermissionDb)?;
×
688

689
        tx.execute(
×
690
            "UPDATE datasets SET name = $2, display_name = $3, description = $4, tags = $5 WHERE id = $1;",
×
691
            &[
×
692
                &dataset,
×
693
                &update.name,
×
694
                &update.display_name,
×
695
                &update.description,
×
696
                &update.tags,
×
697
            ],
×
698
        )
×
699
        .await?;
×
700

701
        tx.commit().await?;
×
702

703
        Ok(())
×
704
    }
×
705

706
    async fn update_dataset_symbology(
×
707
        &self,
×
708
        dataset: DatasetId,
×
709
        symbology: &Symbology,
×
710
    ) -> Result<()> {
×
711
        let mut conn = self.conn_pool.get().await?;
×
712

713
        let tx = conn.build_transaction().start().await?;
×
714

715
        self.ensure_permission_in_tx(dataset.into(), Permission::Owner, &tx)
×
716
            .await
×
717
            .boxed_context(crate::error::PermissionDb)?;
×
718

719
        tx.execute(
×
720
            "UPDATE datasets SET symbology = $2 WHERE id = $1;",
×
721
            &[&dataset, &symbology],
×
722
        )
×
723
        .await?;
×
724

725
        tx.commit().await?;
×
726

727
        Ok(())
×
728
    }
×
729

730
    async fn update_dataset_provenance(
×
731
        &self,
×
732
        dataset: DatasetId,
×
733
        provenance: &[Provenance],
×
734
    ) -> Result<()> {
×
735
        let mut conn = self.conn_pool.get().await?;
×
736

737
        let tx = conn.build_transaction().start().await?;
×
738

739
        self.ensure_permission_in_tx(dataset.into(), Permission::Owner, &tx)
×
740
            .await
×
741
            .boxed_context(crate::error::PermissionDb)?;
×
742

743
        tx.execute(
×
744
            "UPDATE datasets SET provenance = $2 WHERE id = $1;",
×
745
            &[&dataset, &provenance],
×
746
        )
×
747
        .await?;
×
748

749
        tx.commit().await?;
×
750

751
        Ok(())
×
752
    }
×
753

754
    async fn delete_dataset(&self, dataset_id: DatasetId) -> Result<()> {
4✔
755
        let mut conn = self.conn_pool.get().await?;
4✔
756
        let tx = conn.build_transaction().start().await?;
4✔
757

758
        let _uploaded = self.uploaded_dataset_status_in_tx(&dataset_id, &tx).await;
33✔
759
        if let Err(error) = _uploaded {
4✔
760
            if matches!(error, UnknownDatasetId) {
3✔
761
                self.ensure_permission_in_tx(dataset_id.into(), Permission::Owner, &tx)
3✔
762
                    .await
4✔
763
                    .boxed_context(crate::error::PermissionDb)?;
3✔
764

765
                let stmt = tx
3✔
766
                    .prepare(
3✔
767
                        "
3✔
768
                SELECT
3✔
769
                    TRUE
3✔
770
                FROM
3✔
771
                    user_permitted_datasets p JOIN datasets d
3✔
772
                        ON (p.dataset_id = d.id)
3✔
773
                WHERE
3✔
774
                    d.id = $1 AND p.user_id = $2 AND p.permission = 'Owner';",
3✔
775
                    )
3✔
776
                    .await?;
2✔
777

778
                let rows = tx
3✔
779
                    .query(&stmt, &[&dataset_id, &self.session.user.id])
3✔
780
                    .await?;
2✔
781

782
                if rows.is_empty() {
3✔
NEW
783
                    return Err(Error::OperationRequiresOwnerPermission);
×
784
                }
3✔
785

786
                let stmt = tx.prepare("DELETE FROM datasets WHERE id = $1;").await?;
3✔
787

788
                tx.execute(&stmt, &[&dataset_id]).await?;
3✔
789

790
                tx.commit().await?;
3✔
791

792
                return Ok(());
3✔
NEW
793
            }
×
794
        }
1✔
795

796
        let expire_dataset = ChangeDatasetExpiration {
1✔
797
            dataset_id,
1✔
798
            expiration_change: ExpirationChange::SetExpire(Expiration {
1✔
799
                deletion_timestamp: None,
1✔
800
                delete_record: true,
1✔
801
                delete_data: false,
1✔
802
            }),
1✔
803
        };
1✔
804
        self.expire_uploaded_dataset(expire_dataset).await?;
23✔
805

806
        Ok(())
1✔
807
    }
12✔
808
}
809

810
#[async_trait]
811
impl<Tls> UploadDb for ProPostgresDb<Tls>
812
where
813
    Tls: MakeTlsConnect<Socket> + Clone + Send + Sync + 'static + std::fmt::Debug,
814
    <Tls as MakeTlsConnect<Socket>>::Stream: Send + Sync,
815
    <Tls as MakeTlsConnect<Socket>>::TlsConnect: Send,
816
    <<Tls as MakeTlsConnect<Socket>>::TlsConnect as TlsConnect<Socket>>::Future: Send,
817
{
818
    async fn load_upload(&self, upload: UploadId) -> Result<Upload> {
8✔
819
        let conn = self.conn_pool.get().await?;
8✔
820

821
        let stmt = conn
8✔
822
            .prepare(
8✔
823
                "
8✔
824
            SELECT u.id, u.files 
8✔
825
            FROM uploads u JOIN user_uploads uu ON(u.id = uu.upload_id)
8✔
826
            WHERE u.id = $1 AND uu.user_id = $2",
8✔
827
            )
8✔
828
            .await?;
10✔
829

830
        let row = conn
8✔
831
            .query_one(&stmt, &[&upload, &self.session.user.id])
8✔
832
            .await?;
7✔
833

834
        Ok(Upload {
5✔
835
            id: row.get(0),
5✔
836
            files: row
5✔
837
                .get::<_, Vec<FileUpload>>(1)
5✔
838
                .into_iter()
5✔
839
                .map(Into::into)
5✔
840
                .collect(),
5✔
841
        })
5✔
842
    }
24✔
843

844
    async fn create_upload(&self, upload: Upload) -> Result<()> {
17✔
845
        let mut conn = self.conn_pool.get().await?;
25✔
846
        let tx = conn.build_transaction().start().await?;
17✔
847

848
        let stmt = tx
17✔
849
            .prepare("INSERT INTO uploads (id, files) VALUES ($1, $2)")
17✔
850
            .await?;
48✔
851

852
        tx.execute(
17✔
853
            &stmt,
17✔
854
            &[
17✔
855
                &upload.id,
17✔
856
                &upload
17✔
857
                    .files
17✔
858
                    .iter()
17✔
859
                    .map(FileUpload::from)
17✔
860
                    .collect::<Vec<_>>(),
17✔
861
            ],
17✔
862
        )
17✔
863
        .await?;
15✔
864

865
        let stmt = tx
17✔
866
            .prepare("INSERT INTO user_uploads (user_id, upload_id) VALUES ($1, $2)")
17✔
867
            .await?;
15✔
868

869
        tx.execute(&stmt, &[&self.session.user.id, &upload.id])
17✔
870
            .await?;
15✔
871

872
        tx.commit().await?;
18✔
873

874
        Ok(())
17✔
875
    }
51✔
876
}
877

878
// Postgres-mapped twin of `crate::datasets::upload::FileUpload`; `byte_size`
// is `i64` here because Postgres has no unsigned integer type.
#[derive(Debug, Clone, ToSql, FromSql)]
pub struct FileUpload {
    pub id: FileId,
    pub name: String,
    pub byte_size: i64,
}
884

885
impl From<crate::datasets::upload::FileUpload> for FileUpload {
    /// Converts an owned service-layer file upload into its database
    /// representation, narrowing the byte size to `i64` for Postgres.
    fn from(upload: crate::datasets::upload::FileUpload) -> Self {
        let crate::datasets::upload::FileUpload {
            id,
            name,
            byte_size,
        } = upload;

        Self {
            id,
            name,
            byte_size: byte_size as i64,
        }
    }
}
894

895
impl From<&crate::datasets::upload::FileUpload> for FileUpload {
896
    fn from(upload: &crate::datasets::upload::FileUpload) -> Self {
29✔
897
        Self {
29✔
898
            id: upload.id,
29✔
899
            name: upload.name.clone(),
29✔
900
            byte_size: upload.byte_size as i64,
29✔
901
        }
29✔
902
    }
29✔
903
}
904

905
impl From<FileUpload> for crate::datasets::upload::FileUpload {
906
    fn from(upload: FileUpload) -> Self {
17✔
907
        Self {
17✔
908
            id: upload.id,
17✔
909
            name: upload.name,
17✔
910
            byte_size: upload.byte_size as u64,
17✔
911
        }
17✔
912
    }
17✔
913
}
914

915
#[async_trait]
impl<Tls> UploadedUserDatasetStore for ProPostgresDb<Tls>
where
    Tls: MakeTlsConnect<Socket> + Clone + Send + Sync + 'static + std::fmt::Debug,
    <Tls as MakeTlsConnect<Socket>>::Stream: Send + Sync,
    <Tls as MakeTlsConnect<Socket>>::TlsConnect: Send,
    <<Tls as MakeTlsConnect<Socket>>::TlsConnect as TlsConnect<Socket>>::Future: Send,
{
    /// Adds a dataset that is backed by a user upload.
    ///
    /// In one transaction: inserts the dataset row, grants the session user
    /// `Owner` permission, and links upload and dataset in
    /// `uploaded_user_datasets` with status `Available` and no expiration.
    ///
    /// # Errors
    /// `InvalidDatasetName` on a duplicate dataset name; namespace check
    /// failures; any Postgres error.
    async fn add_uploaded_dataset(
        &self,
        upload_id: UploadId,
        dataset: AddDataset,
        meta_data: MetaDataDefinition,
    ) -> Result<DatasetIdAndName> {
        let id = DatasetId::new();
        // Fall back to an auto-generated name in the user's own namespace.
        let name = dataset.name.unwrap_or_else(|| DatasetName {
            namespace: Some(self.session.user.id.to_string()),
            name: id.to_string(),
        });

        log::info!(
            "Adding dataset with name: {:?}, tags: {:?}",
            name,
            dataset.tags
        );

        // The "deleted" tag is appended by the expiration machinery (see
        // `update_dataset_status_in_tx`); warn if a caller sets it up front.
        if let Some(tags) = &dataset.tags {
            if tags.contains(&"deleted".to_string()) {
                log::warn!("Adding a new dataset with a deleted tag");
            }
        }

        self.check_namespace(&name)?;

        let typed_meta_data = meta_data.to_typed_metadata();

        let mut conn = self.conn_pool.get().await?;

        let tx = conn.build_transaction().start().await?;

        tx.execute(
            "
                INSERT INTO datasets (
                    id,
                    name,
                    display_name,
                    description,
                    source_operator,
                    result_descriptor,
                    meta_data,
                    symbology,
                    provenance,
                    tags
                )
                VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10::text[])",
            &[
                &id,
                &name,
                &dataset.display_name,
                &dataset.description,
                &dataset.source_operator,
                &typed_meta_data.result_descriptor,
                typed_meta_data.meta_data,
                &dataset.symbology,
                &dataset.provenance,
                &dataset.tags,
            ],
        )
        .await
        // Map the unique-constraint violation on `name` to a domain error.
        .map_unique_violation("datasets", "name", || error::Error::InvalidDatasetName)?;

        let stmt = tx
            .prepare(
                "
            INSERT INTO permissions (
                role_id,
                dataset_id,
                permission
            )
            VALUES ($1, $2, $3)",
            )
            .await?;

        // The creating user becomes the owner of the new dataset.
        tx.execute(
            &stmt,
            &[&RoleId::from(self.session.user.id), &id, &Permission::Owner],
        )
        .await?;

        let stmt = tx
            .prepare(
                "
            INSERT INTO uploaded_user_datasets (
                user_id,
                upload_id,
                dataset_id,
                status,
                created,
                expiration,
                deleted,
                delete_data,
                delete_record
            )
            VALUES ($1, $2, $3, 'Available', CURRENT_TIMESTAMP, NULL, NULL, false, false)",
            )
            .await?;

        tx.execute(
            &stmt,
            &[&RoleId::from(self.session.user.id), &upload_id, &id],
        )
        .await?;

        tx.commit().await?;

        Ok(DatasetIdAndName { id, name })
    }

    /// Sets, updates, or removes the expiration of an uploaded dataset.
    ///
    /// Requires `Owner` permission. Runs due status transitions before and
    /// after the change so expirations take effect within this call.
    ///
    /// # Errors
    /// `IllegalDatasetStatus` when unsetting a non-existent expiration;
    /// validation errors (see `validate_expiration_request_in_tx`) when a
    /// `SetExpire` request changed no rows.
    async fn expire_uploaded_dataset(&self, expire_dataset: ChangeDatasetExpiration) -> Result<()> {
        let mut conn = self.conn_pool.get().await?;
        let tx = conn.build_transaction().start().await?;

        self.ensure_permission_in_tx(expire_dataset.dataset_id.into(), Permission::Owner, &tx)
            .await
            .boxed_context(crate::error::PermissionDb)?;

        // Apply any already-due expiration first so the UPDATEs below see
        // the current status.
        self.update_dataset_status_in_tx(&tx, &expire_dataset.dataset_id)
            .await?;

        match expire_dataset.expiration_change {
            ExpirationChange::SetExpire(expiration) => {
                let num_changes = if let Some(delete_timestamp) = expiration.deletion_timestamp {
                    // Future expiration: only valid while the dataset is
                    // still 'Available' or already scheduled ('Expires'),
                    // and only for timestamps not in the past.
                    let stmt = tx
                        .prepare("
                        UPDATE uploaded_user_datasets
                        SET status = 'Expires', expiration = $2, delete_data = $3, delete_record = $4
                        WHERE dataset_id = $1 AND $2 >= CURRENT_TIMESTAMP AND (status = 'Available' OR status = 'Expires');",
                    ).await?;
                    tx.execute(
                        &stmt,
                        &[
                            &expire_dataset.dataset_id,
                            &delete_timestamp,
                            &expiration.delete_data,
                            &expiration.delete_record,
                        ],
                    )
                    .await?
                } else {
                    // No timestamp given: expire immediately.
                    let stmt = tx.prepare("
                        UPDATE uploaded_user_datasets
                        SET status = 'Expires', expiration = CURRENT_TIMESTAMP, delete_data = $2, delete_record = $3
                        WHERE dataset_id = $1 AND (status = 'Available' OR status = 'Expires');",
                    ).await?;
                    let num_expired = tx
                        .execute(
                            &stmt,
                            &[
                                &expire_dataset.dataset_id,
                                &expiration.delete_data,
                                &expiration.delete_record,
                            ],
                        )
                        .await?;

                    if num_expired == 0 {
                        // Already expired/deleted: the deletion flags may
                        // still be upgraded (false -> true), never downgraded.
                        let stmt = tx
                            .prepare(
                                "
                            UPDATE uploaded_user_datasets
                            SET delete_data = (CASE WHEN NOT delete_data THEN $2 ELSE true END),
                                delete_record = (CASE WHEN NOT delete_record THEN $3 ELSE true END),
                                status = 'UpdateExpired'
                            WHERE dataset_id = $1 AND (status = 'Expired' OR status = 'Deleted')
                                    AND (delete_data = false OR delete_record = false);",
                            )
                            .await?;
                        tx.execute(
                            &stmt,
                            &[
                                &expire_dataset.dataset_id,
                                &expiration.delete_data,
                                &expiration.delete_record,
                            ],
                        )
                        .await?
                    } else {
                        num_expired
                    }
                };

                // Nothing was updated: diagnose why and return the matching
                // domain error (unknown id, past timestamp, illegal update).
                if num_changes == 0 {
                    self.validate_expiration_request_in_tx(
                        &tx,
                        &expire_dataset.dataset_id,
                        &expiration,
                    )
                    .await?;
                };
            }
            ExpirationChange::UnsetExpire => {
                // Cancel a pending expiration; only legal in state 'Expires'.
                let stmt = tx.prepare("
                    UPDATE uploaded_user_datasets
                    SET status = 'Available', expiration = NULL, delete_data = false, delete_record = false
                    WHERE dataset_id = $1 AND status = 'Expires';",
                ).await?;
                let set_changes = tx.execute(&stmt, &[&expire_dataset.dataset_id]).await?;
                if set_changes == 0 {
                    return Err(IllegalDatasetStatus {
                        status: "Requested dataset does not exist or does not have an expiration"
                            .to_string(),
                    });
                }
            }
        }
        // Re-run the transition logic: an immediate expiration set above
        // becomes effective within this same transaction.
        self.update_dataset_status_in_tx(&tx, &expire_dataset.dataset_id)
            .await?;

        tx.commit().await?;

        Ok(())
    }

    /// Diagnoses why an expiration request changed no rows and returns the
    /// matching error; returns `Ok(())` only if no rule was violated.
    ///
    /// # Errors
    /// `UnknownDatasetId` if the dataset has no upload record;
    /// `ExpirationTimestampInPast` for a past timestamp on a live dataset;
    /// `IllegalExpirationUpdate` for flag downgrades or re-expiring deleted
    /// data; `IllegalDatasetStatus` for `DeletedWithError`.
    async fn validate_expiration_request_in_tx(
        &self,
        tx: &Transaction,
        dataset_id: &DatasetId,
        expiration: &Expiration,
    ) -> Result<()> {
        // Fetch current status/flags plus whether the requested timestamp
        // (if any) is still in the future ('legal_expiration').
        let (status, delete_data, delete_record, legal_expiration) =
            if let Some(timestamp) = expiration.deletion_timestamp {
                let stmt = tx
                    .prepare(
                        "
                    SELECT
                        status,
                        delete_data,
                        delete_record,
                        $2 >= CURRENT_TIMESTAMP as legal_expiration
                    FROM
                        uploaded_user_datasets
                    WHERE
                        dataset_id = $1;",
                    )
                    .await?;
                let row = tx
                    .query_opt(&stmt, &[&dataset_id, &timestamp])
                    .await?
                    .ok_or(UnknownDatasetId)?;
                (
                    row.get(0),
                    row.get(1),
                    row.get(2),
                    row.get::<usize, bool>(3),
                )
            } else {
                // Immediate expiration is always temporally legal.
                let stmt = tx
                    .prepare(
                        "
                    SELECT
                        status,
                        delete_data,
                        delete_record,
                        TRUE as legal_expiration
                    FROM
                        uploaded_user_datasets
                    WHERE
                        dataset_id = $1;",
                    )
                    .await?;
                let row = tx
                    .query_opt(&stmt, &[&dataset_id])
                    .await?
                    .ok_or(UnknownDatasetId)?;
                (
                    row.get(0),
                    row.get(1),
                    row.get(2),
                    row.get::<usize, bool>(3),
                )
            };

        match status {
            InternalUploadedDatasetStatus::Available | InternalUploadedDatasetStatus::Expires => {
                if !legal_expiration {
                    return Err(ExpirationTimestampInPast);
                }
            }
            InternalUploadedDatasetStatus::Expired
            | InternalUploadedDatasetStatus::UpdateExpired
            | InternalUploadedDatasetStatus::Deleted => {
                // After expiry, deletion flags may only be upgraded; a
                // request with a weaker flag than already recorded is a
                // downgrade and thus rejected.
                let data_downgrade = delete_data && !expiration.delete_data;
                let record_downgrade = delete_record && !expiration.delete_record;
                if data_downgrade || record_downgrade {
                    let data = if data_downgrade { " data" } else { "" };
                    let record = if record_downgrade { " record" } else { "" };
                    return Err(IllegalExpirationUpdate {
                        reason: format!("Prior expiration deleted: {data} {record}"),
                    });
                }
                if expiration.deletion_timestamp.is_some() {
                    return Err(IllegalExpirationUpdate {
                        reason: "Setting expiration after deletion".to_string(),
                    });
                }
            }
            InternalUploadedDatasetStatus::DeletedWithError => {
                return Err(IllegalDatasetStatus {
                    status: "Dataset was deleted, but an error occurred during deletion"
                        .to_string(),
                });
            }
        }
        Ok(())
    }

    /// Returns the (public) status of an uploaded dataset after applying any
    /// due expiration. Requires `Read` permission.
    ///
    /// # Errors
    /// `UnknownDatasetId` if no upload record exists for the dataset.
    async fn uploaded_dataset_status_in_tx(
        &self,
        dataset_id: &DatasetId,
        tx: &Transaction,
    ) -> Result<UploadedDatasetStatus> {
        self.ensure_permission_in_tx((*dataset_id).into(), Permission::Read, tx)
            .await
            .boxed_context(crate::error::PermissionDb)?;

        // Make sure a due expiration is reflected before reading the status.
        self.update_dataset_status_in_tx(tx, dataset_id).await?;

        let stmt = tx
            .prepare(
                "
            SELECT
                status
            FROM
                uploaded_user_datasets
            WHERE
                dataset_id = $1;",
            )
            .await?;

        let result = tx
            .query_opt(&stmt, &[&dataset_id])
            .await?
            .ok_or(error::Error::UnknownDatasetId)?;

        let internal_status: InternalUploadedDatasetStatus = result.get(0);

        // Collapse the internal state machine into the public status enum.
        Ok(internal_status.into())
    }

    /// Applies a due expiration for a single dataset in a fresh transaction.
    async fn update_dataset_status(&self, dataset_id: &DatasetId) -> Result<()> {
        let mut conn = self.conn_pool.get().await?;
        let tx = conn.build_transaction().start().await?;
        self.update_dataset_status_in_tx(&tx, dataset_id).await?;
        tx.commit().await?;

        Ok(())
    }

    /// Transitions a single dataset whose expiration timestamp has passed:
    /// deletes the `datasets` row (if `delete_record`) or tags it "deleted"
    /// (otherwise), then marks the upload record as 'Expired'.
    ///
    /// Only datasets the session user is permitted to see are affected.
    async fn update_dataset_status_in_tx(
        &self,
        tx: &Transaction,
        dataset_id: &DatasetId,
    ) -> Result<()> {
        let mut newly_expired_datasets = 0;

        // Case 1: expiration requested record deletion -> drop the dataset row.
        let delete_records = tx
            .prepare(
                "
                DELETE FROM
                    datasets
                USING
                    user_permitted_datasets p, uploaded_user_datasets u
                WHERE
                    p.user_id = $1 AND datasets.id = $2
                        AND p.dataset_id = datasets.id AND u.dataset_id = datasets.id
                        AND (u.status = 'Expires' OR u.status = 'UpdateExpired') AND u.expiration <= CURRENT_TIMESTAMP
                        AND u.delete_record;",
            )
            .await?;
        newly_expired_datasets += tx
            .execute(&delete_records, &[&self.session.user.id, &dataset_id])
            .await?;

        // Case 2: record is kept -> tag it "deleted" instead.
        let tag_deletion = tx
            .prepare(
                "
            UPDATE
                datasets
            SET
                tags = tags || '{deleted}'
            FROM
                user_permitted_datasets p, uploaded_user_datasets u
            WHERE
                p.user_id = $1 AND datasets.id = $2
                    AND p.dataset_id = datasets.id AND u.dataset_id = datasets.id
                    AND u.status = 'Expires' AND u.expiration <= CURRENT_TIMESTAMP
                    AND NOT u.delete_record;",
            )
            .await?;
        newly_expired_datasets += tx
            .execute(&tag_deletion, &[&self.session.user.id, &dataset_id])
            .await?;

        // Only if something actually expired: advance the upload record's
        // state machine to 'Expired'.
        if newly_expired_datasets > 0 {
            let mark_deletion = tx
                .prepare(
                    "
                UPDATE
                    uploaded_user_datasets
                SET
                    status = 'Expired'
                FROM
                    user_permitted_datasets p
                WHERE
                    p.user_id = $1 AND uploaded_user_datasets.dataset_id = $2
                        AND uploaded_user_datasets.dataset_id = p.dataset_id
                        AND (status = 'Expires' OR status = 'UpdateExpired') AND expiration <= CURRENT_TIMESTAMP;",
                )
                .await?;

            tx.execute(&mark_deletion, &[&self.session.user.id, &dataset_id])
                .await?;
        }

        Ok(())
    }

    /// Applies due expirations for all visible datasets in a fresh transaction.
    async fn update_datasets_status(&self) -> Result<()> {
        let mut conn = self.conn_pool.get().await?;
        let tx = conn.build_transaction().start().await?;
        self.update_datasets_status_in_tx(&tx).await?;
        tx.commit().await?;

        Ok(())
    }

    /// Bulk variant of `update_dataset_status_in_tx`: admins process every
    /// dataset, regular users only those they are permitted to see.
    async fn update_datasets_status_in_tx(&self, tx: &Transaction) -> Result<()> {
        if self.session.is_admin() {
            // NOTE(review): `user_permitted_datasets p` is joined here but
            // never referenced in the WHERE clause — this looks like a
            // leftover from the non-admin query and forms a cross join
            // (no rows are deleted if that table is empty). Confirm intent.
            let delete_records = tx
                .prepare(
                    "
                DELETE FROM
                    datasets
                USING
                    user_permitted_datasets p, uploaded_user_datasets u
                WHERE
                    u.dataset_id = datasets.id
                        AND u.status = 'Expires' AND u.expiration <= CURRENT_TIMESTAMP
                        AND u.delete_record;",
                )
                .await?;
            tx.execute(&delete_records, &[]).await?;

            let tag_deletion = tx
                .prepare(
                    "
                UPDATE
                    datasets
                SET
                    tags = tags || '{deleted}'
                FROM
                    uploaded_user_datasets u
                WHERE
                    u.dataset_id = datasets.id
                        AND u.status = 'Expires' AND u.expiration <= CURRENT_TIMESTAMP
                        AND NOT u.delete_record;",
                )
                .await?;
            tx.execute(&tag_deletion, &[]).await?;

            let mark_deletion = tx
                .prepare(
                    "
                UPDATE
                    uploaded_user_datasets
                SET
                    status = 'Expired'
                WHERE
                    (status = 'Expires' or status = 'UpdateExpired') AND expiration <= CURRENT_TIMESTAMP;",
                )
                .await?;

            tx.execute(&mark_deletion, &[]).await?;
        } else {
            // Non-admin: restrict all three statements to datasets the
            // session user is permitted to access.
            let delete_records = tx
                .prepare(
                    "
                DELETE FROM
                    datasets
                USING
                    user_permitted_datasets p, uploaded_user_datasets u
                WHERE
                    p.user_id = $1 AND p.dataset_id = datasets.id AND u.dataset_id = datasets.id
                        AND u.status = 'Expires' AND u.expiration <= CURRENT_TIMESTAMP
                        AND u.delete_record;",
                )
                .await?;
            tx.execute(&delete_records, &[&self.session.user.id])
                .await?;

            let tag_deletion = tx
                .prepare(
                    "
                UPDATE
                    datasets
                SET
                    tags = tags || '{deleted}'
                FROM
                    user_permitted_datasets p, uploaded_user_datasets u
                WHERE
                    p.user_id = $1 AND p.dataset_id = datasets.id AND u.dataset_id = datasets.id
                        AND u.status = 'Expires' AND u.expiration <= CURRENT_TIMESTAMP
                        AND NOT u.delete_record;",
                )
                .await?;
            tx.execute(&tag_deletion, &[&self.session.user.id]).await?;

            let mark_deletion = tx
                .prepare(
                    "
                UPDATE
                    uploaded_user_datasets
                SET
                    status = 'Expired'
                FROM
                    user_permitted_datasets p
                WHERE
                    p.user_id = $1 AND uploaded_user_datasets.dataset_id = p.dataset_id
                        AND (status = 'Expires' OR status = 'UpdateExpired') AND expiration <= CURRENT_TIMESTAMP;",
                )
                .await?;

            tx.execute(&mark_deletion, &[&self.session.user.id]).await?;
        }
        Ok(())
    }

    /// Admin-only: physically removes upload data of expired datasets that
    /// were flagged with `delete_data`, then records the outcome per upload
    /// ('Deleted' on success, 'DeletedWithError' on failure).
    ///
    /// Returns the number of uploads that were deleted successfully; failed
    /// deletions are logged and excluded from the count.
    ///
    /// # Errors
    /// `PermissionDenied` for non-admin sessions; Postgres errors otherwise.
    async fn clear_expired_datasets(&self) -> Result<usize> {
        ensure!(self.session.is_admin(), error::PermissionDenied);

        let mut conn = self.conn_pool.get().await?;
        let tx = conn.build_transaction().start().await?;

        // Bring all pending expirations up to date first.
        self.update_datasets_status_in_tx(&tx).await?;

        let marked_datasets = tx
            .prepare(
                "
                SELECT
                    dataset_id, upload_id
                FROM
                    uploaded_user_datasets
                WHERE
                    status = 'Expired' AND delete_data;",
            )
            .await?;

        let rows = tx.query(&marked_datasets, &[]).await?;

        let mut deleted = vec![];
        let mut deleted_with_error = vec![];

        // Delete the uploaded files from disk; collect successes and
        // failures separately so each gets the correct terminal status.
        for row in rows {
            let dataset_id: DatasetId = row.get(0);
            let upload_id = row.get(1);
            let res = delete_upload(upload_id).await;
            if let Err(error) = res {
                log::error!("Error during deletion of upload {upload_id} from dataset {dataset_id}: {error}, marking as DeletedWithError");
                deleted_with_error.push(upload_id);
            } else {
                deleted.push(upload_id);
            }
        }

        if !deleted.is_empty() {
            let mark_deletion = tx
                .prepare(
                    "
                UPDATE
                    uploaded_user_datasets
                SET
                    status = 'Deleted'
                WHERE
                    status = 'Expired' AND delete_data AND upload_id = ANY($1);",
                )
                .await?;
            tx.execute(&mark_deletion, &[&deleted]).await?;
        }

        if !deleted_with_error.is_empty() {
            let mark_error = tx
                .prepare(
                    "
                UPDATE
                    uploaded_user_datasets
                SET
                    status = 'DeletedWithError'
                WHERE
                    status = 'Expired' AND delete_data AND upload_id = ANY($1);",
                )
                .await?;
            tx.execute(&mark_error, &[&deleted_with_error]).await?;
        }

        tx.commit().await?;

        Ok(deleted.len())
    }
}
1524

1525
#[cfg(test)]
1526
mod tests {
1527
    use std::fs;
1528
    use std::ops::{Add, Sub};
1529
    use std::path::PathBuf;
1530

1531
    use super::*;
1532
    use crate::api::model::responses::IdResponse;
1533
    use crate::contexts::SessionId;
1534
    use crate::datasets::upload::UploadRootPath;
1535
    use crate::error::Error::PermissionDenied;
1536
    use crate::pro::users::{UserCredentials, UserRegistration};
1537
    use crate::pro::util::tests::get_db_timestamp;
1538
    use crate::pro::util::tests::{admin_login, send_pro_test_request};
1539
    use crate::util::tests::{SetMultipartBody, TestDataUploads};
1540
    use crate::{
1541
        contexts::{ApplicationContext, SessionContext},
1542
        pro::{
1543
            contexts::ProPostgresContext,
1544
            ge_context,
1545
            users::{UserAuth, UserSession},
1546
        },
1547
    };
1548
    use actix_web::http::header;
1549
    use actix_web::test;
1550
    use actix_web_httpauth::headers::authorization::Bearer;
1551
    use geoengine_datatypes::primitives::Duration;
1552
    use geoengine_datatypes::{
1553
        collections::VectorDataType,
1554
        primitives::{CacheTtlSeconds, FeatureDataType, Measurement},
1555
        spatial_reference::SpatialReference,
1556
    };
1557
    use geoengine_operators::error::Error::DatasetDeleted;
1558
    use geoengine_operators::{
1559
        engine::{StaticMetaData, VectorColumnInfo},
1560
        source::{
1561
            CsvHeader, FormatSpecifics, OgrSourceColumnSpec, OgrSourceDatasetTimeType,
1562
            OgrSourceDurationSpec, OgrSourceErrorSpec, OgrSourceTimeFormat,
1563
        },
1564
    };
1565
    use tokio_postgres::NoTls;
1566

1567
    /// Autocomplete search must only surface datasets the session user can
    /// access: user A sees their own dataset, user B (no permission) sees
    /// nothing — with and without a tag filter.
    #[ge_context::test]
    async fn it_autocompletes_datasets(app_ctx: ProPostgresContext<NoTls>) {
        let session_a = app_ctx.create_anonymous_session().await.unwrap();
        let session_b = app_ctx.create_anonymous_session().await.unwrap();

        let db_a = app_ctx.session_context(session_a.clone()).db();
        let db_b = app_ctx.session_context(session_b.clone()).db();

        // Creates "Ogr Test" (tags: "upload", "test") owned by user A.
        add_single_dataset(&db_a, &session_a).await;

        assert_eq!(
            db_a.dataset_autocomplete_search(None, "Ogr".to_owned(), 10, 0)
                .await
                .unwrap(),
            vec!["Ogr Test"]
        );
        assert_eq!(
            db_a.dataset_autocomplete_search(
                Some(vec!["upload".to_string()]),
                "Ogr".to_owned(),
                10,
                0
            )
            .await
            .unwrap(),
            vec!["Ogr Test"]
        );

        // check that other user B cannot access datasets of user A

        assert!(db_b
            .dataset_autocomplete_search(None, "Ogr".to_owned(), 10, 0)
            .await
            .unwrap()
            .is_empty());
        assert!(db_b
            .dataset_autocomplete_search(Some(vec!["upload".to_string()]), "Ogr".to_owned(), 10, 0)
            .await
            .unwrap()
            .is_empty());
    }
1✔
1608

1609
    /// Test helper: adds one OGR-backed multi-point dataset named
    /// "my_dataset" (display name "Ogr Test", tags "upload"/"test") in the
    /// given user's namespace. Panics on failure.
    async fn add_single_dataset(db: &ProPostgresDb<NoTls>, session: &UserSession) {
        // Loading info for a CSV point source; the file is never read in
        // these tests, only the metadata is stored.
        let loading_info = OgrSourceDataset {
            file_name: PathBuf::from("test.csv"),
            layer_name: "test.csv".to_owned(),
            data_type: Some(VectorDataType::MultiPoint),
            time: OgrSourceDatasetTimeType::Start {
                start_field: "start".to_owned(),
                start_format: OgrSourceTimeFormat::Auto,
                duration: OgrSourceDurationSpec::Zero,
            },
            default_geometry: None,
            columns: Some(OgrSourceColumnSpec {
                format_specifics: Some(FormatSpecifics::Csv {
                    header: CsvHeader::Auto,
                }),
                x: "x".to_owned(),
                y: None,
                int: vec![],
                float: vec![],
                text: vec![],
                bool: vec![],
                datetime: vec![],
                rename: None,
            }),
            force_ogr_time_filter: false,
            force_ogr_spatial_filter: false,
            on_error: OgrSourceErrorSpec::Ignore,
            sql_query: None,
            attribute_query: None,
            cache_ttl: CacheTtlSeconds::default(),
        };

        // Static metadata: multi-point features in EPSG:4326 with a single
        // unitless float column "foo".
        let meta_data = MetaDataDefinition::OgrMetaData(StaticMetaData::<
            OgrSourceDataset,
            VectorResultDescriptor,
            VectorQueryRectangle,
        > {
            loading_info: loading_info.clone(),
            result_descriptor: VectorResultDescriptor {
                data_type: VectorDataType::MultiPoint,
                spatial_reference: SpatialReference::epsg_4326().into(),
                columns: [(
                    "foo".to_owned(),
                    VectorColumnInfo {
                        data_type: FeatureDataType::Float,
                        measurement: Measurement::Unitless,
                    },
                )]
                .into_iter()
                .collect(),
                time: None,
                bbox: None,
            },
            phantom: Default::default(),
        });

        // Name the dataset inside the user's own namespace.
        let dataset_name = DatasetName::new(Some(session.user.id.to_string()), "my_dataset");

        db.add_dataset(
            AddDataset {
                name: Some(dataset_name.clone()),
                display_name: "Ogr Test".to_owned(),
                description: "desc".to_owned(),
                source_operator: "OgrSource".to_owned(),
                symbology: None,
                provenance: None,
                tags: Some(vec!["upload".to_owned(), "test".to_owned()]),
            },
            meta_data,
        )
        .await
        .unwrap();
    }
1✔
1682

1683
    /// Relative path (within the test-data directory) of the FlatGeobuf
    /// point dataset used by the upload tests below.
    const TEST_POINT_DATASET_SOURCE_PATH: &str = "vector/data/points.fgb";

    /// Static definition of a test dataset: the meta data to register plus
    /// the name under which it is registered.
    struct TestDatasetDefinition {
        // meta data (loading info + result descriptor) for the OGR source
        meta_data: MetaDataDefinition,
        // namespaced name the dataset is registered under
        dataset_name: DatasetName,
    }

    /// Handle to a dataset that has been uploaded and registered, keeping
    /// the ids needed to query and clean it up afterwards.
    struct UploadedTestDataset {
        // name the dataset was registered under
        dataset_name: DatasetName,
        // database id of the registered dataset
        dataset_id: DatasetId,
        // id of the upload holding the dataset's files on disk
        upload_id: UploadId,
    }
1695

1696
    fn test_point_dataset(name_space: String, name: &str) -> TestDatasetDefinition {
12✔
1697
        let local_path = PathBuf::from(TEST_POINT_DATASET_SOURCE_PATH);
12✔
1698
        let file_name = local_path.file_name().unwrap().to_str().unwrap();
12✔
1699
        let loading_info = OgrSourceDataset {
12✔
1700
            file_name: PathBuf::from(file_name),
12✔
1701
            layer_name: file_name.to_owned(),
12✔
1702
            data_type: Some(VectorDataType::MultiPoint),
12✔
1703
            time: OgrSourceDatasetTimeType::None,
12✔
1704
            default_geometry: None,
12✔
1705
            columns: Some(OgrSourceColumnSpec {
12✔
1706
                format_specifics: None,
12✔
1707
                x: "x".to_owned(),
12✔
1708
                y: Some("y".to_owned()),
12✔
1709
                int: vec!["num".to_owned()],
12✔
1710
                float: vec![],
12✔
1711
                text: vec!["txt".to_owned()],
12✔
1712
                bool: vec![],
12✔
1713
                datetime: vec![],
12✔
1714
                rename: None,
12✔
1715
            }),
12✔
1716
            force_ogr_time_filter: false,
12✔
1717
            force_ogr_spatial_filter: false,
12✔
1718
            on_error: OgrSourceErrorSpec::Ignore,
12✔
1719
            sql_query: None,
12✔
1720
            attribute_query: None,
12✔
1721
            cache_ttl: CacheTtlSeconds::default(),
12✔
1722
        };
12✔
1723

12✔
1724
        let meta_data = MetaDataDefinition::OgrMetaData(StaticMetaData::<
12✔
1725
            OgrSourceDataset,
12✔
1726
            VectorResultDescriptor,
12✔
1727
            VectorQueryRectangle,
12✔
1728
        > {
12✔
1729
            loading_info: loading_info.clone(),
12✔
1730
            result_descriptor: VectorResultDescriptor {
12✔
1731
                data_type: VectorDataType::MultiPoint,
12✔
1732
                spatial_reference: SpatialReference::epsg_4326().into(),
12✔
1733
                columns: [
12✔
1734
                    (
12✔
1735
                        "num".to_owned(),
12✔
1736
                        VectorColumnInfo {
12✔
1737
                            data_type: FeatureDataType::Int,
12✔
1738
                            measurement: Measurement::Unitless,
12✔
1739
                        },
12✔
1740
                    ),
12✔
1741
                    (
12✔
1742
                        "txt".to_owned(),
12✔
1743
                        VectorColumnInfo {
12✔
1744
                            data_type: FeatureDataType::Text,
12✔
1745
                            measurement: Measurement::Unitless,
12✔
1746
                        },
12✔
1747
                    ),
12✔
1748
                ]
12✔
1749
                .into_iter()
12✔
1750
                .collect(),
12✔
1751
                time: None,
12✔
1752
                bbox: None,
12✔
1753
            },
12✔
1754
            phantom: Default::default(),
12✔
1755
        });
12✔
1756

12✔
1757
        let dataset_name = DatasetName::new(Some(name_space), name);
12✔
1758

12✔
1759
        TestDatasetDefinition {
12✔
1760
            meta_data,
12✔
1761
            dataset_name,
12✔
1762
        }
12✔
1763
    }
12✔
1764

1765
    async fn upload_point_dataset(
12✔
1766
        app_ctx: &ProPostgresContext<NoTls>,
12✔
1767
        session_id: SessionId,
12✔
1768
    ) -> UploadId {
12✔
1769
        let files =
12✔
1770
            vec![geoengine_datatypes::test_data!(TEST_POINT_DATASET_SOURCE_PATH).to_path_buf()];
12✔
1771

12✔
1772
        let req = actix_web::test::TestRequest::post()
12✔
1773
            .uri("/upload")
12✔
1774
            .append_header((header::AUTHORIZATION, Bearer::new(session_id.to_string())))
12✔
1775
            .set_multipart_files(&files);
12✔
1776

1777
        let res = send_pro_test_request(req, app_ctx.clone()).await;
286✔
1778
        assert_eq!(res.status(), 200);
12✔
1779
        let upload: IdResponse<UploadId> = test::read_body_json(res).await;
12✔
1780

1781
        upload.id
12✔
1782
    }
12✔
1783

1784
    async fn upload_and_add_point_dataset(
12✔
1785
        app_ctx: &ProPostgresContext<NoTls>,
12✔
1786
        user_session: &UserSession,
12✔
1787
        name: &str,
12✔
1788
        upload_dir: &mut TestDataUploads,
12✔
1789
    ) -> UploadedTestDataset {
12✔
1790
        let test_dataset = test_point_dataset(user_session.user.id.to_string(), name);
12✔
1791
        let upload_id = upload_point_dataset(app_ctx, user_session.id).await;
286✔
1792

1793
        let res = app_ctx
12✔
1794
            .session_context(user_session.clone())
12✔
1795
            .db()
12✔
1796
            .add_uploaded_dataset(
12✔
1797
                upload_id,
12✔
1798
                AddDataset {
12✔
1799
                    name: Some(test_dataset.dataset_name.clone()),
12✔
1800
                    display_name: "Ogr Test".to_owned(),
12✔
1801
                    description: "desc".to_owned(),
12✔
1802
                    source_operator: "OgrSource".to_owned(),
12✔
1803
                    symbology: None,
12✔
1804
                    provenance: None,
12✔
1805
                    tags: Some(vec!["upload".to_owned(), "test".to_owned()]),
12✔
1806
                },
12✔
1807
                test_dataset.meta_data.clone(),
12✔
1808
            )
12✔
1809
            .await
886✔
1810
            .unwrap();
12✔
1811

12✔
1812
        upload_dir.uploads.push(upload_id);
12✔
1813

12✔
1814
        UploadedTestDataset {
12✔
1815
            dataset_name: test_dataset.dataset_name,
12✔
1816
            dataset_id: res.id,
12✔
1817
            upload_id,
12✔
1818
        }
12✔
1819
    }
12✔
1820

1821
    fn listing_not_deleted(dataset: &DatasetListing, origin: &UploadedTestDataset) -> bool {
10✔
1822
        dataset.name == origin.dataset_name && !dataset.tags.contains(&"deleted".to_owned())
10✔
1823
    }
10✔
1824

1825
    fn dataset_deleted(dataset: &Dataset, origin: &UploadedTestDataset) -> bool {
8✔
1826
        let tags = dataset.tags.clone().unwrap();
8✔
1827
        let mut num_deleted = 0;
8✔
1828
        for tag in tags {
32✔
1829
            if tag == *"deleted" {
24✔
1830
                num_deleted += 1;
8✔
1831
            }
16✔
1832
        }
1833
        dataset.name == origin.dataset_name && num_deleted == 1
8✔
1834
    }
8✔
1835

1836
    fn dir_exists(origin: &UploadedTestDataset) -> bool {
12✔
1837
        let path = origin.upload_id.root_path().unwrap();
12✔
1838
        fs::read_dir(path).is_ok()
12✔
1839
    }
12✔
1840

1841
    async fn register_test_user(app_ctx: &ProPostgresContext<NoTls>) -> UserSession {
5✔
1842
        let _user_id = app_ctx
5✔
1843
            .register_user(UserRegistration {
5✔
1844
                email: "test@localhost".to_string(),
5✔
1845
                real_name: "Foo Bar".to_string(),
5✔
1846
                password: "test".to_string(),
5✔
1847
            })
5✔
1848
            .await
51✔
1849
            .unwrap();
5✔
1850

5✔
1851
        app_ctx
5✔
1852
            .login(UserCredentials {
5✔
1853
                email: "test@localhost".to_string(),
5✔
1854
                password: "test".to_string(),
5✔
1855
            })
5✔
1856
            .await
55✔
1857
            .unwrap()
5✔
1858
    }
5✔
1859

1860
    /// Exercises all deletion flavors (`fair`, `full`, `none`) end to end:
    /// after expiration the datasets vanish from the listing, and only an
    /// admin `clear_expired_datasets` removes the upload directories of the
    /// `fair` and `full` deletions.
    #[ge_context::test]
    async fn it_deletes_datasets(app_ctx: ProPostgresContext<NoTls>) {
        let mut test_data = TestDataUploads::default();
        let user_session = register_test_user(&app_ctx).await;

        // one dataset per deletion flavor plus one that stays available
        let available =
            upload_and_add_point_dataset(&app_ctx, &user_session, "available", &mut test_data)
                .await;
        let fair =
            upload_and_add_point_dataset(&app_ctx, &user_session, "fair", &mut test_data).await;
        let full =
            upload_and_add_point_dataset(&app_ctx, &user_session, "full", &mut test_data).await;
        let none =
            upload_and_add_point_dataset(&app_ctx, &user_session, "none", &mut test_data).await;

        let db = app_ctx.session_context(user_session.clone()).db();

        let default_list_options = DatasetListOptions {
            filter: None,
            order: OrderBy::NameAsc,
            offset: 0,
            limit: 10,
            tags: None,
        };

        // initially all four datasets are listed (sorted by name ascending)
        let listing = db
            .list_datasets(default_list_options.clone())
            .await
            .unwrap();

        assert_eq!(listing.len(), 4);
        assert!(listing_not_deleted(listing.first().unwrap(), &available));
        assert!(listing_not_deleted(listing.get(1).unwrap(), &fair));
        assert!(listing_not_deleted(listing.get(2).unwrap(), &full));
        assert!(listing_not_deleted(listing.get(3).unwrap(), &none));

        // delete three datasets, one per flavor
        db.expire_uploaded_dataset(ChangeDatasetExpiration::delete_fair(fair.dataset_id))
            .await
            .unwrap();
        db.expire_uploaded_dataset(ChangeDatasetExpiration::delete_none(none.dataset_id))
            .await
            .unwrap();
        db.expire_uploaded_dataset(ChangeDatasetExpiration::delete_full(full.dataset_id))
            .await
            .unwrap();

        // only the untouched dataset remains in the listing
        let listing = db
            .list_datasets(default_list_options.clone())
            .await
            .unwrap();

        assert_eq!(listing.len(), 1);
        assert!(listing_not_deleted(listing.first().unwrap(), &available));
        // `fair` and `none` deletions keep a tombstone with a `deleted` tag;
        // `full` deletion makes the dataset unknown
        assert!(dataset_deleted(
            &db.load_dataset(&fair.dataset_id).await.unwrap(),
            &fair
        ));
        assert!(matches!(
            db.load_dataset(&full.dataset_id).await.unwrap_err(),
            UnknownDatasetId
        ));
        assert!(dataset_deleted(
            &db.load_dataset(&none.dataset_id).await.unwrap(),
            &none
        ));

        // upload directories remain on disk until an admin clears expired datasets
        assert!(dir_exists(&available));
        assert!(dir_exists(&fair));
        assert!(dir_exists(&full));
        assert!(dir_exists(&none));

        let admin_session = admin_login(&app_ctx).await;
        let admin_ctx = app_ctx.session_context(admin_session.clone());
        let deleted = admin_ctx.db().clear_expired_datasets().await.unwrap();

        // only `fair` and `full` deletions free their upload directories
        assert_eq!(deleted, 2);
        assert!(dir_exists(&available));
        assert!(!dir_exists(&fair));
        assert!(!dir_exists(&full));
        assert!(dir_exists(&none));

        // a second clear run finds nothing left to do
        let deleted = admin_ctx.db().clear_expired_datasets().await.unwrap();
        assert_eq!(deleted, 0);
    }
1944

1945
    /// Verifies timed expiration: a dataset with a future expiration timestamp
    /// stays listed until the timestamp passes, then disappears from the
    /// listing and loads as a deleted tombstone.
    #[ge_context::test]
    async fn it_expires_dataset(app_ctx: ProPostgresContext<NoTls>) {
        let mut test_data = TestDataUploads::default();
        let user_session = register_test_user(&app_ctx).await;

        // expiration timestamps are compared against the database clock,
        // so derive the deadline from it rather than from local time
        let current_time = get_db_timestamp(&app_ctx).await;
        let future_time = current_time.add(Duration::seconds(3));

        let fair =
            upload_and_add_point_dataset(&app_ctx, &user_session, "fair", &mut test_data).await;

        let db = app_ctx.session_context(user_session.clone()).db();

        let default_list_options = DatasetListOptions {
            filter: None,
            order: OrderBy::NameAsc,
            offset: 0,
            limit: 10,
            tags: None,
        };

        db.expire_uploaded_dataset(ChangeDatasetExpiration::expire_fair(
            fair.dataset_id,
            future_time,
        ))
        .await
        .unwrap();

        // before the deadline the dataset is still listed as not deleted
        let listing = db
            .list_datasets(default_list_options.clone())
            .await
            .unwrap();

        assert_eq!(listing.len(), 1);
        assert!(listing_not_deleted(listing.first().unwrap(), &fair));

        // wait until the expiration timestamp has passed
        tokio::time::sleep(std::time::Duration::from_secs(5)).await;

        let listing = db
            .list_datasets(default_list_options.clone())
            .await
            .unwrap();

        // after the deadline the dataset is gone from the listing but still
        // loads as a tombstone tagged `deleted` (fair deletion)
        assert_eq!(listing.len(), 0);
        assert!(dataset_deleted(
            &db.load_dataset(&fair.dataset_id).await.unwrap(),
            &fair
        ));
    }
1994

1995
    /// Verifies that a pending (not yet reached) expiration can be updated:
    /// `fair` is postponed to a later timestamp, while `fair2full` is switched
    /// through several deletion variants with the last one (`full`) winning.
    #[ge_context::test]
    async fn it_updates_expiring_dataset(app_ctx: ProPostgresContext<NoTls>) {
        let mut test_data = TestDataUploads::default();
        let user_session = register_test_user(&app_ctx).await;

        // two deadlines derived from the database clock
        let current_time = get_db_timestamp(&app_ctx).await;
        let future_time_1 = current_time.add(Duration::seconds(2));
        let future_time_2 = current_time.add(Duration::seconds(5));

        let fair =
            upload_and_add_point_dataset(&app_ctx, &user_session, "fair", &mut test_data).await;
        let fair2full =
            upload_and_add_point_dataset(&app_ctx, &user_session, "fair2full", &mut test_data)
                .await;

        let db = app_ctx.session_context(user_session.clone()).db();

        let default_list_options = DatasetListOptions {
            filter: None,
            order: OrderBy::NameAsc,
            offset: 0,
            limit: 10,
            tags: None,
        };

        // postpone the expiration of `fair` from deadline 1 to deadline 2
        db.expire_uploaded_dataset(ChangeDatasetExpiration::expire_fair(
            fair.dataset_id,
            future_time_1,
        ))
        .await
        .unwrap();
        db.expire_uploaded_dataset(ChangeDatasetExpiration::expire_fair(
            fair.dataset_id,
            future_time_2,
        ))
        .await
        .unwrap();
        // repeatedly re-type the pending expiration of `fair2full`;
        // the final update (`full`) determines the outcome
        db.expire_uploaded_dataset(ChangeDatasetExpiration::expire_fair(
            fair2full.dataset_id,
            future_time_1,
        ))
        .await
        .unwrap();
        db.expire_uploaded_dataset(ChangeDatasetExpiration::expire_none(
            fair2full.dataset_id,
            future_time_1,
        ))
        .await
        .unwrap();
        db.expire_uploaded_dataset(ChangeDatasetExpiration::expire_meta(
            fair2full.dataset_id,
            future_time_1,
        ))
        .await
        .unwrap();
        db.expire_uploaded_dataset(ChangeDatasetExpiration::expire_fair(
            fair2full.dataset_id,
            future_time_1,
        ))
        .await
        .unwrap();
        db.expire_uploaded_dataset(ChangeDatasetExpiration::expire_full(
            fair2full.dataset_id,
            future_time_1,
        ))
        .await
        .unwrap();

        // before any deadline both datasets are still listed
        let listing = db
            .list_datasets(default_list_options.clone())
            .await
            .unwrap();

        assert_eq!(listing.len(), 2);
        assert!(listing_not_deleted(listing.first().unwrap(), &fair));
        assert!(listing_not_deleted(listing.get(1).unwrap(), &fair2full));

        // pass deadline 1 (but not deadline 2)
        tokio::time::sleep(std::time::Duration::from_secs(3)).await;

        let listing = db
            .list_datasets(default_list_options.clone())
            .await
            .unwrap();

        // `fair2full` expired with the `full` variant → unknown dataset
        assert_eq!(listing.len(), 1);
        assert!(listing_not_deleted(listing.first().unwrap(), &fair));
        assert!(matches!(
            db.load_dataset(&fair2full.dataset_id).await.unwrap_err(),
            UnknownDatasetId
        ));

        // pass deadline 2 as well
        tokio::time::sleep(std::time::Duration::from_secs(3)).await;

        let listing = db
            .list_datasets(default_list_options.clone())
            .await
            .unwrap();
        // `fair` expired with the `fair` variant → tombstone remains loadable
        assert_eq!(listing.len(), 0);
        assert!(dataset_deleted(
            &db.load_dataset(&fair.dataset_id).await.unwrap(),
            &fair
        ));
    }
2098

2099
    /// Verifies which updates are allowed on an already-expired dataset:
    /// deletions may be upgraded (`fair`→`full`, `none`→`fair`, `none`→`meta`)
    /// but never downgraded, and a pending expiration can be unset before it
    /// triggers.
    #[allow(clippy::too_many_lines)]
    #[ge_context::test]
    async fn it_updates_expired_dataset(app_ctx: ProPostgresContext<NoTls>) {
        let mut test_data = TestDataUploads::default();
        let user_session = register_test_user(&app_ctx).await;

        let db = app_ctx.session_context(user_session.clone()).db();
        let default_list_options = DatasetListOptions {
            filter: None,
            order: OrderBy::NameAsc,
            offset: 0,
            limit: 10,
            tags: None,
        };

        // upgrade: fair deletion → full deletion (tombstone disappears)
        let fair2full =
            upload_and_add_point_dataset(&app_ctx, &user_session, "fair2full", &mut test_data)
                .await;
        db.expire_uploaded_dataset(ChangeDatasetExpiration::delete_fair(fair2full.dataset_id))
            .await
            .unwrap();
        assert!(dataset_deleted(
            &db.load_dataset(&fair2full.dataset_id).await.unwrap(),
            &fair2full
        ));
        db.expire_uploaded_dataset(ChangeDatasetExpiration::delete_full(fair2full.dataset_id))
            .await
            .unwrap();
        assert!(matches!(
            db.load_dataset(&fair2full.dataset_id).await.unwrap_err(),
            UnknownDatasetId
        ));

        // upgrade: no deletion → fair deletion (tombstone stays loadable)
        let none2fair =
            upload_and_add_point_dataset(&app_ctx, &user_session, "none2fair", &mut test_data)
                .await;
        db.expire_uploaded_dataset(ChangeDatasetExpiration::delete_none(none2fair.dataset_id))
            .await
            .unwrap();
        assert!(dataset_deleted(
            &db.load_dataset(&none2fair.dataset_id).await.unwrap(),
            &none2fair
        ));
        db.expire_uploaded_dataset(ChangeDatasetExpiration::delete_fair(none2fair.dataset_id))
            .await
            .unwrap();
        assert!(dataset_deleted(
            &db.load_dataset(&none2fair.dataset_id).await.unwrap(),
            &none2fair
        ));

        // upgrade: no deletion → meta deletion (dataset becomes unknown)
        let none2meta =
            upload_and_add_point_dataset(&app_ctx, &user_session, "none2meta", &mut test_data)
                .await;
        db.expire_uploaded_dataset(ChangeDatasetExpiration::delete_none(none2meta.dataset_id))
            .await
            .unwrap();
        assert!(dataset_deleted(
            &db.load_dataset(&none2meta.dataset_id).await.unwrap(),
            &none2meta
        ));
        db.expire_uploaded_dataset(ChangeDatasetExpiration::delete_meta(none2meta.dataset_id))
            .await
            .unwrap();
        assert!(matches!(
            db.load_dataset(&none2meta.dataset_id).await.unwrap_err(),
            UnknownDatasetId
        ));

        // downgrades of an existing deletion are rejected
        assert!(db
            .expire_uploaded_dataset(ChangeDatasetExpiration::delete_none(none2fair.dataset_id))
            .await
            .is_err());
        assert!(db
            .expire_uploaded_dataset(ChangeDatasetExpiration::delete_none(fair2full.dataset_id))
            .await
            .is_err());
        assert!(db
            .expire_uploaded_dataset(ChangeDatasetExpiration::delete_fair(fair2full.dataset_id))
            .await
            .is_err());

        // a pending expiration can be unset again before its deadline passes
        let current_time = get_db_timestamp(&app_ctx).await;
        let future_time = current_time.add(Duration::seconds(2));
        let fair2available =
            upload_and_add_point_dataset(&app_ctx, &user_session, "fair2available", &mut test_data)
                .await;
        db.expire_uploaded_dataset(ChangeDatasetExpiration::expire_fair(
            fair2available.dataset_id,
            future_time,
        ))
        .await
        .unwrap();
        db.expire_uploaded_dataset(ChangeDatasetExpiration::unset_expire(
            fair2available.dataset_id,
        ))
        .await
        .unwrap();

        // let the (now unset) deadline pass before clearing
        tokio::time::sleep(std::time::Duration::from_secs(5)).await;

        // only the two fair deletions free their upload directories
        let admin_session = admin_login(&app_ctx).await;
        let admin_ctx = app_ctx.session_context(admin_session.clone());
        let deleted = admin_ctx.db().clear_expired_datasets().await.unwrap();
        assert_eq!(deleted, 2);

        // the dataset whose expiration was unset is still listed
        let listing = db
            .list_datasets(default_list_options.clone())
            .await
            .unwrap();
        assert_eq!(listing.len(), 1);
        assert!(listing_not_deleted(
            listing.first().unwrap(),
            &fair2available
        ));

        assert!(dir_exists(&fair2available));
        assert!(!dir_exists(&fair2full));
        assert!(!dir_exists(&none2fair));
        // meta deletion does not remove the uploaded files
        assert!(dir_exists(&none2meta));
    }
2220

2221
    /// Verifies the error cases of the expiration API: timestamps in the past,
    /// unsetting a non-existing expiration, illegal updates on deleted
    /// datasets, meta-data access to deleted datasets, and clearing without
    /// admin permission.
    #[ge_context::test]
    async fn it_handles_expiration_errors(app_ctx: ProPostgresContext<NoTls>) {
        let mut test_data = TestDataUploads::default();
        let user_session = register_test_user(&app_ctx).await;

        // deadlines relative to the database clock, one hour in each direction
        let current_time = get_db_timestamp(&app_ctx).await;
        let future_time = current_time.add(Duration::hours(1));
        let past_time = current_time.sub(Duration::hours(1));

        let db = app_ctx.session_context(user_session.clone()).db();

        // expiring with a timestamp in the past is rejected
        let test_dataset =
            upload_and_add_point_dataset(&app_ctx, &user_session, "fair2full", &mut test_data)
                .await;
        let err = db
            .expire_uploaded_dataset(ChangeDatasetExpiration::expire_fair(
                test_dataset.dataset_id,
                past_time,
            ))
            .await;
        assert!(err.is_err());
        assert!(matches!(err.unwrap_err(), ExpirationTimestampInPast));

        // unsetting the expiration of a dataset that is not expiring is rejected
        let err = db
            .expire_uploaded_dataset(ChangeDatasetExpiration::unset_expire(
                test_dataset.dataset_id,
            ))
            .await;
        assert!(err.is_err());
        assert!(matches!(err.unwrap_err(), IllegalDatasetStatus { .. }));

        // setting a new expiration on an already-deleted dataset is rejected
        db.expire_uploaded_dataset(ChangeDatasetExpiration::delete_fair(
            test_dataset.dataset_id,
        ))
        .await
        .unwrap();
        let err = db
            .expire_uploaded_dataset(ChangeDatasetExpiration::expire_fair(
                test_dataset.dataset_id,
                future_time,
            ))
            .await;
        assert!(err.is_err());
        assert!(matches!(err.unwrap_err(), IllegalExpirationUpdate { .. }));

        // requesting meta data for a deleted dataset yields `DatasetDeleted`
        let err: std::result::Result<
            Box<dyn MetaData<OgrSourceDataset, VectorResultDescriptor, VectorQueryRectangle>>,
            geoengine_operators::error::Error,
        > = db
            .meta_data(&DataId::Internal {
                dataset_id: test_dataset.dataset_id,
            })
            .await;
        assert!(err.is_err());
        assert!(matches!(err.unwrap_err(), DatasetDeleted { .. }));

        // clearing expired datasets requires admin permission
        let err = db.clear_expired_datasets().await;
        assert!(err.is_err());
        assert!(matches!(err.unwrap_err(), PermissionDenied));
    }
2286
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc