• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

geo-engine / geoengine / 6537173113

16 Oct 2023 04:41PM UTC coverage: 89.489% (-0.02%) from 89.512%
6537173113

push

github

web-flow
Merge pull request #886 from geo-engine/reuse_queries

reuse queries in pro project db

468 of 468 new or added lines in 6 files covered. (100.0%)

109180 of 122004 relevant lines covered (89.49%)

59524.19 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

95.58
/services/src/layers/postgres_layer_db.rs
1
use super::external::TypedDataProviderDefinition;
2
use crate::contexts::PostgresDb;
3
use crate::error;
4
use crate::layers::layer::Property;
5
use crate::workflows::workflow::WorkflowId;
6
use crate::{
7
    error::Result,
8
    layers::{
9
        external::{DataProvider, DataProviderDefinition},
10
        layer::{
11
            AddLayer, AddLayerCollection, CollectionItem, Layer, LayerCollection,
12
            LayerCollectionListOptions, LayerCollectionListing, LayerListing,
13
            ProviderLayerCollectionId, ProviderLayerId,
14
        },
15
        listing::{LayerCollectionId, LayerCollectionProvider},
16
        storage::{
17
            LayerDb, LayerProviderDb, LayerProviderListing, LayerProviderListingOptions,
18
            INTERNAL_LAYER_DB_ROOT_COLLECTION_ID, INTERNAL_PROVIDER_ID,
19
        },
20
        LayerDbError,
21
    },
22
};
23
use async_trait::async_trait;
24
use bb8_postgres::bb8::PooledConnection;
25
use bb8_postgres::tokio_postgres::{
26
    tls::{MakeTlsConnect, TlsConnect},
27
    Socket,
28
};
29
use bb8_postgres::PostgresConnectionManager;
30
use geoengine_datatypes::dataset::{DataProviderId, LayerId};
31
use geoengine_datatypes::util::HashMapTextTextDbType;
32
use snafu::ResultExt;
33
use std::str::FromStr;
34
use tokio_postgres::Transaction;
35
use uuid::Uuid;
36

37
/// delete all collections without parent collection
38
async fn _remove_collections_without_parent_collection(
8✔
39
    transaction: &tokio_postgres::Transaction<'_>,
8✔
40
) -> Result<()> {
8✔
41
    // HINT: a recursive delete statement seems reasonable, but hard to implement in postgres
42
    //       because you have a graph with potential loops
43

44
    let remove_layer_collections_without_parents_stmt = transaction
8✔
45
        .prepare(
8✔
46
            "DELETE FROM layer_collections
8✔
47
                 WHERE  id <> $1 -- do not delete root collection
8✔
48
                 AND    id NOT IN (
8✔
49
                    SELECT child FROM collection_children
8✔
50
                 );",
8✔
51
        )
8✔
52
        .await?;
7✔
53
    while 0 < transaction
13✔
54
        .execute(
13✔
55
            &remove_layer_collections_without_parents_stmt,
13✔
56
            &[&INTERNAL_LAYER_DB_ROOT_COLLECTION_ID],
13✔
57
        )
13✔
58
        .await?
12✔
59
    {
5✔
60
        // whenever one collection is deleted, we have to check again if there are more
5✔
61
        // collections without parents
5✔
62
    }
5✔
63

64
    Ok(())
8✔
65
}
8✔
66

67
/// delete all layers without parent collection
68
async fn _remove_layers_without_parent_collection(
13✔
69
    transaction: &tokio_postgres::Transaction<'_>,
13✔
70
) -> Result<()> {
13✔
71
    let remove_layers_without_parents_stmt = transaction
13✔
72
        .prepare(
13✔
73
            "DELETE FROM layers
13✔
74
                 WHERE id NOT IN (
13✔
75
                    SELECT layer FROM collection_layers
13✔
76
                 );",
13✔
77
        )
13✔
78
        .await?;
12✔
79
    transaction
13✔
80
        .execute(&remove_layers_without_parents_stmt, &[])
13✔
81
        .await?;
12✔
82

83
    Ok(())
13✔
84
}
13✔
85

86
pub async fn insert_layer(
20✔
87
    trans: &Transaction<'_>,
20✔
88
    id: &LayerId,
20✔
89
    layer: AddLayer,
20✔
90
    collection: &LayerCollectionId,
20✔
91
) -> Result<Uuid> {
20✔
92
    let layer_id = Uuid::from_str(&id.0).map_err(|_| crate::error::Error::IdStringMustBeUuid {
20✔
93
        found: collection.0.clone(),
×
94
    })?;
20✔
95

96
    let collection_id =
20✔
97
        Uuid::from_str(&collection.0).map_err(|_| crate::error::Error::IdStringMustBeUuid {
20✔
98
            found: collection.0.clone(),
×
99
        })?;
20✔
100

101
    let workflow_id = WorkflowId::from_hash(&layer.workflow);
20✔
102

103
    let stmt = trans
20✔
104
        .prepare(
20✔
105
            "INSERT INTO workflows (id, workflow) VALUES ($1, $2) 
20✔
106
            ON CONFLICT DO NOTHING;",
20✔
107
        )
20✔
108
        .await?;
18✔
109

110
    trans
111
        .execute(
112
            &stmt,
20✔
113
            &[
20✔
114
                &workflow_id,
20✔
115
                &serde_json::to_value(&layer.workflow).context(error::SerdeJson)?,
20✔
116
            ],
117
        )
118
        .await?;
18✔
119

120
    let stmt = trans
20✔
121
        .prepare(
20✔
122
            "
20✔
123
            INSERT INTO layers (id, name, description, workflow_id, symbology, properties, metadata)
20✔
124
            VALUES ($1, $2, $3, $4, $5, $6, $7);",
20✔
125
        )
20✔
126
        .await?;
497✔
127

128
    trans
20✔
129
        .execute(
20✔
130
            &stmt,
20✔
131
            &[
20✔
132
                &layer_id,
20✔
133
                &layer.name,
20✔
134
                &layer.description,
20✔
135
                &workflow_id,
20✔
136
                &layer.symbology,
20✔
137
                &layer.properties,
20✔
138
                &HashMapTextTextDbType::from(&layer.metadata),
20✔
139
            ],
20✔
140
        )
20✔
141
        .await?;
18✔
142

143
    let stmt = trans
20✔
144
        .prepare(
20✔
145
            "
20✔
146
            INSERT INTO collection_layers (collection, layer)
20✔
147
            VALUES ($1, $2) ON CONFLICT DO NOTHING;",
20✔
148
        )
20✔
149
        .await?;
18✔
150

151
    trans.execute(&stmt, &[&collection_id, &layer_id]).await?;
20✔
152

153
    Ok(layer_id)
20✔
154
}
20✔
155

156
pub async fn insert_layer_collection_with_id(
30✔
157
    trans: &Transaction<'_>,
30✔
158
    id: &LayerCollectionId,
30✔
159
    collection: AddLayerCollection,
30✔
160
    parent: &LayerCollectionId,
30✔
161
) -> Result<Uuid> {
30✔
162
    let collection_id =
30✔
163
        Uuid::from_str(&id.0).map_err(|_| crate::error::Error::IdStringMustBeUuid {
30✔
164
            found: id.0.clone(),
×
165
        })?;
30✔
166

167
    let parent =
30✔
168
        Uuid::from_str(&parent.0).map_err(|_| crate::error::Error::IdStringMustBeUuid {
30✔
169
            found: parent.0.clone(),
×
170
        })?;
30✔
171

172
    let stmt = trans
30✔
173
        .prepare(
30✔
174
            "
30✔
175
        INSERT INTO layer_collections (id, name, description, properties)
30✔
176
        VALUES ($1, $2, $3, $4);",
30✔
177
        )
30✔
178
        .await?;
82✔
179

180
    trans
30✔
181
        .execute(
30✔
182
            &stmt,
30✔
183
            &[
30✔
184
                &collection_id,
30✔
185
                &collection.name,
30✔
186
                &collection.description,
30✔
187
                &collection.properties,
30✔
188
            ],
30✔
189
        )
30✔
190
        .await?;
27✔
191

192
    let stmt = trans
30✔
193
        .prepare(
30✔
194
            "
30✔
195
        INSERT INTO collection_children (parent, child)
30✔
196
        VALUES ($1, $2) ON CONFLICT DO NOTHING;",
30✔
197
        )
30✔
198
        .await?;
27✔
199

200
    trans.execute(&stmt, &[&parent, &collection_id]).await?;
30✔
201

202
    Ok(collection_id)
30✔
203
}
30✔
204

205
pub async fn insert_collection_parent<Tls>(
3✔
206
    conn: &PooledConnection<'_, PostgresConnectionManager<Tls>>,
3✔
207
    collection: &LayerCollectionId,
3✔
208
    parent: &LayerCollectionId,
3✔
209
) -> Result<()>
3✔
210
where
3✔
211
    Tls: MakeTlsConnect<Socket> + Clone + Send + Sync + 'static,
3✔
212
    <Tls as MakeTlsConnect<Socket>>::Stream: Send + Sync,
3✔
213
    <Tls as MakeTlsConnect<Socket>>::TlsConnect: Send,
3✔
214
    <<Tls as MakeTlsConnect<Socket>>::TlsConnect as TlsConnect<Socket>>::Future: Send,
3✔
215
{
3✔
216
    let collection =
3✔
217
        Uuid::from_str(&collection.0).map_err(|_| crate::error::Error::IdStringMustBeUuid {
3✔
218
            found: collection.0.clone(),
×
219
        })?;
3✔
220

221
    let parent =
3✔
222
        Uuid::from_str(&parent.0).map_err(|_| crate::error::Error::IdStringMustBeUuid {
3✔
223
            found: parent.0.clone(),
×
224
        })?;
3✔
225

226
    let stmt = conn
3✔
227
        .prepare(
3✔
228
            "
3✔
229
        INSERT INTO collection_children (parent, child)
3✔
230
        VALUES ($1, $2) ON CONFLICT DO NOTHING;",
3✔
231
        )
3✔
232
        .await?;
3✔
233

234
    conn.execute(&stmt, &[&parent, &collection]).await?;
3✔
235

236
    Ok(())
3✔
237
}
3✔
238

239
pub async fn delete_layer_collection(
7✔
240
    transaction: &Transaction<'_>,
7✔
241
    collection: &LayerCollectionId,
7✔
242
) -> Result<()> {
7✔
243
    let collection =
7✔
244
        Uuid::from_str(&collection.0).map_err(|_| crate::error::Error::IdStringMustBeUuid {
7✔
245
            found: collection.0.clone(),
×
246
        })?;
7✔
247

248
    if collection == INTERNAL_LAYER_DB_ROOT_COLLECTION_ID {
7✔
249
        return Err(LayerDbError::CannotRemoveRootCollection.into());
2✔
250
    }
5✔
251

252
    // delete the collection!
253
    // on delete cascade removes all entries from `collection_children` and `collection_layers`
254

255
    let remove_layer_collection_stmt = transaction
5✔
256
        .prepare(
5✔
257
            "DELETE FROM layer_collections
5✔
258
             WHERE id = $1;",
5✔
259
        )
5✔
260
        .await?;
4✔
261
    transaction
5✔
262
        .execute(&remove_layer_collection_stmt, &[&collection])
5✔
263
        .await?;
4✔
264

265
    _remove_collections_without_parent_collection(transaction).await?;
8✔
266

267
    _remove_layers_without_parent_collection(transaction).await?;
8✔
268

269
    Ok(())
5✔
270
}
7✔
271

272
pub async fn delete_layer_from_collection(
5✔
273
    transaction: &Transaction<'_>,
5✔
274
    layer: &LayerId,
5✔
275
    collection: &LayerCollectionId,
5✔
276
) -> Result<()> {
5✔
277
    let collection_uuid =
5✔
278
        Uuid::from_str(&collection.0).map_err(|_| crate::error::Error::IdStringMustBeUuid {
5✔
279
            found: collection.0.clone(),
×
280
        })?;
5✔
281

282
    let layer_uuid =
5✔
283
        Uuid::from_str(&layer.0).map_err(|_| crate::error::Error::IdStringMustBeUuid {
5✔
284
            found: layer.0.clone(),
×
285
        })?;
5✔
286

287
    let remove_layer_collection_stmt = transaction
5✔
288
        .prepare(
5✔
289
            "DELETE FROM collection_layers
5✔
290
             WHERE collection = $1
5✔
291
             AND layer = $2;",
5✔
292
        )
5✔
293
        .await?;
5✔
294
    let num_results = transaction
5✔
295
        .execute(
5✔
296
            &remove_layer_collection_stmt,
5✔
297
            &[&collection_uuid, &layer_uuid],
5✔
298
        )
5✔
299
        .await?;
5✔
300

301
    if num_results == 0 {
5✔
302
        return Err(LayerDbError::NoLayerForGivenIdInCollection {
×
303
            layer: layer.clone(),
×
304
            collection: collection.clone(),
×
305
        }
×
306
        .into());
×
307
    }
5✔
308

5✔
309
    _remove_layers_without_parent_collection(transaction).await?;
10✔
310

311
    Ok(())
5✔
312
}
5✔
313

314
pub async fn delete_layer_collection_from_parent(
3✔
315
    transaction: &Transaction<'_>,
3✔
316
    collection: &LayerCollectionId,
3✔
317
    parent: &LayerCollectionId,
3✔
318
) -> Result<()> {
3✔
319
    let collection_uuid =
3✔
320
        Uuid::from_str(&collection.0).map_err(|_| crate::error::Error::IdStringMustBeUuid {
3✔
321
            found: collection.0.clone(),
×
322
        })?;
3✔
323

324
    let parent_collection_uuid =
3✔
325
        Uuid::from_str(&parent.0).map_err(|_| crate::error::Error::IdStringMustBeUuid {
3✔
326
            found: parent.0.clone(),
×
327
        })?;
3✔
328

329
    let remove_layer_collection_stmt = transaction
3✔
330
        .prepare(
3✔
331
            "DELETE FROM collection_children
3✔
332
             WHERE child = $1
3✔
333
             AND parent = $2;",
3✔
334
        )
3✔
335
        .await?;
3✔
336
    let num_results = transaction
3✔
337
        .execute(
3✔
338
            &remove_layer_collection_stmt,
3✔
339
            &[&collection_uuid, &parent_collection_uuid],
3✔
340
        )
3✔
341
        .await?;
3✔
342

343
    if num_results == 0 {
3✔
344
        return Err(LayerDbError::NoCollectionForGivenIdInCollection {
×
345
            collection: collection.clone(),
×
346
            parent: parent.clone(),
×
347
        }
×
348
        .into());
×
349
    }
3✔
350

3✔
351
    _remove_collections_without_parent_collection(transaction).await?;
11✔
352

353
    _remove_layers_without_parent_collection(transaction).await?;
6✔
354

355
    Ok(())
3✔
356
}
3✔
357

358
#[async_trait]
359
impl<Tls> LayerDb for PostgresDb<Tls>
360
where
361
    Tls: MakeTlsConnect<Socket> + Clone + Send + Sync + 'static,
362
    <Tls as MakeTlsConnect<Socket>>::Stream: Send + Sync,
363
    <Tls as MakeTlsConnect<Socket>>::TlsConnect: Send,
364
    <<Tls as MakeTlsConnect<Socket>>::TlsConnect as TlsConnect<Socket>>::Future: Send,
365
{
366
    async fn add_layer(&self, layer: AddLayer, collection: &LayerCollectionId) -> Result<LayerId> {
14✔
367
        let layer_id = Uuid::new_v4();
14✔
368
        let layer_id = LayerId(layer_id.to_string());
14✔
369

14✔
370
        self.add_layer_with_id(&layer_id, layer, collection).await?;
507✔
371

372
        Ok(layer_id)
14✔
373
    }
28✔
374

375
    async fn add_layer_with_id(
14✔
376
        &self,
14✔
377
        id: &LayerId,
14✔
378
        layer: AddLayer,
14✔
379
        collection: &LayerCollectionId,
14✔
380
    ) -> Result<()> {
14✔
381
        let mut conn = self.conn_pool.get().await?;
14✔
382

383
        let trans = conn.build_transaction().start().await?;
14✔
384

385
        insert_layer(&trans, id, layer, collection).await?;
465✔
386

387
        trans.commit().await?;
14✔
388

389
        Ok(())
14✔
390
    }
28✔
391

392
    async fn add_layer_to_collection(
3✔
393
        &self,
3✔
394
        layer: &LayerId,
3✔
395
        collection: &LayerCollectionId,
3✔
396
    ) -> Result<()> {
3✔
397
        let layer_id =
2✔
398
            Uuid::from_str(&layer.0).map_err(|_| crate::error::Error::IdStringMustBeUuid {
3✔
399
                found: layer.0.clone(),
1✔
400
            })?;
3✔
401

402
        let collection_id =
2✔
403
            Uuid::from_str(&collection.0).map_err(|_| crate::error::Error::IdStringMustBeUuid {
2✔
404
                found: collection.0.clone(),
×
405
            })?;
2✔
406

407
        let conn = self.conn_pool.get().await?;
2✔
408

409
        let stmt = conn
2✔
410
            .prepare(
2✔
411
                "
2✔
412
            INSERT INTO collection_layers (collection, layer)
2✔
413
            VALUES ($1, $2) ON CONFLICT DO NOTHING;",
2✔
414
            )
2✔
415
            .await?;
2✔
416

417
        conn.execute(&stmt, &[&collection_id, &layer_id]).await?;
2✔
418

419
        Ok(())
2✔
420
    }
6✔
421

422
    async fn add_layer_collection(
20✔
423
        &self,
20✔
424
        collection: AddLayerCollection,
20✔
425
        parent: &LayerCollectionId,
20✔
426
    ) -> Result<LayerCollectionId> {
20✔
427
        let collection_id = Uuid::new_v4();
20✔
428
        let collection_id = LayerCollectionId(collection_id.to_string());
20✔
429

20✔
430
        self.add_layer_collection_with_id(&collection_id, collection, parent)
20✔
431
            .await?;
179✔
432

433
        Ok(collection_id)
20✔
434
    }
40✔
435

436
    async fn add_layer_collection_with_id(
20✔
437
        &self,
20✔
438
        id: &LayerCollectionId,
20✔
439
        collection: AddLayerCollection,
20✔
440
        parent: &LayerCollectionId,
20✔
441
    ) -> Result<()> {
20✔
442
        let mut conn = self.conn_pool.get().await?;
20✔
443

444
        let trans = conn.build_transaction().start().await?;
20✔
445

446
        insert_layer_collection_with_id(&trans, id, collection, parent).await?;
119✔
447

448
        trans.commit().await?;
20✔
449

450
        Ok(())
20✔
451
    }
40✔
452

453
    async fn add_collection_to_parent(
2✔
454
        &self,
2✔
455
        collection: &LayerCollectionId,
2✔
456
        parent: &LayerCollectionId,
2✔
457
    ) -> Result<()> {
2✔
458
        let conn = self.conn_pool.get().await?;
2✔
459
        insert_collection_parent(&conn, collection, parent).await
4✔
460
    }
4✔
461

462
    async fn remove_layer_collection(&self, collection: &LayerCollectionId) -> Result<()> {
4✔
463
        let mut conn = self.conn_pool.get().await?;
4✔
464
        let transaction = conn.transaction().await?;
4✔
465

466
        delete_layer_collection(&transaction, collection).await?;
12✔
467

468
        transaction.commit().await.map_err(Into::into)
3✔
469
    }
8✔
470

471
    async fn remove_layer_from_collection(
3✔
472
        &self,
3✔
473
        layer: &LayerId,
3✔
474
        collection: &LayerCollectionId,
3✔
475
    ) -> Result<()> {
3✔
476
        let mut conn = self.conn_pool.get().await?;
3✔
477
        let transaction = conn.transaction().await?;
3✔
478

479
        delete_layer_from_collection(&transaction, layer, collection).await?;
12✔
480

481
        transaction.commit().await.map_err(Into::into)
3✔
482
    }
6✔
483

484
    async fn remove_layer_collection_from_parent(
2✔
485
        &self,
2✔
486
        collection: &LayerCollectionId,
2✔
487
        parent: &LayerCollectionId,
2✔
488
    ) -> Result<()> {
2✔
489
        let mut conn = self.conn_pool.get().await?;
2✔
490
        let transaction = conn.transaction().await?;
2✔
491

492
        delete_layer_collection_from_parent(&transaction, collection, parent).await?;
15✔
493

494
        transaction.commit().await.map_err(Into::into)
2✔
495
    }
4✔
496
}
497

498
#[async_trait]
499
impl<Tls> LayerCollectionProvider for PostgresDb<Tls>
500
where
501
    Tls: MakeTlsConnect<Socket> + Clone + Send + Sync + 'static,
502
    <Tls as MakeTlsConnect<Socket>>::Stream: Send + Sync,
503
    <Tls as MakeTlsConnect<Socket>>::TlsConnect: Send,
504
    <<Tls as MakeTlsConnect<Socket>>::TlsConnect as TlsConnect<Socket>>::Future: Send,
505
{
506
    #[allow(clippy::too_many_lines)]
507
    async fn load_layer_collection(
18✔
508
        &self,
18✔
509
        collection_id: &LayerCollectionId,
18✔
510
        options: LayerCollectionListOptions,
18✔
511
    ) -> Result<LayerCollection> {
18✔
512
        let collection = Uuid::from_str(&collection_id.0).map_err(|_| {
18✔
513
            crate::error::Error::IdStringMustBeUuid {
×
514
                found: collection_id.0.clone(),
×
515
            }
×
516
        })?;
18✔
517

518
        let conn = self.conn_pool.get().await?;
18✔
519

520
        let stmt = conn
18✔
521
            .prepare(
18✔
522
                "
18✔
523
        SELECT DISTINCT name, description, properties
18✔
524
        FROM layer_collections
18✔
525
        WHERE id = $1;",
18✔
526
            )
18✔
527
            .await?;
18✔
528

529
        let row = conn.query_one(&stmt, &[&collection]).await?;
18✔
530

531
        let name: String = row.get(0);
14✔
532
        let description: String = row.get(1);
14✔
533
        let properties: Vec<Property> = row.get(2);
14✔
534

535
        let stmt = conn
14✔
536
            .prepare(
14✔
537
                "
14✔
538
        SELECT DISTINCT id, name, description, properties, is_layer
14✔
539
        FROM (
14✔
540
            SELECT 
14✔
541
                concat(id, '') AS id, 
14✔
542
                name, 
14✔
543
                description, 
14✔
544
                properties, 
14✔
545
                FALSE AS is_layer
14✔
546
            FROM layer_collections
14✔
547
                JOIN collection_children cc ON (id = cc.child)
14✔
548
            WHERE cc.parent = $1
14✔
549
        ) u UNION (
14✔
550
            SELECT 
14✔
551
                concat(id, '') AS id, 
14✔
552
                name, 
14✔
553
                description, 
14✔
554
                properties, 
14✔
555
                TRUE AS is_layer
14✔
556
            FROM layers uc
14✔
557
                JOIN collection_layers cl ON (id = cl.layer)
14✔
558
            WHERE cl.collection = $1
14✔
559
        )
14✔
560
        ORDER BY is_layer ASC, name ASC
14✔
561
        LIMIT $2 
14✔
562
        OFFSET $3;            
14✔
563
        ",
14✔
564
            )
14✔
565
            .await?;
14✔
566

567
        let rows = conn
14✔
568
            .query(
14✔
569
                &stmt,
14✔
570
                &[
14✔
571
                    &collection,
14✔
572
                    &i64::from(options.limit),
14✔
573
                    &i64::from(options.offset),
14✔
574
                ],
14✔
575
            )
14✔
576
            .await?;
14✔
577

578
        let items = rows
14✔
579
            .into_iter()
14✔
580
            .map(|row| {
16✔
581
                let is_layer: bool = row.get(4);
16✔
582

16✔
583
                if is_layer {
16✔
584
                    Ok(CollectionItem::Layer(LayerListing {
7✔
585
                        id: ProviderLayerId {
7✔
586
                            provider_id: INTERNAL_PROVIDER_ID,
7✔
587
                            layer_id: LayerId(row.get(0)),
7✔
588
                        },
7✔
589
                        name: row.get(1),
7✔
590
                        description: row.get(2),
7✔
591
                        properties: row.get(3),
7✔
592
                    }))
7✔
593
                } else {
594
                    Ok(CollectionItem::Collection(LayerCollectionListing {
9✔
595
                        id: ProviderLayerCollectionId {
9✔
596
                            provider_id: INTERNAL_PROVIDER_ID,
9✔
597
                            collection_id: LayerCollectionId(row.get(0)),
9✔
598
                        },
9✔
599
                        name: row.get(1),
9✔
600
                        description: row.get(2),
9✔
601
                        properties: row.get(3),
9✔
602
                    }))
9✔
603
                }
604
            })
16✔
605
            .collect::<Result<Vec<CollectionItem>>>()?;
14✔
606

607
        Ok(LayerCollection {
14✔
608
            id: ProviderLayerCollectionId {
14✔
609
                provider_id: INTERNAL_PROVIDER_ID,
14✔
610
                collection_id: collection_id.clone(),
14✔
611
            },
14✔
612
            name,
14✔
613
            description,
14✔
614
            items,
14✔
615
            entry_label: None,
14✔
616
            properties,
14✔
617
        })
14✔
618
    }
36✔
619

620
    async fn get_root_layer_collection_id(&self) -> Result<LayerCollectionId> {
17✔
621
        Ok(LayerCollectionId(
17✔
622
            INTERNAL_LAYER_DB_ROOT_COLLECTION_ID.to_string(),
17✔
623
        ))
17✔
624
    }
17✔
625

626
    async fn load_layer(&self, id: &LayerId) -> Result<Layer> {
17✔
627
        let layer_id =
17✔
628
            Uuid::from_str(&id.0).map_err(|_| crate::error::Error::IdStringMustBeUuid {
17✔
629
                found: id.0.clone(),
×
630
            })?;
17✔
631

632
        let conn = self.conn_pool.get().await?;
17✔
633

634
        let stmt = conn
17✔
635
            .prepare(
17✔
636
                "
17✔
637
            SELECT 
17✔
638
                l.name,
17✔
639
                l.description,
17✔
640
                w.workflow,
17✔
641
                l.symbology,
17✔
642
                l.properties,
17✔
643
                l.metadata
17✔
644
            FROM 
17✔
645
                layers l JOIN workflows w ON (l.workflow_id = w.id)
17✔
646
            WHERE 
17✔
647
                l.id = $1;",
17✔
648
            )
17✔
649
            .await?;
16✔
650

651
        let row = conn
17✔
652
            .query_one(&stmt, &[&layer_id])
17✔
653
            .await
16✔
654
            .map_err(|_error| LayerDbError::NoLayerForGivenId { id: id.clone() })?;
17✔
655

656
        Ok(Layer {
657
            id: ProviderLayerId {
13✔
658
                provider_id: INTERNAL_PROVIDER_ID,
13✔
659
                layer_id: id.clone(),
13✔
660
            },
13✔
661
            name: row.get(0),
13✔
662
            description: row.get(1),
13✔
663
            workflow: serde_json::from_value(row.get(2)).context(crate::error::SerdeJson)?,
13✔
664
            symbology: row.get(3),
13✔
665
            properties: row.get(4),
13✔
666
            metadata: row.get::<_, HashMapTextTextDbType>(5).into(),
13✔
667
        })
668
    }
34✔
669
}
670

671
#[async_trait]
672
impl<Tls> LayerProviderDb for PostgresDb<Tls>
673
where
674
    Tls: MakeTlsConnect<Socket> + Clone + Send + Sync + 'static,
675
    <Tls as MakeTlsConnect<Socket>>::Stream: Send + Sync,
676
    <Tls as MakeTlsConnect<Socket>>::TlsConnect: Send,
677
    <<Tls as MakeTlsConnect<Socket>>::TlsConnect as TlsConnect<Socket>>::Future: Send,
678
{
679
    async fn add_layer_provider(
5✔
680
        &self,
5✔
681
        provider: TypedDataProviderDefinition,
5✔
682
    ) -> Result<DataProviderId> {
5✔
683
        let conn = self.conn_pool.get().await?;
5✔
684

685
        let stmt = conn
5✔
686
            .prepare(
5✔
687
                "
5✔
688
              INSERT INTO layer_providers (
5✔
689
                  id, 
5✔
690
                  type_name, 
5✔
691
                  name,
5✔
692
                  definition
5✔
693
              )
5✔
694
              VALUES ($1, $2, $3, $4)",
5✔
695
            )
5✔
696
            .await?;
129✔
697

698
        let id = provider.id();
5✔
699
        conn.execute(
5✔
700
            &stmt,
5✔
701
            &[&id, &provider.type_name(), &provider.name(), &provider],
5✔
702
        )
5✔
703
        .await?;
5✔
704
        Ok(id)
5✔
705
    }
10✔
706

707
    async fn list_layer_providers(
1✔
708
        &self,
1✔
709
        options: LayerProviderListingOptions,
1✔
710
    ) -> Result<Vec<LayerProviderListing>> {
1✔
711
        // TODO: permission
712
        let conn = self.conn_pool.get().await?;
1✔
713

714
        let stmt = conn
1✔
715
            .prepare(
1✔
716
                "
1✔
717
            SELECT 
1✔
718
                id, 
1✔
719
                name,
1✔
720
                type_name
1✔
721
            FROM 
1✔
722
                layer_providers
1✔
723
            ORDER BY name ASC
1✔
724
            LIMIT $1 
1✔
725
            OFFSET $2;",
1✔
726
            )
1✔
727
            .await?;
1✔
728

729
        let rows = conn
1✔
730
            .query(
1✔
731
                &stmt,
1✔
732
                &[&i64::from(options.limit), &i64::from(options.offset)],
1✔
733
            )
1✔
734
            .await?;
1✔
735

736
        Ok(rows
1✔
737
            .iter()
1✔
738
            .map(|row| LayerProviderListing {
1✔
739
                id: row.get(0),
1✔
740
                name: row.get(1),
1✔
741
                description: row.get(2),
1✔
742
            })
1✔
743
            .collect())
1✔
744
    }
2✔
745

746
    async fn load_layer_provider(&self, id: DataProviderId) -> Result<Box<dyn DataProvider>> {
12✔
747
        // TODO: permissions
748
        let conn = self.conn_pool.get().await?;
12✔
749

750
        let stmt = conn
12✔
751
            .prepare(
12✔
752
                "
12✔
753
               SELECT 
12✔
754
                   definition
12✔
755
               FROM 
12✔
756
                   layer_providers
12✔
757
               WHERE
12✔
758
                   id = $1",
12✔
759
            )
12✔
760
            .await?;
12✔
761

762
        let row = conn.query_one(&stmt, &[&id]).await?;
12✔
763

764
        let definition: TypedDataProviderDefinition = row.get(0);
7✔
765

7✔
766
        Box::new(definition).initialize().await
7✔
767
    }
24✔
768
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc