• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

geo-engine / geoengine / 11910714914

19 Nov 2024 10:06AM UTC coverage: 90.445% (-0.2%) from 90.687%
11910714914

push

github

web-flow
Merge pull request #994 from geo-engine/workspace-dependencies

use workspace dependencies, update toolchain, use global lock in expression

9 of 11 new or added lines in 6 files covered. (81.82%)

375 existing lines in 75 files now uncovered.

132867 of 146904 relevant lines covered (90.44%)

54798.11 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

94.28
/services/src/layers/postgres_layer_db.rs
1
use super::external::TypedDataProviderDefinition;
2
use super::layer::{UpdateLayer, UpdateLayerCollection};
3
use super::listing::{ProviderCapabilities, SearchType};
4
use crate::contexts::PostgresDb;
5
use crate::error;
6
use crate::layers::layer::Property;
7
use crate::layers::listing::{SearchCapabilities, SearchParameters, SearchTypes};
8
use crate::workflows::workflow::WorkflowId;
9
use crate::{
10
    error::Result,
11
    layers::{
12
        external::{DataProvider, DataProviderDefinition},
13
        layer::{
14
            AddLayer, AddLayerCollection, CollectionItem, Layer, LayerCollection,
15
            LayerCollectionListOptions, LayerCollectionListing, LayerListing,
16
            ProviderLayerCollectionId, ProviderLayerId,
17
        },
18
        listing::{LayerCollectionId, LayerCollectionProvider},
19
        storage::{
20
            LayerDb, LayerProviderDb, LayerProviderListing, LayerProviderListingOptions,
21
            INTERNAL_LAYER_DB_ROOT_COLLECTION_ID, INTERNAL_PROVIDER_ID,
22
        },
23
        LayerDbError,
24
    },
25
};
26
use async_trait::async_trait;
27
use bb8_postgres::bb8::PooledConnection;
28
use bb8_postgres::tokio_postgres::{
29
    tls::{MakeTlsConnect, TlsConnect},
30
    Socket,
31
};
32
use bb8_postgres::PostgresConnectionManager;
33
use geoengine_datatypes::dataset::{DataProviderId, LayerId};
34
use geoengine_datatypes::util::HashMapTextTextDbType;
35
use snafu::ResultExt;
36
use std::str::FromStr;
37
use tokio_postgres::Transaction;
38
use uuid::Uuid;
39

40
/// delete all collections without parent collection
41
async fn _remove_collections_without_parent_collection(
8✔
42
    transaction: &tokio_postgres::Transaction<'_>,
8✔
43
) -> Result<()> {
8✔
44
    // HINT: a recursive delete statement seems reasonable, but hard to implement in postgres
45
    //       because you have a graph with potential loops
46

47
    let remove_layer_collections_without_parents_stmt = transaction
8✔
48
        .prepare(
8✔
49
            "DELETE FROM layer_collections
8✔
50
                 WHERE  id <> $1 -- do not delete root collection
8✔
51
                 AND    id NOT IN (
8✔
52
                    SELECT child FROM collection_children
8✔
53
                 );",
8✔
54
        )
8✔
55
        .await?;
8✔
56
    while 0 < transaction
13✔
57
        .execute(
13✔
58
            &remove_layer_collections_without_parents_stmt,
13✔
59
            &[&INTERNAL_LAYER_DB_ROOT_COLLECTION_ID],
13✔
60
        )
13✔
61
        .await?
13✔
62
    {
5✔
63
        // whenever one collection is deleted, we have to check again if there are more
5✔
64
        // collections without parents
5✔
65
    }
5✔
66

67
    Ok(())
8✔
68
}
8✔
69

70
/// delete all layers without parent collection
71
async fn _remove_layers_without_parent_collection(
13✔
72
    transaction: &tokio_postgres::Transaction<'_>,
13✔
73
) -> Result<()> {
13✔
74
    let remove_layers_without_parents_stmt = transaction
13✔
75
        .prepare(
13✔
76
            "DELETE FROM layers
13✔
77
                 WHERE id NOT IN (
13✔
78
                    SELECT layer FROM collection_layers
13✔
79
                 );",
13✔
80
        )
13✔
81
        .await?;
13✔
82
    transaction
13✔
83
        .execute(&remove_layers_without_parents_stmt, &[])
13✔
84
        .await?;
13✔
85

86
    Ok(())
13✔
87
}
13✔
88

89
pub async fn insert_layer(
36✔
90
    trans: &Transaction<'_>,
36✔
91
    id: &LayerId,
36✔
92
    layer: AddLayer,
36✔
93
    collection: &LayerCollectionId,
36✔
94
) -> Result<Uuid> {
36✔
95
    let layer_id = Uuid::from_str(&id.0).map_err(|_| crate::error::Error::IdStringMustBeUuid {
36✔
96
        found: collection.0.clone(),
×
97
    })?;
36✔
98

99
    let collection_id =
36✔
100
        Uuid::from_str(&collection.0).map_err(|_| crate::error::Error::IdStringMustBeUuid {
36✔
101
            found: collection.0.clone(),
×
102
        })?;
36✔
103

104
    let workflow_id = WorkflowId::from_hash(&layer.workflow);
36✔
105

106
    let stmt = trans
36✔
107
        .prepare(
36✔
108
            "INSERT INTO workflows (id, workflow) VALUES ($1, $2) 
36✔
109
            ON CONFLICT DO NOTHING;",
36✔
110
        )
36✔
111
        .await?;
30✔
112

113
    trans
36✔
114
        .execute(
36✔
115
            &stmt,
36✔
116
            &[
36✔
117
                &workflow_id,
36✔
118
                &serde_json::to_value(&layer.workflow).context(error::SerdeJson)?,
36✔
119
            ],
120
        )
121
        .await?;
30✔
122

123
    let stmt = trans
36✔
124
        .prepare(
36✔
125
            "
36✔
126
            INSERT INTO layers (id, name, description, workflow_id, symbology, properties, metadata)
36✔
127
            VALUES ($1, $2, $3, $4, $5, $6, $7);",
36✔
128
        )
36✔
129
        .await?;
722✔
130

131
    trans
36✔
132
        .execute(
36✔
133
            &stmt,
36✔
134
            &[
36✔
135
                &layer_id,
36✔
136
                &layer.name,
36✔
137
                &layer.description,
36✔
138
                &workflow_id,
36✔
139
                &layer.symbology,
36✔
140
                &layer.properties,
36✔
141
                &HashMapTextTextDbType::from(&layer.metadata),
36✔
142
            ],
36✔
143
        )
36✔
144
        .await?;
28✔
145

146
    let stmt = trans
36✔
147
        .prepare(
36✔
148
            "
36✔
149
            INSERT INTO collection_layers (collection, layer)
36✔
150
            VALUES ($1, $2) ON CONFLICT DO NOTHING;",
36✔
151
        )
36✔
152
        .await?;
31✔
153

154
    trans.execute(&stmt, &[&collection_id, &layer_id]).await?;
36✔
155

156
    Ok(layer_id)
36✔
157
}
36✔
158

159
pub async fn insert_layer_collection_with_id(
44✔
160
    trans: &Transaction<'_>,
44✔
161
    id: &LayerCollectionId,
44✔
162
    collection: AddLayerCollection,
44✔
163
    parent: &LayerCollectionId,
44✔
164
) -> Result<Uuid> {
44✔
165
    let collection_id =
44✔
166
        Uuid::from_str(&id.0).map_err(|_| crate::error::Error::IdStringMustBeUuid {
44✔
167
            found: id.0.clone(),
×
168
        })?;
44✔
169

170
    let parent =
44✔
171
        Uuid::from_str(&parent.0).map_err(|_| crate::error::Error::IdStringMustBeUuid {
44✔
172
            found: parent.0.clone(),
×
173
        })?;
44✔
174

175
    let stmt = trans
44✔
176
        .prepare(
44✔
177
            "
44✔
178
        INSERT INTO layer_collections (id, name, description, properties)
44✔
179
        VALUES ($1, $2, $3, $4);",
44✔
180
        )
44✔
181
        .await?;
96✔
182

183
    trans
44✔
184
        .execute(
44✔
185
            &stmt,
44✔
186
            &[
44✔
187
                &collection_id,
44✔
188
                &collection.name,
44✔
189
                &collection.description,
44✔
190
                &collection.properties,
44✔
191
            ],
44✔
192
        )
44✔
193
        .await?;
37✔
194

195
    let stmt = trans
44✔
196
        .prepare(
44✔
197
            "
44✔
198
        INSERT INTO collection_children (parent, child)
44✔
199
        VALUES ($1, $2) ON CONFLICT DO NOTHING;",
44✔
200
        )
44✔
201
        .await?;
38✔
202

203
    trans.execute(&stmt, &[&parent, &collection_id]).await?;
44✔
204

205
    Ok(collection_id)
44✔
206
}
44✔
207

208
pub async fn insert_collection_parent<Tls>(
3✔
209
    conn: &PooledConnection<'_, PostgresConnectionManager<Tls>>,
3✔
210
    collection: &LayerCollectionId,
3✔
211
    parent: &LayerCollectionId,
3✔
212
) -> Result<()>
3✔
213
where
3✔
214
    Tls: MakeTlsConnect<Socket> + Clone + Send + Sync + 'static + std::fmt::Debug,
3✔
215
    <Tls as MakeTlsConnect<Socket>>::Stream: Send + Sync,
3✔
216
    <Tls as MakeTlsConnect<Socket>>::TlsConnect: Send,
3✔
217
    <<Tls as MakeTlsConnect<Socket>>::TlsConnect as TlsConnect<Socket>>::Future: Send,
3✔
218
{
3✔
219
    let collection =
3✔
220
        Uuid::from_str(&collection.0).map_err(|_| crate::error::Error::IdStringMustBeUuid {
3✔
221
            found: collection.0.clone(),
×
222
        })?;
3✔
223

224
    let parent =
3✔
225
        Uuid::from_str(&parent.0).map_err(|_| crate::error::Error::IdStringMustBeUuid {
3✔
226
            found: parent.0.clone(),
×
227
        })?;
3✔
228

229
    let stmt = conn
3✔
230
        .prepare(
3✔
231
            "
3✔
232
        INSERT INTO collection_children (parent, child)
3✔
233
        VALUES ($1, $2) ON CONFLICT DO NOTHING;",
3✔
234
        )
3✔
235
        .await?;
3✔
236

237
    conn.execute(&stmt, &[&parent, &collection]).await?;
3✔
238

239
    Ok(())
3✔
240
}
3✔
241

242
pub async fn delete_layer_collection(
8✔
243
    transaction: &Transaction<'_>,
8✔
244
    collection: &LayerCollectionId,
8✔
245
) -> Result<()> {
8✔
246
    let collection =
8✔
247
        Uuid::from_str(&collection.0).map_err(|_| crate::error::Error::IdStringMustBeUuid {
8✔
248
            found: collection.0.clone(),
×
249
        })?;
8✔
250

251
    if collection == INTERNAL_LAYER_DB_ROOT_COLLECTION_ID {
8✔
252
        return Err(LayerDbError::CannotRemoveRootCollection.into());
3✔
253
    }
5✔
254

255
    // delete the collection!
256
    // on delete cascade removes all entries from `collection_children` and `collection_layers`
257

258
    let remove_layer_collection_stmt = transaction
5✔
259
        .prepare(
5✔
260
            "DELETE FROM layer_collections
5✔
261
             WHERE id = $1;",
5✔
262
        )
5✔
263
        .await?;
5✔
264
    transaction
5✔
265
        .execute(&remove_layer_collection_stmt, &[&collection])
5✔
266
        .await?;
5✔
267

268
    _remove_collections_without_parent_collection(transaction).await?;
10✔
269

270
    _remove_layers_without_parent_collection(transaction).await?;
10✔
271

272
    Ok(())
5✔
273
}
8✔
274

275
pub async fn delete_layer_from_collection(
5✔
276
    transaction: &Transaction<'_>,
5✔
277
    layer: &LayerId,
5✔
278
    collection: &LayerCollectionId,
5✔
279
) -> Result<()> {
5✔
280
    let collection_uuid =
5✔
281
        Uuid::from_str(&collection.0).map_err(|_| crate::error::Error::IdStringMustBeUuid {
5✔
282
            found: collection.0.clone(),
×
283
        })?;
5✔
284

285
    let layer_uuid =
5✔
286
        Uuid::from_str(&layer.0).map_err(|_| crate::error::Error::IdStringMustBeUuid {
5✔
287
            found: layer.0.clone(),
×
288
        })?;
5✔
289

290
    let remove_layer_collection_stmt = transaction
5✔
291
        .prepare(
5✔
292
            "DELETE FROM collection_layers
5✔
293
             WHERE collection = $1
5✔
294
             AND layer = $2;",
5✔
295
        )
5✔
296
        .await?;
5✔
297
    let num_results = transaction
5✔
298
        .execute(
5✔
299
            &remove_layer_collection_stmt,
5✔
300
            &[&collection_uuid, &layer_uuid],
5✔
301
        )
5✔
302
        .await?;
5✔
303

304
    if num_results == 0 {
5✔
305
        return Err(LayerDbError::NoLayerForGivenIdInCollection {
×
306
            layer: layer.clone(),
×
307
            collection: collection.clone(),
×
308
        }
×
309
        .into());
×
310
    }
5✔
311

5✔
312
    _remove_layers_without_parent_collection(transaction).await?;
10✔
313

314
    Ok(())
5✔
315
}
5✔
316

317
pub async fn delete_layer_collection_from_parent(
3✔
318
    transaction: &Transaction<'_>,
3✔
319
    collection: &LayerCollectionId,
3✔
320
    parent: &LayerCollectionId,
3✔
321
) -> Result<()> {
3✔
322
    let collection_uuid =
3✔
323
        Uuid::from_str(&collection.0).map_err(|_| crate::error::Error::IdStringMustBeUuid {
3✔
324
            found: collection.0.clone(),
×
325
        })?;
3✔
326

327
    let parent_collection_uuid =
3✔
328
        Uuid::from_str(&parent.0).map_err(|_| crate::error::Error::IdStringMustBeUuid {
3✔
329
            found: parent.0.clone(),
×
330
        })?;
3✔
331

332
    let remove_layer_collection_stmt = transaction
3✔
333
        .prepare(
3✔
334
            "DELETE FROM collection_children
3✔
335
             WHERE child = $1
3✔
336
             AND parent = $2;",
3✔
337
        )
3✔
338
        .await?;
3✔
339
    let num_results = transaction
3✔
340
        .execute(
3✔
341
            &remove_layer_collection_stmt,
3✔
342
            &[&collection_uuid, &parent_collection_uuid],
3✔
343
        )
3✔
344
        .await?;
3✔
345

346
    if num_results == 0 {
3✔
347
        return Err(LayerDbError::NoCollectionForGivenIdInCollection {
×
348
            collection: collection.clone(),
×
349
            parent: parent.clone(),
×
350
        }
×
351
        .into());
×
352
    }
3✔
353

3✔
354
    _remove_collections_without_parent_collection(transaction).await?;
11✔
355

356
    _remove_layers_without_parent_collection(transaction).await?;
6✔
357

358
    Ok(())
3✔
359
}
3✔
360

361
/// Builds the SQL query that searches collections and layers by name below a
/// start collection.
///
/// Query parameters:
/// * `$1` – id (uuid) of the collection the search starts at
/// * `$2` – limit
/// * `$3` – offset
/// * `$4` – `ILIKE` pattern the `name` column is matched against
///
/// With `full_info == true` each row consists of `id`, `name`, `description`,
/// `properties` and `is_layer`, ordered by `is_layer` and then `name`.
/// Otherwise only the `name` column is selected, ordered by `name`.
///
/// The recursive `parents` CTE collects the start collection and everything
/// reachable from it via `collection_children`. It is combined with `UNION`
/// (not `UNION ALL`): the collection graph may contain loops, and `UNION`
/// discards rows that were already produced, so the recursion terminates.
fn create_search_query(full_info: bool) -> String {
    let (collection_columns, layer_columns, order_prefix) = if full_info {
        (
            "concat(id, '') AS id,
            name,
            description,
            properties,
            FALSE AS is_layer",
            "concat(id, '') AS id,
            name,
            description,
            properties,
            TRUE AS is_layer",
            "is_layer ASC,",
        )
    } else {
        ("name", "name", "")
    };

    format!(
        "
        WITH RECURSIVE parents AS (
            SELECT $1::uuid as id
            UNION SELECT DISTINCT child FROM collection_children JOIN parents ON (id = parent)
        )
        SELECT DISTINCT *
        FROM (
            SELECT 
                {collection_columns}
            FROM layer_collections
                JOIN (SELECT DISTINCT child FROM collection_children JOIN parents ON (id = parent)) cc ON (id = cc.child)
            WHERE name ILIKE $4
        ) u UNION (
            SELECT 
                {layer_columns}
            FROM layers uc
                JOIN (SELECT DISTINCT layer FROM collection_layers JOIN parents ON (collection = id)) cl ON (id = cl.layer)
            WHERE name ILIKE $4
        )
        ORDER BY {order_prefix}name ASC
        LIMIT $2 
        OFFSET $3;"
    )
}
18✔
400

401
#[async_trait]
402
impl<Tls> LayerDb for PostgresDb<Tls>
403
where
404
    Tls: MakeTlsConnect<Socket> + Clone + Send + Sync + 'static + std::fmt::Debug,
405
    <Tls as MakeTlsConnect<Socket>>::Stream: Send + Sync,
406
    <Tls as MakeTlsConnect<Socket>>::TlsConnect: Send,
407
    <<Tls as MakeTlsConnect<Socket>>::TlsConnect as TlsConnect<Socket>>::Future: Send,
408
{
409
    async fn add_layer(&self, layer: AddLayer, collection: &LayerCollectionId) -> Result<LayerId> {
20✔
410
        let layer_id = Uuid::new_v4();
20✔
411
        let layer_id = LayerId(layer_id.to_string());
20✔
412

20✔
413
        self.add_layer_with_id(&layer_id, layer, collection).await?;
699✔
414

415
        Ok(layer_id)
20✔
416
    }
40✔
417

418
    async fn update_layer(&self, id: &LayerId, layer: UpdateLayer) -> Result<()> {
1✔
419
        let layer_id =
1✔
420
            Uuid::from_str(&id.0).map_err(|_| crate::error::Error::IdStringMustBeUuid {
1✔
421
                found: id.0.clone(),
×
422
            })?;
1✔
423

424
        let conn = self.conn_pool.get().await?;
1✔
425

426
        conn.execute(
1✔
427
            "
1✔
428
            UPDATE layers
1✔
429
            SET name = $1, description = $2, symbology = $3, properties = $4, metadata = $5
1✔
430
            WHERE id = $6;",
1✔
431
            &[
1✔
432
                &layer.name,
1✔
433
                &layer.description,
1✔
434
                &layer.symbology,
1✔
435
                &layer.properties,
1✔
436
                &HashMapTextTextDbType::from(&layer.metadata),
1✔
437
                &layer_id,
1✔
438
            ],
1✔
439
        )
1✔
440
        .await?;
2✔
441

442
        Ok(())
1✔
443
    }
2✔
444

445
    async fn remove_layer(&self, id: &LayerId) -> Result<()> {
1✔
446
        let layer_id =
1✔
447
            Uuid::from_str(&id.0).map_err(|_| crate::error::Error::IdStringMustBeUuid {
1✔
448
                found: id.0.clone(),
×
449
            })?;
1✔
450

451
        let conn = self.conn_pool.get().await?;
1✔
452

453
        conn.execute(
1✔
454
            "
1✔
455
            DELETE FROM layers
1✔
456
            WHERE id = $1;",
1✔
457
            &[&layer_id],
1✔
458
        )
1✔
459
        .await?;
1✔
460

461
        Ok(())
1✔
462
    }
2✔
463

464
    async fn add_layer_with_id(
465
        &self,
466
        id: &LayerId,
467
        layer: AddLayer,
468
        collection: &LayerCollectionId,
469
    ) -> Result<()> {
20✔
470
        let mut conn = self.conn_pool.get().await?;
20✔
471

472
        let trans = conn.build_transaction().start().await?;
20✔
473

474
        insert_layer(&trans, id, layer, collection).await?;
639✔
475

476
        trans.commit().await?;
20✔
477

478
        Ok(())
20✔
479
    }
40✔
480

481
    async fn add_layer_to_collection(
482
        &self,
483
        layer: &LayerId,
484
        collection: &LayerCollectionId,
485
    ) -> Result<()> {
3✔
486
        let layer_id =
2✔
487
            Uuid::from_str(&layer.0).map_err(|_| crate::error::Error::IdStringMustBeUuid {
3✔
488
                found: layer.0.clone(),
1✔
489
            })?;
3✔
490

491
        let collection_id =
2✔
492
            Uuid::from_str(&collection.0).map_err(|_| crate::error::Error::IdStringMustBeUuid {
2✔
493
                found: collection.0.clone(),
×
494
            })?;
2✔
495

496
        let conn = self.conn_pool.get().await?;
2✔
497

498
        let stmt = conn
2✔
499
            .prepare(
2✔
500
                "
2✔
501
            INSERT INTO collection_layers (collection, layer)
2✔
502
            VALUES ($1, $2) ON CONFLICT DO NOTHING;",
2✔
503
            )
2✔
504
            .await?;
2✔
505

506
        conn.execute(&stmt, &[&collection_id, &layer_id]).await?;
2✔
507

508
        Ok(())
2✔
509
    }
6✔
510

511
    async fn add_layer_collection(
512
        &self,
513
        collection: AddLayerCollection,
514
        parent: &LayerCollectionId,
515
    ) -> Result<LayerCollectionId> {
25✔
516
        let collection_id = Uuid::new_v4();
25✔
517
        let collection_id = LayerCollectionId(collection_id.to_string());
25✔
518

25✔
519
        self.add_layer_collection_with_id(&collection_id, collection, parent)
25✔
520
            .await?;
219✔
521

522
        Ok(collection_id)
25✔
523
    }
50✔
524

525
    async fn add_layer_collection_with_id(
526
        &self,
527
        id: &LayerCollectionId,
528
        collection: AddLayerCollection,
529
        parent: &LayerCollectionId,
530
    ) -> Result<()> {
25✔
531
        let mut conn = self.conn_pool.get().await?;
25✔
532

533
        let trans = conn.build_transaction().start().await?;
25✔
534

535
        insert_layer_collection_with_id(&trans, id, collection, parent).await?;
144✔
536

537
        trans.commit().await?;
25✔
538

539
        Ok(())
25✔
540
    }
50✔
541

542
    async fn add_collection_to_parent(
543
        &self,
544
        collection: &LayerCollectionId,
545
        parent: &LayerCollectionId,
546
    ) -> Result<()> {
2✔
547
        let conn = self.conn_pool.get().await?;
2✔
548
        insert_collection_parent(&conn, collection, parent).await
4✔
549
    }
4✔
550

551
    async fn remove_layer_collection(&self, collection: &LayerCollectionId) -> Result<()> {
5✔
552
        let mut conn = self.conn_pool.get().await?;
5✔
553
        let transaction = conn.transaction().await?;
5✔
554

555
        delete_layer_collection(&transaction, collection).await?;
18✔
556

557
        transaction.commit().await.map_err(Into::into)
3✔
558
    }
10✔
559

560
    async fn remove_layer_from_collection(
561
        &self,
562
        layer: &LayerId,
563
        collection: &LayerCollectionId,
564
    ) -> Result<()> {
3✔
565
        let mut conn = self.conn_pool.get().await?;
3✔
566
        let transaction = conn.transaction().await?;
3✔
567

568
        delete_layer_from_collection(&transaction, layer, collection).await?;
12✔
569

570
        transaction.commit().await.map_err(Into::into)
3✔
571
    }
6✔
572

573
    async fn remove_layer_collection_from_parent(
574
        &self,
575
        collection: &LayerCollectionId,
576
        parent: &LayerCollectionId,
577
    ) -> Result<()> {
2✔
578
        let mut conn = self.conn_pool.get().await?;
2✔
579
        let transaction = conn.transaction().await?;
2✔
580

581
        delete_layer_collection_from_parent(&transaction, collection, parent).await?;
15✔
582

583
        transaction.commit().await.map_err(Into::into)
2✔
584
    }
4✔
585

586
    async fn update_layer_collection(
587
        &self,
588
        collection: &LayerCollectionId,
589
        update: UpdateLayerCollection,
590
    ) -> Result<()> {
1✔
591
        let collection_id =
1✔
592
            Uuid::from_str(&collection.0).map_err(|_| crate::error::Error::IdStringMustBeUuid {
1✔
593
                found: collection.0.clone(),
×
594
            })?;
1✔
595

596
        let conn = self.conn_pool.get().await?;
1✔
597

598
        conn.execute(
1✔
599
            "UPDATE layer_collections 
1✔
600
                SET name = $1, description = $2, properties = $3
1✔
601
                WHERE id = $4;",
1✔
602
            &[
1✔
603
                &update.name,
1✔
604
                &update.description,
1✔
605
                &update.properties,
1✔
606
                &collection_id,
1✔
607
            ],
1✔
608
        )
1✔
609
        .await?;
2✔
610

611
        Ok(())
1✔
612
    }
2✔
613
}
614

615
#[async_trait]
616
impl<Tls> LayerCollectionProvider for PostgresDb<Tls>
617
where
618
    Tls: MakeTlsConnect<Socket> + Clone + Send + Sync + 'static + std::fmt::Debug,
619
    <Tls as MakeTlsConnect<Socket>>::Stream: Send + Sync,
620
    <Tls as MakeTlsConnect<Socket>>::TlsConnect: Send,
621
    <<Tls as MakeTlsConnect<Socket>>::TlsConnect as TlsConnect<Socket>>::Future: Send,
622
{
623
    fn capabilities(&self) -> ProviderCapabilities {
2✔
624
        ProviderCapabilities {
2✔
625
            listing: true,
2✔
626
            search: SearchCapabilities {
2✔
627
                search_types: SearchTypes {
2✔
628
                    fulltext: true,
2✔
629
                    prefix: true,
2✔
630
                },
2✔
631
                autocomplete: true,
2✔
632
                filters: None,
2✔
633
            },
2✔
634
        }
2✔
635
    }
2✔
636

637
    fn name(&self) -> &str {
×
638
        "Postgres Layer Database"
×
639
    }
×
640

641
    fn description(&self) -> &str {
×
642
        "A layer database using Postgres"
×
643
    }
×
644

645
    #[allow(clippy::too_many_lines)]
646
    async fn load_layer_collection(
647
        &self,
648
        collection_id: &LayerCollectionId,
649
        options: LayerCollectionListOptions,
650
    ) -> Result<LayerCollection> {
19✔
651
        let collection = Uuid::from_str(&collection_id.0).map_err(|_| {
19✔
652
            crate::error::Error::IdStringMustBeUuid {
×
653
                found: collection_id.0.clone(),
×
654
            }
×
655
        })?;
19✔
656

657
        let conn = self.conn_pool.get().await?;
19✔
658

659
        let stmt = conn
19✔
660
            .prepare(
19✔
661
                "
19✔
662
        SELECT name, description, properties
19✔
663
        FROM layer_collections
19✔
664
        WHERE id = $1;",
19✔
665
            )
19✔
666
            .await?;
19✔
667

668
        let row = conn.query_one(&stmt, &[&collection]).await?;
19✔
669

670
        let name: String = row.get(0);
15✔
671
        let description: String = row.get(1);
15✔
672
        let properties: Vec<Property> = row.get(2);
15✔
673

674
        let stmt = conn
15✔
675
            .prepare(
15✔
676
                "
15✔
677
        SELECT DISTINCT id, name, description, properties, is_layer
15✔
678
        FROM (
15✔
679
            SELECT 
15✔
680
                concat(id, '') AS id, 
15✔
681
                name, 
15✔
682
                description, 
15✔
683
                properties, 
15✔
684
                FALSE AS is_layer
15✔
685
            FROM layer_collections
15✔
686
                JOIN collection_children cc ON (id = cc.child)
15✔
687
            WHERE cc.parent = $1
15✔
688
        ) u UNION (
15✔
689
            SELECT 
15✔
690
                concat(id, '') AS id, 
15✔
691
                name, 
15✔
692
                description, 
15✔
693
                properties, 
15✔
694
                TRUE AS is_layer
15✔
695
            FROM layers uc
15✔
696
                JOIN collection_layers cl ON (id = cl.layer)
15✔
697
            WHERE cl.collection = $1
15✔
698
        )
15✔
699
        ORDER BY is_layer ASC, name ASC
15✔
700
        LIMIT $2 
15✔
701
        OFFSET $3;            
15✔
702
        ",
15✔
703
            )
15✔
704
            .await?;
15✔
705

706
        let rows = conn
15✔
707
            .query(
15✔
708
                &stmt,
15✔
709
                &[
15✔
710
                    &collection,
15✔
711
                    &i64::from(options.limit),
15✔
712
                    &i64::from(options.offset),
15✔
713
                ],
15✔
714
            )
15✔
715
            .await?;
15✔
716

717
        let items = rows
15✔
718
            .into_iter()
15✔
719
            .map(|row| {
16✔
720
                let is_layer: bool = row.get(4);
16✔
721

16✔
722
                if is_layer {
16✔
723
                    Ok(CollectionItem::Layer(LayerListing {
7✔
724
                        id: ProviderLayerId {
7✔
725
                            provider_id: INTERNAL_PROVIDER_ID,
7✔
726
                            layer_id: LayerId(row.get(0)),
7✔
727
                        },
7✔
728
                        name: row.get(1),
7✔
729
                        description: row.get(2),
7✔
730
                        properties: row.get(3),
7✔
731
                    }))
7✔
732
                } else {
733
                    Ok(CollectionItem::Collection(LayerCollectionListing {
9✔
734
                        id: ProviderLayerCollectionId {
9✔
735
                            provider_id: INTERNAL_PROVIDER_ID,
9✔
736
                            collection_id: LayerCollectionId(row.get(0)),
9✔
737
                        },
9✔
738
                        name: row.get(1),
9✔
739
                        description: row.get(2),
9✔
740
                        properties: row.get(3),
9✔
741
                    }))
9✔
742
                }
743
            })
16✔
744
            .collect::<Result<Vec<CollectionItem>>>()?;
15✔
745

746
        Ok(LayerCollection {
15✔
747
            id: ProviderLayerCollectionId {
15✔
748
                provider_id: INTERNAL_PROVIDER_ID,
15✔
749
                collection_id: collection_id.clone(),
15✔
750
            },
15✔
751
            name,
15✔
752
            description,
15✔
753
            items,
15✔
754
            entry_label: None,
15✔
755
            properties,
15✔
756
        })
15✔
757
    }
38✔
758

759
    #[allow(clippy::too_many_lines)]
760
    async fn search(
761
        &self,
762
        collection_id: &LayerCollectionId,
763
        search: SearchParameters,
764
    ) -> Result<LayerCollection> {
9✔
765
        let collection = Uuid::from_str(&collection_id.0).map_err(|_| {
9✔
766
            crate::error::Error::IdStringMustBeUuid {
×
767
                found: collection_id.0.clone(),
×
768
            }
×
769
        })?;
9✔
770

771
        let conn = self.conn_pool.get().await?;
9✔
772

773
        let stmt = conn
9✔
774
            .prepare(
9✔
775
                "
9✔
776
        SELECT name, description, properties
9✔
777
        FROM layer_collections
9✔
778
        WHERE id = $1;",
9✔
779
            )
9✔
780
            .await?;
15✔
781

782
        let row = conn.query_one(&stmt, &[&collection]).await?;
9✔
783

784
        let name: String = row.get(0);
9✔
785
        let description: String = row.get(1);
9✔
786
        let properties: Vec<Property> = row.get(2);
9✔
787

788
        let pattern = match search.search_type {
9✔
789
            SearchType::Fulltext => {
790
                format!("%{}%", search.search_string)
6✔
791
            }
792
            SearchType::Prefix => {
793
                format!("{}%", search.search_string)
3✔
794
            }
795
        };
796

797
        let stmt = conn.prepare(&create_search_query(true)).await?;
9✔
798

799
        let rows = conn
9✔
800
            .query(
9✔
801
                &stmt,
9✔
802
                &[
9✔
803
                    &collection,
9✔
804
                    &i64::from(search.limit),
9✔
805
                    &i64::from(search.offset),
9✔
806
                    &pattern,
9✔
807
                ],
9✔
808
            )
9✔
809
            .await?;
9✔
810

811
        let items = rows
9✔
812
            .into_iter()
9✔
813
            .map(|row| {
13✔
814
                let is_layer: bool = row.get(4);
13✔
815

13✔
816
                if is_layer {
13✔
817
                    Ok(CollectionItem::Layer(LayerListing {
5✔
818
                        id: ProviderLayerId {
5✔
819
                            provider_id: INTERNAL_PROVIDER_ID,
5✔
820
                            layer_id: LayerId(row.get(0)),
5✔
821
                        },
5✔
822
                        name: row.get(1),
5✔
823
                        description: row.get(2),
5✔
824
                        properties: row.get(3),
5✔
825
                    }))
5✔
826
                } else {
827
                    Ok(CollectionItem::Collection(LayerCollectionListing {
8✔
828
                        id: ProviderLayerCollectionId {
8✔
829
                            provider_id: INTERNAL_PROVIDER_ID,
8✔
830
                            collection_id: LayerCollectionId(row.get(0)),
8✔
831
                        },
8✔
832
                        name: row.get(1),
8✔
833
                        description: row.get(2),
8✔
834
                        properties: row.get(3),
8✔
835
                    }))
8✔
836
                }
837
            })
13✔
838
            .collect::<Result<Vec<CollectionItem>>>()?;
9✔
839

840
        Ok(LayerCollection {
9✔
841
            id: ProviderLayerCollectionId {
9✔
842
                provider_id: INTERNAL_PROVIDER_ID,
9✔
843
                collection_id: collection_id.clone(),
9✔
844
            },
9✔
845
            name,
9✔
846
            description,
9✔
847
            items,
9✔
848
            entry_label: None,
9✔
849
            properties,
9✔
850
        })
9✔
851
    }
18✔
852

853
    #[allow(clippy::too_many_lines)]
854
    async fn autocomplete_search(
855
        &self,
856
        collection_id: &LayerCollectionId,
857
        search: SearchParameters,
858
    ) -> Result<Vec<String>> {
9✔
859
        let collection = Uuid::from_str(&collection_id.0).map_err(|_| {
9✔
860
            crate::error::Error::IdStringMustBeUuid {
×
861
                found: collection_id.0.clone(),
×
862
            }
×
863
        })?;
9✔
864

865
        let conn = self.conn_pool.get().await?;
9✔
866

867
        let pattern = match search.search_type {
9✔
868
            SearchType::Fulltext => {
869
                format!("%{}%", search.search_string)
6✔
870
            }
871
            SearchType::Prefix => {
872
                format!("{}%", search.search_string)
3✔
873
            }
874
        };
875

876
        let stmt = conn.prepare(&create_search_query(false)).await?;
9✔
877

878
        let rows = conn
9✔
879
            .query(
9✔
880
                &stmt,
9✔
881
                &[
9✔
882
                    &collection,
9✔
883
                    &i64::from(search.limit),
9✔
884
                    &i64::from(search.offset),
9✔
885
                    &pattern,
9✔
886
                ],
9✔
887
            )
9✔
888
            .await?;
9✔
889

890
        let items = rows
9✔
891
            .into_iter()
9✔
892
            .map(|row| Ok(row.get::<usize, &str>(0).to_string()))
13✔
893
            .collect::<Result<Vec<String>>>()?;
9✔
894

895
        Ok(items)
9✔
896
    }
18✔
897

898
    async fn get_root_layer_collection_id(&self) -> Result<LayerCollectionId> {
25✔
899
        Ok(LayerCollectionId(
25✔
900
            INTERNAL_LAYER_DB_ROOT_COLLECTION_ID.to_string(),
25✔
901
        ))
25✔
902
    }
50✔
903

904
    async fn load_layer(&self, id: &LayerId) -> Result<Layer> {
19✔
905
        let layer_id =
19✔
906
            Uuid::from_str(&id.0).map_err(|_| crate::error::Error::IdStringMustBeUuid {
19✔
907
                found: id.0.clone(),
×
908
            })?;
19✔
909

910
        let conn = self.conn_pool.get().await?;
19✔
911

912
        let stmt = conn
19✔
913
            .prepare(
19✔
914
                "
19✔
915
            SELECT 
19✔
916
                l.name,
19✔
917
                l.description,
19✔
918
                w.workflow,
19✔
919
                l.symbology,
19✔
920
                l.properties,
19✔
921
                l.metadata
19✔
922
            FROM 
19✔
923
                layers l JOIN workflows w ON (l.workflow_id = w.id)
19✔
924
            WHERE 
19✔
925
                l.id = $1;",
19✔
926
            )
19✔
927
            .await?;
16✔
928

929
        let row = conn
19✔
930
            .query_one(&stmt, &[&layer_id])
19✔
931
            .await
16✔
932
            .map_err(|_error| LayerDbError::NoLayerForGivenId { id: id.clone() })?;
19✔
933

934
        Ok(Layer {
935
            id: ProviderLayerId {
14✔
936
                provider_id: INTERNAL_PROVIDER_ID,
14✔
937
                layer_id: id.clone(),
14✔
938
            },
14✔
939
            name: row.get(0),
14✔
940
            description: row.get(1),
14✔
941
            workflow: serde_json::from_value(row.get(2)).context(crate::error::SerdeJson)?,
14✔
942
            symbology: row.get(3),
14✔
943
            properties: row.get(4),
14✔
944
            metadata: row.get::<_, HashMapTextTextDbType>(5).into(),
14✔
945
        })
946
    }
38✔
947
}
948

949
#[async_trait]
950
impl<Tls> LayerProviderDb for PostgresDb<Tls>
951
where
952
    Tls: MakeTlsConnect<Socket> + Clone + Send + Sync + 'static + std::fmt::Debug,
953
    <Tls as MakeTlsConnect<Socket>>::Stream: Send + Sync,
954
    <Tls as MakeTlsConnect<Socket>>::TlsConnect: Send,
955
    <<Tls as MakeTlsConnect<Socket>>::TlsConnect as TlsConnect<Socket>>::Future: Send,
956
{
957
    async fn add_layer_provider(
958
        &self,
959
        provider: TypedDataProviderDefinition,
960
    ) -> Result<DataProviderId> {
8✔
961
        let conn = self.conn_pool.get().await?;
8✔
962

963
        let stmt = conn
8✔
964
            .prepare(
8✔
965
                "
8✔
966
              INSERT INTO layer_providers (
8✔
967
                  id, 
8✔
968
                  type_name, 
8✔
969
                  name,
8✔
970
                  definition,
8✔
971
                  priority
8✔
972
              )
8✔
973
              VALUES ($1, $2, $3, $4, $5)",
8✔
974
            )
8✔
975
            .await?;
246✔
976

977
        // clamp the priority to a reasonable range
978
        let prio = DataProviderDefinition::<Self>::priority(&provider);
8✔
979
        let clamp_prio = prio.clamp(-1000, 1000);
8✔
980

8✔
981
        if prio != clamp_prio {
8✔
UNCOV
982
            log::warn!(
×
UNCOV
983
                "The priority of the provider {} is out of range! --> clamped {} to {}",
×
984
                DataProviderDefinition::<Self>::name(&provider),
×
985
                prio,
986
                clamp_prio
987
            );
988
        }
8✔
989

990
        let id = DataProviderDefinition::<Self>::id(&provider);
8✔
991
        conn.execute(
8✔
992
            &stmt,
8✔
993
            &[
8✔
994
                &id,
8✔
995
                &DataProviderDefinition::<Self>::type_name(&provider),
8✔
996
                &DataProviderDefinition::<Self>::name(&provider),
8✔
997
                &provider,
8✔
998
                &clamp_prio,
8✔
999
            ],
8✔
1000
        )
8✔
1001
        .await?;
8✔
1002
        Ok(id)
8✔
1003
    }
16✔
1004

1005
    async fn list_layer_providers(
1006
        &self,
1007
        options: LayerProviderListingOptions,
1008
    ) -> Result<Vec<LayerProviderListing>> {
1✔
1009
        // TODO: permission
1010
        let conn = self.conn_pool.get().await?;
1✔
1011

1012
        let stmt = conn
1✔
1013
            .prepare(
1✔
1014
                "
1✔
1015
            SELECT 
1✔
1016
                id, 
1✔
1017
                name,
1✔
1018
                priority
1✔
1019
            FROM 
1✔
1020
                layer_providers
1✔
1021
            WHERE
1✔
1022
                priority > -1000
1✔
1023
            ORDER BY priority DESC, name ASC
1✔
1024
            LIMIT $1 
1✔
1025
            OFFSET $2;",
1✔
1026
            )
1✔
1027
            .await?;
1✔
1028

1029
        let rows = conn
1✔
1030
            .query(
1✔
1031
                &stmt,
1✔
1032
                &[&i64::from(options.limit), &i64::from(options.offset)],
1✔
1033
            )
1✔
1034
            .await?;
1✔
1035

1036
        Ok(rows
1✔
1037
            .iter()
1✔
1038
            .map(|row| LayerProviderListing {
1✔
1039
                id: row.get(0),
1✔
1040
                name: row.get(1),
1✔
1041
                priority: row.get(2),
1✔
1042
            })
1✔
1043
            .collect())
1✔
1044
    }
2✔
1045

1046
    async fn load_layer_provider(&self, id: DataProviderId) -> Result<Box<dyn DataProvider>> {
18✔
1047
        let conn = self.conn_pool.get().await?;
18✔
1048

1049
        let stmt = conn
18✔
1050
            .prepare(
18✔
1051
                "
18✔
1052
               SELECT 
18✔
1053
                   definition
18✔
1054
               FROM 
18✔
1055
                   layer_providers
18✔
1056
               WHERE
18✔
1057
                   id = $1",
18✔
1058
            )
18✔
1059
            .await?;
18✔
1060

1061
        let row = conn.query_one(&stmt, &[&id]).await?;
18✔
1062

1063
        let definition: TypedDataProviderDefinition = row.get(0);
11✔
1064

11✔
1065
        Box::new(definition)
11✔
1066
            .initialize(PostgresDb {
11✔
1067
                conn_pool: self.conn_pool.clone(),
11✔
1068
            })
11✔
UNCOV
1069
            .await
×
1070
    }
36✔
1071
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc