• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

geo-engine / geoengine / 12417203919

19 Dec 2024 04:55PM UTC coverage: 90.351% (-0.2%) from 90.512%
12417203919

Pull #998

github

web-flow
Merge c7e5c8ae4 into 34e12969f
Pull Request #998: quota logging wip

834 of 1211 new or added lines in 66 files covered. (68.87%)

220 existing lines in 21 files now uncovered.

133830 of 148123 relevant lines covered (90.35%)

54352.72 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

85.36
/services/src/pro/layers/postgres_layer_db.rs
1
use crate::error;
2
use crate::layers::external::TypedDataProviderDefinition;
3
use crate::layers::layer::{Property, UpdateLayer, UpdateLayerCollection};
4
use crate::layers::listing::{
5
    ProviderCapabilities, SearchCapabilities, SearchParameters, SearchType, SearchTypes,
6
};
7
use crate::layers::postgres_layer_db::{
8
    delete_layer_collection, delete_layer_collection_from_parent, delete_layer_from_collection,
9
    insert_collection_parent, insert_layer, insert_layer_collection_with_id,
10
};
11
use crate::pro::contexts::ProPostgresDb;
12
use crate::pro::datasets::TypedProDataProviderDefinition;
13
use crate::pro::permissions::postgres_permissiondb::TxPermissionDb;
14
use crate::pro::permissions::{Permission, RoleId};
15
use crate::workflows::registry::TxWorkflowRegistry;
16
use crate::{
17
    error::Result,
18
    layers::{
19
        external::{DataProvider, DataProviderDefinition},
20
        layer::{
21
            AddLayer, AddLayerCollection, CollectionItem, Layer, LayerCollection,
22
            LayerCollectionListOptions, LayerCollectionListing, LayerListing,
23
            ProviderLayerCollectionId, ProviderLayerId,
24
        },
25
        listing::{LayerCollectionId, LayerCollectionProvider},
26
        storage::{
27
            LayerDb, LayerProviderDb, LayerProviderListing, LayerProviderListingOptions,
28
            INTERNAL_LAYER_DB_ROOT_COLLECTION_ID, INTERNAL_PROVIDER_ID,
29
        },
30
        LayerDbError,
31
    },
32
};
33
use async_trait::async_trait;
34
use bb8_postgres::tokio_postgres::{
35
    tls::{MakeTlsConnect, TlsConnect},
36
    Socket,
37
};
38
use geoengine_datatypes::dataset::{DataProviderId, LayerId};
39
use geoengine_datatypes::error::BoxedResultExt;
40
use geoengine_datatypes::util::HashMapTextTextDbType;
41
use snafu::{ensure, ResultExt};
42
use std::str::FromStr;
43
use uuid::Uuid;
44

45
#[async_trait]
46
impl<Tls> LayerDb for ProPostgresDb<Tls>
47
where
48
    Tls: MakeTlsConnect<Socket> + Clone + Send + Sync + 'static + std::fmt::Debug,
49
    <Tls as MakeTlsConnect<Socket>>::Stream: Send + Sync,
50
    <Tls as MakeTlsConnect<Socket>>::TlsConnect: Send,
51
    <<Tls as MakeTlsConnect<Socket>>::TlsConnect as TlsConnect<Socket>>::Future: Send,
52
{
53
    async fn add_layer(&self, layer: AddLayer, collection: &LayerCollectionId) -> Result<LayerId> {
16✔
54
        let layer_id = Uuid::new_v4();
16✔
55
        let layer_id = LayerId(layer_id.to_string());
16✔
56

16✔
57
        self.add_layer_with_id(&layer_id, layer, collection).await?;
478✔
58

59
        Ok(layer_id)
16✔
60
    }
32✔
61

62
    async fn update_layer(&self, id: &LayerId, layer: UpdateLayer) -> Result<()> {
×
63
        let layer_id =
×
64
            Uuid::from_str(&id.0).map_err(|_| crate::error::Error::IdStringMustBeUuid {
×
65
                found: id.0.clone(),
×
UNCOV
66
            })?;
×
67

68
        let mut conn = self.conn_pool.get().await?;
×
UNCOV
69
        let transaction = conn.build_transaction().start().await?;
×
70

71
        self.ensure_permission_in_tx(id.clone().into(), Permission::Owner, &transaction)
×
72
            .await
×
UNCOV
73
            .boxed_context(crate::error::PermissionDb)?;
×
74

75
        let workflow_id = self
×
76
            .register_workflow_in_tx(layer.workflow, &transaction)
×
77
            .await?;
×
78

79
        transaction.execute(
×
80
                "
×
81
                UPDATE layers
×
82
                SET name = $1, description = $2, symbology = $3, properties = $4, metadata = $5, workflow_id = $6
×
83
                WHERE id = $7;",
×
84
                &[
×
85
                    &layer.name,
×
86
                    &layer.description,
×
87
                    &layer.symbology,
×
88
                    &layer.properties,
×
89
                    &HashMapTextTextDbType::from(&layer.metadata),
×
UNCOV
90
                    &workflow_id,
×
91
                    &layer_id,
×
92
                ],
×
UNCOV
93
            )
×
94
            .await?;
×
95

96
        transaction.commit().await.map_err(Into::into)
×
97
    }
×
98

UNCOV
99
    async fn remove_layer(&self, id: &LayerId) -> Result<()> {
×
100
        let layer_id =
×
101
            Uuid::from_str(&id.0).map_err(|_| crate::error::Error::IdStringMustBeUuid {
×
UNCOV
102
                found: id.0.clone(),
×
103
            })?;
×
104

105
        let mut conn = self.conn_pool.get().await?;
×
UNCOV
106
        let transaction = conn.build_transaction().start().await?;
×
107

108
        self.ensure_permission_in_tx(id.clone().into(), Permission::Owner, &transaction)
×
109
            .await
×
110
            .boxed_context(crate::error::PermissionDb)?;
×
111

112
        transaction
×
113
            .execute(
×
114
                "
×
UNCOV
115
            DELETE FROM layers
×
116
            WHERE id = $1;",
×
117
                &[&layer_id],
×
UNCOV
118
            )
×
UNCOV
119
            .await?;
×
120

UNCOV
121
        transaction.commit().await.map_err(Into::into)
×
UNCOV
122
    }
×
123

124
    async fn add_layer_with_id(
125
        &self,
126
        id: &LayerId,
127
        layer: AddLayer,
128
        collection: &LayerCollectionId,
129
    ) -> Result<()> {
16✔
130
        let mut conn = self.conn_pool.get().await?;
16✔
131
        let trans = conn.build_transaction().start().await?;
16✔
132

133
        self.ensure_permission_in_tx(collection.clone().into(), Permission::Owner, &trans)
16✔
134
            .await
55✔
135
            .boxed_context(crate::error::PermissionDb)?;
16✔
136

137
        let layer_id = insert_layer(self, &trans, id, layer, collection).await?;
347✔
138

139
        // TODO: `ON CONFLICT DO NOTHING` means, we do not get an error if the permission already exists.
140
        //       Do we want that, or should we report an error and let the caller decide whether to ignore it?
141
        //       We should decide that and adjust all places where `ON CONFLICT DO NOTHING` is used.
142
        let stmt = trans
16✔
143
            .prepare(
16✔
144
                "
16✔
145
            INSERT INTO permissions (role_id, permission, layer_id)
16✔
146
            VALUES ($1, $2, $3) ON CONFLICT DO NOTHING;",
16✔
147
            )
16✔
148
            .await?;
15✔
149

150
        trans
16✔
151
            .execute(
16✔
152
                &stmt,
16✔
153
                &[
16✔
154
                    &RoleId::from(self.session.user.id),
16✔
155
                    &Permission::Owner,
16✔
156
                    &layer_id,
16✔
157
                ],
16✔
158
            )
16✔
159
            .await?;
15✔
160

161
        trans.commit().await?;
16✔
162

163
        Ok(())
16✔
164
    }
32✔
165

166
    async fn add_layer_to_collection(
167
        &self,
168
        layer: &LayerId,
169
        collection: &LayerCollectionId,
170
    ) -> Result<()> {
2✔
171
        let mut conn = self.conn_pool.get().await?;
2✔
172
        let tx = conn.build_transaction().start().await?;
2✔
173

174
        self.ensure_permission_in_tx(collection.clone().into(), Permission::Owner, &tx)
2✔
175
            .await
4✔
176
            .boxed_context(crate::error::PermissionDb)?;
2✔
177

178
        let layer_id =
1✔
179
            Uuid::from_str(&layer.0).map_err(|_| crate::error::Error::IdStringMustBeUuid {
2✔
180
                found: layer.0.clone(),
1✔
181
            })?;
2✔
182

183
        let collection_id =
1✔
184
            Uuid::from_str(&collection.0).map_err(|_| crate::error::Error::IdStringMustBeUuid {
1✔
UNCOV
185
                found: collection.0.clone(),
×
186
            })?;
1✔
187

188
        let stmt = tx
1✔
189
            .prepare(
1✔
190
                "
1✔
191
            INSERT INTO collection_layers (collection, layer)
1✔
192
            VALUES ($1, $2) ON CONFLICT DO NOTHING;",
1✔
193
            )
1✔
194
            .await?;
1✔
195

196
        tx.execute(&stmt, &[&collection_id, &layer_id]).await?;
1✔
197

198
        tx.commit().await?;
1✔
199

200
        Ok(())
1✔
201
    }
4✔
202

203
    async fn add_layer_collection(
204
        &self,
205
        collection: AddLayerCollection,
206
        parent: &LayerCollectionId,
207
    ) -> Result<LayerCollectionId> {
19✔
208
        let collection_id = Uuid::new_v4();
19✔
209
        let collection_id = LayerCollectionId(collection_id.to_string());
19✔
210

19✔
211
        self.add_layer_collection_with_id(&collection_id, collection, parent)
19✔
212
            .await?;
228✔
213

214
        Ok(collection_id)
19✔
215
    }
38✔
216

217
    async fn add_layer_collection_with_id(
218
        &self,
219
        id: &LayerCollectionId,
220
        collection: AddLayerCollection,
221
        parent: &LayerCollectionId,
222
    ) -> Result<()> {
19✔
223
        let mut conn = self.conn_pool.get().await?;
19✔
224
        let trans = conn.build_transaction().start().await?;
19✔
225

226
        self.ensure_permission_in_tx(parent.clone().into(), Permission::Owner, &trans)
19✔
227
            .await
54✔
228
            .boxed_context(crate::error::PermissionDb)?;
19✔
229

230
        let collection_id = insert_layer_collection_with_id(&trans, id, collection, parent).await?;
86✔
231

232
        let stmt = trans
19✔
233
            .prepare(
19✔
234
                "
19✔
235
            INSERT INTO permissions (role_id, permission, layer_collection_id)
19✔
236
            VALUES ($1, $2, $3) ON CONFLICT DO NOTHING;",
19✔
237
            )
19✔
238
            .await?;
17✔
239

240
        trans
19✔
241
            .execute(
19✔
242
                &stmt,
19✔
243
                &[
19✔
244
                    &RoleId::from(self.session.user.id),
19✔
245
                    &Permission::Owner,
19✔
246
                    &collection_id,
19✔
247
                ],
19✔
248
            )
19✔
249
            .await?;
17✔
250

251
        trans.commit().await?;
19✔
252

253
        Ok(())
19✔
254
    }
38✔
255

256
    async fn add_collection_to_parent(
257
        &self,
258
        collection: &LayerCollectionId,
259
        parent: &LayerCollectionId,
260
    ) -> Result<()> {
1✔
261
        let conn = self.conn_pool.get().await?;
1✔
262
        insert_collection_parent(&conn, collection, parent).await
2✔
263
    }
2✔
264

265
    async fn remove_layer_collection(&self, collection: &LayerCollectionId) -> Result<()> {
3✔
266
        let mut conn = self.conn_pool.get().await?;
3✔
267
        let transaction = conn.build_transaction().start().await?;
3✔
268

269
        self.ensure_permission_in_tx(collection.clone().into(), Permission::Owner, &transaction)
3✔
270
            .await
2✔
271
            .boxed_context(crate::error::PermissionDb)?;
3✔
272

273
        delete_layer_collection(&transaction, collection).await?;
3✔
274

275
        transaction.commit().await.map_err(Into::into)
2✔
276
    }
6✔
277

278
    async fn remove_layer_from_collection(
279
        &self,
280
        layer: &LayerId,
281
        collection: &LayerCollectionId,
282
    ) -> Result<()> {
2✔
283
        let mut conn = self.conn_pool.get().await?;
2✔
284
        let transaction = conn.build_transaction().start().await?;
2✔
285

286
        self.ensure_permission_in_tx(collection.clone().into(), Permission::Owner, &transaction)
2✔
287
            .await
4✔
288
            .boxed_context(crate::error::PermissionDb)?;
2✔
289

290
        delete_layer_from_collection(&transaction, layer, collection).await?;
8✔
291

292
        transaction.commit().await.map_err(Into::into)
2✔
293
    }
4✔
294

295
    async fn remove_layer_collection_from_parent(
296
        &self,
297
        collection: &LayerCollectionId,
298
        parent: &LayerCollectionId,
299
    ) -> Result<()> {
1✔
300
        let mut conn = self.conn_pool.get().await?;
1✔
301
        let transaction = conn.build_transaction().start().await?;
1✔
302

303
        self.ensure_permission_in_tx(collection.clone().into(), Permission::Owner, &transaction)
1✔
304
            .await
2✔
305
            .boxed_context(crate::error::PermissionDb)?;
1✔
306

307
        delete_layer_collection_from_parent(&transaction, collection, parent).await?;
8✔
308

309
        transaction.commit().await.map_err(Into::into)
1✔
310
    }
2✔
311

312
    async fn update_layer_collection(
313
        &self,
314
        collection: &LayerCollectionId,
315
        update: UpdateLayerCollection,
UNCOV
316
    ) -> Result<()> {
×
317
        let collection_id =
×
318
            Uuid::from_str(&collection.0).map_err(|_| crate::error::Error::IdStringMustBeUuid {
×
UNCOV
319
                found: collection.0.clone(),
×
320
            })?;
×
321

322
        let mut conn = self.conn_pool.get().await?;
×
UNCOV
323
        let transaction = conn.build_transaction().start().await?;
×
324

325
        self.ensure_permission_in_tx(collection.clone().into(), Permission::Owner, &transaction)
×
326
            .await
×
327
            .boxed_context(crate::error::PermissionDb)?;
×
328

329
        transaction
×
330
            .execute(
×
331
                "UPDATE layer_collections 
×
332
                SET name = $1, description = $2, properties = $3
×
333
                WHERE id = $4;",
×
334
                &[
×
335
                    &update.name,
×
336
                    &update.description,
×
UNCOV
337
                    &update.properties,
×
338
                    &collection_id,
×
339
                ],
×
UNCOV
340
            )
×
UNCOV
341
            .await?;
×
342

UNCOV
343
        transaction.commit().await.map_err(Into::into)
×
UNCOV
344
    }
×
345
}
346

347
/// Builds the SQL text of the recursive search over everything reachable from a collection.
///
/// Placeholders of the resulting statement: `$1` root collection (cast to `uuid`), `$2` limit,
/// `$3` offset, `$4` user id, `$5` `ILIKE` pattern applied to `name`.
///
/// With `full_info == true` both halves of the `UNION` select `id`, `name`, `description`,
/// `properties` and an `is_layer` flag (`FALSE` for collections, `TRUE` for layers) and the
/// result is ordered by `is_layer` first. With `full_info == false` only `name` is selected
/// and the result is ordered by `name` alone.
fn create_search_query(full_info: bool) -> String {
    // Column list for one half of the `UNION`; `is_layer` is the SQL boolean literal that
    // marks the rows of that half.
    let select_list = |is_layer: &str| {
        if full_info {
            format!(
                "concat(id, '') AS id,\n        name,\n        description,\n        properties,\n        {is_layer} AS is_layer"
            )
        } else {
            String::from("name")
        }
    };

    let collection_columns = select_list("FALSE");
    let layer_columns = select_list("TRUE");
    let kind_ordering = if full_info { "is_layer ASC," } else { "" };

    format!("
        WITH RECURSIVE parents AS (
            SELECT $1::uuid as id
            UNION ALL SELECT DISTINCT child FROM collection_children JOIN parents ON (id = parent)
        )
        SELECT DISTINCT *
        FROM (
            SELECT 
                {collection_columns}
            FROM user_permitted_layer_collections u
                JOIN layer_collections lc ON (u.layer_collection_id = lc.id)
                JOIN (SELECT DISTINCT child FROM collection_children JOIN parents ON (id = parent)) cc ON (id = cc.child)
            WHERE u.user_id = $4 AND name ILIKE $5
        ) u UNION (
            SELECT 
                {layer_columns}
            FROM user_permitted_layers ul
                JOIN layers uc ON (ul.layer_id = uc.id)
                JOIN (SELECT DISTINCT layer FROM collection_layers JOIN parents ON (collection = id)) cl ON (id = cl.layer)
            WHERE ul.user_id = $4 AND name ILIKE $5
        )
        ORDER BY {kind_ordering}name ASC
        LIMIT $2 
        OFFSET $3;")
}
388

389
#[async_trait]
390
impl<Tls> LayerCollectionProvider for ProPostgresDb<Tls>
391
where
392
    Tls: MakeTlsConnect<Socket> + Clone + Send + Sync + 'static + std::fmt::Debug,
393
    <Tls as MakeTlsConnect<Socket>>::Stream: Send + Sync,
394
    <Tls as MakeTlsConnect<Socket>>::TlsConnect: Send,
395
    <<Tls as MakeTlsConnect<Socket>>::TlsConnect as TlsConnect<Socket>>::Future: Send,
396
{
397
    fn capabilities(&self) -> ProviderCapabilities {
1✔
398
        ProviderCapabilities {
1✔
399
            listing: true,
1✔
400
            search: SearchCapabilities {
1✔
401
                search_types: SearchTypes {
1✔
402
                    fulltext: true,
1✔
403
                    prefix: true,
1✔
404
                },
1✔
405
                autocomplete: true,
1✔
406
                filters: None,
1✔
407
            },
1✔
408
        }
1✔
409
    }
1✔
410

411
    fn name(&self) -> &str {
×
412
        "Postgres Layer Collection Provider (Pro)"
×
UNCOV
413
    }
×
414

UNCOV
415
    fn description(&self) -> &str {
×
UNCOV
416
        "Layer collection provider for Postgres (Pro)"
×
UNCOV
417
    }
×
418

419
    #[allow(clippy::too_many_lines)]
420
    async fn load_layer_collection(
421
        &self,
422
        collection_id: &LayerCollectionId,
423
        options: LayerCollectionListOptions,
424
    ) -> Result<LayerCollection> {
14✔
425
        let mut conn = self.conn_pool.get().await?;
14✔
426
        let tx = conn.build_transaction().start().await?;
14✔
427

428
        self.ensure_permission_in_tx(collection_id.clone().into(), Permission::Read, &tx)
14✔
429
            .await
23✔
430
            .boxed_context(crate::error::PermissionDb)?;
14✔
431

432
        let collection = Uuid::from_str(&collection_id.0).map_err(|_| {
11✔
UNCOV
433
            crate::error::Error::IdStringMustBeUuid {
×
UNCOV
434
                found: collection_id.0.clone(),
×
UNCOV
435
            }
×
436
        })?;
11✔
437

438
        let stmt = tx
11✔
439
            .prepare(
11✔
440
                "
11✔
441
        SELECT name, description, properties
11✔
442
        FROM user_permitted_layer_collections p 
11✔
443
            JOIN layer_collections c ON (p.layer_collection_id = c.id) 
11✔
444
        WHERE p.user_id = $1 AND layer_collection_id = $2;",
11✔
445
            )
11✔
446
            .await?;
9✔
447

448
        let row = tx
11✔
449
            .query_one(&stmt, &[&self.session.user.id, &collection])
11✔
450
            .await?;
9✔
451

452
        let name: String = row.get(0);
11✔
453
        let description: String = row.get(1);
11✔
454
        let properties: Vec<Property> = row.get(2);
11✔
455

456
        let stmt = tx
11✔
457
            .prepare(
11✔
458
                "
11✔
459
        SELECT DISTINCT id, name, description, properties, is_layer
11✔
460
        FROM (
11✔
461
            SELECT 
11✔
462
                concat(id, '') AS id, 
11✔
463
                name, 
11✔
464
                description, 
11✔
465
                properties, 
11✔
466
                FALSE AS is_layer
11✔
467
            FROM user_permitted_layer_collections u 
11✔
468
                JOIN layer_collections lc ON (u.layer_collection_id = lc.id)
11✔
469
                JOIN collection_children cc ON (layer_collection_id = cc.child)
11✔
470
            WHERE u.user_id = $4 AND cc.parent = $1
11✔
471
        ) u UNION (
11✔
472
            SELECT 
11✔
473
                concat(id, '') AS id, 
11✔
474
                name, 
11✔
475
                description, 
11✔
476
                properties, 
11✔
477
                TRUE AS is_layer
11✔
478
            FROM user_permitted_layers ul
11✔
479
                JOIN layers uc ON (ul.layer_id = uc.id) 
11✔
480
                JOIN collection_layers cl ON (layer_id = cl.layer)
11✔
481
            WHERE ul.user_id = $4 AND cl.collection = $1
11✔
482
        )
11✔
483
        ORDER BY is_layer ASC, name ASC
11✔
484
        LIMIT $2 
11✔
485
        OFFSET $3;            
11✔
486
        ",
11✔
487
            )
11✔
488
            .await?;
9✔
489

490
        let rows = tx
11✔
491
            .query(
11✔
492
                &stmt,
11✔
493
                &[
11✔
494
                    &collection,
11✔
495
                    &i64::from(options.limit),
11✔
496
                    &i64::from(options.offset),
11✔
497
                    &self.session.user.id,
11✔
498
                ],
11✔
499
            )
11✔
500
            .await?;
9✔
501

502
        let items = rows
11✔
503
            .into_iter()
11✔
504
            .map(|row| {
14✔
505
                let is_layer: bool = row.get(4);
14✔
506

14✔
507
                if is_layer {
14✔
508
                    Ok(CollectionItem::Layer(LayerListing {
5✔
509
                        id: ProviderLayerId {
5✔
510
                            provider_id: INTERNAL_PROVIDER_ID,
5✔
511
                            layer_id: LayerId(row.get(0)),
5✔
512
                        },
5✔
513
                        name: row.get(1),
5✔
514
                        description: row.get(2),
5✔
515
                        properties: row.get(3),
5✔
516
                    }))
5✔
517
                } else {
518
                    Ok(CollectionItem::Collection(LayerCollectionListing {
9✔
519
                        id: ProviderLayerCollectionId {
9✔
520
                            provider_id: INTERNAL_PROVIDER_ID,
9✔
521
                            collection_id: LayerCollectionId(row.get(0)),
9✔
522
                        },
9✔
523
                        name: row.get(1),
9✔
524
                        description: row.get(2),
9✔
525
                        properties: row.get(3),
9✔
526
                    }))
9✔
527
                }
528
            })
14✔
529
            .collect::<Result<Vec<CollectionItem>>>()?;
11✔
530

531
        tx.commit().await?;
11✔
532

533
        Ok(LayerCollection {
11✔
534
            id: ProviderLayerCollectionId {
11✔
535
                provider_id: INTERNAL_PROVIDER_ID,
11✔
536
                collection_id: collection_id.clone(),
11✔
537
            },
11✔
538
            name,
11✔
539
            description,
11✔
540
            items,
11✔
541
            entry_label: None,
11✔
542
            properties,
11✔
543
        })
11✔
544
    }
28✔
545

546
    #[allow(clippy::too_many_lines)]
547
    async fn search(
548
        &self,
549
        collection_id: &LayerCollectionId,
550
        search: SearchParameters,
551
    ) -> Result<LayerCollection> {
15✔
552
        let mut conn = self.conn_pool.get().await?;
15✔
553
        let tx = conn.build_transaction().start().await?;
15✔
554

555
        self.ensure_permission_in_tx(collection_id.clone().into(), Permission::Read, &tx)
15✔
556
            .await
35✔
557
            .boxed_context(crate::error::PermissionDb)?;
15✔
558

559
        let collection = Uuid::from_str(&collection_id.0).map_err(|_| {
15✔
UNCOV
560
            crate::error::Error::IdStringMustBeUuid {
×
UNCOV
561
                found: collection_id.0.clone(),
×
UNCOV
562
            }
×
563
        })?;
15✔
564

565
        let stmt = tx
15✔
566
            .prepare(
15✔
567
                "
15✔
568
        SELECT name, description, properties
15✔
569
        FROM user_permitted_layer_collections p 
15✔
570
            JOIN layer_collections c ON (p.layer_collection_id = c.id) 
15✔
571
        WHERE p.user_id = $1 AND layer_collection_id = $2;",
15✔
572
            )
15✔
573
            .await?;
19✔
574

575
        let row = tx
15✔
576
            .query_one(&stmt, &[&self.session.user.id, &collection])
15✔
577
            .await?;
15✔
578

579
        let name: String = row.get(0);
15✔
580
        let description: String = row.get(1);
15✔
581
        let properties: Vec<Property> = row.get(2);
15✔
582

583
        let pattern = match search.search_type {
15✔
584
            SearchType::Fulltext => {
585
                format!("%{}%", search.search_string)
10✔
586
            }
587
            SearchType::Prefix => {
588
                format!("{}%", search.search_string)
5✔
589
            }
590
        };
591

592
        let stmt = tx.prepare(&create_search_query(true)).await?;
15✔
593

594
        let rows = tx
15✔
595
            .query(
15✔
596
                &stmt,
15✔
597
                &[
15✔
598
                    &collection,
15✔
599
                    &i64::from(search.limit),
15✔
600
                    &i64::from(search.offset),
15✔
601
                    &self.session.user.id,
15✔
602
                    &pattern,
15✔
603
                ],
15✔
604
            )
15✔
605
            .await?;
16✔
606

607
        let items = rows
15✔
608
            .into_iter()
15✔
609
            .map(|row| {
31✔
610
                let is_layer: bool = row.get(4);
31✔
611

31✔
612
                if is_layer {
31✔
613
                    Ok(CollectionItem::Layer(LayerListing {
13✔
614
                        id: ProviderLayerId {
13✔
615
                            provider_id: INTERNAL_PROVIDER_ID,
13✔
616
                            layer_id: LayerId(row.get(0)),
13✔
617
                        },
13✔
618
                        name: row.get(1),
13✔
619
                        description: row.get(2),
13✔
620
                        properties: row.get(3),
13✔
621
                    }))
13✔
622
                } else {
623
                    Ok(CollectionItem::Collection(LayerCollectionListing {
18✔
624
                        id: ProviderLayerCollectionId {
18✔
625
                            provider_id: INTERNAL_PROVIDER_ID,
18✔
626
                            collection_id: LayerCollectionId(row.get(0)),
18✔
627
                        },
18✔
628
                        name: row.get(1),
18✔
629
                        description: row.get(2),
18✔
630
                        properties: row.get(3),
18✔
631
                    }))
18✔
632
                }
633
            })
31✔
634
            .collect::<Result<Vec<CollectionItem>>>()?;
15✔
635

636
        tx.commit().await?;
15✔
637

638
        Ok(LayerCollection {
15✔
639
            id: ProviderLayerCollectionId {
15✔
640
                provider_id: INTERNAL_PROVIDER_ID,
15✔
641
                collection_id: collection_id.clone(),
15✔
642
            },
15✔
643
            name,
15✔
644
            description,
15✔
645
            items,
15✔
646
            entry_label: None,
15✔
647
            properties,
15✔
648
        })
15✔
649
    }
30✔
650

651
    #[allow(clippy::too_many_lines)]
652
    async fn autocomplete_search(
653
        &self,
654
        collection_id: &LayerCollectionId,
655
        search: SearchParameters,
656
    ) -> Result<Vec<String>> {
15✔
657
        let mut conn = self.conn_pool.get().await?;
15✔
658
        let tx = conn.build_transaction().start().await?;
15✔
659

660
        self.ensure_permission_in_tx(collection_id.clone().into(), Permission::Read, &tx)
15✔
661
            .await
30✔
662
            .boxed_context(crate::error::PermissionDb)?;
15✔
663

664
        let collection = Uuid::from_str(&collection_id.0).map_err(|_| {
15✔
UNCOV
665
            crate::error::Error::IdStringMustBeUuid {
×
UNCOV
666
                found: collection_id.0.clone(),
×
UNCOV
667
            }
×
668
        })?;
15✔
669

670
        let pattern = match search.search_type {
15✔
671
            SearchType::Fulltext => {
672
                format!("%{}%", search.search_string)
10✔
673
            }
674
            SearchType::Prefix => {
675
                format!("{}%", search.search_string)
5✔
676
            }
677
        };
678

679
        let stmt = tx.prepare(&create_search_query(false)).await?;
15✔
680

681
        let rows = tx
15✔
682
            .query(
15✔
683
                &stmt,
15✔
684
                &[
15✔
685
                    &collection,
15✔
686
                    &i64::from(search.limit),
15✔
687
                    &i64::from(search.offset),
15✔
688
                    &self.session.user.id,
15✔
689
                    &pattern,
15✔
690
                ],
15✔
691
            )
15✔
692
            .await?;
15✔
693

694
        let items = rows
15✔
695
            .into_iter()
15✔
696
            .map(|row| Ok(row.get::<usize, &str>(0).to_string()))
31✔
697
            .collect::<Result<Vec<String>>>()?;
15✔
698

699
        tx.commit().await?;
15✔
700

701
        Ok(items)
15✔
702
    }
30✔
703

704
    async fn get_root_layer_collection_id(&self) -> Result<LayerCollectionId> {
11✔
705
        Ok(LayerCollectionId(
11✔
706
            INTERNAL_LAYER_DB_ROOT_COLLECTION_ID.to_string(),
11✔
707
        ))
11✔
708
    }
22✔
709

710
    async fn load_layer(&self, id: &LayerId) -> Result<Layer> {
6✔
711
        let mut conn = self.conn_pool.get().await?;
6✔
712
        let tx = conn.build_transaction().start().await?;
6✔
713

714
        self.ensure_permission_in_tx(id.clone().into(), Permission::Read, &tx)
6✔
715
            .await
12✔
716
            .boxed_context(crate::error::PermissionDb)?;
6✔
717

718
        let layer_id =
3✔
719
            Uuid::from_str(&id.0).map_err(|_| crate::error::Error::IdStringMustBeUuid {
3✔
UNCOV
720
                found: id.0.clone(),
×
721
            })?;
3✔
722

723
        let stmt = tx
3✔
724
            .prepare(
3✔
725
                "
3✔
726
            SELECT 
3✔
727
                l.name,
3✔
728
                l.description,
3✔
729
                w.workflow,
3✔
730
                l.symbology,
3✔
731
                l.properties,
3✔
732
                l.metadata
3✔
733
            FROM 
3✔
734
                layers l JOIN workflows w ON (l.workflow_id = w.id)
3✔
735
            WHERE 
3✔
736
                l.id = $1;",
3✔
737
            )
3✔
738
            .await?;
3✔
739

740
        let row = tx
3✔
741
            .query_one(&stmt, &[&layer_id])
3✔
742
            .await
3✔
743
            .map_err(|_error| LayerDbError::NoLayerForGivenId { id: id.clone() })?;
3✔
744

745
        tx.commit().await?;
3✔
746

747
        Ok(Layer {
748
            id: ProviderLayerId {
3✔
749
                provider_id: INTERNAL_PROVIDER_ID,
3✔
750
                layer_id: id.clone(),
3✔
751
            },
3✔
752
            name: row.get(0),
3✔
753
            description: row.get(1),
3✔
754
            workflow: serde_json::from_value(row.get(2)).context(crate::error::SerdeJson)?,
3✔
755
            symbology: row.get(3),
3✔
756
            properties: row.get(4),
3✔
757
            metadata: row.get::<_, HashMapTextTextDbType>(5).into(),
3✔
758
        })
759
    }
12✔
760
}
761

762
#[async_trait]
763
impl<Tls> LayerProviderDb for ProPostgresDb<Tls>
764
where
765
    Tls: MakeTlsConnect<Socket> + Clone + Send + Sync + 'static + std::fmt::Debug,
766
    <Tls as MakeTlsConnect<Socket>>::Stream: Send + Sync,
767
    <Tls as MakeTlsConnect<Socket>>::TlsConnect: Send,
768
    <<Tls as MakeTlsConnect<Socket>>::TlsConnect as TlsConnect<Socket>>::Future: Send,
769
{
770
    async fn add_layer_provider(
771
        &self,
772
        provider: TypedDataProviderDefinition,
773
    ) -> Result<DataProviderId> {
1✔
774
        ensure!(self.session.is_admin(), error::PermissionDenied);
1✔
775

776
        let conn = self.conn_pool.get().await?;
1✔
777

778
        let prio = DataProviderDefinition::<Self>::priority(&provider);
1✔
779
        let clamp_prio = prio.clamp(-1000, 1000);
1✔
780

1✔
781
        if prio != clamp_prio {
1✔
UNCOV
782
            log::warn!(
×
UNCOV
783
                "The priority of the provider {} is out of range! --> clamped {} to {}",
×
UNCOV
784
                DataProviderDefinition::<Self>::name(&provider),
×
785
                prio,
786
                clamp_prio
787
            );
788
        }
1✔
789

790
        let stmt = conn
1✔
791
            .prepare(
1✔
792
                "
1✔
793
              INSERT INTO layer_providers (
1✔
794
                  id, 
1✔
795
                  type_name, 
1✔
796
                  name,
1✔
797
                  definition,
1✔
798
                  priority
1✔
799
              )
1✔
800
              VALUES ($1, $2, $3, $4, $5)",
1✔
801
            )
1✔
802
            .await?;
33✔
803

804
        let id = DataProviderDefinition::<Self>::id(&provider);
1✔
805
        conn.execute(
1✔
806
            &stmt,
1✔
807
            &[
1✔
808
                &id,
1✔
809
                &DataProviderDefinition::<Self>::type_name(&provider),
1✔
810
                &DataProviderDefinition::<Self>::name(&provider),
1✔
811
                &provider,
1✔
812
                &clamp_prio,
1✔
813
            ],
1✔
814
        )
1✔
815
        .await?;
1✔
816
        Ok(id)
1✔
817
    }
2✔
818

819
    async fn list_layer_providers(
820
        &self,
821
        options: LayerProviderListingOptions,
822
    ) -> Result<Vec<LayerProviderListing>> {
2✔
823
        // TODO: permission
824
        let conn = self.conn_pool.get().await?;
2✔
825

826
        let stmt = conn
2✔
827
            .prepare(
2✔
828
                "(
2✔
829
                    SELECT 
2✔
830
                        id, 
2✔
831
                        name,
2✔
832
                        type_name,
2✔
833
                        priority
2✔
834
                    FROM 
2✔
835
                        layer_providers
2✔
836
                    WHERE
2✔
837
                        priority > -1000
2✔
838
                    UNION ALL
2✔
839
                    SELECT 
2✔
840
                        id, 
2✔
841
                        name,
2✔
842
                        type_name,
2✔
843
                        priority
2✔
844
                    FROM 
2✔
845
                        pro_layer_providers
2✔
846
                    WHERE
2✔
847
                        priority > -1000
2✔
848
                )
2✔
849
                ORDER BY priority desc, name ASC
2✔
850
                LIMIT $1 
2✔
851
                OFFSET $2;",
2✔
852
            )
2✔
853
            .await?;
2✔
854

855
        let rows = conn
2✔
856
            .query(
2✔
857
                &stmt,
2✔
858
                &[&i64::from(options.limit), &i64::from(options.offset)],
2✔
859
            )
2✔
860
            .await?;
2✔
861

862
        Ok(rows
2✔
863
            .iter()
2✔
864
            .map(|row| LayerProviderListing {
2✔
865
                id: row.get(0),
2✔
866
                name: row.get(1),
2✔
867
                priority: row.get(3),
2✔
868
            })
2✔
869
            .collect())
2✔
870
    }
4✔
871

872
    async fn load_layer_provider(&self, id: DataProviderId) -> Result<Box<dyn DataProvider>> {
2✔
873
        // TODO: permissions
874
        let conn = self.conn_pool.get().await?;
2✔
875

876
        let stmt = conn
2✔
877
            .prepare(
2✔
878
                "SELECT
2✔
879
                    definition, NULL AS pro_definition
2✔
880
                FROM
2✔
881
                    layer_providers
2✔
882
                WHERE
2✔
883
                    id = $1
2✔
884
                UNION ALL
2✔
885
                SELECT
2✔
886
                    NULL AS definition, definition AS pro_definition
2✔
887
                FROM
2✔
888
                    pro_layer_providers
2✔
889
                WHERE
2✔
890
                    id = $1",
2✔
891
            )
2✔
892
            .await?;
55✔
893

894
        let row = conn.query_one(&stmt, &[&id]).await?;
2✔
895

896
        if let Some(definition) = row.get::<_, Option<TypedDataProviderDefinition>>(0) {
2✔
897
            return Box::new(definition)
1✔
898
                .initialize(ProPostgresDb {
1✔
899
                    conn_pool: self.conn_pool.clone(),
1✔
900
                    session: self.session.clone(),
1✔
901
                })
1✔
UNCOV
902
                .await;
×
903
        }
1✔
904

1✔
905
        let pro_definition: TypedProDataProviderDefinition = row.get(1);
1✔
906
        Box::new(pro_definition)
1✔
907
            .initialize(ProPostgresDb {
1✔
908
                conn_pool: self.conn_pool.clone(),
1✔
909
                session: self.session.clone(),
1✔
910
            })
1✔
UNCOV
911
            .await
×
912
    }
4✔
913
}
914

915
/// Storage interface for registering pro data provider definitions.
///
/// In this file it is implemented for `ProPostgresDb`.
#[async_trait]
916
pub trait ProLayerProviderDb: Send + Sync + 'static {
917
    /// Registers `provider` and returns the id of the stored provider.
    async fn add_pro_layer_provider(
918
        &self,
919
        provider: TypedProDataProviderDefinition,
920
    ) -> Result<DataProviderId>;
921
}
922

923
#[async_trait]
924
impl<Tls> ProLayerProviderDb for ProPostgresDb<Tls>
925
where
926
    Tls: MakeTlsConnect<Socket> + Clone + Send + Sync + 'static + std::fmt::Debug,
927
    <Tls as MakeTlsConnect<Socket>>::Stream: Send + Sync,
928
    <Tls as MakeTlsConnect<Socket>>::TlsConnect: Send,
929
    <<Tls as MakeTlsConnect<Socket>>::TlsConnect as TlsConnect<Socket>>::Future: Send,
930
{
931
    async fn add_pro_layer_provider(
932
        &self,
933
        provider: TypedProDataProviderDefinition,
934
    ) -> Result<DataProviderId> {
1✔
935
        ensure!(self.session.is_admin(), error::PermissionDenied);
1✔
936

937
        let conn = self.conn_pool.get().await?;
1✔
938

939
        let prio = DataProviderDefinition::<Self>::priority(&provider);
1✔
940
        let clamp_prio = prio.clamp(-1000, 1000);
1✔
941

1✔
942
        if prio != clamp_prio {
1✔
UNCOV
943
            log::warn!(
×
UNCOV
944
                "The priority of the provider {} is out of range! --> clamped {} to {}",
×
UNCOV
945
                DataProviderDefinition::<Self>::name(&provider),
×
946
                prio,
947
                clamp_prio
948
            );
949
        }
1✔
950

951
        let stmt = conn
1✔
952
            .prepare(
1✔
953
                "
1✔
954
              INSERT INTO pro_layer_providers (
1✔
955
                  id, 
1✔
956
                  type_name, 
1✔
957
                  name,
1✔
958
                  definition,
1✔
959
                  priority
1✔
960
              )
1✔
961
              VALUES ($1, $2, $3, $4, $5)",
1✔
962
            )
1✔
963
            .await?;
26✔
964

965
        let id = DataProviderDefinition::<Self>::id(&provider);
1✔
966
        conn.execute(
1✔
967
            &stmt,
1✔
968
            &[
1✔
969
                &id,
1✔
970
                &DataProviderDefinition::<Self>::type_name(&provider),
1✔
971
                &DataProviderDefinition::<Self>::name(&provider),
1✔
972
                &provider,
1✔
973
                &clamp_prio,
1✔
974
            ],
1✔
975
        )
1✔
976
        .await?;
2✔
977
        Ok(id)
1✔
978
    }
2✔
979
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc