• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

geo-engine / geoengine / 10796512144

10 Sep 2024 03:50PM UTC coverage: 91.037% (+0.03%) from 91.005%
10796512144

push

github

web-flow
Merge pull request #980 from geo-engine/remove-pro-feature

remove pro from datatypes and operators

60 of 118 new or added lines in 7 files covered. (50.85%)

8 existing lines in 5 files now uncovered.

133668 of 146828 relevant lines covered (91.04%)

52534.71 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

74.89
/operators/src/engine/execution_context.rs
1
use super::query::QueryAbortRegistration;
2
use super::{
3
    CreateSpan, InitializedPlotOperator, InitializedRasterOperator, InitializedVectorOperator,
4
    MockQueryContext, WorkflowOperatorPath,
5
};
6
use crate::engine::{
7
    ChunkByteSize, RasterResultDescriptor, ResultDescriptor, VectorResultDescriptor,
8
};
9
use crate::error::Error;
10
use crate::meta::wrapper::InitializedOperatorWrapper;
11
use crate::mock::MockDatasetDataSourceLoadingInfo;
12
use crate::source::{GdalLoadingInfo, OgrSourceDataset};
13
use crate::util::{create_rayon_thread_pool, Result};
14
use async_trait::async_trait;
15
use geoengine_datatypes::dataset::{DataId, NamedData};
16
use geoengine_datatypes::machine_learning::{MlModelMetadata, MlModelName};
17
use geoengine_datatypes::primitives::{RasterQueryRectangle, VectorQueryRectangle};
18
use geoengine_datatypes::raster::TilingSpecification;
19
use geoengine_datatypes::util::test::TestDefault;
20
use rayon::ThreadPool;
21
use serde::{Deserialize, Serialize};
22
use std::any::Any;
23
use std::collections::HashMap;
24
use std::fmt::Debug;
25
use std::marker::PhantomData;
26
use std::sync::Arc;
27

28
/// A context that provides certain utility access during operator initialization
29
#[async_trait::async_trait]
30
pub trait ExecutionContext: Send
31
    + Sync
32
    + MetaDataProvider<MockDatasetDataSourceLoadingInfo, VectorResultDescriptor, VectorQueryRectangle>
33
    + MetaDataProvider<OgrSourceDataset, VectorResultDescriptor, VectorQueryRectangle>
34
    + MetaDataProvider<GdalLoadingInfo, RasterResultDescriptor, RasterQueryRectangle>
35
{
36
    fn thread_pool(&self) -> &Arc<ThreadPool>;
37
    fn tiling_specification(&self) -> TilingSpecification;
38

39
    fn wrap_initialized_raster_operator(
40
        &self,
41
        op: Box<dyn InitializedRasterOperator>,
42
        span: CreateSpan,
43
        path: WorkflowOperatorPath, // TODO: remove and allow operators to tell its path
44
    ) -> Box<dyn InitializedRasterOperator>;
45

46
    fn wrap_initialized_vector_operator(
47
        &self,
48
        op: Box<dyn InitializedVectorOperator>,
49
        span: CreateSpan,
50
        path: WorkflowOperatorPath,
51
    ) -> Box<dyn InitializedVectorOperator>;
52

53
    fn wrap_initialized_plot_operator(
54
        &self,
55
        op: Box<dyn InitializedPlotOperator>,
56
        span: CreateSpan,
57
        path: WorkflowOperatorPath,
58
    ) -> Box<dyn InitializedPlotOperator>;
59

60
    async fn resolve_named_data(&self, data: &NamedData) -> Result<DataId>;
61

62
    async fn ml_model_metadata(&self, name: &MlModelName) -> Result<MlModelMetadata>;
63
}
64

65
#[async_trait]
66
pub trait MetaDataProvider<L, R, Q>
67
where
68
    R: ResultDescriptor,
69
{
70
    async fn meta_data(&self, id: &DataId) -> Result<Box<dyn MetaData<L, R, Q>>>;
71
}
72

73
#[async_trait]
74
pub trait MetaData<L, R, Q>: Debug + Send + Sync
75
where
76
    R: ResultDescriptor,
77
{
78
    async fn loading_info(&self, query: Q) -> Result<L>;
79
    async fn result_descriptor(&self) -> Result<R>;
80

81
    fn box_clone(&self) -> Box<dyn MetaData<L, R, Q>>;
82
}
83

84
impl<L, R, Q> Clone for Box<dyn MetaData<L, R, Q>>
85
where
86
    R: ResultDescriptor,
87
{
88
    fn clone(&self) -> Box<dyn MetaData<L, R, Q>> {
125✔
89
        self.box_clone()
125✔
90
    }
125✔
91
}
92

93
pub struct MockExecutionContext {
94
    pub thread_pool: Arc<ThreadPool>,
95
    pub meta_data: HashMap<DataId, Box<dyn Any + Send + Sync>>,
96
    pub named_data: HashMap<NamedData, DataId>,
97
    pub ml_models: HashMap<MlModelName, MlModelMetadata>,
98
    pub tiling_specification: TilingSpecification,
99
}
100

101
impl TestDefault for MockExecutionContext {
102
    fn test_default() -> Self {
144✔
103
        Self {
144✔
104
            thread_pool: create_rayon_thread_pool(0),
144✔
105
            meta_data: HashMap::default(),
144✔
106
            named_data: HashMap::default(),
144✔
107
            ml_models: HashMap::default(),
144✔
108
            tiling_specification: TilingSpecification::test_default(),
144✔
109
        }
144✔
110
    }
144✔
111
}
112

113
impl MockExecutionContext {
114
    pub fn new_with_tiling_spec(tiling_specification: TilingSpecification) -> Self {
131✔
115
        Self {
131✔
116
            thread_pool: create_rayon_thread_pool(0),
131✔
117
            meta_data: HashMap::default(),
131✔
118
            named_data: HashMap::default(),
131✔
119
            ml_models: HashMap::default(),
131✔
120
            tiling_specification,
131✔
121
        }
131✔
122
    }
131✔
123

124
    pub fn new_with_tiling_spec_and_thread_count(
×
125
        tiling_specification: TilingSpecification,
×
126
        num_threads: usize,
×
127
    ) -> Self {
×
128
        Self {
×
129
            thread_pool: create_rayon_thread_pool(num_threads),
×
130
            meta_data: HashMap::default(),
×
131
            named_data: HashMap::default(),
×
132
            ml_models: HashMap::default(),
×
133
            tiling_specification,
×
134
        }
×
135
    }
×
136

137
    pub fn add_meta_data<L, R, Q>(
49✔
138
        &mut self,
49✔
139
        data: DataId,
49✔
140
        named_data: NamedData,
49✔
141
        meta_data: Box<dyn MetaData<L, R, Q>>,
49✔
142
    ) where
49✔
143
        L: Send + Sync + 'static,
49✔
144
        R: Send + Sync + 'static + ResultDescriptor,
49✔
145
        Q: Send + Sync + 'static,
49✔
146
    {
49✔
147
        self.meta_data.insert(
49✔
148
            data.clone(),
49✔
149
            Box::new(meta_data) as Box<dyn Any + Send + Sync>,
49✔
150
        );
49✔
151

49✔
152
        self.named_data.insert(named_data, data);
49✔
153
    }
49✔
154

155
    pub fn delete_meta_data(&mut self, named_data: &NamedData) {
2✔
156
        let data = self.named_data.remove(named_data);
2✔
157
        if let Some(data) = data {
2✔
158
            self.meta_data.remove(&data);
2✔
159
        }
2✔
160
    }
2✔
161

162
    pub fn mock_query_context(&self, chunk_byte_size: ChunkByteSize) -> MockQueryContext {
3✔
163
        let (abort_registration, abort_trigger) = QueryAbortRegistration::new();
3✔
164
        MockQueryContext {
3✔
165
            chunk_byte_size,
3✔
166
            thread_pool: self.thread_pool.clone(),
3✔
167
            cache: None,
3✔
168
            quota_checker: None,
3✔
169
            quota_tracking: None,
3✔
170
            abort_registration,
3✔
171
            abort_trigger: Some(abort_trigger),
3✔
172
        }
3✔
173
    }
3✔
174
}
175

176
#[async_trait::async_trait]
177
impl ExecutionContext for MockExecutionContext {
178
    fn thread_pool(&self) -> &Arc<ThreadPool> {
×
179
        &self.thread_pool
×
180
    }
×
181

182
    fn tiling_specification(&self) -> TilingSpecification {
247✔
183
        self.tiling_specification
247✔
184
    }
247✔
185

186
    fn wrap_initialized_raster_operator(
320✔
187
        &self,
320✔
188
        op: Box<dyn InitializedRasterOperator>,
320✔
189
        _span: CreateSpan,
320✔
190
        _path: WorkflowOperatorPath,
320✔
191
    ) -> Box<dyn InitializedRasterOperator> {
320✔
192
        op
320✔
193
    }
320✔
194

195
    fn wrap_initialized_vector_operator(
168✔
196
        &self,
168✔
197
        op: Box<dyn InitializedVectorOperator>,
168✔
198
        _span: CreateSpan,
168✔
199
        _path: WorkflowOperatorPath,
168✔
200
    ) -> Box<dyn InitializedVectorOperator> {
168✔
201
        op
168✔
202
    }
168✔
203

204
    fn wrap_initialized_plot_operator(
53✔
205
        &self,
53✔
206
        op: Box<dyn InitializedPlotOperator>,
53✔
207
        _span: CreateSpan,
53✔
208
        _path: WorkflowOperatorPath,
53✔
209
    ) -> Box<dyn InitializedPlotOperator> {
53✔
210
        op
53✔
211
    }
53✔
212

213
    async fn resolve_named_data(&self, data: &NamedData) -> Result<DataId> {
53✔
214
        self.named_data
53✔
215
            .get(data)
53✔
216
            .cloned()
53✔
217
            .ok_or_else(|| Error::UnknownDatasetName { name: data.clone() })
53✔
218
    }
53✔
219

220
    async fn ml_model_metadata(&self, name: &MlModelName) -> Result<MlModelMetadata> {
2✔
221
        self.ml_models
2✔
222
            .get(name)
2✔
223
            .cloned()
2✔
224
            .ok_or_else(|| Error::UnknownMlModelName { name: name.clone() })
2✔
225
    }
2✔
226
}
227

228
#[async_trait]
229
impl<L, R, Q> MetaDataProvider<L, R, Q> for MockExecutionContext
230
where
231
    L: 'static,
232
    R: 'static + ResultDescriptor,
233
    Q: 'static,
234
{
235
    async fn meta_data(&self, id: &DataId) -> Result<Box<dyn MetaData<L, R, Q>>> {
53✔
236
        let meta_data = self
53✔
237
            .meta_data
53✔
238
            .get(id)
53✔
239
            .ok_or(Error::UnknownDataId)?
53✔
240
            .downcast_ref::<Box<dyn MetaData<L, R, Q>>>()
53✔
241
            .ok_or(Error::InvalidMetaDataType)?;
53✔
242

53✔
243
        Ok(meta_data.clone())
53✔
244
    }
53✔
245
}
246

247
#[derive(PartialEq, Eq, Debug, Clone, Serialize, Deserialize)]
3✔
248
#[serde(rename_all = "camelCase")]
249
pub struct StaticMetaData<L, R, Q>
250
where
251
    L: Debug + Clone + Send + Sync + 'static,
252
    R: Debug + Send + Sync + 'static + ResultDescriptor,
253
    Q: Debug + Clone + Send + Sync + 'static,
254
{
255
    pub loading_info: L,
256
    pub result_descriptor: R,
257
    #[serde(skip)]
258
    pub phantom: PhantomData<Q>,
259
}
260

261
#[async_trait]
262
impl<L, R, Q> MetaData<L, R, Q> for StaticMetaData<L, R, Q>
263
where
264
    L: Debug + Clone + Send + Sync + 'static,
265
    R: Debug + Send + Sync + 'static + ResultDescriptor,
266
    Q: Debug + Clone + Send + Sync + 'static,
267
{
268
    async fn loading_info(&self, _query: Q) -> Result<L> {
61✔
269
        Ok(self.loading_info.clone())
61✔
270
    }
61✔
271

272
    async fn result_descriptor(&self) -> Result<R> {
36✔
273
        Ok(self.result_descriptor.clone())
36✔
274
    }
36✔
275

276
    fn box_clone(&self) -> Box<dyn MetaData<L, R, Q>> {
49✔
277
        Box::new(self.clone())
49✔
278
    }
49✔
279
}
280

281
mod db_types {
282
    use geoengine_datatypes::delegate_from_to_sql;
283
    use postgres_types::{FromSql, ToSql};
284

285
    use super::*;
286

287
    pub type MockMetaData = StaticMetaData<
288
        MockDatasetDataSourceLoadingInfo,
289
        VectorResultDescriptor,
290
        VectorQueryRectangle,
291
    >;
292

293
    #[derive(Debug, ToSql, FromSql)]
485✔
294
    #[postgres(name = "MockMetaData")]
295
    pub struct MockMetaDataDbType {
296
        pub loading_info: MockDatasetDataSourceLoadingInfo,
297
        pub result_descriptor: VectorResultDescriptor,
298
    }
299

300
    impl From<&MockMetaData> for MockMetaDataDbType {
301
        fn from(other: &MockMetaData) -> Self {
2✔
302
            Self {
2✔
303
                loading_info: other.loading_info.clone(),
2✔
304
                result_descriptor: other.result_descriptor.clone(),
2✔
305
            }
2✔
306
        }
2✔
307
    }
308

309
    impl TryFrom<MockMetaDataDbType> for MockMetaData {
310
        type Error = Error;
311

312
        fn try_from(other: MockMetaDataDbType) -> Result<Self, Self::Error> {
2✔
313
            Ok(Self {
2✔
314
                loading_info: other.loading_info,
2✔
315
                result_descriptor: other.result_descriptor,
2✔
316
                phantom: PhantomData,
2✔
317
            })
2✔
318
        }
2✔
319
    }
320

321
    pub type OgrMetaData =
322
        StaticMetaData<OgrSourceDataset, VectorResultDescriptor, VectorQueryRectangle>;
323

324
    #[derive(Debug, ToSql, FromSql)]
497✔
325
    #[postgres(name = "OgrMetaData")]
326
    pub struct OgrMetaDataDbType {
327
        pub loading_info: OgrSourceDataset,
328
        pub result_descriptor: VectorResultDescriptor,
329
    }
330

331
    impl From<&StaticMetaData<OgrSourceDataset, VectorResultDescriptor, VectorQueryRectangle>>
332
        for OgrMetaDataDbType
333
    {
334
        fn from(other: &OgrMetaData) -> Self {
33✔
335
            Self {
33✔
336
                loading_info: other.loading_info.clone(),
33✔
337
                result_descriptor: other.result_descriptor.clone(),
33✔
338
            }
33✔
339
        }
33✔
340
    }
341

342
    impl TryFrom<OgrMetaDataDbType> for OgrMetaData {
343
        type Error = Error;
344

345
        fn try_from(other: OgrMetaDataDbType) -> Result<Self, Self::Error> {
14✔
346
            Ok(Self {
14✔
347
                loading_info: other.loading_info,
14✔
348
                result_descriptor: other.result_descriptor,
14✔
349
                phantom: PhantomData,
14✔
350
            })
14✔
351
        }
14✔
352
    }
353

354
    delegate_from_to_sql!(MockMetaData, MockMetaDataDbType);
355
    delegate_from_to_sql!(OgrMetaData, OgrMetaDataDbType);
356
}
357

358
/// A mock execution context that wraps all operators with a statistics operator.
359
pub struct StatisticsWrappingMockExecutionContext {
360
    pub inner: MockExecutionContext,
361
}
362

363
impl TestDefault for StatisticsWrappingMockExecutionContext {
NEW
364
    fn test_default() -> Self {
×
NEW
365
        Self {
×
NEW
366
            inner: MockExecutionContext::test_default(),
×
NEW
367
        }
×
NEW
368
    }
×
369
}
370

371
#[async_trait::async_trait]
372
impl ExecutionContext for StatisticsWrappingMockExecutionContext {
NEW
373
    fn thread_pool(&self) -> &Arc<ThreadPool> {
×
NEW
374
        &self.inner.thread_pool
×
NEW
375
    }
×
376

NEW
377
    fn tiling_specification(&self) -> TilingSpecification {
×
NEW
378
        self.inner.tiling_specification
×
NEW
379
    }
×
380

NEW
381
    fn wrap_initialized_raster_operator(
×
NEW
382
        &self,
×
NEW
383
        op: Box<dyn InitializedRasterOperator>,
×
NEW
384
        span: CreateSpan,
×
NEW
385
        path: WorkflowOperatorPath,
×
NEW
386
    ) -> Box<dyn InitializedRasterOperator> {
×
NEW
387
        InitializedOperatorWrapper::new(op, span, path).boxed()
×
NEW
388
    }
×
389

NEW
390
    fn wrap_initialized_vector_operator(
×
NEW
391
        &self,
×
NEW
392
        op: Box<dyn InitializedVectorOperator>,
×
NEW
393
        span: CreateSpan,
×
NEW
394
        path: WorkflowOperatorPath,
×
NEW
395
    ) -> Box<dyn InitializedVectorOperator> {
×
NEW
396
        InitializedOperatorWrapper::new(op, span, path).boxed()
×
NEW
397
    }
×
398

NEW
399
    fn wrap_initialized_plot_operator(
×
NEW
400
        &self,
×
NEW
401
        op: Box<dyn InitializedPlotOperator>,
×
NEW
402
        _span: CreateSpan,
×
NEW
403
        _path: WorkflowOperatorPath,
×
NEW
404
    ) -> Box<dyn InitializedPlotOperator> {
×
NEW
405
        op
×
NEW
406
    }
×
407

NEW
408
    async fn resolve_named_data(&self, data: &NamedData) -> Result<DataId> {
×
NEW
409
        self.inner.resolve_named_data(data).await
×
NEW
410
    }
×
411

NEW
412
    async fn ml_model_metadata(&self, name: &MlModelName) -> Result<MlModelMetadata> {
×
NEW
413
        self.inner.ml_model_metadata(name).await
×
NEW
414
    }
×
415
}
416

417
#[async_trait]
418
impl<L, R, Q> MetaDataProvider<L, R, Q> for StatisticsWrappingMockExecutionContext
419
where
420
    L: 'static,
421
    R: 'static + ResultDescriptor,
422
    Q: 'static,
423
{
NEW
424
    async fn meta_data(&self, id: &DataId) -> Result<Box<dyn MetaData<L, R, Q>>> {
×
NEW
425
        self.inner.meta_data(id).await
×
NEW
426
    }
×
427
}
428

429
#[cfg(test)]
mod tests {
    use super::*;
    use geoengine_datatypes::collections::VectorDataType;
    use geoengine_datatypes::spatial_reference::SpatialReferenceOption;

    /// The descriptor used both as test input and as expected value.
    fn unreferenced_data_descriptor() -> VectorResultDescriptor {
        VectorResultDescriptor {
            data_type: VectorDataType::Data,
            spatial_reference: SpatialReferenceOption::Unreferenced,
            columns: Default::default(),
            time: None,
            bbox: None,
        }
    }

    /// Boxed metadata survives the round trip through `Box<dyn Any>` and can
    /// still report its result descriptor afterwards.
    #[tokio::test]
    async fn test() {
        type BoxedMeta = Box<dyn MetaData<i32, VectorResultDescriptor, VectorQueryRectangle>>;

        let concrete = StaticMetaData {
            loading_info: 1_i32,
            result_descriptor: unreferenced_data_descriptor(),
            phantom: PhantomData,
        };

        let boxed: BoxedMeta = Box::new(concrete);

        let erased: Box<dyn Any + Send + Sync> = Box::new(boxed);

        let recovered = erased.downcast_ref::<BoxedMeta>().unwrap();

        assert_eq!(
            recovered.result_descriptor().await.unwrap(),
            unreferenced_data_descriptor()
        );
    }
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc