• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

geo-engine / geoengine / 12767614094

14 Jan 2025 12:26PM UTC coverage: 90.64% (+0.06%) from 90.576%
12767614094

push

github

web-flow
Merge pull request #1006 from geo-engine/migrate-pro-api

Migrate-pro-api

1106 of 1152 new or added lines in 24 files covered. (96.01%)

248 existing lines in 13 files now uncovered.

133501 of 147287 relevant lines covered (90.64%)

54652.85 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

65.57
/services/src/contexts/mod.rs
1
use crate::api::model::services::Volume;
2
use crate::datasets::external::netcdfcf::NetCdfCfProviderDb;
3
use crate::datasets::storage::DatasetDb;
4
use crate::error::Result;
5
use crate::layers::listing::LayerCollectionProvider;
6
use crate::layers::storage::{LayerDb, LayerProviderDb};
7
use crate::machine_learning::MlModelDb;
8
use crate::tasks::{TaskContext, TaskManager};
9
use crate::{projects::ProjectDb, workflows::registry::WorkflowRegistry};
10
use async_trait::async_trait;
11
use geoengine_datatypes::dataset::{DataId, DataProviderId, ExternalDataId, LayerId, NamedData};
12
use geoengine_datatypes::machine_learning::{MlModelMetadata, MlModelName};
13
use geoengine_datatypes::primitives::{RasterQueryRectangle, VectorQueryRectangle};
14
use geoengine_datatypes::raster::TilingSpecification;
15
use geoengine_operators::cache::shared_cache::SharedCache;
16
use geoengine_operators::engine::{
17
    ChunkByteSize, CreateSpan, ExecutionContext, InitializedPlotOperator,
18
    InitializedVectorOperator, MetaData, MetaDataProvider, QueryAbortRegistration,
19
    QueryAbortTrigger, QueryContext, RasterResultDescriptor, VectorResultDescriptor,
20
};
21
use geoengine_operators::meta::quota::{QuotaChecker, QuotaTracking};
22
use geoengine_operators::mock::MockDatasetDataSourceLoadingInfo;
23
use geoengine_operators::source::{GdalLoadingInfo, OgrSourceDataset};
24
use rayon::ThreadPool;
25
use std::str::FromStr;
26
use std::sync::Arc;
27
use tokio::sync::RwLock;
28
use uuid::Uuid;
29

30
pub use migrations::{
31
    initialize_database, migrate_database, migration_0000_initial::Migration0000Initial,
32
    CurrentSchemaMigration, DatabaseVersion, Migration, Migration0001RasterStacks,
33
    Migration0002DatasetListingProvider, Migration0003GbifConfig,
34
    Migration0004DatasetListingProviderPrio, Migration0005GbifColumnSelection,
35
    Migration0006EbvProvider, Migration0007OwnerRole, Migration0008BandNames,
36
    Migration0009OidcTokens, Migration0010S2StacTimeBuffers, Migration0011RemoveXgb,
37
    Migration0012MlModelDb, Migration0013CopernicusProvider, Migration0014MultibandColorizer,
38
    Migration0015LogQuota, MigrationResult,
39
};
40
pub use postgres::{PostgresContext, PostgresDb, PostgresSessionContext};
41
pub use session::{MockableSession, Session, SessionId, SimpleSession};
42
pub use simple_context::SimpleApplicationContext;
43

44
mod db_types;
45
pub(crate) mod migrations;
46
mod postgres;
47
mod session;
48
mod simple_context;
49

50
/// Shorthand for a value of type `T` that is shared via an [`Arc`] and guarded by a
/// tokio [`RwLock`].
pub type Db<T> = Arc<RwLock<T>>;
51

52
/// The application context bundles shared resources.
53
/// It is passed to API handlers and allows creating a session context that provides access to resources.
54
#[async_trait]
55
pub trait ApplicationContext: 'static + Send + Sync + Clone {
56
    type SessionContext: SessionContext;
57
    type Session: Session + Clone;
58

59
    /// Create a new session context for the given session.
60
    fn session_context(&self, session: Self::Session) -> Self::SessionContext;
61

62
    /// Load a session by its id
63
    async fn session_by_id(&self, session_id: SessionId) -> Result<Self::Session>;
64
}
65

66
/// The session context bundles resources that are specific to a session.
67
#[async_trait]
68
pub trait SessionContext: 'static + Send + Sync + Clone {
69
    type Session: Session + Clone;
70
    type GeoEngineDB: GeoEngineDb;
71
    type QueryContext: QueryContext;
72
    type ExecutionContext: ExecutionContext;
73
    type TaskContext: TaskContext;
74
    type TaskManager: TaskManager<Self::TaskContext>;
75

76
    /// Get the db for accessing resources
77
    fn db(&self) -> Self::GeoEngineDB;
78

79
    /// Get the task manager for accessing tasks
80
    fn tasks(&self) -> Self::TaskManager;
81

82
    /// Create a new query context for executing queries on processors
83
    // TODO: assign computation id inside SessionContext or let it be provided from outside?
84
    fn query_context(&self, workflow: Uuid, computation: Uuid) -> Result<Self::QueryContext>;
85

86
    /// Create a new execution context initializing operators
87
    fn execution_context(&self) -> Result<Self::ExecutionContext>;
88

89
    /// Get the list of available data volumes
90
    fn volumes(&self) -> Result<Vec<Volume>>;
91

92
    /// Get the current session
93
    fn session(&self) -> &Self::Session;
94
}
95

96
/// The trait for accessing all resources
97
pub trait GeoEngineDb:
98
    DatasetDb
99
    + LayerDb
100
    + LayerProviderDb
101
    + LayerCollectionProvider
102
    + ProjectDb
103
    + WorkflowRegistry
104
    + NetCdfCfProviderDb
105
    + MlModelDb
106
    + std::fmt::Debug
107
{
108
}
109

110
pub struct QueryContextImpl {
111
    chunk_byte_size: ChunkByteSize,
112
    thread_pool: Arc<ThreadPool>,
113
    cache: Option<Arc<SharedCache>>,
114
    quota_tracking: Option<QuotaTracking>,
115
    quota_checker: Option<QuotaChecker>,
116
    abort_registration: QueryAbortRegistration,
117
    abort_trigger: Option<QueryAbortTrigger>,
118
}
119

120
impl QueryContextImpl {
121
    pub fn new(chunk_byte_size: ChunkByteSize, thread_pool: Arc<ThreadPool>) -> Self {
1✔
122
        let (abort_registration, abort_trigger) = QueryAbortRegistration::new();
1✔
123
        QueryContextImpl {
1✔
124
            chunk_byte_size,
1✔
125
            thread_pool,
1✔
126
            cache: None,
1✔
127
            quota_tracking: None,
1✔
128
            quota_checker: None,
1✔
129
            abort_registration,
1✔
130
            abort_trigger: Some(abort_trigger),
1✔
131
        }
1✔
132
    }
1✔
133

134
    pub fn new_with_extensions(
30✔
135
        chunk_byte_size: ChunkByteSize,
30✔
136
        thread_pool: Arc<ThreadPool>,
30✔
137
        cache: Option<Arc<SharedCache>>,
30✔
138
        quota_tracking: Option<QuotaTracking>,
30✔
139
        quota_checker: Option<QuotaChecker>,
30✔
140
    ) -> Self {
30✔
141
        let (abort_registration, abort_trigger) = QueryAbortRegistration::new();
30✔
142
        QueryContextImpl {
30✔
143
            chunk_byte_size,
30✔
144
            thread_pool,
30✔
145
            cache,
30✔
146
            quota_checker,
30✔
147
            quota_tracking,
30✔
148
            abort_registration,
30✔
149
            abort_trigger: Some(abort_trigger),
30✔
150
        }
30✔
151
    }
30✔
152
}
153

154
impl QueryContext for QueryContextImpl {
155
    fn chunk_byte_size(&self) -> ChunkByteSize {
4✔
156
        self.chunk_byte_size
4✔
157
    }
4✔
158

159
    fn thread_pool(&self) -> &Arc<ThreadPool> {
4✔
160
        &self.thread_pool
4✔
161
    }
4✔
162

163
    fn abort_registration(&self) -> &QueryAbortRegistration {
139✔
164
        &self.abort_registration
139✔
165
    }
139✔
166

167
    fn abort_trigger(&mut self) -> geoengine_operators::util::Result<QueryAbortTrigger> {
30✔
168
        self.abort_trigger
30✔
169
            .take()
30✔
170
            .ok_or(geoengine_operators::error::Error::AbortTriggerAlreadyUsed)
30✔
171
    }
30✔
172

173
    fn quota_tracking(&self) -> Option<&geoengine_operators::meta::quota::QuotaTracking> {
36✔
174
        self.quota_tracking.as_ref()
36✔
175
    }
36✔
176

177
    fn quota_checker(&self) -> Option<&geoengine_operators::meta::quota::QuotaChecker> {
37✔
178
        self.quota_checker.as_ref()
37✔
179
    }
37✔
180

181
    fn cache(&self) -> Option<Arc<geoengine_operators::cache::shared_cache::SharedCache>> {
×
182
        self.cache.clone()
×
183
    }
×
184
}
185

186
pub struct ExecutionContextImpl<D>
187
where
188
    D: DatasetDb + LayerProviderDb + MlModelDb,
189
{
190
    db: D,
191
    thread_pool: Arc<ThreadPool>,
192
    tiling_specification: TilingSpecification,
193
}
194

195
impl<D> ExecutionContextImpl<D>
196
where
197
    D: DatasetDb + LayerProviderDb + MlModelDb,
198
{
199
    pub fn new(
2✔
200
        db: D,
2✔
201
        thread_pool: Arc<ThreadPool>,
2✔
202
        tiling_specification: TilingSpecification,
2✔
203
    ) -> Self {
2✔
204
        Self {
2✔
205
            db,
2✔
206
            thread_pool,
2✔
207
            tiling_specification,
2✔
208
        }
2✔
209
    }
2✔
210
}
211

212
#[async_trait::async_trait]
213
impl<D> ExecutionContext for ExecutionContextImpl<D>
214
where
215
    D: DatasetDb
216
        + MetaDataProvider<
217
            MockDatasetDataSourceLoadingInfo,
218
            VectorResultDescriptor,
219
            VectorQueryRectangle,
220
        > + MetaDataProvider<OgrSourceDataset, VectorResultDescriptor, VectorQueryRectangle>
221
        + MetaDataProvider<GdalLoadingInfo, RasterResultDescriptor, RasterQueryRectangle>
222
        + LayerProviderDb
223
        + MlModelDb,
224
{
225
    fn thread_pool(&self) -> &Arc<ThreadPool> {
×
226
        &self.thread_pool
×
227
    }
×
228

229
    fn tiling_specification(&self) -> TilingSpecification {
2✔
230
        self.tiling_specification
2✔
231
    }
2✔
232

233
    fn wrap_initialized_raster_operator(
5✔
234
        &self,
5✔
235
        op: Box<dyn geoengine_operators::engine::InitializedRasterOperator>,
5✔
236
        _span: CreateSpan,
5✔
237
    ) -> Box<dyn geoengine_operators::engine::InitializedRasterOperator> {
5✔
238
        op
5✔
239
    }
5✔
240

241
    fn wrap_initialized_vector_operator(
1✔
242
        &self,
1✔
243
        op: Box<dyn InitializedVectorOperator>,
1✔
244
        _span: CreateSpan,
1✔
245
    ) -> Box<dyn InitializedVectorOperator> {
1✔
246
        op
1✔
247
    }
1✔
248

249
    fn wrap_initialized_plot_operator(
1✔
250
        &self,
1✔
251
        op: Box<dyn InitializedPlotOperator>,
1✔
252
        _span: CreateSpan,
1✔
253
    ) -> Box<dyn InitializedPlotOperator> {
1✔
254
        op
1✔
255
    }
1✔
256

257
    async fn resolve_named_data(
258
        &self,
259
        data: &NamedData,
260
    ) -> Result<DataId, geoengine_operators::error::Error> {
2✔
261
        if let Some(provider) = &data.provider {
2✔
262
            // TODO: resolve provider name to provider id
263
            let provider_id = DataProviderId::from_str(provider)?;
1✔
264

265
            let data_id = ExternalDataId {
1✔
266
                provider_id,
1✔
267
                layer_id: LayerId(data.name.clone()),
1✔
268
            };
1✔
269

1✔
270
            return Ok(data_id.into());
1✔
271
        }
1✔
272

273
        let dataset_id = self
1✔
274
            .db
1✔
275
            .resolve_dataset_name_to_id(&data.into())
1✔
276
            .await
1✔
277
            .map_err(
1✔
278
                |source| geoengine_operators::error::Error::CannotResolveDatasetName {
1✔
279
                    name: data.clone(),
×
280
                    source: Box::new(source),
×
281
                },
1✔
282
            )?;
1✔
283

284
        // handle the case where the dataset name is not known
285
        let dataset_id = dataset_id
1✔
286
            .ok_or(geoengine_operators::error::Error::UnknownDatasetName { name: data.clone() })?;
1✔
287

288
        Ok(dataset_id.into())
1✔
289
    }
4✔
290

291
    async fn ml_model_metadata(
292
        &self,
293
        name: &MlModelName,
294
    ) -> Result<MlModelMetadata, geoengine_operators::error::Error> {
×
295
        self.db
×
296
            .load_model(&(name.clone().into()))
×
297
            .await
×
298
            .map_err(
×
299
                |source| geoengine_operators::error::Error::CannotResolveMlModelName {
×
300
                    name: name.clone(),
×
301
                    source: Box::new(source),
×
302
                },
×
303
            )?
×
304
            .metadata_for_operator()
×
305
            .map_err(
×
306
                |source| geoengine_operators::error::Error::LoadingMlMetadataFailed {
×
307
                    source: Box::new(source),
×
308
                },
×
309
            )
×
310
    }
×
311
}
312

313
// TODO: use macro(?) for delegating meta_data function to DatasetDB to avoid redundant code
314
#[async_trait]
315
impl<D>
316
    MetaDataProvider<MockDatasetDataSourceLoadingInfo, VectorResultDescriptor, VectorQueryRectangle>
317
    for ExecutionContextImpl<D>
318
where
319
    D: DatasetDb
320
        + MetaDataProvider<
321
            MockDatasetDataSourceLoadingInfo,
322
            VectorResultDescriptor,
323
            VectorQueryRectangle,
324
        > + LayerProviderDb
325
        + MlModelDb,
326
{
327
    async fn meta_data(
328
        &self,
329
        data_id: &DataId,
330
    ) -> Result<
331
        Box<
332
            dyn MetaData<
333
                MockDatasetDataSourceLoadingInfo,
334
                VectorResultDescriptor,
335
                VectorQueryRectangle,
336
            >,
337
        >,
338
        geoengine_operators::error::Error,
339
    > {
×
340
        match data_id {
×
341
            DataId::Internal { dataset_id: _ } => {
342
                self.db.meta_data(&data_id.clone()).await.map_err(|e| {
×
343
                    geoengine_operators::error::Error::LoadingInfo {
×
344
                        source: Box::new(e),
×
345
                    }
×
346
                })
×
347
            }
348
            DataId::External(external) => {
×
349
                self.db
×
350
                    .load_layer_provider(external.provider_id)
×
351
                    .await
×
352
                    .map_err(|e| geoengine_operators::error::Error::DatasetMetaData {
×
353
                        source: Box::new(e),
×
354
                    })?
×
355
                    .meta_data(data_id)
×
356
                    .await
×
357
            }
358
        }
359
    }
×
360
}
361

362
// TODO: use macro(?) for delegating meta_data function to DatasetDB to avoid redundant code
363
#[async_trait]
364
impl<D> MetaDataProvider<OgrSourceDataset, VectorResultDescriptor, VectorQueryRectangle>
365
    for ExecutionContextImpl<D>
366
where
367
    D: DatasetDb
368
        + MetaDataProvider<OgrSourceDataset, VectorResultDescriptor, VectorQueryRectangle>
369
        + LayerProviderDb
370
        + MlModelDb,
371
{
372
    async fn meta_data(
373
        &self,
374
        data_id: &DataId,
375
    ) -> Result<
376
        Box<dyn MetaData<OgrSourceDataset, VectorResultDescriptor, VectorQueryRectangle>>,
377
        geoengine_operators::error::Error,
UNCOV
378
    > {
×
UNCOV
379
        match data_id {
×
380
            DataId::Internal { dataset_id: _ } => {
UNCOV
381
                self.db.meta_data(&data_id.clone()).await.map_err(|e| {
×
382
                    geoengine_operators::error::Error::LoadingInfo {
×
383
                        source: Box::new(e),
×
384
                    }
×
UNCOV
385
                })
×
386
            }
387
            DataId::External(external) => {
×
388
                self.db
×
389
                    .load_layer_provider(external.provider_id)
×
390
                    .await
×
391
                    .map_err(|e| geoengine_operators::error::Error::DatasetMetaData {
×
392
                        source: Box::new(e),
×
393
                    })?
×
394
                    .meta_data(data_id)
×
395
                    .await
×
396
            }
397
        }
UNCOV
398
    }
×
399
}
400

401
// TODO: use macro(?) for delegating meta_data function to DatasetDB to avoid redundant code
402
#[async_trait]
403
impl<D> MetaDataProvider<GdalLoadingInfo, RasterResultDescriptor, RasterQueryRectangle>
404
    for ExecutionContextImpl<D>
405
where
406
    D: DatasetDb
407
        + MetaDataProvider<GdalLoadingInfo, RasterResultDescriptor, RasterQueryRectangle>
408
        + LayerProviderDb
409
        + MlModelDb,
410
{
411
    async fn meta_data(
412
        &self,
413
        data_id: &DataId,
414
    ) -> Result<
415
        Box<dyn MetaData<GdalLoadingInfo, RasterResultDescriptor, RasterQueryRectangle>>,
416
        geoengine_operators::error::Error,
417
    > {
2✔
418
        match data_id {
2✔
419
            DataId::Internal { dataset_id: _ } => {
420
                self.db.meta_data(&data_id.clone()).await.map_err(|e| {
1✔
421
                    geoengine_operators::error::Error::LoadingInfo {
×
422
                        source: Box::new(e),
×
423
                    }
×
424
                })
1✔
425
            }
426
            DataId::External(external) => {
1✔
427
                self.db
1✔
428
                    .load_layer_provider(external.provider_id)
1✔
429
                    .await
1✔
430
                    .map_err(|e| geoengine_operators::error::Error::DatasetMetaData {
1✔
431
                        source: Box::new(e),
×
432
                    })?
1✔
433
                    .meta_data(data_id)
1✔
434
                    .await
1✔
435
            }
436
        }
437
    }
4✔
438
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc