• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

geo-engine / geoengine / 10178074589

31 Jul 2024 09:34AM UTC coverage: 91.068% (+0.4%) from 90.682%
10178074589

push

github

web-flow
Merge pull request #973 from geo-engine/remove-XGB-update-toolchain

Remove-XGB-update-toolchain

81 of 88 new or added lines in 29 files covered. (92.05%)

456 existing lines in 119 files now uncovered.

131088 of 143945 relevant lines covered (91.07%)

53581.03 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

84.42
/operators/src/engine/execution_context.rs
1
use super::query::QueryAbortRegistration;
2
use super::{
3
    CreateSpan, InitializedPlotOperator, InitializedRasterOperator, InitializedVectorOperator,
4
    MockQueryContext, WorkflowOperatorPath,
5
};
6
use crate::engine::{
7
    ChunkByteSize, RasterResultDescriptor, ResultDescriptor, VectorResultDescriptor,
8
};
9
use crate::error::Error;
10
use crate::mock::MockDatasetDataSourceLoadingInfo;
11
use crate::source::{GdalLoadingInfo, OgrSourceDataset};
12
use crate::util::{create_rayon_thread_pool, Result};
13
use async_trait::async_trait;
14
use core::any::TypeId;
15
use geoengine_datatypes::dataset::{DataId, NamedData};
16
use geoengine_datatypes::primitives::{RasterQueryRectangle, VectorQueryRectangle};
17
use geoengine_datatypes::raster::TilingSpecification;
18
use geoengine_datatypes::util::test::TestDefault;
19
use rayon::ThreadPool;
20
use serde::{Deserialize, Serialize};
21
use std::any::Any;
22
use std::collections::HashMap;
23
use std::fmt::Debug;
24
use std::marker::PhantomData;
25
use std::sync::Arc;
26

27
/// A context that provides certain utility access during operator initialization
28
#[async_trait::async_trait]
29
pub trait ExecutionContext: Send
30
    + Sync
31
    + MetaDataProvider<MockDatasetDataSourceLoadingInfo, VectorResultDescriptor, VectorQueryRectangle>
32
    + MetaDataProvider<OgrSourceDataset, VectorResultDescriptor, VectorQueryRectangle>
33
    + MetaDataProvider<GdalLoadingInfo, RasterResultDescriptor, RasterQueryRectangle>
34
{
35
    fn thread_pool(&self) -> &Arc<ThreadPool>;
36
    fn tiling_specification(&self) -> TilingSpecification;
37

38
    fn wrap_initialized_raster_operator(
39
        &self,
40
        op: Box<dyn InitializedRasterOperator>,
41
        span: CreateSpan,
42
        path: WorkflowOperatorPath, // TODO: remove and allow operators to tell its path
43
    ) -> Box<dyn InitializedRasterOperator>;
44

45
    fn wrap_initialized_vector_operator(
46
        &self,
47
        op: Box<dyn InitializedVectorOperator>,
48
        span: CreateSpan,
49
        path: WorkflowOperatorPath,
50
    ) -> Box<dyn InitializedVectorOperator>;
51

52
    fn wrap_initialized_plot_operator(
53
        &self,
54
        op: Box<dyn InitializedPlotOperator>,
55
        span: CreateSpan,
56
        path: WorkflowOperatorPath,
57
    ) -> Box<dyn InitializedPlotOperator>;
58

59
    async fn resolve_named_data(&self, data: &NamedData) -> Result<DataId>;
60

61
    /// get the `ExecutionContextExtensions` that contain additional information
62
    fn extensions(&self) -> &ExecutionContextExtensions;
63
}
64

65
/// This type allows adding additional information to the `ExecutionContext`.
/// It acts like a type map, allowing one to store one value per type.
#[derive(Default)]
pub struct ExecutionContextExtensions {
    /// Stored values, keyed by the `TypeId` of their concrete type.
    map: HashMap<TypeId, Box<dyn Any + Send + Sync>>,
}

impl ExecutionContextExtensions {
    /// Stores `val` under its type `T`.
    ///
    /// Returns the value of type `T` that was stored before, if there was one.
    pub fn insert<T: 'static + Send + Sync>(&mut self, val: T) -> Option<T> {
        self.map
            .insert(TypeId::of::<T>(), Box::new(val))
            .and_then(downcast_owned)
    }

    /// Returns a reference to the stored value of type `T`, or `None` if no value
    /// of that type has been inserted.
    pub fn get<T: 'static + Send + Sync>(&self) -> Option<&T> {
        self.map
            .get(&TypeId::of::<T>())
            .and_then(|boxed| boxed.downcast_ref::<T>())
    }
}

/// Turns a type-erased box back into an owned `T`.
///
/// Returns `None` if the boxed value is not of type `T`.
fn downcast_owned<T: 'static + Send + Sync>(boxed: Box<dyn Any + Send + Sync>) -> Option<T> {
    boxed.downcast::<T>().ok().map(|typed| *typed)
}
×
89

90
#[async_trait]
91
pub trait MetaDataProvider<L, R, Q>
92
where
93
    R: ResultDescriptor,
94
{
95
    async fn meta_data(&self, id: &DataId) -> Result<Box<dyn MetaData<L, R, Q>>>;
96
}
97

98
#[async_trait]
99
pub trait MetaData<L, R, Q>: Debug + Send + Sync
100
where
101
    R: ResultDescriptor,
102
{
103
    async fn loading_info(&self, query: Q) -> Result<L>;
104
    async fn result_descriptor(&self) -> Result<R>;
105

106
    fn box_clone(&self) -> Box<dyn MetaData<L, R, Q>>;
107
}
108

109
impl<L, R, Q> Clone for Box<dyn MetaData<L, R, Q>>
110
where
111
    R: ResultDescriptor,
112
{
113
    fn clone(&self) -> Box<dyn MetaData<L, R, Q>> {
125✔
114
        self.box_clone()
125✔
115
    }
125✔
116
}
117

118
pub struct MockExecutionContext {
119
    pub thread_pool: Arc<ThreadPool>,
120
    pub meta_data: HashMap<DataId, Box<dyn Any + Send + Sync>>,
121
    pub named_data: HashMap<NamedData, DataId>,
122
    pub tiling_specification: TilingSpecification,
123
    pub extensions: ExecutionContextExtensions,
124
}
125

126
impl TestDefault for MockExecutionContext {
127
    fn test_default() -> Self {
140✔
128
        Self {
140✔
129
            thread_pool: create_rayon_thread_pool(0),
140✔
130
            meta_data: HashMap::default(),
140✔
131
            named_data: HashMap::default(),
140✔
132
            tiling_specification: TilingSpecification::test_default(),
140✔
133
            extensions: Default::default(),
140✔
134
        }
140✔
135
    }
140✔
136
}
137

138
impl MockExecutionContext {
139
    pub fn new_with_tiling_spec(tiling_specification: TilingSpecification) -> Self {
130✔
140
        Self {
130✔
141
            thread_pool: create_rayon_thread_pool(0),
130✔
142
            meta_data: HashMap::default(),
130✔
143
            named_data: HashMap::default(),
130✔
144
            tiling_specification,
130✔
145
            extensions: Default::default(),
130✔
146
        }
130✔
147
    }
130✔
148

149
    pub fn new_with_tiling_spec_and_thread_count(
×
150
        tiling_specification: TilingSpecification,
×
151
        num_threads: usize,
×
152
    ) -> Self {
×
153
        Self {
×
154
            thread_pool: create_rayon_thread_pool(num_threads),
×
155
            meta_data: HashMap::default(),
×
156
            named_data: HashMap::default(),
×
157
            tiling_specification,
×
158
            extensions: Default::default(),
×
159
        }
×
160
    }
×
161

162
    pub fn add_meta_data<L, R, Q>(
49✔
163
        &mut self,
49✔
164
        data: DataId,
49✔
165
        named_data: NamedData,
49✔
166
        meta_data: Box<dyn MetaData<L, R, Q>>,
49✔
167
    ) where
49✔
168
        L: Send + Sync + 'static,
49✔
169
        R: Send + Sync + 'static + ResultDescriptor,
49✔
170
        Q: Send + Sync + 'static,
49✔
171
    {
49✔
172
        self.meta_data.insert(
49✔
173
            data.clone(),
49✔
174
            Box::new(meta_data) as Box<dyn Any + Send + Sync>,
49✔
175
        );
49✔
176

49✔
177
        self.named_data.insert(named_data, data);
49✔
178
    }
49✔
179

180
    pub fn delete_meta_data(&mut self, named_data: &NamedData) {
2✔
181
        let data = self.named_data.remove(named_data);
2✔
182
        if let Some(data) = data {
2✔
183
            self.meta_data.remove(&data);
2✔
184
        }
2✔
185
    }
2✔
186

187
    pub fn mock_query_context(&self, chunk_byte_size: ChunkByteSize) -> MockQueryContext {
3✔
188
        let (abort_registration, abort_trigger) = QueryAbortRegistration::new();
3✔
189
        MockQueryContext {
3✔
190
            chunk_byte_size,
3✔
191
            thread_pool: self.thread_pool.clone(),
3✔
192
            extensions: Default::default(),
3✔
193
            abort_registration,
3✔
194
            abort_trigger: Some(abort_trigger),
3✔
195
        }
3✔
196
    }
3✔
197
}
198

199
#[async_trait::async_trait]
200
impl ExecutionContext for MockExecutionContext {
201
    fn thread_pool(&self) -> &Arc<ThreadPool> {
×
202
        &self.thread_pool
×
203
    }
×
204

205
    fn tiling_specification(&self) -> TilingSpecification {
239✔
206
        self.tiling_specification
239✔
207
    }
239✔
208

209
    fn wrap_initialized_raster_operator(
304✔
210
        &self,
304✔
211
        op: Box<dyn InitializedRasterOperator>,
304✔
212
        _span: CreateSpan,
304✔
213
        _path: WorkflowOperatorPath,
304✔
214
    ) -> Box<dyn InitializedRasterOperator> {
304✔
215
        op
304✔
216
    }
304✔
217

218
    fn wrap_initialized_vector_operator(
168✔
219
        &self,
168✔
220
        op: Box<dyn InitializedVectorOperator>,
168✔
221
        _span: CreateSpan,
168✔
222
        _path: WorkflowOperatorPath,
168✔
223
    ) -> Box<dyn InitializedVectorOperator> {
168✔
224
        op
168✔
225
    }
168✔
226

227
    fn wrap_initialized_plot_operator(
53✔
228
        &self,
53✔
229
        op: Box<dyn InitializedPlotOperator>,
53✔
230
        _span: CreateSpan,
53✔
231
        _path: WorkflowOperatorPath,
53✔
232
    ) -> Box<dyn InitializedPlotOperator> {
53✔
233
        op
53✔
234
    }
53✔
235

236
    async fn resolve_named_data(&self, data: &NamedData) -> Result<DataId> {
53✔
237
        self.named_data
53✔
238
            .get(data)
53✔
239
            .cloned()
53✔
240
            .ok_or_else(|| Error::UnknownDatasetName { name: data.clone() })
53✔
241
    }
53✔
242

UNCOV
243
    fn extensions(&self) -> &ExecutionContextExtensions {
×
UNCOV
244
        &self.extensions
×
UNCOV
245
    }
×
246
}
247

248
#[async_trait]
249
impl<L, R, Q> MetaDataProvider<L, R, Q> for MockExecutionContext
250
where
251
    L: 'static,
252
    R: 'static + ResultDescriptor,
253
    Q: 'static,
254
{
255
    async fn meta_data(&self, id: &DataId) -> Result<Box<dyn MetaData<L, R, Q>>> {
53✔
256
        let meta_data = self
53✔
257
            .meta_data
53✔
258
            .get(id)
53✔
259
            .ok_or(Error::UnknownDataId)?
53✔
260
            .downcast_ref::<Box<dyn MetaData<L, R, Q>>>()
53✔
261
            .ok_or(Error::InvalidMetaDataType)?;
53✔
262

53✔
263
        Ok(meta_data.clone())
53✔
264
    }
53✔
265
}
266

267
#[derive(PartialEq, Eq, Debug, Clone, Serialize, Deserialize)]
3✔
268
#[serde(rename_all = "camelCase")]
269
pub struct StaticMetaData<L, R, Q>
270
where
271
    L: Debug + Clone + Send + Sync + 'static,
272
    R: Debug + Send + Sync + 'static + ResultDescriptor,
273
    Q: Debug + Clone + Send + Sync + 'static,
274
{
275
    pub loading_info: L,
276
    pub result_descriptor: R,
277
    #[serde(skip)]
278
    pub phantom: PhantomData<Q>,
279
}
280

281
#[async_trait]
282
impl<L, R, Q> MetaData<L, R, Q> for StaticMetaData<L, R, Q>
283
where
284
    L: Debug + Clone + Send + Sync + 'static,
285
    R: Debug + Send + Sync + 'static + ResultDescriptor,
286
    Q: Debug + Clone + Send + Sync + 'static,
287
{
288
    async fn loading_info(&self, _query: Q) -> Result<L> {
61✔
289
        Ok(self.loading_info.clone())
61✔
290
    }
61✔
291

292
    async fn result_descriptor(&self) -> Result<R> {
36✔
293
        Ok(self.result_descriptor.clone())
36✔
294
    }
36✔
295

296
    fn box_clone(&self) -> Box<dyn MetaData<L, R, Q>> {
49✔
297
        Box::new(self.clone())
49✔
298
    }
49✔
299
}
300

301
mod db_types {
302
    use geoengine_datatypes::delegate_from_to_sql;
303
    use postgres_types::{FromSql, ToSql};
304

305
    use super::*;
306

307
    pub type MockMetaData = StaticMetaData<
308
        MockDatasetDataSourceLoadingInfo,
309
        VectorResultDescriptor,
310
        VectorQueryRectangle,
311
    >;
312

313
    #[derive(Debug, ToSql, FromSql)]
485✔
314
    #[postgres(name = "MockMetaData")]
315
    pub struct MockMetaDataDbType {
316
        pub loading_info: MockDatasetDataSourceLoadingInfo,
317
        pub result_descriptor: VectorResultDescriptor,
318
    }
319

320
    impl From<&MockMetaData> for MockMetaDataDbType {
321
        fn from(other: &MockMetaData) -> Self {
2✔
322
            Self {
2✔
323
                loading_info: other.loading_info.clone(),
2✔
324
                result_descriptor: other.result_descriptor.clone(),
2✔
325
            }
2✔
326
        }
2✔
327
    }
328

329
    impl TryFrom<MockMetaDataDbType> for MockMetaData {
330
        type Error = Error;
331

332
        fn try_from(other: MockMetaDataDbType) -> Result<Self, Self::Error> {
2✔
333
            Ok(Self {
2✔
334
                loading_info: other.loading_info,
2✔
335
                result_descriptor: other.result_descriptor,
2✔
336
                phantom: PhantomData,
2✔
337
            })
2✔
338
        }
2✔
339
    }
340

341
    pub type OgrMetaData =
342
        StaticMetaData<OgrSourceDataset, VectorResultDescriptor, VectorQueryRectangle>;
343

344
    #[derive(Debug, ToSql, FromSql)]
497✔
345
    #[postgres(name = "OgrMetaData")]
346
    pub struct OgrMetaDataDbType {
347
        pub loading_info: OgrSourceDataset,
348
        pub result_descriptor: VectorResultDescriptor,
349
    }
350

351
    impl From<&StaticMetaData<OgrSourceDataset, VectorResultDescriptor, VectorQueryRectangle>>
352
        for OgrMetaDataDbType
353
    {
354
        fn from(other: &OgrMetaData) -> Self {
33✔
355
            Self {
33✔
356
                loading_info: other.loading_info.clone(),
33✔
357
                result_descriptor: other.result_descriptor.clone(),
33✔
358
            }
33✔
359
        }
33✔
360
    }
361

362
    impl TryFrom<OgrMetaDataDbType> for OgrMetaData {
363
        type Error = Error;
364

365
        fn try_from(other: OgrMetaDataDbType) -> Result<Self, Self::Error> {
14✔
366
            Ok(Self {
14✔
367
                loading_info: other.loading_info,
14✔
368
                result_descriptor: other.result_descriptor,
14✔
369
                phantom: PhantomData,
14✔
370
            })
14✔
371
        }
14✔
372
    }
373

374
    delegate_from_to_sql!(MockMetaData, MockMetaDataDbType);
375
    delegate_from_to_sql!(OgrMetaData, OgrMetaDataDbType);
376
}
377

378
#[cfg(test)]
mod tests {
    use super::*;
    use geoengine_datatypes::collections::VectorDataType;
    use geoengine_datatypes::spatial_reference::SpatialReferenceOption;

    /// A boxed `MetaData` trait object that is stored as `Box<dyn Any + Send + Sync>`
    /// can be downcast back to the boxed trait object and still be queried.
    /// This is the storage scheme `MockExecutionContext::meta_data` uses.
    #[tokio::test]
    async fn test() {
        let info = StaticMetaData {
            loading_info: 1_i32,
            result_descriptor: VectorResultDescriptor {
                data_type: VectorDataType::Data,
                spatial_reference: SpatialReferenceOption::Unreferenced,
                columns: Default::default(),
                time: None,
                bbox: None,
            },
            phantom: Default::default(),
        };

        let info: Box<dyn MetaData<i32, VectorResultDescriptor, VectorQueryRectangle>> =
            Box::new(info);

        // erase the type: the box itself becomes the `Any` value
        let info2: Box<dyn Any + Send + Sync> = Box::new(info);

        let info3 = info2
            .downcast_ref::<Box<dyn MetaData<i32, VectorResultDescriptor, VectorQueryRectangle>>>()
            .unwrap();

        assert_eq!(
            info3.result_descriptor().await.unwrap(),
            VectorResultDescriptor {
                data_type: VectorDataType::Data,
                spatial_reference: SpatialReferenceOption::Unreferenced,
                columns: Default::default(),
                time: None,
                bbox: None,
            }
        );
    }
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc