• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

geo-engine / geoengine / 12767614094

14 Jan 2025 12:26PM UTC coverage: 90.64% (+0.06%) from 90.576%
12767614094

push

github

web-flow
Merge pull request #1006 from geo-engine/migrate-pro-api

Migrate-pro-api

1106 of 1152 new or added lines in 24 files covered. (96.01%)

248 existing lines in 13 files now uncovered.

133501 of 147287 relevant lines covered (90.64%)

54652.85 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

86.85
/services/src/layers/postgres_layer_db.rs
1
use super::external::TypedDataProviderDefinition;
2
use super::layer::{UpdateLayer, UpdateLayerCollection};
3
use super::listing::{ProviderCapabilities, SearchType};
4
use crate::contexts::PostgresDb;
5
use crate::layers::layer::Property;
6
use crate::layers::listing::{SearchCapabilities, SearchParameters, SearchTypes};
7
use crate::workflows::registry::TxWorkflowRegistry;
8
use crate::{
9
    error::Result,
10
    layers::{
11
        external::{DataProvider, DataProviderDefinition},
12
        layer::{
13
            AddLayer, AddLayerCollection, CollectionItem, Layer, LayerCollection,
14
            LayerCollectionListOptions, LayerCollectionListing, LayerListing,
15
            ProviderLayerCollectionId, ProviderLayerId,
16
        },
17
        listing::{LayerCollectionId, LayerCollectionProvider},
18
        storage::{
19
            LayerDb, LayerProviderDb, LayerProviderListing, LayerProviderListingOptions,
20
            INTERNAL_LAYER_DB_ROOT_COLLECTION_ID, INTERNAL_PROVIDER_ID,
21
        },
22
        LayerDbError,
23
    },
24
};
25
use async_trait::async_trait;
26
use bb8_postgres::bb8::PooledConnection;
27
use bb8_postgres::tokio_postgres::{
28
    tls::{MakeTlsConnect, TlsConnect},
29
    Socket,
30
};
31
use bb8_postgres::PostgresConnectionManager;
32
use geoengine_datatypes::dataset::{DataProviderId, LayerId};
33
use geoengine_datatypes::util::HashMapTextTextDbType;
34
use snafu::ResultExt;
35
use std::str::FromStr;
36
use tokio_postgres::Transaction;
37
use uuid::Uuid;
38

39
/// delete all collections without parent collection
40
async fn _remove_collections_without_parent_collection(
8✔
41
    transaction: &tokio_postgres::Transaction<'_>,
8✔
42
) -> Result<()> {
8✔
43
    // HINT: a recursive delete statement seems reasonable, but hard to implement in postgres
44
    //       because you have a graph with potential loops
45

46
    let remove_layer_collections_without_parents_stmt = transaction
8✔
47
        .prepare(
8✔
48
            "DELETE FROM layer_collections
8✔
49
                 WHERE  id <> $1 -- do not delete root collection
8✔
50
                 AND    id NOT IN (
8✔
51
                    SELECT child FROM collection_children
8✔
52
                 );",
8✔
53
        )
8✔
54
        .await?;
8✔
55
    while 0 < transaction
13✔
56
        .execute(
13✔
57
            &remove_layer_collections_without_parents_stmt,
13✔
58
            &[&INTERNAL_LAYER_DB_ROOT_COLLECTION_ID],
13✔
59
        )
13✔
60
        .await?
13✔
61
    {
5✔
62
        // whenever one collection is deleted, we have to check again if there are more
5✔
63
        // collections without parents
5✔
64
    }
5✔
65

66
    Ok(())
8✔
67
}
8✔
68

69
/// delete all layers without parent collection
70
#[allow(clippy::used_underscore_items)] // TODO: maybe rename?
71
async fn _remove_layers_without_parent_collection(
13✔
72
    transaction: &tokio_postgres::Transaction<'_>,
13✔
73
) -> Result<()> {
13✔
74
    let remove_layers_without_parents_stmt = transaction
13✔
75
        .prepare(
13✔
76
            "DELETE FROM layers
13✔
77
                 WHERE id NOT IN (
13✔
78
                    SELECT layer FROM collection_layers
13✔
79
                 );",
13✔
80
        )
13✔
81
        .await?;
13✔
82
    transaction
13✔
83
        .execute(&remove_layers_without_parents_stmt, &[])
13✔
84
        .await?;
13✔
85

86
    Ok(())
13✔
87
}
13✔
88

89
pub async fn insert_layer<W: TxWorkflowRegistry>(
37✔
90
    workflow_registry: &W,
37✔
91
    trans: &Transaction<'_>,
37✔
92
    id: &LayerId,
37✔
93
    layer: AddLayer,
37✔
94
    collection: &LayerCollectionId,
37✔
95
) -> Result<Uuid> {
37✔
96
    let layer_id = Uuid::from_str(&id.0).map_err(|_| crate::error::Error::IdStringMustBeUuid {
37✔
97
        found: collection.0.clone(),
×
98
    })?;
37✔
99

100
    let collection_id =
37✔
101
        Uuid::from_str(&collection.0).map_err(|_| crate::error::Error::IdStringMustBeUuid {
37✔
102
            found: collection.0.clone(),
×
103
        })?;
37✔
104

105
    let workflow_id = workflow_registry
37✔
106
        .register_workflow_in_tx(layer.workflow, trans)
37✔
107
        .await?;
37✔
108

109
    let stmt = trans
37✔
110
        .prepare(
37✔
111
            "
37✔
112
            INSERT INTO layers (id, name, description, workflow_id, symbology, properties, metadata)
37✔
113
            VALUES ($1, $2, $3, $4, $5, $6, $7);",
37✔
114
        )
37✔
115
        .await?;
37✔
116

117
    trans
37✔
118
        .execute(
37✔
119
            &stmt,
37✔
120
            &[
37✔
121
                &layer_id,
37✔
122
                &layer.name,
37✔
123
                &layer.description,
37✔
124
                &workflow_id,
37✔
125
                &layer.symbology,
37✔
126
                &layer.properties,
37✔
127
                &HashMapTextTextDbType::from(&layer.metadata),
37✔
128
            ],
37✔
129
        )
37✔
130
        .await?;
37✔
131

132
    let stmt = trans
37✔
133
        .prepare(
37✔
134
            "
37✔
135
            INSERT INTO collection_layers (collection, layer)
37✔
136
            VALUES ($1, $2) ON CONFLICT DO NOTHING;",
37✔
137
        )
37✔
138
        .await?;
37✔
139

140
    trans.execute(&stmt, &[&collection_id, &layer_id]).await?;
37✔
141

142
    Ok(layer_id)
37✔
143
}
37✔
144

145
pub async fn insert_layer_collection_with_id(
44✔
146
    trans: &Transaction<'_>,
44✔
147
    id: &LayerCollectionId,
44✔
148
    collection: AddLayerCollection,
44✔
149
    parent: &LayerCollectionId,
44✔
150
) -> Result<Uuid> {
44✔
151
    let collection_id =
44✔
152
        Uuid::from_str(&id.0).map_err(|_| crate::error::Error::IdStringMustBeUuid {
44✔
153
            found: id.0.clone(),
×
154
        })?;
44✔
155

156
    let parent =
44✔
157
        Uuid::from_str(&parent.0).map_err(|_| crate::error::Error::IdStringMustBeUuid {
44✔
158
            found: parent.0.clone(),
×
159
        })?;
44✔
160

161
    let stmt = trans
44✔
162
        .prepare(
44✔
163
            "
44✔
164
        INSERT INTO layer_collections (id, name, description, properties)
44✔
165
        VALUES ($1, $2, $3, $4);",
44✔
166
        )
44✔
167
        .await?;
44✔
168

169
    trans
44✔
170
        .execute(
44✔
171
            &stmt,
44✔
172
            &[
44✔
173
                &collection_id,
44✔
174
                &collection.name,
44✔
175
                &collection.description,
44✔
176
                &collection.properties,
44✔
177
            ],
44✔
178
        )
44✔
179
        .await?;
44✔
180

181
    let stmt = trans
44✔
182
        .prepare(
44✔
183
            "
44✔
184
        INSERT INTO collection_children (parent, child)
44✔
185
        VALUES ($1, $2) ON CONFLICT DO NOTHING;",
44✔
186
        )
44✔
187
        .await?;
44✔
188

189
    trans.execute(&stmt, &[&parent, &collection_id]).await?;
44✔
190

191
    Ok(collection_id)
44✔
192
}
44✔
193

194
pub async fn insert_collection_parent<Tls>(
3✔
195
    conn: &PooledConnection<'_, PostgresConnectionManager<Tls>>,
3✔
196
    collection: &LayerCollectionId,
3✔
197
    parent: &LayerCollectionId,
3✔
198
) -> Result<()>
3✔
199
where
3✔
200
    Tls: MakeTlsConnect<Socket> + Clone + Send + Sync + 'static + std::fmt::Debug,
3✔
201
    <Tls as MakeTlsConnect<Socket>>::Stream: Send + Sync,
3✔
202
    <Tls as MakeTlsConnect<Socket>>::TlsConnect: Send,
3✔
203
    <<Tls as MakeTlsConnect<Socket>>::TlsConnect as TlsConnect<Socket>>::Future: Send,
3✔
204
{
3✔
205
    let collection =
3✔
206
        Uuid::from_str(&collection.0).map_err(|_| crate::error::Error::IdStringMustBeUuid {
3✔
207
            found: collection.0.clone(),
×
208
        })?;
3✔
209

210
    let parent =
3✔
211
        Uuid::from_str(&parent.0).map_err(|_| crate::error::Error::IdStringMustBeUuid {
3✔
212
            found: parent.0.clone(),
×
213
        })?;
3✔
214

215
    let stmt = conn
3✔
216
        .prepare(
3✔
217
            "
3✔
218
        INSERT INTO collection_children (parent, child)
3✔
219
        VALUES ($1, $2) ON CONFLICT DO NOTHING;",
3✔
220
        )
3✔
221
        .await?;
3✔
222

223
    conn.execute(&stmt, &[&parent, &collection]).await?;
3✔
224

225
    Ok(())
3✔
226
}
3✔
227

228
pub async fn delete_layer_collection(
8✔
229
    transaction: &Transaction<'_>,
8✔
230
    collection: &LayerCollectionId,
8✔
231
) -> Result<()> {
8✔
232
    let collection =
8✔
233
        Uuid::from_str(&collection.0).map_err(|_| crate::error::Error::IdStringMustBeUuid {
8✔
234
            found: collection.0.clone(),
×
235
        })?;
8✔
236

237
    if collection == INTERNAL_LAYER_DB_ROOT_COLLECTION_ID {
8✔
238
        return Err(LayerDbError::CannotRemoveRootCollection.into());
3✔
239
    }
5✔
240

241
    // delete the collection!
242
    // on delete cascade removes all entries from `collection_children` and `collection_layers`
243

244
    let remove_layer_collection_stmt = transaction
5✔
245
        .prepare(
5✔
246
            "DELETE FROM layer_collections
5✔
247
             WHERE id = $1;",
5✔
248
        )
5✔
249
        .await?;
5✔
250
    transaction
5✔
251
        .execute(&remove_layer_collection_stmt, &[&collection])
5✔
252
        .await?;
5✔
253

254
    #[allow(clippy::used_underscore_items)] // TODO: maybe rename?
255
    _remove_collections_without_parent_collection(transaction).await?;
5✔
256

257
    #[allow(clippy::used_underscore_items)] // TODO: maybe rename?
258
    _remove_layers_without_parent_collection(transaction).await?;
5✔
259

260
    Ok(())
5✔
261
}
8✔
262

263
pub async fn delete_layer_from_collection(
5✔
264
    transaction: &Transaction<'_>,
5✔
265
    layer: &LayerId,
5✔
266
    collection: &LayerCollectionId,
5✔
267
) -> Result<()> {
5✔
268
    let collection_uuid =
5✔
269
        Uuid::from_str(&collection.0).map_err(|_| crate::error::Error::IdStringMustBeUuid {
5✔
270
            found: collection.0.clone(),
×
271
        })?;
5✔
272

273
    let layer_uuid =
5✔
274
        Uuid::from_str(&layer.0).map_err(|_| crate::error::Error::IdStringMustBeUuid {
5✔
275
            found: layer.0.clone(),
×
276
        })?;
5✔
277

278
    let remove_layer_collection_stmt = transaction
5✔
279
        .prepare(
5✔
280
            "DELETE FROM collection_layers
5✔
281
             WHERE collection = $1
5✔
282
             AND layer = $2;",
5✔
283
        )
5✔
284
        .await?;
5✔
285
    let num_results = transaction
5✔
286
        .execute(
5✔
287
            &remove_layer_collection_stmt,
5✔
288
            &[&collection_uuid, &layer_uuid],
5✔
289
        )
5✔
290
        .await?;
5✔
291

292
    if num_results == 0 {
5✔
293
        return Err(LayerDbError::NoLayerForGivenIdInCollection {
×
294
            layer: layer.clone(),
×
295
            collection: collection.clone(),
×
296
        }
×
297
        .into());
×
298
    }
5✔
299

5✔
300
    #[allow(clippy::used_underscore_items)] // TODO: maybe rename?
5✔
301
    _remove_layers_without_parent_collection(transaction).await?;
5✔
302

303
    Ok(())
5✔
304
}
5✔
305

306
pub async fn delete_layer_collection_from_parent(
3✔
307
    transaction: &Transaction<'_>,
3✔
308
    collection: &LayerCollectionId,
3✔
309
    parent: &LayerCollectionId,
3✔
310
) -> Result<()> {
3✔
311
    let collection_uuid =
3✔
312
        Uuid::from_str(&collection.0).map_err(|_| crate::error::Error::IdStringMustBeUuid {
3✔
313
            found: collection.0.clone(),
×
314
        })?;
3✔
315

316
    let parent_collection_uuid =
3✔
317
        Uuid::from_str(&parent.0).map_err(|_| crate::error::Error::IdStringMustBeUuid {
3✔
318
            found: parent.0.clone(),
×
319
        })?;
3✔
320

321
    let remove_layer_collection_stmt = transaction
3✔
322
        .prepare(
3✔
323
            "DELETE FROM collection_children
3✔
324
             WHERE child = $1
3✔
325
             AND parent = $2;",
3✔
326
        )
3✔
327
        .await?;
3✔
328
    let num_results = transaction
3✔
329
        .execute(
3✔
330
            &remove_layer_collection_stmt,
3✔
331
            &[&collection_uuid, &parent_collection_uuid],
3✔
332
        )
3✔
333
        .await?;
3✔
334

335
    if num_results == 0 {
3✔
336
        return Err(LayerDbError::NoCollectionForGivenIdInCollection {
×
337
            collection: collection.clone(),
×
338
            parent: parent.clone(),
×
339
        }
×
340
        .into());
×
341
    }
3✔
342

3✔
343
    #[allow(clippy::used_underscore_items)] // TODO: maybe rename?
3✔
344
    _remove_collections_without_parent_collection(transaction).await?;
3✔
345

346
    #[allow(clippy::used_underscore_items)] // TODO: maybe rename?
347
    _remove_layers_without_parent_collection(transaction).await?;
3✔
348

349
    Ok(())
3✔
350
}
3✔
351

352
/// Builds the SQL text of the recursive name search below a collection.
///
/// Statement parameters: `$1` start collection (uuid), `$2` limit, `$3` offset,
/// `$4` `ILIKE` pattern matched against `name`.
///
/// With `full_info` the query yields `id, name, description, properties, is_layer` and sorts
/// by `is_layer` before `name`; without it only `name` is selected and sorted.
///
/// The recursive `parents` CTE is combined with `UNION` rather than `UNION ALL`: the
/// collection graph may contain loops, and only `UNION` discards collections that were
/// already visited, which lets the recursion terminate. Every use of `parents` is already
/// `DISTINCT`, so the returned rows are unaffected.
fn create_search_query(full_info: bool) -> String {
    // select lists of the collection branch and of the layer branch, plus the sort prefix
    let (collection_columns, layer_columns, order_prefix) = if full_info {
        (
            "concat(id, '') AS id,
            name,
            description,
            properties,
            FALSE AS is_layer",
            "concat(id, '') AS id,
            name,
            description,
            properties,
            TRUE AS is_layer",
            "is_layer ASC,",
        )
    } else {
        ("name", "name", "")
    };

    format!("
        WITH RECURSIVE parents AS (
            SELECT $1::uuid as id
            UNION SELECT DISTINCT child FROM collection_children JOIN parents ON (id = parent)
        )
        SELECT DISTINCT *
        FROM (
            SELECT 
                {}
            FROM layer_collections
                JOIN (SELECT DISTINCT child FROM collection_children JOIN parents ON (id = parent)) cc ON (id = cc.child)
            WHERE name ILIKE $4
        ) u UNION (
            SELECT 
                {}
            FROM layers uc
                JOIN (SELECT DISTINCT layer FROM collection_layers JOIN parents ON (collection = id)) cl ON (id = cl.layer)
            WHERE name ILIKE $4
        )
        ORDER BY {}name ASC
        LIMIT $2 
        OFFSET $3;",
        collection_columns, layer_columns, order_prefix)
}
16✔
391

392
#[async_trait]
393
impl<Tls> LayerDb for PostgresDb<Tls>
394
where
395
    Tls: MakeTlsConnect<Socket> + Clone + Send + Sync + 'static + std::fmt::Debug,
396
    <Tls as MakeTlsConnect<Socket>>::Stream: Send + Sync,
397
    <Tls as MakeTlsConnect<Socket>>::TlsConnect: Send,
398
    <<Tls as MakeTlsConnect<Socket>>::TlsConnect as TlsConnect<Socket>>::Future: Send,
399
{
400
    async fn add_layer(&self, layer: AddLayer, collection: &LayerCollectionId) -> Result<LayerId> {
10✔
401
        let layer_id = Uuid::new_v4();
10✔
402
        let layer_id = LayerId(layer_id.to_string());
10✔
403

10✔
404
        self.add_layer_with_id(&layer_id, layer, collection).await?;
10✔
405

406
        Ok(layer_id)
10✔
407
    }
20✔
408

UNCOV
409
    async fn update_layer(&self, id: &LayerId, layer: UpdateLayer) -> Result<()> {
×
UNCOV
410
        let layer_id =
×
UNCOV
411
            Uuid::from_str(&id.0).map_err(|_| crate::error::Error::IdStringMustBeUuid {
×
412
                found: id.0.clone(),
×
UNCOV
413
            })?;
×
414

UNCOV
415
        let mut conn = self.conn_pool.get().await?;
×
416

UNCOV
417
        let tx = conn.build_transaction().start().await?;
×
418

UNCOV
419
        let workflow_id = self.register_workflow_in_tx(layer.workflow, &tx).await?;
×
420

UNCOV
421
        tx.execute(
×
UNCOV
422
            "
×
UNCOV
423
            UPDATE layers
×
UNCOV
424
            SET name = $1, description = $2, symbology = $3, properties = $4, metadata = $5, workflow_id = $6
×
UNCOV
425
            WHERE id = $7;",
×
UNCOV
426
            &[
×
UNCOV
427
                &layer.name,
×
UNCOV
428
                &layer.description,
×
UNCOV
429
                &layer.symbology,
×
UNCOV
430
                &layer.properties,
×
UNCOV
431
                &HashMapTextTextDbType::from(&layer.metadata),
×
UNCOV
432
                &workflow_id,
×
UNCOV
433
                &layer_id,
×
UNCOV
434
            ],
×
UNCOV
435
        )
×
UNCOV
436
        .await?;
×
437

UNCOV
438
        tx.commit().await?;
×
439

UNCOV
440
        Ok(())
×
UNCOV
441
    }
×
442

UNCOV
443
    async fn remove_layer(&self, id: &LayerId) -> Result<()> {
×
UNCOV
444
        let layer_id =
×
UNCOV
445
            Uuid::from_str(&id.0).map_err(|_| crate::error::Error::IdStringMustBeUuid {
×
446
                found: id.0.clone(),
×
UNCOV
447
            })?;
×
448

UNCOV
449
        let conn = self.conn_pool.get().await?;
×
450

UNCOV
451
        conn.execute(
×
UNCOV
452
            "
×
UNCOV
453
            DELETE FROM layers
×
UNCOV
454
            WHERE id = $1;",
×
UNCOV
455
            &[&layer_id],
×
UNCOV
456
        )
×
UNCOV
457
        .await?;
×
458

UNCOV
459
        Ok(())
×
UNCOV
460
    }
×
461

462
    async fn add_layer_with_id(
463
        &self,
464
        id: &LayerId,
465
        layer: AddLayer,
466
        collection: &LayerCollectionId,
467
    ) -> Result<()> {
10✔
468
        let mut conn = self.conn_pool.get().await?;
10✔
469

470
        let trans = conn.build_transaction().start().await?;
10✔
471

472
        insert_layer(self, &trans, id, layer, collection).await?;
10✔
473

474
        trans.commit().await?;
10✔
475

476
        Ok(())
10✔
477
    }
20✔
478

479
    async fn add_layer_to_collection(
480
        &self,
481
        layer: &LayerId,
482
        collection: &LayerCollectionId,
483
    ) -> Result<()> {
2✔
484
        let layer_id =
1✔
485
            Uuid::from_str(&layer.0).map_err(|_| crate::error::Error::IdStringMustBeUuid {
2✔
486
                found: layer.0.clone(),
1✔
487
            })?;
2✔
488

489
        let collection_id =
1✔
490
            Uuid::from_str(&collection.0).map_err(|_| crate::error::Error::IdStringMustBeUuid {
1✔
491
                found: collection.0.clone(),
×
492
            })?;
1✔
493

494
        let conn = self.conn_pool.get().await?;
1✔
495

496
        let stmt = conn
1✔
497
            .prepare(
1✔
498
                "
1✔
499
            INSERT INTO collection_layers (collection, layer)
1✔
500
            VALUES ($1, $2) ON CONFLICT DO NOTHING;",
1✔
501
            )
1✔
502
            .await?;
1✔
503

504
        conn.execute(&stmt, &[&collection_id, &layer_id]).await?;
1✔
505

506
        Ok(())
1✔
507
    }
4✔
508

509
    async fn add_layer_collection(
510
        &self,
511
        collection: AddLayerCollection,
512
        parent: &LayerCollectionId,
513
    ) -> Result<LayerCollectionId> {
12✔
514
        let collection_id = Uuid::new_v4();
12✔
515
        let collection_id = LayerCollectionId(collection_id.to_string());
12✔
516

12✔
517
        self.add_layer_collection_with_id(&collection_id, collection, parent)
12✔
518
            .await?;
12✔
519

520
        Ok(collection_id)
12✔
521
    }
24✔
522

523
    async fn add_layer_collection_with_id(
524
        &self,
525
        id: &LayerCollectionId,
526
        collection: AddLayerCollection,
527
        parent: &LayerCollectionId,
528
    ) -> Result<()> {
12✔
529
        let mut conn = self.conn_pool.get().await?;
12✔
530

531
        let trans = conn.build_transaction().start().await?;
12✔
532

533
        insert_layer_collection_with_id(&trans, id, collection, parent).await?;
12✔
534

535
        trans.commit().await?;
12✔
536

537
        Ok(())
12✔
538
    }
24✔
539

540
    async fn add_collection_to_parent(
541
        &self,
542
        collection: &LayerCollectionId,
543
        parent: &LayerCollectionId,
544
    ) -> Result<()> {
1✔
545
        let conn = self.conn_pool.get().await?;
1✔
546
        insert_collection_parent(&conn, collection, parent).await
1✔
547
    }
2✔
548

549
    async fn remove_layer_collection(&self, collection: &LayerCollectionId) -> Result<()> {
3✔
550
        let mut conn = self.conn_pool.get().await?;
3✔
551
        let transaction = conn.transaction().await?;
3✔
552

553
        delete_layer_collection(&transaction, collection).await?;
3✔
554

555
        transaction.commit().await.map_err(Into::into)
2✔
556
    }
6✔
557

558
    async fn remove_layer_from_collection(
559
        &self,
560
        layer: &LayerId,
561
        collection: &LayerCollectionId,
562
    ) -> Result<()> {
2✔
563
        let mut conn = self.conn_pool.get().await?;
2✔
564
        let transaction = conn.transaction().await?;
2✔
565

566
        delete_layer_from_collection(&transaction, layer, collection).await?;
2✔
567

568
        transaction.commit().await.map_err(Into::into)
2✔
569
    }
4✔
570

571
    async fn remove_layer_collection_from_parent(
572
        &self,
573
        collection: &LayerCollectionId,
574
        parent: &LayerCollectionId,
575
    ) -> Result<()> {
1✔
576
        let mut conn = self.conn_pool.get().await?;
1✔
577
        let transaction = conn.transaction().await?;
1✔
578

579
        delete_layer_collection_from_parent(&transaction, collection, parent).await?;
1✔
580

581
        transaction.commit().await.map_err(Into::into)
1✔
582
    }
2✔
583

584
    async fn update_layer_collection(
585
        &self,
586
        collection: &LayerCollectionId,
587
        update: UpdateLayerCollection,
UNCOV
588
    ) -> Result<()> {
×
UNCOV
589
        let collection_id =
×
UNCOV
590
            Uuid::from_str(&collection.0).map_err(|_| crate::error::Error::IdStringMustBeUuid {
×
591
                found: collection.0.clone(),
×
UNCOV
592
            })?;
×
593

UNCOV
594
        let conn = self.conn_pool.get().await?;
×
595

UNCOV
596
        conn.execute(
×
UNCOV
597
            "UPDATE layer_collections 
×
UNCOV
598
                SET name = $1, description = $2, properties = $3
×
UNCOV
599
                WHERE id = $4;",
×
UNCOV
600
            &[
×
UNCOV
601
                &update.name,
×
UNCOV
602
                &update.description,
×
UNCOV
603
                &update.properties,
×
UNCOV
604
                &collection_id,
×
UNCOV
605
            ],
×
UNCOV
606
        )
×
UNCOV
607
        .await?;
×
608

UNCOV
609
        Ok(())
×
UNCOV
610
    }
×
611
}
612

613
#[async_trait]
614
impl<Tls> LayerCollectionProvider for PostgresDb<Tls>
615
where
616
    Tls: MakeTlsConnect<Socket> + Clone + Send + Sync + 'static + std::fmt::Debug,
617
    <Tls as MakeTlsConnect<Socket>>::Stream: Send + Sync,
618
    <Tls as MakeTlsConnect<Socket>>::TlsConnect: Send,
619
    <<Tls as MakeTlsConnect<Socket>>::TlsConnect as TlsConnect<Socket>>::Future: Send,
620
{
621
    fn capabilities(&self) -> ProviderCapabilities {
1✔
622
        ProviderCapabilities {
1✔
623
            listing: true,
1✔
624
            search: SearchCapabilities {
1✔
625
                search_types: SearchTypes {
1✔
626
                    fulltext: true,
1✔
627
                    prefix: true,
1✔
628
                },
1✔
629
                autocomplete: true,
1✔
630
                filters: None,
1✔
631
            },
1✔
632
        }
1✔
633
    }
1✔
634

635
    fn name(&self) -> &str {
×
636
        "Postgres Layer Database"
×
637
    }
×
638

639
    fn description(&self) -> &str {
×
640
        "A layer database using Postgres"
×
641
    }
×
642

643
    #[allow(clippy::too_many_lines)]
644
    async fn load_layer_collection(
645
        &self,
646
        collection_id: &LayerCollectionId,
647
        options: LayerCollectionListOptions,
648
    ) -> Result<LayerCollection> {
12✔
649
        let collection = Uuid::from_str(&collection_id.0).map_err(|_| {
12✔
650
            crate::error::Error::IdStringMustBeUuid {
×
651
                found: collection_id.0.clone(),
×
652
            }
×
653
        })?;
12✔
654

655
        let conn = self.conn_pool.get().await?;
12✔
656

657
        let stmt = conn
12✔
658
            .prepare(
12✔
659
                "
12✔
660
        SELECT name, description, properties
12✔
661
        FROM layer_collections
12✔
662
        WHERE id = $1;",
12✔
663
            )
12✔
664
            .await?;
12✔
665

666
        let row = conn.query_one(&stmt, &[&collection]).await?;
12✔
667

668
        let name: String = row.get(0);
9✔
669
        let description: String = row.get(1);
9✔
670
        let properties: Vec<Property> = row.get(2);
9✔
671

672
        let stmt = conn
9✔
673
            .prepare(
9✔
674
                "
9✔
675
        SELECT DISTINCT id, name, description, properties, is_layer
9✔
676
        FROM (
9✔
677
            SELECT 
9✔
678
                concat(id, '') AS id, 
9✔
679
                name, 
9✔
680
                description, 
9✔
681
                properties, 
9✔
682
                FALSE AS is_layer
9✔
683
            FROM layer_collections
9✔
684
                JOIN collection_children cc ON (id = cc.child)
9✔
685
            WHERE cc.parent = $1
9✔
686
        ) u UNION (
9✔
687
            SELECT 
9✔
688
                concat(id, '') AS id, 
9✔
689
                name, 
9✔
690
                description, 
9✔
691
                properties, 
9✔
692
                TRUE AS is_layer
9✔
693
            FROM layers uc
9✔
694
                JOIN collection_layers cl ON (id = cl.layer)
9✔
695
            WHERE cl.collection = $1
9✔
696
        )
9✔
697
        ORDER BY is_layer ASC, name ASC
9✔
698
        LIMIT $2 
9✔
699
        OFFSET $3;            
9✔
700
        ",
9✔
701
            )
9✔
702
            .await?;
9✔
703

704
        let rows = conn
9✔
705
            .query(
9✔
706
                &stmt,
9✔
707
                &[
9✔
708
                    &collection,
9✔
709
                    &i64::from(options.limit),
9✔
710
                    &i64::from(options.offset),
9✔
711
                ],
9✔
712
            )
9✔
713
            .await?;
9✔
714

715
        let items = rows
9✔
716
            .into_iter()
9✔
717
            .map(|row| {
11✔
718
                let is_layer: bool = row.get(4);
11✔
719

11✔
720
                if is_layer {
11✔
721
                    Ok(CollectionItem::Layer(LayerListing {
5✔
722
                        id: ProviderLayerId {
5✔
723
                            provider_id: INTERNAL_PROVIDER_ID,
5✔
724
                            layer_id: LayerId(row.get(0)),
5✔
725
                        },
5✔
726
                        name: row.get(1),
5✔
727
                        description: row.get(2),
5✔
728
                        properties: row.get(3),
5✔
729
                    }))
5✔
730
                } else {
731
                    Ok(CollectionItem::Collection(LayerCollectionListing {
6✔
732
                        id: ProviderLayerCollectionId {
6✔
733
                            provider_id: INTERNAL_PROVIDER_ID,
6✔
734
                            collection_id: LayerCollectionId(row.get(0)),
6✔
735
                        },
6✔
736
                        name: row.get(1),
6✔
737
                        description: row.get(2),
6✔
738
                        properties: row.get(3),
6✔
739
                    }))
6✔
740
                }
741
            })
11✔
742
            .collect::<Result<Vec<CollectionItem>>>()?;
9✔
743

744
        Ok(LayerCollection {
9✔
745
            id: ProviderLayerCollectionId {
9✔
746
                provider_id: INTERNAL_PROVIDER_ID,
9✔
747
                collection_id: collection_id.clone(),
9✔
748
            },
9✔
749
            name,
9✔
750
            description,
9✔
751
            items,
9✔
752
            entry_label: None,
9✔
753
            properties,
9✔
754
        })
9✔
755
    }
24✔
756

757
    #[allow(clippy::too_many_lines)]
758
    async fn search(
759
        &self,
760
        collection_id: &LayerCollectionId,
761
        search: SearchParameters,
762
    ) -> Result<LayerCollection> {
8✔
763
        let collection = Uuid::from_str(&collection_id.0).map_err(|_| {
8✔
764
            crate::error::Error::IdStringMustBeUuid {
×
765
                found: collection_id.0.clone(),
×
766
            }
×
767
        })?;
8✔
768

769
        let conn = self.conn_pool.get().await?;
8✔
770

771
        let stmt = conn
8✔
772
            .prepare(
8✔
773
                "
8✔
774
        SELECT name, description, properties
8✔
775
        FROM layer_collections
8✔
776
        WHERE id = $1;",
8✔
777
            )
8✔
778
            .await?;
8✔
779

780
        let row = conn.query_one(&stmt, &[&collection]).await?;
8✔
781

782
        let name: String = row.get(0);
8✔
783
        let description: String = row.get(1);
8✔
784
        let properties: Vec<Property> = row.get(2);
8✔
785

786
        let pattern = match search.search_type {
8✔
787
            SearchType::Fulltext => {
788
                format!("%{}%", search.search_string)
5✔
789
            }
790
            SearchType::Prefix => {
791
                format!("{}%", search.search_string)
3✔
792
            }
793
        };
794

795
        let stmt = conn.prepare(&create_search_query(true)).await?;
8✔
796

797
        let rows = conn
8✔
798
            .query(
8✔
799
                &stmt,
8✔
800
                &[
8✔
801
                    &collection,
8✔
802
                    &i64::from(search.limit),
8✔
803
                    &i64::from(search.offset),
8✔
804
                    &pattern,
8✔
805
                ],
8✔
806
            )
8✔
807
            .await?;
8✔
808

809
        let items = rows
8✔
810
            .into_iter()
8✔
811
            .map(|row| {
13✔
812
                let is_layer: bool = row.get(4);
13✔
813

13✔
814
                if is_layer {
13✔
815
                    Ok(CollectionItem::Layer(LayerListing {
5✔
816
                        id: ProviderLayerId {
5✔
817
                            provider_id: INTERNAL_PROVIDER_ID,
5✔
818
                            layer_id: LayerId(row.get(0)),
5✔
819
                        },
5✔
820
                        name: row.get(1),
5✔
821
                        description: row.get(2),
5✔
822
                        properties: row.get(3),
5✔
823
                    }))
5✔
824
                } else {
825
                    Ok(CollectionItem::Collection(LayerCollectionListing {
8✔
826
                        id: ProviderLayerCollectionId {
8✔
827
                            provider_id: INTERNAL_PROVIDER_ID,
8✔
828
                            collection_id: LayerCollectionId(row.get(0)),
8✔
829
                        },
8✔
830
                        name: row.get(1),
8✔
831
                        description: row.get(2),
8✔
832
                        properties: row.get(3),
8✔
833
                    }))
8✔
834
                }
835
            })
13✔
836
            .collect::<Result<Vec<CollectionItem>>>()?;
8✔
837

838
        Ok(LayerCollection {
8✔
839
            id: ProviderLayerCollectionId {
8✔
840
                provider_id: INTERNAL_PROVIDER_ID,
8✔
841
                collection_id: collection_id.clone(),
8✔
842
            },
8✔
843
            name,
8✔
844
            description,
8✔
845
            items,
8✔
846
            entry_label: None,
8✔
847
            properties,
8✔
848
        })
8✔
849
    }
16✔
850

851
    #[allow(clippy::too_many_lines)]
852
    async fn autocomplete_search(
853
        &self,
854
        collection_id: &LayerCollectionId,
855
        search: SearchParameters,
856
    ) -> Result<Vec<String>> {
8✔
857
        let collection = Uuid::from_str(&collection_id.0).map_err(|_| {
8✔
858
            crate::error::Error::IdStringMustBeUuid {
×
859
                found: collection_id.0.clone(),
×
860
            }
×
861
        })?;
8✔
862

863
        let conn = self.conn_pool.get().await?;
8✔
864

865
        let pattern = match search.search_type {
8✔
866
            SearchType::Fulltext => {
867
                format!("%{}%", search.search_string)
5✔
868
            }
869
            SearchType::Prefix => {
870
                format!("{}%", search.search_string)
3✔
871
            }
872
        };
873

874
        let stmt = conn.prepare(&create_search_query(false)).await?;
8✔
875

876
        let rows = conn
8✔
877
            .query(
8✔
878
                &stmt,
8✔
879
                &[
8✔
880
                    &collection,
8✔
881
                    &i64::from(search.limit),
8✔
882
                    &i64::from(search.offset),
8✔
883
                    &pattern,
8✔
884
                ],
8✔
885
            )
8✔
886
            .await?;
8✔
887

888
        let items = rows
8✔
889
            .into_iter()
8✔
890
            .map(|row| Ok(row.get::<usize, &str>(0).to_string()))
13✔
891
            .collect::<Result<Vec<String>>>()?;
8✔
892

893
        Ok(items)
8✔
894
    }
16✔
895

896
    async fn get_root_layer_collection_id(&self) -> Result<LayerCollectionId> {
8✔
897
        Ok(LayerCollectionId(
8✔
898
            INTERNAL_LAYER_DB_ROOT_COLLECTION_ID.to_string(),
8✔
899
        ))
8✔
900
    }
16✔
901

902
    async fn load_layer(&self, id: &LayerId) -> Result<Layer> {
5✔
903
        let layer_id =
5✔
904
            Uuid::from_str(&id.0).map_err(|_| crate::error::Error::IdStringMustBeUuid {
5✔
905
                found: id.0.clone(),
×
906
            })?;
5✔
907

908
        let conn = self.conn_pool.get().await?;
5✔
909

910
        let stmt = conn
5✔
911
            .prepare(
5✔
912
                "
5✔
913
            SELECT 
5✔
914
                l.name,
5✔
915
                l.description,
5✔
916
                w.workflow,
5✔
917
                l.symbology,
5✔
918
                l.properties,
5✔
919
                l.metadata
5✔
920
            FROM 
5✔
921
                layers l JOIN workflows w ON (l.workflow_id = w.id)
5✔
922
            WHERE 
5✔
923
                l.id = $1;",
5✔
924
            )
5✔
925
            .await?;
5✔
926

927
        let row = conn
5✔
928
            .query_one(&stmt, &[&layer_id])
5✔
929
            .await
5✔
930
            .map_err(|_error| LayerDbError::NoLayerForGivenId { id: id.clone() })?;
5✔
931

932
        Ok(Layer {
933
            id: ProviderLayerId {
2✔
934
                provider_id: INTERNAL_PROVIDER_ID,
2✔
935
                layer_id: id.clone(),
2✔
936
            },
2✔
937
            name: row.get(0),
2✔
938
            description: row.get(1),
2✔
939
            workflow: serde_json::from_value(row.get(2)).context(crate::error::SerdeJson)?,
2✔
940
            symbology: row.get(3),
2✔
941
            properties: row.get(4),
2✔
942
            metadata: row.get::<_, HashMapTextTextDbType>(5).into(),
2✔
943
        })
944
    }
10✔
945
}
946

947
#[async_trait]
948
impl<Tls> LayerProviderDb for PostgresDb<Tls>
949
where
950
    Tls: MakeTlsConnect<Socket> + Clone + Send + Sync + 'static + std::fmt::Debug,
951
    <Tls as MakeTlsConnect<Socket>>::Stream: Send + Sync,
952
    <Tls as MakeTlsConnect<Socket>>::TlsConnect: Send,
953
    <<Tls as MakeTlsConnect<Socket>>::TlsConnect as TlsConnect<Socket>>::Future: Send,
954
{
955
    async fn add_layer_provider(
956
        &self,
957
        provider: TypedDataProviderDefinition,
958
    ) -> Result<DataProviderId> {
4✔
959
        let conn = self.conn_pool.get().await?;
4✔
960

961
        let stmt = conn
4✔
962
            .prepare(
4✔
963
                "
4✔
964
              INSERT INTO layer_providers (
4✔
965
                  id, 
4✔
966
                  type_name, 
4✔
967
                  name,
4✔
968
                  definition,
4✔
969
                  priority
4✔
970
              )
4✔
971
              VALUES ($1, $2, $3, $4, $5)",
4✔
972
            )
4✔
973
            .await?;
4✔
974

975
        // clamp the priority to a reasonable range
976
        let prio = DataProviderDefinition::<Self>::priority(&provider);
4✔
977
        let clamp_prio = prio.clamp(-1000, 1000);
4✔
978

4✔
979
        if prio != clamp_prio {
4✔
980
            log::warn!(
×
981
                "The priority of the provider {} is out of range! --> clamped {} to {}",
×
982
                DataProviderDefinition::<Self>::name(&provider),
×
983
                prio,
984
                clamp_prio
985
            );
986
        }
4✔
987

988
        let id = DataProviderDefinition::<Self>::id(&provider);
4✔
989
        conn.execute(
4✔
990
            &stmt,
4✔
991
            &[
4✔
992
                &id,
4✔
993
                &DataProviderDefinition::<Self>::type_name(&provider),
4✔
994
                &DataProviderDefinition::<Self>::name(&provider),
4✔
995
                &provider,
4✔
996
                &clamp_prio,
4✔
997
            ],
4✔
998
        )
4✔
999
        .await?;
4✔
1000
        Ok(id)
4✔
1001
    }
8✔
1002

1003
    async fn list_layer_providers(
1004
        &self,
1005
        options: LayerProviderListingOptions,
1006
    ) -> Result<Vec<LayerProviderListing>> {
1✔
1007
        // TODO: permission
1008
        let conn = self.conn_pool.get().await?;
1✔
1009

1010
        let stmt = conn
1✔
1011
            .prepare(
1✔
1012
                "
1✔
1013
            SELECT 
1✔
1014
                id, 
1✔
1015
                name,
1✔
1016
                priority
1✔
1017
            FROM 
1✔
1018
                layer_providers
1✔
1019
            WHERE
1✔
1020
                priority > -1000
1✔
1021
            ORDER BY priority DESC, name ASC
1✔
1022
            LIMIT $1 
1✔
1023
            OFFSET $2;",
1✔
1024
            )
1✔
1025
            .await?;
1✔
1026

1027
        let rows = conn
1✔
1028
            .query(
1✔
1029
                &stmt,
1✔
1030
                &[&i64::from(options.limit), &i64::from(options.offset)],
1✔
1031
            )
1✔
1032
            .await?;
1✔
1033

1034
        Ok(rows
1✔
1035
            .iter()
1✔
1036
            .map(|row| LayerProviderListing {
1✔
1037
                id: row.get(0),
1✔
1038
                name: row.get(1),
1✔
1039
                priority: row.get(2),
1✔
1040
            })
1✔
1041
            .collect())
1✔
1042
    }
2✔
1043

1044
    async fn load_layer_provider(&self, id: DataProviderId) -> Result<Box<dyn DataProvider>> {
4✔
1045
        let conn = self.conn_pool.get().await?;
4✔
1046

1047
        let stmt = conn
4✔
1048
            .prepare(
4✔
1049
                "
4✔
1050
               SELECT 
4✔
1051
                   definition
4✔
1052
               FROM 
4✔
1053
                   layer_providers
4✔
1054
               WHERE
4✔
1055
                   id = $1",
4✔
1056
            )
4✔
1057
            .await?;
4✔
1058

1059
        let row = conn.query_one(&stmt, &[&id]).await?;
4✔
1060

1061
        let definition: TypedDataProviderDefinition = row.get(0);
4✔
1062

4✔
1063
        Box::new(definition)
4✔
1064
            .initialize(PostgresDb {
4✔
1065
                conn_pool: self.conn_pool.clone(),
4✔
1066
            })
4✔
1067
            .await
4✔
1068
    }
8✔
1069
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc