• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

geo-engine / geoengine / 8158689789

05 Mar 2024 03:19PM UTC coverage: 90.58% (+0.2%) from 90.424%
8158689789

push

github

web-flow
Merge pull request #931 from geo-engine/netcdf-overviews

NetCDF Overview Metadata in Database

2393 of 2553 new or added lines in 21 files covered. (93.73%)

37 existing lines in 12 files now uncovered.

128580 of 141952 relevant lines covered (90.58%)

54451.48 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

94.59
/services/src/layers/postgres_layer_db.rs
1
use super::external::TypedDataProviderDefinition;
2
use super::listing::{ProviderCapabilities, SearchType};
3
use crate::contexts::PostgresDb;
4
use crate::error;
5
use crate::layers::layer::Property;
6
use crate::layers::listing::{SearchCapabilities, SearchParameters, SearchTypes};
7
use crate::workflows::workflow::WorkflowId;
8
use crate::{
9
    error::Result,
10
    layers::{
11
        external::{DataProvider, DataProviderDefinition},
12
        layer::{
13
            AddLayer, AddLayerCollection, CollectionItem, Layer, LayerCollection,
14
            LayerCollectionListOptions, LayerCollectionListing, LayerListing,
15
            ProviderLayerCollectionId, ProviderLayerId,
16
        },
17
        listing::{LayerCollectionId, LayerCollectionProvider},
18
        storage::{
19
            LayerDb, LayerProviderDb, LayerProviderListing, LayerProviderListingOptions,
20
            INTERNAL_LAYER_DB_ROOT_COLLECTION_ID, INTERNAL_PROVIDER_ID,
21
        },
22
        LayerDbError,
23
    },
24
};
25
use async_trait::async_trait;
26
use bb8_postgres::bb8::PooledConnection;
27
use bb8_postgres::tokio_postgres::{
28
    tls::{MakeTlsConnect, TlsConnect},
29
    Socket,
30
};
31
use bb8_postgres::PostgresConnectionManager;
32
use geoengine_datatypes::dataset::{DataProviderId, LayerId};
33
use geoengine_datatypes::util::HashMapTextTextDbType;
34
use snafu::ResultExt;
35
use std::str::FromStr;
36
use tokio_postgres::Transaction;
37
use uuid::Uuid;
38

39
/// delete all collections without parent collection
40
async fn _remove_collections_without_parent_collection(
8✔
41
    transaction: &tokio_postgres::Transaction<'_>,
8✔
42
) -> Result<()> {
8✔
43
    // HINT: a recursive delete statement seems reasonable, but hard to implement in postgres
44
    //       because you have a graph with potential loops
45

46
    let remove_layer_collections_without_parents_stmt = transaction
8✔
47
        .prepare(
8✔
48
            "DELETE FROM layer_collections
8✔
49
                 WHERE  id <> $1 -- do not delete root collection
8✔
50
                 AND    id NOT IN (
8✔
51
                    SELECT child FROM collection_children
8✔
52
                 );",
8✔
53
        )
8✔
54
        .await?;
6✔
55
    while 0 < transaction
13✔
56
        .execute(
13✔
57
            &remove_layer_collections_without_parents_stmt,
13✔
58
            &[&INTERNAL_LAYER_DB_ROOT_COLLECTION_ID],
13✔
59
        )
13✔
60
        .await?
9✔
61
    {
5✔
62
        // whenever one collection is deleted, we have to check again if there are more
5✔
63
        // collections without parents
5✔
64
    }
5✔
65

66
    Ok(())
8✔
67
}
8✔
68

69
/// delete all layers without parent collection
70
async fn _remove_layers_without_parent_collection(
13✔
71
    transaction: &tokio_postgres::Transaction<'_>,
13✔
72
) -> Result<()> {
13✔
73
    let remove_layers_without_parents_stmt = transaction
13✔
74
        .prepare(
13✔
75
            "DELETE FROM layers
13✔
76
                 WHERE id NOT IN (
13✔
77
                    SELECT layer FROM collection_layers
13✔
78
                 );",
13✔
79
        )
13✔
80
        .await?;
11✔
81
    transaction
13✔
82
        .execute(&remove_layers_without_parents_stmt, &[])
13✔
83
        .await?;
11✔
84

85
    Ok(())
13✔
86
}
13✔
87

88
pub async fn insert_layer(
34✔
89
    trans: &Transaction<'_>,
34✔
90
    id: &LayerId,
34✔
91
    layer: AddLayer,
34✔
92
    collection: &LayerCollectionId,
34✔
93
) -> Result<Uuid> {
34✔
94
    let layer_id = Uuid::from_str(&id.0).map_err(|_| crate::error::Error::IdStringMustBeUuid {
34✔
95
        found: collection.0.clone(),
×
96
    })?;
34✔
97

98
    let collection_id =
34✔
99
        Uuid::from_str(&collection.0).map_err(|_| crate::error::Error::IdStringMustBeUuid {
34✔
100
            found: collection.0.clone(),
×
101
        })?;
34✔
102

103
    let workflow_id = WorkflowId::from_hash(&layer.workflow);
34✔
104

105
    let stmt = trans
34✔
106
        .prepare(
34✔
107
            "INSERT INTO workflows (id, workflow) VALUES ($1, $2) 
34✔
108
            ON CONFLICT DO NOTHING;",
34✔
109
        )
34✔
110
        .await?;
30✔
111

112
    trans
34✔
113
        .execute(
34✔
114
            &stmt,
34✔
115
            &[
34✔
116
                &workflow_id,
34✔
117
                &serde_json::to_value(&layer.workflow).context(error::SerdeJson)?,
34✔
118
            ],
119
        )
120
        .await?;
30✔
121

122
    let stmt = trans
34✔
123
        .prepare(
34✔
124
            "
34✔
125
            INSERT INTO layers (id, name, description, workflow_id, symbology, properties, metadata)
34✔
126
            VALUES ($1, $2, $3, $4, $5, $6, $7);",
34✔
127
        )
34✔
128
        .await?;
702✔
129

130
    trans
34✔
131
        .execute(
34✔
132
            &stmt,
34✔
133
            &[
34✔
134
                &layer_id,
34✔
135
                &layer.name,
34✔
136
                &layer.description,
34✔
137
                &workflow_id,
34✔
138
                &layer.symbology,
34✔
139
                &layer.properties,
34✔
140
                &HashMapTextTextDbType::from(&layer.metadata),
34✔
141
            ],
34✔
142
        )
34✔
143
        .await?;
30✔
144

145
    let stmt = trans
34✔
146
        .prepare(
34✔
147
            "
34✔
148
            INSERT INTO collection_layers (collection, layer)
34✔
149
            VALUES ($1, $2) ON CONFLICT DO NOTHING;",
34✔
150
        )
34✔
151
        .await?;
30✔
152

153
    trans.execute(&stmt, &[&collection_id, &layer_id]).await?;
34✔
154

155
    Ok(layer_id)
34✔
156
}
34✔
157

158
pub async fn insert_layer_collection_with_id(
44✔
159
    trans: &Transaction<'_>,
44✔
160
    id: &LayerCollectionId,
44✔
161
    collection: AddLayerCollection,
44✔
162
    parent: &LayerCollectionId,
44✔
163
) -> Result<Uuid> {
44✔
164
    let collection_id =
44✔
165
        Uuid::from_str(&id.0).map_err(|_| crate::error::Error::IdStringMustBeUuid {
44✔
166
            found: id.0.clone(),
×
167
        })?;
44✔
168

169
    let parent =
44✔
170
        Uuid::from_str(&parent.0).map_err(|_| crate::error::Error::IdStringMustBeUuid {
44✔
171
            found: parent.0.clone(),
×
172
        })?;
44✔
173

174
    let stmt = trans
44✔
175
        .prepare(
44✔
176
            "
44✔
177
        INSERT INTO layer_collections (id, name, description, properties)
44✔
178
        VALUES ($1, $2, $3, $4);",
44✔
179
        )
44✔
180
        .await?;
99✔
181

182
    trans
44✔
183
        .execute(
44✔
184
            &stmt,
44✔
185
            &[
44✔
186
                &collection_id,
44✔
187
                &collection.name,
44✔
188
                &collection.description,
44✔
189
                &collection.properties,
44✔
190
            ],
44✔
191
        )
44✔
192
        .await?;
41✔
193

194
    let stmt = trans
44✔
195
        .prepare(
44✔
196
            "
44✔
197
        INSERT INTO collection_children (parent, child)
44✔
198
        VALUES ($1, $2) ON CONFLICT DO NOTHING;",
44✔
199
        )
44✔
200
        .await?;
41✔
201

202
    trans.execute(&stmt, &[&parent, &collection_id]).await?;
44✔
203

204
    Ok(collection_id)
44✔
205
}
44✔
206

207
pub async fn insert_collection_parent<Tls>(
3✔
208
    conn: &PooledConnection<'_, PostgresConnectionManager<Tls>>,
3✔
209
    collection: &LayerCollectionId,
3✔
210
    parent: &LayerCollectionId,
3✔
211
) -> Result<()>
3✔
212
where
3✔
213
    Tls: MakeTlsConnect<Socket> + Clone + Send + Sync + 'static,
3✔
214
    <Tls as MakeTlsConnect<Socket>>::Stream: Send + Sync,
3✔
215
    <Tls as MakeTlsConnect<Socket>>::TlsConnect: Send,
3✔
216
    <<Tls as MakeTlsConnect<Socket>>::TlsConnect as TlsConnect<Socket>>::Future: Send,
3✔
217
{
3✔
218
    let collection =
3✔
219
        Uuid::from_str(&collection.0).map_err(|_| crate::error::Error::IdStringMustBeUuid {
3✔
220
            found: collection.0.clone(),
×
221
        })?;
3✔
222

223
    let parent =
3✔
224
        Uuid::from_str(&parent.0).map_err(|_| crate::error::Error::IdStringMustBeUuid {
3✔
225
            found: parent.0.clone(),
×
226
        })?;
3✔
227

228
    let stmt = conn
3✔
229
        .prepare(
3✔
230
            "
3✔
231
        INSERT INTO collection_children (parent, child)
3✔
232
        VALUES ($1, $2) ON CONFLICT DO NOTHING;",
3✔
233
        )
3✔
234
        .await?;
3✔
235

236
    conn.execute(&stmt, &[&parent, &collection]).await?;
3✔
237

238
    Ok(())
3✔
239
}
3✔
240

241
pub async fn delete_layer_collection(
8✔
242
    transaction: &Transaction<'_>,
8✔
243
    collection: &LayerCollectionId,
8✔
244
) -> Result<()> {
8✔
245
    let collection =
8✔
246
        Uuid::from_str(&collection.0).map_err(|_| crate::error::Error::IdStringMustBeUuid {
8✔
247
            found: collection.0.clone(),
×
248
        })?;
8✔
249

250
    if collection == INTERNAL_LAYER_DB_ROOT_COLLECTION_ID {
8✔
251
        return Err(LayerDbError::CannotRemoveRootCollection.into());
3✔
252
    }
5✔
253

254
    // delete the collection!
255
    // on delete cascade removes all entries from `collection_children` and `collection_layers`
256

257
    let remove_layer_collection_stmt = transaction
5✔
258
        .prepare(
5✔
259
            "DELETE FROM layer_collections
5✔
260
             WHERE id = $1;",
5✔
261
        )
5✔
262
        .await?;
4✔
263
    transaction
5✔
264
        .execute(&remove_layer_collection_stmt, &[&collection])
5✔
265
        .await?;
4✔
266

267
    _remove_collections_without_parent_collection(transaction).await?;
8✔
268

269
    _remove_layers_without_parent_collection(transaction).await?;
8✔
270

271
    Ok(())
5✔
272
}
8✔
273

274
pub async fn delete_layer_from_collection(
5✔
275
    transaction: &Transaction<'_>,
5✔
276
    layer: &LayerId,
5✔
277
    collection: &LayerCollectionId,
5✔
278
) -> Result<()> {
5✔
279
    let collection_uuid =
5✔
280
        Uuid::from_str(&collection.0).map_err(|_| crate::error::Error::IdStringMustBeUuid {
5✔
281
            found: collection.0.clone(),
×
282
        })?;
5✔
283

284
    let layer_uuid =
5✔
285
        Uuid::from_str(&layer.0).map_err(|_| crate::error::Error::IdStringMustBeUuid {
5✔
286
            found: layer.0.clone(),
×
287
        })?;
5✔
288

289
    let remove_layer_collection_stmt = transaction
5✔
290
        .prepare(
5✔
291
            "DELETE FROM collection_layers
5✔
292
             WHERE collection = $1
5✔
293
             AND layer = $2;",
5✔
294
        )
5✔
295
        .await?;
5✔
296
    let num_results = transaction
5✔
297
        .execute(
5✔
298
            &remove_layer_collection_stmt,
5✔
299
            &[&collection_uuid, &layer_uuid],
5✔
300
        )
5✔
301
        .await?;
5✔
302

303
    if num_results == 0 {
5✔
304
        return Err(LayerDbError::NoLayerForGivenIdInCollection {
×
305
            layer: layer.clone(),
×
306
            collection: collection.clone(),
×
307
        }
×
308
        .into());
×
309
    }
5✔
310

5✔
311
    _remove_layers_without_parent_collection(transaction).await?;
10✔
312

313
    Ok(())
5✔
314
}
5✔
315

316
pub async fn delete_layer_collection_from_parent(
3✔
317
    transaction: &Transaction<'_>,
3✔
318
    collection: &LayerCollectionId,
3✔
319
    parent: &LayerCollectionId,
3✔
320
) -> Result<()> {
3✔
321
    let collection_uuid =
3✔
322
        Uuid::from_str(&collection.0).map_err(|_| crate::error::Error::IdStringMustBeUuid {
3✔
323
            found: collection.0.clone(),
×
324
        })?;
3✔
325

326
    let parent_collection_uuid =
3✔
327
        Uuid::from_str(&parent.0).map_err(|_| crate::error::Error::IdStringMustBeUuid {
3✔
328
            found: parent.0.clone(),
×
329
        })?;
3✔
330

331
    let remove_layer_collection_stmt = transaction
3✔
332
        .prepare(
3✔
333
            "DELETE FROM collection_children
3✔
334
             WHERE child = $1
3✔
335
             AND parent = $2;",
3✔
336
        )
3✔
337
        .await?;
3✔
338
    let num_results = transaction
3✔
339
        .execute(
3✔
340
            &remove_layer_collection_stmt,
3✔
341
            &[&collection_uuid, &parent_collection_uuid],
3✔
342
        )
3✔
343
        .await?;
3✔
344

345
    if num_results == 0 {
3✔
346
        return Err(LayerDbError::NoCollectionForGivenIdInCollection {
×
347
            collection: collection.clone(),
×
348
            parent: parent.clone(),
×
349
        }
×
350
        .into());
×
351
    }
3✔
352

3✔
353
    _remove_collections_without_parent_collection(transaction).await?;
7✔
354

355
    _remove_layers_without_parent_collection(transaction).await?;
4✔
356

357
    Ok(())
3✔
358
}
3✔
359

360
fn create_search_query(full_info: bool) -> String {
18✔
361
    format!("
18✔
362
        WITH RECURSIVE parents AS (
18✔
363
            SELECT $1::uuid as id
18✔
364
            UNION ALL SELECT DISTINCT child FROM collection_children JOIN parents ON (id = parent)
18✔
365
        )
18✔
366
        SELECT DISTINCT *
18✔
367
        FROM (
18✔
368
            SELECT 
18✔
369
                {}
18✔
370
            FROM layer_collections
18✔
371
                JOIN (SELECT DISTINCT child FROM collection_children JOIN parents ON (id = parent)) cc ON (id = cc.child)
18✔
372
            WHERE name ILIKE $4
18✔
373
        ) u UNION (
18✔
374
            SELECT 
18✔
375
                {}
18✔
376
            FROM layers uc
18✔
377
                JOIN (SELECT DISTINCT layer FROM collection_layers JOIN parents ON (collection = id)) cl ON (id = cl.layer)
18✔
378
            WHERE name ILIKE $4
18✔
379
        )
18✔
380
        ORDER BY {}name ASC
18✔
381
        LIMIT $2 
18✔
382
        OFFSET $3;",
18✔
383
        if full_info {
18✔
384
            "concat(id, '') AS id,
9✔
385
            name,
9✔
386
            description,
9✔
387
            properties,
9✔
388
            FALSE AS is_layer"
9✔
389
        } else { "name" },
9✔
390
        if full_info {
18✔
391
            "concat(id, '') AS id,
9✔
392
            name,
9✔
393
            description,
9✔
394
            properties,
9✔
395
            TRUE AS is_layer"
9✔
396
        } else { "name" },
9✔
397
        if full_info { "is_layer ASC," } else { "" })
18✔
398
}
18✔
399

400
#[async_trait]
401
impl<Tls> LayerDb for PostgresDb<Tls>
402
where
403
    Tls: MakeTlsConnect<Socket> + Clone + Send + Sync + 'static,
404
    <Tls as MakeTlsConnect<Socket>>::Stream: Send + Sync,
405
    <Tls as MakeTlsConnect<Socket>>::TlsConnect: Send,
406
    <<Tls as MakeTlsConnect<Socket>>::TlsConnect as TlsConnect<Socket>>::Future: Send,
407
{
408
    async fn add_layer(&self, layer: AddLayer, collection: &LayerCollectionId) -> Result<LayerId> {
18✔
409
        let layer_id = Uuid::new_v4();
18✔
410
        let layer_id = LayerId(layer_id.to_string());
18✔
411

18✔
412
        self.add_layer_with_id(&layer_id, layer, collection).await?;
643✔
413

414
        Ok(layer_id)
18✔
415
    }
54✔
416

417
    async fn add_layer_with_id(
18✔
418
        &self,
18✔
419
        id: &LayerId,
18✔
420
        layer: AddLayer,
18✔
421
        collection: &LayerCollectionId,
18✔
422
    ) -> Result<()> {
18✔
423
        let mut conn = self.conn_pool.get().await?;
18✔
424

425
        let trans = conn.build_transaction().start().await?;
18✔
426

427
        insert_layer(&trans, id, layer, collection).await?;
589✔
428

429
        trans.commit().await?;
18✔
430

431
        Ok(())
18✔
432
    }
54✔
433

434
    async fn add_layer_to_collection(
3✔
435
        &self,
3✔
436
        layer: &LayerId,
3✔
437
        collection: &LayerCollectionId,
3✔
438
    ) -> Result<()> {
3✔
439
        let layer_id =
2✔
440
            Uuid::from_str(&layer.0).map_err(|_| crate::error::Error::IdStringMustBeUuid {
3✔
441
                found: layer.0.clone(),
1✔
442
            })?;
3✔
443

444
        let collection_id =
2✔
445
            Uuid::from_str(&collection.0).map_err(|_| crate::error::Error::IdStringMustBeUuid {
2✔
446
                found: collection.0.clone(),
×
447
            })?;
2✔
448

449
        let conn = self.conn_pool.get().await?;
2✔
450

451
        let stmt = conn
2✔
452
            .prepare(
2✔
453
                "
2✔
454
            INSERT INTO collection_layers (collection, layer)
2✔
455
            VALUES ($1, $2) ON CONFLICT DO NOTHING;",
2✔
456
            )
2✔
457
            .await?;
1✔
458

459
        conn.execute(&stmt, &[&collection_id, &layer_id]).await?;
2✔
460

461
        Ok(())
2✔
462
    }
9✔
463

464
    async fn add_layer_collection(
24✔
465
        &self,
24✔
466
        collection: AddLayerCollection,
24✔
467
        parent: &LayerCollectionId,
24✔
468
    ) -> Result<LayerCollectionId> {
24✔
469
        let collection_id = Uuid::new_v4();
24✔
470
        let collection_id = LayerCollectionId(collection_id.to_string());
24✔
471

24✔
472
        self.add_layer_collection_with_id(&collection_id, collection, parent)
24✔
473
            .await?;
210✔
474

475
        Ok(collection_id)
24✔
476
    }
72✔
477

478
    async fn add_layer_collection_with_id(
24✔
479
        &self,
24✔
480
        id: &LayerCollectionId,
24✔
481
        collection: AddLayerCollection,
24✔
482
        parent: &LayerCollectionId,
24✔
483
    ) -> Result<()> {
24✔
484
        let mut conn = self.conn_pool.get().await?;
24✔
485

486
        let trans = conn.build_transaction().start().await?;
24✔
487

488
        insert_layer_collection_with_id(&trans, id, collection, parent).await?;
138✔
489

490
        trans.commit().await?;
24✔
491

492
        Ok(())
24✔
493
    }
72✔
494

495
    async fn add_collection_to_parent(
2✔
496
        &self,
2✔
497
        collection: &LayerCollectionId,
2✔
498
        parent: &LayerCollectionId,
2✔
499
    ) -> Result<()> {
2✔
500
        let conn = self.conn_pool.get().await?;
2✔
501
        insert_collection_parent(&conn, collection, parent).await
4✔
502
    }
6✔
503

504
    async fn remove_layer_collection(&self, collection: &LayerCollectionId) -> Result<()> {
5✔
505
        let mut conn = self.conn_pool.get().await?;
5✔
506
        let transaction = conn.transaction().await?;
5✔
507

508
        delete_layer_collection(&transaction, collection).await?;
12✔
509

510
        transaction.commit().await.map_err(Into::into)
3✔
511
    }
15✔
512

513
    async fn remove_layer_from_collection(
3✔
514
        &self,
3✔
515
        layer: &LayerId,
3✔
516
        collection: &LayerCollectionId,
3✔
517
    ) -> Result<()> {
3✔
518
        let mut conn = self.conn_pool.get().await?;
3✔
519
        let transaction = conn.transaction().await?;
3✔
520

521
        delete_layer_from_collection(&transaction, layer, collection).await?;
12✔
522

523
        transaction.commit().await.map_err(Into::into)
3✔
524
    }
9✔
525

526
    async fn remove_layer_collection_from_parent(
2✔
527
        &self,
2✔
528
        collection: &LayerCollectionId,
2✔
529
        parent: &LayerCollectionId,
2✔
530
    ) -> Result<()> {
2✔
531
        let mut conn = self.conn_pool.get().await?;
2✔
532
        let transaction = conn.transaction().await?;
2✔
533

534
        delete_layer_collection_from_parent(&transaction, collection, parent).await?;
15✔
535

536
        transaction.commit().await.map_err(Into::into)
2✔
537
    }
6✔
538
}
539

540
#[async_trait]
541
impl<Tls> LayerCollectionProvider for PostgresDb<Tls>
542
where
543
    Tls: MakeTlsConnect<Socket> + Clone + Send + Sync + 'static,
544
    <Tls as MakeTlsConnect<Socket>>::Stream: Send + Sync,
545
    <Tls as MakeTlsConnect<Socket>>::TlsConnect: Send,
546
    <<Tls as MakeTlsConnect<Socket>>::TlsConnect as TlsConnect<Socket>>::Future: Send,
547
{
548
    fn capabilities(&self) -> ProviderCapabilities {
2✔
549
        ProviderCapabilities {
2✔
550
            listing: true,
2✔
551
            search: SearchCapabilities {
2✔
552
                search_types: SearchTypes {
2✔
553
                    fulltext: true,
2✔
554
                    prefix: true,
2✔
555
                },
2✔
556
                autocomplete: true,
2✔
557
                filters: None,
2✔
558
            },
2✔
559
        }
2✔
560
    }
2✔
561

562
    fn name(&self) -> &str {
×
563
        "Postgres Layer Database"
×
564
    }
×
565

566
    fn description(&self) -> &str {
×
567
        "A layer database using Postgres"
×
568
    }
×
569

570
    #[allow(clippy::too_many_lines)]
571
    async fn load_layer_collection(
18✔
572
        &self,
18✔
573
        collection_id: &LayerCollectionId,
18✔
574
        options: LayerCollectionListOptions,
18✔
575
    ) -> Result<LayerCollection> {
18✔
576
        let collection = Uuid::from_str(&collection_id.0).map_err(|_| {
18✔
577
            crate::error::Error::IdStringMustBeUuid {
×
578
                found: collection_id.0.clone(),
×
579
            }
×
580
        })?;
18✔
581

582
        let conn = self.conn_pool.get().await?;
18✔
583

584
        let stmt = conn
18✔
585
            .prepare(
18✔
586
                "
18✔
587
        SELECT name, description, properties
18✔
588
        FROM layer_collections
18✔
589
        WHERE id = $1;",
18✔
590
            )
18✔
591
            .await?;
17✔
592

593
        let row = conn.query_one(&stmt, &[&collection]).await?;
18✔
594

595
        let name: String = row.get(0);
14✔
596
        let description: String = row.get(1);
14✔
597
        let properties: Vec<Property> = row.get(2);
14✔
598

599
        let stmt = conn
14✔
600
            .prepare(
14✔
601
                "
14✔
602
        SELECT DISTINCT id, name, description, properties, is_layer
14✔
603
        FROM (
14✔
604
            SELECT 
14✔
605
                concat(id, '') AS id, 
14✔
606
                name, 
14✔
607
                description, 
14✔
608
                properties, 
14✔
609
                FALSE AS is_layer
14✔
610
            FROM layer_collections
14✔
611
                JOIN collection_children cc ON (id = cc.child)
14✔
612
            WHERE cc.parent = $1
14✔
613
        ) u UNION (
14✔
614
            SELECT 
14✔
615
                concat(id, '') AS id, 
14✔
616
                name, 
14✔
617
                description, 
14✔
618
                properties, 
14✔
619
                TRUE AS is_layer
14✔
620
            FROM layers uc
14✔
621
                JOIN collection_layers cl ON (id = cl.layer)
14✔
622
            WHERE cl.collection = $1
14✔
623
        )
14✔
624
        ORDER BY is_layer ASC, name ASC
14✔
625
        LIMIT $2 
14✔
626
        OFFSET $3;            
14✔
627
        ",
14✔
628
            )
14✔
629
            .await?;
14✔
630

631
        let rows = conn
14✔
632
            .query(
14✔
633
                &stmt,
14✔
634
                &[
14✔
635
                    &collection,
14✔
636
                    &i64::from(options.limit),
14✔
637
                    &i64::from(options.offset),
14✔
638
                ],
14✔
639
            )
14✔
640
            .await?;
14✔
641

642
        let items = rows
14✔
643
            .into_iter()
14✔
644
            .map(|row| {
16✔
645
                let is_layer: bool = row.get(4);
16✔
646

16✔
647
                if is_layer {
16✔
648
                    Ok(CollectionItem::Layer(LayerListing {
7✔
649
                        id: ProviderLayerId {
7✔
650
                            provider_id: INTERNAL_PROVIDER_ID,
7✔
651
                            layer_id: LayerId(row.get(0)),
7✔
652
                        },
7✔
653
                        name: row.get(1),
7✔
654
                        description: row.get(2),
7✔
655
                        properties: row.get(3),
7✔
656
                    }))
7✔
657
                } else {
658
                    Ok(CollectionItem::Collection(LayerCollectionListing {
9✔
659
                        id: ProviderLayerCollectionId {
9✔
660
                            provider_id: INTERNAL_PROVIDER_ID,
9✔
661
                            collection_id: LayerCollectionId(row.get(0)),
9✔
662
                        },
9✔
663
                        name: row.get(1),
9✔
664
                        description: row.get(2),
9✔
665
                        properties: row.get(3),
9✔
666
                    }))
9✔
667
                }
668
            })
16✔
669
            .collect::<Result<Vec<CollectionItem>>>()?;
14✔
670

671
        Ok(LayerCollection {
14✔
672
            id: ProviderLayerCollectionId {
14✔
673
                provider_id: INTERNAL_PROVIDER_ID,
14✔
674
                collection_id: collection_id.clone(),
14✔
675
            },
14✔
676
            name,
14✔
677
            description,
14✔
678
            items,
14✔
679
            entry_label: None,
14✔
680
            properties,
14✔
681
        })
14✔
682
    }
54✔
683

684
    #[allow(clippy::too_many_lines)]
685
    async fn search(
9✔
686
        &self,
9✔
687
        collection_id: &LayerCollectionId,
9✔
688
        search: SearchParameters,
9✔
689
    ) -> Result<LayerCollection> {
9✔
690
        let collection = Uuid::from_str(&collection_id.0).map_err(|_| {
9✔
691
            crate::error::Error::IdStringMustBeUuid {
×
692
                found: collection_id.0.clone(),
×
693
            }
×
694
        })?;
9✔
695

696
        let conn = self.conn_pool.get().await?;
9✔
697

698
        let stmt = conn
9✔
699
            .prepare(
9✔
700
                "
9✔
701
        SELECT name, description, properties
9✔
702
        FROM layer_collections
9✔
703
        WHERE id = $1;",
9✔
704
            )
9✔
705
            .await?;
15✔
706

707
        let row = conn.query_one(&stmt, &[&collection]).await?;
9✔
708

709
        let name: String = row.get(0);
9✔
710
        let description: String = row.get(1);
9✔
711
        let properties: Vec<Property> = row.get(2);
9✔
712

713
        let pattern = match search.search_type {
9✔
714
            SearchType::Fulltext => {
715
                format!("%{}%", search.search_string)
6✔
716
            }
717
            SearchType::Prefix => {
718
                format!("{}%", search.search_string)
3✔
719
            }
720
        };
721

722
        let stmt = conn.prepare(&create_search_query(true)).await?;
9✔
723

724
        let rows = conn
9✔
725
            .query(
9✔
726
                &stmt,
9✔
727
                &[
9✔
728
                    &collection,
9✔
729
                    &i64::from(search.limit),
9✔
730
                    &i64::from(search.offset),
9✔
731
                    &pattern,
9✔
732
                ],
9✔
733
            )
9✔
734
            .await?;
9✔
735

736
        let items = rows
9✔
737
            .into_iter()
9✔
738
            .map(|row| {
13✔
739
                let is_layer: bool = row.get(4);
13✔
740

13✔
741
                if is_layer {
13✔
742
                    Ok(CollectionItem::Layer(LayerListing {
5✔
743
                        id: ProviderLayerId {
5✔
744
                            provider_id: INTERNAL_PROVIDER_ID,
5✔
745
                            layer_id: LayerId(row.get(0)),
5✔
746
                        },
5✔
747
                        name: row.get(1),
5✔
748
                        description: row.get(2),
5✔
749
                        properties: row.get(3),
5✔
750
                    }))
5✔
751
                } else {
752
                    Ok(CollectionItem::Collection(LayerCollectionListing {
8✔
753
                        id: ProviderLayerCollectionId {
8✔
754
                            provider_id: INTERNAL_PROVIDER_ID,
8✔
755
                            collection_id: LayerCollectionId(row.get(0)),
8✔
756
                        },
8✔
757
                        name: row.get(1),
8✔
758
                        description: row.get(2),
8✔
759
                        properties: row.get(3),
8✔
760
                    }))
8✔
761
                }
762
            })
13✔
763
            .collect::<Result<Vec<CollectionItem>>>()?;
9✔
764

765
        Ok(LayerCollection {
9✔
766
            id: ProviderLayerCollectionId {
9✔
767
                provider_id: INTERNAL_PROVIDER_ID,
9✔
768
                collection_id: collection_id.clone(),
9✔
769
            },
9✔
770
            name,
9✔
771
            description,
9✔
772
            items,
9✔
773
            entry_label: None,
9✔
774
            properties,
9✔
775
        })
9✔
776
    }
27✔
777

778
    #[allow(clippy::too_many_lines)]
779
    async fn autocomplete_search(
9✔
780
        &self,
9✔
781
        collection_id: &LayerCollectionId,
9✔
782
        search: SearchParameters,
9✔
783
    ) -> Result<Vec<String>> {
9✔
784
        let collection = Uuid::from_str(&collection_id.0).map_err(|_| {
9✔
785
            crate::error::Error::IdStringMustBeUuid {
×
786
                found: collection_id.0.clone(),
×
787
            }
×
788
        })?;
9✔
789

790
        let conn = self.conn_pool.get().await?;
9✔
791

792
        let pattern = match search.search_type {
9✔
793
            SearchType::Fulltext => {
794
                format!("%{}%", search.search_string)
6✔
795
            }
796
            SearchType::Prefix => {
797
                format!("{}%", search.search_string)
3✔
798
            }
799
        };
800

801
        let stmt = conn.prepare(&create_search_query(false)).await?;
9✔
802

803
        let rows = conn
9✔
804
            .query(
9✔
805
                &stmt,
9✔
806
                &[
9✔
807
                    &collection,
9✔
808
                    &i64::from(search.limit),
9✔
809
                    &i64::from(search.offset),
9✔
810
                    &pattern,
9✔
811
                ],
9✔
812
            )
9✔
813
            .await?;
9✔
814

815
        let items = rows
9✔
816
            .into_iter()
9✔
817
            .map(|row| Ok(row.get::<usize, &str>(0).to_string()))
13✔
818
            .collect::<Result<Vec<String>>>()?;
9✔
819

820
        Ok(items)
9✔
821
    }
27✔
822

823
    async fn get_root_layer_collection_id(&self) -> Result<LayerCollectionId> {
22✔
824
        Ok(LayerCollectionId(
22✔
825
            INTERNAL_LAYER_DB_ROOT_COLLECTION_ID.to_string(),
22✔
826
        ))
22✔
827
    }
44✔
828

829
    async fn load_layer(&self, id: &LayerId) -> Result<Layer> {
17✔
830
        let layer_id =
17✔
831
            Uuid::from_str(&id.0).map_err(|_| crate::error::Error::IdStringMustBeUuid {
17✔
832
                found: id.0.clone(),
×
833
            })?;
17✔
834

835
        let conn = self.conn_pool.get().await?;
17✔
836

837
        let stmt = conn
17✔
838
            .prepare(
17✔
839
                "
17✔
840
            SELECT 
17✔
841
                l.name,
17✔
842
                l.description,
17✔
843
                w.workflow,
17✔
844
                l.symbology,
17✔
845
                l.properties,
17✔
846
                l.metadata
17✔
847
            FROM 
17✔
848
                layers l JOIN workflows w ON (l.workflow_id = w.id)
17✔
849
            WHERE 
17✔
850
                l.id = $1;",
17✔
851
            )
17✔
852
            .await?;
16✔
853

854
        let row = conn
17✔
855
            .query_one(&stmt, &[&layer_id])
17✔
856
            .await
16✔
857
            .map_err(|_error| LayerDbError::NoLayerForGivenId { id: id.clone() })?;
17✔
858

859
        Ok(Layer {
860
            id: ProviderLayerId {
13✔
861
                provider_id: INTERNAL_PROVIDER_ID,
13✔
862
                layer_id: id.clone(),
13✔
863
            },
13✔
864
            name: row.get(0),
13✔
865
            description: row.get(1),
13✔
866
            workflow: serde_json::from_value(row.get(2)).context(crate::error::SerdeJson)?,
13✔
867
            symbology: row.get(3),
13✔
868
            properties: row.get(4),
13✔
869
            metadata: row.get::<_, HashMapTextTextDbType>(5).into(),
13✔
870
        })
871
    }
51✔
872
}
873

874
#[async_trait]
875
impl<Tls> LayerProviderDb for PostgresDb<Tls>
876
where
877
    Tls: MakeTlsConnect<Socket> + Clone + Send + Sync + 'static,
878
    <Tls as MakeTlsConnect<Socket>>::Stream: Send + Sync,
879
    <Tls as MakeTlsConnect<Socket>>::TlsConnect: Send,
880
    <<Tls as MakeTlsConnect<Socket>>::TlsConnect as TlsConnect<Socket>>::Future: Send,
881
{
882
    async fn add_layer_provider(
8✔
883
        &self,
8✔
884
        provider: TypedDataProviderDefinition,
8✔
885
    ) -> Result<DataProviderId> {
8✔
886
        let conn = self.conn_pool.get().await?;
8✔
887

888
        let stmt = conn
8✔
889
            .prepare(
8✔
890
                "
8✔
891
              INSERT INTO layer_providers (
8✔
892
                  id, 
8✔
893
                  type_name, 
8✔
894
                  name,
8✔
895
                  definition,
8✔
896
                  priority
8✔
897
              )
8✔
898
              VALUES ($1, $2, $3, $4, $5)",
8✔
899
            )
8✔
900
            .await?;
246✔
901

902
        // clamp the priority to a reasonable range
903
        let prio = DataProviderDefinition::<Self>::priority(&provider);
8✔
904
        let clamp_prio = prio.clamp(-1000, 1000);
8✔
905

8✔
906
        if prio != clamp_prio {
8✔
907
            log::warn!(
×
908
                "The priority of the provider {} is out of range! --> clamped {} to {}",
×
909
                DataProviderDefinition::<Self>::name(&provider),
×
910
                prio,
911
                clamp_prio
912
            );
913
        }
8✔
914

915
        let id = DataProviderDefinition::<Self>::id(&provider);
8✔
916
        conn.execute(
8✔
917
            &stmt,
8✔
918
            &[
8✔
919
                &id,
8✔
920
                &DataProviderDefinition::<Self>::type_name(&provider),
8✔
921
                &DataProviderDefinition::<Self>::name(&provider),
8✔
922
                &provider,
8✔
923
                &clamp_prio,
8✔
924
            ],
8✔
925
        )
8✔
926
        .await?;
8✔
927
        Ok(id)
8✔
928
    }
24✔
929

930
    async fn list_layer_providers(
1✔
931
        &self,
1✔
932
        options: LayerProviderListingOptions,
1✔
933
    ) -> Result<Vec<LayerProviderListing>> {
1✔
934
        // TODO: permission
935
        let conn = self.conn_pool.get().await?;
1✔
936

937
        let stmt = conn
1✔
938
            .prepare(
1✔
939
                "
1✔
940
            SELECT 
1✔
941
                id, 
1✔
942
                name,
1✔
943
                priority
1✔
944
            FROM 
1✔
945
                layer_providers
1✔
946
            WHERE
1✔
947
                priority > -1000
1✔
948
            ORDER BY priority DESC, name ASC
1✔
949
            LIMIT $1 
1✔
950
            OFFSET $2;",
1✔
951
            )
1✔
952
            .await?;
1✔
953

954
        let rows = conn
1✔
955
            .query(
1✔
956
                &stmt,
1✔
957
                &[&i64::from(options.limit), &i64::from(options.offset)],
1✔
958
            )
1✔
UNCOV
959
            .await?;
×
960

961
        Ok(rows
1✔
962
            .iter()
1✔
963
            .map(|row| LayerProviderListing {
1✔
964
                id: row.get(0),
1✔
965
                name: row.get(1),
1✔
966
                priority: row.get(2),
1✔
967
            })
1✔
968
            .collect())
1✔
969
    }
3✔
970

971
    async fn load_layer_provider(&self, id: DataProviderId) -> Result<Box<dyn DataProvider>> {
18✔
972
        let conn = self.conn_pool.get().await?;
18✔
973

974
        let stmt = conn
18✔
975
            .prepare(
18✔
976
                "
18✔
977
               SELECT 
18✔
978
                   definition
18✔
979
               FROM 
18✔
980
                   layer_providers
18✔
981
               WHERE
18✔
982
                   id = $1",
18✔
983
            )
18✔
984
            .await?;
18✔
985

986
        let row = conn.query_one(&stmt, &[&id]).await?;
18✔
987

988
        let definition: TypedDataProviderDefinition = row.get(0);
11✔
989

11✔
990
        Box::new(definition)
11✔
991
            .initialize(PostgresDb {
11✔
992
                conn_pool: self.conn_pool.clone(),
11✔
993
            })
11✔
994
            .await
174✔
995
    }
54✔
996
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc