• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

geo-engine / geoengine / 26897733292

03 Jun 2026 04:15PM UTC coverage: 87.02% (-0.2%) from 87.218%
26897733292

Pull #1192

github

web-flow
Merge 536ad7e6f into 255ac7144
Pull Request #1192: feat: add Gdal process pool

1479 of 1957 new or added lines in 28 files covered. (75.57%)

18 existing lines in 7 files now uncovered.

117755 of 135320 relevant lines covered (87.02%)

478060.06 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

71.67
/geoengine/operators/src/engine/query.rs
1
use std::{
2
    pin::Pin,
3
    sync::Arc,
4
    task::{Context, Poll},
5
};
6

7
use crate::{
8
    cache::shared_cache::SharedCache,
9
    error,
10
    meta::quota::QuotaChecker,
11
    source::gdal_source::{GdalProcessPool, GdalProcessPoolAccess},
12
    util::create_rayon_thread_pool,
13
};
14
use crate::{meta::quota::QuotaTracking, util::Result};
15
use futures::Stream;
16
use geoengine_datatypes::{raster::TilingSpecification, util::test::TestDefault};
17
use pin_project::pin_project;
18
use rayon::ThreadPool;
19
use serde::{Deserialize, Serialize};
20
use stream_cancel::{Trigger, Valve, Valved};
21

22
/// Defines the size in bytes of a vector data chunk
23
#[derive(Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Debug, Serialize, Deserialize)]
24
pub struct ChunkByteSize(usize);
25

26
impl ChunkByteSize {
27
    pub const MIN: ChunkByteSize = ChunkByteSize(usize::MIN);
28
    pub const MAX: ChunkByteSize = ChunkByteSize(usize::MAX);
29

30
    pub fn new(cbs: usize) -> Self {
×
31
        ChunkByteSize(cbs)
×
32
    }
×
33

34
    pub fn bytes(self) -> usize {
×
35
        self.0
×
36
    }
×
37
}
38

39
impl From<usize> for ChunkByteSize {
40
    fn from(size: usize) -> Self {
27✔
41
        ChunkByteSize(size)
27✔
42
    }
27✔
43
}
44

45
impl From<ChunkByteSize> for usize {
46
    fn from(cbs: ChunkByteSize) -> Self {
153✔
47
        cbs.0
153✔
48
    }
153✔
49
}
50

51
impl TestDefault for ChunkByteSize {
52
    fn test_default() -> Self {
482✔
53
        Self(1024 * 1024)
482✔
54
    }
482✔
55
}
56

57
pub trait QueryContext: Send + Sync + GdalProcessPoolAccess {
58
    fn chunk_byte_size(&self) -> ChunkByteSize;
59
    fn tiling_specification(&self) -> TilingSpecification;
60
    fn thread_pool(&self) -> &Arc<ThreadPool>;
61

62
    fn quota_tracking(&self) -> Option<&QuotaTracking>;
63

64
    fn quota_checker(&self) -> Option<&QuotaChecker>;
65

66
    fn cache(&self) -> Option<Arc<SharedCache>>;
67

68
    fn abort_registration(&self) -> &QueryAbortRegistration;
69
    fn abort_trigger(&mut self) -> Result<QueryAbortTrigger>;
70

NEW
71
    fn gdal_process_pool(&self) -> &Arc<GdalProcessPool> {
×
NEW
72
        self.get_gdal_pool()
×
NEW
73
    }
×
74
}
75

76
/// This type allow wrapping multiple streams with `QueryAbortWrapper`s that
77
/// can all be aborted at the same time using the corresponding `QueryAbortTrigger`.
78
pub struct QueryAbortRegistration {
79
    valve: Valve,
80
}
81

82
impl QueryAbortRegistration {
83
    pub fn new() -> (Self, QueryAbortTrigger) {
335✔
84
        let (trigger, valve) = Valve::new();
335✔
85

86
        (Self { valve }, QueryAbortTrigger { trigger })
335✔
87
    }
335✔
88

89
    pub fn wrap<S: Stream>(&self, stream: S) -> QueryAbortWrapper<S> {
2,356✔
90
        QueryAbortWrapper {
2,356✔
91
            valved: self.valve.wrap(stream),
2,356✔
92
        }
2,356✔
93
    }
2,356✔
94
}
95

96
/// This type wraps a stream and allows aborting it using the corresponding `QueryAbortTrigger`
97
/// from its `QueryAbortRegistration`.
98
#[pin_project(project = AbortWrapperProjection)]
99
pub struct QueryAbortWrapper<S> {
100
    #[pin]
101
    valved: Valved<S>,
102
}
103

104
impl<S> Stream for QueryAbortWrapper<S>
105
where
106
    S: Stream,
107
{
108
    type Item = S::Item;
109

110
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
455,468✔
111
        self.project().valved.poll_next(cx)
455,468✔
112
    }
455,468✔
113
}
114

115
/// This type allows aborting all streams that were wrapped using the corresponding
116
/// `QueryAbortRegistration`.
117
pub struct QueryAbortTrigger {
118
    trigger: Trigger,
119
}
120

121
impl QueryAbortTrigger {
122
    pub fn abort(self) {
2✔
123
        self.trigger.cancel();
2✔
124
    }
2✔
125
}
126

127
pub struct MockQueryContext {
128
    pub chunk_byte_size: ChunkByteSize,
129
    pub tiling_specification: TilingSpecification,
130
    pub thread_pool: Arc<ThreadPool>,
131

132
    pub cache: Option<Arc<SharedCache>>,
133
    pub quota_tracking: Option<QuotaTracking>,
134
    pub quota_checker: Option<QuotaChecker>,
135

136
    pub abort_registration: QueryAbortRegistration,
137
    pub abort_trigger: Option<QueryAbortTrigger>,
138

139
    gdal_process_pool: Arc<GdalProcessPool>,
140
}
141

142
impl MockQueryContext {
143
    pub(super) fn new(
284✔
144
        chunk_byte_size: ChunkByteSize,
284✔
145
        tiling_specification: TilingSpecification,
284✔
146
        gdal_process_pool: Arc<GdalProcessPool>,
284✔
147
    ) -> Self {
284✔
148
        let (abort_registration, abort_trigger) = QueryAbortRegistration::new();
284✔
149
        Self {
284✔
150
            chunk_byte_size,
284✔
151
            tiling_specification,
284✔
152
            thread_pool: create_rayon_thread_pool(0),
284✔
153
            cache: None,
284✔
154
            quota_checker: None,
284✔
155
            quota_tracking: None,
284✔
156
            abort_registration,
284✔
157
            abort_trigger: Some(abort_trigger),
284✔
158
            gdal_process_pool,
284✔
159
        }
284✔
160
    }
284✔
161

162
    pub(super) fn new_with_query_extensions(
3✔
163
        chunk_byte_size: ChunkByteSize,
3✔
164
        tiling_specification: TilingSpecification,
3✔
165
        gdal_process_pool: Arc<GdalProcessPool>,
3✔
166
        cache: Option<Arc<SharedCache>>,
3✔
167
        quota_tracking: Option<QuotaTracking>,
3✔
168
        quota_checker: Option<QuotaChecker>,
3✔
169
    ) -> Self {
3✔
170
        let (abort_registration, abort_trigger) = QueryAbortRegistration::new();
3✔
171
        Self {
3✔
172
            chunk_byte_size,
3✔
173
            tiling_specification,
3✔
174
            thread_pool: create_rayon_thread_pool(0),
3✔
175
            cache,
3✔
176
            quota_checker,
3✔
177
            quota_tracking,
3✔
178
            abort_registration,
3✔
179
            abort_trigger: Some(abort_trigger),
3✔
180
            gdal_process_pool,
3✔
181
        }
3✔
182
    }
3✔
183

184
    pub(super) fn with_chunk_size_and_thread_count(
×
185
        chunk_byte_size: ChunkByteSize,
×
186
        tiling_specification: TilingSpecification,
×
187
        num_threads: usize,
×
NEW
188
        gdal_process_pool: Arc<GdalProcessPool>,
×
189
    ) -> Self {
×
190
        let (abort_registration, abort_trigger) = QueryAbortRegistration::new();
×
191
        Self {
×
192
            chunk_byte_size,
×
193
            tiling_specification,
×
194
            thread_pool: create_rayon_thread_pool(num_threads),
×
NEW
195
            gdal_process_pool,
×
196
            cache: None,
×
197
            quota_checker: None,
×
198
            quota_tracking: None,
×
199
            abort_registration,
×
200
            abort_trigger: Some(abort_trigger),
×
201
        }
×
202
    }
×
203
}
204

205
impl QueryContext for MockQueryContext {
206
    fn chunk_byte_size(&self) -> ChunkByteSize {
129✔
207
        self.chunk_byte_size
129✔
208
    }
129✔
209

210
    fn thread_pool(&self) -> &Arc<ThreadPool> {
66,788✔
211
        &self.thread_pool
66,788✔
212
    }
66,788✔
213

214
    fn abort_registration(&self) -> &QueryAbortRegistration {
1,751✔
215
        &self.abort_registration
1,751✔
216
    }
1,751✔
217

218
    fn abort_trigger(&mut self) -> Result<QueryAbortTrigger> {
20✔
219
        self.abort_trigger
20✔
220
            .take()
20✔
221
            .ok_or(error::Error::AbortTriggerAlreadyUsed)
20✔
222
    }
20✔
223

224
    fn tiling_specification(&self) -> TilingSpecification {
405✔
225
        self.tiling_specification
405✔
226
    }
405✔
227

228
    fn quota_tracking(&self) -> Option<&QuotaTracking> {
×
229
        self.quota_tracking.as_ref()
×
230
    }
×
231

232
    fn quota_checker(&self) -> Option<&QuotaChecker> {
×
233
        self.quota_checker.as_ref()
×
234
    }
×
235

236
    fn cache(&self) -> Option<Arc<SharedCache>> {
7✔
237
        self.cache.clone()
7✔
238
    }
7✔
239
}
240

241
impl GdalProcessPoolAccess for MockQueryContext {
242
    fn get_gdal_pool(&self) -> &Arc<GdalProcessPool> {
257✔
243
        &self.gdal_process_pool
257✔
244
    }
257✔
245
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc