• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

geo-engine / geoengine / 8159128962

05 Mar 2024 03:19PM UTC coverage: 90.564% (+0.1%) from 90.424%
8159128962

push

github

web-flow
Merge pull request #931 from geo-engine/netcdf-overviews

NetCDF Overview Metadata in Database

2391 of 2553 new or added lines in 21 files covered. (93.65%)

57 existing lines in 15 files now uncovered.

128558 of 141952 relevant lines covered (90.56%)

54455.58 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

95.26
/services/src/pro/layers/postgres_layer_db.rs
1
use crate::error;
2
use crate::layers::external::TypedDataProviderDefinition;
3
use crate::layers::layer::Property;
4
use crate::layers::listing::{
5
    ProviderCapabilities, SearchCapabilities, SearchParameters, SearchType, SearchTypes,
6
};
7
use crate::layers::postgres_layer_db::{
8
    delete_layer_collection, delete_layer_collection_from_parent, delete_layer_from_collection,
9
    insert_collection_parent, insert_layer, insert_layer_collection_with_id,
10
};
11
use crate::pro::contexts::ProPostgresDb;
12
use crate::pro::datasets::TypedProDataProviderDefinition;
13
use crate::pro::permissions::postgres_permissiondb::TxPermissionDb;
14
use crate::pro::permissions::{Permission, RoleId};
15
use crate::{
16
    error::Result,
17
    layers::{
18
        external::{DataProvider, DataProviderDefinition},
19
        layer::{
20
            AddLayer, AddLayerCollection, CollectionItem, Layer, LayerCollection,
21
            LayerCollectionListOptions, LayerCollectionListing, LayerListing,
22
            ProviderLayerCollectionId, ProviderLayerId,
23
        },
24
        listing::{LayerCollectionId, LayerCollectionProvider},
25
        storage::{
26
            LayerDb, LayerProviderDb, LayerProviderListing, LayerProviderListingOptions,
27
            INTERNAL_LAYER_DB_ROOT_COLLECTION_ID, INTERNAL_PROVIDER_ID,
28
        },
29
        LayerDbError,
30
    },
31
};
32
use async_trait::async_trait;
33
use bb8_postgres::tokio_postgres::{
34
    tls::{MakeTlsConnect, TlsConnect},
35
    Socket,
36
};
37
use geoengine_datatypes::dataset::{DataProviderId, LayerId};
38
use geoengine_datatypes::util::HashMapTextTextDbType;
39
use snafu::{ensure, ResultExt};
40
use std::str::FromStr;
41
use uuid::Uuid;
42

43
#[async_trait]
44
impl<Tls> LayerDb for ProPostgresDb<Tls>
45
where
46
    Tls: MakeTlsConnect<Socket> + Clone + Send + Sync + 'static,
47
    <Tls as MakeTlsConnect<Socket>>::Stream: Send + Sync,
48
    <Tls as MakeTlsConnect<Socket>>::TlsConnect: Send,
49
    <<Tls as MakeTlsConnect<Socket>>::TlsConnect as TlsConnect<Socket>>::Future: Send,
50
{
51
    async fn add_layer(&self, layer: AddLayer, collection: &LayerCollectionId) -> Result<LayerId> {
16✔
52
        let layer_id = Uuid::new_v4();
16✔
53
        let layer_id = LayerId(layer_id.to_string());
16✔
54

16✔
55
        self.add_layer_with_id(&layer_id, layer, collection).await?;
313✔
56

57
        Ok(layer_id)
16✔
58
    }
48✔
59

60
    async fn add_layer_with_id(
16✔
61
        &self,
16✔
62
        id: &LayerId,
16✔
63
        layer: AddLayer,
16✔
64
        collection: &LayerCollectionId,
16✔
65
    ) -> Result<()> {
16✔
66
        let mut conn = self.conn_pool.get().await?;
16✔
67
        let trans = conn.build_transaction().start().await?;
16✔
68

69
        ensure!(
16✔
70
            self.has_permission_in_tx(collection.clone(), Permission::Owner, &trans)
16✔
71
                .await?,
38✔
72
            error::PermissionDenied
×
73
        );
74

75
        let layer_id = insert_layer(&trans, id, layer, collection).await?;
221✔
76

77
        // TODO: `ON CONFLICT DO NOTHING` means, we do not get an error if the permission already exists.
78
        //       Do we want that, or should we report an error and let the caller decide whether to ignore it?
79
        //       We should decide that and adjust all places where `ON CONFLICT DO NOTHING` is used.
80
        let stmt = trans
16✔
81
            .prepare(
16✔
82
                "
16✔
83
            INSERT INTO permissions (role_id, permission, layer_id)
16✔
84
            VALUES ($1, $2, $3) ON CONFLICT DO NOTHING;",
16✔
85
            )
16✔
86
            .await?;
10✔
87

88
        trans
16✔
89
            .execute(
16✔
90
                &stmt,
16✔
91
                &[
16✔
92
                    &RoleId::from(self.session.user.id),
16✔
93
                    &Permission::Owner,
16✔
94
                    &layer_id,
16✔
95
                ],
16✔
96
            )
16✔
97
            .await?;
10✔
98

99
        trans.commit().await?;
16✔
100

101
        Ok(())
16✔
102
    }
48✔
103

104
    async fn add_layer_to_collection(
2✔
105
        &self,
2✔
106
        layer: &LayerId,
2✔
107
        collection: &LayerCollectionId,
2✔
108
    ) -> Result<()> {
2✔
109
        let mut conn = self.conn_pool.get().await?;
2✔
110
        let tx = conn.build_transaction().start().await?;
2✔
111

112
        ensure!(
2✔
113
            self.has_permission_in_tx(collection.clone(), Permission::Owner, &tx)
2✔
114
                .await?,
2✔
115
            error::PermissionDenied
×
116
        );
117

118
        let layer_id =
1✔
119
            Uuid::from_str(&layer.0).map_err(|_| crate::error::Error::IdStringMustBeUuid {
2✔
120
                found: layer.0.clone(),
1✔
121
            })?;
2✔
122

123
        let collection_id =
1✔
124
            Uuid::from_str(&collection.0).map_err(|_| crate::error::Error::IdStringMustBeUuid {
1✔
125
                found: collection.0.clone(),
×
126
            })?;
1✔
127

128
        let stmt = tx
1✔
129
            .prepare(
1✔
130
                "
1✔
131
            INSERT INTO collection_layers (collection, layer)
1✔
132
            VALUES ($1, $2) ON CONFLICT DO NOTHING;",
1✔
133
            )
1✔
134
            .await?;
1✔
135

136
        tx.execute(&stmt, &[&collection_id, &layer_id]).await?;
1✔
137

138
        tx.commit().await?;
1✔
139

140
        Ok(())
1✔
141
    }
6✔
142

143
    async fn add_layer_collection(
22✔
144
        &self,
22✔
145
        collection: AddLayerCollection,
22✔
146
        parent: &LayerCollectionId,
22✔
147
    ) -> Result<LayerCollectionId> {
22✔
148
        let collection_id = Uuid::new_v4();
22✔
149
        let collection_id = LayerCollectionId(collection_id.to_string());
22✔
150

22✔
151
        self.add_layer_collection_with_id(&collection_id, collection, parent)
22✔
152
            .await?;
181✔
153

154
        Ok(collection_id)
20✔
155
    }
66✔
156

157
    async fn add_layer_collection_with_id(
22✔
158
        &self,
22✔
159
        id: &LayerCollectionId,
22✔
160
        collection: AddLayerCollection,
22✔
161
        parent: &LayerCollectionId,
22✔
162
    ) -> Result<()> {
22✔
163
        let mut conn = self.conn_pool.get().await?;
22✔
164
        let trans = conn.build_transaction().start().await?;
22✔
165

166
        ensure!(
22✔
167
            self.has_permission_in_tx(parent.clone(), Permission::Owner, &trans)
22✔
168
                .await?,
49✔
169
            error::PermissionDenied
2✔
170
        );
171

172
        let collection_id = insert_layer_collection_with_id(&trans, id, collection, parent).await?;
64✔
173

174
        let stmt = trans
20✔
175
            .prepare(
20✔
176
                "
20✔
177
            INSERT INTO permissions (role_id, permission, layer_collection_id)
20✔
178
            VALUES ($1, $2, $3) ON CONFLICT DO NOTHING;",
20✔
179
            )
20✔
180
            .await?;
12✔
181

182
        trans
20✔
183
            .execute(
20✔
184
                &stmt,
20✔
185
                &[
20✔
186
                    &RoleId::from(self.session.user.id),
20✔
187
                    &Permission::Owner,
20✔
188
                    &collection_id,
20✔
189
                ],
20✔
190
            )
20✔
191
            .await?;
12✔
192

193
        trans.commit().await?;
20✔
194

195
        Ok(())
20✔
196
    }
66✔
197

198
    async fn add_collection_to_parent(
1✔
199
        &self,
1✔
200
        collection: &LayerCollectionId,
1✔
201
        parent: &LayerCollectionId,
1✔
202
    ) -> Result<()> {
1✔
203
        let conn = self.conn_pool.get().await?;
1✔
204
        insert_collection_parent(&conn, collection, parent).await
2✔
205
    }
3✔
206

207
    async fn remove_layer_collection(&self, collection: &LayerCollectionId) -> Result<()> {
3✔
208
        let mut conn = self.conn_pool.get().await?;
3✔
209
        let transaction = conn.build_transaction().start().await?;
3✔
210

211
        ensure!(
3✔
212
            self.has_permission_in_tx(collection.clone(), Permission::Owner, &transaction)
3✔
UNCOV
213
                .await?,
×
214
            error::PermissionDenied
×
215
        );
216

217
        delete_layer_collection(&transaction, collection).await?;
3✔
218

219
        transaction.commit().await.map_err(Into::into)
2✔
220
    }
9✔
221

222
    async fn remove_layer_from_collection(
2✔
223
        &self,
2✔
224
        layer: &LayerId,
2✔
225
        collection: &LayerCollectionId,
2✔
226
    ) -> Result<()> {
2✔
227
        let mut conn = self.conn_pool.get().await?;
2✔
228
        let transaction = conn.build_transaction().start().await?;
2✔
229

230
        ensure!(
2✔
231
            self.has_permission_in_tx(layer.clone(), Permission::Owner, &transaction)
2✔
232
                .await?,
4✔
233
            error::PermissionDenied
×
234
        );
235

236
        delete_layer_from_collection(&transaction, layer, collection).await?;
7✔
237

238
        transaction.commit().await.map_err(Into::into)
2✔
239
    }
6✔
240

241
    async fn remove_layer_collection_from_parent(
1✔
242
        &self,
1✔
243
        collection: &LayerCollectionId,
1✔
244
        parent: &LayerCollectionId,
1✔
245
    ) -> Result<()> {
1✔
246
        let mut conn = self.conn_pool.get().await?;
1✔
247
        let transaction = conn.build_transaction().start().await?;
1✔
248

249
        ensure!(
1✔
250
            self.has_permission_in_tx(collection.clone(), Permission::Owner, &transaction)
1✔
251
                .await?,
2✔
252
            error::PermissionDenied
×
253
        );
254

255
        delete_layer_collection_from_parent(&transaction, collection, parent).await?;
8✔
256

257
        transaction.commit().await.map_err(Into::into)
1✔
258
    }
3✔
259
}
260

261
/// Builds the SQL text shared by the search and the autocomplete queries.
///
/// The statement collects the collection passed as `$1` (cast to `uuid`) together
/// with everything reachable from it through `collection_children`, and then
/// unions the permitted layer collections and the permitted layers found below
/// it whose `name` matches the `ILIKE` pattern `$5` for the user id `$4`.
/// `$2` is the `LIMIT` and `$3` the `OFFSET`.
///
/// With `full_info == true` every row carries `id`, `name`, `description`,
/// `properties` and an `is_layer` flag, and rows are ordered by `is_layer` and
/// then `name`. Otherwise only `name` is selected and used for ordering.
fn create_search_query(full_info: bool) -> String {
    // Column lists for the two halves of the UNION; they differ only in the `is_layer` literal.
    const COLLECTION_COLUMNS: &str = "concat(id, '') AS id,\n        name,\n        description,\n        properties,\n        FALSE AS is_layer";
    const LAYER_COLUMNS: &str = "concat(id, '') AS id,\n        name,\n        description,\n        properties,\n        TRUE AS is_layer";

    let (collection_columns, layer_columns, order_prefix) = if full_info {
        (COLLECTION_COLUMNS, LAYER_COLUMNS, "is_layer ASC,")
    } else {
        ("name", "name", "")
    };

    format!(
        concat!(
            "\n",
            "        WITH RECURSIVE parents AS (\n",
            "            SELECT $1::uuid as id\n",
            "            UNION ALL SELECT DISTINCT child FROM collection_children JOIN parents ON (id = parent)\n",
            "        )\n",
            "        SELECT DISTINCT *\n",
            "        FROM (\n",
            "            SELECT \n",
            "                {}\n",
            "            FROM user_permitted_layer_collections u\n",
            "                JOIN layer_collections lc ON (u.layer_collection_id = lc.id)\n",
            "                JOIN (SELECT DISTINCT child FROM collection_children JOIN parents ON (id = parent)) cc ON (id = cc.child)\n",
            "            WHERE u.user_id = $4 AND name ILIKE $5\n",
            "        ) u UNION (\n",
            "            SELECT \n",
            "                {}\n",
            "            FROM user_permitted_layers ul\n",
            "                JOIN layers uc ON (ul.layer_id = uc.id)\n",
            "                JOIN (SELECT DISTINCT layer FROM collection_layers JOIN parents ON (collection = id)) cl ON (id = cl.layer)\n",
            "            WHERE ul.user_id = $4 AND name ILIKE $5\n",
            "        )\n",
            "        ORDER BY {}name ASC\n",
            "        LIMIT $2 \n",
            "        OFFSET $3;",
        ),
        collection_columns,
        layer_columns,
        order_prefix
    )
}
30✔
302

303
#[async_trait]
304
impl<Tls> LayerCollectionProvider for ProPostgresDb<Tls>
305
where
306
    Tls: MakeTlsConnect<Socket> + Clone + Send + Sync + 'static,
307
    <Tls as MakeTlsConnect<Socket>>::Stream: Send + Sync,
308
    <Tls as MakeTlsConnect<Socket>>::TlsConnect: Send,
309
    <<Tls as MakeTlsConnect<Socket>>::TlsConnect as TlsConnect<Socket>>::Future: Send,
310
{
311
    fn capabilities(&self) -> ProviderCapabilities {
1✔
312
        ProviderCapabilities {
1✔
313
            listing: true,
1✔
314
            search: SearchCapabilities {
1✔
315
                search_types: SearchTypes {
1✔
316
                    fulltext: true,
1✔
317
                    prefix: true,
1✔
318
                },
1✔
319
                autocomplete: true,
1✔
320
                filters: None,
1✔
321
            },
1✔
322
        }
1✔
323
    }
1✔
324

325
    fn name(&self) -> &str {
×
326
        "Postgres Layer Collection Provider (Pro)"
×
327
    }
×
328

329
    fn description(&self) -> &str {
×
330
        "Layer collection provider for Postgres (Pro)"
×
331
    }
×
332

333
    #[allow(clippy::too_many_lines)]
334
    async fn load_layer_collection(
15✔
335
        &self,
15✔
336
        collection_id: &LayerCollectionId,
15✔
337
        options: LayerCollectionListOptions,
15✔
338
    ) -> Result<LayerCollection> {
15✔
339
        let mut conn = self.conn_pool.get().await?;
15✔
340
        let tx = conn.build_transaction().start().await?;
15✔
341

342
        ensure!(
15✔
343
            self.has_permission_in_tx(collection_id.clone(), Permission::Read, &tx)
15✔
344
                .await?,
22✔
345
            error::PermissionDenied
3✔
346
        );
347
        let collection = Uuid::from_str(&collection_id.0).map_err(|_| {
12✔
348
            crate::error::Error::IdStringMustBeUuid {
×
349
                found: collection_id.0.clone(),
×
350
            }
×
351
        })?;
12✔
352

353
        let stmt = tx
12✔
354
            .prepare(
12✔
355
                "
12✔
356
        SELECT name, description, properties
12✔
357
        FROM user_permitted_layer_collections p 
12✔
358
            JOIN layer_collections c ON (p.layer_collection_id = c.id) 
12✔
359
        WHERE p.user_id = $1 AND layer_collection_id = $2;",
12✔
360
            )
12✔
361
            .await?;
9✔
362

363
        let row = tx
12✔
364
            .query_one(&stmt, &[&self.session.user.id, &collection])
12✔
365
            .await?;
9✔
366

367
        let name: String = row.get(0);
12✔
368
        let description: String = row.get(1);
12✔
369
        let properties: Vec<Property> = row.get(2);
12✔
370

371
        let stmt = tx
12✔
372
            .prepare(
12✔
373
                "
12✔
374
        SELECT DISTINCT id, name, description, properties, is_layer
12✔
375
        FROM (
12✔
376
            SELECT 
12✔
377
                concat(id, '') AS id, 
12✔
378
                name, 
12✔
379
                description, 
12✔
380
                properties, 
12✔
381
                FALSE AS is_layer
12✔
382
            FROM user_permitted_layer_collections u 
12✔
383
                JOIN layer_collections lc ON (u.layer_collection_id = lc.id)
12✔
384
                JOIN collection_children cc ON (layer_collection_id = cc.child)
12✔
385
            WHERE u.user_id = $4 AND cc.parent = $1
12✔
386
        ) u UNION (
12✔
387
            SELECT 
12✔
388
                concat(id, '') AS id, 
12✔
389
                name, 
12✔
390
                description, 
12✔
391
                properties, 
12✔
392
                TRUE AS is_layer
12✔
393
            FROM user_permitted_layers ul
12✔
394
                JOIN layers uc ON (ul.layer_id = uc.id) 
12✔
395
                JOIN collection_layers cl ON (layer_id = cl.layer)
12✔
396
            WHERE ul.user_id = $4 AND cl.collection = $1
12✔
397
        )
12✔
398
        ORDER BY is_layer ASC, name ASC
12✔
399
        LIMIT $2 
12✔
400
        OFFSET $3;            
12✔
401
        ",
12✔
402
            )
12✔
403
            .await?;
9✔
404

405
        let rows = tx
12✔
406
            .query(
12✔
407
                &stmt,
12✔
408
                &[
12✔
409
                    &collection,
12✔
410
                    &i64::from(options.limit),
12✔
411
                    &i64::from(options.offset),
12✔
412
                    &self.session.user.id,
12✔
413
                ],
12✔
414
            )
12✔
415
            .await?;
9✔
416

417
        let items = rows
12✔
418
            .into_iter()
12✔
419
            .map(|row| {
15✔
420
                let is_layer: bool = row.get(4);
15✔
421

15✔
422
                if is_layer {
15✔
423
                    Ok(CollectionItem::Layer(LayerListing {
5✔
424
                        id: ProviderLayerId {
5✔
425
                            provider_id: INTERNAL_PROVIDER_ID,
5✔
426
                            layer_id: LayerId(row.get(0)),
5✔
427
                        },
5✔
428
                        name: row.get(1),
5✔
429
                        description: row.get(2),
5✔
430
                        properties: row.get(3),
5✔
431
                    }))
5✔
432
                } else {
433
                    Ok(CollectionItem::Collection(LayerCollectionListing {
10✔
434
                        id: ProviderLayerCollectionId {
10✔
435
                            provider_id: INTERNAL_PROVIDER_ID,
10✔
436
                            collection_id: LayerCollectionId(row.get(0)),
10✔
437
                        },
10✔
438
                        name: row.get(1),
10✔
439
                        description: row.get(2),
10✔
440
                        properties: row.get(3),
10✔
441
                    }))
10✔
442
                }
443
            })
15✔
444
            .collect::<Result<Vec<CollectionItem>>>()?;
12✔
445

446
        tx.commit().await?;
12✔
447

448
        Ok(LayerCollection {
12✔
449
            id: ProviderLayerCollectionId {
12✔
450
                provider_id: INTERNAL_PROVIDER_ID,
12✔
451
                collection_id: collection_id.clone(),
12✔
452
            },
12✔
453
            name,
12✔
454
            description,
12✔
455
            items,
12✔
456
            entry_label: None,
12✔
457
            properties,
12✔
458
        })
12✔
459
    }
45✔
460

461
    #[allow(clippy::too_many_lines)]
462
    async fn search(
15✔
463
        &self,
15✔
464
        collection_id: &LayerCollectionId,
15✔
465
        search: SearchParameters,
15✔
466
    ) -> Result<LayerCollection> {
15✔
467
        let mut conn = self.conn_pool.get().await?;
15✔
468
        let tx = conn.build_transaction().start().await?;
15✔
469

470
        ensure!(
15✔
471
            self.has_permission_in_tx(collection_id.clone(), Permission::Read, &tx)
15✔
472
                .await?,
18✔
473
            error::PermissionDenied
×
474
        );
475
        let collection = Uuid::from_str(&collection_id.0).map_err(|_| {
15✔
476
            crate::error::Error::IdStringMustBeUuid {
×
477
                found: collection_id.0.clone(),
×
478
            }
×
479
        })?;
15✔
480

481
        let stmt = tx
15✔
482
            .prepare(
15✔
483
                "
15✔
484
        SELECT name, description, properties
15✔
485
        FROM user_permitted_layer_collections p 
15✔
486
            JOIN layer_collections c ON (p.layer_collection_id = c.id) 
15✔
487
        WHERE p.user_id = $1 AND layer_collection_id = $2;",
15✔
488
            )
15✔
489
            .await?;
9✔
490

491
        let row = tx
15✔
492
            .query_one(&stmt, &[&self.session.user.id, &collection])
15✔
493
            .await?;
9✔
494

495
        let name: String = row.get(0);
15✔
496
        let description: String = row.get(1);
15✔
497
        let properties: Vec<Property> = row.get(2);
15✔
498

499
        let pattern = match search.search_type {
15✔
500
            SearchType::Fulltext => {
501
                format!("%{}%", search.search_string)
10✔
502
            }
503
            SearchType::Prefix => {
504
                format!("{}%", search.search_string)
5✔
505
            }
506
        };
507

508
        let stmt = tx.prepare(&create_search_query(true)).await?;
15✔
509

510
        let rows = tx
15✔
511
            .query(
15✔
512
                &stmt,
15✔
513
                &[
15✔
514
                    &collection,
15✔
515
                    &i64::from(search.limit),
15✔
516
                    &i64::from(search.offset),
15✔
517
                    &self.session.user.id,
15✔
518
                    &pattern,
15✔
519
                ],
15✔
520
            )
15✔
521
            .await?;
10✔
522

523
        let items = rows
15✔
524
            .into_iter()
15✔
525
            .map(|row| {
31✔
526
                let is_layer: bool = row.get(4);
31✔
527

31✔
528
                if is_layer {
31✔
529
                    Ok(CollectionItem::Layer(LayerListing {
13✔
530
                        id: ProviderLayerId {
13✔
531
                            provider_id: INTERNAL_PROVIDER_ID,
13✔
532
                            layer_id: LayerId(row.get(0)),
13✔
533
                        },
13✔
534
                        name: row.get(1),
13✔
535
                        description: row.get(2),
13✔
536
                        properties: row.get(3),
13✔
537
                    }))
13✔
538
                } else {
539
                    Ok(CollectionItem::Collection(LayerCollectionListing {
18✔
540
                        id: ProviderLayerCollectionId {
18✔
541
                            provider_id: INTERNAL_PROVIDER_ID,
18✔
542
                            collection_id: LayerCollectionId(row.get(0)),
18✔
543
                        },
18✔
544
                        name: row.get(1),
18✔
545
                        description: row.get(2),
18✔
546
                        properties: row.get(3),
18✔
547
                    }))
18✔
548
                }
549
            })
31✔
550
            .collect::<Result<Vec<CollectionItem>>>()?;
15✔
551

552
        tx.commit().await?;
15✔
553

554
        Ok(LayerCollection {
15✔
555
            id: ProviderLayerCollectionId {
15✔
556
                provider_id: INTERNAL_PROVIDER_ID,
15✔
557
                collection_id: collection_id.clone(),
15✔
558
            },
15✔
559
            name,
15✔
560
            description,
15✔
561
            items,
15✔
562
            entry_label: None,
15✔
563
            properties,
15✔
564
        })
15✔
565
    }
45✔
566

567
    #[allow(clippy::too_many_lines)]
568
    async fn autocomplete_search(
15✔
569
        &self,
15✔
570
        collection_id: &LayerCollectionId,
15✔
571
        search: SearchParameters,
15✔
572
    ) -> Result<Vec<String>> {
15✔
573
        let mut conn = self.conn_pool.get().await?;
15✔
574
        let tx = conn.build_transaction().start().await?;
15✔
575

576
        ensure!(
15✔
577
            self.has_permission_in_tx(collection_id.clone(), Permission::Read, &tx)
15✔
578
                .await?,
23✔
579
            error::PermissionDenied
×
580
        );
581
        let collection = Uuid::from_str(&collection_id.0).map_err(|_| {
15✔
582
            crate::error::Error::IdStringMustBeUuid {
×
583
                found: collection_id.0.clone(),
×
584
            }
×
585
        })?;
15✔
586

587
        let pattern = match search.search_type {
15✔
588
            SearchType::Fulltext => {
589
                format!("%{}%", search.search_string)
10✔
590
            }
591
            SearchType::Prefix => {
592
                format!("{}%", search.search_string)
5✔
593
            }
594
        };
595

596
        let stmt = tx.prepare(&create_search_query(false)).await?;
15✔
597

598
        let rows = tx
15✔
599
            .query(
15✔
600
                &stmt,
15✔
601
                &[
15✔
602
                    &collection,
15✔
603
                    &i64::from(search.limit),
15✔
604
                    &i64::from(search.offset),
15✔
605
                    &self.session.user.id,
15✔
606
                    &pattern,
15✔
607
                ],
15✔
608
            )
15✔
609
            .await?;
11✔
610

611
        let items = rows
15✔
612
            .into_iter()
15✔
613
            .map(|row| Ok(row.get::<usize, &str>(0).to_string()))
31✔
614
            .collect::<Result<Vec<String>>>()?;
15✔
615

616
        tx.commit().await?;
15✔
617

618
        Ok(items)
15✔
619
    }
45✔
620

621
    async fn get_root_layer_collection_id(&self) -> Result<LayerCollectionId> {
11✔
622
        Ok(LayerCollectionId(
11✔
623
            INTERNAL_LAYER_DB_ROOT_COLLECTION_ID.to_string(),
11✔
624
        ))
11✔
625
    }
22✔
626

627
    async fn load_layer(&self, id: &LayerId) -> Result<Layer> {
6✔
628
        let mut conn = self.conn_pool.get().await?;
6✔
629
        let tx = conn.build_transaction().start().await?;
6✔
630

631
        ensure!(
6✔
632
            self.has_permission_in_tx(id.clone(), Permission::Read, &tx)
6✔
633
                .await?,
10✔
634
            error::PermissionDenied
3✔
635
        );
636

637
        let layer_id =
3✔
638
            Uuid::from_str(&id.0).map_err(|_| crate::error::Error::IdStringMustBeUuid {
3✔
639
                found: id.0.clone(),
×
640
            })?;
3✔
641

642
        let stmt = tx
3✔
643
            .prepare(
3✔
644
                "
3✔
645
            SELECT 
3✔
646
                l.name,
3✔
647
                l.description,
3✔
648
                w.workflow,
3✔
649
                l.symbology,
3✔
650
                l.properties,
3✔
651
                l.metadata
3✔
652
            FROM 
3✔
653
                layers l JOIN workflows w ON (l.workflow_id = w.id)
3✔
654
            WHERE 
3✔
655
                l.id = $1;",
3✔
656
            )
3✔
657
            .await?;
2✔
658

659
        let row = tx
3✔
660
            .query_one(&stmt, &[&layer_id])
3✔
661
            .await
2✔
662
            .map_err(|_error| LayerDbError::NoLayerForGivenId { id: id.clone() })?;
3✔
663

664
        tx.commit().await?;
3✔
665

666
        Ok(Layer {
667
            id: ProviderLayerId {
3✔
668
                provider_id: INTERNAL_PROVIDER_ID,
3✔
669
                layer_id: id.clone(),
3✔
670
            },
3✔
671
            name: row.get(0),
3✔
672
            description: row.get(1),
3✔
673
            workflow: serde_json::from_value(row.get(2)).context(crate::error::SerdeJson)?,
3✔
674
            symbology: row.get(3),
3✔
675
            properties: row.get(4),
3✔
676
            metadata: row.get::<_, HashMapTextTextDbType>(5).into(),
3✔
677
        })
678
    }
18✔
679
}
680

681
#[async_trait]
682
impl<Tls> LayerProviderDb for ProPostgresDb<Tls>
683
where
684
    Tls: MakeTlsConnect<Socket> + Clone + Send + Sync + 'static,
685
    <Tls as MakeTlsConnect<Socket>>::Stream: Send + Sync,
686
    <Tls as MakeTlsConnect<Socket>>::TlsConnect: Send,
687
    <<Tls as MakeTlsConnect<Socket>>::TlsConnect as TlsConnect<Socket>>::Future: Send,
688
{
689
    async fn add_layer_provider(
1✔
690
        &self,
1✔
691
        provider: TypedDataProviderDefinition,
1✔
692
    ) -> Result<DataProviderId> {
1✔
693
        ensure!(self.session.is_admin(), error::PermissionDenied);
1✔
694

695
        let conn = self.conn_pool.get().await?;
1✔
696

697
        let prio = DataProviderDefinition::<Self>::priority(&provider);
1✔
698
        let clamp_prio = prio.clamp(-1000, 1000);
1✔
699

1✔
700
        if prio != clamp_prio {
1✔
701
            log::warn!(
×
702
                "The priority of the provider {} is out of range! --> clamped {} to {}",
×
703
                DataProviderDefinition::<Self>::name(&provider),
×
704
                prio,
705
                clamp_prio
706
            );
707
        }
1✔
708

709
        let stmt = conn
1✔
710
            .prepare(
1✔
711
                "
1✔
712
              INSERT INTO layer_providers (
1✔
713
                  id, 
1✔
714
                  type_name, 
1✔
715
                  name,
1✔
716
                  definition,
1✔
717
                  priority
1✔
718
              )
1✔
719
              VALUES ($1, $2, $3, $4, $5)",
1✔
720
            )
1✔
721
            .await?;
33✔
722

723
        let id = DataProviderDefinition::<Self>::id(&provider);
1✔
724
        conn.execute(
1✔
725
            &stmt,
1✔
726
            &[
1✔
727
                &id,
1✔
728
                &DataProviderDefinition::<Self>::type_name(&provider),
1✔
729
                &DataProviderDefinition::<Self>::name(&provider),
1✔
730
                &provider,
1✔
731
                &clamp_prio,
1✔
732
            ],
1✔
733
        )
1✔
734
        .await?;
1✔
735
        Ok(id)
1✔
736
    }
3✔
737

738
    async fn list_layer_providers(
2✔
739
        &self,
2✔
740
        options: LayerProviderListingOptions,
2✔
741
    ) -> Result<Vec<LayerProviderListing>> {
2✔
742
        // TODO: permission
743
        let conn = self.conn_pool.get().await?;
2✔
744

745
        let stmt = conn
2✔
746
            .prepare(
2✔
747
                "(
2✔
748
                    SELECT 
2✔
749
                        id, 
2✔
750
                        name,
2✔
751
                        type_name,
2✔
752
                        priority
2✔
753
                    FROM 
2✔
754
                        layer_providers
2✔
755
                    WHERE
2✔
756
                        priority > -1000
2✔
757
                    UNION ALL
2✔
758
                    SELECT 
2✔
759
                        id, 
2✔
760
                        name,
2✔
761
                        type_name,
2✔
762
                        priority
2✔
763
                    FROM 
2✔
764
                        pro_layer_providers
2✔
765
                    WHERE
2✔
766
                        priority > -1000
2✔
767
                )
2✔
768
                ORDER BY priority desc, name ASC
2✔
769
                LIMIT $1 
2✔
770
                OFFSET $2;",
2✔
771
            )
2✔
772
            .await?;
2✔
773

774
        let rows = conn
2✔
775
            .query(
2✔
776
                &stmt,
2✔
777
                &[&i64::from(options.limit), &i64::from(options.offset)],
2✔
778
            )
2✔
779
            .await?;
2✔
780

781
        Ok(rows
2✔
782
            .iter()
2✔
783
            .map(|row| LayerProviderListing {
2✔
784
                id: row.get(0),
2✔
785
                name: row.get(1),
2✔
786
                priority: row.get(3),
2✔
787
            })
2✔
788
            .collect())
2✔
789
    }
6✔
790

791
    async fn load_layer_provider(&self, id: DataProviderId) -> Result<Box<dyn DataProvider>> {
2✔
792
        // TODO: permissions
793
        let conn = self.conn_pool.get().await?;
2✔
794

795
        let stmt = conn
2✔
796
            .prepare(
2✔
797
                "SELECT
2✔
798
                    definition, NULL AS pro_definition
2✔
799
                FROM
2✔
800
                    layer_providers
2✔
801
                WHERE
2✔
802
                    id = $1
2✔
803
                UNION ALL
2✔
804
                SELECT
2✔
805
                    NULL AS definition, definition AS pro_definition
2✔
806
                FROM
2✔
807
                    pro_layer_providers
2✔
808
                WHERE
2✔
809
                    id = $1",
2✔
810
            )
2✔
811
            .await?;
49✔
812

813
        let row = conn.query_one(&stmt, &[&id]).await?;
2✔
814

815
        if let Some(definition) = row.get::<_, Option<TypedDataProviderDefinition>>(0) {
2✔
816
            return Box::new(definition)
1✔
817
                .initialize(ProPostgresDb {
1✔
818
                    conn_pool: self.conn_pool.clone(),
1✔
819
                    session: self.session.clone(),
1✔
820
                })
1✔
821
                .await;
12✔
822
        }
1✔
823

1✔
824
        let pro_definition: TypedProDataProviderDefinition = row.get(1);
1✔
825
        Box::new(pro_definition)
1✔
826
            .initialize(ProPostgresDb {
1✔
827
                conn_pool: self.conn_pool.clone(),
1✔
828
                session: self.session.clone(),
1✔
829
            })
1✔
830
            .await
×
831
    }
6✔
832
}
833

834
#[async_trait]
835
pub trait ProLayerProviderDb: Send + Sync + 'static {
836
    async fn add_pro_layer_provider(
837
        &self,
838
        provider: TypedProDataProviderDefinition,
839
    ) -> Result<DataProviderId>;
840
}
841

842
#[async_trait]
843
impl<Tls> ProLayerProviderDb for ProPostgresDb<Tls>
844
where
845
    Tls: MakeTlsConnect<Socket> + Clone + Send + Sync + 'static,
846
    <Tls as MakeTlsConnect<Socket>>::Stream: Send + Sync,
847
    <Tls as MakeTlsConnect<Socket>>::TlsConnect: Send,
848
    <<Tls as MakeTlsConnect<Socket>>::TlsConnect as TlsConnect<Socket>>::Future: Send,
849
{
850
    async fn add_pro_layer_provider(
1✔
851
        &self,
1✔
852
        provider: TypedProDataProviderDefinition,
1✔
853
    ) -> Result<DataProviderId> {
1✔
854
        ensure!(self.session.is_admin(), error::PermissionDenied);
1✔
855

856
        let conn = self.conn_pool.get().await?;
1✔
857

858
        let prio = DataProviderDefinition::<Self>::priority(&provider);
1✔
859
        let clamp_prio = prio.clamp(-1000, 1000);
1✔
860

1✔
861
        if prio != clamp_prio {
1✔
862
            log::warn!(
×
863
                "The priority of the provider {} is out of range! --> clamped {} to {}",
×
864
                DataProviderDefinition::<Self>::name(&provider),
×
865
                prio,
866
                clamp_prio
867
            );
868
        }
1✔
869

870
        let stmt = conn
1✔
871
            .prepare(
1✔
872
                "
1✔
873
              INSERT INTO pro_layer_providers (
1✔
874
                  id, 
1✔
875
                  type_name, 
1✔
876
                  name,
1✔
877
                  definition,
1✔
878
                  priority
1✔
879
              )
1✔
880
              VALUES ($1, $2, $3, $4, $5)",
1✔
881
            )
1✔
882
            .await?;
20✔
883

884
        let id = DataProviderDefinition::<Self>::id(&provider);
1✔
885
        conn.execute(
1✔
886
            &stmt,
1✔
887
            &[
1✔
888
                &id,
1✔
889
                &DataProviderDefinition::<Self>::type_name(&provider),
1✔
890
                &DataProviderDefinition::<Self>::name(&provider),
1✔
891
                &provider,
1✔
892
                &clamp_prio,
1✔
893
            ],
1✔
894
        )
1✔
895
        .await?;
1✔
896
        Ok(id)
1✔
897
    }
3✔
898
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc