• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

geo-engine / geoengine / 26897733292

03 Jun 2026 04:15PM UTC coverage: 87.02% (-0.2%) from 87.218%
26897733292

Pull #1192

github

web-flow
Merge 536ad7e6f into 255ac7144
Pull Request #1192: feat: add Gdal process pool

1479 of 1957 new or added lines in 28 files covered. (75.57%)

18 existing lines in 7 files now uncovered.

117755 of 135320 relevant lines covered (87.02%)

478060.06 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

60.9
/geoengine/operators/src/engine/execution_context.rs
1
use super::{
2
    CreateSpan, InitializedPlotOperator, InitializedRasterOperator, InitializedVectorOperator,
3
    MockQueryContext,
4
};
5
use crate::cache::shared_cache::SharedCache;
6
use crate::engine::{
7
    ChunkByteSize, RasterResultDescriptor, ResultDescriptor, VectorResultDescriptor,
8
};
9
use crate::error::Error;
10
use crate::machine_learning::MlModelLoadingInfo;
11
use crate::meta::quota::{QuotaChecker, QuotaTracking};
12
use crate::meta::wrapper::InitializedOperatorWrapper;
13
use crate::mock::MockDatasetDataSourceLoadingInfo;
14
use crate::source::gdal_source::{GdalProcessPool, GdalProcessPoolAccess};
15
use crate::source::{
16
    GdalLoadingInfo, MultiBandGdalLoadingInfo, MultiBandGdalLoadingInfoQueryRectangle,
17
    OgrSourceDataset,
18
};
19
use crate::util::{Result, create_rayon_thread_pool};
20
use async_trait::async_trait;
21
use geoengine_datatypes::dataset::{DataId, NamedData};
22
use geoengine_datatypes::machine_learning::MlModelName;
23
use geoengine_datatypes::primitives::{RasterQueryRectangle, VectorQueryRectangle};
24
use geoengine_datatypes::raster::TilingSpecification;
25
use geoengine_datatypes::util::test::TestDefault;
26
use rayon::ThreadPool;
27
use serde::{Deserialize, Serialize};
28
use std::any::Any;
29
use std::collections::HashMap;
30
use std::fmt::Debug;
31
use std::marker::PhantomData;
32
use std::sync::Arc;
33

34
/// A context that provides certain utility access during operator initialization
///
/// Besides being `Send + Sync`, every implementor has to be a [`MetaDataProvider`] for the
/// mock, OGR, GDAL and multi-band GDAL loading-info types listed in the supertrait bounds
/// and has to implement [`GdalProcessPoolAccess`].
35
#[async_trait::async_trait]
36
pub trait ExecutionContext: Send
37
    + Sync
38
    + MetaDataProvider<MockDatasetDataSourceLoadingInfo, VectorResultDescriptor, VectorQueryRectangle>
39
    + MetaDataProvider<OgrSourceDataset, VectorResultDescriptor, VectorQueryRectangle>
40
    + MetaDataProvider<GdalLoadingInfo, RasterResultDescriptor, RasterQueryRectangle>
41
    + MetaDataProvider<
42
        MultiBandGdalLoadingInfo,
43
        RasterResultDescriptor,
44
        MultiBandGdalLoadingInfoQueryRectangle,
45
    > + GdalProcessPoolAccess
46
{
47
    /// Returns the rayon thread pool held by this context.
    fn thread_pool(&self) -> &Arc<ThreadPool>;
48
    /// Returns the tiling specification of this context.
    fn tiling_specification(&self) -> TilingSpecification;
49

50
    /// Hook that receives an initialized raster operator together with a `span` and returns
    /// the operator to use in its place (implementations may return `op` unchanged).
    fn wrap_initialized_raster_operator(
51
        &self,
52
        op: Box<dyn InitializedRasterOperator>,
53
        span: CreateSpan,
54
    ) -> Box<dyn InitializedRasterOperator>;
55

56
    /// Hook that receives an initialized vector operator together with a `span` and returns
    /// the operator to use in its place (implementations may return `op` unchanged).
    fn wrap_initialized_vector_operator(
57
        &self,
58
        op: Box<dyn InitializedVectorOperator>,
59
        span: CreateSpan,
60
    ) -> Box<dyn InitializedVectorOperator>;
61

62
    /// Hook that receives an initialized plot operator together with a `span` and returns
    /// the operator to use in its place (implementations may return `op` unchanged).
    fn wrap_initialized_plot_operator(
63
        &self,
64
        op: Box<dyn InitializedPlotOperator>,
65
        span: CreateSpan,
66
    ) -> Box<dyn InitializedPlotOperator>;
67

68
    /// Resolves a dataset name to its [`DataId`].
    async fn resolve_named_data(&self, data: &NamedData) -> Result<DataId>;
69

70
    /// Looks up the loading info of the ML model with the given name.
    async fn ml_model_loading_info(&self, name: &MlModelName) -> Result<MlModelLoadingInfo>;
71

NEW
72
    /// Returns the GDAL process pool.
    ///
    /// Provided method: forwards to `get_gdal_pool` of the [`GdalProcessPoolAccess`] supertrait.
    fn gdal_process_pool(&self) -> &Arc<GdalProcessPool> {
×
NEW
73
        self.get_gdal_pool()
×
NEW
74
    }
×
75
}
76

77
/// Source of [`MetaData`] objects addressed by [`DataId`].
///
/// `L`, `R` and `Q` are forwarded unchanged to the returned `MetaData<L, R, Q>`;
/// `R` must be a [`ResultDescriptor`].
#[async_trait]
78
pub trait MetaDataProvider<L, R, Q>
79
where
80
    R: ResultDescriptor,
81
{
82
    /// Returns the boxed meta data registered for `id`, or an error if it cannot be provided.
    async fn meta_data(&self, id: &DataId) -> Result<Box<dyn MetaData<L, R, Q>>>;
83
}
84

85
/// Meta data of a data source: yields loading info of type `L` for a query of type `Q`
/// and a result descriptor of type `R`.
///
/// Implementors must be `Debug + Send + Sync` and support cloning behind a trait object
/// through [`MetaData::box_clone`].
#[async_trait]
86
pub trait MetaData<L, R, Q>: Debug + Send + Sync
87
where
88
    R: ResultDescriptor,
89
{
90
    /// Produces the loading info for the given `query`.
    async fn loading_info(&self, query: Q) -> Result<L>;
91
    /// Returns the result descriptor.
    async fn result_descriptor(&self) -> Result<R>;
92

93
    /// Clones `self` into a fresh box; this is what makes `Box<dyn MetaData<L, R, Q>>` cloneable.
    fn box_clone(&self) -> Box<dyn MetaData<L, R, Q>>;
94
}
95

96
/// `Clone` for boxed [`MetaData`] trait objects, implemented by delegating to
/// [`MetaData::box_clone`].
impl<L, R, Q> Clone for Box<dyn MetaData<L, R, Q>>
97
where
98
    R: ResultDescriptor,
99
{
100
    fn clone(&self) -> Box<dyn MetaData<L, R, Q>> {
204✔
101
        self.box_clone()
204✔
102
    }
204✔
103
}
104

105
/// In-memory [`ExecutionContext`] whose meta data, dataset names and ML models are kept in
/// plain hash maps that are filled directly by the caller.
pub struct MockExecutionContext {
106
    /// Thread pool returned by `thread_pool()`.
    pub thread_pool: Arc<ThreadPool>,
107
    /// Type-erased meta data per data id. `add_meta_data` stores a
    /// `Box<dyn MetaData<L, R, Q>>` here and `meta_data` downcasts back to that type.
    pub meta_data: HashMap<DataId, Box<dyn Any + Send + Sync>>,
108
    /// Mapping from dataset name to data id, used by `resolve_named_data`.
    pub named_data: HashMap<NamedData, DataId>,
109
    /// Loading info per ML model name, used by `ml_model_loading_info`.
    pub ml_models: HashMap<MlModelName, MlModelLoadingInfo>,
110
    /// Tiling specification returned by `tiling_specification()` and passed to mock query contexts.
    pub tiling_specification: TilingSpecification,
111
    /// GDAL process pool; a clone of this `Arc` is handed to every created `MockQueryContext`.
    pub gdal_process_pool: Arc<GdalProcessPool>,
112
}
113

114
/// Test default: empty maps, the test-default tiling specification, a rayon pool created with
/// thread count argument `0` and a GDAL process pool created with the arguments `(8, 4, 2)`.
impl TestDefault for MockExecutionContext {
115
    fn test_default() -> Self {
191✔
116
        Self {
191✔
117
            // NOTE(review): how `0` is interpreted is decided by `create_rayon_thread_pool` — not visible here.
            thread_pool: create_rayon_thread_pool(0),
191✔
118
            meta_data: HashMap::default(),
191✔
119
            named_data: HashMap::default(),
191✔
120
            ml_models: HashMap::default(),
191✔
121
            tiling_specification: TilingSpecification::test_default(),
191✔
122
            // NOTE(review): the meaning of (8, 4, 2) is defined by `GdalProcessPool::new` — not visible here.
            gdal_process_pool: GdalProcessPool::new(8, 4, 2),
191✔
123
        }
191✔
124
    }
191✔
125
}
126

127
/// Constructors, meta-data registration and `MockQueryContext` factories.
impl MockExecutionContext {
NEW
128
    /// Creates a context with empty maps and the given tiling specification whose GDAL
    /// process pool is built via `GdalProcessPool::new_with_tokio_handle(handle, 8, 4, 2)`.
    pub fn new_with_tiling_spec_and_tokio_handle(
×
NEW
129
        tiling_specification: TilingSpecification,
×
NEW
130
        handle: &tokio::runtime::Handle,
×
NEW
131
    ) -> Self {
×
NEW
132
        Self {
×
NEW
133
            thread_pool: create_rayon_thread_pool(0),
×
NEW
134
            meta_data: HashMap::default(),
×
NEW
135
            named_data: HashMap::default(),
×
NEW
136
            ml_models: HashMap::default(),
×
NEW
137
            tiling_specification,
×
NEW
138
            gdal_process_pool: GdalProcessPool::new_with_tokio_handle(handle, 8, 4, 2),
×
NEW
139
        }
×
NEW
140
    }
×
141

142
    /// Creates a context with empty maps and the given tiling specification; thread pool and
    /// GDAL process pool are created with the same arguments as in `test_default`.
    pub fn new_with_tiling_spec(tiling_specification: TilingSpecification) -> Self {
141✔
143
        Self {
141✔
144
            thread_pool: create_rayon_thread_pool(0),
141✔
145
            meta_data: HashMap::default(),
141✔
146
            named_data: HashMap::default(),
141✔
147
            ml_models: HashMap::default(),
141✔
148
            tiling_specification,
141✔
149
            gdal_process_pool: GdalProcessPool::new(8, 4, 2), // TODO: GdalProcessPool defaults!
141✔
150
        }
141✔
151
    }
141✔
152

153
    /// Like `new_with_tiling_spec`, but passes `num_threads` to `create_rayon_thread_pool`
    /// instead of `0`.
    pub fn new_with_tiling_spec_and_thread_count(
2✔
154
        tiling_specification: TilingSpecification,
2✔
155
        num_threads: usize,
2✔
156
    ) -> Self {
2✔
157
        Self {
2✔
158
            thread_pool: create_rayon_thread_pool(num_threads),
2✔
159
            meta_data: HashMap::default(),
2✔
160
            named_data: HashMap::default(),
2✔
161
            ml_models: HashMap::default(),
2✔
162
            tiling_specification,
2✔
163
            gdal_process_pool: GdalProcessPool::new(8, 4, 2),
2✔
164
        }
2✔
165
    }
2✔
166

167
    /// Registers `meta_data` under the id `data` and maps the name `named_data` to that id.
    ///
    /// Both maps use `HashMap::insert`, so an existing entry for the same key is replaced.
    pub fn add_meta_data<L, R, Q>(
78✔
168
        &mut self,
78✔
169
        data: DataId,
78✔
170
        named_data: NamedData,
78✔
171
        meta_data: Box<dyn MetaData<L, R, Q>>,
78✔
172
    ) where
78✔
173
        L: Send + Sync + 'static,
78✔
174
        R: Send + Sync + 'static + ResultDescriptor,
78✔
175
        Q: Send + Sync + 'static,
78✔
176
    {
177
        // The `Box<dyn MetaData<..>>` is boxed once more as `dyn Any`, so the lookup in
        // `meta_data` can downcast to exactly `Box<dyn MetaData<L, R, Q>>`.
        self.meta_data.insert(
78✔
178
            data.clone(),
78✔
179
            Box::new(meta_data) as Box<dyn Any + Send + Sync>,
78✔
180
        );
181

182
        self.named_data.insert(named_data, data);
78✔
183
    }
78✔
184

185
    /// Removes the name mapping for `named_data` and, if such a mapping existed, the meta
    /// data stored under the id it pointed to. Unknown names are ignored.
    pub fn delete_meta_data(&mut self, named_data: &NamedData) {
3✔
186
        let data = self.named_data.remove(named_data);
3✔
187
        if let Some(data) = data {
3✔
188
            self.meta_data.remove(&data);
3✔
189
        }
3✔
190
    }
3✔
191

192
    /// Creates a `MockQueryContext` with the test-default chunk byte size, this context's
    /// tiling specification and a clone of its GDAL process pool handle.
    pub fn mock_query_context_test_default(&self) -> MockQueryContext {
132✔
193
        MockQueryContext::new(
132✔
194
            ChunkByteSize::test_default(),
132✔
195
            self.tiling_specification,
132✔
196
            self.gdal_process_pool.clone(),
132✔
197
        )
198
    }
132✔
199

200
    /// Creates a `MockQueryContext` with the given chunk byte size, this context's tiling
    /// specification and a clone of its GDAL process pool handle.
    pub fn mock_query_context(&self, chunk_byte_size: ChunkByteSize) -> MockQueryContext {
152✔
201
        MockQueryContext::new(
152✔
202
            chunk_byte_size,
152✔
203
            self.tiling_specification,
152✔
204
            self.gdal_process_pool.clone(),
152✔
205
        )
206
    }
152✔
207

208
    /// Like `mock_query_context`, additionally forwarding the optional cache, quota tracking
    /// and quota checker to `MockQueryContext::new_with_query_extensions`.
    pub fn mock_query_context_with_query_extensions(
3✔
209
        &self,
3✔
210
        chunk_byte_size: ChunkByteSize,
3✔
211
        cache: Option<Arc<SharedCache>>,
3✔
212
        quota_tracking: Option<QuotaTracking>,
3✔
213
        quota_checker: Option<QuotaChecker>,
3✔
214
    ) -> MockQueryContext {
3✔
215
        MockQueryContext::new_with_query_extensions(
3✔
216
            chunk_byte_size,
3✔
217
            self.tiling_specification,
3✔
218
            self.gdal_process_pool.clone(),
3✔
219
            cache,
3✔
220
            quota_tracking,
3✔
221
            quota_checker,
3✔
222
        )
223
    }
3✔
224

225
    /// Like `mock_query_context`, additionally forwarding `num_threads` to
    /// `MockQueryContext::with_chunk_size_and_thread_count`.
    pub fn mock_query_context_with_chunk_size_and_thread_count(
×
226
        &self,
×
227
        chunk_byte_size: ChunkByteSize,
×
228
        num_threads: usize,
×
229
    ) -> MockQueryContext {
×
230
        MockQueryContext::with_chunk_size_and_thread_count(
×
231
            chunk_byte_size,
×
232
            self.tiling_specification,
×
233
            num_threads,
×
NEW
234
            self.gdal_process_pool.clone(),
×
235
        )
236
    }
×
237
}
238

239
/// Exposes the `gdal_process_pool` field as the context's GDAL process pool.
impl GdalProcessPoolAccess for MockExecutionContext {
NEW
240
    fn get_gdal_pool(&self) -> &Arc<GdalProcessPool> {
×
NEW
241
        &self.gdal_process_pool
×
NEW
242
    }
×
243
}
244

245
/// `ExecutionContext` backed by the struct's fields. The three `wrap_*` hooks are no-ops
/// that return the operator unchanged and ignore the span.
#[async_trait::async_trait]
246
impl ExecutionContext for MockExecutionContext {
247
    fn thread_pool(&self) -> &Arc<ThreadPool> {
×
248
        &self.thread_pool
×
249
    }
×
250

251
    fn tiling_specification(&self) -> TilingSpecification {
310✔
252
        self.tiling_specification
310✔
253
    }
310✔
254

255
    /// Returns `op` unchanged.
    fn wrap_initialized_raster_operator(
392✔
256
        &self,
392✔
257
        op: Box<dyn InitializedRasterOperator>,
392✔
258
        _span: CreateSpan,
392✔
259
    ) -> Box<dyn InitializedRasterOperator> {
392✔
260
        op
392✔
261
    }
392✔
262

263
    /// Returns `op` unchanged.
    fn wrap_initialized_vector_operator(
182✔
264
        &self,
182✔
265
        op: Box<dyn InitializedVectorOperator>,
182✔
266
        _span: CreateSpan,
182✔
267
    ) -> Box<dyn InitializedVectorOperator> {
182✔
268
        op
182✔
269
    }
182✔
270

271
    /// Returns `op` unchanged.
    fn wrap_initialized_plot_operator(
55✔
272
        &self,
55✔
273
        op: Box<dyn InitializedPlotOperator>,
55✔
274
        _span: CreateSpan,
55✔
275
    ) -> Box<dyn InitializedPlotOperator> {
55✔
276
        op
55✔
277
    }
55✔
278

279
    /// Looks `data` up in `named_data` and returns a clone of the stored id.
    ///
    /// # Errors
    /// `Error::UnknownDatasetName` if the name is not registered.
    async fn resolve_named_data(&self, data: &NamedData) -> Result<DataId> {
105✔
280
        self.named_data
281
            .get(data)
282
            .cloned()
283
            .ok_or_else(|| Error::UnknownDatasetName { name: data.clone() })
×
284
    }
105✔
285

286
    /// Looks `name` up in `ml_models` and returns a clone of the stored loading info.
    ///
    /// # Errors
    /// `Error::UnknownMlModelName` if the model name is not registered.
    async fn ml_model_loading_info(&self, name: &MlModelName) -> Result<MlModelLoadingInfo> {
3✔
287
        self.ml_models
288
            .get(name)
289
            .cloned()
290
            .ok_or_else(|| Error::UnknownMlModelName { name: name.clone() })
×
291
    }
3✔
292
}
293

294
/// Generic meta-data lookup over the type-erased `meta_data` map.
#[async_trait]
295
impl<L, R, Q> MetaDataProvider<L, R, Q> for MockExecutionContext
296
where
297
    L: 'static,
298
    R: 'static + ResultDescriptor,
299
    Q: 'static,
300
{
301
    /// Returns a clone of the meta data stored for `id`.
    ///
    /// # Errors
    /// - `Error::UnknownDataId` if nothing is stored under `id`.
    /// - `Error::InvalidMetaDataType` if the stored value is not a
    ///   `Box<dyn MetaData<L, R, Q>>` for the requested `L`, `R`, `Q`.
    async fn meta_data(&self, id: &DataId) -> Result<Box<dyn MetaData<L, R, Q>>> {
105✔
302
        let meta_data = self
303
            .meta_data
304
            .get(id)
305
            .ok_or(Error::UnknownDataId)?
306
            .downcast_ref::<Box<dyn MetaData<L, R, Q>>>()
307
            .ok_or(Error::InvalidMetaDataType)?;
308

309
        Ok(meta_data.clone())
310
    }
105✔
311
}
312

313
/// Meta data with a fixed loading info and a fixed result descriptor.
///
/// The query type `Q` is only carried as a type parameter via `PhantomData` and is skipped
/// during (de)serialization; field names are serialized in camelCase.
#[derive(PartialEq, Eq, Debug, Clone, Serialize, Deserialize)]
314
#[serde(rename_all = "camelCase")]
315
pub struct StaticMetaData<L, R, Q>
316
where
317
    L: Debug + Clone + Send + Sync + 'static,
318
    R: Debug + Send + Sync + 'static + ResultDescriptor,
319
    Q: Debug + Clone + Send + Sync + 'static,
320
{
321
    /// Loading info returned (cloned) for every query.
    pub loading_info: L,
322
    /// Result descriptor returned (cloned) by `result_descriptor()`.
    pub result_descriptor: R,
323
    /// Marker that ties the query type `Q` to this value without storing one.
    #[serde(skip)]
324
    pub phantom: PhantomData<Q>,
325
}
326

327
/// `MetaData` implementation that answers every request with clones of the stored fields.
#[async_trait]
328
impl<L, R, Q> MetaData<L, R, Q> for StaticMetaData<L, R, Q>
329
where
330
    L: Debug + Clone + Send + Sync + 'static,
331
    R: Debug + Send + Sync + 'static + ResultDescriptor,
332
    Q: Debug + Clone + Send + Sync + 'static,
333
{
334
    /// Ignores the query and returns a clone of `loading_info`; never fails.
    async fn loading_info(&self, _query: Q) -> Result<L> {
69✔
335
        Ok(self.loading_info.clone())
336
    }
69✔
337

338
    /// Returns a clone of `result_descriptor`; never fails.
    async fn result_descriptor(&self) -> Result<R> {
53✔
339
        Ok(self.result_descriptor.clone())
340
    }
53✔
341

342
    /// Boxes a clone of `self`.
    fn box_clone(&self) -> Box<dyn MetaData<L, R, Q>> {
70✔
343
        Box::new(self.clone())
70✔
344
    }
70✔
345
}
346

347
// Postgres mapping types for the two `StaticMetaData` instantiations used with vector data.
// Each `*DbType` struct mirrors `StaticMetaData` without the `phantom` field.
mod db_types {
348
    use geoengine_datatypes::delegate_from_to_sql;
349
    use postgres_types::{FromSql, ToSql};
350

351
    use super::*;
352

353
    /// `StaticMetaData` for the mock dataset source.
    pub type MockMetaData = StaticMetaData<
354
        MockDatasetDataSourceLoadingInfo,
355
        VectorResultDescriptor,
356
        VectorQueryRectangle,
357
    >;
358

359
    /// Database representation of [`MockMetaData`], declared with the Postgres name `MockMetaData`.
    #[derive(Debug, ToSql, FromSql)]
×
360
    #[postgres(name = "MockMetaData")]
361
    pub struct MockMetaDataDbType {
362
        pub loading_info: MockDatasetDataSourceLoadingInfo,
363
        pub result_descriptor: VectorResultDescriptor,
364
    }
365

366
    /// Clones both data fields out of the borrowed meta data.
    impl From<&MockMetaData> for MockMetaDataDbType {
367
        fn from(other: &MockMetaData) -> Self {
×
368
            Self {
×
369
                loading_info: other.loading_info.clone(),
×
370
                result_descriptor: other.result_descriptor.clone(),
×
371
            }
×
372
        }
×
373
    }
374

375
    /// Moves both data fields over and re-creates the phantom marker; always returns `Ok`.
    impl TryFrom<MockMetaDataDbType> for MockMetaData {
376
        type Error = Error;
377

378
        fn try_from(other: MockMetaDataDbType) -> Result<Self, Self::Error> {
×
379
            Ok(Self {
×
380
                loading_info: other.loading_info,
×
381
                result_descriptor: other.result_descriptor,
×
382
                phantom: PhantomData,
×
383
            })
×
384
        }
×
385
    }
386

387
    /// `StaticMetaData` for the OGR source.
    pub type OgrMetaData =
388
        StaticMetaData<OgrSourceDataset, VectorResultDescriptor, VectorQueryRectangle>;
389

390
    /// Database representation of [`OgrMetaData`], declared with the Postgres name `OgrMetaData`.
    #[derive(Debug, ToSql, FromSql)]
×
391
    #[postgres(name = "OgrMetaData")]
392
    pub struct OgrMetaDataDbType {
393
        pub loading_info: OgrSourceDataset,
394
        pub result_descriptor: VectorResultDescriptor,
395
    }
396

397
    /// Clones both data fields out of the borrowed meta data
    /// (the spelled-out `StaticMetaData<..>` type is the same type as `OgrMetaData`).
    impl From<&StaticMetaData<OgrSourceDataset, VectorResultDescriptor, VectorQueryRectangle>>
398
        for OgrMetaDataDbType
399
    {
400
        fn from(other: &OgrMetaData) -> Self {
28✔
401
            Self {
28✔
402
                loading_info: other.loading_info.clone(),
28✔
403
                result_descriptor: other.result_descriptor.clone(),
28✔
404
            }
28✔
405
        }
28✔
406
    }
407

408
    /// Moves both data fields over and re-creates the phantom marker; always returns `Ok`.
    impl TryFrom<OgrMetaDataDbType> for OgrMetaData {
409
        type Error = Error;
410

411
        fn try_from(other: OgrMetaDataDbType) -> Result<Self, Self::Error> {
13✔
412
            Ok(Self {
13✔
413
                loading_info: other.loading_info,
13✔
414
                result_descriptor: other.result_descriptor,
13✔
415
                phantom: PhantomData,
13✔
416
            })
13✔
417
        }
13✔
418
    }
419

420
    // NOTE(review): presumably implements `ToSql`/`FromSql` for the first type by going through
    // the second one via the conversions above — the macro is defined in `geoengine_datatypes`; confirm there.
    delegate_from_to_sql!(MockMetaData, MockMetaDataDbType);
421
    delegate_from_to_sql!(OgrMetaData, OgrMetaDataDbType);
422
}
423

424
/// A mock execution context that wraps raster and vector operators in an
/// `InitializedOperatorWrapper`; plot operators are passed through unchanged.
/// Everything else is delegated to the inner [`MockExecutionContext`].
425
pub struct StatisticsWrappingMockExecutionContext {
426
    /// The wrapped mock context that provides all data and pools.
    pub inner: MockExecutionContext,
427
}
428

429
/// Test default: wraps `MockExecutionContext::test_default()`.
impl TestDefault for StatisticsWrappingMockExecutionContext {
430
    fn test_default() -> Self {
×
431
        Self {
×
432
            inner: MockExecutionContext::test_default(),
×
433
        }
×
434
    }
×
435
}
436

437
impl StatisticsWrappingMockExecutionContext {
438
    /// Creates a `MockQueryContext` by forwarding all arguments unchanged to
    /// `MockExecutionContext::mock_query_context_with_query_extensions` on `inner`.
    pub fn mock_query_context_with_query_extensions(
×
439
        &self,
×
440
        chunk_byte_size: ChunkByteSize,
×
441
        cache: Option<Arc<SharedCache>>,
×
442
        quota_tracking: Option<QuotaTracking>,
×
443
        quota_checker: Option<QuotaChecker>,
×
444
    ) -> MockQueryContext {
×
445
        self.inner.mock_query_context_with_query_extensions(
×
446
            chunk_byte_size,
×
447
            cache,
×
448
            quota_tracking,
×
449
            quota_checker,
×
450
        )
451
    }
×
452
}
453

454
/// `ExecutionContext` that delegates to `inner`, except for the raster and vector `wrap_*`
/// hooks, which wrap the operator in an `InitializedOperatorWrapper`.
#[async_trait::async_trait]
455
impl ExecutionContext for StatisticsWrappingMockExecutionContext {
456
    fn thread_pool(&self) -> &Arc<ThreadPool> {
×
457
        &self.inner.thread_pool
×
458
    }
×
459

460
    fn tiling_specification(&self) -> TilingSpecification {
×
461
        self.inner.tiling_specification
×
462
    }
×
463

464
    /// Wraps `op` together with `span` in an `InitializedOperatorWrapper` and boxes the result.
    fn wrap_initialized_raster_operator(
×
465
        &self,
×
466
        op: Box<dyn InitializedRasterOperator>,
×
467
        span: CreateSpan,
×
468
    ) -> Box<dyn InitializedRasterOperator> {
×
469
        InitializedOperatorWrapper::new(op, span).boxed()
×
470
    }
×
471

472
    /// Wraps `op` together with `span` in an `InitializedOperatorWrapper` and boxes the result.
    fn wrap_initialized_vector_operator(
×
473
        &self,
×
474
        op: Box<dyn InitializedVectorOperator>,
×
475
        span: CreateSpan,
×
476
    ) -> Box<dyn InitializedVectorOperator> {
×
477
        InitializedOperatorWrapper::new(op, span).boxed()
×
478
    }
×
479

480
    /// Returns `op` unchanged; plot operators are not wrapped.
    fn wrap_initialized_plot_operator(
×
481
        &self,
×
482
        op: Box<dyn InitializedPlotOperator>,
×
483
        _span: CreateSpan,
×
484
    ) -> Box<dyn InitializedPlotOperator> {
×
485
        op
×
486
    }
×
487

488
    /// Delegates to `inner.resolve_named_data`.
    async fn resolve_named_data(&self, data: &NamedData) -> Result<DataId> {
×
489
        self.inner.resolve_named_data(data).await
490
    }
×
491

492
    /// Delegates to `inner.ml_model_loading_info`.
    async fn ml_model_loading_info(&self, name: &MlModelName) -> Result<MlModelLoadingInfo> {
×
493
        self.inner.ml_model_loading_info(name).await
494
    }
×
495
}
496

497
/// Exposes the GDAL process pool of the inner mock context.
impl GdalProcessPoolAccess for StatisticsWrappingMockExecutionContext {
NEW
498
    fn get_gdal_pool(&self) -> &Arc<GdalProcessPool> {
×
NEW
499
        self.inner.get_gdal_pool()
×
NEW
500
    }
×
501
}
502

503
/// Meta-data lookup that is delegated unchanged to the inner mock context.
#[async_trait]
504
impl<L, R, Q> MetaDataProvider<L, R, Q> for StatisticsWrappingMockExecutionContext
505
where
506
    L: 'static,
507
    R: 'static + ResultDescriptor,
508
    Q: 'static,
509
{
510
    /// Returns whatever `inner.meta_data(id)` returns, including its errors.
    async fn meta_data(&self, id: &DataId) -> Result<Box<dyn MetaData<L, R, Q>>> {
×
511
        self.inner.meta_data(id).await
512
    }
×
513
}
514

515
#[cfg(test)]
516
mod tests {
517
    use super::*;
518
    use geoengine_datatypes::collections::VectorDataType;
519
    use geoengine_datatypes::spatial_reference::SpatialReferenceOption;
520

521
    /// Checks that a `Box<dyn MetaData<..>>` stored as `Box<dyn Any + Send + Sync>` (the way
    /// `MockExecutionContext` keeps its meta data) can be downcast back to the boxed trait
    /// object and still yields the original result descriptor.
    #[tokio::test]
522
    async fn test() {
1✔
523
        let info = StaticMetaData {
1✔
524
            loading_info: 1_i32,
1✔
525
            result_descriptor: VectorResultDescriptor {
1✔
526
                data_type: VectorDataType::Data,
1✔
527
                spatial_reference: SpatialReferenceOption::Unreferenced,
1✔
528
                columns: Default::default(),
1✔
529
                time: None,
1✔
530
                bbox: None,
1✔
531
            },
1✔
532
            phantom: Default::default(),
1✔
533
        };
1✔
534

535
        // First box: concrete type -> trait object.
        let info: Box<dyn MetaData<i32, VectorResultDescriptor, VectorQueryRectangle>> =
1✔
536
            Box::new(info);
1✔
537

538
        // Second box: erase the (sized) boxed trait object behind `dyn Any`.
        let info2: Box<dyn Any + Send + Sync> = Box::new(info);
1✔
539

540
        // The downcast target must be the exact boxed trait-object type stored above.
        let info3 = info2
1✔
541
            .downcast_ref::<Box<dyn MetaData<i32, VectorResultDescriptor, VectorQueryRectangle>>>()
1✔
542
            .unwrap();
1✔
543

544
        assert_eq!(
1✔
545
            info3.result_descriptor().await.unwrap(),
1✔
546
            VectorResultDescriptor {
1✔
547
                data_type: VectorDataType::Data,
1✔
548
                spatial_reference: SpatialReferenceOption::Unreferenced,
1✔
549
                columns: Default::default(),
1✔
550
                time: None,
1✔
551
                bbox: None,
1✔
552
            }
1✔
553
        );
1✔
554
    }
1✔
555
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc