• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

geo-engine / geoengine / 8167499361

06 Mar 2024 05:40AM UTC coverage: 90.514% (-0.07%) from 90.58%
8167499361

push

github

web-flow
Merge pull request #936 from geo-engine/manage_permissions

Manage permissions

261 of 404 new or added lines in 16 files covered. (64.6%)

25 existing lines in 8 files now uncovered.

128619 of 142098 relevant lines covered (90.51%)

54399.54 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

96.09
/services/src/pro/layers/postgres_layer_db.rs
1
use crate::error;
2
use crate::layers::external::TypedDataProviderDefinition;
3
use crate::layers::layer::Property;
4
use crate::layers::listing::{
5
    ProviderCapabilities, SearchCapabilities, SearchParameters, SearchType, SearchTypes,
6
};
7
use crate::layers::postgres_layer_db::{
8
    delete_layer_collection, delete_layer_collection_from_parent, delete_layer_from_collection,
9
    insert_collection_parent, insert_layer, insert_layer_collection_with_id,
10
};
11
use crate::pro::contexts::ProPostgresDb;
12
use crate::pro::datasets::TypedProDataProviderDefinition;
13
use crate::pro::permissions::postgres_permissiondb::TxPermissionDb;
14
use crate::pro::permissions::{Permission, RoleId};
15
use crate::{
16
    error::Result,
17
    layers::{
18
        external::{DataProvider, DataProviderDefinition},
19
        layer::{
20
            AddLayer, AddLayerCollection, CollectionItem, Layer, LayerCollection,
21
            LayerCollectionListOptions, LayerCollectionListing, LayerListing,
22
            ProviderLayerCollectionId, ProviderLayerId,
23
        },
24
        listing::{LayerCollectionId, LayerCollectionProvider},
25
        storage::{
26
            LayerDb, LayerProviderDb, LayerProviderListing, LayerProviderListingOptions,
27
            INTERNAL_LAYER_DB_ROOT_COLLECTION_ID, INTERNAL_PROVIDER_ID,
28
        },
29
        LayerDbError,
30
    },
31
};
32
use async_trait::async_trait;
33
use bb8_postgres::tokio_postgres::{
34
    tls::{MakeTlsConnect, TlsConnect},
35
    Socket,
36
};
37
use geoengine_datatypes::dataset::{DataProviderId, LayerId};
38
use geoengine_datatypes::error::BoxedResultExt;
39
use geoengine_datatypes::util::HashMapTextTextDbType;
40
use snafu::{ensure, ResultExt};
41
use std::str::FromStr;
42
use uuid::Uuid;
43

44
#[async_trait]
45
impl<Tls> LayerDb for ProPostgresDb<Tls>
46
where
47
    Tls: MakeTlsConnect<Socket> + Clone + Send + Sync + 'static,
48
    <Tls as MakeTlsConnect<Socket>>::Stream: Send + Sync,
49
    <Tls as MakeTlsConnect<Socket>>::TlsConnect: Send,
50
    <<Tls as MakeTlsConnect<Socket>>::TlsConnect as TlsConnect<Socket>>::Future: Send,
51
{
52
    async fn add_layer(&self, layer: AddLayer, collection: &LayerCollectionId) -> Result<LayerId> {
16✔
53
        let layer_id = Uuid::new_v4();
16✔
54
        let layer_id = LayerId(layer_id.to_string());
16✔
55

16✔
56
        self.add_layer_with_id(&layer_id, layer, collection).await?;
251✔
57

58
        Ok(layer_id)
16✔
59
    }
48✔
60

61
    async fn add_layer_with_id(
16✔
62
        &self,
16✔
63
        id: &LayerId,
16✔
64
        layer: AddLayer,
16✔
65
        collection: &LayerCollectionId,
16✔
66
    ) -> Result<()> {
16✔
67
        let mut conn = self.conn_pool.get().await?;
16✔
68
        let trans = conn.build_transaction().start().await?;
16✔
69

70
        self.ensure_permission_in_tx(collection.clone().into(), Permission::Owner, &trans)
16✔
71
            .await
34✔
72
            .boxed_context(crate::error::PermissionDb)?;
16✔
73

74
        let layer_id = insert_layer(&trans, id, layer, collection).await?;
166✔
75

76
        // TODO: `ON CONFLICT DO NOTHING` means, we do not get an error if the permission already exists.
77
        //       Do we want that, or should we report an error and let the caller decide whether to ignore it?
78
        //       We should decide that and adjust all places where `ON CONFLICT DO NOTHING` is used.
79
        let stmt = trans
16✔
80
            .prepare(
16✔
81
                "
16✔
82
            INSERT INTO permissions (role_id, permission, layer_id)
16✔
83
            VALUES ($1, $2, $3) ON CONFLICT DO NOTHING;",
16✔
84
            )
16✔
85
            .await?;
9✔
86

87
        trans
16✔
88
            .execute(
16✔
89
                &stmt,
16✔
90
                &[
16✔
91
                    &RoleId::from(self.session.user.id),
16✔
92
                    &Permission::Owner,
16✔
93
                    &layer_id,
16✔
94
                ],
16✔
95
            )
16✔
96
            .await?;
9✔
97

98
        trans.commit().await?;
16✔
99

100
        Ok(())
16✔
101
    }
48✔
102

103
    async fn add_layer_to_collection(
2✔
104
        &self,
2✔
105
        layer: &LayerId,
2✔
106
        collection: &LayerCollectionId,
2✔
107
    ) -> Result<()> {
2✔
108
        let mut conn = self.conn_pool.get().await?;
2✔
109
        let tx = conn.build_transaction().start().await?;
2✔
110

111
        self.ensure_permission_in_tx(collection.clone().into(), Permission::Owner, &tx)
2✔
112
            .await
4✔
113
            .boxed_context(crate::error::PermissionDb)?;
2✔
114

115
        let layer_id =
1✔
116
            Uuid::from_str(&layer.0).map_err(|_| crate::error::Error::IdStringMustBeUuid {
2✔
117
                found: layer.0.clone(),
1✔
118
            })?;
2✔
119

120
        let collection_id =
1✔
121
            Uuid::from_str(&collection.0).map_err(|_| crate::error::Error::IdStringMustBeUuid {
1✔
122
                found: collection.0.clone(),
×
123
            })?;
1✔
124

125
        let stmt = tx
1✔
126
            .prepare(
1✔
127
                "
1✔
128
            INSERT INTO collection_layers (collection, layer)
1✔
129
            VALUES ($1, $2) ON CONFLICT DO NOTHING;",
1✔
130
            )
1✔
131
            .await?;
1✔
132

133
        tx.execute(&stmt, &[&collection_id, &layer_id]).await?;
1✔
134

135
        tx.commit().await?;
1✔
136

137
        Ok(())
1✔
138
    }
6✔
139

140
    async fn add_layer_collection(
19✔
141
        &self,
19✔
142
        collection: AddLayerCollection,
19✔
143
        parent: &LayerCollectionId,
19✔
144
    ) -> Result<LayerCollectionId> {
19✔
145
        let collection_id = Uuid::new_v4();
19✔
146
        let collection_id = LayerCollectionId(collection_id.to_string());
19✔
147

19✔
148
        self.add_layer_collection_with_id(&collection_id, collection, parent)
19✔
149
            .await?;
174✔
150

151
        Ok(collection_id)
19✔
152
    }
57✔
153

154
    async fn add_layer_collection_with_id(
19✔
155
        &self,
19✔
156
        id: &LayerCollectionId,
19✔
157
        collection: AddLayerCollection,
19✔
158
        parent: &LayerCollectionId,
19✔
159
    ) -> Result<()> {
19✔
160
        let mut conn = self.conn_pool.get().await?;
19✔
161
        let trans = conn.build_transaction().start().await?;
19✔
162

163
        self.ensure_permission_in_tx(parent.clone().into(), Permission::Owner, &trans)
19✔
164
            .await
37✔
165
            .boxed_context(crate::error::PermissionDb)?;
19✔
166

167
        let collection_id = insert_layer_collection_with_id(&trans, id, collection, parent).await?;
66✔
168

169
        let stmt = trans
19✔
170
            .prepare(
19✔
171
                "
19✔
172
            INSERT INTO permissions (role_id, permission, layer_collection_id)
19✔
173
            VALUES ($1, $2, $3) ON CONFLICT DO NOTHING;",
19✔
174
            )
19✔
175
            .await?;
13✔
176

177
        trans
19✔
178
            .execute(
19✔
179
                &stmt,
19✔
180
                &[
19✔
181
                    &RoleId::from(self.session.user.id),
19✔
182
                    &Permission::Owner,
19✔
183
                    &collection_id,
19✔
184
                ],
19✔
185
            )
19✔
186
            .await?;
15✔
187

188
        trans.commit().await?;
19✔
189

190
        Ok(())
19✔
191
    }
57✔
192

193
    async fn add_collection_to_parent(
1✔
194
        &self,
1✔
195
        collection: &LayerCollectionId,
1✔
196
        parent: &LayerCollectionId,
1✔
197
    ) -> Result<()> {
1✔
198
        let conn = self.conn_pool.get().await?;
1✔
199
        insert_collection_parent(&conn, collection, parent).await
2✔
200
    }
3✔
201

202
    async fn remove_layer_collection(&self, collection: &LayerCollectionId) -> Result<()> {
3✔
203
        let mut conn = self.conn_pool.get().await?;
3✔
204
        let transaction = conn.build_transaction().start().await?;
3✔
205

206
        self.ensure_permission_in_tx(collection.clone().into(), Permission::Owner, &transaction)
3✔
207
            .await
6✔
208
            .boxed_context(crate::error::PermissionDb)?;
3✔
209

210
        delete_layer_collection(&transaction, collection).await?;
12✔
211

212
        transaction.commit().await.map_err(Into::into)
2✔
213
    }
9✔
214

215
    async fn remove_layer_from_collection(
2✔
216
        &self,
2✔
217
        layer: &LayerId,
2✔
218
        collection: &LayerCollectionId,
2✔
219
    ) -> Result<()> {
2✔
220
        let mut conn = self.conn_pool.get().await?;
2✔
221
        let transaction = conn.build_transaction().start().await?;
2✔
222

223
        self.ensure_permission_in_tx(collection.clone().into(), Permission::Owner, &transaction)
2✔
224
            .await
4✔
225
            .boxed_context(crate::error::PermissionDb)?;
2✔
226

227
        delete_layer_from_collection(&transaction, layer, collection).await?;
8✔
228

229
        transaction.commit().await.map_err(Into::into)
2✔
230
    }
6✔
231

232
    async fn remove_layer_collection_from_parent(
1✔
233
        &self,
1✔
234
        collection: &LayerCollectionId,
1✔
235
        parent: &LayerCollectionId,
1✔
236
    ) -> Result<()> {
1✔
237
        let mut conn = self.conn_pool.get().await?;
1✔
238
        let transaction = conn.build_transaction().start().await?;
1✔
239

240
        self.ensure_permission_in_tx(collection.clone().into(), Permission::Owner, &transaction)
1✔
241
            .await
2✔
242
            .boxed_context(crate::error::PermissionDb)?;
1✔
243

244
        delete_layer_collection_from_parent(&transaction, collection, parent).await?;
8✔
245

246
        transaction.commit().await.map_err(Into::into)
1✔
247
    }
3✔
248
}
249

250
fn create_search_query(full_info: bool) -> String {
30✔
251
    format!("
30✔
252
        WITH RECURSIVE parents AS (
30✔
253
            SELECT $1::uuid as id
30✔
254
            UNION ALL SELECT DISTINCT child FROM collection_children JOIN parents ON (id = parent)
30✔
255
        )
30✔
256
        SELECT DISTINCT *
30✔
257
        FROM (
30✔
258
            SELECT 
30✔
259
                {}
30✔
260
            FROM user_permitted_layer_collections u
30✔
261
                JOIN layer_collections lc ON (u.layer_collection_id = lc.id)
30✔
262
                JOIN (SELECT DISTINCT child FROM collection_children JOIN parents ON (id = parent)) cc ON (id = cc.child)
30✔
263
            WHERE u.user_id = $4 AND name ILIKE $5
30✔
264
        ) u UNION (
30✔
265
            SELECT 
30✔
266
                {}
30✔
267
            FROM user_permitted_layers ul
30✔
268
                JOIN layers uc ON (ul.layer_id = uc.id)
30✔
269
                JOIN (SELECT DISTINCT layer FROM collection_layers JOIN parents ON (collection = id)) cl ON (id = cl.layer)
30✔
270
            WHERE ul.user_id = $4 AND name ILIKE $5
30✔
271
        )
30✔
272
        ORDER BY {}name ASC
30✔
273
        LIMIT $2 
30✔
274
        OFFSET $3;",
30✔
275
        if full_info {
30✔
276
            "concat(id, '') AS id,
15✔
277
        name,
15✔
278
        description,
15✔
279
        properties,
15✔
280
        FALSE AS is_layer"
15✔
281
        } else { "name" },
15✔
282
        if full_info {
30✔
283
            "concat(id, '') AS id,
15✔
284
        name,
15✔
285
        description,
15✔
286
        properties,
15✔
287
        TRUE AS is_layer"
15✔
288
        } else { "name" },
15✔
289
        if full_info { "is_layer ASC," } else { "" })
30✔
290
}
30✔
291

292
#[async_trait]
293
impl<Tls> LayerCollectionProvider for ProPostgresDb<Tls>
294
where
295
    Tls: MakeTlsConnect<Socket> + Clone + Send + Sync + 'static,
296
    <Tls as MakeTlsConnect<Socket>>::Stream: Send + Sync,
297
    <Tls as MakeTlsConnect<Socket>>::TlsConnect: Send,
298
    <<Tls as MakeTlsConnect<Socket>>::TlsConnect as TlsConnect<Socket>>::Future: Send,
299
{
300
    fn capabilities(&self) -> ProviderCapabilities {
1✔
301
        ProviderCapabilities {
1✔
302
            listing: true,
1✔
303
            search: SearchCapabilities {
1✔
304
                search_types: SearchTypes {
1✔
305
                    fulltext: true,
1✔
306
                    prefix: true,
1✔
307
                },
1✔
308
                autocomplete: true,
1✔
309
                filters: None,
1✔
310
            },
1✔
311
        }
1✔
312
    }
1✔
313

314
    fn name(&self) -> &str {
×
315
        "Postgres Layer Collection Provider (Pro)"
×
316
    }
×
317

318
    fn description(&self) -> &str {
×
319
        "Layer collection provider for Postgres (Pro)"
×
320
    }
×
321

322
    #[allow(clippy::too_many_lines)]
323
    async fn load_layer_collection(
14✔
324
        &self,
14✔
325
        collection_id: &LayerCollectionId,
14✔
326
        options: LayerCollectionListOptions,
14✔
327
    ) -> Result<LayerCollection> {
14✔
328
        let mut conn = self.conn_pool.get().await?;
14✔
329
        let tx = conn.build_transaction().start().await?;
14✔
330

331
        self.ensure_permission_in_tx(collection_id.clone().into(), Permission::Read, &tx)
14✔
332
            .await
28✔
333
            .boxed_context(crate::error::PermissionDb)?;
14✔
334

335
        let collection = Uuid::from_str(&collection_id.0).map_err(|_| {
11✔
336
            crate::error::Error::IdStringMustBeUuid {
×
337
                found: collection_id.0.clone(),
×
338
            }
×
339
        })?;
11✔
340

341
        let stmt = tx
11✔
342
            .prepare(
11✔
343
                "
11✔
344
        SELECT name, description, properties
11✔
345
        FROM user_permitted_layer_collections p 
11✔
346
            JOIN layer_collections c ON (p.layer_collection_id = c.id) 
11✔
347
        WHERE p.user_id = $1 AND layer_collection_id = $2;",
11✔
348
            )
11✔
349
            .await?;
11✔
350

351
        let row = tx
11✔
352
            .query_one(&stmt, &[&self.session.user.id, &collection])
11✔
353
            .await?;
11✔
354

355
        let name: String = row.get(0);
11✔
356
        let description: String = row.get(1);
11✔
357
        let properties: Vec<Property> = row.get(2);
11✔
358

359
        let stmt = tx
11✔
360
            .prepare(
11✔
361
                "
11✔
362
        SELECT DISTINCT id, name, description, properties, is_layer
11✔
363
        FROM (
11✔
364
            SELECT 
11✔
365
                concat(id, '') AS id, 
11✔
366
                name, 
11✔
367
                description, 
11✔
368
                properties, 
11✔
369
                FALSE AS is_layer
11✔
370
            FROM user_permitted_layer_collections u 
11✔
371
                JOIN layer_collections lc ON (u.layer_collection_id = lc.id)
11✔
372
                JOIN collection_children cc ON (layer_collection_id = cc.child)
11✔
373
            WHERE u.user_id = $4 AND cc.parent = $1
11✔
374
        ) u UNION (
11✔
375
            SELECT 
11✔
376
                concat(id, '') AS id, 
11✔
377
                name, 
11✔
378
                description, 
11✔
379
                properties, 
11✔
380
                TRUE AS is_layer
11✔
381
            FROM user_permitted_layers ul
11✔
382
                JOIN layers uc ON (ul.layer_id = uc.id) 
11✔
383
                JOIN collection_layers cl ON (layer_id = cl.layer)
11✔
384
            WHERE ul.user_id = $4 AND cl.collection = $1
11✔
385
        )
11✔
386
        ORDER BY is_layer ASC, name ASC
11✔
387
        LIMIT $2 
11✔
388
        OFFSET $3;            
11✔
389
        ",
11✔
390
            )
11✔
391
            .await?;
11✔
392

393
        let rows = tx
11✔
394
            .query(
11✔
395
                &stmt,
11✔
396
                &[
11✔
397
                    &collection,
11✔
398
                    &i64::from(options.limit),
11✔
399
                    &i64::from(options.offset),
11✔
400
                    &self.session.user.id,
11✔
401
                ],
11✔
402
            )
11✔
403
            .await?;
11✔
404

405
        let items = rows
11✔
406
            .into_iter()
11✔
407
            .map(|row| {
14✔
408
                let is_layer: bool = row.get(4);
14✔
409

14✔
410
                if is_layer {
14✔
411
                    Ok(CollectionItem::Layer(LayerListing {
5✔
412
                        id: ProviderLayerId {
5✔
413
                            provider_id: INTERNAL_PROVIDER_ID,
5✔
414
                            layer_id: LayerId(row.get(0)),
5✔
415
                        },
5✔
416
                        name: row.get(1),
5✔
417
                        description: row.get(2),
5✔
418
                        properties: row.get(3),
5✔
419
                    }))
5✔
420
                } else {
421
                    Ok(CollectionItem::Collection(LayerCollectionListing {
9✔
422
                        id: ProviderLayerCollectionId {
9✔
423
                            provider_id: INTERNAL_PROVIDER_ID,
9✔
424
                            collection_id: LayerCollectionId(row.get(0)),
9✔
425
                        },
9✔
426
                        name: row.get(1),
9✔
427
                        description: row.get(2),
9✔
428
                        properties: row.get(3),
9✔
429
                    }))
9✔
430
                }
431
            })
14✔
432
            .collect::<Result<Vec<CollectionItem>>>()?;
11✔
433

434
        tx.commit().await?;
11✔
435

436
        Ok(LayerCollection {
11✔
437
            id: ProviderLayerCollectionId {
11✔
438
                provider_id: INTERNAL_PROVIDER_ID,
11✔
439
                collection_id: collection_id.clone(),
11✔
440
            },
11✔
441
            name,
11✔
442
            description,
11✔
443
            items,
11✔
444
            entry_label: None,
11✔
445
            properties,
11✔
446
        })
11✔
447
    }
42✔
448

449
    #[allow(clippy::too_many_lines)]
450
    async fn search(
15✔
451
        &self,
15✔
452
        collection_id: &LayerCollectionId,
15✔
453
        search: SearchParameters,
15✔
454
    ) -> Result<LayerCollection> {
15✔
455
        let mut conn = self.conn_pool.get().await?;
15✔
456
        let tx = conn.build_transaction().start().await?;
15✔
457

458
        self.ensure_permission_in_tx(collection_id.clone().into(), Permission::Read, &tx)
15✔
459
            .await
33✔
460
            .boxed_context(crate::error::PermissionDb)?;
15✔
461

462
        let collection = Uuid::from_str(&collection_id.0).map_err(|_| {
15✔
463
            crate::error::Error::IdStringMustBeUuid {
×
464
                found: collection_id.0.clone(),
×
465
            }
×
466
        })?;
15✔
467

468
        let stmt = tx
15✔
469
            .prepare(
15✔
470
                "
15✔
471
        SELECT name, description, properties
15✔
472
        FROM user_permitted_layer_collections p 
15✔
473
            JOIN layer_collections c ON (p.layer_collection_id = c.id) 
15✔
474
        WHERE p.user_id = $1 AND layer_collection_id = $2;",
15✔
475
            )
15✔
476
            .await?;
18✔
477

478
        let row = tx
15✔
479
            .query_one(&stmt, &[&self.session.user.id, &collection])
15✔
480
            .await?;
14✔
481

482
        let name: String = row.get(0);
15✔
483
        let description: String = row.get(1);
15✔
484
        let properties: Vec<Property> = row.get(2);
15✔
485

486
        let pattern = match search.search_type {
15✔
487
            SearchType::Fulltext => {
488
                format!("%{}%", search.search_string)
10✔
489
            }
490
            SearchType::Prefix => {
491
                format!("{}%", search.search_string)
5✔
492
            }
493
        };
494

495
        let stmt = tx.prepare(&create_search_query(true)).await?;
15✔
496

497
        let rows = tx
15✔
498
            .query(
15✔
499
                &stmt,
15✔
500
                &[
15✔
501
                    &collection,
15✔
502
                    &i64::from(search.limit),
15✔
503
                    &i64::from(search.offset),
15✔
504
                    &self.session.user.id,
15✔
505
                    &pattern,
15✔
506
                ],
15✔
507
            )
15✔
508
            .await?;
14✔
509

510
        let items = rows
15✔
511
            .into_iter()
15✔
512
            .map(|row| {
31✔
513
                let is_layer: bool = row.get(4);
31✔
514

31✔
515
                if is_layer {
31✔
516
                    Ok(CollectionItem::Layer(LayerListing {
13✔
517
                        id: ProviderLayerId {
13✔
518
                            provider_id: INTERNAL_PROVIDER_ID,
13✔
519
                            layer_id: LayerId(row.get(0)),
13✔
520
                        },
13✔
521
                        name: row.get(1),
13✔
522
                        description: row.get(2),
13✔
523
                        properties: row.get(3),
13✔
524
                    }))
13✔
525
                } else {
526
                    Ok(CollectionItem::Collection(LayerCollectionListing {
18✔
527
                        id: ProviderLayerCollectionId {
18✔
528
                            provider_id: INTERNAL_PROVIDER_ID,
18✔
529
                            collection_id: LayerCollectionId(row.get(0)),
18✔
530
                        },
18✔
531
                        name: row.get(1),
18✔
532
                        description: row.get(2),
18✔
533
                        properties: row.get(3),
18✔
534
                    }))
18✔
535
                }
536
            })
31✔
537
            .collect::<Result<Vec<CollectionItem>>>()?;
15✔
538

539
        tx.commit().await?;
15✔
540

541
        Ok(LayerCollection {
15✔
542
            id: ProviderLayerCollectionId {
15✔
543
                provider_id: INTERNAL_PROVIDER_ID,
15✔
544
                collection_id: collection_id.clone(),
15✔
545
            },
15✔
546
            name,
15✔
547
            description,
15✔
548
            items,
15✔
549
            entry_label: None,
15✔
550
            properties,
15✔
551
        })
15✔
552
    }
45✔
553

554
    #[allow(clippy::too_many_lines)]
555
    async fn autocomplete_search(
15✔
556
        &self,
15✔
557
        collection_id: &LayerCollectionId,
15✔
558
        search: SearchParameters,
15✔
559
    ) -> Result<Vec<String>> {
15✔
560
        let mut conn = self.conn_pool.get().await?;
15✔
561
        let tx = conn.build_transaction().start().await?;
15✔
562

563
        self.ensure_permission_in_tx(collection_id.clone().into(), Permission::Read, &tx)
15✔
564
            .await
26✔
565
            .boxed_context(crate::error::PermissionDb)?;
15✔
566

567
        let collection = Uuid::from_str(&collection_id.0).map_err(|_| {
15✔
568
            crate::error::Error::IdStringMustBeUuid {
×
569
                found: collection_id.0.clone(),
×
570
            }
×
571
        })?;
15✔
572

573
        let pattern = match search.search_type {
15✔
574
            SearchType::Fulltext => {
575
                format!("%{}%", search.search_string)
10✔
576
            }
577
            SearchType::Prefix => {
578
                format!("{}%", search.search_string)
5✔
579
            }
580
        };
581

582
        let stmt = tx.prepare(&create_search_query(false)).await?;
15✔
583

584
        let rows = tx
15✔
585
            .query(
15✔
586
                &stmt,
15✔
587
                &[
15✔
588
                    &collection,
15✔
589
                    &i64::from(search.limit),
15✔
590
                    &i64::from(search.offset),
15✔
591
                    &self.session.user.id,
15✔
592
                    &pattern,
15✔
593
                ],
15✔
594
            )
15✔
595
            .await?;
13✔
596

597
        let items = rows
15✔
598
            .into_iter()
15✔
599
            .map(|row| Ok(row.get::<usize, &str>(0).to_string()))
31✔
600
            .collect::<Result<Vec<String>>>()?;
15✔
601

602
        tx.commit().await?;
15✔
603

604
        Ok(items)
15✔
605
    }
45✔
606

607
    async fn get_root_layer_collection_id(&self) -> Result<LayerCollectionId> {
11✔
608
        Ok(LayerCollectionId(
11✔
609
            INTERNAL_LAYER_DB_ROOT_COLLECTION_ID.to_string(),
11✔
610
        ))
11✔
611
    }
22✔
612

613
    async fn load_layer(&self, id: &LayerId) -> Result<Layer> {
6✔
614
        let mut conn = self.conn_pool.get().await?;
6✔
615
        let tx = conn.build_transaction().start().await?;
6✔
616

617
        self.ensure_permission_in_tx(id.clone().into(), Permission::Read, &tx)
6✔
618
            .await
10✔
619
            .boxed_context(crate::error::PermissionDb)?;
6✔
620

621
        let layer_id =
3✔
622
            Uuid::from_str(&id.0).map_err(|_| crate::error::Error::IdStringMustBeUuid {
3✔
623
                found: id.0.clone(),
×
624
            })?;
3✔
625

626
        let stmt = tx
3✔
627
            .prepare(
3✔
628
                "
3✔
629
            SELECT 
3✔
630
                l.name,
3✔
631
                l.description,
3✔
632
                w.workflow,
3✔
633
                l.symbology,
3✔
634
                l.properties,
3✔
635
                l.metadata
3✔
636
            FROM 
3✔
637
                layers l JOIN workflows w ON (l.workflow_id = w.id)
3✔
638
            WHERE 
3✔
639
                l.id = $1;",
3✔
640
            )
3✔
641
            .await?;
3✔
642

643
        let row = tx
3✔
644
            .query_one(&stmt, &[&layer_id])
3✔
645
            .await
3✔
646
            .map_err(|_error| LayerDbError::NoLayerForGivenId { id: id.clone() })?;
3✔
647

648
        tx.commit().await?;
3✔
649

650
        Ok(Layer {
651
            id: ProviderLayerId {
3✔
652
                provider_id: INTERNAL_PROVIDER_ID,
3✔
653
                layer_id: id.clone(),
3✔
654
            },
3✔
655
            name: row.get(0),
3✔
656
            description: row.get(1),
3✔
657
            workflow: serde_json::from_value(row.get(2)).context(crate::error::SerdeJson)?,
3✔
658
            symbology: row.get(3),
3✔
659
            properties: row.get(4),
3✔
660
            metadata: row.get::<_, HashMapTextTextDbType>(5).into(),
3✔
661
        })
662
    }
18✔
663
}
664

665
#[async_trait]
666
impl<Tls> LayerProviderDb for ProPostgresDb<Tls>
667
where
668
    Tls: MakeTlsConnect<Socket> + Clone + Send + Sync + 'static,
669
    <Tls as MakeTlsConnect<Socket>>::Stream: Send + Sync,
670
    <Tls as MakeTlsConnect<Socket>>::TlsConnect: Send,
671
    <<Tls as MakeTlsConnect<Socket>>::TlsConnect as TlsConnect<Socket>>::Future: Send,
672
{
673
    async fn add_layer_provider(
1✔
674
        &self,
1✔
675
        provider: TypedDataProviderDefinition,
1✔
676
    ) -> Result<DataProviderId> {
1✔
677
        ensure!(self.session.is_admin(), error::PermissionDenied);
1✔
678

679
        let conn = self.conn_pool.get().await?;
1✔
680

681
        let prio = DataProviderDefinition::<Self>::priority(&provider);
1✔
682
        let clamp_prio = prio.clamp(-1000, 1000);
1✔
683

1✔
684
        if prio != clamp_prio {
1✔
685
            log::warn!(
×
686
                "The priority of the provider {} is out of range! --> clamped {} to {}",
×
687
                DataProviderDefinition::<Self>::name(&provider),
×
688
                prio,
689
                clamp_prio
690
            );
691
        }
1✔
692

693
        let stmt = conn
1✔
694
            .prepare(
1✔
695
                "
1✔
696
              INSERT INTO layer_providers (
1✔
697
                  id, 
1✔
698
                  type_name, 
1✔
699
                  name,
1✔
700
                  definition,
1✔
701
                  priority
1✔
702
              )
1✔
703
              VALUES ($1, $2, $3, $4, $5)",
1✔
704
            )
1✔
705
            .await?;
1✔
706

707
        let id = DataProviderDefinition::<Self>::id(&provider);
1✔
708
        conn.execute(
1✔
709
            &stmt,
1✔
710
            &[
1✔
711
                &id,
1✔
712
                &DataProviderDefinition::<Self>::type_name(&provider),
1✔
713
                &DataProviderDefinition::<Self>::name(&provider),
1✔
714
                &provider,
1✔
715
                &clamp_prio,
1✔
716
            ],
1✔
717
        )
1✔
718
        .await?;
1✔
719
        Ok(id)
1✔
720
    }
3✔
721

722
    async fn list_layer_providers(
2✔
723
        &self,
2✔
724
        options: LayerProviderListingOptions,
2✔
725
    ) -> Result<Vec<LayerProviderListing>> {
2✔
726
        // TODO: permission
727
        let conn = self.conn_pool.get().await?;
2✔
728

729
        let stmt = conn
2✔
730
            .prepare(
2✔
731
                "(
2✔
732
                    SELECT 
2✔
733
                        id, 
2✔
734
                        name,
2✔
735
                        type_name,
2✔
736
                        priority
2✔
737
                    FROM 
2✔
738
                        layer_providers
2✔
739
                    WHERE
2✔
740
                        priority > -1000
2✔
741
                    UNION ALL
2✔
742
                    SELECT 
2✔
743
                        id, 
2✔
744
                        name,
2✔
745
                        type_name,
2✔
746
                        priority
2✔
747
                    FROM 
2✔
748
                        pro_layer_providers
2✔
749
                    WHERE
2✔
750
                        priority > -1000
2✔
751
                )
2✔
752
                ORDER BY priority desc, name ASC
2✔
753
                LIMIT $1 
2✔
754
                OFFSET $2;",
2✔
755
            )
2✔
UNCOV
756
            .await?;
×
757

758
        let rows = conn
2✔
759
            .query(
2✔
760
                &stmt,
2✔
761
                &[&i64::from(options.limit), &i64::from(options.offset)],
2✔
762
            )
2✔
UNCOV
763
            .await?;
×
764

765
        Ok(rows
2✔
766
            .iter()
2✔
767
            .map(|row| LayerProviderListing {
2✔
768
                id: row.get(0),
2✔
769
                name: row.get(1),
2✔
770
                priority: row.get(3),
2✔
771
            })
2✔
772
            .collect())
2✔
773
    }
6✔
774

775
    async fn load_layer_provider(&self, id: DataProviderId) -> Result<Box<dyn DataProvider>> {
2✔
776
        // TODO: permissions
777
        let conn = self.conn_pool.get().await?;
2✔
778

779
        let stmt = conn
2✔
780
            .prepare(
2✔
781
                "SELECT
2✔
782
                    definition, NULL AS pro_definition
2✔
783
                FROM
2✔
784
                    layer_providers
2✔
785
                WHERE
2✔
786
                    id = $1
2✔
787
                UNION ALL
2✔
788
                SELECT
2✔
789
                    NULL AS definition, definition AS pro_definition
2✔
790
                FROM
2✔
791
                    pro_layer_providers
2✔
792
                WHERE
2✔
793
                    id = $1",
2✔
794
            )
2✔
795
            .await?;
36✔
796

797
        let row = conn.query_one(&stmt, &[&id]).await?;
2✔
798

799
        if let Some(definition) = row.get::<_, Option<TypedDataProviderDefinition>>(0) {
2✔
800
            return Box::new(definition)
1✔
801
                .initialize(ProPostgresDb {
1✔
802
                    conn_pool: self.conn_pool.clone(),
1✔
803
                    session: self.session.clone(),
1✔
804
                })
1✔
805
                .await;
12✔
806
        }
1✔
807

1✔
808
        let pro_definition: TypedProDataProviderDefinition = row.get(1);
1✔
809
        Box::new(pro_definition)
1✔
810
            .initialize(ProPostgresDb {
1✔
811
                conn_pool: self.conn_pool.clone(),
1✔
812
                session: self.session.clone(),
1✔
813
            })
1✔
814
            .await
×
815
    }
6✔
816
}
817

818
#[async_trait]
819
pub trait ProLayerProviderDb: Send + Sync + 'static {
820
    async fn add_pro_layer_provider(
821
        &self,
822
        provider: TypedProDataProviderDefinition,
823
    ) -> Result<DataProviderId>;
824
}
825

826
#[async_trait]
827
impl<Tls> ProLayerProviderDb for ProPostgresDb<Tls>
828
where
829
    Tls: MakeTlsConnect<Socket> + Clone + Send + Sync + 'static,
830
    <Tls as MakeTlsConnect<Socket>>::Stream: Send + Sync,
831
    <Tls as MakeTlsConnect<Socket>>::TlsConnect: Send,
832
    <<Tls as MakeTlsConnect<Socket>>::TlsConnect as TlsConnect<Socket>>::Future: Send,
833
{
834
    async fn add_pro_layer_provider(
1✔
835
        &self,
1✔
836
        provider: TypedProDataProviderDefinition,
1✔
837
    ) -> Result<DataProviderId> {
1✔
838
        ensure!(self.session.is_admin(), error::PermissionDenied);
1✔
839

840
        let conn = self.conn_pool.get().await?;
1✔
841

842
        let prio = DataProviderDefinition::<Self>::priority(&provider);
1✔
843
        let clamp_prio = prio.clamp(-1000, 1000);
1✔
844

1✔
845
        if prio != clamp_prio {
1✔
846
            log::warn!(
×
847
                "The priority of the provider {} is out of range! --> clamped {} to {}",
×
848
                DataProviderDefinition::<Self>::name(&provider),
×
849
                prio,
850
                clamp_prio
851
            );
852
        }
1✔
853

854
        let stmt = conn
1✔
855
            .prepare(
1✔
856
                "
1✔
857
              INSERT INTO pro_layer_providers (
1✔
858
                  id, 
1✔
859
                  type_name, 
1✔
860
                  name,
1✔
861
                  definition,
1✔
862
                  priority
1✔
863
              )
1✔
864
              VALUES ($1, $2, $3, $4, $5)",
1✔
865
            )
1✔
866
            .await?;
20✔
867

868
        let id = DataProviderDefinition::<Self>::id(&provider);
1✔
869
        conn.execute(
1✔
870
            &stmt,
1✔
871
            &[
1✔
872
                &id,
1✔
873
                &DataProviderDefinition::<Self>::type_name(&provider),
1✔
874
                &DataProviderDefinition::<Self>::name(&provider),
1✔
875
                &provider,
1✔
876
                &clamp_prio,
1✔
877
            ],
1✔
878
        )
1✔
879
        .await?;
1✔
880
        Ok(id)
1✔
881
    }
3✔
882
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc