• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

geo-engine / geoengine / 12910820463

22 Jan 2025 02:53PM UTC coverage: 90.101% (+0.04%) from 90.061%
12910820463

Pull #1009

github

web-flow
Merge d52fb841c into df8c694c8
Pull Request #1009: merge data providers and migrations

1303 of 1476 new or added lines in 22 files covered. (88.28%)

132 existing lines in 9 files now uncovered.

125778 of 139596 relevant lines covered (90.1%)

57663.0 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

93.06
/services/src/datasets/postgres.rs
1
use crate::api::model::services::UpdateDataset;
2
use crate::contexts::PostgresDb;
3
use crate::datasets::listing::Provenance;
4
use crate::datasets::listing::{DatasetListOptions, DatasetListing, DatasetProvider};
5
use crate::datasets::listing::{OrderBy, ProvenanceOutput};
6
use crate::datasets::storage::{Dataset, DatasetDb, DatasetStore, MetaDataDefinition};
7
use crate::datasets::upload::FileId;
8
use crate::datasets::upload::{Upload, UploadDb, UploadId};
9
use crate::datasets::{AddDataset, DatasetIdAndName, DatasetName};
10
use crate::error::{self, Error, Result};
11
use crate::permissions::TxPermissionDb;
12
use crate::permissions::{Permission, RoleId};
13
use crate::projects::Symbology;
14
use crate::util::postgres::PostgresErrorExt;
15
use async_trait::async_trait;
16
use bb8_postgres::bb8::PooledConnection;
17
use bb8_postgres::tokio_postgres::tls::{MakeTlsConnect, TlsConnect};
18
use bb8_postgres::tokio_postgres::Socket;
19
use bb8_postgres::PostgresConnectionManager;
20
use geoengine_datatypes::dataset::{DataId, DatasetId};
21
use geoengine_datatypes::error::BoxedResultExt;
22
use geoengine_datatypes::primitives::RasterQueryRectangle;
23
use geoengine_datatypes::primitives::VectorQueryRectangle;
24
use geoengine_datatypes::util::Identifier;
25
use geoengine_operators::engine::TypedResultDescriptor;
26
use geoengine_operators::engine::{
27
    MetaData, MetaDataProvider, RasterResultDescriptor, VectorResultDescriptor,
28
};
29
use geoengine_operators::mock::MockDatasetDataSourceLoadingInfo;
30
use geoengine_operators::source::{GdalLoadingInfo, OgrSourceDataset};
31
use postgres_types::{FromSql, ToSql};
32

33
pub async fn resolve_dataset_name_to_id<Tls>(
86✔
34
    conn: &PooledConnection<'_, PostgresConnectionManager<Tls>>,
86✔
35
    dataset_name: &DatasetName,
86✔
36
) -> Result<Option<DatasetId>>
86✔
37
where
86✔
38
    Tls: MakeTlsConnect<Socket> + Clone + Send + Sync + 'static,
86✔
39
    <Tls as MakeTlsConnect<Socket>>::Stream: Send + Sync,
86✔
40
    <Tls as MakeTlsConnect<Socket>>::TlsConnect: Send,
86✔
41
    <<Tls as MakeTlsConnect<Socket>>::TlsConnect as TlsConnect<Socket>>::Future: Send,
86✔
42
{
86✔
43
    let stmt = conn
86✔
44
        .prepare(
86✔
45
            "SELECT id
86✔
46
        FROM datasets
86✔
47
        WHERE name = $1::\"DatasetName\"",
86✔
48
        )
86✔
49
        .await?;
86✔
50

51
    let row_option = conn.query_opt(&stmt, &[&dataset_name]).await?;
86✔
52

53
    Ok(row_option.map(|row| row.get(0)))
86✔
54
}
86✔
55

56
/// Abstraction for values that can be broken apart into the typed metadata
/// pair that the Postgres backend persists (loading info + result descriptor).
#[async_trait]
pub trait PostgresStorable<Tls>: Send + Sync
where
    Tls: MakeTlsConnect<Socket> + Clone + Send + Sync + 'static,
    <Tls as MakeTlsConnect<Socket>>::Stream: Send + Sync,
    <Tls as MakeTlsConnect<Socket>>::TlsConnect: Send,
    <<Tls as MakeTlsConnect<Socket>>::TlsConnect as TlsConnect<Socket>>::Future: Send,
{
    /// Produces a [`DatasetMetaData`] view of `self`: a borrow of the raw
    /// `MetaDataDefinition` plus the `TypedResultDescriptor` derived from it.
    fn to_typed_metadata(&self) -> Result<DatasetMetaData>;
}
66

67
/// Borrowed view of a dataset's metadata as written to the database
/// (cf. `add_dataset`, which stores these two pieces in separate columns).
pub struct DatasetMetaData<'m> {
    // Raw loading-info definition; stored in the `meta_data` column.
    pub meta_data: &'m MetaDataDefinition,
    // Descriptor of the produced data; stored in the `result_descriptor` column.
    pub result_descriptor: TypedResultDescriptor,
}
71

72
/// Database representation of a single uploaded file.
///
/// Mirrors `crate::datasets::upload::FileUpload` but holds the size as `i64`,
/// since Postgres' 8-byte integer (`bigint`) is signed.
#[derive(Debug, Clone, ToSql, FromSql)]
pub struct FileUpload {
    pub id: FileId,
    pub name: String,
    // Size in bytes; `i64` for Postgres compatibility (see `From` impls below).
    pub byte_size: i64,
}
78

79
/// Converts the domain-level upload record into its database representation.
impl From<crate::datasets::upload::FileUpload> for FileUpload {
    fn from(upload: crate::datasets::upload::FileUpload) -> Self {
        // Consume the domain value and re-pack its fields; only the byte size
        // needs converting (u64 -> i64 for Postgres `bigint`).
        let crate::datasets::upload::FileUpload {
            id,
            name,
            byte_size,
        } = upload;
        Self {
            id,
            name,
            byte_size: byte_size as i64,
        }
    }
}
88

89
/// Borrow-based conversion into the database representation; clones only the
/// owned `name` string.
impl From<&crate::datasets::upload::FileUpload> for FileUpload {
    fn from(upload: &crate::datasets::upload::FileUpload) -> Self {
        // Destructure through the reference (match ergonomics): `id`, `name`
        // and `byte_size` bind as references into `upload`.
        let crate::datasets::upload::FileUpload {
            id,
            name,
            byte_size,
        } = upload;
        Self {
            id: *id,
            name: name.clone(),
            byte_size: *byte_size as i64,
        }
    }
}
98

99
impl From<FileUpload> for crate::datasets::upload::FileUpload {
100
    fn from(upload: FileUpload) -> Self {
13✔
101
        Self {
13✔
102
            id: upload.id,
13✔
103
            name: upload.name,
13✔
104
            byte_size: upload.byte_size as u64,
13✔
105
        }
13✔
106
    }
13✔
107
}
108

109
/// Marker impl: `PostgresDb` is a complete `DatasetDb`. The trait adds no
/// methods of its own here; functionality comes from the component trait
/// impls in this file (`DatasetProvider`, `DatasetStore`, `UploadDb`).
impl<Tls> DatasetDb for PostgresDb<Tls>
where
    Tls: MakeTlsConnect<Socket> + Clone + Send + Sync + 'static + std::fmt::Debug,
    <Tls as MakeTlsConnect<Socket>>::Stream: Send + Sync,
    <Tls as MakeTlsConnect<Socket>>::TlsConnect: Send,
    <<Tls as MakeTlsConnect<Socket>>::TlsConnect as TlsConnect<Socket>>::Future: Send,
{
}
117

118
#[allow(clippy::too_many_lines)]
119
#[async_trait]
120
impl<Tls> DatasetProvider for PostgresDb<Tls>
121
where
122
    Tls: MakeTlsConnect<Socket> + Clone + Send + Sync + 'static + std::fmt::Debug,
123
    <Tls as MakeTlsConnect<Socket>>::Stream: Send + Sync,
124
    <Tls as MakeTlsConnect<Socket>>::TlsConnect: Send,
125
    <<Tls as MakeTlsConnect<Socket>>::TlsConnect as TlsConnect<Socket>>::Future: Send,
126
{
127
    async fn list_datasets(&self, options: DatasetListOptions) -> Result<Vec<DatasetListing>> {
7✔
128
        let conn = self.conn_pool.get().await?;
7✔
129

130
        let mut pos = 3;
7✔
131
        let order_sql = if options.order == OrderBy::NameAsc {
7✔
132
            "display_name ASC"
7✔
133
        } else {
NEW
134
            "display_name DESC"
×
135
        };
136

137
        let filter_sql = if options.filter.is_some() {
7✔
138
            pos += 1;
3✔
139
            format!("AND display_name ILIKE ${pos} ESCAPE '\\'")
3✔
140
        } else {
141
            String::new()
4✔
142
        };
143

144
        let (filter_tags_sql, filter_tags_list) = if let Some(filter_tags) = &options.tags {
7✔
145
            pos += 1;
1✔
146
            (format!("AND d.tags @> ${pos}::text[]"), filter_tags.clone())
1✔
147
        } else {
148
            (String::new(), vec![])
6✔
149
        };
150

151
        let stmt = conn
7✔
152
            .prepare(&format!(
7✔
153
                "
7✔
154
            SELECT 
7✔
155
                d.id,
7✔
156
                d.name,
7✔
157
                d.display_name,
7✔
158
                d.description,
7✔
159
                d.tags,
7✔
160
                d.source_operator,
7✔
161
                d.result_descriptor,
7✔
162
                d.symbology
7✔
163
            FROM 
7✔
164
                user_permitted_datasets p JOIN datasets d 
7✔
165
                    ON (p.dataset_id = d.id)
7✔
166
            WHERE 
7✔
167
                p.user_id = $1
7✔
168
                {filter_sql}
7✔
169
                {filter_tags_sql}
7✔
170
            ORDER BY {order_sql}
7✔
171
            LIMIT $2
7✔
172
            OFFSET $3;  
7✔
173
            ",
7✔
174
            ))
7✔
175
            .await?;
7✔
176

177
        let rows = match (options.filter, options.tags) {
7✔
178
            (Some(filter), Some(_)) => {
1✔
179
                conn.query(
1✔
180
                    &stmt,
1✔
181
                    &[
1✔
182
                        &self.session.user.id,
1✔
183
                        &i64::from(options.limit),
1✔
184
                        &i64::from(options.offset),
1✔
185
                        &format!("%{}%", filter.replace('%', "\\%").replace('_', "\\_")),
1✔
186
                        &filter_tags_list,
1✔
187
                    ],
1✔
188
                )
1✔
189
                .await?
1✔
190
            }
191
            (Some(filter), None) => {
2✔
192
                conn.query(
2✔
193
                    &stmt,
2✔
194
                    &[
2✔
195
                        &self.session.user.id,
2✔
196
                        &i64::from(options.limit),
2✔
197
                        &i64::from(options.offset),
2✔
198
                        &format!("%{}%", filter.replace('%', "\\%").replace('_', "\\_")),
2✔
199
                    ],
2✔
200
                )
2✔
201
                .await?
2✔
202
            }
203
            (None, Some(_)) => {
NEW
204
                conn.query(
×
NEW
205
                    &stmt,
×
NEW
206
                    &[
×
NEW
207
                        &self.session.user.id,
×
NEW
208
                        &i64::from(options.limit),
×
NEW
209
                        &i64::from(options.offset),
×
NEW
210
                        &filter_tags_list,
×
NEW
211
                    ],
×
NEW
212
                )
×
NEW
213
                .await?
×
214
            }
215
            (None, None) => {
216
                conn.query(
4✔
217
                    &stmt,
4✔
218
                    &[
4✔
219
                        &self.session.user.id,
4✔
220
                        &i64::from(options.limit),
4✔
221
                        &i64::from(options.offset),
4✔
222
                    ],
4✔
223
                )
4✔
224
                .await?
4✔
225
            }
226
        };
227

228
        Ok(rows
7✔
229
            .iter()
7✔
230
            .map(|row| {
8✔
231
                Result::<DatasetListing>::Ok(DatasetListing {
8✔
232
                    id: row.get(0),
8✔
233
                    name: row.get(1),
8✔
234
                    display_name: row.get(2),
8✔
235
                    description: row.get(3),
8✔
236
                    tags: row.get::<_, Option<Vec<String>>>(4).unwrap_or_default(),
8✔
237
                    source_operator: row.get(5),
8✔
238
                    result_descriptor: row.get(6),
8✔
239
                    symbology: row.get(7),
8✔
240
                })
8✔
241
            })
8✔
242
            .filter_map(Result::ok)
7✔
243
            .collect())
7✔
244
    }
14✔
245

246
    async fn load_dataset(&self, dataset: &DatasetId) -> Result<Dataset> {
22✔
247
        let conn = self.conn_pool.get().await?;
22✔
248
        let stmt = conn
22✔
249
            .prepare(
22✔
250
                "
22✔
251
            SELECT
22✔
252
                d.id,
22✔
253
                d.name,
22✔
254
                d.display_name,
22✔
255
                d.description,
22✔
256
                d.result_descriptor,
22✔
257
                d.source_operator,
22✔
258
                d.symbology,
22✔
259
                d.provenance,
22✔
260
                d.tags
22✔
261
            FROM 
22✔
262
                user_permitted_datasets p JOIN datasets d 
22✔
263
                    ON (p.dataset_id = d.id)
22✔
264
            WHERE 
22✔
265
                p.user_id = $1 AND d.id = $2
22✔
266
            LIMIT 
22✔
267
                1",
22✔
268
            )
22✔
269
            .await?;
22✔
270

271
        let row = conn
22✔
272
            .query_opt(&stmt, &[&self.session.user.id, dataset])
22✔
273
            .await?;
22✔
274

275
        let row = row.ok_or(error::Error::UnknownDatasetId)?;
22✔
276

277
        Ok(Dataset {
15✔
278
            id: row.get(0),
15✔
279
            name: row.get(1),
15✔
280
            display_name: row.get(2),
15✔
281
            description: row.get(3),
15✔
282
            result_descriptor: row.get(4),
15✔
283
            source_operator: row.get(5),
15✔
284
            symbology: row.get(6),
15✔
285
            provenance: row.get(7),
15✔
286
            tags: row.get(8),
15✔
287
        })
15✔
288
    }
44✔
289

290
    async fn load_provenance(&self, dataset: &DatasetId) -> Result<ProvenanceOutput> {
5✔
291
        let conn = self.conn_pool.get().await?;
5✔
292

293
        let stmt = conn
5✔
294
            .prepare(
5✔
295
                "
5✔
296
            SELECT 
5✔
297
                d.provenance 
5✔
298
            FROM 
5✔
299
                user_permitted_datasets p JOIN datasets d
5✔
300
                    ON(p.dataset_id = d.id)
5✔
301
            WHERE 
5✔
302
                p.user_id = $1 AND d.id = $2
5✔
303
            LIMIT 
5✔
304
                1",
5✔
305
            )
5✔
306
            .await?;
5✔
307

308
        let row = conn
5✔
309
            .query_opt(&stmt, &[&self.session.user.id, dataset])
5✔
310
            .await?;
5✔
311

312
        let row = row.ok_or(error::Error::UnknownDatasetId)?;
5✔
313

314
        Ok(ProvenanceOutput {
4✔
315
            data: (*dataset).into(),
4✔
316
            provenance: row.get(0),
4✔
317
        })
4✔
318
    }
10✔
319

320
    async fn load_loading_info(&self, dataset: &DatasetId) -> Result<MetaDataDefinition> {
3✔
321
        let conn = self.conn_pool.get().await?;
3✔
322

323
        let stmt = conn
3✔
324
            .prepare(
3✔
325
                "
3✔
326
            SELECT 
3✔
327
                meta_data 
3✔
328
            FROM 
3✔
329
                user_permitted_datasets p JOIN datasets d
3✔
330
                    ON(p.dataset_id = d.id)
3✔
331
            WHERE 
3✔
332
                p.user_id = $1 AND d.id = $2
3✔
333
            LIMIT 
3✔
334
                1",
3✔
335
            )
3✔
336
            .await?;
3✔
337

338
        let row = conn
3✔
339
            .query_one(&stmt, &[&self.session.user.id, dataset])
3✔
340
            .await?;
3✔
341

342
        Ok(row.get(0))
3✔
343
    }
6✔
344

345
    async fn resolve_dataset_name_to_id(
346
        &self,
347
        dataset_name: &DatasetName,
348
    ) -> Result<Option<DatasetId>> {
86✔
349
        let conn = self.conn_pool.get().await?;
86✔
350
        resolve_dataset_name_to_id(&conn, dataset_name).await
86✔
351
    }
172✔
352

353
    async fn dataset_autocomplete_search(
354
        &self,
355
        tags: Option<Vec<String>>,
356
        search_string: String,
357
        limit: u32,
358
        offset: u32,
359
    ) -> Result<Vec<String>> {
7✔
360
        let connection = self.conn_pool.get().await?;
7✔
361

362
        let limit = i64::from(limit);
7✔
363
        let offset = i64::from(offset);
7✔
364
        let search_string = format!(
7✔
365
            "%{}%",
7✔
366
            search_string.replace('%', "\\%").replace('_', "\\_")
7✔
367
        );
7✔
368

7✔
369
        let mut query_params: Vec<&(dyn ToSql + Sync)> =
7✔
370
            vec![&self.session.user.id, &limit, &offset, &search_string];
7✔
371

372
        let tags_clause = if let Some(tags) = &tags {
7✔
373
            query_params.push(tags);
3✔
374
            " AND tags @> $5::text[]".to_string()
3✔
375
        } else {
376
            String::new()
4✔
377
        };
378

379
        let stmt = connection
7✔
380
            .prepare(&format!(
7✔
381
                "
7✔
382
            SELECT 
7✔
383
                display_name
7✔
384
            FROM 
7✔
385
                user_permitted_datasets p JOIN datasets d ON (p.dataset_id = d.id)
7✔
386
            WHERE 
7✔
387
                p.user_id = $1
7✔
388
                AND display_name ILIKE $4 ESCAPE '\\'
7✔
389
                {tags_clause}
7✔
390
            ORDER BY display_name ASC
7✔
391
            LIMIT $2
7✔
392
            OFFSET $3;"
7✔
393
            ))
7✔
394
            .await?;
7✔
395

396
        let rows = connection.query(&stmt, &query_params).await?;
7✔
397

398
        Ok(rows.iter().map(|row| row.get(0)).collect())
7✔
399
    }
14✔
400
}
401

402
#[async_trait]
impl<Tls>
    MetaDataProvider<MockDatasetDataSourceLoadingInfo, VectorResultDescriptor, VectorQueryRectangle>
    for PostgresDb<Tls>
where
    Tls: MakeTlsConnect<Socket> + Clone + Send + Sync + 'static + std::fmt::Debug,
    <Tls as MakeTlsConnect<Socket>>::Stream: Send + Sync,
    <Tls as MakeTlsConnect<Socket>>::TlsConnect: Send,
    <<Tls as MakeTlsConnect<Socket>>::TlsConnect as TlsConnect<Socket>>::Future: Send,
{
    /// Mock loading info is not supported by the Postgres backend; this
    /// always fails with `NotYetImplemented`.
    async fn meta_data(
        &self,
        _id: &DataId,
    ) -> geoengine_operators::util::Result<
        Box<
            dyn MetaData<
                MockDatasetDataSourceLoadingInfo,
                VectorResultDescriptor,
                VectorQueryRectangle,
            >,
        >,
    > {
        Err(geoengine_operators::error::Error::NotYetImplemented)
    }
}
427

428
#[async_trait]
impl<Tls> MetaDataProvider<OgrSourceDataset, VectorResultDescriptor, VectorQueryRectangle>
    for PostgresDb<Tls>
where
    Tls: MakeTlsConnect<Socket> + Clone + Send + Sync + 'static + std::fmt::Debug,
    <Tls as MakeTlsConnect<Socket>>::Stream: Send + Sync,
    <Tls as MakeTlsConnect<Socket>>::TlsConnect: Send,
    <<Tls as MakeTlsConnect<Socket>>::TlsConnect as TlsConnect<Socket>>::Future: Send,
{
    /// Loads OGR (vector) loading info for an internal dataset id.
    ///
    /// Checks read permission inside a transaction, fetches the stored
    /// `MetaDataDefinition`, and requires it to be the `OgrMetaData` variant.
    /// All infrastructure errors are wrapped as `Error::MetaData`.
    async fn meta_data(
        &self,
        id: &DataId,
    ) -> geoengine_operators::util::Result<
        Box<dyn MetaData<OgrSourceDataset, VectorResultDescriptor, VectorQueryRectangle>>,
    > {
        // Only internal (dataset) ids can be resolved against the database.
        let id = id
            .internal()
            .ok_or(geoengine_operators::error::Error::DataIdTypeMissMatch)?;

        let mut conn = self.conn_pool.get().await.map_err(|e| {
            geoengine_operators::error::Error::MetaData {
                source: Box::new(e),
            }
        })?;
        let tx = conn.build_transaction().start().await.map_err(|e| {
            geoengine_operators::error::Error::MetaData {
                source: Box::new(e),
            }
        })?;

        // Permission check and data fetch share one transaction so they see
        // a consistent snapshot.
        if !self
            .has_permission_in_tx(id, Permission::Read, &tx)
            .await
            .map_err(|e| geoengine_operators::error::Error::MetaData {
                source: Box::new(e),
            })?
        {
            return Err(geoengine_operators::error::Error::PermissionDenied);
        };

        let stmt = tx
            .prepare(
                "
            SELECT
                d.meta_data
            FROM
                user_permitted_datasets p JOIN datasets d
                    ON (p.dataset_id = d.id)
            WHERE
                d.id = $1 AND p.user_id = $2
            LIMIT 
                1",
            )
            .await
            .map_err(|e| geoengine_operators::error::Error::MetaData {
                source: Box::new(e),
            })?;

        let row = tx
            .query_one(&stmt, &[&id, &self.session.user.id])
            .await
            .map_err(|e| geoengine_operators::error::Error::MetaData {
                source: Box::new(e),
            })?;

        let meta_data: MetaDataDefinition = row.get("meta_data");

        // This provider only serves OGR metadata; any other variant is a
        // type mismatch reported with the actual variant's name.
        let MetaDataDefinition::OgrMetaData(meta_data) = meta_data else {
            return Err(geoengine_operators::error::Error::MetaData {
                source: Box::new(geoengine_operators::error::Error::InvalidType {
                    expected: "OgrMetaData".to_string(),
                    found: meta_data.type_name().to_string(),
                }),
            });
        };

        tx.commit()
            .await
            .map_err(|e| geoengine_operators::error::Error::MetaData {
                source: Box::new(e),
            })?;

        Ok(Box::new(meta_data))
    }
}
513

514
#[async_trait]
impl<Tls> MetaDataProvider<GdalLoadingInfo, RasterResultDescriptor, RasterQueryRectangle>
    for PostgresDb<Tls>
where
    Tls: MakeTlsConnect<Socket> + Clone + Send + Sync + 'static + std::fmt::Debug,
    <Tls as MakeTlsConnect<Socket>>::Stream: Send + Sync,
    <Tls as MakeTlsConnect<Socket>>::TlsConnect: Send,
    <<Tls as MakeTlsConnect<Socket>>::TlsConnect as TlsConnect<Socket>>::Future: Send,
{
    /// Loads GDAL (raster) loading info for an internal dataset id.
    ///
    /// Mirrors the OGR provider above, but accepts any of the four GDAL
    /// metadata variants. Infrastructure errors are wrapped as
    /// `Error::MetaData`.
    async fn meta_data(
        &self,
        id: &DataId,
    ) -> geoengine_operators::util::Result<
        Box<dyn MetaData<GdalLoadingInfo, RasterResultDescriptor, RasterQueryRectangle>>,
    > {
        // Only internal (dataset) ids can be resolved against the database.
        let id = id
            .internal()
            .ok_or(geoengine_operators::error::Error::DataIdTypeMissMatch)?;

        let mut conn = self.conn_pool.get().await.map_err(|e| {
            geoengine_operators::error::Error::MetaData {
                source: Box::new(e),
            }
        })?;
        let tx = conn.build_transaction().start().await.map_err(|e| {
            geoengine_operators::error::Error::MetaData {
                source: Box::new(e),
            }
        })?;

        // Read permission is verified inside the same transaction as the fetch.
        if !self
            .has_permission_in_tx(id, Permission::Read, &tx)
            .await
            .map_err(|e| geoengine_operators::error::Error::MetaData {
                source: Box::new(e),
            })?
        {
            return Err(geoengine_operators::error::Error::PermissionDenied);
        };

        let stmt = tx
            .prepare(
                "
            SELECT
                d.meta_data
            FROM
                user_permitted_datasets p JOIN datasets d
                    ON (p.dataset_id = d.id)
            WHERE
                d.id = $1 AND p.user_id = $2
            LIMIT 
                1",
            )
            .await
            .map_err(|e| geoengine_operators::error::Error::MetaData {
                source: Box::new(e),
            })?;

        let row = tx
            .query_one(&stmt, &[&id, &self.session.user.id])
            .await
            .map_err(|e| geoengine_operators::error::Error::MetaData {
                source: Box::new(e),
            })?;

        // NOTE(review): unlike the OGR provider, this path never commits the
        // (read-only) transaction — it is rolled back on drop; confirm this
        // asymmetry is intentional.
        let meta_data: MetaDataDefinition = row.get(0);

        Ok(match meta_data {
            MetaDataDefinition::GdalMetaDataRegular(m) => Box::new(m),
            MetaDataDefinition::GdalStatic(m) => Box::new(m),
            MetaDataDefinition::GdalMetaDataList(m) => Box::new(m),
            MetaDataDefinition::GdalMetadataNetCdfCf(m) => Box::new(m),
            // Vector/mock metadata under a raster request is a type mismatch.
            _ => return Err(geoengine_operators::error::Error::DataIdTypeMissMatch),
        })
    }
}
590

591
#[async_trait]
impl<Tls> DatasetStore for PostgresDb<Tls>
where
    Tls: MakeTlsConnect<Socket> + Clone + Send + Sync + 'static + std::fmt::Debug,
    <Tls as MakeTlsConnect<Socket>>::Stream: Send + Sync,
    <Tls as MakeTlsConnect<Socket>>::TlsConnect: Send,
    <<Tls as MakeTlsConnect<Socket>>::TlsConnect as TlsConnect<Socket>>::Future: Send,
{
    /// Inserts a new dataset and grants the session user `Owner` permission,
    /// both within a single transaction.
    ///
    /// When no name is supplied, one is synthesized from the user id
    /// (namespace) and the fresh dataset id. A unique-violation on `name`
    /// surfaces as `InvalidDatasetName`.
    async fn add_dataset(
        &self,
        dataset: AddDataset,
        meta_data: MetaDataDefinition,
    ) -> Result<DatasetIdAndName> {
        let id = DatasetId::new();
        let name = dataset.name.unwrap_or_else(|| DatasetName {
            namespace: Some(self.session.user.id.to_string()),
            name: id.to_string(),
        });

        log::info!(
            "Adding dataset with name: {:?}, tags: {:?}",
            name,
            dataset.tags
        );

        // Users may only create datasets in their own namespace.
        self.check_dataset_namespace(&name)?;

        // Split into loading info + result descriptor for the two columns.
        let typed_meta_data = meta_data.to_typed_metadata();

        let mut conn = self.conn_pool.get().await?;

        let tx = conn.build_transaction().start().await?;

        tx.execute(
            "
                INSERT INTO datasets (
                    id,
                    name,
                    display_name,
                    description,
                    source_operator,
                    result_descriptor,
                    meta_data,
                    symbology,
                    provenance,
                    tags
                )
                VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10::text[])",
            &[
                &id,
                &name,
                &dataset.display_name,
                &dataset.description,
                &dataset.source_operator,
                &typed_meta_data.result_descriptor,
                typed_meta_data.meta_data,
                &dataset.symbology,
                &dataset.provenance,
                &dataset.tags,
            ],
        )
        .await
        // Duplicate `name` → domain error instead of a raw DB error.
        .map_unique_violation("datasets", "name", || error::Error::InvalidDatasetName)?;

        let stmt = tx
            .prepare(
                "
            INSERT INTO permissions (
                role_id,
                dataset_id,
                permission
            )
            VALUES ($1, $2, $3)",
            )
            .await?;

        // The creator's personal role becomes the owner of the new dataset.
        tx.execute(
            &stmt,
            &[&RoleId::from(self.session.user.id), &id, &Permission::Owner],
        )
        .await?;

        tx.commit().await?;

        Ok(DatasetIdAndName { id, name })
    }

    /// Updates the mutable descriptive fields of a dataset; requires `Owner`
    /// permission, checked in the same transaction as the update.
    async fn update_dataset(&self, dataset: DatasetId, update: UpdateDataset) -> Result<()> {
        let mut conn = self.conn_pool.get().await?;

        let tx = conn.build_transaction().start().await?;

        self.ensure_permission_in_tx(dataset.into(), Permission::Owner, &tx)
            .await
            .boxed_context(crate::error::PermissionDb)?;

        tx.execute(
            "UPDATE datasets SET name = $2, display_name = $3, description = $4, tags = $5 WHERE id = $1;",
            &[
                &dataset,
                &update.name,
                &update.display_name,
                &update.description,
                &update.tags,
            ],
        )
        .await?;

        tx.commit().await?;

        Ok(())
    }

    /// Replaces a dataset's loading info (`meta_data` column); owner-only.
    async fn update_dataset_loading_info(
        &self,
        dataset: DatasetId,
        meta_data: &MetaDataDefinition,
    ) -> Result<()> {
        let mut conn = self.conn_pool.get().await?;

        let tx = conn.build_transaction().start().await?;

        self.ensure_permission_in_tx(dataset.into(), Permission::Owner, &tx)
            .await
            .boxed_context(crate::error::PermissionDb)?;

        // NOTE(review): `result_descriptor` is not refreshed here; assumes
        // callers keep it consistent with the new loading info — confirm.
        tx.execute(
            "UPDATE datasets SET meta_data = $2 WHERE id = $1;",
            &[&dataset, &meta_data],
        )
        .await?;

        tx.commit().await?;

        Ok(())
    }

    /// Replaces a dataset's symbology; owner-only.
    async fn update_dataset_symbology(
        &self,
        dataset: DatasetId,
        symbology: &Symbology,
    ) -> Result<()> {
        let mut conn = self.conn_pool.get().await?;

        let tx = conn.build_transaction().start().await?;

        self.ensure_permission_in_tx(dataset.into(), Permission::Owner, &tx)
            .await
            .boxed_context(crate::error::PermissionDb)?;

        tx.execute(
            "UPDATE datasets SET symbology = $2 WHERE id = $1;",
            &[&dataset, &symbology],
        )
        .await?;

        tx.commit().await?;

        Ok(())
    }

    /// Replaces a dataset's provenance list; owner-only.
    async fn update_dataset_provenance(
        &self,
        dataset: DatasetId,
        provenance: &[Provenance],
    ) -> Result<()> {
        let mut conn = self.conn_pool.get().await?;

        let tx = conn.build_transaction().start().await?;

        self.ensure_permission_in_tx(dataset.into(), Permission::Owner, &tx)
            .await
            .boxed_context(crate::error::PermissionDb)?;

        tx.execute(
            "UPDATE datasets SET provenance = $2 WHERE id = $1;",
            &[&dataset, &provenance],
        )
        .await?;

        tx.commit().await?;

        Ok(())
    }

    /// Deletes a dataset; the session user must own it.
    async fn delete_dataset(&self, dataset_id: DatasetId) -> Result<()> {
        let mut conn = self.conn_pool.get().await?;
        let tx = conn.build_transaction().start().await?;

        self.ensure_permission_in_tx(dataset_id.into(), Permission::Owner, &tx)
            .await
            .boxed_context(crate::error::PermissionDb)?;

        // NOTE(review): this explicit owner query looks redundant with the
        // `ensure_permission_in_tx(.., Permission::Owner, ..)` call above —
        // confirm whether both checks are needed before simplifying.
        let stmt = tx
            .prepare(
                "
        SELECT 
            TRUE
        FROM 
            user_permitted_datasets p JOIN datasets d 
                ON (p.dataset_id = d.id)
        WHERE 
            d.id = $1 AND p.user_id = $2 AND p.permission = 'Owner';",
            )
            .await?;

        let rows = tx
            .query(&stmt, &[&dataset_id, &self.session.user.id])
            .await?;

        if rows.is_empty() {
            return Err(Error::OperationRequiresOwnerPermission);
        }

        let stmt = tx.prepare("DELETE FROM datasets WHERE id = $1;").await?;

        tx.execute(&stmt, &[&dataset_id]).await?;

        tx.commit().await?;

        Ok(())
    }
}
814

815
#[async_trait]
impl<Tls> UploadDb for PostgresDb<Tls>
where
    Tls: MakeTlsConnect<Socket> + Clone + Send + Sync + 'static + std::fmt::Debug,
    <Tls as MakeTlsConnect<Socket>>::Stream: Send + Sync,
    <Tls as MakeTlsConnect<Socket>>::TlsConnect: Send,
    <<Tls as MakeTlsConnect<Socket>>::TlsConnect as TlsConnect<Socket>>::Future: Send,
{
    /// Loads an upload owned by the session user.
    ///
    /// NOTE(review): `query_one` turns "not found / not yours" into a raw
    /// database error rather than a domain error — confirm callers expect that.
    async fn load_upload(&self, upload: UploadId) -> Result<Upload> {
        let conn = self.conn_pool.get().await?;

        let stmt = conn
            .prepare(
                "
            SELECT u.id, u.files 
            FROM uploads u JOIN user_uploads uu ON(u.id = uu.upload_id)
            WHERE u.id = $1 AND uu.user_id = $2",
            )
            .await?;

        let row = conn
            .query_one(&stmt, &[&upload, &self.session.user.id])
            .await?;

        Ok(Upload {
            id: row.get(0),
            // Map DB-level `FileUpload` records back to the domain type.
            files: row
                .get::<_, Vec<FileUpload>>(1)
                .into_iter()
                .map(Into::into)
                .collect(),
        })
    }

    /// Records a new upload and associates it with the session user, both
    /// within a single transaction.
    async fn create_upload(&self, upload: Upload) -> Result<()> {
        let mut conn = self.conn_pool.get().await?;
        let tx = conn.build_transaction().start().await?;

        let stmt = tx
            .prepare("INSERT INTO uploads (id, files) VALUES ($1, $2)")
            .await?;

        tx.execute(
            &stmt,
            &[
                &upload.id,
                // Convert domain file records to their DB representation.
                &upload
                    .files
                    .iter()
                    .map(FileUpload::from)
                    .collect::<Vec<_>>(),
            ],
        )
        .await?;

        let stmt = tx
            .prepare("INSERT INTO user_uploads (user_id, upload_id) VALUES ($1, $2)")
            .await?;

        tx.execute(&stmt, &[&self.session.user.id, &upload.id])
            .await?;

        tx.commit().await?;

        Ok(())
    }
}
882

883
#[cfg(test)]
mod tests {
    use std::path::PathBuf;

    use super::*;
    use crate::{
        contexts::PostgresContext,
        contexts::{ApplicationContext, SessionContext},
        ge_context,
        permissions::PermissionDb,
        users::{UserAuth, UserSession},
    };
    use geoengine_datatypes::{
        collections::VectorDataType,
        primitives::{CacheTtlSeconds, FeatureDataType, Measurement},
        spatial_reference::SpatialReference,
    };
    use geoengine_operators::{
        engine::{StaticMetaData, VectorColumnInfo},
        source::{
            CsvHeader, FormatSpecifics, OgrSourceColumnSpec, OgrSourceDatasetTimeType,
            OgrSourceDurationSpec, OgrSourceErrorSpec, OgrSourceTimeFormat,
        },
    };
    use tokio_postgres::NoTls;

    #[ge_context::test]
    async fn it_autocompletes_datasets(app_ctx: PostgresContext<NoTls>) {
        let owner_session = app_ctx.create_anonymous_session().await.unwrap();
        let other_session = app_ctx.create_anonymous_session().await.unwrap();

        let owner_db = app_ctx.session_context(owner_session.clone()).db();
        let other_db = app_ctx.session_context(other_session.clone()).db();

        add_single_dataset(&owner_db, &owner_session).await;

        // the owner finds the dataset, both without and with a tag filter
        let unfiltered = owner_db
            .dataset_autocomplete_search(None, "Ogr".to_owned(), 10, 0)
            .await
            .unwrap();
        assert_eq!(unfiltered, vec!["Ogr Test"]);

        let tag_filtered = owner_db
            .dataset_autocomplete_search(
                Some(vec!["upload".to_string()]),
                "Ogr".to_owned(),
                10,
                0,
            )
            .await
            .unwrap();
        assert_eq!(tag_filtered, vec!["Ogr Test"]);

        // check that other user B cannot access datasets of user A
        assert!(other_db
            .dataset_autocomplete_search(None, "Ogr".to_owned(), 10, 0)
            .await
            .unwrap()
            .is_empty());
        assert!(other_db
            .dataset_autocomplete_search(Some(vec!["upload".to_string()]), "Ogr".to_owned(), 10, 0)
            .await
            .unwrap()
            .is_empty());
    }

    #[ge_context::test]
    async fn it_loads_own_datasets(app_ctx: PostgresContext<NoTls>) {
        let session = app_ctx.create_anonymous_session().await.unwrap();
        let db = app_ctx.session_context(session.clone()).db();

        let DatasetIdAndName {
            id: dataset_id,
            name: _,
        } = add_single_dataset(&db, &session).await;

        // we are already owner, but we give the permission again to test the permission check
        db.add_permission(session.user.id.into(), dataset_id, Permission::Read)
            .await
            .unwrap();

        // both the loading info and the typed meta data must resolve for the owner
        db.load_loading_info(&dataset_id).await.unwrap();
        let _: Box<dyn MetaData<OgrSourceDataset, VectorResultDescriptor, VectorQueryRectangle>> =
            db.meta_data(&DataId::from(dataset_id)).await.unwrap();
    }

    /// Inserts a single OGR CSV point dataset (tagged `upload` and `test`)
    /// owned by `session` and returns its id and name.
    async fn add_single_dataset(db: &PostgresDb<NoTls>, session: &UserSession) -> DatasetIdAndName {
        let source_spec = OgrSourceDataset {
            file_name: PathBuf::from("test.csv"),
            layer_name: "test.csv".to_owned(),
            data_type: Some(VectorDataType::MultiPoint),
            time: OgrSourceDatasetTimeType::Start {
                start_field: "start".to_owned(),
                start_format: OgrSourceTimeFormat::Auto,
                duration: OgrSourceDurationSpec::Zero,
            },
            default_geometry: None,
            columns: Some(OgrSourceColumnSpec {
                format_specifics: Some(FormatSpecifics::Csv {
                    header: CsvHeader::Auto,
                }),
                x: "x".to_owned(),
                y: None,
                int: vec![],
                float: vec![],
                text: vec![],
                bool: vec![],
                datetime: vec![],
                rename: None,
            }),
            force_ogr_time_filter: false,
            force_ogr_spatial_filter: false,
            on_error: OgrSourceErrorSpec::Ignore,
            sql_query: None,
            attribute_query: None,
            cache_ttl: CacheTtlSeconds::default(),
        };

        // single float column "foo" in EPSG:4326
        let result_descriptor = VectorResultDescriptor {
            data_type: VectorDataType::MultiPoint,
            spatial_reference: SpatialReference::epsg_4326().into(),
            columns: [(
                "foo".to_owned(),
                VectorColumnInfo {
                    data_type: FeatureDataType::Float,
                    measurement: Measurement::Unitless,
                },
            )]
            .into_iter()
            .collect(),
            time: None,
            bbox: None,
        };

        let meta_data = MetaDataDefinition::OgrMetaData(StaticMetaData::<
            OgrSourceDataset,
            VectorResultDescriptor,
            VectorQueryRectangle,
        > {
            loading_info: source_spec.clone(),
            result_descriptor,
            phantom: Default::default(),
        });

        let dataset_name = DatasetName::new(Some(session.user.id.to_string()), "my_dataset");

        db.add_dataset(
            AddDataset {
                name: Some(dataset_name.clone()),
                display_name: "Ogr Test".to_owned(),
                description: "desc".to_owned(),
                source_operator: "OgrSource".to_owned(),
                symbology: None,
                provenance: None,
                tags: Some(vec!["upload".to_owned(), "test".to_owned()]),
            },
            meta_data,
        )
        .await
        .unwrap()
    }
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc