• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

geo-engine / geoengine / 5856046428

14 Aug 2023 01:37PM UTC coverage: 89.484% (-0.1%) from 89.596%
5856046428

push

github

web-flow
Merge pull request #848 from geo-engine/compressed-raster-cache

compress raster tile cache

475 of 475 new or added lines in 4 files covered. (100.0%)

104049 of 116277 relevant lines covered (89.48%)

62266.95 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

59.93
/operators/src/pro/cache/cache_operator.rs
1
use super::error::CacheError;
2
use super::shared_cache::CacheElement;
3
use crate::adapters::FeatureCollectionChunkMerger;
4
use crate::engine::{
5
    CanonicOperatorName, ChunkByteSize, InitializedRasterOperator, InitializedVectorOperator,
6
    QueryContext, QueryProcessor, RasterResultDescriptor, TypedRasterQueryProcessor,
7
};
8
use crate::error::Error;
9
use crate::pro::cache::shared_cache::{AsyncCache, SharedCache};
10
use crate::util::Result;
11
use async_trait::async_trait;
12
use futures::stream::{BoxStream, FusedStream};
13
use futures::{ready, Stream, TryStreamExt};
14
use geoengine_datatypes::collections::FeatureCollection;
15
use geoengine_datatypes::primitives::{AxisAlignedRectangle, Geometry, QueryRectangle};
16
use geoengine_datatypes::raster::{Pixel, RasterTile2D};
17
use geoengine_datatypes::util::arrow::ArrowTyped;
18
use pin_project::{pin_project, pinned_drop};
19
use std::pin::Pin;
20
use std::sync::Arc;
21
use std::task::{Context, Poll};
22
use tokio::sync::mpsc::{unbounded_channel, UnboundedSender};
23

24
/// A cache operator that caches the results of its source operator
pub struct InitializedCacheOperator<S> {
    // The wrapped, already-initialized operator whose query results are cached.
    source: S,
}
28

29
impl<S> InitializedCacheOperator<S> {
30
    pub fn new(source: S) -> Self {
1✔
31
        Self { source }
1✔
32
    }
1✔
33
}
34

35
impl InitializedRasterOperator for InitializedCacheOperator<Box<dyn InitializedRasterOperator>> {
36
    fn result_descriptor(&self) -> &RasterResultDescriptor {
×
37
        self.source.result_descriptor()
×
38
    }
×
39

40
    fn query_processor(&self) -> Result<TypedRasterQueryProcessor> {
1✔
41
        let processor_result = self.source.query_processor();
1✔
42
        match processor_result {
1✔
43
            Ok(p) => {
1✔
44
                let res_processor = match p {
1✔
45
                    TypedRasterQueryProcessor::U8(p) => TypedRasterQueryProcessor::U8(Box::new(
1✔
46
                        CacheQueryProcessor::new(p, self.source.canonic_name()),
1✔
47
                    )),
1✔
48
                    TypedRasterQueryProcessor::U16(p) => TypedRasterQueryProcessor::U16(Box::new(
×
49
                        CacheQueryProcessor::new(p, self.source.canonic_name()),
×
50
                    )),
×
51
                    TypedRasterQueryProcessor::U32(p) => TypedRasterQueryProcessor::U32(Box::new(
×
52
                        CacheQueryProcessor::new(p, self.source.canonic_name()),
×
53
                    )),
×
54
                    TypedRasterQueryProcessor::U64(p) => TypedRasterQueryProcessor::U64(Box::new(
×
55
                        CacheQueryProcessor::new(p, self.source.canonic_name()),
×
56
                    )),
×
57
                    TypedRasterQueryProcessor::I8(p) => TypedRasterQueryProcessor::I8(Box::new(
×
58
                        CacheQueryProcessor::new(p, self.source.canonic_name()),
×
59
                    )),
×
60
                    TypedRasterQueryProcessor::I16(p) => TypedRasterQueryProcessor::I16(Box::new(
×
61
                        CacheQueryProcessor::new(p, self.source.canonic_name()),
×
62
                    )),
×
63
                    TypedRasterQueryProcessor::I32(p) => TypedRasterQueryProcessor::I32(Box::new(
×
64
                        CacheQueryProcessor::new(p, self.source.canonic_name()),
×
65
                    )),
×
66
                    TypedRasterQueryProcessor::I64(p) => TypedRasterQueryProcessor::I64(Box::new(
×
67
                        CacheQueryProcessor::new(p, self.source.canonic_name()),
×
68
                    )),
×
69
                    TypedRasterQueryProcessor::F32(p) => TypedRasterQueryProcessor::F32(Box::new(
×
70
                        CacheQueryProcessor::new(p, self.source.canonic_name()),
×
71
                    )),
×
72
                    TypedRasterQueryProcessor::F64(p) => TypedRasterQueryProcessor::F64(Box::new(
×
73
                        CacheQueryProcessor::new(p, self.source.canonic_name()),
×
74
                    )),
×
75
                };
76
                tracing::debug!(event = "query processor created");
1✔
77
                Ok(res_processor)
1✔
78
            }
79
            Err(err) => {
×
80
                tracing::debug!(event = "query processor failed");
×
81
                Err(err)
×
82
            }
83
        }
84
    }
1✔
85

86
    fn canonic_name(&self) -> CanonicOperatorName {
×
87
        self.source.canonic_name()
×
88
    }
×
89
}
90

91
impl InitializedVectorOperator for InitializedCacheOperator<Box<dyn InitializedVectorOperator>> {
92
    fn result_descriptor(&self) -> &crate::engine::VectorResultDescriptor {
×
93
        self.source.result_descriptor()
×
94
    }
×
95

96
    fn query_processor(&self) -> Result<crate::engine::TypedVectorQueryProcessor> {
×
97
        let processor_result = self.source.query_processor();
×
98
        match processor_result {
×
99
            Ok(p) => {
×
100
                let res_processor = match p {
×
101
                    crate::engine::TypedVectorQueryProcessor::Data(p) => {
×
102
                        crate::engine::TypedVectorQueryProcessor::Data(Box::new(
×
103
                            CacheQueryProcessor::new(p, self.source.canonic_name()),
×
104
                        ))
×
105
                    }
106
                    crate::engine::TypedVectorQueryProcessor::MultiPoint(p) => {
×
107
                        crate::engine::TypedVectorQueryProcessor::MultiPoint(Box::new(
×
108
                            CacheQueryProcessor::new(p, self.source.canonic_name()),
×
109
                        ))
×
110
                    }
111
                    crate::engine::TypedVectorQueryProcessor::MultiLineString(p) => {
×
112
                        crate::engine::TypedVectorQueryProcessor::MultiLineString(Box::new(
×
113
                            CacheQueryProcessor::new(p, self.source.canonic_name()),
×
114
                        ))
×
115
                    }
116
                    crate::engine::TypedVectorQueryProcessor::MultiPolygon(p) => {
×
117
                        crate::engine::TypedVectorQueryProcessor::MultiPolygon(Box::new(
×
118
                            CacheQueryProcessor::new(p, self.source.canonic_name()),
×
119
                        ))
×
120
                    }
121
                };
122
                tracing::debug!(event = "query processor created");
×
123

124
                Ok(res_processor)
×
125
            }
126
            Err(err) => {
×
127
                tracing::debug!(event = "query processor failed");
×
128
                Err(err)
×
129
            }
130
        }
131
    }
×
132

133
    fn canonic_name(&self) -> CanonicOperatorName {
×
134
        self.source.canonic_name()
×
135
    }
×
136
}
137

138
/// A cache operator that caches the results of its source operator
struct CacheQueryProcessor<P, E, Q>
where
    E: CacheElement + Send + Sync + 'static,
    P: QueryProcessor<Output = E, SpatialBounds = Q>,
{
    // The wrapped query processor that produces the elements to be cached.
    processor: P,
    // Cache entries are keyed by the canonical name of the source operator.
    cache_key: CanonicOperatorName,
}
147

148
impl<P, E, Q> CacheQueryProcessor<P, E, Q>
149
where
150
    E: CacheElement + Send + Sync + 'static,
151
    P: QueryProcessor<Output = E, SpatialBounds = Q> + Sized,
152
{
153
    pub fn new(processor: P, cache_key: CanonicOperatorName) -> Self {
1✔
154
        CacheQueryProcessor {
1✔
155
            processor,
1✔
156
            cache_key,
1✔
157
        }
1✔
158
    }
1✔
159
}
160

161
#[async_trait]
impl<P, E, S> QueryProcessor for CacheQueryProcessor<P, E, S>
where
    P: QueryProcessor<Output = E, SpatialBounds = S> + Sized,
    S: AxisAlignedRectangle + Send + Sync + 'static,
    E: CacheElement<Query = QueryRectangle<S>>
        + Send
        + Sync
        + 'static
        + ResultStreamWrapper
        + Clone,
    E::ResultStream: Stream<Item = Result<E, CacheError>> + Send + Sync + 'static,
    SharedCache: AsyncCache<E>,
{
    type Output = E;
    type SpatialBounds = S;

    /// Answers `query` from the shared cache if possible; otherwise forwards
    /// it to the wrapped processor and transparently records the produced
    /// elements into the cache for future queries.
    async fn _query<'a>(
        &'a self,
        query: QueryRectangle<Self::SpatialBounds>,
        ctx: &'a dyn QueryContext,
    ) -> Result<BoxStream<'a, Result<Self::Output>>> {
        // The cache instance is injected via the query context's extensions.
        let shared_cache = ctx
            .extensions()
            .get::<Arc<SharedCache>>()
            .expect("`SharedCache` extension should be set during `ProContext` creation");

        let cache_result = shared_cache.query_cache(&self.cache_key, &query).await;

        if let Ok(Some(cache_result)) = cache_result {
            // cache hit: serve the stored elements; a cache lookup error is
            // deliberately treated like a miss and falls through below
            log::debug!("cache hit for operator {}", self.cache_key);

            let wrapped_result_steam = E::wrap_result_stream(cache_result, ctx.chunk_byte_size());

            return Ok(wrapped_result_steam);
        }

        // cache miss
        log::debug!("cache miss for operator {}", self.cache_key);
        let source_stream = self.processor.query(query, ctx).await?;

        let query_id = shared_cache.insert_query(&self.cache_key, &query).await;

        if let Err(e) = query_id {
            // caching is best-effort: if the entry cannot be created, return
            // the uncached source stream instead of failing the query
            log::debug!("could not insert query into cache: {}", e);
            return Ok(source_stream);
        }

        let query_id = query_id.expect("query_id should be set because of the previous check");

        // lazily insert tiles into the cache as they are produced
        let (stream_event_sender, mut stream_event_receiver) = unbounded_channel();

        let cache_key = self.cache_key.clone();
        let tile_cache = shared_cache.clone();
        // Background task that performs the actual cache writes; it ends when
        // the sender side (held by the output stream) is dropped.
        crate::util::spawn(async move {
            while let Some(event) = stream_event_receiver.recv().await {
                match event {
                    SourceStreamEvent::Element(tile) => {
                        let result = tile_cache
                            .insert_query_element(&cache_key, &query_id, tile)
                            .await;
                        log::trace!(
                            "inserted tile into cache for cache key {} and query id {}. result: {:?}",
                            cache_key,
                            query_id,
                            result
                        );
                    }
                    SourceStreamEvent::Abort => {
                        // discard the partially filled cache entry
                        tile_cache.abort_query(&cache_key, &query_id).await;
                        log::debug!(
                            "aborted cache insertion for cache key {} and query id {}",
                            cache_key,
                            query_id
                        );
                    }
                    SourceStreamEvent::Finished => {
                        // seal the entry so it can serve future queries
                        let result = tile_cache.finish_query(&cache_key, &query_id).await;
                        log::debug!(
                            "finished cache insertion for cache key {} and query id {}, result: {:?}",
                            cache_key,query_id,
                            result
                        );
                    }
                }
            }
        });

        // Wrap the source stream so that every element is mirrored into the
        // cache while being passed through to the caller.
        let output_stream = CacheOutputStream {
            source: source_stream,
            stream_event_sender,
            finished: false,
            pristine: true,
        };

        Ok(Box::pin(output_stream))
    }
}
261

262
// Events sent from the result stream to the background task that writes
// elements into the cache.
#[allow(clippy::large_enum_variant)] // TODO: Box instead?
enum SourceStreamEvent<E: CacheElement> {
    // A successfully produced element to append to the open cache entry.
    Element(E),
    // The stream failed or ended empty/early; discard the partial cache entry.
    Abort,
    // The stream completed normally; seal the cache entry.
    Finished,
}
268

269
/// Custom stream that lazily puts the produced tile in the cache and finishes the cache entry when the source stream completes
#[pin_project(PinnedDrop, project = CacheOutputStreamProjection)]
struct CacheOutputStream<S, E>
where
    S: Stream<Item = Result<E>>,
    E: CacheElement + Clone,
{
    #[pin]
    source: S,
    // Channel to the background task that performs the actual cache writes.
    stream_event_sender: UnboundedSender<SourceStreamEvent<E>>,
    // Set once the source stream has ended; checked on drop to decide whether
    // the cache entry still needs to be aborted.
    finished: bool,
    // True until the first element is produced; a stream that ends while still
    // pristine aborts its cache entry instead of finishing it.
    pristine: bool,
}
282

283
impl<S, E> Stream for CacheOutputStream<S, E>
284
where
285
    S: Stream<Item = Result<E>>,
286
    E: CacheElement + Clone,
287
{
288
    type Item = Result<E>;
289

290
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
18✔
291
        let this = self.project();
18✔
292

293
        let next = ready!(this.source.poll_next(cx));
18✔
294

295
        if let Some(element) = &next {
18✔
296
            *this.pristine = false;
16✔
297
            if let Ok(element) = element {
16✔
298
                // ignore the result. The receiver shold never drop prematurely, but if it does we don't want to crash
299
                let r = this
16✔
300
                    .stream_event_sender
16✔
301
                    .send(SourceStreamEvent::Element(element.clone()));
16✔
302
                if let Err(e) = r {
16✔
303
                    log::warn!("could not send tile to cache: {}", e.to_string());
×
304
                }
16✔
305
            } else {
306
                // ignore the result. The receiver shold never drop prematurely, but if it does we don't want to crash
307
                let r = this.stream_event_sender.send(SourceStreamEvent::Abort);
×
308
                if let Err(e) = r {
×
309
                    log::warn!("could not send abort to cache: {}", e.to_string());
×
310
                }
×
311
            }
312
        } else {
313
            if *this.pristine {
2✔
314
                // ignore the result. The receiver shold never drop prematurely, but if it does we don't want to crash
315
                let r = this.stream_event_sender.send(SourceStreamEvent::Abort);
×
316
                if let Err(e) = r {
×
317
                    log::warn!("could not send abort to cache: {}", e.to_string());
×
318
                }
×
319
            } else {
320
                // ignore the result. The receiver shold never drop prematurely, but if it does we don't want to crash
321
                let r = this.stream_event_sender.send(SourceStreamEvent::Finished);
2✔
322
                if let Err(e) = r {
2✔
323
                    log::warn!("could not send finished to cache: {}", e.to_string());
×
324
                }
2✔
325
                log::debug!("stream finished, mark cache entry as finished.");
2✔
326
            }
327
            *this.finished = true;
2✔
328
        }
329

330
        Poll::Ready(next)
18✔
331
    }
18✔
332
}
333

334
/// On drop, trigger the removal of the cache entry if it hasn't been finished yet
#[pinned_drop]
impl<S, E> PinnedDrop for CacheOutputStream<S, E>
where
    S: Stream<Item = Result<E>>,
    E: CacheElement + Clone,
{
    fn drop(self: Pin<&mut Self>) {
        // If the consumer dropped the stream before it completed, the cache
        // entry is only partially filled and must be discarded.
        if !self.finished {
            // ignore the result. The receiver should never drop prematurely, but if it does we don't want to crash
            let r = self.stream_event_sender.send(SourceStreamEvent::Abort);
            if let Err(e) = r {
                log::debug!("could not send abort to cache: {}", e.to_string());
            }
        }
    }
}
351

352
// Adapts a cache `ResultStream` into the `BoxStream` type that operator
// queries return, converting `CacheError`s into operator `Error`s along
// the way.
trait ResultStreamWrapper: CacheElement {
    fn wrap_result_stream<'a>(
        stream: Self::ResultStream,
        chunk_byte_size: ChunkByteSize,
    ) -> BoxStream<'a, Result<Self>>;
}
358

359
impl<G> ResultStreamWrapper for FeatureCollection<G>
360
where
361
    G: Geometry + ArrowTyped + Send + Sync + 'static,
362
    FeatureCollection<G>: CacheElement + Send + Sync,
363
    Self::ResultStream: FusedStream + Send + Sync,
364
{
365
    fn wrap_result_stream<'a>(
×
366
        stream: Self::ResultStream,
×
367
        chunk_byte_size: ChunkByteSize,
×
368
    ) -> BoxStream<'a, Result<Self>> {
×
369
        Box::pin(FeatureCollectionChunkMerger::new(
×
370
            stream.map_err(|ce| Error::CacheCantProduceResult { source: ce.into() }),
×
371
            chunk_byte_size.into(),
×
372
        ))
×
373
    }
×
374
}
375

376
impl<P> ResultStreamWrapper for RasterTile2D<P>
377
where
378
    P: 'static + Pixel,
379
    RasterTile2D<P>: CacheElement,
380
    Self::ResultStream: Send + Sync,
381
{
382
    fn wrap_result_stream<'a>(
×
383
        stream: Self::ResultStream,
×
384
        _chunk_byte_size: ChunkByteSize,
×
385
    ) -> BoxStream<'a, Result<Self>> {
×
386
        Box::pin(stream.map_err(|ce| Error::CacheCantProduceResult { source: ce.into() }))
×
387
    }
×
388
}
389

390
#[cfg(test)]
mod tests {
    use futures::StreamExt;
    use geoengine_datatypes::{
        primitives::{SpatialPartition2D, SpatialResolution, TimeInterval},
        raster::TilesEqualIgnoringCacheHint,
        util::test::TestDefault,
    };

    use crate::{
        engine::{
            ChunkByteSize, MockExecutionContext, MockQueryContext, QueryContextExtensions,
            RasterOperator, WorkflowOperatorPath,
        },
        source::{GdalSource, GdalSourceParameters},
        util::gdal::add_ndvi_dataset,
    };

    use super::*;

    /// End-to-end check: the first query is a cache miss served by the source
    /// operator (filling the cache as a side effect); a second, identical
    /// query must yield equivalent tiles.
    #[tokio::test]
    async fn it_caches() {
        let mut exe_ctx = MockExecutionContext::test_default();

        let ndvi_id = add_ndvi_dataset(&mut exe_ctx);

        let operator = GdalSource {
            params: GdalSourceParameters { data: ndvi_id },
        }
        .boxed()
        .initialize(WorkflowOperatorPath::initialize_root(), &exe_ctx)
        .await
        .unwrap();

        let cached_op = InitializedCacheOperator::new(operator);

        let processor = cached_op.query_processor().unwrap().get_u8().unwrap();

        let tile_cache = Arc::new(SharedCache::test_default());

        // the processor retrieves the cache from the query context extensions
        let mut extensions = QueryContextExtensions::default();

        extensions.insert(tile_cache);

        let query_ctx =
            MockQueryContext::new_with_query_extensions(ChunkByteSize::test_default(), extensions);

        // first query: cache miss, tiles are produced by the source
        let stream = processor
            .query(
                QueryRectangle {
                    spatial_bounds: SpatialPartition2D::new_unchecked(
                        [-180., -90.].into(),
                        [180., 90.].into(),
                    ),
                    time_interval: TimeInterval::default(),
                    spatial_resolution: SpatialResolution::zero_point_one(),
                },
                &query_ctx,
            )
            .await
            .unwrap();

        let tiles = stream.collect::<Vec<_>>().await;
        let tiles = tiles.into_iter().collect::<Result<Vec<_>>>().unwrap();

        // wait for the cache to be filled, which happens asynchronously
        tokio::time::sleep(std::time::Duration::from_millis(1000)).await;

        // second, identical query: expected to be answered from the cache
        let stream_from_cache = processor
            .query(
                QueryRectangle {
                    spatial_bounds: SpatialPartition2D::new_unchecked(
                        [-180., -90.].into(),
                        [180., 90.].into(),
                    ),
                    time_interval: TimeInterval::default(),
                    spatial_resolution: SpatialResolution::zero_point_one(),
                },
                &query_ctx,
            )
            .await
            .unwrap();

        let tiles_from_cache = stream_from_cache.collect::<Vec<_>>().await;
        let tiles_from_cache = tiles_from_cache
            .into_iter()
            .collect::<Result<Vec<_>>>()
            .unwrap();

        // TODO: how to ensure the tiles are actually from the cache?

        assert!(tiles.tiles_equal_ignoring_cache_hint(&tiles_from_cache));
    }
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc