• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

geo-engine / geoengine / 5620643772

21 Jul 2023 09:09AM UTC coverage: 89.043% (-0.2%) from 89.194%
5620643772

Pull #833

github

web-flow
Merge 1ee0a296a into 2852314aa
Pull Request #833: Shared-cache

1128 of 1128 new or added lines in 9 files covered. (100.0%)

102925 of 115590 relevant lines covered (89.04%)

62633.59 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

72.97
/operators/src/pro/cache/cache_operator.rs
1
use std::pin::Pin;
2
use std::sync::Arc;
3
use std::task::{Context, Poll};
4

5
use crate::engine::{
6
    CanonicOperatorName, InitializedRasterOperator, QueryContext, QueryProcessor,
7
    RasterResultDescriptor, TypedRasterQueryProcessor,
8
};
9
use crate::pro::cache::shared_cache::{AsyncCache, SharedCache};
10
use crate::util::Result;
11
use async_trait::async_trait;
12
use futures::stream::BoxStream;
13
use futures::{ready, Stream};
14
use geoengine_datatypes::primitives::{QueryRectangle, SpatialPartition2D};
15
use geoengine_datatypes::raster::{Pixel, RasterTile2D};
16
use pin_project::{pin_project, pinned_drop};
17
use tokio::sync::mpsc::{unbounded_channel, UnboundedSender};
18

19
use super::shared_cache::CachableSubType;
20

21
/// A cache operator that caches the results of its source operator
pub struct InitializedCacheOperator<S> {
    /// The wrapped (already initialized) source operator whose results are cached.
    source: S,
}

impl<S> InitializedCacheOperator<S> {
    /// Wraps the given `source` operator so that its query results can be cached.
    pub fn new(source: S) -> Self {
        InitializedCacheOperator { source }
    }
}
31

32
impl InitializedRasterOperator for InitializedCacheOperator<Box<dyn InitializedRasterOperator>> {
33
    fn result_descriptor(&self) -> &RasterResultDescriptor {
×
34
        self.source.result_descriptor()
×
35
    }
×
36

37
    fn query_processor(&self) -> Result<TypedRasterQueryProcessor> {
1✔
38
        let processor_result = self.source.query_processor();
1✔
39
        match processor_result {
1✔
40
            Ok(p) => {
1✔
41
                let res_processor = match p {
1✔
42
                    TypedRasterQueryProcessor::U8(p) => TypedRasterQueryProcessor::U8(Box::new(
1✔
43
                        CacheQueryProcessor::new(p, self.source.canonic_name()),
1✔
44
                    )),
1✔
45
                    TypedRasterQueryProcessor::U16(p) => TypedRasterQueryProcessor::U16(Box::new(
×
46
                        CacheQueryProcessor::new(p, self.source.canonic_name()),
×
47
                    )),
×
48
                    TypedRasterQueryProcessor::U32(p) => TypedRasterQueryProcessor::U32(Box::new(
×
49
                        CacheQueryProcessor::new(p, self.source.canonic_name()),
×
50
                    )),
×
51
                    TypedRasterQueryProcessor::U64(p) => TypedRasterQueryProcessor::U64(Box::new(
×
52
                        CacheQueryProcessor::new(p, self.source.canonic_name()),
×
53
                    )),
×
54
                    TypedRasterQueryProcessor::I8(p) => TypedRasterQueryProcessor::I8(Box::new(
×
55
                        CacheQueryProcessor::new(p, self.source.canonic_name()),
×
56
                    )),
×
57
                    TypedRasterQueryProcessor::I16(p) => TypedRasterQueryProcessor::I16(Box::new(
×
58
                        CacheQueryProcessor::new(p, self.source.canonic_name()),
×
59
                    )),
×
60
                    TypedRasterQueryProcessor::I32(p) => TypedRasterQueryProcessor::I32(Box::new(
×
61
                        CacheQueryProcessor::new(p, self.source.canonic_name()),
×
62
                    )),
×
63
                    TypedRasterQueryProcessor::I64(p) => TypedRasterQueryProcessor::I64(Box::new(
×
64
                        CacheQueryProcessor::new(p, self.source.canonic_name()),
×
65
                    )),
×
66
                    TypedRasterQueryProcessor::F32(p) => TypedRasterQueryProcessor::F32(Box::new(
×
67
                        CacheQueryProcessor::new(p, self.source.canonic_name()),
×
68
                    )),
×
69
                    TypedRasterQueryProcessor::F64(p) => TypedRasterQueryProcessor::F64(Box::new(
×
70
                        CacheQueryProcessor::new(p, self.source.canonic_name()),
×
71
                    )),
×
72
                };
73
                tracing::debug!(event = "query processor created");
1✔
74
                Ok(res_processor)
1✔
75
            }
76
            Err(err) => {
×
77
                tracing::debug!(event = "query processor failed");
×
78
                Err(err)
×
79
            }
80
        }
81
    }
1✔
82

83
    fn canonic_name(&self) -> CanonicOperatorName {
×
84
        self.source.canonic_name()
×
85
    }
×
86
}
87

88
/// A cache operator that caches the results of its source operator
struct CacheQueryProcessor<Q, T>
where
    Q: QueryProcessor<Output = RasterTile2D<T>, SpatialBounds = SpatialPartition2D>,
    T: Pixel,
{
    /// The wrapped query processor that produces the tiles to be cached.
    processor: Q,
    /// Key identifying this operator's entries in the shared cache.
    cache_key: CanonicOperatorName,
}
97

98
impl<Q, T> CacheQueryProcessor<Q, T>
99
where
100
    Q: QueryProcessor<Output = RasterTile2D<T>, SpatialBounds = SpatialPartition2D> + Sized,
101
    T: Pixel,
102
{
103
    pub fn new(processor: Q, cache_key: CanonicOperatorName) -> Self {
1✔
104
        CacheQueryProcessor {
1✔
105
            processor,
1✔
106
            cache_key,
1✔
107
        }
1✔
108
    }
1✔
109
}
110

111
#[async_trait]
112
impl<Q, T> QueryProcessor for CacheQueryProcessor<Q, T>
113
where
114
    Q: QueryProcessor<Output = RasterTile2D<T>, SpatialBounds = SpatialPartition2D> + Sized,
115
    T: Pixel + CachableSubType<CacheElementType = RasterTile2D<T>>,
116
    RasterTile2D<T>: Send,
117
    SpatialPartition2D: Send,
118
{
119
    type Output = RasterTile2D<T>;
120
    type SpatialBounds = SpatialPartition2D;
121

122
    async fn _query<'a>(
2✔
123
        &'a self,
2✔
124
        query: QueryRectangle<Self::SpatialBounds>,
2✔
125
        ctx: &'a dyn QueryContext,
2✔
126
    ) -> Result<BoxStream<'a, Result<Self::Output>>> {
2✔
127
        let tile_cache = ctx
2✔
128
            .extensions()
2✔
129
            .get::<Arc<SharedCache>>()
2✔
130
            .expect("`TileCache` extension should be set during `ProContext` creation");
2✔
131

132
        let cache_result = tile_cache.query_cache::<T>(&self.cache_key, &query).await;
2✔
133

134
        if let Ok(Some(cache_result)) = cache_result {
2✔
135
            // cache hit
136
            log::debug!("cache hit for operator {}", self.cache_key);
×
137

138
            return Ok(Box::pin(cache_result));
×
139
        }
2✔
140

2✔
141
        // cache miss
2✔
142
        log::debug!("cache miss for operator {}", self.cache_key);
2✔
143
        let source_stream = self.processor.query(query, ctx).await?;
2✔
144

145
        let query_id = tile_cache.insert_query::<T>(&self.cache_key, &query).await;
2✔
146

147
        if let Err(e) = query_id {
2✔
148
            log::debug!("could not insert query into cache: {}", e);
×
149
            return Ok(source_stream);
×
150
        }
2✔
151

2✔
152
        let query_id = query_id.expect("query_id should be set because of the previous check");
2✔
153

2✔
154
        // lazily insert tiles into the cache as they are produced
2✔
155
        let (stream_event_sender, mut stream_event_receiver) = unbounded_channel();
2✔
156

2✔
157
        let cache_key = self.cache_key.clone();
2✔
158
        let tile_cache = tile_cache.clone();
2✔
159
        crate::util::spawn(async move {
2✔
160
            while let Some(event) = stream_event_receiver.recv().await {
10✔
161
                match event {
9✔
162
                    SourceStreamEvent::Tile(tile) => {
8✔
163
                        let result = tile_cache
8✔
164
                            .insert_query_element::<T>(&cache_key, &query_id, tile)
8✔
165
                            .await;
×
166
                        log::trace!(
8✔
167
                            "inserted tile into cache for cache key {} and query id {}. result: {:?}",
×
168
                            cache_key,
169
                            query_id,
170
                            result
171
                        );
172
                    }
173
                    SourceStreamEvent::Abort => {
174
                        tile_cache.abort_query::<T>(&cache_key, &query_id).await;
×
175
                        log::debug!(
×
176
                            "aborted cache insertion for cache key {} and query id {}",
×
177
                            cache_key,
178
                            query_id
179
                        );
180
                    }
181
                    SourceStreamEvent::Finished => {
182
                        let result = tile_cache.finish_query::<T>(&cache_key, &query_id).await;
1✔
183
                        log::debug!(
1✔
184
                            "finished cache insertion for cache key {} and query id {}, result: {:?}",
×
185
                            cache_key,query_id,
186
                            result
187
                        );
188
                    }
189
                }
190
            }
191
        });
2✔
192

2✔
193
        let output_stream = CacheOutputStream {
2✔
194
            source: source_stream,
2✔
195
            stream_event_sender,
2✔
196
            finished: false,
2✔
197
            pristine: true,
2✔
198
        };
2✔
199

2✔
200
        Ok(Box::pin(output_stream))
2✔
201
    }
4✔
202
}
203

204
/// Events sent from the output stream to the background cache-writer task.
#[allow(clippy::large_enum_variant)] // TODO: Box instead?
enum SourceStreamEvent<T> {
    /// A tile produced by the source stream, to be inserted into the cache.
    Tile(RasterTile2D<T>),
    /// Discard the cache entry: sent when the stream yields an error, ends
    /// without yielding anything, or is dropped before finishing.
    Abort,
    /// The source stream completed successfully; finalize the cache entry.
    Finished,
}
210

211
/// Custom stream that lazily puts the produced tile in the cache and finishes the cache entry when the source stream completes
#[pin_project(PinnedDrop, project = CacheOutputStreamProjection)]
struct CacheOutputStream<S, T>
where
    S: Stream<Item = Result<RasterTile2D<T>>>,
{
    /// The source stream whose items are forwarded to both the caller and the cache.
    #[pin]
    source: S,
    /// Channel to the background task that performs the actual cache writes.
    stream_event_sender: UnboundedSender<SourceStreamEvent<T>>,
    /// Set once the source stream has completed (checked by the drop handler).
    finished: bool,
    /// True as long as the stream has not yielded any item yet.
    pristine: bool,
}
223

224
impl<S, T> Stream for CacheOutputStream<S, T>
225
where
226
    S: Stream<Item = Result<RasterTile2D<T>>>,
227
    T: Pixel,
228
{
229
    type Item = Result<RasterTile2D<T>>;
230

231
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
18✔
232
        let this = self.project();
18✔
233

234
        let next = ready!(this.source.poll_next(cx));
18✔
235

236
        if let Some(tile) = &next {
18✔
237
            *this.pristine = false;
16✔
238
            if let Ok(tile) = tile {
16✔
239
                // ignore the result. The receiver shold never drop prematurely, but if it does we don't want to crash
240
                let r = this
16✔
241
                    .stream_event_sender
16✔
242
                    .send(SourceStreamEvent::Tile(tile.clone()));
16✔
243
                if let Err(e) = r {
16✔
244
                    log::warn!("could not send tile to cache: {}", e.to_string());
×
245
                }
16✔
246
            } else {
247
                // ignore the result. The receiver shold never drop prematurely, but if it does we don't want to crash
248
                let r = this.stream_event_sender.send(SourceStreamEvent::Abort);
×
249
                if let Err(e) = r {
×
250
                    log::warn!("could not send abort to cache: {}", e.to_string());
×
251
                }
×
252
            }
253
        } else {
254
            if *this.pristine {
2✔
255
                // ignore the result. The receiver shold never drop prematurely, but if it does we don't want to crash
256
                let r = this.stream_event_sender.send(SourceStreamEvent::Abort);
×
257
                if let Err(e) = r {
×
258
                    log::warn!("could not send abort to cache: {}", e.to_string());
×
259
                }
×
260
            } else {
261
                // ignore the result. The receiver shold never drop prematurely, but if it does we don't want to crash
262
                let r = this.stream_event_sender.send(SourceStreamEvent::Finished);
2✔
263
                if let Err(e) = r {
2✔
264
                    log::warn!("could not send finished to cache: {}", e.to_string());
×
265
                }
2✔
266
                log::debug!("stream finished, mark cache entry as finished.");
2✔
267
            }
268
            *this.finished = true;
2✔
269
        }
270

271
        Poll::Ready(next)
18✔
272
    }
18✔
273
}
274

275
/// On drop, trigger the removal of the cache entry if it hasn't been finished yet
#[pinned_drop]
impl<S, T> PinnedDrop for CacheOutputStream<S, T>
where
    S: Stream<Item = Result<RasterTile2D<T>>>,
{
    fn drop(self: Pin<&mut Self>) {
        // If the stream is dropped before completion (e.g. the consumer
        // cancelled the query), tell the cache writer to discard the entry.
        if !self.finished {
            // ignore the result. The receiver should never drop prematurely, but if it does we don't want to crash
            let r = self.stream_event_sender.send(SourceStreamEvent::Abort);
            if let Err(e) = r {
                log::debug!("could not send abort to cache: {}", e.to_string());
            }
        }
    }
}
291

292
#[cfg(test)]
mod tests {
    use futures::StreamExt;
    use geoengine_datatypes::{
        primitives::{SpatialResolution, TimeInterval},
        raster::TilesEqualIgnoringCacheHint,
        util::test::TestDefault,
    };

    use crate::{
        engine::{
            ChunkByteSize, MockExecutionContext, MockQueryContext, QueryContextExtensions,
            RasterOperator, WorkflowOperatorPath,
        },
        source::{GdalSource, GdalSourceParameters},
        util::gdal::add_ndvi_dataset,
    };

    use super::*;

    /// Query rectangle covering the whole world; used by both the
    /// cache-filling and the cache-hitting query so they are identical.
    fn whole_world_query() -> QueryRectangle<SpatialPartition2D> {
        QueryRectangle {
            spatial_bounds: SpatialPartition2D::new_unchecked(
                [-180., -90.].into(),
                [180., 90.].into(),
            ),
            time_interval: TimeInterval::default(),
            spatial_resolution: SpatialResolution::zero_point_one(),
        }
    }

    #[tokio::test]
    async fn it_caches() {
        let mut exe_ctx = MockExecutionContext::test_default();

        let ndvi_id = add_ndvi_dataset(&mut exe_ctx);

        let operator = GdalSource {
            params: GdalSourceParameters { data: ndvi_id },
        }
        .boxed()
        .initialize(WorkflowOperatorPath::initialize_root(), &exe_ctx)
        .await
        .unwrap();

        let cached_op = InitializedCacheOperator::new(operator);

        let processor = cached_op.query_processor().unwrap().get_u8().unwrap();

        let tile_cache = Arc::new(SharedCache::test_default());

        let mut extensions = QueryContextExtensions::default();

        extensions.insert(tile_cache);

        let query_ctx =
            MockQueryContext::new_with_query_extensions(ChunkByteSize::test_default(), extensions);

        // First query: cache miss, fills the cache as a side effect.
        let stream = processor
            .query(whole_world_query(), &query_ctx)
            .await
            .unwrap();

        let tiles = stream.collect::<Vec<_>>().await;
        let tiles = tiles.into_iter().collect::<Result<Vec<_>>>().unwrap();

        // wait for the cache to be filled, which happens asynchronously
        tokio::time::sleep(std::time::Duration::from_millis(1000)).await;

        // Second, identical query: should now be answered from the cache.
        let stream_from_cache = processor
            .query(whole_world_query(), &query_ctx)
            .await
            .unwrap();

        let tiles_from_cache = stream_from_cache.collect::<Vec<_>>().await;
        let tiles_from_cache = tiles_from_cache
            .into_iter()
            .collect::<Result<Vec<_>>>()
            .unwrap();

        // TODO: how to ensure the tiles are actually from the cache?

        assert!(tiles.tiles_equal_ignoring_cache_hint(&tiles_from_cache));
    }
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc