• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

geo-engine / geoengine / 3929938005

pending completion
3929938005

push

github

GitHub
Merge #713

84930 of 96741 relevant lines covered (87.79%)

79640.1 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

80.87
/services/src/pro/layers/postgres_layer_db.rs
1
use crate::api::model::datatypes::{DataProviderId, LayerId};
2
use crate::{
3
    error::Result,
4
    layers::{
5
        external::{DataProvider, DataProviderDefinition},
6
        layer::{
7
            AddLayer, AddLayerCollection, CollectionItem, Layer, LayerCollection,
8
            LayerCollectionListOptions, LayerCollectionListing, LayerListing,
9
            ProviderLayerCollectionId, ProviderLayerId,
10
        },
11
        listing::{LayerCollectionId, LayerCollectionProvider},
12
        storage::{
13
            LayerDb, LayerProviderDb, LayerProviderListing, LayerProviderListingOptions,
14
            INTERNAL_LAYER_DB_ROOT_COLLECTION_ID, INTERNAL_PROVIDER_ID,
15
        },
16
        LayerDbError,
17
    },
18
    util::user_input::Validated,
19
};
20
use async_trait::async_trait;
21
use bb8_postgres::{
22
    bb8::Pool,
23
    tokio_postgres::{
24
        tls::{MakeTlsConnect, TlsConnect},
25
        Socket,
26
    },
27
    PostgresConnectionManager,
28
};
29
use snafu::ResultExt;
30
use std::{collections::HashMap, str::FromStr};
31
use uuid::Uuid;
32

33
pub struct PostgresLayerDb<Tls>
34
where
35
    Tls: MakeTlsConnect<Socket> + Clone + Send + Sync + 'static,
36
    <Tls as MakeTlsConnect<Socket>>::Stream: Send + Sync,
37
    <Tls as MakeTlsConnect<Socket>>::TlsConnect: Send,
38
    <<Tls as MakeTlsConnect<Socket>>::TlsConnect as TlsConnect<Socket>>::Future: Send,
39
{
40
    pub(crate) conn_pool: Pool<PostgresConnectionManager<Tls>>,
41
}
42

43
impl<Tls> PostgresLayerDb<Tls>
44
where
45
    Tls: MakeTlsConnect<Socket> + Clone + Send + Sync + 'static,
46
    <Tls as MakeTlsConnect<Socket>>::Stream: Send + Sync,
47
    <Tls as MakeTlsConnect<Socket>>::TlsConnect: Send,
48
    <<Tls as MakeTlsConnect<Socket>>::TlsConnect as TlsConnect<Socket>>::Future: Send,
49
{
50
    pub fn new(conn_pool: Pool<PostgresConnectionManager<Tls>>) -> Self {
18✔
51
        Self { conn_pool }
18✔
52
    }
18✔
53

54
    /// delete all collections without parent collection
55
    async fn _remove_collections_without_parent_collection(
3✔
56
        transaction: &tokio_postgres::Transaction<'_>,
3✔
57
    ) -> Result<()> {
3✔
58
        // HINT: a recursive delete statement seems reasonable, but hard to implement in postgres
59
        //       because you have a graph with potential loops
60

61
        let remove_layer_collections_without_parents_stmt = transaction
3✔
62
            .prepare(
3✔
63
                "DELETE FROM layer_collections
3✔
64
                 WHERE  id <> $1 -- do not delete root collection
3✔
65
                 AND    id NOT IN (
3✔
66
                    SELECT child FROM collection_children
3✔
67
                 );",
3✔
68
            )
3✔
69
            .await?;
3✔
70
        while 0 < transaction
5✔
71
            .execute(
5✔
72
                &remove_layer_collections_without_parents_stmt,
5✔
73
                &[&INTERNAL_LAYER_DB_ROOT_COLLECTION_ID],
5✔
74
            )
5✔
75
            .await?
5✔
76
        {
2✔
77
            // whenever one collection is deleted, we have to check again if there are more
2✔
78
            // collections without parents
2✔
79
        }
2✔
80

81
        Ok(())
3✔
82
    }
3✔
83

84
    /// delete all layers without parent collection
85
    async fn _remove_layers_without_parent_collection(
5✔
86
        transaction: &tokio_postgres::Transaction<'_>,
5✔
87
    ) -> Result<()> {
5✔
88
        let remove_layers_without_parents_stmt = transaction
5✔
89
            .prepare(
5✔
90
                "DELETE FROM layers
5✔
91
                 WHERE id NOT IN (
5✔
92
                    SELECT layer FROM collection_layers
5✔
93
                 );",
5✔
94
            )
5✔
95
            .await?;
5✔
96
        transaction
5✔
97
            .execute(&remove_layers_without_parents_stmt, &[])
5✔
98
            .await?;
5✔
99

100
        Ok(())
5✔
101
    }
5✔
102
}
103

104
#[async_trait]
105
impl<Tls> LayerDb for PostgresLayerDb<Tls>
106
where
107
    Tls: MakeTlsConnect<Socket> + Clone + Send + Sync + 'static,
108
    <Tls as MakeTlsConnect<Socket>>::Stream: Send + Sync,
109
    <Tls as MakeTlsConnect<Socket>>::TlsConnect: Send,
110
    <<Tls as MakeTlsConnect<Socket>>::TlsConnect as TlsConnect<Socket>>::Future: Send,
111
{
112
    async fn add_layer(
6✔
113
        &self,
6✔
114
        layer: Validated<AddLayer>,
6✔
115
        collection: &LayerCollectionId,
6✔
116
    ) -> Result<LayerId> {
6✔
117
        let collection_id =
6✔
118
            Uuid::from_str(&collection.0).map_err(|_| crate::error::Error::IdStringMustBeUuid {
6✔
119
                found: collection.0.clone(),
×
120
            })?;
6✔
121

122
        let mut conn = self.conn_pool.get().await?;
6✔
123

124
        let layer = layer.user_input;
6✔
125

6✔
126
        let layer_id = Uuid::new_v4();
6✔
127
        let symbology = serde_json::to_value(&layer.symbology).context(crate::error::SerdeJson)?;
6✔
128

129
        let trans = conn.build_transaction().start().await?;
6✔
130

131
        let stmt = trans
6✔
132
            .prepare(
6✔
133
                "
6✔
134
            INSERT INTO layers (id, name, description, workflow, symbology)
6✔
135
            VALUES ($1, $2, $3, $4, $5);",
6✔
136
            )
6✔
137
            .await?;
6✔
138

139
        trans
140
            .execute(
141
                &stmt,
6✔
142
                &[
6✔
143
                    &layer_id,
6✔
144
                    &layer.name,
6✔
145
                    &layer.description,
6✔
146
                    &serde_json::to_value(&layer.workflow).context(crate::error::SerdeJson)?,
6✔
147
                    &symbology,
6✔
148
                ],
149
            )
150
            .await?;
6✔
151

152
        let stmt = trans
6✔
153
            .prepare(
6✔
154
                "
6✔
155
        INSERT INTO collection_layers (collection, layer)
6✔
156
        VALUES ($1, $2) ON CONFLICT DO NOTHING;",
6✔
157
            )
6✔
158
            .await?;
6✔
159

160
        trans.execute(&stmt, &[&collection_id, &layer_id]).await?;
6✔
161

162
        trans.commit().await?;
6✔
163

164
        Ok(LayerId(layer_id.to_string()))
6✔
165
    }
12✔
166

167
    async fn add_layer_with_id(
×
168
        &self,
×
169
        id: &LayerId,
×
170
        layer: Validated<AddLayer>,
×
171
        collection: &LayerCollectionId,
×
172
    ) -> Result<()> {
×
173
        let layer_id =
×
174
            Uuid::from_str(&id.0).map_err(|_| crate::error::Error::IdStringMustBeUuid {
×
175
                found: collection.0.clone(),
×
176
            })?;
×
177

178
        let collection_id =
×
179
            Uuid::from_str(&collection.0).map_err(|_| crate::error::Error::IdStringMustBeUuid {
×
180
                found: collection.0.clone(),
×
181
            })?;
×
182

183
        let mut conn = self.conn_pool.get().await?;
×
184

185
        let layer = layer.user_input;
×
186

187
        let symbology = serde_json::to_value(&layer.symbology).context(crate::error::SerdeJson)?;
×
188

189
        let trans = conn.build_transaction().start().await?;
×
190

191
        let stmt = trans
×
192
            .prepare(
×
193
                "
×
194
            INSERT INTO layers (id, name, description, workflow, symbology)
×
195
            VALUES ($1, $2, $3, $4, $5);",
×
196
            )
×
197
            .await?;
×
198

199
        trans
200
            .execute(
201
                &stmt,
×
202
                &[
×
203
                    &layer_id,
×
204
                    &layer.name,
×
205
                    &layer.description,
×
206
                    &serde_json::to_value(&layer.workflow).context(crate::error::SerdeJson)?,
×
207
                    &symbology,
×
208
                ],
209
            )
210
            .await?;
×
211

212
        let stmt = trans
×
213
            .prepare(
×
214
                "
×
215
            INSERT INTO collection_layers (collection, layer)
×
216
            VALUES ($1, $2) ON CONFLICT DO NOTHING;",
×
217
            )
×
218
            .await?;
×
219

220
        trans.execute(&stmt, &[&collection_id, &layer_id]).await?;
×
221

222
        trans.commit().await?;
×
223

224
        Ok(())
×
225
    }
×
226

227
    async fn add_layer_to_collection(
1✔
228
        &self,
1✔
229
        layer: &LayerId,
1✔
230
        collection: &LayerCollectionId,
1✔
231
    ) -> Result<()> {
1✔
232
        let layer_id =
1✔
233
            Uuid::from_str(&layer.0).map_err(|_| crate::error::Error::IdStringMustBeUuid {
1✔
234
                found: layer.0.clone(),
×
235
            })?;
1✔
236

237
        let collection_id =
1✔
238
            Uuid::from_str(&collection.0).map_err(|_| crate::error::Error::IdStringMustBeUuid {
1✔
239
                found: collection.0.clone(),
×
240
            })?;
1✔
241

242
        let conn = self.conn_pool.get().await?;
1✔
243

244
        let stmt = conn
1✔
245
            .prepare(
1✔
246
                "
1✔
247
            INSERT INTO collection_layers (collection, layer)
1✔
248
            VALUES ($1, $2) ON CONFLICT DO NOTHING;",
1✔
249
            )
1✔
250
            .await?;
1✔
251

252
        conn.execute(&stmt, &[&collection_id, &layer_id]).await?;
1✔
253

254
        Ok(())
1✔
255
    }
2✔
256

257
    async fn add_collection(
7✔
258
        &self,
7✔
259
        collection: Validated<AddLayerCollection>,
7✔
260
        parent: &LayerCollectionId,
7✔
261
    ) -> Result<LayerCollectionId> {
7✔
262
        let parent =
7✔
263
            Uuid::from_str(&parent.0).map_err(|_| crate::error::Error::IdStringMustBeUuid {
7✔
264
                found: parent.0.clone(),
×
265
            })?;
7✔
266

267
        let mut conn = self.conn_pool.get().await?;
7✔
268

269
        let collection = collection.user_input;
7✔
270

7✔
271
        let collection_id = Uuid::new_v4();
7✔
272

273
        let trans = conn.build_transaction().start().await?;
7✔
274

275
        let stmt = trans
7✔
276
            .prepare(
7✔
277
                "
7✔
278
            INSERT INTO layer_collections (id, name, description)
7✔
279
            VALUES ($1, $2, $3);",
7✔
280
            )
7✔
281
            .await?;
7✔
282

283
        trans
7✔
284
            .execute(
7✔
285
                &stmt,
7✔
286
                &[&collection_id, &collection.name, &collection.description],
7✔
287
            )
7✔
288
            .await?;
7✔
289

290
        let stmt = trans
7✔
291
            .prepare(
7✔
292
                "
7✔
293
            INSERT INTO collection_children (parent, child)
7✔
294
            VALUES ($1, $2) ON CONFLICT DO NOTHING;",
7✔
295
            )
7✔
296
            .await?;
7✔
297

298
        trans.execute(&stmt, &[&parent, &collection_id]).await?;
7✔
299

300
        trans.commit().await?;
7✔
301

302
        Ok(LayerCollectionId(collection_id.to_string()))
7✔
303
    }
14✔
304

305
    async fn add_collection_with_id(
×
306
        &self,
×
307
        id: &LayerCollectionId,
×
308
        collection: Validated<AddLayerCollection>,
×
309
        parent: &LayerCollectionId,
×
310
    ) -> Result<()> {
×
311
        let collection_id =
×
312
            Uuid::from_str(&id.0).map_err(|_| crate::error::Error::IdStringMustBeUuid {
×
313
                found: id.0.clone(),
×
314
            })?;
×
315

316
        let parent =
×
317
            Uuid::from_str(&parent.0).map_err(|_| crate::error::Error::IdStringMustBeUuid {
×
318
                found: parent.0.clone(),
×
319
            })?;
×
320

321
        let mut conn = self.conn_pool.get().await?;
×
322

323
        let collection = collection.user_input;
×
324

325
        let trans = conn.build_transaction().start().await?;
×
326

327
        let stmt = trans
×
328
            .prepare(
×
329
                "
×
330
            INSERT INTO layer_collections (id, name, description)
×
331
            VALUES ($1, $2, $3);",
×
332
            )
×
333
            .await?;
×
334

335
        trans
×
336
            .execute(
×
337
                &stmt,
×
338
                &[&collection_id, &collection.name, &collection.description],
×
339
            )
×
340
            .await?;
×
341

342
        let stmt = trans
×
343
            .prepare(
×
344
                "
×
345
            INSERT INTO collection_children (parent, child)
×
346
            VALUES ($1, $2) ON CONFLICT DO NOTHING;",
×
347
            )
×
348
            .await?;
×
349

350
        trans.execute(&stmt, &[&parent, &collection_id]).await?;
×
351

352
        trans.commit().await?;
×
353

354
        Ok(())
×
355
    }
×
356

357
    async fn add_collection_to_parent(
1✔
358
        &self,
1✔
359
        collection: &LayerCollectionId,
1✔
360
        parent: &LayerCollectionId,
1✔
361
    ) -> Result<()> {
1✔
362
        let collection =
1✔
363
            Uuid::from_str(&collection.0).map_err(|_| crate::error::Error::IdStringMustBeUuid {
1✔
364
                found: collection.0.clone(),
×
365
            })?;
1✔
366

367
        let parent =
1✔
368
            Uuid::from_str(&parent.0).map_err(|_| crate::error::Error::IdStringMustBeUuid {
1✔
369
                found: parent.0.clone(),
×
370
            })?;
1✔
371

372
        let conn = self.conn_pool.get().await?;
1✔
373

374
        let stmt = conn
1✔
375
            .prepare(
1✔
376
                "
1✔
377
            INSERT INTO collection_children (parent, child)
1✔
378
            VALUES ($1, $2) ON CONFLICT DO NOTHING;",
1✔
379
            )
1✔
380
            .await?;
1✔
381

382
        conn.execute(&stmt, &[&parent, &collection]).await?;
1✔
383

384
        Ok(())
1✔
385
    }
2✔
386

387
    async fn remove_collection(&self, collection: &LayerCollectionId) -> Result<()> {
3✔
388
        let collection =
3✔
389
            Uuid::from_str(&collection.0).map_err(|_| crate::error::Error::IdStringMustBeUuid {
3✔
390
                found: collection.0.clone(),
×
391
            })?;
3✔
392

393
        if collection == INTERNAL_LAYER_DB_ROOT_COLLECTION_ID {
3✔
394
            return Err(LayerDbError::CannotRemoveRootCollection.into());
1✔
395
        }
2✔
396

397
        let mut conn = self.conn_pool.get().await?;
2✔
398
        let transaction = conn.transaction().await?;
2✔
399

400
        // delete the collection!
401
        // on delete cascade removes all entries from `collection_children` and `collection_layers`
402

403
        let remove_layer_collection_stmt = transaction
2✔
404
            .prepare(
2✔
405
                "DELETE FROM layer_collections
2✔
406
                 WHERE id = $1;",
2✔
407
            )
2✔
408
            .await?;
2✔
409
        transaction
2✔
410
            .execute(&remove_layer_collection_stmt, &[&collection])
2✔
411
            .await?;
2✔
412

413
        Self::_remove_collections_without_parent_collection(&transaction).await?;
4✔
414

415
        Self::_remove_layers_without_parent_collection(&transaction).await?;
4✔
416

417
        transaction.commit().await.map_err(Into::into)
2✔
418
    }
6✔
419

420
    async fn remove_layer_from_collection(
2✔
421
        &self,
2✔
422
        layer: &LayerId,
2✔
423
        collection: &LayerCollectionId,
2✔
424
    ) -> Result<()> {
2✔
425
        let collection_uuid =
2✔
426
            Uuid::from_str(&collection.0).map_err(|_| crate::error::Error::IdStringMustBeUuid {
2✔
427
                found: collection.0.clone(),
×
428
            })?;
2✔
429

430
        let layer_uuid =
2✔
431
            Uuid::from_str(&layer.0).map_err(|_| crate::error::Error::IdStringMustBeUuid {
2✔
432
                found: layer.0.clone(),
×
433
            })?;
2✔
434

435
        let mut conn = self.conn_pool.get().await?;
2✔
436
        let transaction = conn.transaction().await?;
2✔
437

438
        let remove_layer_collection_stmt = transaction
2✔
439
            .prepare(
2✔
440
                "DELETE FROM collection_layers
2✔
441
                 WHERE collection = $1
2✔
442
                 AND layer = $2;",
2✔
443
            )
2✔
444
            .await?;
2✔
445
        let num_results = transaction
2✔
446
            .execute(
2✔
447
                &remove_layer_collection_stmt,
2✔
448
                &[&collection_uuid, &layer_uuid],
2✔
449
            )
2✔
450
            .await?;
2✔
451

452
        if num_results == 0 {
2✔
453
            return Err(LayerDbError::NoLayerForGivenIdInCollection {
×
454
                layer: layer.clone(),
×
455
                collection: collection.clone(),
×
456
            }
×
457
            .into());
×
458
        }
2✔
459

2✔
460
        Self::_remove_layers_without_parent_collection(&transaction).await?;
4✔
461

462
        transaction.commit().await.map_err(Into::into)
2✔
463
    }
4✔
464

465
    async fn remove_collection_from_parent(
1✔
466
        &self,
1✔
467
        collection: &LayerCollectionId,
1✔
468
        parent: &LayerCollectionId,
1✔
469
    ) -> Result<()> {
1✔
470
        let collection_uuid =
1✔
471
            Uuid::from_str(&collection.0).map_err(|_| crate::error::Error::IdStringMustBeUuid {
1✔
472
                found: collection.0.clone(),
×
473
            })?;
1✔
474

475
        let parent_collection_uuid =
1✔
476
            Uuid::from_str(&parent.0).map_err(|_| crate::error::Error::IdStringMustBeUuid {
1✔
477
                found: parent.0.clone(),
×
478
            })?;
1✔
479

480
        let mut conn = self.conn_pool.get().await?;
1✔
481
        let transaction = conn.transaction().await?;
1✔
482

483
        let remove_layer_collection_stmt = transaction
1✔
484
            .prepare(
1✔
485
                "DELETE FROM collection_children
1✔
486
                 WHERE child = $1
1✔
487
                 AND parent = $2;",
1✔
488
            )
1✔
489
            .await?;
1✔
490
        let num_results = transaction
1✔
491
            .execute(
1✔
492
                &remove_layer_collection_stmt,
1✔
493
                &[&collection_uuid, &parent_collection_uuid],
1✔
494
            )
1✔
495
            .await?;
1✔
496

497
        if num_results == 0 {
1✔
498
            return Err(LayerDbError::NoCollectionForGivenIdInCollection {
×
499
                collection: collection.clone(),
×
500
                parent: parent.clone(),
×
501
            }
×
502
            .into());
×
503
        }
1✔
504

1✔
505
        Self::_remove_collections_without_parent_collection(&transaction).await?;
4✔
506

507
        Self::_remove_layers_without_parent_collection(&transaction).await?;
2✔
508

509
        transaction.commit().await.map_err(Into::into)
1✔
510
    }
2✔
511
}
512

513
#[async_trait]
514
impl<Tls> LayerCollectionProvider for PostgresLayerDb<Tls>
515
where
516
    Tls: MakeTlsConnect<Socket> + Clone + Send + Sync + 'static,
517
    <Tls as MakeTlsConnect<Socket>>::Stream: Send + Sync,
518
    <Tls as MakeTlsConnect<Socket>>::TlsConnect: Send,
519
    <<Tls as MakeTlsConnect<Socket>>::TlsConnect as TlsConnect<Socket>>::Future: Send,
520
{
521
    async fn collection(
11✔
522
        &self,
11✔
523
        collection_id: &LayerCollectionId,
11✔
524
        options: Validated<LayerCollectionListOptions>,
11✔
525
    ) -> Result<LayerCollection> {
11✔
526
        let collection = Uuid::from_str(&collection_id.0).map_err(|_| {
11✔
527
            crate::error::Error::IdStringMustBeUuid {
×
528
                found: collection_id.0.clone(),
×
529
            }
×
530
        })?;
11✔
531

532
        let conn = self.conn_pool.get().await?;
11✔
533

534
        let options = options.user_input;
11✔
535

536
        let stmt = conn
11✔
537
            .prepare(
11✔
538
                "
11✔
539
        SELECT name, description
11✔
540
        FROM layer_collections WHERE id = $1;",
11✔
541
            )
11✔
542
            .await?;
11✔
543

544
        let row = conn.query_one(&stmt, &[&collection]).await?;
11✔
545

546
        let name: String = row.get(0);
8✔
547
        let description: String = row.get(1);
8✔
548

549
        let stmt = conn
8✔
550
            .prepare(
8✔
551
                "
8✔
552
        SELECT id, name, description, is_layer
8✔
553
        FROM (
8✔
554
            SELECT 
8✔
555
                concat(id, '') AS id, 
8✔
556
                name, 
8✔
557
                description, 
8✔
558
                FALSE AS is_layer
8✔
559
            FROM layer_collections c JOIN collection_children cc ON (c.id = cc.child)
8✔
560
            WHERE cc.parent = $1
8✔
561
        ) u UNION (
8✔
562
            SELECT 
8✔
563
                concat(id, '') AS id, 
8✔
564
                name, 
8✔
565
                description, 
8✔
566
                TRUE As is_layer
8✔
567
            FROM layers l JOIN collection_layers cl ON (l.id = cl.layer)
8✔
568
            WHERE cl.collection = $1
8✔
569
        )
8✔
570
        ORDER BY is_layer ASC, name ASC
8✔
571
        LIMIT $2 
8✔
572
        OFFSET $3;            
8✔
573
        ",
8✔
574
            )
8✔
575
            .await?;
8✔
576

577
        let rows = conn
8✔
578
            .query(
8✔
579
                &stmt,
8✔
580
                &[
8✔
581
                    &collection,
8✔
582
                    &i64::from(options.limit),
8✔
583
                    &i64::from(options.offset),
8✔
584
                ],
8✔
585
            )
8✔
586
            .await?;
8✔
587

588
        let items = rows
8✔
589
            .into_iter()
8✔
590
            .map(|row| {
11✔
591
                let is_layer: bool = row.get(3);
11✔
592

11✔
593
                if is_layer {
11✔
594
                    CollectionItem::Layer(LayerListing {
5✔
595
                        id: ProviderLayerId {
5✔
596
                            provider_id: INTERNAL_PROVIDER_ID,
5✔
597
                            layer_id: LayerId(row.get(0)),
5✔
598
                        },
5✔
599
                        name: row.get(1),
5✔
600
                        description: row.get(2),
5✔
601
                        properties: vec![],
5✔
602
                    })
5✔
603
                } else {
604
                    CollectionItem::Collection(LayerCollectionListing {
6✔
605
                        id: ProviderLayerCollectionId {
6✔
606
                            provider_id: INTERNAL_PROVIDER_ID,
6✔
607
                            collection_id: LayerCollectionId(row.get(0)),
6✔
608
                        },
6✔
609
                        name: row.get(1),
6✔
610
                        description: row.get(2),
6✔
611
                    })
6✔
612
                }
613
            })
11✔
614
            .collect();
8✔
615

8✔
616
        Ok(LayerCollection {
8✔
617
            id: ProviderLayerCollectionId {
8✔
618
                provider_id: INTERNAL_PROVIDER_ID,
8✔
619
                collection_id: collection_id.clone(),
8✔
620
            },
8✔
621
            name,
8✔
622
            description,
8✔
623
            items,
8✔
624
            entry_label: None,
8✔
625
            properties: vec![],
8✔
626
        })
8✔
627
    }
22✔
628

629
    async fn root_collection_id(&self) -> Result<LayerCollectionId> {
4✔
630
        Ok(LayerCollectionId(
4✔
631
            INTERNAL_LAYER_DB_ROOT_COLLECTION_ID.to_string(),
4✔
632
        ))
4✔
633
    }
4✔
634

635
    async fn get_layer(&self, id: &LayerId) -> Result<Layer> {
5✔
636
        let layer_id =
5✔
637
            Uuid::from_str(&id.0).map_err(|_| crate::error::Error::IdStringMustBeUuid {
5✔
638
                found: id.0.clone(),
×
639
            })?;
5✔
640

641
        let conn = self.conn_pool.get().await?;
5✔
642

643
        let stmt = conn
5✔
644
            .prepare(
5✔
645
                "
5✔
646
            SELECT 
5✔
647
                name,
5✔
648
                description,
5✔
649
                workflow,
5✔
650
                symbology         
5✔
651
            FROM layers l
5✔
652
            WHERE l.id = $1;",
5✔
653
            )
5✔
654
            .await?;
5✔
655

656
        let row = conn
5✔
657
            .query_one(&stmt, &[&layer_id])
5✔
658
            .await
5✔
659
            .map_err(|_error| LayerDbError::NoLayerForGivenId { id: id.clone() })?;
5✔
660

661
        Ok(Layer {
662
            id: ProviderLayerId {
2✔
663
                provider_id: INTERNAL_PROVIDER_ID,
2✔
664
                layer_id: id.clone(),
2✔
665
            },
2✔
666
            name: row.get(0),
2✔
667
            description: row.get(1),
2✔
668
            workflow: serde_json::from_value(row.get(2)).context(crate::error::SerdeJson)?,
2✔
669
            symbology: serde_json::from_value(row.get(3)).context(crate::error::SerdeJson)?,
2✔
670
            properties: vec![],
2✔
671
            metadata: HashMap::new(),
2✔
672
        })
673
    }
10✔
674
}
675

676
pub struct PostgresLayerProviderDb<Tls>
677
where
678
    Tls: MakeTlsConnect<Socket> + Clone + Send + Sync + 'static,
679
    <Tls as MakeTlsConnect<Socket>>::Stream: Send + Sync,
680
    <Tls as MakeTlsConnect<Socket>>::TlsConnect: Send,
681
    <<Tls as MakeTlsConnect<Socket>>::TlsConnect as TlsConnect<Socket>>::Future: Send,
682
{
683
    pub(crate) conn_pool: Pool<PostgresConnectionManager<Tls>>,
684
}
685

686
impl<Tls> PostgresLayerProviderDb<Tls>
687
where
688
    Tls: MakeTlsConnect<Socket> + Clone + Send + Sync + 'static,
689
    <Tls as MakeTlsConnect<Socket>>::Stream: Send + Sync,
690
    <Tls as MakeTlsConnect<Socket>>::TlsConnect: Send,
691
    <<Tls as MakeTlsConnect<Socket>>::TlsConnect as TlsConnect<Socket>>::Future: Send,
692
{
693
    pub fn new(conn_pool: Pool<PostgresConnectionManager<Tls>>) -> Self {
18✔
694
        Self { conn_pool }
18✔
695
    }
18✔
696
}
697

698
#[async_trait]
699
impl<Tls> LayerProviderDb for PostgresLayerProviderDb<Tls>
700
where
701
    Tls: MakeTlsConnect<Socket> + Clone + Send + Sync + 'static,
702
    <Tls as MakeTlsConnect<Socket>>::Stream: Send + Sync,
703
    <Tls as MakeTlsConnect<Socket>>::TlsConnect: Send,
704
    <<Tls as MakeTlsConnect<Socket>>::TlsConnect as TlsConnect<Socket>>::Future: Send,
705
{
706
    async fn add_layer_provider(
1✔
707
        &self,
1✔
708
        provider: Box<dyn DataProviderDefinition>,
1✔
709
    ) -> Result<DataProviderId> {
1✔
710
        // TODO: permissions
711
        let conn = self.conn_pool.get().await?;
1✔
712

713
        let stmt = conn
1✔
714
            .prepare(
1✔
715
                "
1✔
716
              INSERT INTO layer_providers (
1✔
717
                  id, 
1✔
718
                  type_name, 
1✔
719
                  name,
1✔
720
                  definition
1✔
721
              )
1✔
722
              VALUES ($1, $2, $3, $4)",
1✔
723
            )
1✔
724
            .await?;
1✔
725

726
        let id = provider.id();
1✔
727
        conn.execute(
1✔
728
            &stmt,
1✔
729
            &[
1✔
730
                &id,
1✔
731
                &provider.type_name(),
1✔
732
                &provider.name(),
1✔
733
                &serde_json::to_value(provider)?,
1✔
734
            ],
735
        )
736
        .await?;
1✔
737
        Ok(id)
1✔
738
    }
2✔
739

740
    async fn list_layer_providers(
1✔
741
        &self,
1✔
742
        options: Validated<LayerProviderListingOptions>,
1✔
743
    ) -> Result<Vec<LayerProviderListing>> {
1✔
744
        // TODO: permission
745
        let conn = self.conn_pool.get().await?;
1✔
746

747
        let options = options.user_input;
1✔
748

749
        let stmt = conn
1✔
750
            .prepare(
1✔
751
                "
1✔
752
            SELECT 
1✔
753
                id, 
1✔
754
                name,
1✔
755
                type_name
1✔
756
            FROM 
1✔
757
                layer_providers
1✔
758
            ORDER BY name ASC
1✔
759
            LIMIT $1 
1✔
760
            OFFSET $2;",
1✔
761
            )
1✔
762
            .await?;
1✔
763

764
        let rows = conn
1✔
765
            .query(
1✔
766
                &stmt,
1✔
767
                &[&i64::from(options.limit), &i64::from(options.offset)],
1✔
768
            )
1✔
769
            .await?;
1✔
770

771
        Ok(rows
1✔
772
            .iter()
1✔
773
            .map(|row| LayerProviderListing {
1✔
774
                id: row.get(0),
1✔
775
                name: row.get(1),
1✔
776
                description: row.get(2),
1✔
777
            })
1✔
778
            .collect())
1✔
779
    }
2✔
780

781
    async fn layer_provider(&self, id: DataProviderId) -> Result<Box<dyn DataProvider>> {
1✔
782
        // TODO: permissions
783
        let conn = self.conn_pool.get().await?;
1✔
784

785
        let stmt = conn
1✔
786
            .prepare(
1✔
787
                "
1✔
788
               SELECT 
1✔
789
                   definition
1✔
790
               FROM 
1✔
791
                   layer_providers
1✔
792
               WHERE
1✔
793
                   id = $1",
1✔
794
            )
1✔
795
            .await?;
1✔
796

797
        let row = conn.query_one(&stmt, &[&id]).await?;
1✔
798

799
        let definition = serde_json::from_value::<Box<dyn DataProviderDefinition>>(row.get(0))?;
1✔
800

801
        definition.initialize().await
1✔
802
    }
2✔
803
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc