• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

geo-engine / geoengine / 6224433969

18 Sep 2023 03:07PM UTC coverage: 89.873% (-0.01%) from 89.887%
6224433969

push

github

web-flow
Merge pull request #846 from geo-engine/ml_training

Ml training

2102 of 2102 new or added lines in 22 files covered. (100.0%)

109530 of 121872 relevant lines covered (89.87%)

59591.05 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

88.96
/operators/src/engine/execution_context.rs
1
use super::query::QueryAbortRegistration;
2
use super::{
3
    CreateSpan, InitializedPlotOperator, InitializedRasterOperator, InitializedVectorOperator,
4
    MockQueryContext, WorkflowOperatorPath,
5
};
6
use crate::engine::{
7
    ChunkByteSize, RasterResultDescriptor, ResultDescriptor, VectorResultDescriptor,
8
};
9
use crate::error::Error;
10
use crate::mock::MockDatasetDataSourceLoadingInfo;
11
use crate::source::{GdalLoadingInfo, OgrSourceDataset};
12
use crate::util::{create_rayon_thread_pool, Result};
13
use async_trait::async_trait;
14
use core::any::TypeId;
15
use geoengine_datatypes::dataset::{DataId, NamedData};
16
use geoengine_datatypes::primitives::{RasterQueryRectangle, VectorQueryRectangle};
17
use geoengine_datatypes::raster::TilingSpecification;
18
use geoengine_datatypes::util::test::TestDefault;
19
use rayon::ThreadPool;
20
use serde::{Deserialize, Serialize};
21
use std::any::Any;
22
use std::collections::HashMap;
23
use std::fmt::Debug;
24
use std::marker::PhantomData;
25
use std::sync::Arc;
26

27
/// A context that provides certain utility access during operator initialization
28
#[async_trait::async_trait]
29
pub trait ExecutionContext: Send
30
    + Sync
31
    + MetaDataProvider<MockDatasetDataSourceLoadingInfo, VectorResultDescriptor, VectorQueryRectangle>
32
    + MetaDataProvider<OgrSourceDataset, VectorResultDescriptor, VectorQueryRectangle>
33
    + MetaDataProvider<GdalLoadingInfo, RasterResultDescriptor, RasterQueryRectangle>
34
{
35
    fn thread_pool(&self) -> &Arc<ThreadPool>;
36
    fn tiling_specification(&self) -> TilingSpecification;
37

38
    fn wrap_initialized_raster_operator(
39
        &self,
40
        op: Box<dyn InitializedRasterOperator>,
41
        span: CreateSpan,
42
        path: WorkflowOperatorPath, // TODO: remove and allow operators to tell its path
43
    ) -> Box<dyn InitializedRasterOperator>;
44

45
    fn wrap_initialized_vector_operator(
46
        &self,
47
        op: Box<dyn InitializedVectorOperator>,
48
        span: CreateSpan,
49
        path: WorkflowOperatorPath,
50
    ) -> Box<dyn InitializedVectorOperator>;
51

52
    fn wrap_initialized_plot_operator(
53
        &self,
54
        op: Box<dyn InitializedPlotOperator>,
55
        span: CreateSpan,
56
        path: WorkflowOperatorPath,
57
    ) -> Box<dyn InitializedPlotOperator>;
58

59
    async fn resolve_named_data(&self, data: &NamedData) -> Result<DataId>;
60

61
    /// get the `ExecutionContextExtensions` that contain additional information
62
    fn extensions(&self) -> &ExecutionContextExtensions;
63
}
64

65
/// This type allows adding additional information to the `ExecutionContext`.
66
/// It acts like a type map, allowing one to store one value per type.
67
#[derive(Default)]
302✔
68
pub struct ExecutionContextExtensions {
69
    map: HashMap<TypeId, Box<dyn Any + Send + Sync>>,
70
}
71

72
impl ExecutionContextExtensions {
73
    pub fn insert<T: 'static + Send + Sync>(&mut self, val: T) -> Option<T> {
12✔
74
        self.map
12✔
75
            .insert(TypeId::of::<T>(), Box::new(val))
12✔
76
            .and_then(downcast_owned)
12✔
77
    }
12✔
78

79
    pub fn get<T: 'static + Send + Sync>(&self) -> Option<&T> {
4✔
80
        self.map
4✔
81
            .get(&TypeId::of::<T>())
4✔
82
            .and_then(|boxed: &Box<dyn Any + Send + Sync>| boxed.downcast_ref())
4✔
83
    }
4✔
84
}
85

86
fn downcast_owned<T: 'static + Send + Sync>(boxed: Box<dyn Any + Send + Sync>) -> Option<T> {
×
87
    boxed.downcast().ok().map(|boxed| *boxed)
×
88
}
×
89

90
#[async_trait]
91
pub trait MetaDataProvider<L, R, Q>
92
where
93
    R: ResultDescriptor,
94
{
95
    async fn meta_data(&self, id: &DataId) -> Result<Box<dyn MetaData<L, R, Q>>>;
96
}
97

98
#[async_trait]
99
pub trait MetaData<L, R, Q>: Debug + Send + Sync
100
where
101
    R: ResultDescriptor,
102
{
103
    async fn loading_info(&self, query: Q) -> Result<L>;
104
    async fn result_descriptor(&self) -> Result<R>;
105

106
    fn box_clone(&self) -> Box<dyn MetaData<L, R, Q>>;
107
}
108

109
impl<L, R, Q> Clone for Box<dyn MetaData<L, R, Q>>
110
where
111
    R: ResultDescriptor,
112
{
113
    fn clone(&self) -> Box<dyn MetaData<L, R, Q>> {
117✔
114
        self.box_clone()
117✔
115
    }
117✔
116
}
117

118
pub struct MockExecutionContext {
119
    pub thread_pool: Arc<ThreadPool>,
120
    pub meta_data: HashMap<DataId, Box<dyn Any + Send + Sync>>,
121
    pub named_data: HashMap<NamedData, DataId>,
122
    pub tiling_specification: TilingSpecification,
123
    pub extensions: ExecutionContextExtensions,
124
}
125

126
impl TestDefault for MockExecutionContext {
127
    fn test_default() -> Self {
122✔
128
        Self {
122✔
129
            thread_pool: create_rayon_thread_pool(0),
122✔
130
            meta_data: HashMap::default(),
122✔
131
            named_data: HashMap::default(),
122✔
132
            tiling_specification: TilingSpecification::test_default(),
122✔
133
            extensions: Default::default(),
122✔
134
        }
122✔
135
    }
122✔
136
}
137

138
impl MockExecutionContext {
139
    pub fn new_with_tiling_spec(tiling_specification: TilingSpecification) -> Self {
124✔
140
        Self {
124✔
141
            thread_pool: create_rayon_thread_pool(0),
124✔
142
            meta_data: HashMap::default(),
124✔
143
            named_data: HashMap::default(),
124✔
144
            tiling_specification,
124✔
145
            extensions: Default::default(),
124✔
146
        }
124✔
147
    }
124✔
148

149
    pub fn new_with_tiling_spec_and_thread_count(
×
150
        tiling_specification: TilingSpecification,
×
151
        num_threads: usize,
×
152
    ) -> Self {
×
153
        Self {
×
154
            thread_pool: create_rayon_thread_pool(num_threads),
×
155
            meta_data: HashMap::default(),
×
156
            named_data: HashMap::default(),
×
157
            tiling_specification,
×
158
            extensions: Default::default(),
×
159
        }
×
160
    }
×
161

162
    pub fn add_meta_data<L, R, Q>(
47✔
163
        &mut self,
47✔
164
        data: DataId,
47✔
165
        named_data: NamedData,
47✔
166
        meta_data: Box<dyn MetaData<L, R, Q>>,
47✔
167
    ) where
47✔
168
        L: Send + Sync + 'static,
47✔
169
        R: Send + Sync + 'static + ResultDescriptor,
47✔
170
        Q: Send + Sync + 'static,
47✔
171
    {
47✔
172
        self.meta_data.insert(
47✔
173
            data.clone(),
47✔
174
            Box::new(meta_data) as Box<dyn Any + Send + Sync>,
47✔
175
        );
47✔
176

47✔
177
        self.named_data.insert(named_data, data);
47✔
178
    }
47✔
179

180
    pub fn mock_query_context(&self, chunk_byte_size: ChunkByteSize) -> MockQueryContext {
3✔
181
        let (abort_registration, abort_trigger) = QueryAbortRegistration::new();
3✔
182
        MockQueryContext {
3✔
183
            chunk_byte_size,
3✔
184
            thread_pool: self.thread_pool.clone(),
3✔
185
            extensions: Default::default(),
3✔
186
            abort_registration,
3✔
187
            abort_trigger: Some(abort_trigger),
3✔
188
        }
3✔
189
    }
3✔
190
}
191

192
#[async_trait::async_trait]
193
impl ExecutionContext for MockExecutionContext {
194
    fn thread_pool(&self) -> &Arc<ThreadPool> {
×
195
        &self.thread_pool
×
196
    }
×
197

198
    fn tiling_specification(&self) -> TilingSpecification {
207✔
199
        self.tiling_specification
207✔
200
    }
207✔
201

202
    fn wrap_initialized_raster_operator(
254✔
203
        &self,
254✔
204
        op: Box<dyn InitializedRasterOperator>,
254✔
205
        _span: CreateSpan,
254✔
206
        _path: WorkflowOperatorPath,
254✔
207
    ) -> Box<dyn InitializedRasterOperator> {
254✔
208
        op
254✔
209
    }
254✔
210

211
    fn wrap_initialized_vector_operator(
153✔
212
        &self,
153✔
213
        op: Box<dyn InitializedVectorOperator>,
153✔
214
        _span: CreateSpan,
153✔
215
        _path: WorkflowOperatorPath,
153✔
216
    ) -> Box<dyn InitializedVectorOperator> {
153✔
217
        op
153✔
218
    }
153✔
219

220
    fn wrap_initialized_plot_operator(
51✔
221
        &self,
51✔
222
        op: Box<dyn InitializedPlotOperator>,
51✔
223
        _span: CreateSpan,
51✔
224
        _path: WorkflowOperatorPath,
51✔
225
    ) -> Box<dyn InitializedPlotOperator> {
51✔
226
        op
51✔
227
    }
51✔
228

229
    async fn resolve_named_data(&self, data: &NamedData) -> Result<DataId> {
49✔
230
        self.named_data
49✔
231
            .get(data)
49✔
232
            .cloned()
49✔
233
            .ok_or_else(|| Error::UnknownDatasetName { name: data.clone() })
49✔
234
    }
49✔
235

236
    fn extensions(&self) -> &ExecutionContextExtensions {
2✔
237
        &self.extensions
2✔
238
    }
2✔
239
}
240

241
#[async_trait]
242
impl<L, R, Q> MetaDataProvider<L, R, Q> for MockExecutionContext
243
where
244
    L: 'static,
245
    R: 'static + ResultDescriptor,
246
    Q: 'static,
247
{
248
    async fn meta_data(&self, id: &DataId) -> Result<Box<dyn MetaData<L, R, Q>>> {
49✔
249
        let meta_data = self
49✔
250
            .meta_data
49✔
251
            .get(id)
49✔
252
            .ok_or(Error::UnknownDataId)?
49✔
253
            .downcast_ref::<Box<dyn MetaData<L, R, Q>>>()
49✔
254
            .ok_or(Error::InvalidMetaDataType)?;
49✔
255

256
        Ok(meta_data.clone())
49✔
257
    }
98✔
258
}
259

260
#[derive(PartialEq, Eq, Debug, Clone, Serialize, Deserialize)]
78✔
261
#[serde(rename_all = "camelCase")]
262
pub struct StaticMetaData<L, R, Q>
263
where
264
    L: Debug + Clone + Send + Sync + 'static,
265
    R: Debug + Send + Sync + 'static + ResultDescriptor,
266
    Q: Debug + Clone + Send + Sync + 'static,
267
{
268
    pub loading_info: L,
269
    pub result_descriptor: R,
270
    #[serde(skip)]
271
    pub phantom: PhantomData<Q>,
272
}
273

274
#[async_trait]
275
impl<L, R, Q> MetaData<L, R, Q> for StaticMetaData<L, R, Q>
276
where
277
    L: Debug + Clone + Send + Sync + 'static,
278
    R: Debug + Send + Sync + 'static + ResultDescriptor,
279
    Q: Debug + Clone + Send + Sync + 'static,
280
{
281
    async fn loading_info(&self, _query: Q) -> Result<L> {
53✔
282
        Ok(self.loading_info.clone())
53✔
283
    }
53✔
284

285
    async fn result_descriptor(&self) -> Result<R> {
34✔
286
        Ok(self.result_descriptor.clone())
34✔
287
    }
34✔
288

289
    fn box_clone(&self) -> Box<dyn MetaData<L, R, Q>> {
49✔
290
        Box::new(self.clone())
49✔
291
    }
49✔
292
}
293

294
#[cfg(test)]
295
mod tests {
296
    use super::*;
297
    use geoengine_datatypes::collections::VectorDataType;
298
    use geoengine_datatypes::spatial_reference::SpatialReferenceOption;
299

300
    #[tokio::test]
1✔
301
    async fn test() {
1✔
302
        let info = StaticMetaData {
1✔
303
            loading_info: 1_i32,
1✔
304
            result_descriptor: VectorResultDescriptor {
1✔
305
                data_type: VectorDataType::Data,
1✔
306
                spatial_reference: SpatialReferenceOption::Unreferenced,
1✔
307
                columns: Default::default(),
1✔
308
                time: None,
1✔
309
                bbox: None,
1✔
310
            },
1✔
311
            phantom: Default::default(),
1✔
312
        };
1✔
313

1✔
314
        let info: Box<dyn MetaData<i32, VectorResultDescriptor, VectorQueryRectangle>> =
1✔
315
            Box::new(info);
1✔
316

1✔
317
        let info2: Box<dyn Any + Send + Sync> = Box::new(info);
1✔
318

1✔
319
        let info3 = info2
1✔
320
            .downcast_ref::<Box<dyn MetaData<i32, VectorResultDescriptor, VectorQueryRectangle>>>()
1✔
321
            .unwrap();
1✔
322

323
        assert_eq!(
1✔
324
            info3.result_descriptor().await.unwrap(),
1✔
325
            VectorResultDescriptor {
1✔
326
                data_type: VectorDataType::Data,
1✔
327
                spatial_reference: SpatialReferenceOption::Unreferenced,
1✔
328
                columns: Default::default(),
1✔
329
                time: None,
1✔
330
                bbox: None,
1✔
331
            }
1✔
332
        );
333
    }
334
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc