• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

geo-engine / geoengine / 11910714914

19 Nov 2024 10:06AM UTC coverage: 90.445% (-0.2%) from 90.687%
11910714914

push

github

web-flow
Merge pull request #994 from geo-engine/workspace-dependencies

use workspace dependencies, update toolchain, use global lock in expression

9 of 11 new or added lines in 6 files covered. (81.82%)

375 existing lines in 75 files now uncovered.

132867 of 146904 relevant lines covered (90.44%)

54798.11 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

85.59
/services/src/pro/layers/postgres_layer_db.rs
1
use crate::error;
2
use crate::layers::external::TypedDataProviderDefinition;
3
use crate::layers::layer::{Property, UpdateLayer, UpdateLayerCollection};
4
use crate::layers::listing::{
5
    ProviderCapabilities, SearchCapabilities, SearchParameters, SearchType, SearchTypes,
6
};
7
use crate::layers::postgres_layer_db::{
8
    delete_layer_collection, delete_layer_collection_from_parent, delete_layer_from_collection,
9
    insert_collection_parent, insert_layer, insert_layer_collection_with_id,
10
};
11
use crate::pro::contexts::ProPostgresDb;
12
use crate::pro::datasets::TypedProDataProviderDefinition;
13
use crate::pro::permissions::postgres_permissiondb::TxPermissionDb;
14
use crate::pro::permissions::{Permission, RoleId};
15
use crate::{
16
    error::Result,
17
    layers::{
18
        external::{DataProvider, DataProviderDefinition},
19
        layer::{
20
            AddLayer, AddLayerCollection, CollectionItem, Layer, LayerCollection,
21
            LayerCollectionListOptions, LayerCollectionListing, LayerListing,
22
            ProviderLayerCollectionId, ProviderLayerId,
23
        },
24
        listing::{LayerCollectionId, LayerCollectionProvider},
25
        storage::{
26
            LayerDb, LayerProviderDb, LayerProviderListing, LayerProviderListingOptions,
27
            INTERNAL_LAYER_DB_ROOT_COLLECTION_ID, INTERNAL_PROVIDER_ID,
28
        },
29
        LayerDbError,
30
    },
31
};
32
use async_trait::async_trait;
33
use bb8_postgres::tokio_postgres::{
34
    tls::{MakeTlsConnect, TlsConnect},
35
    Socket,
36
};
37
use geoengine_datatypes::dataset::{DataProviderId, LayerId};
38
use geoengine_datatypes::error::BoxedResultExt;
39
use geoengine_datatypes::util::HashMapTextTextDbType;
40
use snafu::{ensure, ResultExt};
41
use std::str::FromStr;
42
use uuid::Uuid;
43

44
#[async_trait]
45
impl<Tls> LayerDb for ProPostgresDb<Tls>
46
where
47
    Tls: MakeTlsConnect<Socket> + Clone + Send + Sync + 'static + std::fmt::Debug,
48
    <Tls as MakeTlsConnect<Socket>>::Stream: Send + Sync,
49
    <Tls as MakeTlsConnect<Socket>>::TlsConnect: Send,
50
    <<Tls as MakeTlsConnect<Socket>>::TlsConnect as TlsConnect<Socket>>::Future: Send,
51
{
52
    async fn add_layer(&self, layer: AddLayer, collection: &LayerCollectionId) -> Result<LayerId> {
16✔
53
        let layer_id = Uuid::new_v4();
16✔
54
        let layer_id = LayerId(layer_id.to_string());
16✔
55

16✔
56
        self.add_layer_with_id(&layer_id, layer, collection).await?;
326✔
57

58
        Ok(layer_id)
16✔
59
    }
32✔
60

61
    async fn update_layer(&self, id: &LayerId, layer: UpdateLayer) -> Result<()> {
×
62
        let layer_id =
×
63
            Uuid::from_str(&id.0).map_err(|_| crate::error::Error::IdStringMustBeUuid {
×
64
                found: id.0.clone(),
×
65
            })?;
×
66

67
        let mut conn = self.conn_pool.get().await?;
×
68
        let transaction = conn.build_transaction().start().await?;
×
69

70
        self.ensure_permission_in_tx(id.clone().into(), Permission::Owner, &transaction)
×
71
            .await
×
72
            .boxed_context(crate::error::PermissionDb)?;
×
73

74
        transaction
×
75
            .execute(
×
76
                "
×
77
                UPDATE layers
×
78
                SET name = $1, description = $2, symbology = $3, properties = $4, metadata = $5
×
79
                WHERE id = $6;",
×
80
                &[
×
81
                    &layer.name,
×
82
                    &layer.description,
×
83
                    &layer.symbology,
×
84
                    &layer.properties,
×
85
                    &HashMapTextTextDbType::from(&layer.metadata),
×
86
                    &layer_id,
×
87
                ],
×
88
            )
×
89
            .await?;
×
90

91
        transaction.commit().await.map_err(Into::into)
×
92
    }
×
93

94
    async fn remove_layer(&self, id: &LayerId) -> Result<()> {
×
95
        let layer_id =
×
96
            Uuid::from_str(&id.0).map_err(|_| crate::error::Error::IdStringMustBeUuid {
×
97
                found: id.0.clone(),
×
98
            })?;
×
99

100
        let mut conn = self.conn_pool.get().await?;
×
101
        let transaction = conn.build_transaction().start().await?;
×
102

103
        self.ensure_permission_in_tx(id.clone().into(), Permission::Owner, &transaction)
×
104
            .await
×
105
            .boxed_context(crate::error::PermissionDb)?;
×
106

107
        transaction
×
108
            .execute(
×
109
                "
×
110
            DELETE FROM layers
×
111
            WHERE id = $1;",
×
112
                &[&layer_id],
×
113
            )
×
114
            .await?;
×
115

116
        transaction.commit().await.map_err(Into::into)
×
117
    }
×
118

119
    async fn add_layer_with_id(
120
        &self,
121
        id: &LayerId,
122
        layer: AddLayer,
123
        collection: &LayerCollectionId,
124
    ) -> Result<()> {
16✔
125
        let mut conn = self.conn_pool.get().await?;
16✔
126
        let trans = conn.build_transaction().start().await?;
16✔
127

128
        self.ensure_permission_in_tx(collection.clone().into(), Permission::Owner, &trans)
16✔
129
            .await
34✔
130
            .boxed_context(crate::error::PermissionDb)?;
16✔
131

132
        let layer_id = insert_layer(&trans, id, layer, collection).await?;
233✔
133

134
        // TODO: `ON CONFLICT DO NOTHING` means, we do not get an error if the permission already exists.
135
        //       Do we want that, or should we report an error and let the caller decide whether to ignore it?
136
        //       We should decide that and adjust all places where `ON CONFLICT DO NOTHING` is used.
137
        let stmt = trans
16✔
138
            .prepare(
16✔
139
                "
16✔
140
            INSERT INTO permissions (role_id, permission, layer_id)
16✔
141
            VALUES ($1, $2, $3) ON CONFLICT DO NOTHING;",
16✔
142
            )
16✔
143
            .await?;
11✔
144

145
        trans
16✔
146
            .execute(
16✔
147
                &stmt,
16✔
148
                &[
16✔
149
                    &RoleId::from(self.session.user.id),
16✔
150
                    &Permission::Owner,
16✔
151
                    &layer_id,
16✔
152
                ],
16✔
153
            )
16✔
154
            .await?;
12✔
155

156
        trans.commit().await?;
16✔
157

158
        Ok(())
16✔
159
    }
32✔
160

161
    async fn add_layer_to_collection(
162
        &self,
163
        layer: &LayerId,
164
        collection: &LayerCollectionId,
165
    ) -> Result<()> {
2✔
166
        let mut conn = self.conn_pool.get().await?;
2✔
167
        let tx = conn.build_transaction().start().await?;
2✔
168

169
        self.ensure_permission_in_tx(collection.clone().into(), Permission::Owner, &tx)
2✔
170
            .await
4✔
171
            .boxed_context(crate::error::PermissionDb)?;
2✔
172

173
        let layer_id =
1✔
174
            Uuid::from_str(&layer.0).map_err(|_| crate::error::Error::IdStringMustBeUuid {
2✔
175
                found: layer.0.clone(),
1✔
176
            })?;
2✔
177

178
        let collection_id =
1✔
179
            Uuid::from_str(&collection.0).map_err(|_| crate::error::Error::IdStringMustBeUuid {
1✔
180
                found: collection.0.clone(),
×
181
            })?;
1✔
182

183
        let stmt = tx
1✔
184
            .prepare(
1✔
185
                "
1✔
186
            INSERT INTO collection_layers (collection, layer)
1✔
187
            VALUES ($1, $2) ON CONFLICT DO NOTHING;",
1✔
188
            )
1✔
189
            .await?;
1✔
190

191
        tx.execute(&stmt, &[&collection_id, &layer_id]).await?;
1✔
192

193
        tx.commit().await?;
1✔
194

195
        Ok(())
1✔
196
    }
4✔
197

198
    async fn add_layer_collection(
199
        &self,
200
        collection: AddLayerCollection,
201
        parent: &LayerCollectionId,
202
    ) -> Result<LayerCollectionId> {
19✔
203
        let collection_id = Uuid::new_v4();
19✔
204
        let collection_id = LayerCollectionId(collection_id.to_string());
19✔
205

19✔
206
        self.add_layer_collection_with_id(&collection_id, collection, parent)
19✔
207
            .await?;
181✔
208

209
        Ok(collection_id)
19✔
210
    }
38✔
211

212
    async fn add_layer_collection_with_id(
213
        &self,
214
        id: &LayerCollectionId,
215
        collection: AddLayerCollection,
216
        parent: &LayerCollectionId,
217
    ) -> Result<()> {
19✔
218
        let mut conn = self.conn_pool.get().await?;
19✔
219
        let trans = conn.build_transaction().start().await?;
19✔
220

221
        self.ensure_permission_in_tx(parent.clone().into(), Permission::Owner, &trans)
19✔
222
            .await
43✔
223
            .boxed_context(crate::error::PermissionDb)?;
19✔
224

225
        let collection_id = insert_layer_collection_with_id(&trans, id, collection, parent).await?;
65✔
226

227
        let stmt = trans
19✔
228
            .prepare(
19✔
229
                "
19✔
230
            INSERT INTO permissions (role_id, permission, layer_collection_id)
19✔
231
            VALUES ($1, $2, $3) ON CONFLICT DO NOTHING;",
19✔
232
            )
19✔
233
            .await?;
16✔
234

235
        trans
19✔
236
            .execute(
19✔
237
                &stmt,
19✔
238
                &[
19✔
239
                    &RoleId::from(self.session.user.id),
19✔
240
                    &Permission::Owner,
19✔
241
                    &collection_id,
19✔
242
                ],
19✔
243
            )
19✔
244
            .await?;
15✔
245

246
        trans.commit().await?;
19✔
247

248
        Ok(())
19✔
249
    }
38✔
250

251
    async fn add_collection_to_parent(
252
        &self,
253
        collection: &LayerCollectionId,
254
        parent: &LayerCollectionId,
255
    ) -> Result<()> {
1✔
256
        let conn = self.conn_pool.get().await?;
1✔
257
        insert_collection_parent(&conn, collection, parent).await
2✔
258
    }
2✔
259

260
    async fn remove_layer_collection(&self, collection: &LayerCollectionId) -> Result<()> {
3✔
261
        let mut conn = self.conn_pool.get().await?;
3✔
262
        let transaction = conn.build_transaction().start().await?;
3✔
263

264
        self.ensure_permission_in_tx(collection.clone().into(), Permission::Owner, &transaction)
3✔
265
            .await
6✔
266
            .boxed_context(crate::error::PermissionDb)?;
3✔
267

268
        delete_layer_collection(&transaction, collection).await?;
12✔
269

270
        transaction.commit().await.map_err(Into::into)
2✔
271
    }
6✔
272

273
    async fn remove_layer_from_collection(
274
        &self,
275
        layer: &LayerId,
276
        collection: &LayerCollectionId,
277
    ) -> Result<()> {
2✔
278
        let mut conn = self.conn_pool.get().await?;
2✔
279
        let transaction = conn.build_transaction().start().await?;
2✔
280

281
        self.ensure_permission_in_tx(collection.clone().into(), Permission::Owner, &transaction)
2✔
282
            .await
4✔
283
            .boxed_context(crate::error::PermissionDb)?;
2✔
284

285
        delete_layer_from_collection(&transaction, layer, collection).await?;
8✔
286

287
        transaction.commit().await.map_err(Into::into)
2✔
288
    }
4✔
289

290
    async fn remove_layer_collection_from_parent(
291
        &self,
292
        collection: &LayerCollectionId,
293
        parent: &LayerCollectionId,
294
    ) -> Result<()> {
1✔
295
        let mut conn = self.conn_pool.get().await?;
1✔
296
        let transaction = conn.build_transaction().start().await?;
1✔
297

298
        self.ensure_permission_in_tx(collection.clone().into(), Permission::Owner, &transaction)
1✔
299
            .await
2✔
300
            .boxed_context(crate::error::PermissionDb)?;
1✔
301

302
        delete_layer_collection_from_parent(&transaction, collection, parent).await?;
8✔
303

304
        transaction.commit().await.map_err(Into::into)
1✔
305
    }
2✔
306

307
    async fn update_layer_collection(
308
        &self,
309
        collection: &LayerCollectionId,
310
        update: UpdateLayerCollection,
311
    ) -> Result<()> {
×
312
        let collection_id =
×
313
            Uuid::from_str(&collection.0).map_err(|_| crate::error::Error::IdStringMustBeUuid {
×
314
                found: collection.0.clone(),
×
315
            })?;
×
316

317
        let mut conn = self.conn_pool.get().await?;
×
318
        let transaction = conn.build_transaction().start().await?;
×
319

320
        self.ensure_permission_in_tx(collection.clone().into(), Permission::Owner, &transaction)
×
321
            .await
×
322
            .boxed_context(crate::error::PermissionDb)?;
×
323

324
        transaction
×
325
            .execute(
×
326
                "UPDATE layer_collections 
×
327
                SET name = $1, description = $2, properties = $3
×
328
                WHERE id = $4;",
×
329
                &[
×
330
                    &update.name,
×
331
                    &update.description,
×
332
                    &update.properties,
×
333
                    &collection_id,
×
334
                ],
×
335
            )
×
336
            .await?;
×
337

338
        transaction.commit().await.map_err(Into::into)
×
339
    }
×
340
}
341

342
/// Builds the SQL text used to search below a collection.
///
/// The statement walks `collection_children` recursively from the collection passed
/// as `$1` and unions the matching permitted collections with the matching permitted
/// layers. Placeholders: `$1` root collection id, `$2` limit, `$3` offset, `$4` user
/// id, `$5` `ILIKE` pattern applied to `name`.
///
/// With `full_info == true` each row carries `id, name, description, properties,
/// is_layer` and collections sort before layers; otherwise only `name` is selected.
fn create_search_query(full_info: bool) -> String {
    // Column lists for the collection branch and the layer branch, followed by the
    // extra leading sort key that only exists when `is_layer` is selected.
    let (collection_columns, layer_columns, order_prefix) = if full_info {
        (
            "concat(id, '') AS id,
        name,
        description,
        properties,
        FALSE AS is_layer",
            "concat(id, '') AS id,
        name,
        description,
        properties,
        TRUE AS is_layer",
            "is_layer ASC,",
        )
    } else {
        ("name", "name", "")
    };

    format!("
        WITH RECURSIVE parents AS (
            SELECT $1::uuid as id
            UNION ALL SELECT DISTINCT child FROM collection_children JOIN parents ON (id = parent)
        )
        SELECT DISTINCT *
        FROM (
            SELECT 
                {}
            FROM user_permitted_layer_collections u
                JOIN layer_collections lc ON (u.layer_collection_id = lc.id)
                JOIN (SELECT DISTINCT child FROM collection_children JOIN parents ON (id = parent)) cc ON (id = cc.child)
            WHERE u.user_id = $4 AND name ILIKE $5
        ) u UNION (
            SELECT 
                {}
            FROM user_permitted_layers ul
                JOIN layers uc ON (ul.layer_id = uc.id)
                JOIN (SELECT DISTINCT layer FROM collection_layers JOIN parents ON (collection = id)) cl ON (id = cl.layer)
            WHERE ul.user_id = $4 AND name ILIKE $5
        )
        ORDER BY {}name ASC
        LIMIT $2 
        OFFSET $3;",
        collection_columns,
        layer_columns,
        order_prefix)
}
30✔
383

384
#[async_trait]
385
impl<Tls> LayerCollectionProvider for ProPostgresDb<Tls>
386
where
387
    Tls: MakeTlsConnect<Socket> + Clone + Send + Sync + 'static + std::fmt::Debug,
388
    <Tls as MakeTlsConnect<Socket>>::Stream: Send + Sync,
389
    <Tls as MakeTlsConnect<Socket>>::TlsConnect: Send,
390
    <<Tls as MakeTlsConnect<Socket>>::TlsConnect as TlsConnect<Socket>>::Future: Send,
391
{
392
    fn capabilities(&self) -> ProviderCapabilities {
1✔
393
        ProviderCapabilities {
1✔
394
            listing: true,
1✔
395
            search: SearchCapabilities {
1✔
396
                search_types: SearchTypes {
1✔
397
                    fulltext: true,
1✔
398
                    prefix: true,
1✔
399
                },
1✔
400
                autocomplete: true,
1✔
401
                filters: None,
1✔
402
            },
1✔
403
        }
1✔
404
    }
1✔
405

406
    fn name(&self) -> &str {
×
407
        "Postgres Layer Collection Provider (Pro)"
×
408
    }
×
409

410
    fn description(&self) -> &str {
×
411
        "Layer collection provider for Postgres (Pro)"
×
412
    }
×
413

414
    #[allow(clippy::too_many_lines)]
415
    async fn load_layer_collection(
416
        &self,
417
        collection_id: &LayerCollectionId,
418
        options: LayerCollectionListOptions,
419
    ) -> Result<LayerCollection> {
14✔
420
        let mut conn = self.conn_pool.get().await?;
14✔
421
        let tx = conn.build_transaction().start().await?;
14✔
422

423
        self.ensure_permission_in_tx(collection_id.clone().into(), Permission::Read, &tx)
14✔
424
            .await
24✔
425
            .boxed_context(crate::error::PermissionDb)?;
14✔
426

427
        let collection = Uuid::from_str(&collection_id.0).map_err(|_| {
11✔
428
            crate::error::Error::IdStringMustBeUuid {
×
429
                found: collection_id.0.clone(),
×
430
            }
×
431
        })?;
11✔
432

433
        let stmt = tx
11✔
434
            .prepare(
11✔
435
                "
11✔
436
        SELECT name, description, properties
11✔
437
        FROM user_permitted_layer_collections p 
11✔
438
            JOIN layer_collections c ON (p.layer_collection_id = c.id) 
11✔
439
        WHERE p.user_id = $1 AND layer_collection_id = $2;",
11✔
440
            )
11✔
441
            .await?;
9✔
442

443
        let row = tx
11✔
444
            .query_one(&stmt, &[&self.session.user.id, &collection])
11✔
445
            .await?;
9✔
446

447
        let name: String = row.get(0);
11✔
448
        let description: String = row.get(1);
11✔
449
        let properties: Vec<Property> = row.get(2);
11✔
450

451
        let stmt = tx
11✔
452
            .prepare(
11✔
453
                "
11✔
454
        SELECT DISTINCT id, name, description, properties, is_layer
11✔
455
        FROM (
11✔
456
            SELECT 
11✔
457
                concat(id, '') AS id, 
11✔
458
                name, 
11✔
459
                description, 
11✔
460
                properties, 
11✔
461
                FALSE AS is_layer
11✔
462
            FROM user_permitted_layer_collections u 
11✔
463
                JOIN layer_collections lc ON (u.layer_collection_id = lc.id)
11✔
464
                JOIN collection_children cc ON (layer_collection_id = cc.child)
11✔
465
            WHERE u.user_id = $4 AND cc.parent = $1
11✔
466
        ) u UNION (
11✔
467
            SELECT 
11✔
468
                concat(id, '') AS id, 
11✔
469
                name, 
11✔
470
                description, 
11✔
471
                properties, 
11✔
472
                TRUE AS is_layer
11✔
473
            FROM user_permitted_layers ul
11✔
474
                JOIN layers uc ON (ul.layer_id = uc.id) 
11✔
475
                JOIN collection_layers cl ON (layer_id = cl.layer)
11✔
476
            WHERE ul.user_id = $4 AND cl.collection = $1
11✔
477
        )
11✔
478
        ORDER BY is_layer ASC, name ASC
11✔
479
        LIMIT $2 
11✔
480
        OFFSET $3;            
11✔
481
        ",
11✔
482
            )
11✔
483
            .await?;
9✔
484

485
        let rows = tx
11✔
486
            .query(
11✔
487
                &stmt,
11✔
488
                &[
11✔
489
                    &collection,
11✔
490
                    &i64::from(options.limit),
11✔
491
                    &i64::from(options.offset),
11✔
492
                    &self.session.user.id,
11✔
493
                ],
11✔
494
            )
11✔
495
            .await?;
8✔
496

497
        let items = rows
11✔
498
            .into_iter()
11✔
499
            .map(|row| {
14✔
500
                let is_layer: bool = row.get(4);
14✔
501

14✔
502
                if is_layer {
14✔
503
                    Ok(CollectionItem::Layer(LayerListing {
5✔
504
                        id: ProviderLayerId {
5✔
505
                            provider_id: INTERNAL_PROVIDER_ID,
5✔
506
                            layer_id: LayerId(row.get(0)),
5✔
507
                        },
5✔
508
                        name: row.get(1),
5✔
509
                        description: row.get(2),
5✔
510
                        properties: row.get(3),
5✔
511
                    }))
5✔
512
                } else {
513
                    Ok(CollectionItem::Collection(LayerCollectionListing {
9✔
514
                        id: ProviderLayerCollectionId {
9✔
515
                            provider_id: INTERNAL_PROVIDER_ID,
9✔
516
                            collection_id: LayerCollectionId(row.get(0)),
9✔
517
                        },
9✔
518
                        name: row.get(1),
9✔
519
                        description: row.get(2),
9✔
520
                        properties: row.get(3),
9✔
521
                    }))
9✔
522
                }
523
            })
14✔
524
            .collect::<Result<Vec<CollectionItem>>>()?;
11✔
525

526
        tx.commit().await?;
11✔
527

528
        Ok(LayerCollection {
11✔
529
            id: ProviderLayerCollectionId {
11✔
530
                provider_id: INTERNAL_PROVIDER_ID,
11✔
531
                collection_id: collection_id.clone(),
11✔
532
            },
11✔
533
            name,
11✔
534
            description,
11✔
535
            items,
11✔
536
            entry_label: None,
11✔
537
            properties,
11✔
538
        })
11✔
539
    }
28✔
540

541
    #[allow(clippy::too_many_lines)]
542
    async fn search(
543
        &self,
544
        collection_id: &LayerCollectionId,
545
        search: SearchParameters,
546
    ) -> Result<LayerCollection> {
15✔
547
        let mut conn = self.conn_pool.get().await?;
15✔
548
        let tx = conn.build_transaction().start().await?;
15✔
549

550
        self.ensure_permission_in_tx(collection_id.clone().into(), Permission::Read, &tx)
15✔
551
            .await
21✔
552
            .boxed_context(crate::error::PermissionDb)?;
15✔
553

554
        let collection = Uuid::from_str(&collection_id.0).map_err(|_| {
15✔
555
            crate::error::Error::IdStringMustBeUuid {
×
556
                found: collection_id.0.clone(),
×
557
            }
×
558
        })?;
15✔
559

560
        let stmt = tx
15✔
561
            .prepare(
15✔
562
                "
15✔
563
        SELECT name, description, properties
15✔
564
        FROM user_permitted_layer_collections p 
15✔
565
            JOIN layer_collections c ON (p.layer_collection_id = c.id) 
15✔
566
        WHERE p.user_id = $1 AND layer_collection_id = $2;",
15✔
567
            )
15✔
568
            .await?;
11✔
569

570
        let row = tx
15✔
571
            .query_one(&stmt, &[&self.session.user.id, &collection])
15✔
572
            .await?;
10✔
573

574
        let name: String = row.get(0);
15✔
575
        let description: String = row.get(1);
15✔
576
        let properties: Vec<Property> = row.get(2);
15✔
577

578
        let pattern = match search.search_type {
15✔
579
            SearchType::Fulltext => {
580
                format!("%{}%", search.search_string)
10✔
581
            }
582
            SearchType::Prefix => {
583
                format!("{}%", search.search_string)
5✔
584
            }
585
        };
586

587
        let stmt = tx.prepare(&create_search_query(true)).await?;
15✔
588

589
        let rows = tx
15✔
590
            .query(
15✔
591
                &stmt,
15✔
592
                &[
15✔
593
                    &collection,
15✔
594
                    &i64::from(search.limit),
15✔
595
                    &i64::from(search.offset),
15✔
596
                    &self.session.user.id,
15✔
597
                    &pattern,
15✔
598
                ],
15✔
599
            )
15✔
600
            .await?;
10✔
601

602
        let items = rows
15✔
603
            .into_iter()
15✔
604
            .map(|row| {
31✔
605
                let is_layer: bool = row.get(4);
31✔
606

31✔
607
                if is_layer {
31✔
608
                    Ok(CollectionItem::Layer(LayerListing {
13✔
609
                        id: ProviderLayerId {
13✔
610
                            provider_id: INTERNAL_PROVIDER_ID,
13✔
611
                            layer_id: LayerId(row.get(0)),
13✔
612
                        },
13✔
613
                        name: row.get(1),
13✔
614
                        description: row.get(2),
13✔
615
                        properties: row.get(3),
13✔
616
                    }))
13✔
617
                } else {
618
                    Ok(CollectionItem::Collection(LayerCollectionListing {
18✔
619
                        id: ProviderLayerCollectionId {
18✔
620
                            provider_id: INTERNAL_PROVIDER_ID,
18✔
621
                            collection_id: LayerCollectionId(row.get(0)),
18✔
622
                        },
18✔
623
                        name: row.get(1),
18✔
624
                        description: row.get(2),
18✔
625
                        properties: row.get(3),
18✔
626
                    }))
18✔
627
                }
628
            })
31✔
629
            .collect::<Result<Vec<CollectionItem>>>()?;
15✔
630

631
        tx.commit().await?;
15✔
632

633
        Ok(LayerCollection {
15✔
634
            id: ProviderLayerCollectionId {
15✔
635
                provider_id: INTERNAL_PROVIDER_ID,
15✔
636
                collection_id: collection_id.clone(),
15✔
637
            },
15✔
638
            name,
15✔
639
            description,
15✔
640
            items,
15✔
641
            entry_label: None,
15✔
642
            properties,
15✔
643
        })
15✔
644
    }
30✔
645

646
    #[allow(clippy::too_many_lines)]
647
    async fn autocomplete_search(
648
        &self,
649
        collection_id: &LayerCollectionId,
650
        search: SearchParameters,
651
    ) -> Result<Vec<String>> {
15✔
652
        let mut conn = self.conn_pool.get().await?;
15✔
653
        let tx = conn.build_transaction().start().await?;
15✔
654

655
        self.ensure_permission_in_tx(collection_id.clone().into(), Permission::Read, &tx)
15✔
656
            .await
29✔
657
            .boxed_context(crate::error::PermissionDb)?;
15✔
658

659
        let collection = Uuid::from_str(&collection_id.0).map_err(|_| {
15✔
660
            crate::error::Error::IdStringMustBeUuid {
×
661
                found: collection_id.0.clone(),
×
662
            }
×
663
        })?;
15✔
664

665
        let pattern = match search.search_type {
15✔
666
            SearchType::Fulltext => {
667
                format!("%{}%", search.search_string)
10✔
668
            }
669
            SearchType::Prefix => {
670
                format!("{}%", search.search_string)
5✔
671
            }
672
        };
673

674
        let stmt = tx.prepare(&create_search_query(false)).await?;
15✔
675

676
        let rows = tx
15✔
677
            .query(
15✔
678
                &stmt,
15✔
679
                &[
15✔
680
                    &collection,
15✔
681
                    &i64::from(search.limit),
15✔
682
                    &i64::from(search.offset),
15✔
683
                    &self.session.user.id,
15✔
684
                    &pattern,
15✔
685
                ],
15✔
686
            )
15✔
687
            .await?;
15✔
688

689
        let items = rows
15✔
690
            .into_iter()
15✔
691
            .map(|row| Ok(row.get::<usize, &str>(0).to_string()))
31✔
692
            .collect::<Result<Vec<String>>>()?;
15✔
693

694
        tx.commit().await?;
15✔
695

696
        Ok(items)
15✔
697
    }
30✔
698

699
    async fn get_root_layer_collection_id(&self) -> Result<LayerCollectionId> {
11✔
700
        Ok(LayerCollectionId(
11✔
701
            INTERNAL_LAYER_DB_ROOT_COLLECTION_ID.to_string(),
11✔
702
        ))
11✔
703
    }
22✔
704

705
    async fn load_layer(&self, id: &LayerId) -> Result<Layer> {
6✔
706
        let mut conn = self.conn_pool.get().await?;
6✔
707
        let tx = conn.build_transaction().start().await?;
6✔
708

709
        self.ensure_permission_in_tx(id.clone().into(), Permission::Read, &tx)
6✔
710
            .await
11✔
711
            .boxed_context(crate::error::PermissionDb)?;
6✔
712

713
        let layer_id =
3✔
714
            Uuid::from_str(&id.0).map_err(|_| crate::error::Error::IdStringMustBeUuid {
3✔
715
                found: id.0.clone(),
×
716
            })?;
3✔
717

718
        let stmt = tx
3✔
719
            .prepare(
3✔
720
                "
3✔
721
            SELECT 
3✔
722
                l.name,
3✔
723
                l.description,
3✔
724
                w.workflow,
3✔
725
                l.symbology,
3✔
726
                l.properties,
3✔
727
                l.metadata
3✔
728
            FROM 
3✔
729
                layers l JOIN workflows w ON (l.workflow_id = w.id)
3✔
730
            WHERE 
3✔
731
                l.id = $1;",
3✔
732
            )
3✔
733
            .await?;
3✔
734

735
        let row = tx
3✔
736
            .query_one(&stmt, &[&layer_id])
3✔
737
            .await
2✔
738
            .map_err(|_error| LayerDbError::NoLayerForGivenId { id: id.clone() })?;
3✔
739

740
        tx.commit().await?;
3✔
741

742
        Ok(Layer {
743
            id: ProviderLayerId {
3✔
744
                provider_id: INTERNAL_PROVIDER_ID,
3✔
745
                layer_id: id.clone(),
3✔
746
            },
3✔
747
            name: row.get(0),
3✔
748
            description: row.get(1),
3✔
749
            workflow: serde_json::from_value(row.get(2)).context(crate::error::SerdeJson)?,
3✔
750
            symbology: row.get(3),
3✔
751
            properties: row.get(4),
3✔
752
            metadata: row.get::<_, HashMapTextTextDbType>(5).into(),
3✔
753
        })
754
    }
12✔
755
}
756

757
#[async_trait]
758
impl<Tls> LayerProviderDb for ProPostgresDb<Tls>
759
where
760
    Tls: MakeTlsConnect<Socket> + Clone + Send + Sync + 'static + std::fmt::Debug,
761
    <Tls as MakeTlsConnect<Socket>>::Stream: Send + Sync,
762
    <Tls as MakeTlsConnect<Socket>>::TlsConnect: Send,
763
    <<Tls as MakeTlsConnect<Socket>>::TlsConnect as TlsConnect<Socket>>::Future: Send,
764
{
765
    async fn add_layer_provider(
766
        &self,
767
        provider: TypedDataProviderDefinition,
768
    ) -> Result<DataProviderId> {
1✔
769
        ensure!(self.session.is_admin(), error::PermissionDenied);
1✔
770

771
        let conn = self.conn_pool.get().await?;
1✔
772

773
        let prio = DataProviderDefinition::<Self>::priority(&provider);
1✔
774
        let clamp_prio = prio.clamp(-1000, 1000);
1✔
775

1✔
776
        if prio != clamp_prio {
1✔
UNCOV
777
            log::warn!(
×
UNCOV
778
                "The priority of the provider {} is out of range! --> clamped {} to {}",
×
779
                DataProviderDefinition::<Self>::name(&provider),
×
780
                prio,
781
                clamp_prio
782
            );
783
        }
1✔
784

785
        let stmt = conn
1✔
786
            .prepare(
1✔
787
                "
1✔
788
              INSERT INTO layer_providers (
1✔
789
                  id, 
1✔
790
                  type_name, 
1✔
791
                  name,
1✔
792
                  definition,
1✔
793
                  priority
1✔
794
              )
1✔
795
              VALUES ($1, $2, $3, $4, $5)",
1✔
796
            )
1✔
UNCOV
797
            .await?;
×
798

799
        let id = DataProviderDefinition::<Self>::id(&provider);
1✔
800
        conn.execute(
1✔
801
            &stmt,
1✔
802
            &[
1✔
803
                &id,
1✔
804
                &DataProviderDefinition::<Self>::type_name(&provider),
1✔
805
                &DataProviderDefinition::<Self>::name(&provider),
1✔
806
                &provider,
1✔
807
                &clamp_prio,
1✔
808
            ],
1✔
809
        )
1✔
810
        .await?;
1✔
811
        Ok(id)
1✔
812
    }
2✔
813

814
    async fn list_layer_providers(
815
        &self,
816
        options: LayerProviderListingOptions,
817
    ) -> Result<Vec<LayerProviderListing>> {
2✔
818
        // TODO: permission
819
        let conn = self.conn_pool.get().await?;
2✔
820

821
        let stmt = conn
2✔
822
            .prepare(
2✔
823
                "(
2✔
824
                    SELECT 
2✔
825
                        id, 
2✔
826
                        name,
2✔
827
                        type_name,
2✔
828
                        priority
2✔
829
                    FROM 
2✔
830
                        layer_providers
2✔
831
                    WHERE
2✔
832
                        priority > -1000
2✔
833
                    UNION ALL
2✔
834
                    SELECT 
2✔
835
                        id, 
2✔
836
                        name,
2✔
837
                        type_name,
2✔
838
                        priority
2✔
839
                    FROM 
2✔
840
                        pro_layer_providers
2✔
841
                    WHERE
2✔
842
                        priority > -1000
2✔
843
                )
2✔
844
                ORDER BY priority desc, name ASC
2✔
845
                LIMIT $1 
2✔
846
                OFFSET $2;",
2✔
847
            )
2✔
848
            .await?;
2✔
849

850
        let rows = conn
2✔
851
            .query(
2✔
852
                &stmt,
2✔
853
                &[&i64::from(options.limit), &i64::from(options.offset)],
2✔
854
            )
2✔
855
            .await?;
2✔
856

857
        Ok(rows
2✔
858
            .iter()
2✔
859
            .map(|row| LayerProviderListing {
2✔
860
                id: row.get(0),
2✔
861
                name: row.get(1),
2✔
862
                priority: row.get(3),
2✔
863
            })
2✔
864
            .collect())
2✔
865
    }
4✔
866

867
    async fn load_layer_provider(&self, id: DataProviderId) -> Result<Box<dyn DataProvider>> {
2✔
868
        // TODO: permissions
869
        let conn = self.conn_pool.get().await?;
2✔
870

871
        let stmt = conn
2✔
872
            .prepare(
2✔
873
                "SELECT
2✔
874
                    definition, NULL AS pro_definition
2✔
875
                FROM
2✔
876
                    layer_providers
2✔
877
                WHERE
2✔
878
                    id = $1
2✔
879
                UNION ALL
2✔
880
                SELECT
2✔
881
                    NULL AS definition, definition AS pro_definition
2✔
882
                FROM
2✔
883
                    pro_layer_providers
2✔
884
                WHERE
2✔
885
                    id = $1",
2✔
886
            )
2✔
887
            .await?;
56✔
888

889
        let row = conn.query_one(&stmt, &[&id]).await?;
2✔
890

891
        if let Some(definition) = row.get::<_, Option<TypedDataProviderDefinition>>(0) {
2✔
892
            return Box::new(definition)
1✔
893
                .initialize(ProPostgresDb {
1✔
894
                    conn_pool: self.conn_pool.clone(),
1✔
895
                    session: self.session.clone(),
1✔
896
                })
1✔
UNCOV
897
                .await;
×
898
        }
1✔
899

1✔
900
        let pro_definition: TypedProDataProviderDefinition = row.get(1);
1✔
901
        Box::new(pro_definition)
1✔
902
            .initialize(ProPostgresDb {
1✔
903
                conn_pool: self.conn_pool.clone(),
1✔
904
                session: self.session.clone(),
1✔
905
            })
1✔
UNCOV
906
            .await
×
907
    }
4✔
908
}
909

910
#[async_trait]
911
pub trait ProLayerProviderDb: Send + Sync + 'static {
912
    async fn add_pro_layer_provider(
913
        &self,
914
        provider: TypedProDataProviderDefinition,
915
    ) -> Result<DataProviderId>;
916
}
917

918
#[async_trait]
919
impl<Tls> ProLayerProviderDb for ProPostgresDb<Tls>
920
where
921
    Tls: MakeTlsConnect<Socket> + Clone + Send + Sync + 'static + std::fmt::Debug,
922
    <Tls as MakeTlsConnect<Socket>>::Stream: Send + Sync,
923
    <Tls as MakeTlsConnect<Socket>>::TlsConnect: Send,
924
    <<Tls as MakeTlsConnect<Socket>>::TlsConnect as TlsConnect<Socket>>::Future: Send,
925
{
926
    async fn add_pro_layer_provider(
927
        &self,
928
        provider: TypedProDataProviderDefinition,
929
    ) -> Result<DataProviderId> {
1✔
930
        ensure!(self.session.is_admin(), error::PermissionDenied);
1✔
931

932
        let conn = self.conn_pool.get().await?;
1✔
933

934
        let prio = DataProviderDefinition::<Self>::priority(&provider);
1✔
935
        let clamp_prio = prio.clamp(-1000, 1000);
1✔
936

1✔
937
        if prio != clamp_prio {
1✔
UNCOV
938
            log::warn!(
×
UNCOV
939
                "The priority of the provider {} is out of range! --> clamped {} to {}",
×
940
                DataProviderDefinition::<Self>::name(&provider),
×
941
                prio,
942
                clamp_prio
943
            );
944
        }
1✔
945

946
        let stmt = conn
1✔
947
            .prepare(
1✔
948
                "
1✔
949
              INSERT INTO pro_layer_providers (
1✔
950
                  id, 
1✔
951
                  type_name, 
1✔
952
                  name,
1✔
953
                  definition,
1✔
954
                  priority
1✔
955
              )
1✔
956
              VALUES ($1, $2, $3, $4, $5)",
1✔
957
            )
1✔
958
            .await?;
26✔
959

960
        let id = DataProviderDefinition::<Self>::id(&provider);
1✔
961
        conn.execute(
1✔
962
            &stmt,
1✔
963
            &[
1✔
964
                &id,
1✔
965
                &DataProviderDefinition::<Self>::type_name(&provider),
1✔
966
                &DataProviderDefinition::<Self>::name(&provider),
1✔
967
                &provider,
1✔
968
                &clamp_prio,
1✔
969
            ],
1✔
970
        )
1✔
971
        .await?;
1✔
972
        Ok(id)
1✔
973
    }
2✔
974
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc