• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

geo-engine / geoengine / 12466060820

23 Dec 2024 11:26AM UTC coverage: 90.353% (-0.2%) from 90.512%
12466060820

Pull #998

github

web-flow
Merge 66ab0655c into 34e12969f
Pull Request #998: Quota and Data usage Logging

834 of 1211 new or added lines in 66 files covered. (68.87%)

222 existing lines in 18 files now uncovered.

133834 of 148123 relevant lines covered (90.35%)

54353.18 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

94.11
/services/src/layers/postgres_layer_db.rs
1
use super::external::TypedDataProviderDefinition;
2
use super::layer::{UpdateLayer, UpdateLayerCollection};
3
use super::listing::{ProviderCapabilities, SearchType};
4
use crate::contexts::PostgresDb;
5
use crate::layers::layer::Property;
6
use crate::layers::listing::{SearchCapabilities, SearchParameters, SearchTypes};
7
use crate::workflows::registry::TxWorkflowRegistry;
8
use crate::{
9
    error::Result,
10
    layers::{
11
        external::{DataProvider, DataProviderDefinition},
12
        layer::{
13
            AddLayer, AddLayerCollection, CollectionItem, Layer, LayerCollection,
14
            LayerCollectionListOptions, LayerCollectionListing, LayerListing,
15
            ProviderLayerCollectionId, ProviderLayerId,
16
        },
17
        listing::{LayerCollectionId, LayerCollectionProvider},
18
        storage::{
19
            LayerDb, LayerProviderDb, LayerProviderListing, LayerProviderListingOptions,
20
            INTERNAL_LAYER_DB_ROOT_COLLECTION_ID, INTERNAL_PROVIDER_ID,
21
        },
22
        LayerDbError,
23
    },
24
};
25
use async_trait::async_trait;
26
use bb8_postgres::bb8::PooledConnection;
27
use bb8_postgres::tokio_postgres::{
28
    tls::{MakeTlsConnect, TlsConnect},
29
    Socket,
30
};
31
use bb8_postgres::PostgresConnectionManager;
32
use geoengine_datatypes::dataset::{DataProviderId, LayerId};
33
use geoengine_datatypes::util::HashMapTextTextDbType;
34
use snafu::ResultExt;
35
use std::str::FromStr;
36
use tokio_postgres::Transaction;
37
use uuid::Uuid;
38

39
/// delete all collections without parent collection
40
async fn _remove_collections_without_parent_collection(
8✔
41
    transaction: &tokio_postgres::Transaction<'_>,
8✔
42
) -> Result<()> {
8✔
43
    // HINT: a recursive delete statement seems reasonable, but hard to implement in postgres
44
    //       because you have a graph with potential loops
45

46
    let remove_layer_collections_without_parents_stmt = transaction
8✔
47
        .prepare(
8✔
48
            "DELETE FROM layer_collections
8✔
49
                 WHERE  id <> $1 -- do not delete root collection
8✔
50
                 AND    id NOT IN (
8✔
51
                    SELECT child FROM collection_children
8✔
52
                 );",
8✔
53
        )
8✔
54
        .await?;
8✔
55
    while 0 < transaction
13✔
56
        .execute(
13✔
57
            &remove_layer_collections_without_parents_stmt,
13✔
58
            &[&INTERNAL_LAYER_DB_ROOT_COLLECTION_ID],
13✔
59
        )
13✔
60
        .await?
13✔
61
    {
5✔
62
        // whenever one collection is deleted, we have to check again if there are more
5✔
63
        // collections without parents
5✔
64
    }
5✔
65

66
    Ok(())
8✔
67
}
8✔
68

69
/// delete all layers without parent collection
70
async fn _remove_layers_without_parent_collection(
13✔
71
    transaction: &tokio_postgres::Transaction<'_>,
13✔
72
) -> Result<()> {
13✔
73
    let remove_layers_without_parents_stmt = transaction
13✔
74
        .prepare(
13✔
75
            "DELETE FROM layers
13✔
76
                 WHERE id NOT IN (
13✔
77
                    SELECT layer FROM collection_layers
13✔
78
                 );",
13✔
79
        )
13✔
80
        .await?;
12✔
81
    transaction
13✔
82
        .execute(&remove_layers_without_parents_stmt, &[])
13✔
83
        .await?;
13✔
84

85
    Ok(())
13✔
86
}
13✔
87

88
pub async fn insert_layer<W: TxWorkflowRegistry>(
37✔
89
    workflow_registry: &W,
37✔
90
    trans: &Transaction<'_>,
37✔
91
    id: &LayerId,
37✔
92
    layer: AddLayer,
37✔
93
    collection: &LayerCollectionId,
37✔
94
) -> Result<Uuid> {
37✔
95
    let layer_id = Uuid::from_str(&id.0).map_err(|_| crate::error::Error::IdStringMustBeUuid {
37✔
96
        found: collection.0.clone(),
×
97
    })?;
37✔
98

99
    let collection_id =
37✔
100
        Uuid::from_str(&collection.0).map_err(|_| crate::error::Error::IdStringMustBeUuid {
37✔
101
            found: collection.0.clone(),
×
102
        })?;
37✔
103

104
    let workflow_id = workflow_registry
37✔
105
        .register_workflow_in_tx(layer.workflow, trans)
37✔
106
        .await?;
59✔
107

108
    let stmt = trans
37✔
109
        .prepare(
37✔
110
            "
37✔
111
            INSERT INTO layers (id, name, description, workflow_id, symbology, properties, metadata)
37✔
112
            VALUES ($1, $2, $3, $4, $5, $6, $7);",
37✔
113
        )
37✔
114
        .await?;
762✔
115

116
    trans
37✔
117
        .execute(
37✔
118
            &stmt,
37✔
119
            &[
37✔
120
                &layer_id,
37✔
121
                &layer.name,
37✔
122
                &layer.description,
37✔
123
                &workflow_id,
37✔
124
                &layer.symbology,
37✔
125
                &layer.properties,
37✔
126
                &HashMapTextTextDbType::from(&layer.metadata),
37✔
127
            ],
37✔
128
        )
37✔
129
        .await?;
31✔
130

131
    let stmt = trans
37✔
132
        .prepare(
37✔
133
            "
37✔
134
            INSERT INTO collection_layers (collection, layer)
37✔
135
            VALUES ($1, $2) ON CONFLICT DO NOTHING;",
37✔
136
        )
37✔
137
        .await?;
30✔
138

139
    trans.execute(&stmt, &[&collection_id, &layer_id]).await?;
37✔
140

141
    Ok(layer_id)
37✔
142
}
37✔
143

144
pub async fn insert_layer_collection_with_id(
44✔
145
    trans: &Transaction<'_>,
44✔
146
    id: &LayerCollectionId,
44✔
147
    collection: AddLayerCollection,
44✔
148
    parent: &LayerCollectionId,
44✔
149
) -> Result<Uuid> {
44✔
150
    let collection_id =
44✔
151
        Uuid::from_str(&id.0).map_err(|_| crate::error::Error::IdStringMustBeUuid {
44✔
UNCOV
152
            found: id.0.clone(),
×
153
        })?;
44✔
154

155
    let parent =
44✔
156
        Uuid::from_str(&parent.0).map_err(|_| crate::error::Error::IdStringMustBeUuid {
44✔
UNCOV
157
            found: parent.0.clone(),
×
158
        })?;
44✔
159

160
    let stmt = trans
44✔
161
        .prepare(
44✔
162
            "
44✔
163
        INSERT INTO layer_collections (id, name, description, properties)
44✔
164
        VALUES ($1, $2, $3, $4);",
44✔
165
        )
44✔
166
        .await?;
100✔
167

168
    trans
44✔
169
        .execute(
44✔
170
            &stmt,
44✔
171
            &[
44✔
172
                &collection_id,
44✔
173
                &collection.name,
44✔
174
                &collection.description,
44✔
175
                &collection.properties,
44✔
176
            ],
44✔
177
        )
44✔
178
        .await?;
39✔
179

180
    let stmt = trans
44✔
181
        .prepare(
44✔
182
            "
44✔
183
        INSERT INTO collection_children (parent, child)
44✔
184
        VALUES ($1, $2) ON CONFLICT DO NOTHING;",
44✔
185
        )
44✔
186
        .await?;
40✔
187

188
    trans.execute(&stmt, &[&parent, &collection_id]).await?;
44✔
189

190
    Ok(collection_id)
44✔
191
}
44✔
192

193
pub async fn insert_collection_parent<Tls>(
3✔
194
    conn: &PooledConnection<'_, PostgresConnectionManager<Tls>>,
3✔
195
    collection: &LayerCollectionId,
3✔
196
    parent: &LayerCollectionId,
3✔
197
) -> Result<()>
3✔
198
where
3✔
199
    Tls: MakeTlsConnect<Socket> + Clone + Send + Sync + 'static + std::fmt::Debug,
3✔
200
    <Tls as MakeTlsConnect<Socket>>::Stream: Send + Sync,
3✔
201
    <Tls as MakeTlsConnect<Socket>>::TlsConnect: Send,
3✔
202
    <<Tls as MakeTlsConnect<Socket>>::TlsConnect as TlsConnect<Socket>>::Future: Send,
3✔
203
{
3✔
204
    let collection =
3✔
205
        Uuid::from_str(&collection.0).map_err(|_| crate::error::Error::IdStringMustBeUuid {
3✔
UNCOV
206
            found: collection.0.clone(),
×
207
        })?;
3✔
208

209
    let parent =
3✔
210
        Uuid::from_str(&parent.0).map_err(|_| crate::error::Error::IdStringMustBeUuid {
3✔
UNCOV
211
            found: parent.0.clone(),
×
212
        })?;
3✔
213

214
    let stmt = conn
3✔
215
        .prepare(
3✔
216
            "
3✔
217
        INSERT INTO collection_children (parent, child)
3✔
218
        VALUES ($1, $2) ON CONFLICT DO NOTHING;",
3✔
219
        )
3✔
220
        .await?;
3✔
221

222
    conn.execute(&stmt, &[&parent, &collection]).await?;
3✔
223

224
    Ok(())
3✔
225
}
3✔
226

227
pub async fn delete_layer_collection(
8✔
228
    transaction: &Transaction<'_>,
8✔
229
    collection: &LayerCollectionId,
8✔
230
) -> Result<()> {
8✔
231
    let collection =
8✔
232
        Uuid::from_str(&collection.0).map_err(|_| crate::error::Error::IdStringMustBeUuid {
8✔
UNCOV
233
            found: collection.0.clone(),
×
234
        })?;
8✔
235

236
    if collection == INTERNAL_LAYER_DB_ROOT_COLLECTION_ID {
8✔
237
        return Err(LayerDbError::CannotRemoveRootCollection.into());
3✔
238
    }
5✔
239

240
    // delete the collection!
241
    // on delete cascade removes all entries from `collection_children` and `collection_layers`
242

243
    let remove_layer_collection_stmt = transaction
5✔
244
        .prepare(
5✔
245
            "DELETE FROM layer_collections
5✔
246
             WHERE id = $1;",
5✔
247
        )
5✔
248
        .await?;
4✔
249
    transaction
5✔
250
        .execute(&remove_layer_collection_stmt, &[&collection])
5✔
251
        .await?;
4✔
252

253
    _remove_collections_without_parent_collection(transaction).await?;
10✔
254

255
    _remove_layers_without_parent_collection(transaction).await?;
9✔
256

257
    Ok(())
5✔
258
}
8✔
259

260
pub async fn delete_layer_from_collection(
5✔
261
    transaction: &Transaction<'_>,
5✔
262
    layer: &LayerId,
5✔
263
    collection: &LayerCollectionId,
5✔
264
) -> Result<()> {
5✔
265
    let collection_uuid =
5✔
266
        Uuid::from_str(&collection.0).map_err(|_| crate::error::Error::IdStringMustBeUuid {
5✔
UNCOV
267
            found: collection.0.clone(),
×
268
        })?;
5✔
269

270
    let layer_uuid =
5✔
271
        Uuid::from_str(&layer.0).map_err(|_| crate::error::Error::IdStringMustBeUuid {
5✔
UNCOV
272
            found: layer.0.clone(),
×
273
        })?;
5✔
274

275
    let remove_layer_collection_stmt = transaction
5✔
276
        .prepare(
5✔
277
            "DELETE FROM collection_layers
5✔
278
             WHERE collection = $1
5✔
279
             AND layer = $2;",
5✔
280
        )
5✔
281
        .await?;
4✔
282
    let num_results = transaction
5✔
283
        .execute(
5✔
284
            &remove_layer_collection_stmt,
5✔
285
            &[&collection_uuid, &layer_uuid],
5✔
286
        )
5✔
287
        .await?;
5✔
288

289
    if num_results == 0 {
5✔
UNCOV
290
        return Err(LayerDbError::NoLayerForGivenIdInCollection {
×
UNCOV
291
            layer: layer.clone(),
×
UNCOV
292
            collection: collection.clone(),
×
UNCOV
293
        }
×
UNCOV
294
        .into());
×
295
    }
5✔
296

5✔
297
    _remove_layers_without_parent_collection(transaction).await?;
10✔
298

299
    Ok(())
5✔
300
}
5✔
301

302
pub async fn delete_layer_collection_from_parent(
3✔
303
    transaction: &Transaction<'_>,
3✔
304
    collection: &LayerCollectionId,
3✔
305
    parent: &LayerCollectionId,
3✔
306
) -> Result<()> {
3✔
307
    let collection_uuid =
3✔
308
        Uuid::from_str(&collection.0).map_err(|_| crate::error::Error::IdStringMustBeUuid {
3✔
309
            found: collection.0.clone(),
×
310
        })?;
3✔
311

312
    let parent_collection_uuid =
3✔
313
        Uuid::from_str(&parent.0).map_err(|_| crate::error::Error::IdStringMustBeUuid {
3✔
UNCOV
314
            found: parent.0.clone(),
×
315
        })?;
3✔
316

317
    let remove_layer_collection_stmt = transaction
3✔
318
        .prepare(
3✔
319
            "DELETE FROM collection_children
3✔
320
             WHERE child = $1
3✔
321
             AND parent = $2;",
3✔
322
        )
3✔
323
        .await?;
3✔
324
    let num_results = transaction
3✔
325
        .execute(
3✔
326
            &remove_layer_collection_stmt,
3✔
327
            &[&collection_uuid, &parent_collection_uuid],
3✔
328
        )
3✔
329
        .await?;
3✔
330

331
    if num_results == 0 {
3✔
UNCOV
332
        return Err(LayerDbError::NoCollectionForGivenIdInCollection {
×
UNCOV
333
            collection: collection.clone(),
×
UNCOV
334
            parent: parent.clone(),
×
UNCOV
335
        }
×
UNCOV
336
        .into());
×
337
    }
3✔
338

3✔
339
    _remove_collections_without_parent_collection(transaction).await?;
11✔
340

341
    _remove_layers_without_parent_collection(transaction).await?;
6✔
342

343
    Ok(())
3✔
344
}
3✔
345

346
/// Builds the SQL statement that searches collections and layers below a start collection.
///
/// Statement parameters:
/// * `$1` – uuid of the collection the search starts from,
/// * `$2` – `LIMIT`,
/// * `$3` – `OFFSET`,
/// * `$4` – `ILIKE` pattern matched against `name`.
///
/// With `full_info == true` each row has the columns `id, name, description, properties,
/// is_layer` and collections are ordered before layers; otherwise only `name` is selected.
///
/// The recursive `parents` CTE uses `UNION` (not `UNION ALL`): duplicate ids are discarded
/// between iterations, so the recursion also terminates when the collection graph
/// contains a cycle.
fn create_search_query(full_info: bool) -> String {
    let (collection_columns, layer_columns, primary_order) = if full_info {
        (
            "concat(id, '') AS id,
            name,
            description,
            properties,
            FALSE AS is_layer",
            "concat(id, '') AS id,
            name,
            description,
            properties,
            TRUE AS is_layer",
            "is_layer ASC,",
        )
    } else {
        ("name", "name", "")
    };

    format!("
        WITH RECURSIVE parents AS (
            SELECT $1::uuid as id
            UNION SELECT DISTINCT child FROM collection_children JOIN parents ON (id = parent)
        )
        SELECT DISTINCT *
        FROM (
            SELECT 
                {}
            FROM layer_collections
                JOIN (SELECT DISTINCT child FROM collection_children JOIN parents ON (id = parent)) cc ON (id = cc.child)
            WHERE name ILIKE $4
        ) u UNION (
            SELECT 
                {}
            FROM layers uc
                JOIN (SELECT DISTINCT layer FROM collection_layers JOIN parents ON (collection = id)) cl ON (id = cl.layer)
            WHERE name ILIKE $4
        )
        ORDER BY {}name ASC
        LIMIT $2 
        OFFSET $3;",
        collection_columns, layer_columns, primary_order)
}
18✔
385

386
#[async_trait]
387
impl<Tls> LayerDb for PostgresDb<Tls>
388
where
389
    Tls: MakeTlsConnect<Socket> + Clone + Send + Sync + 'static + std::fmt::Debug,
390
    <Tls as MakeTlsConnect<Socket>>::Stream: Send + Sync,
391
    <Tls as MakeTlsConnect<Socket>>::TlsConnect: Send,
392
    <<Tls as MakeTlsConnect<Socket>>::TlsConnect as TlsConnect<Socket>>::Future: Send,
393
{
394
    async fn add_layer(&self, layer: AddLayer, collection: &LayerCollectionId) -> Result<LayerId> {
21✔
395
        let layer_id = Uuid::new_v4();
21✔
396
        let layer_id = LayerId(layer_id.to_string());
21✔
397

21✔
398
        self.add_layer_with_id(&layer_id, layer, collection).await?;
752✔
399

400
        Ok(layer_id)
21✔
401
    }
42✔
402

403
    async fn update_layer(&self, id: &LayerId, layer: UpdateLayer) -> Result<()> {
1✔
404
        let layer_id =
1✔
405
            Uuid::from_str(&id.0).map_err(|_| crate::error::Error::IdStringMustBeUuid {
1✔
UNCOV
406
                found: id.0.clone(),
×
407
            })?;
1✔
408

409
        let mut conn = self.conn_pool.get().await?;
1✔
410

411
        let tx = conn.build_transaction().start().await?;
1✔
412

413
        let workflow_id = self.register_workflow_in_tx(layer.workflow, &tx).await?;
1✔
414

415
        tx.execute(
1✔
416
            "
1✔
417
            UPDATE layers
1✔
418
            SET name = $1, description = $2, symbology = $3, properties = $4, metadata = $5, workflow_id = $6
1✔
419
            WHERE id = $7;",
1✔
420
            &[
1✔
421
                &layer.name,
1✔
422
                &layer.description,
1✔
423
                &layer.symbology,
1✔
424
                &layer.properties,
1✔
425
                &HashMapTextTextDbType::from(&layer.metadata),
1✔
426
                &workflow_id,
1✔
427
                &layer_id,
1✔
428
            ],
1✔
429
        )
1✔
UNCOV
430
        .await?;
×
431

432
        tx.commit().await?;
1✔
433

434
        Ok(())
1✔
435
    }
2✔
436

437
    async fn remove_layer(&self, id: &LayerId) -> Result<()> {
1✔
438
        let layer_id =
1✔
439
            Uuid::from_str(&id.0).map_err(|_| crate::error::Error::IdStringMustBeUuid {
1✔
UNCOV
440
                found: id.0.clone(),
×
441
            })?;
1✔
442

443
        let conn = self.conn_pool.get().await?;
1✔
444

445
        conn.execute(
1✔
446
            "
1✔
447
            DELETE FROM layers
1✔
448
            WHERE id = $1;",
1✔
449
            &[&layer_id],
1✔
450
        )
1✔
451
        .await?;
2✔
452

453
        Ok(())
1✔
454
    }
2✔
455

456
    async fn add_layer_with_id(
457
        &self,
458
        id: &LayerId,
459
        layer: AddLayer,
460
        collection: &LayerCollectionId,
461
    ) -> Result<()> {
21✔
462
        let mut conn = self.conn_pool.get().await?;
21✔
463

464
        let trans = conn.build_transaction().start().await?;
21✔
465

466
        insert_layer(self, &trans, id, layer, collection).await?;
691✔
467

468
        trans.commit().await?;
21✔
469

470
        Ok(())
21✔
471
    }
42✔
472

473
    async fn add_layer_to_collection(
474
        &self,
475
        layer: &LayerId,
476
        collection: &LayerCollectionId,
477
    ) -> Result<()> {
3✔
478
        let layer_id =
2✔
479
            Uuid::from_str(&layer.0).map_err(|_| crate::error::Error::IdStringMustBeUuid {
3✔
480
                found: layer.0.clone(),
1✔
481
            })?;
3✔
482

483
        let collection_id =
2✔
484
            Uuid::from_str(&collection.0).map_err(|_| crate::error::Error::IdStringMustBeUuid {
2✔
UNCOV
485
                found: collection.0.clone(),
×
486
            })?;
2✔
487

488
        let conn = self.conn_pool.get().await?;
2✔
489

490
        let stmt = conn
2✔
491
            .prepare(
2✔
492
                "
2✔
493
            INSERT INTO collection_layers (collection, layer)
2✔
494
            VALUES ($1, $2) ON CONFLICT DO NOTHING;",
2✔
495
            )
2✔
496
            .await?;
2✔
497

498
        conn.execute(&stmt, &[&collection_id, &layer_id]).await?;
2✔
499

500
        Ok(())
2✔
501
    }
6✔
502

503
    async fn add_layer_collection(
504
        &self,
505
        collection: AddLayerCollection,
506
        parent: &LayerCollectionId,
507
    ) -> Result<LayerCollectionId> {
25✔
508
        let collection_id = Uuid::new_v4();
25✔
509
        let collection_id = LayerCollectionId(collection_id.to_string());
25✔
510

25✔
511
        self.add_layer_collection_with_id(&collection_id, collection, parent)
25✔
512
            .await?;
219✔
513

514
        Ok(collection_id)
25✔
515
    }
50✔
516

517
    async fn add_layer_collection_with_id(
518
        &self,
519
        id: &LayerCollectionId,
520
        collection: AddLayerCollection,
521
        parent: &LayerCollectionId,
522
    ) -> Result<()> {
25✔
523
        let mut conn = self.conn_pool.get().await?;
25✔
524

525
        let trans = conn.build_transaction().start().await?;
25✔
526

527
        insert_layer_collection_with_id(&trans, id, collection, parent).await?;
144✔
528

529
        trans.commit().await?;
25✔
530

531
        Ok(())
25✔
532
    }
50✔
533

534
    async fn add_collection_to_parent(
535
        &self,
536
        collection: &LayerCollectionId,
537
        parent: &LayerCollectionId,
538
    ) -> Result<()> {
2✔
539
        let conn = self.conn_pool.get().await?;
2✔
540
        insert_collection_parent(&conn, collection, parent).await
4✔
541
    }
4✔
542

543
    async fn remove_layer_collection(&self, collection: &LayerCollectionId) -> Result<()> {
5✔
544
        let mut conn = self.conn_pool.get().await?;
5✔
545
        let transaction = conn.transaction().await?;
5✔
546

547
        delete_layer_collection(&transaction, collection).await?;
15✔
548

549
        transaction.commit().await.map_err(Into::into)
3✔
550
    }
10✔
551

552
    async fn remove_layer_from_collection(
553
        &self,
554
        layer: &LayerId,
555
        collection: &LayerCollectionId,
556
    ) -> Result<()> {
3✔
557
        let mut conn = self.conn_pool.get().await?;
3✔
558
        let transaction = conn.transaction().await?;
3✔
559

560
        delete_layer_from_collection(&transaction, layer, collection).await?;
11✔
561

562
        transaction.commit().await.map_err(Into::into)
3✔
563
    }
6✔
564

565
    async fn remove_layer_collection_from_parent(
566
        &self,
567
        collection: &LayerCollectionId,
568
        parent: &LayerCollectionId,
569
    ) -> Result<()> {
2✔
570
        let mut conn = self.conn_pool.get().await?;
2✔
571
        let transaction = conn.transaction().await?;
2✔
572

573
        delete_layer_collection_from_parent(&transaction, collection, parent).await?;
15✔
574

575
        transaction.commit().await.map_err(Into::into)
2✔
576
    }
4✔
577

578
    async fn update_layer_collection(
579
        &self,
580
        collection: &LayerCollectionId,
581
        update: UpdateLayerCollection,
582
    ) -> Result<()> {
1✔
583
        let collection_id =
1✔
584
            Uuid::from_str(&collection.0).map_err(|_| crate::error::Error::IdStringMustBeUuid {
1✔
UNCOV
585
                found: collection.0.clone(),
×
586
            })?;
1✔
587

588
        let conn = self.conn_pool.get().await?;
1✔
589

590
        conn.execute(
1✔
591
            "UPDATE layer_collections 
1✔
592
                SET name = $1, description = $2, properties = $3
1✔
593
                WHERE id = $4;",
1✔
594
            &[
1✔
595
                &update.name,
1✔
596
                &update.description,
1✔
597
                &update.properties,
1✔
598
                &collection_id,
1✔
599
            ],
1✔
600
        )
1✔
601
        .await?;
1✔
602

603
        Ok(())
1✔
604
    }
2✔
605
}
606

607
#[async_trait]
608
impl<Tls> LayerCollectionProvider for PostgresDb<Tls>
609
where
610
    Tls: MakeTlsConnect<Socket> + Clone + Send + Sync + 'static + std::fmt::Debug,
611
    <Tls as MakeTlsConnect<Socket>>::Stream: Send + Sync,
612
    <Tls as MakeTlsConnect<Socket>>::TlsConnect: Send,
613
    <<Tls as MakeTlsConnect<Socket>>::TlsConnect as TlsConnect<Socket>>::Future: Send,
614
{
615
    fn capabilities(&self) -> ProviderCapabilities {
2✔
616
        ProviderCapabilities {
2✔
617
            listing: true,
2✔
618
            search: SearchCapabilities {
2✔
619
                search_types: SearchTypes {
2✔
620
                    fulltext: true,
2✔
621
                    prefix: true,
2✔
622
                },
2✔
623
                autocomplete: true,
2✔
624
                filters: None,
2✔
625
            },
2✔
626
        }
2✔
627
    }
2✔
628

UNCOV
629
    fn name(&self) -> &str {
×
UNCOV
630
        "Postgres Layer Database"
×
UNCOV
631
    }
×
632

UNCOV
633
    fn description(&self) -> &str {
×
UNCOV
634
        "A layer database using Postgres"
×
UNCOV
635
    }
×
636

637
    #[allow(clippy::too_many_lines)]
638
    async fn load_layer_collection(
639
        &self,
640
        collection_id: &LayerCollectionId,
641
        options: LayerCollectionListOptions,
642
    ) -> Result<LayerCollection> {
19✔
643
        let collection = Uuid::from_str(&collection_id.0).map_err(|_| {
19✔
UNCOV
644
            crate::error::Error::IdStringMustBeUuid {
×
UNCOV
645
                found: collection_id.0.clone(),
×
UNCOV
646
            }
×
647
        })?;
19✔
648

649
        let conn = self.conn_pool.get().await?;
19✔
650

651
        let stmt = conn
19✔
652
            .prepare(
19✔
653
                "
19✔
654
        SELECT name, description, properties
19✔
655
        FROM layer_collections
19✔
656
        WHERE id = $1;",
19✔
657
            )
19✔
658
            .await?;
18✔
659

660
        let row = conn.query_one(&stmt, &[&collection]).await?;
19✔
661

662
        let name: String = row.get(0);
15✔
663
        let description: String = row.get(1);
15✔
664
        let properties: Vec<Property> = row.get(2);
15✔
665

666
        let stmt = conn
15✔
667
            .prepare(
15✔
668
                "
15✔
669
        SELECT DISTINCT id, name, description, properties, is_layer
15✔
670
        FROM (
15✔
671
            SELECT 
15✔
672
                concat(id, '') AS id, 
15✔
673
                name, 
15✔
674
                description, 
15✔
675
                properties, 
15✔
676
                FALSE AS is_layer
15✔
677
            FROM layer_collections
15✔
678
                JOIN collection_children cc ON (id = cc.child)
15✔
679
            WHERE cc.parent = $1
15✔
680
        ) u UNION (
15✔
681
            SELECT 
15✔
682
                concat(id, '') AS id, 
15✔
683
                name, 
15✔
684
                description, 
15✔
685
                properties, 
15✔
686
                TRUE AS is_layer
15✔
687
            FROM layers uc
15✔
688
                JOIN collection_layers cl ON (id = cl.layer)
15✔
689
            WHERE cl.collection = $1
15✔
690
        )
15✔
691
        ORDER BY is_layer ASC, name ASC
15✔
692
        LIMIT $2 
15✔
693
        OFFSET $3;            
15✔
694
        ",
15✔
695
            )
15✔
696
            .await?;
14✔
697

698
        let rows = conn
15✔
699
            .query(
15✔
700
                &stmt,
15✔
701
                &[
15✔
702
                    &collection,
15✔
703
                    &i64::from(options.limit),
15✔
704
                    &i64::from(options.offset),
15✔
705
                ],
15✔
706
            )
15✔
707
            .await?;
14✔
708

709
        let items = rows
15✔
710
            .into_iter()
15✔
711
            .map(|row| {
16✔
712
                let is_layer: bool = row.get(4);
16✔
713

16✔
714
                if is_layer {
16✔
715
                    Ok(CollectionItem::Layer(LayerListing {
7✔
716
                        id: ProviderLayerId {
7✔
717
                            provider_id: INTERNAL_PROVIDER_ID,
7✔
718
                            layer_id: LayerId(row.get(0)),
7✔
719
                        },
7✔
720
                        name: row.get(1),
7✔
721
                        description: row.get(2),
7✔
722
                        properties: row.get(3),
7✔
723
                    }))
7✔
724
                } else {
725
                    Ok(CollectionItem::Collection(LayerCollectionListing {
9✔
726
                        id: ProviderLayerCollectionId {
9✔
727
                            provider_id: INTERNAL_PROVIDER_ID,
9✔
728
                            collection_id: LayerCollectionId(row.get(0)),
9✔
729
                        },
9✔
730
                        name: row.get(1),
9✔
731
                        description: row.get(2),
9✔
732
                        properties: row.get(3),
9✔
733
                    }))
9✔
734
                }
735
            })
16✔
736
            .collect::<Result<Vec<CollectionItem>>>()?;
15✔
737

738
        Ok(LayerCollection {
15✔
739
            id: ProviderLayerCollectionId {
15✔
740
                provider_id: INTERNAL_PROVIDER_ID,
15✔
741
                collection_id: collection_id.clone(),
15✔
742
            },
15✔
743
            name,
15✔
744
            description,
15✔
745
            items,
15✔
746
            entry_label: None,
15✔
747
            properties,
15✔
748
        })
15✔
749
    }
38✔
750

751
    #[allow(clippy::too_many_lines)]
752
    async fn search(
753
        &self,
754
        collection_id: &LayerCollectionId,
755
        search: SearchParameters,
756
    ) -> Result<LayerCollection> {
9✔
757
        let collection = Uuid::from_str(&collection_id.0).map_err(|_| {
9✔
UNCOV
758
            crate::error::Error::IdStringMustBeUuid {
×
UNCOV
759
                found: collection_id.0.clone(),
×
UNCOV
760
            }
×
761
        })?;
9✔
762

763
        let conn = self.conn_pool.get().await?;
9✔
764

765
        let stmt = conn
9✔
766
            .prepare(
9✔
767
                "
9✔
768
        SELECT name, description, properties
9✔
769
        FROM layer_collections
9✔
770
        WHERE id = $1;",
9✔
771
            )
9✔
772
            .await?;
15✔
773

774
        let row = conn.query_one(&stmt, &[&collection]).await?;
9✔
775

776
        let name: String = row.get(0);
9✔
777
        let description: String = row.get(1);
9✔
778
        let properties: Vec<Property> = row.get(2);
9✔
779

780
        let pattern = match search.search_type {
9✔
781
            SearchType::Fulltext => {
782
                format!("%{}%", search.search_string)
6✔
783
            }
784
            SearchType::Prefix => {
785
                format!("{}%", search.search_string)
3✔
786
            }
787
        };
788

789
        let stmt = conn.prepare(&create_search_query(true)).await?;
9✔
790

791
        let rows = conn
9✔
792
            .query(
9✔
793
                &stmt,
9✔
794
                &[
9✔
795
                    &collection,
9✔
796
                    &i64::from(search.limit),
9✔
797
                    &i64::from(search.offset),
9✔
798
                    &pattern,
9✔
799
                ],
9✔
800
            )
9✔
801
            .await?;
9✔
802

803
        let items = rows
9✔
804
            .into_iter()
9✔
805
            .map(|row| {
13✔
806
                let is_layer: bool = row.get(4);
13✔
807

13✔
808
                if is_layer {
13✔
809
                    Ok(CollectionItem::Layer(LayerListing {
5✔
810
                        id: ProviderLayerId {
5✔
811
                            provider_id: INTERNAL_PROVIDER_ID,
5✔
812
                            layer_id: LayerId(row.get(0)),
5✔
813
                        },
5✔
814
                        name: row.get(1),
5✔
815
                        description: row.get(2),
5✔
816
                        properties: row.get(3),
5✔
817
                    }))
5✔
818
                } else {
819
                    Ok(CollectionItem::Collection(LayerCollectionListing {
8✔
820
                        id: ProviderLayerCollectionId {
8✔
821
                            provider_id: INTERNAL_PROVIDER_ID,
8✔
822
                            collection_id: LayerCollectionId(row.get(0)),
8✔
823
                        },
8✔
824
                        name: row.get(1),
8✔
825
                        description: row.get(2),
8✔
826
                        properties: row.get(3),
8✔
827
                    }))
8✔
828
                }
829
            })
13✔
830
            .collect::<Result<Vec<CollectionItem>>>()?;
9✔
831

832
        Ok(LayerCollection {
9✔
833
            id: ProviderLayerCollectionId {
9✔
834
                provider_id: INTERNAL_PROVIDER_ID,
9✔
835
                collection_id: collection_id.clone(),
9✔
836
            },
9✔
837
            name,
9✔
838
            description,
9✔
839
            items,
9✔
840
            entry_label: None,
9✔
841
            properties,
9✔
842
        })
9✔
843
    }
18✔
844

845
    #[allow(clippy::too_many_lines)]
846
    async fn autocomplete_search(
847
        &self,
848
        collection_id: &LayerCollectionId,
849
        search: SearchParameters,
850
    ) -> Result<Vec<String>> {
9✔
851
        let collection = Uuid::from_str(&collection_id.0).map_err(|_| {
9✔
UNCOV
852
            crate::error::Error::IdStringMustBeUuid {
×
UNCOV
853
                found: collection_id.0.clone(),
×
UNCOV
854
            }
×
855
        })?;
9✔
856

857
        let conn = self.conn_pool.get().await?;
9✔
858

859
        let pattern = match search.search_type {
9✔
860
            SearchType::Fulltext => {
861
                format!("%{}%", search.search_string)
6✔
862
            }
863
            SearchType::Prefix => {
864
                format!("{}%", search.search_string)
3✔
865
            }
866
        };
867

868
        let stmt = conn.prepare(&create_search_query(false)).await?;
9✔
869

870
        let rows = conn
9✔
871
            .query(
9✔
872
                &stmt,
9✔
873
                &[
9✔
874
                    &collection,
9✔
875
                    &i64::from(search.limit),
9✔
876
                    &i64::from(search.offset),
9✔
877
                    &pattern,
9✔
878
                ],
9✔
879
            )
9✔
880
            .await?;
8✔
881

882
        let items = rows
9✔
883
            .into_iter()
9✔
884
            .map(|row| Ok(row.get::<usize, &str>(0).to_string()))
13✔
885
            .collect::<Result<Vec<String>>>()?;
9✔
886

887
        Ok(items)
9✔
888
    }
18✔
889

890
    async fn get_root_layer_collection_id(&self) -> Result<LayerCollectionId> {
27✔
891
        Ok(LayerCollectionId(
27✔
892
            INTERNAL_LAYER_DB_ROOT_COLLECTION_ID.to_string(),
27✔
893
        ))
27✔
894
    }
54✔
895

896
    async fn load_layer(&self, id: &LayerId) -> Result<Layer> {
19✔
897
        let layer_id =
19✔
898
            Uuid::from_str(&id.0).map_err(|_| crate::error::Error::IdStringMustBeUuid {
19✔
UNCOV
899
                found: id.0.clone(),
×
900
            })?;
19✔
901

902
        let conn = self.conn_pool.get().await?;
19✔
903

904
        let stmt = conn
19✔
905
            .prepare(
19✔
906
                "
19✔
907
            SELECT 
19✔
908
                l.name,
19✔
909
                l.description,
19✔
910
                w.workflow,
19✔
911
                l.symbology,
19✔
912
                l.properties,
19✔
913
                l.metadata
19✔
914
            FROM 
19✔
915
                layers l JOIN workflows w ON (l.workflow_id = w.id)
19✔
916
            WHERE 
19✔
917
                l.id = $1;",
19✔
918
            )
19✔
919
            .await?;
18✔
920

921
        let row = conn
19✔
922
            .query_one(&stmt, &[&layer_id])
19✔
923
            .await
18✔
924
            .map_err(|_error| LayerDbError::NoLayerForGivenId { id: id.clone() })?;
19✔
925

926
        Ok(Layer {
927
            id: ProviderLayerId {
14✔
928
                provider_id: INTERNAL_PROVIDER_ID,
14✔
929
                layer_id: id.clone(),
14✔
930
            },
14✔
931
            name: row.get(0),
14✔
932
            description: row.get(1),
14✔
933
            workflow: serde_json::from_value(row.get(2)).context(crate::error::SerdeJson)?,
14✔
934
            symbology: row.get(3),
14✔
935
            properties: row.get(4),
14✔
936
            metadata: row.get::<_, HashMapTextTextDbType>(5).into(),
14✔
937
        })
938
    }
38✔
939
}
940

941
#[async_trait]
942
impl<Tls> LayerProviderDb for PostgresDb<Tls>
943
where
944
    Tls: MakeTlsConnect<Socket> + Clone + Send + Sync + 'static + std::fmt::Debug,
945
    <Tls as MakeTlsConnect<Socket>>::Stream: Send + Sync,
946
    <Tls as MakeTlsConnect<Socket>>::TlsConnect: Send,
947
    <<Tls as MakeTlsConnect<Socket>>::TlsConnect as TlsConnect<Socket>>::Future: Send,
948
{
949
    async fn add_layer_provider(
950
        &self,
951
        provider: TypedDataProviderDefinition,
952
    ) -> Result<DataProviderId> {
8✔
953
        let conn = self.conn_pool.get().await?;
8✔
954

955
        let stmt = conn
8✔
956
            .prepare(
8✔
957
                "
8✔
958
              INSERT INTO layer_providers (
8✔
959
                  id, 
8✔
960
                  type_name, 
8✔
961
                  name,
8✔
962
                  definition,
8✔
963
                  priority
8✔
964
              )
8✔
965
              VALUES ($1, $2, $3, $4, $5)",
8✔
966
            )
8✔
967
            .await?;
248✔
968

969
        // clamp the priority to a reasonable range
970
        let prio = DataProviderDefinition::<Self>::priority(&provider);
8✔
971
        let clamp_prio = prio.clamp(-1000, 1000);
8✔
972

8✔
973
        if prio != clamp_prio {
8✔
UNCOV
974
            log::warn!(
×
UNCOV
975
                "The priority of the provider {} is out of range! --> clamped {} to {}",
×
UNCOV
976
                DataProviderDefinition::<Self>::name(&provider),
×
977
                prio,
978
                clamp_prio
979
            );
980
        }
8✔
981

982
        let id = DataProviderDefinition::<Self>::id(&provider);
8✔
983
        conn.execute(
8✔
984
            &stmt,
8✔
985
            &[
8✔
986
                &id,
8✔
987
                &DataProviderDefinition::<Self>::type_name(&provider),
8✔
988
                &DataProviderDefinition::<Self>::name(&provider),
8✔
989
                &provider,
8✔
990
                &clamp_prio,
8✔
991
            ],
8✔
992
        )
8✔
993
        .await?;
8✔
994
        Ok(id)
8✔
995
    }
16✔
996

997
    async fn list_layer_providers(
998
        &self,
999
        options: LayerProviderListingOptions,
1000
    ) -> Result<Vec<LayerProviderListing>> {
1✔
1001
        // TODO: permission
1002
        let conn = self.conn_pool.get().await?;
1✔
1003

1004
        let stmt = conn
1✔
1005
            .prepare(
1✔
1006
                "
1✔
1007
            SELECT 
1✔
1008
                id, 
1✔
1009
                name,
1✔
1010
                priority
1✔
1011
            FROM 
1✔
1012
                layer_providers
1✔
1013
            WHERE
1✔
1014
                priority > -1000
1✔
1015
            ORDER BY priority DESC, name ASC
1✔
1016
            LIMIT $1 
1✔
1017
            OFFSET $2;",
1✔
1018
            )
1✔
1019
            .await?;
1✔
1020

1021
        let rows = conn
1✔
1022
            .query(
1✔
1023
                &stmt,
1✔
1024
                &[&i64::from(options.limit), &i64::from(options.offset)],
1✔
1025
            )
1✔
1026
            .await?;
1✔
1027

1028
        Ok(rows
1✔
1029
            .iter()
1✔
1030
            .map(|row| LayerProviderListing {
1✔
1031
                id: row.get(0),
1✔
1032
                name: row.get(1),
1✔
1033
                priority: row.get(2),
1✔
1034
            })
1✔
1035
            .collect())
1✔
1036
    }
2✔
1037

1038
    async fn load_layer_provider(&self, id: DataProviderId) -> Result<Box<dyn DataProvider>> {
18✔
1039
        let conn = self.conn_pool.get().await?;
18✔
1040

1041
        let stmt = conn
18✔
1042
            .prepare(
18✔
1043
                "
18✔
1044
               SELECT 
18✔
1045
                   definition
18✔
1046
               FROM 
18✔
1047
                   layer_providers
18✔
1048
               WHERE
18✔
1049
                   id = $1",
18✔
1050
            )
18✔
1051
            .await?;
18✔
1052

1053
        let row = conn.query_one(&stmt, &[&id]).await?;
18✔
1054

1055
        let definition: TypedDataProviderDefinition = row.get(0);
11✔
1056

11✔
1057
        Box::new(definition)
11✔
1058
            .initialize(PostgresDb {
11✔
1059
                conn_pool: self.conn_pool.clone(),
11✔
1060
            })
11✔
UNCOV
1061
            .await
×
1062
    }
36✔
1063
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc