• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

geo-engine / geoengine / 6038596548

31 Aug 2023 02:19PM UTC coverage: 90.097% (+0.06%) from 90.041%
6038596548

push

github

web-flow
Merge pull request #866 from geo-engine/provider-def-mapping

refactor provider defs to pro/non-pro

991 of 991 new or added lines in 21 files covered. (100.0%)

106888 of 118637 relevant lines covered (90.1%)

61031.51 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

94.97
/services/src/pro/layers/postgres_layer_db.rs
1
use crate::api::model::datatypes::{DataProviderId, LayerId};
2
use crate::api::model::HashMapTextTextDbType;
3
use crate::error;
4
use crate::layers::external::TypedDataProviderDefinition;
5
use crate::layers::layer::Property;
6
use crate::pro::contexts::ProPostgresDb;
7
use crate::pro::datasets::TypedProDataProviderDefinition;
8
use crate::pro::permissions::{Permission, PermissionDb, RoleId};
9
use crate::workflows::workflow::WorkflowId;
10
use crate::{
11
    error::Result,
12
    layers::{
13
        external::{DataProvider, DataProviderDefinition},
14
        layer::{
15
            AddLayer, AddLayerCollection, CollectionItem, Layer, LayerCollection,
16
            LayerCollectionListOptions, LayerCollectionListing, LayerListing,
17
            ProviderLayerCollectionId, ProviderLayerId,
18
        },
19
        listing::{LayerCollectionId, LayerCollectionProvider},
20
        storage::{
21
            LayerDb, LayerProviderDb, LayerProviderListing, LayerProviderListingOptions,
22
            INTERNAL_LAYER_DB_ROOT_COLLECTION_ID, INTERNAL_PROVIDER_ID,
23
        },
24
        LayerDbError,
25
    },
26
};
27
use async_trait::async_trait;
28
use bb8_postgres::tokio_postgres::{
29
    tls::{MakeTlsConnect, TlsConnect},
30
    Socket,
31
};
32
use snafu::{ensure, ResultExt};
33
use std::str::FromStr;
34
use uuid::Uuid;
35

36
/// delete all collections without parent collection
37
async fn _remove_collections_without_parent_collection(
3✔
38
    transaction: &tokio_postgres::Transaction<'_>,
3✔
39
) -> Result<()> {
3✔
40
    // HINT: a recursive delete statement seems reasonable, but hard to implement in postgres
41
    //       because you have a graph with potential loops
42

43
    let remove_layer_collections_without_parents_stmt = transaction
3✔
44
        .prepare(
3✔
45
            "DELETE FROM layer_collections
3✔
46
                 WHERE  id <> $1 -- do not delete root collection
3✔
47
                 AND    id NOT IN (
3✔
48
                    SELECT child FROM collection_children
3✔
49
                 );",
3✔
50
        )
3✔
51
        .await?;
3✔
52
    while 0 < transaction
5✔
53
        .execute(
5✔
54
            &remove_layer_collections_without_parents_stmt,
5✔
55
            &[&INTERNAL_LAYER_DB_ROOT_COLLECTION_ID],
5✔
56
        )
5✔
57
        .await?
5✔
58
    {
2✔
59
        // whenever one collection is deleted, we have to check again if there are more
2✔
60
        // collections without parents
2✔
61
    }
2✔
62

63
    Ok(())
3✔
64
}
3✔
65

66
/// delete all layers without parent collection
67
async fn _remove_layers_without_parent_collection(
5✔
68
    transaction: &tokio_postgres::Transaction<'_>,
5✔
69
) -> Result<()> {
5✔
70
    let remove_layers_without_parents_stmt = transaction
5✔
71
        .prepare(
5✔
72
            "DELETE FROM layers
5✔
73
                 WHERE id NOT IN (
5✔
74
                    SELECT layer FROM collection_layers
5✔
75
                 );",
5✔
76
        )
5✔
77
        .await?;
5✔
78
    transaction
5✔
79
        .execute(&remove_layers_without_parents_stmt, &[])
5✔
80
        .await?;
5✔
81

82
    Ok(())
5✔
83
}
5✔
84

85
#[async_trait]
86
impl<Tls> LayerDb for ProPostgresDb<Tls>
87
where
88
    Tls: MakeTlsConnect<Socket> + Clone + Send + Sync + 'static,
89
    <Tls as MakeTlsConnect<Socket>>::Stream: Send + Sync,
90
    <Tls as MakeTlsConnect<Socket>>::TlsConnect: Send,
91
    <<Tls as MakeTlsConnect<Socket>>::TlsConnect as TlsConnect<Socket>>::Future: Send,
92
{
93
    async fn add_layer(&self, layer: AddLayer, collection: &LayerCollectionId) -> Result<LayerId> {
6✔
94
        let layer_id = Uuid::new_v4();
6✔
95
        let layer_id = LayerId(layer_id.to_string());
6✔
96

6✔
97
        self.add_layer_with_id(&layer_id, layer, collection).await?;
122✔
98

99
        Ok(layer_id)
6✔
100
    }
12✔
101

102
    async fn add_layer_with_id(
6✔
103
        &self,
6✔
104
        id: &LayerId,
6✔
105
        layer: AddLayer,
6✔
106
        collection: &LayerCollectionId,
6✔
107
    ) -> Result<()> {
6✔
108
        ensure!(
6✔
109
            self.has_permission(collection.clone(), Permission::Owner)
6✔
110
                .await?,
12✔
111
            error::PermissionDenied
×
112
        );
113

114
        let layer_id =
6✔
115
            Uuid::from_str(&id.0).map_err(|_| crate::error::Error::IdStringMustBeUuid {
6✔
116
                found: collection.0.clone(),
×
117
            })?;
6✔
118

119
        let collection_id =
6✔
120
            Uuid::from_str(&collection.0).map_err(|_| crate::error::Error::IdStringMustBeUuid {
6✔
121
                found: collection.0.clone(),
×
122
            })?;
6✔
123

124
        let mut conn = self.conn_pool.get().await?;
6✔
125

126
        let layer = layer;
6✔
127

6✔
128
        let workflow_id = WorkflowId::from_hash(&layer.workflow);
6✔
129

130
        let trans = conn.build_transaction().start().await?;
6✔
131

132
        let stmt = trans
6✔
133
            .prepare(
6✔
134
                "INSERT INTO workflows (id, workflow) VALUES ($1, $2) 
6✔
135
            ON CONFLICT DO NOTHING;",
6✔
136
            )
6✔
137
            .await?;
4✔
138

139
        trans
140
            .execute(
141
                &stmt,
6✔
142
                &[
6✔
143
                    &workflow_id,
6✔
144
                    &serde_json::to_value(&layer.workflow).context(error::SerdeJson)?,
6✔
145
                ],
146
            )
147
            .await?;
4✔
148

149
        let stmt = trans
6✔
150
            .prepare(
6✔
151
                "
6✔
152
            INSERT INTO layers (id, name, description, workflow_id, symbology, properties, metadata)
6✔
153
            VALUES ($1, $2, $3, $4, $5, $6, $7);",
6✔
154
            )
6✔
155
            .await?;
66✔
156

157
        trans
6✔
158
            .execute(
6✔
159
                &stmt,
6✔
160
                &[
6✔
161
                    &layer_id,
6✔
162
                    &layer.name,
6✔
163
                    &layer.description,
6✔
164
                    &workflow_id,
6✔
165
                    &layer.symbology,
6✔
166
                    &layer.properties,
6✔
167
                    &HashMapTextTextDbType::from(&layer.metadata),
6✔
168
                ],
6✔
169
            )
6✔
170
            .await?;
4✔
171

172
        let stmt = trans
6✔
173
            .prepare(
6✔
174
                "
6✔
175
            INSERT INTO collection_layers (collection, layer)
6✔
176
            VALUES ($1, $2) ON CONFLICT DO NOTHING;",
6✔
177
            )
6✔
178
            .await?;
4✔
179

180
        trans.execute(&stmt, &[&collection_id, &layer_id]).await?;
6✔
181

182
        // TODO: `ON CONFLICT DO NOTHING` means, we do not get an error if the permission already exists.
183
        //       Do we want that, or should we report an error and let the caller decide whether to ignore it?
184
        //       We should decide that and adjust all places where `ON CONFILCT DO NOTHING` is used.
185
        let stmt = trans
6✔
186
            .prepare(
6✔
187
                "
6✔
188
            INSERT INTO permissions (role_id, permission, layer_id)
6✔
189
            VALUES ($1, $2, $3) ON CONFLICT DO NOTHING;",
6✔
190
            )
6✔
191
            .await?;
5✔
192

193
        trans
6✔
194
            .execute(
6✔
195
                &stmt,
6✔
196
                &[
6✔
197
                    &RoleId::from(self.session.user.id),
6✔
198
                    &Permission::Owner,
6✔
199
                    &layer_id,
6✔
200
                ],
6✔
201
            )
6✔
202
            .await?;
5✔
203

204
        trans.commit().await?;
6✔
205

206
        Ok(())
6✔
207
    }
12✔
208

209
    async fn add_layer_to_collection(
2✔
210
        &self,
2✔
211
        layer: &LayerId,
2✔
212
        collection: &LayerCollectionId,
2✔
213
    ) -> Result<()> {
2✔
214
        ensure!(
2✔
215
            self.has_permission(collection.clone(), Permission::Owner)
2✔
216
                .await?,
6✔
217
            error::PermissionDenied
×
218
        );
219

220
        let layer_id =
1✔
221
            Uuid::from_str(&layer.0).map_err(|_| crate::error::Error::IdStringMustBeUuid {
2✔
222
                found: layer.0.clone(),
1✔
223
            })?;
2✔
224

225
        let collection_id =
1✔
226
            Uuid::from_str(&collection.0).map_err(|_| crate::error::Error::IdStringMustBeUuid {
1✔
227
                found: collection.0.clone(),
×
228
            })?;
1✔
229

230
        let conn = self.conn_pool.get().await?;
1✔
231

232
        let stmt = conn
1✔
233
            .prepare(
1✔
234
                "
1✔
235
            INSERT INTO collection_layers (collection, layer)
1✔
236
            VALUES ($1, $2) ON CONFLICT DO NOTHING;",
1✔
237
            )
1✔
238
            .await?;
1✔
239

240
        conn.execute(&stmt, &[&collection_id, &layer_id]).await?;
1✔
241

242
        Ok(())
1✔
243
    }
4✔
244

245
    async fn add_layer_collection(
12✔
246
        &self,
12✔
247
        collection: AddLayerCollection,
12✔
248
        parent: &LayerCollectionId,
12✔
249
    ) -> Result<LayerCollectionId> {
12✔
250
        let collection_id = Uuid::new_v4();
12✔
251
        let collection_id = LayerCollectionId(collection_id.to_string());
12✔
252

12✔
253
        self.add_layer_collection_with_id(&collection_id, collection, parent)
12✔
254
            .await?;
149✔
255

256
        Ok(collection_id)
10✔
257
    }
24✔
258

259
    async fn add_layer_collection_with_id(
12✔
260
        &self,
12✔
261
        id: &LayerCollectionId,
12✔
262
        collection: AddLayerCollection,
12✔
263
        parent: &LayerCollectionId,
12✔
264
    ) -> Result<()> {
12✔
265
        ensure!(
12✔
266
            self.has_permission(parent.clone(), Permission::Owner)
12✔
267
                .await?,
52✔
268
            error::PermissionDenied
2✔
269
        );
270

271
        let collection_id =
10✔
272
            Uuid::from_str(&id.0).map_err(|_| crate::error::Error::IdStringMustBeUuid {
10✔
273
                found: id.0.clone(),
×
274
            })?;
10✔
275

276
        let parent =
10✔
277
            Uuid::from_str(&parent.0).map_err(|_| crate::error::Error::IdStringMustBeUuid {
10✔
278
                found: parent.0.clone(),
×
279
            })?;
10✔
280

281
        let mut conn = self.conn_pool.get().await?;
10✔
282

283
        let trans = conn.build_transaction().start().await?;
10✔
284

285
        let stmt = trans
10✔
286
            .prepare(
10✔
287
                "
10✔
288
            INSERT INTO layer_collections (id, name, description, properties)
10✔
289
            VALUES ($1, $2, $3, $4);",
10✔
290
            )
10✔
291
            .await?;
25✔
292

293
        trans
10✔
294
            .execute(
10✔
295
                &stmt,
10✔
296
                &[
10✔
297
                    &collection_id,
10✔
298
                    &collection.name,
10✔
299
                    &collection.description,
10✔
300
                    &collection.properties,
10✔
301
                ],
10✔
302
            )
10✔
303
            .await?;
9✔
304

305
        let stmt = trans
10✔
306
            .prepare(
10✔
307
                "
10✔
308
            INSERT INTO collection_children (parent, child)
10✔
309
            VALUES ($1, $2) ON CONFLICT DO NOTHING;",
10✔
310
            )
10✔
311
            .await?;
9✔
312

313
        trans.execute(&stmt, &[&parent, &collection_id]).await?;
10✔
314

315
        let stmt = trans
10✔
316
            .prepare(
10✔
317
                "
10✔
318
            INSERT INTO permissions (role_id, permission, layer_collection_id)
10✔
319
            VALUES ($1, $2, $3) ON CONFLICT DO NOTHING;",
10✔
320
            )
10✔
321
            .await?;
9✔
322

323
        trans
10✔
324
            .execute(
10✔
325
                &stmt,
10✔
326
                &[
10✔
327
                    &RoleId::from(self.session.user.id),
10✔
328
                    &Permission::Owner,
10✔
329
                    &collection_id,
10✔
330
                ],
10✔
331
            )
10✔
332
            .await?;
9✔
333

334
        trans.commit().await?;
10✔
335

336
        Ok(())
10✔
337
    }
24✔
338

339
    async fn add_collection_to_parent(
1✔
340
        &self,
1✔
341
        collection: &LayerCollectionId,
1✔
342
        parent: &LayerCollectionId,
1✔
343
    ) -> Result<()> {
1✔
344
        ensure!(
1✔
345
            self.has_permission(collection.clone(), Permission::Owner)
1✔
346
                .await?,
3✔
347
            error::PermissionDenied
×
348
        );
349

350
        let collection =
1✔
351
            Uuid::from_str(&collection.0).map_err(|_| crate::error::Error::IdStringMustBeUuid {
1✔
352
                found: collection.0.clone(),
×
353
            })?;
1✔
354

355
        let parent =
1✔
356
            Uuid::from_str(&parent.0).map_err(|_| crate::error::Error::IdStringMustBeUuid {
1✔
357
                found: parent.0.clone(),
×
358
            })?;
1✔
359

360
        let conn = self.conn_pool.get().await?;
1✔
361

362
        let stmt = conn
1✔
363
            .prepare(
1✔
364
                "
1✔
365
            INSERT INTO collection_children (parent, child)
1✔
366
            VALUES ($1, $2) ON CONFLICT DO NOTHING;",
1✔
367
            )
1✔
368
            .await?;
1✔
369

370
        conn.execute(&stmt, &[&parent, &collection]).await?;
1✔
371

372
        Ok(())
1✔
373
    }
2✔
374

375
    async fn remove_layer_collection(&self, collection: &LayerCollectionId) -> Result<()> {
3✔
376
        ensure!(
3✔
377
            self.has_permission(collection.clone(), Permission::Owner)
3✔
378
                .await?,
9✔
379
            error::PermissionDenied
×
380
        );
381

382
        let collection =
3✔
383
            Uuid::from_str(&collection.0).map_err(|_| crate::error::Error::IdStringMustBeUuid {
3✔
384
                found: collection.0.clone(),
×
385
            })?;
3✔
386

387
        if collection == INTERNAL_LAYER_DB_ROOT_COLLECTION_ID {
3✔
388
            return Err(LayerDbError::CannotRemoveRootCollection.into());
1✔
389
        }
2✔
390

391
        let mut conn = self.conn_pool.get().await?;
2✔
392
        let transaction = conn.transaction().await?;
2✔
393

394
        // delete the collection!
395
        // on delete cascade removes all entries from `collection_children` and `collection_layers`
396

397
        let remove_layer_collection_stmt = transaction
2✔
398
            .prepare(
2✔
399
                "DELETE FROM layer_collections
2✔
400
                 WHERE id = $1;",
2✔
401
            )
2✔
402
            .await?;
2✔
403
        transaction
2✔
404
            .execute(&remove_layer_collection_stmt, &[&collection])
2✔
405
            .await?;
2✔
406

407
        _remove_collections_without_parent_collection(&transaction).await?;
4✔
408

409
        _remove_layers_without_parent_collection(&transaction).await?;
4✔
410

411
        transaction.commit().await.map_err(Into::into)
2✔
412
    }
6✔
413

414
    async fn remove_layer_from_collection(
2✔
415
        &self,
2✔
416
        layer: &LayerId,
2✔
417
        collection: &LayerCollectionId,
2✔
418
    ) -> Result<()> {
2✔
419
        ensure!(
2✔
420
            self.has_permission(layer.clone(), Permission::Owner)
2✔
421
                .await?,
6✔
422
            error::PermissionDenied
×
423
        );
424

425
        let collection_uuid =
2✔
426
            Uuid::from_str(&collection.0).map_err(|_| crate::error::Error::IdStringMustBeUuid {
2✔
427
                found: collection.0.clone(),
×
428
            })?;
2✔
429

430
        let layer_uuid =
2✔
431
            Uuid::from_str(&layer.0).map_err(|_| crate::error::Error::IdStringMustBeUuid {
2✔
432
                found: layer.0.clone(),
×
433
            })?;
2✔
434

435
        let mut conn = self.conn_pool.get().await?;
2✔
436
        let transaction = conn.transaction().await?;
2✔
437

438
        let remove_layer_collection_stmt = transaction
2✔
439
            .prepare(
2✔
440
                "DELETE FROM collection_layers
2✔
441
                 WHERE collection = $1
2✔
442
                 AND layer = $2;",
2✔
443
            )
2✔
444
            .await?;
2✔
445
        let num_results = transaction
2✔
446
            .execute(
2✔
447
                &remove_layer_collection_stmt,
2✔
448
                &[&collection_uuid, &layer_uuid],
2✔
449
            )
2✔
450
            .await?;
2✔
451

452
        if num_results == 0 {
2✔
453
            return Err(LayerDbError::NoLayerForGivenIdInCollection {
×
454
                layer: layer.clone(),
×
455
                collection: collection.clone(),
×
456
            }
×
457
            .into());
×
458
        }
2✔
459

2✔
460
        _remove_layers_without_parent_collection(&transaction).await?;
4✔
461

462
        transaction.commit().await.map_err(Into::into)
2✔
463
    }
4✔
464

465
    async fn remove_layer_collection_from_parent(
1✔
466
        &self,
1✔
467
        collection: &LayerCollectionId,
1✔
468
        parent: &LayerCollectionId,
1✔
469
    ) -> Result<()> {
1✔
470
        ensure!(
1✔
471
            self.has_permission(collection.clone(), Permission::Owner)
1✔
472
                .await?,
3✔
473
            error::PermissionDenied
×
474
        );
475

476
        let collection_uuid =
1✔
477
            Uuid::from_str(&collection.0).map_err(|_| crate::error::Error::IdStringMustBeUuid {
1✔
478
                found: collection.0.clone(),
×
479
            })?;
1✔
480

481
        let parent_collection_uuid =
1✔
482
            Uuid::from_str(&parent.0).map_err(|_| crate::error::Error::IdStringMustBeUuid {
1✔
483
                found: parent.0.clone(),
×
484
            })?;
1✔
485

486
        let mut conn = self.conn_pool.get().await?;
1✔
487
        let transaction = conn.transaction().await?;
1✔
488

489
        let remove_layer_collection_stmt = transaction
1✔
490
            .prepare(
1✔
491
                "DELETE FROM collection_children
1✔
492
                 WHERE child = $1
1✔
493
                 AND parent = $2;",
1✔
494
            )
1✔
495
            .await?;
1✔
496
        let num_results = transaction
1✔
497
            .execute(
1✔
498
                &remove_layer_collection_stmt,
1✔
499
                &[&collection_uuid, &parent_collection_uuid],
1✔
500
            )
1✔
501
            .await?;
1✔
502

503
        if num_results == 0 {
1✔
504
            return Err(LayerDbError::NoCollectionForGivenIdInCollection {
×
505
                collection: collection.clone(),
×
506
                parent: parent.clone(),
×
507
            }
×
508
            .into());
×
509
        }
1✔
510

1✔
511
        _remove_collections_without_parent_collection(&transaction).await?;
4✔
512

513
        _remove_layers_without_parent_collection(&transaction).await?;
2✔
514

515
        transaction.commit().await.map_err(Into::into)
1✔
516
    }
2✔
517
}
518

519
#[async_trait]
520
impl<Tls> LayerCollectionProvider for ProPostgresDb<Tls>
521
where
522
    Tls: MakeTlsConnect<Socket> + Clone + Send + Sync + 'static,
523
    <Tls as MakeTlsConnect<Socket>>::Stream: Send + Sync,
524
    <Tls as MakeTlsConnect<Socket>>::TlsConnect: Send,
525
    <<Tls as MakeTlsConnect<Socket>>::TlsConnect as TlsConnect<Socket>>::Future: Send,
526
{
527
    #[allow(clippy::too_many_lines)]
528
    async fn load_layer_collection(
15✔
529
        &self,
15✔
530
        collection_id: &LayerCollectionId,
15✔
531
        options: LayerCollectionListOptions,
15✔
532
    ) -> Result<LayerCollection> {
15✔
533
        ensure!(
15✔
534
            self.has_permission(collection_id.clone(), Permission::Read)
15✔
535
                .await?,
45✔
536
            error::PermissionDenied
3✔
537
        );
538
        let collection = Uuid::from_str(&collection_id.0).map_err(|_| {
12✔
539
            crate::error::Error::IdStringMustBeUuid {
×
540
                found: collection_id.0.clone(),
×
541
            }
×
542
        })?;
12✔
543

544
        let conn = self.conn_pool.get().await?;
12✔
545

546
        let stmt = conn
12✔
547
            .prepare(
12✔
548
                "
12✔
549
        SELECT DISTINCT name, description, properties
12✔
550
        FROM user_permitted_layer_collections p 
12✔
551
            JOIN layer_collections c ON (p.layer_collection_id = c.id) 
12✔
552
        WHERE p.user_id = $1 AND layer_collection_id = $2;",
12✔
553
            )
12✔
554
            .await?;
12✔
555

556
        let row = conn
12✔
557
            .query_one(&stmt, &[&self.session.user.id, &collection])
12✔
558
            .await?;
12✔
559

560
        let name: String = row.get(0);
12✔
561
        let description: String = row.get(1);
12✔
562
        let properties: Vec<Property> = row.get(2);
12✔
563

564
        let stmt = conn
12✔
565
            .prepare(
12✔
566
                "
12✔
567
        SELECT DISTINCT id, name, description, properties, is_layer
12✔
568
        FROM (
12✔
569
            SELECT 
12✔
570
                concat(id, '') AS id, 
12✔
571
                name, 
12✔
572
                description, 
12✔
573
                properties, 
12✔
574
                FALSE AS is_layer
12✔
575
            FROM user_permitted_layer_collections u 
12✔
576
                JOIN layer_collections lc ON (u.layer_collection_id = lc.id)
12✔
577
                JOIN collection_children cc ON (layer_collection_id = cc.child)
12✔
578
            WHERE u.user_id = $4 AND cc.parent = $1
12✔
579
        ) u UNION (
12✔
580
            SELECT 
12✔
581
                concat(id, '') AS id, 
12✔
582
                name, 
12✔
583
                description, 
12✔
584
                properties, 
12✔
585
                TRUE AS is_layer
12✔
586
            FROM user_permitted_layers ul
12✔
587
                JOIN layers uc ON (ul.layer_id = uc.id) 
12✔
588
                JOIN collection_layers cl ON (layer_id = cl.layer)
12✔
589
            WHERE ul.user_id = $4 AND cl.collection = $1
12✔
590
        )
12✔
591
        ORDER BY is_layer ASC, name ASC
12✔
592
        LIMIT $2 
12✔
593
        OFFSET $3;            
12✔
594
        ",
12✔
595
            )
12✔
596
            .await?;
12✔
597

598
        let rows = conn
12✔
599
            .query(
12✔
600
                &stmt,
12✔
601
                &[
12✔
602
                    &collection,
12✔
603
                    &i64::from(options.limit),
12✔
604
                    &i64::from(options.offset),
12✔
605
                    &self.session.user.id,
12✔
606
                ],
12✔
607
            )
12✔
608
            .await?;
12✔
609

610
        let items = rows
12✔
611
            .into_iter()
12✔
612
            .map(|row| {
15✔
613
                let is_layer: bool = row.get(4);
15✔
614

15✔
615
                if is_layer {
15✔
616
                    Ok(CollectionItem::Layer(LayerListing {
5✔
617
                        id: ProviderLayerId {
5✔
618
                            provider_id: INTERNAL_PROVIDER_ID,
5✔
619
                            layer_id: LayerId(row.get(0)),
5✔
620
                        },
5✔
621
                        name: row.get(1),
5✔
622
                        description: row.get(2),
5✔
623
                        properties: row.get(3),
5✔
624
                    }))
5✔
625
                } else {
626
                    Ok(CollectionItem::Collection(LayerCollectionListing {
10✔
627
                        id: ProviderLayerCollectionId {
10✔
628
                            provider_id: INTERNAL_PROVIDER_ID,
10✔
629
                            collection_id: LayerCollectionId(row.get(0)),
10✔
630
                        },
10✔
631
                        name: row.get(1),
10✔
632
                        description: row.get(2),
10✔
633
                        properties: row.get(3),
10✔
634
                    }))
10✔
635
                }
636
            })
15✔
637
            .collect::<Result<Vec<CollectionItem>>>()?;
12✔
638

639
        Ok(LayerCollection {
12✔
640
            id: ProviderLayerCollectionId {
12✔
641
                provider_id: INTERNAL_PROVIDER_ID,
12✔
642
                collection_id: collection_id.clone(),
12✔
643
            },
12✔
644
            name,
12✔
645
            description,
12✔
646
            items,
12✔
647
            entry_label: None,
12✔
648
            properties,
12✔
649
        })
12✔
650
    }
30✔
651

652
    async fn get_root_layer_collection_id(&self) -> Result<LayerCollectionId> {
6✔
653
        Ok(LayerCollectionId(
6✔
654
            INTERNAL_LAYER_DB_ROOT_COLLECTION_ID.to_string(),
6✔
655
        ))
6✔
656
    }
6✔
657

658
    async fn load_layer(&self, id: &LayerId) -> Result<Layer> {
5✔
659
        ensure!(
5✔
660
            self.has_permission(id.clone(), Permission::Read).await?,
14✔
661
            error::PermissionDenied
3✔
662
        );
663

664
        let layer_id =
2✔
665
            Uuid::from_str(&id.0).map_err(|_| crate::error::Error::IdStringMustBeUuid {
2✔
666
                found: id.0.clone(),
×
667
            })?;
2✔
668

669
        let conn = self.conn_pool.get().await?;
2✔
670

671
        let stmt = conn
2✔
672
            .prepare(
2✔
673
                "
2✔
674
            SELECT 
2✔
675
                l.name,
2✔
676
                l.description,
2✔
677
                w.workflow,
2✔
678
                l.symbology,
2✔
679
                l.properties,
2✔
680
                l.metadata
2✔
681
            FROM 
2✔
682
                layers l JOIN workflows w ON (l.workflow_id = w.id)
2✔
683
            WHERE 
2✔
684
                l.id = $1;",
2✔
685
            )
2✔
686
            .await?;
2✔
687

688
        let row = conn
2✔
689
            .query_one(&stmt, &[&layer_id])
2✔
690
            .await
2✔
691
            .map_err(|_error| LayerDbError::NoLayerForGivenId { id: id.clone() })?;
2✔
692

693
        Ok(Layer {
694
            id: ProviderLayerId {
2✔
695
                provider_id: INTERNAL_PROVIDER_ID,
2✔
696
                layer_id: id.clone(),
2✔
697
            },
2✔
698
            name: row.get(0),
2✔
699
            description: row.get(1),
2✔
700
            workflow: serde_json::from_value(row.get(2)).context(crate::error::SerdeJson)?,
2✔
701
            symbology: row.get(3),
2✔
702
            properties: row.get(4),
2✔
703
            metadata: row.get::<_, HashMapTextTextDbType>(5).into(),
2✔
704
        })
705
    }
10✔
706
}
707

708
#[async_trait]
709
impl<Tls> LayerProviderDb for ProPostgresDb<Tls>
710
where
711
    Tls: MakeTlsConnect<Socket> + Clone + Send + Sync + 'static,
712
    <Tls as MakeTlsConnect<Socket>>::Stream: Send + Sync,
713
    <Tls as MakeTlsConnect<Socket>>::TlsConnect: Send,
714
    <<Tls as MakeTlsConnect<Socket>>::TlsConnect as TlsConnect<Socket>>::Future: Send,
715
{
716
    async fn add_layer_provider(
1✔
717
        &self,
1✔
718
        provider: TypedDataProviderDefinition,
1✔
719
    ) -> Result<DataProviderId> {
1✔
720
        ensure!(self.session.is_admin(), error::PermissionDenied);
1✔
721

722
        let conn = self.conn_pool.get().await?;
1✔
723

724
        let stmt = conn
1✔
725
            .prepare(
1✔
726
                "
1✔
727
              INSERT INTO layer_providers (
1✔
728
                  id, 
1✔
729
                  type_name, 
1✔
730
                  name,
1✔
731
                  definition
1✔
732
              )
1✔
733
              VALUES ($1, $2, $3, $4)",
1✔
734
            )
1✔
735
            .await?;
19✔
736

737
        let id = provider.id();
1✔
738
        conn.execute(
1✔
739
            &stmt,
1✔
740
            &[&id, &provider.type_name(), &provider.name(), &provider],
1✔
741
        )
1✔
742
        .await?;
1✔
743
        Ok(id)
1✔
744
    }
2✔
745

746
    async fn list_layer_providers(
2✔
747
        &self,
2✔
748
        options: LayerProviderListingOptions,
2✔
749
    ) -> Result<Vec<LayerProviderListing>> {
2✔
750
        // TODO: permission
751
        let conn = self.conn_pool.get().await?;
2✔
752

753
        let stmt = conn
2✔
754
            .prepare(
2✔
755
                "(
2✔
756
                    SELECT 
2✔
757
                        id, 
2✔
758
                        name,
2✔
759
                        type_name
2✔
760
                    FROM 
2✔
761
                        layer_providers
2✔
762
                    UNION ALL
2✔
763
                    SELECT 
2✔
764
                        id, 
2✔
765
                        name,
2✔
766
                        type_name
2✔
767
                    FROM 
2✔
768
                        pro_layer_providers
2✔
769
                )
2✔
770
                ORDER BY name ASC
2✔
771
                LIMIT $1 
2✔
772
                OFFSET $2;",
2✔
773
            )
2✔
774
            .await?;
1✔
775

776
        let rows = conn
2✔
777
            .query(
2✔
778
                &stmt,
2✔
779
                &[&i64::from(options.limit), &i64::from(options.offset)],
2✔
780
            )
2✔
781
            .await?;
1✔
782

783
        Ok(rows
2✔
784
            .iter()
2✔
785
            .map(|row| LayerProviderListing {
2✔
786
                id: row.get(0),
2✔
787
                name: row.get(1),
2✔
788
                description: row.get(2),
2✔
789
            })
2✔
790
            .collect())
2✔
791
    }
4✔
792

793
    async fn load_layer_provider(&self, id: DataProviderId) -> Result<Box<dyn DataProvider>> {
2✔
794
        // TODO: permissions
795
        let conn = self.conn_pool.get().await?;
2✔
796

797
        let stmt = conn
2✔
798
            .prepare(
2✔
799
                "SELECT
2✔
800
                    definition, NULL AS pro_definition
2✔
801
                FROM
2✔
802
                    layer_providers
2✔
803
                WHERE
2✔
804
                    id = $1
2✔
805
                UNION ALL
2✔
806
                SELECT
2✔
807
                    NULL AS definition, definition AS pro_definition
2✔
808
                FROM
2✔
809
                    pro_layer_providers
2✔
810
                WHERE
2✔
811
                    id = $1",
2✔
812
            )
2✔
813
            .await?;
18✔
814

815
        let row = conn.query_one(&stmt, &[&id]).await?;
2✔
816

817
        if let Some(definition) = row.get::<_, Option<TypedDataProviderDefinition>>(0) {
2✔
818
            return Box::new(definition).initialize().await;
1✔
819
        }
1✔
820

1✔
821
        let pro_definition: TypedProDataProviderDefinition = row.get(1);
1✔
822
        Box::new(pro_definition).initialize().await
1✔
823
    }
4✔
824
}
825

826
#[async_trait]
827
pub trait ProLayerProviderDb: Send + Sync + 'static {
828
    async fn add_pro_layer_provider(
829
        &self,
830
        provider: TypedProDataProviderDefinition,
831
    ) -> Result<DataProviderId>;
832
}
833

834
#[async_trait]
835
impl<Tls> ProLayerProviderDb for ProPostgresDb<Tls>
836
where
837
    Tls: MakeTlsConnect<Socket> + Clone + Send + Sync + 'static,
838
    <Tls as MakeTlsConnect<Socket>>::Stream: Send + Sync,
839
    <Tls as MakeTlsConnect<Socket>>::TlsConnect: Send,
840
    <<Tls as MakeTlsConnect<Socket>>::TlsConnect as TlsConnect<Socket>>::Future: Send,
841
{
842
    async fn add_pro_layer_provider(
1✔
843
        &self,
1✔
844
        provider: TypedProDataProviderDefinition,
1✔
845
    ) -> Result<DataProviderId> {
1✔
846
        ensure!(self.session.is_admin(), error::PermissionDenied);
1✔
847

848
        let conn = self.conn_pool.get().await?;
1✔
849

850
        let stmt = conn
1✔
851
            .prepare(
1✔
852
                "
1✔
853
              INSERT INTO pro_layer_providers (
1✔
854
                  id, 
1✔
855
                  type_name, 
1✔
856
                  name,
1✔
857
                  definition
1✔
858
              )
1✔
859
              VALUES ($1, $2, $3, $4)",
1✔
860
            )
1✔
861
            .await?;
×
862

863
        let id = provider.id();
1✔
864
        conn.execute(
1✔
865
            &stmt,
1✔
866
            &[&id, &provider.type_name(), &provider.name(), &provider],
1✔
867
        )
1✔
868
        .await?;
1✔
869
        Ok(id)
1✔
870
    }
2✔
871
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc