• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

geo-engine / geoengine / 5006008836

pending completion
5006008836

push

github

GitHub
Merge #785 #787

936 of 936 new or added lines in 50 files covered. (100.0%)

96010 of 107707 relevant lines covered (89.14%)

72676.46 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

74.15
/operators/src/pro/cache/cache_operator.rs
1
use std::pin::Pin;
2
use std::sync::Arc;
3
use std::task::{Context, Poll};
4

5
use crate::engine::{
6
    CanonicOperatorName, InitializedRasterOperator, QueryContext, QueryProcessor,
7
    RasterResultDescriptor, TypedRasterQueryProcessor,
8
};
9
use crate::util::Result;
10
use async_trait::async_trait;
11
use futures::stream::BoxStream;
12
use futures::{ready, Stream};
13
use geoengine_datatypes::primitives::{QueryRectangle, SpatialPartition2D};
14
use geoengine_datatypes::raster::{Pixel, RasterTile2D};
15
use pin_project::{pin_project, pinned_drop};
16
use tokio::sync::mpsc::{unbounded_channel, UnboundedSender};
17

18
use super::tile_cache::{Cachable, TileCache};
19

20
/// A cache operator that caches the results of its source operator
pub struct InitializedCacheOperator<S> {
    // The wrapped, already-initialized source operator whose query results
    // will be served from / inserted into the tile cache.
    source: S,
}
24

25
impl<S> InitializedCacheOperator<S> {
26
    pub fn new(source: S) -> Self {
1✔
27
        Self { source }
1✔
28
    }
1✔
29
}
30

31
impl InitializedRasterOperator for InitializedCacheOperator<Box<dyn InitializedRasterOperator>> {
32
    fn result_descriptor(&self) -> &RasterResultDescriptor {
×
33
        self.source.result_descriptor()
×
34
    }
×
35

36
    fn query_processor(&self) -> Result<TypedRasterQueryProcessor> {
1✔
37
        let processor_result = self.source.query_processor();
1✔
38
        match processor_result {
1✔
39
            Ok(p) => {
1✔
40
                let res_processor = match p {
1✔
41
                    TypedRasterQueryProcessor::U8(p) => TypedRasterQueryProcessor::U8(Box::new(
1✔
42
                        CacheQueryProcessor::new(p, self.source.canonic_name()),
1✔
43
                    )),
1✔
44
                    TypedRasterQueryProcessor::U16(p) => TypedRasterQueryProcessor::U16(Box::new(
×
45
                        CacheQueryProcessor::new(p, self.source.canonic_name()),
×
46
                    )),
×
47
                    TypedRasterQueryProcessor::U32(p) => TypedRasterQueryProcessor::U32(Box::new(
×
48
                        CacheQueryProcessor::new(p, self.source.canonic_name()),
×
49
                    )),
×
50
                    TypedRasterQueryProcessor::U64(p) => TypedRasterQueryProcessor::U64(Box::new(
×
51
                        CacheQueryProcessor::new(p, self.source.canonic_name()),
×
52
                    )),
×
53
                    TypedRasterQueryProcessor::I8(p) => TypedRasterQueryProcessor::I8(Box::new(
×
54
                        CacheQueryProcessor::new(p, self.source.canonic_name()),
×
55
                    )),
×
56
                    TypedRasterQueryProcessor::I16(p) => TypedRasterQueryProcessor::I16(Box::new(
×
57
                        CacheQueryProcessor::new(p, self.source.canonic_name()),
×
58
                    )),
×
59
                    TypedRasterQueryProcessor::I32(p) => TypedRasterQueryProcessor::I32(Box::new(
×
60
                        CacheQueryProcessor::new(p, self.source.canonic_name()),
×
61
                    )),
×
62
                    TypedRasterQueryProcessor::I64(p) => TypedRasterQueryProcessor::I64(Box::new(
×
63
                        CacheQueryProcessor::new(p, self.source.canonic_name()),
×
64
                    )),
×
65
                    TypedRasterQueryProcessor::F32(p) => TypedRasterQueryProcessor::F32(Box::new(
×
66
                        CacheQueryProcessor::new(p, self.source.canonic_name()),
×
67
                    )),
×
68
                    TypedRasterQueryProcessor::F64(p) => TypedRasterQueryProcessor::F64(Box::new(
×
69
                        CacheQueryProcessor::new(p, self.source.canonic_name()),
×
70
                    )),
×
71
                };
72
                tracing::debug!(event = "query processor created");
1✔
73
                Ok(res_processor)
1✔
74
            }
75
            Err(err) => {
×
76
                tracing::debug!(event = "query processor failed");
×
77
                Err(err)
×
78
            }
79
        }
80
    }
1✔
81

82
    fn canonic_name(&self) -> CanonicOperatorName {
×
83
        self.source.canonic_name()
×
84
    }
×
85
}
86

87
/// A query processor that caches the results of its wrapped source processor
struct CacheQueryProcessor<Q, T>
where
    Q: QueryProcessor<Output = RasterTile2D<T>, SpatialBounds = SpatialPartition2D>,
    T: Pixel,
{
    // The wrapped source processor; queried on cache misses.
    processor: Q,
    // Canonic name of the source operator; used as the cache lookup key.
    cache_key: CanonicOperatorName,
}
96

97
impl<Q, T> CacheQueryProcessor<Q, T>
98
where
99
    Q: QueryProcessor<Output = RasterTile2D<T>, SpatialBounds = SpatialPartition2D> + Sized,
100
    T: Pixel,
101
{
102
    pub fn new(processor: Q, cache_key: CanonicOperatorName) -> Self {
1✔
103
        CacheQueryProcessor {
1✔
104
            processor,
1✔
105
            cache_key,
1✔
106
        }
1✔
107
    }
1✔
108
}
109

110
#[async_trait]
impl<Q, T> QueryProcessor for CacheQueryProcessor<Q, T>
where
    Q: QueryProcessor<Output = RasterTile2D<T>, SpatialBounds = SpatialPartition2D> + Sized,
    T: Pixel + Cachable,
{
    type Output = RasterTile2D<T>;
    type SpatialBounds = SpatialPartition2D;

    /// Answers `query` from the tile cache when possible; on a cache miss,
    /// queries the wrapped source processor and lazily inserts the produced
    /// tiles into the cache as the returned stream is consumed.
    async fn _query<'a>(
        &'a self,
        query: QueryRectangle<Self::SpatialBounds>,
        ctx: &'a dyn QueryContext,
    ) -> Result<BoxStream<'a, Result<Self::Output>>> {
        // The cache is supplied through a query-context extension; missing it
        // is a setup bug, hence the `expect`.
        let tile_cache = ctx
            .extensions()
            .get::<Arc<TileCache>>()
            .expect("`TileCache` extension should be set during `ProContext` creation");

        let cache_result = tile_cache
            .query_cache::<T>(self.cache_key.clone(), &query)
            .await;

        if let Some(cache_result) = cache_result {
            // cache hit
            log::debug!("cache hit for operator {:?}", self.cache_key);
            return Ok(Box::pin(cache_result));
        }

        // cache miss
        log::debug!("cache miss for operator {:?}", self.cache_key);
        let source_stream = self.processor.query(query, ctx).await?;

        // Register an in-flight cache entry for this query; `query_id`
        // identifies it for subsequent tile insertions / abort / finish.
        let query_id = tile_cache
            .insert_query::<T>(self.cache_key.clone(), &query)
            .await;

        // lazily insert tiles into the cache as they are produced
        let (stream_event_sender, mut stream_event_receiver) = unbounded_channel();

        let cache_key = self.cache_key.clone();
        let tile_cache = tile_cache.clone();
        // Background task: applies the events emitted by `CacheOutputStream`
        // (tile produced / stream errored / stream finished) to the cache,
        // decoupled from the consumer that polls the stream.
        crate::util::spawn(async move {
            while let Some(event) = stream_event_receiver.recv().await {
                match event {
                    SourceStreamEvent::Tile(tile) => {
                        let result = tile_cache
                            .insert_tile(cache_key.clone(), query_id, tile)
                            .await;
                        log::debug!(
                            "inserted tile into cache for cache key {:?} and query id {}. result: {:?}",
                            cache_key,
                            query_id,
                            result
                        );
                    }
                    SourceStreamEvent::Abort => {
                        // A source error invalidates the partial entry.
                        tile_cache.abort_query(cache_key.clone(), query_id).await;
                        log::debug!(
                            "aborted cache insertion for cache key {:?} and query id {}",
                            cache_key,
                            query_id
                        );
                    }
                    SourceStreamEvent::Finished => {
                        // Source stream completed; seal the cache entry.
                        let result = tile_cache.finish_query(cache_key.clone(), query_id).await;
                        log::debug!(
                            "finished cache insertion for cache key {:?} and query id {}, result: {:?}",
                            cache_key,query_id,
                            result
                        );
                    }
                }
            }
        });

        // Wrap the source stream so every polled item is mirrored to the
        // insertion task above (and the entry aborted on premature drop).
        let output_stream = CacheOutputStream {
            source: source_stream,
            stream_event_sender,
            finished: false,
        };

        Ok(Box::pin(output_stream))
    }
}
195

196
/// Events mirrored from the output stream to the cache-insertion task.
#[allow(clippy::large_enum_variant)] // TODO: Box instead?
enum SourceStreamEvent<T> {
    /// The source produced a tile that should be inserted into the cache.
    Tile(RasterTile2D<T>),
    /// The source errored (or the stream was dropped early); discard the entry.
    Abort,
    /// The source stream ran to completion; finalize the cache entry.
    Finished,
}
202

203
/// Custom stream that lazily puts the produced tile in the cache and finishes the cache entry when the source stream completes
#[pin_project(PinnedDrop, project = CacheOutputStreamProjection)]
struct CacheOutputStream<S, T>
where
    S: Stream<Item = Result<RasterTile2D<T>>>,
{
    /// Underlying source stream whose items are passed through.
    #[pin]
    source: S,
    /// Channel to the background task that performs the cache operations.
    stream_event_sender: UnboundedSender<SourceStreamEvent<T>>,
    /// True once the source stream has completed; checked on drop to decide
    /// whether the (unfinished) cache entry must be aborted.
    finished: bool,
}
214

215
impl<S, T> Stream for CacheOutputStream<S, T>
216
where
217
    S: Stream<Item = Result<RasterTile2D<T>>>,
218
    T: Pixel,
219
{
220
    type Item = Result<RasterTile2D<T>>;
221

222
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
9✔
223
        let this = self.project();
9✔
224

225
        let next = ready!(this.source.poll_next(cx));
9✔
226

227
        if let Some(tile) = &next {
9✔
228
            if let Ok(tile) = tile {
8✔
229
                // ignore the result. The receiver shold never drop prematurely, but if it does we don't want to crash
8✔
230
                let _ = this
8✔
231
                    .stream_event_sender
8✔
232
                    .send(SourceStreamEvent::Tile(tile.clone()));
8✔
233
            } else {
8✔
234
                // ignore the result. The receiver shold never drop prematurely, but if it does we don't want to crash
×
235
                let _ = this.stream_event_sender.send(SourceStreamEvent::Abort);
×
236
            }
×
237
        } else {
1✔
238
            // ignore the result. The receiver shold never drop prematurely, but if it does we don't want to crash
1✔
239
            let _ = this.stream_event_sender.send(SourceStreamEvent::Finished);
1✔
240
            *this.finished = true;
1✔
241
        }
1✔
242

243
        Poll::Ready(next)
9✔
244
    }
9✔
245
}
246

247
/// On drop, trigger the removal of the cache entry if it hasn't been finished yet
248
#[pinned_drop]
×
249
impl<S, T> PinnedDrop for CacheOutputStream<S, T>
250
where
251
    S: Stream<Item = Result<RasterTile2D<T>>>,
1✔
252
{
1✔
253
    fn drop(self: Pin<&mut Self>) {
1✔
254
        if !self.finished {
1✔
255
            // ignore the result. The receiver shold never drop prematurely, but if it does we don't want to crash
×
256
            let _ = self.stream_event_sender.send(SourceStreamEvent::Abort);
×
257
        }
1✔
258
    }
1✔
259
}
260

261
#[cfg(test)]
mod tests {
    use futures::StreamExt;
    use geoengine_datatypes::{
        primitives::{SpatialResolution, TimeInterval},
        util::test::TestDefault,
    };

    use crate::{
        engine::{
            ChunkByteSize, MockExecutionContext, MockQueryContext, QueryContextExtensions,
            RasterOperator, WorkflowOperatorPath,
        },
        source::{GdalSource, GdalSourceParameters},
        util::gdal::add_ndvi_dataset,
    };

    use super::*;

    /// End-to-end check: query a GDAL source through the cache operator twice
    /// and assert both queries yield identical tiles. The first query misses
    /// and fills the cache; the second is expected to be served from it.
    #[tokio::test]
    async fn it_caches() {
        let mut exe_ctx = MockExecutionContext::test_default();

        let ndvi_id = add_ndvi_dataset(&mut exe_ctx);

        let operator = GdalSource {
            params: GdalSourceParameters { data: ndvi_id },
        }
        .boxed()
        .initialize(WorkflowOperatorPath::initialize_root(), &exe_ctx)
        .await
        .unwrap();

        let cached_op = InitializedCacheOperator::new(operator);

        let processor = cached_op.query_processor().unwrap().get_u8().unwrap();

        let tile_cache = Arc::new(TileCache::default());

        // The cache reaches the processor via a query-context extension;
        // this mirrors what `ProContext` does in production.
        let mut extensions = QueryContextExtensions::default();

        extensions.insert(tile_cache);

        let query_ctx =
            MockQueryContext::new_with_query_extensions(ChunkByteSize::test_default(), extensions);

        // First query: expected cache miss; tiles are inserted lazily while
        // the stream is consumed.
        let stream = processor
            .query(
                QueryRectangle {
                    spatial_bounds: SpatialPartition2D::new_unchecked(
                        [-180., -90.].into(),
                        [180., 90.].into(),
                    ),
                    time_interval: TimeInterval::default(),
                    spatial_resolution: SpatialResolution::zero_point_one(),
                },
                &query_ctx,
            )
            .await
            .unwrap();

        let tiles = stream.collect::<Vec<_>>().await;
        let tiles = tiles.into_iter().collect::<Result<Vec<_>>>().unwrap();

        // wait for the cache to be filled, which happens asynchronously
        // NOTE(review): the fixed 1 s sleep is a timing heuristic — if the
        // insertion task is slower, the second query silently misses the
        // cache and the test still passes via the source path; consider a
        // deterministic completion signal.
        tokio::time::sleep(std::time::Duration::from_millis(1000)).await;

        // Second, identical query: expected to be answered from the cache.
        let stream_from_cache = processor
            .query(
                QueryRectangle {
                    spatial_bounds: SpatialPartition2D::new_unchecked(
                        [-180., -90.].into(),
                        [180., 90.].into(),
                    ),
                    time_interval: TimeInterval::default(),
                    spatial_resolution: SpatialResolution::zero_point_one(),
                },
                &query_ctx,
            )
            .await
            .unwrap();

        let tiles_from_cache = stream_from_cache.collect::<Vec<_>>().await;
        let tiles_from_cache = tiles_from_cache
            .into_iter()
            .collect::<Result<Vec<_>>>()
            .unwrap();

        // TODO: how to ensure the tiles are actually from the cache?

        assert_eq!(tiles, tiles_from_cache);
    }
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc