• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

geo-engine / geoengine / 5666839735

26 Jul 2023 09:06AM UTC coverage: 88.916% (-0.3%) from 89.193%
5666839735

Pull #833

github

web-flow
Merge 865f64877 into 3d8a7e0ad
Pull Request #833: Shared-cache

1353 of 1353 new or added lines in 15 files covered. (100.0%)

105888 of 119088 relevant lines covered (88.92%)

60793.87 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

59.49
/operators/src/pro/cache/cache_operator.rs
1
use super::shared_cache::{CacheElement, CacheElementSubType};
2
use crate::adapters::FeatureCollectionChunkMerger;
3
use crate::engine::{
4
    CanonicOperatorName, ChunkByteSize, InitializedRasterOperator, InitializedVectorOperator,
5
    QueryContext, QueryProcessor, RasterResultDescriptor, TypedRasterQueryProcessor,
6
};
7
use crate::pro::cache::shared_cache::{AsyncCache, SharedCache};
8
use crate::util::Result;
9
use async_trait::async_trait;
10
use futures::stream::{BoxStream, FusedStream};
11
use futures::{ready, Stream};
12
use geoengine_datatypes::collections::FeatureCollection;
13
use geoengine_datatypes::primitives::{AxisAlignedRectangle, Geometry, QueryRectangle};
14
use geoengine_datatypes::raster::RasterTile2D;
15
use geoengine_datatypes::util::arrow::ArrowTyped;
16
use pin_project::{pin_project, pinned_drop};
17
use std::pin::Pin;
18
use std::sync::Arc;
19
use std::task::{Context, Poll};
20
use tokio::sync::mpsc::{unbounded_channel, UnboundedSender};
21

22
/// Wraps an initialized source operator so that queries against it go through the shared cache.
pub struct InitializedCacheOperator<S> {
    /// The wrapped, already initialized source operator
    source: S,
}

impl<S> InitializedCacheOperator<S> {
    /// Creates a cache operator around the initialized `source` operator.
    pub fn new(source: S) -> Self {
        InitializedCacheOperator { source }
    }
}
32

33
impl InitializedRasterOperator for InitializedCacheOperator<Box<dyn InitializedRasterOperator>> {
34
    fn result_descriptor(&self) -> &RasterResultDescriptor {
×
35
        self.source.result_descriptor()
×
36
    }
×
37

38
    fn query_processor(&self) -> Result<TypedRasterQueryProcessor> {
1✔
39
        let processor_result = self.source.query_processor();
1✔
40
        match processor_result {
1✔
41
            Ok(p) => {
1✔
42
                let res_processor = match p {
1✔
43
                    TypedRasterQueryProcessor::U8(p) => TypedRasterQueryProcessor::U8(Box::new(
1✔
44
                        CacheQueryProcessor::new(p, self.source.canonic_name()),
1✔
45
                    )),
1✔
46
                    TypedRasterQueryProcessor::U16(p) => TypedRasterQueryProcessor::U16(Box::new(
×
47
                        CacheQueryProcessor::new(p, self.source.canonic_name()),
×
48
                    )),
×
49
                    TypedRasterQueryProcessor::U32(p) => TypedRasterQueryProcessor::U32(Box::new(
×
50
                        CacheQueryProcessor::new(p, self.source.canonic_name()),
×
51
                    )),
×
52
                    TypedRasterQueryProcessor::U64(p) => TypedRasterQueryProcessor::U64(Box::new(
×
53
                        CacheQueryProcessor::new(p, self.source.canonic_name()),
×
54
                    )),
×
55
                    TypedRasterQueryProcessor::I8(p) => TypedRasterQueryProcessor::I8(Box::new(
×
56
                        CacheQueryProcessor::new(p, self.source.canonic_name()),
×
57
                    )),
×
58
                    TypedRasterQueryProcessor::I16(p) => TypedRasterQueryProcessor::I16(Box::new(
×
59
                        CacheQueryProcessor::new(p, self.source.canonic_name()),
×
60
                    )),
×
61
                    TypedRasterQueryProcessor::I32(p) => TypedRasterQueryProcessor::I32(Box::new(
×
62
                        CacheQueryProcessor::new(p, self.source.canonic_name()),
×
63
                    )),
×
64
                    TypedRasterQueryProcessor::I64(p) => TypedRasterQueryProcessor::I64(Box::new(
×
65
                        CacheQueryProcessor::new(p, self.source.canonic_name()),
×
66
                    )),
×
67
                    TypedRasterQueryProcessor::F32(p) => TypedRasterQueryProcessor::F32(Box::new(
×
68
                        CacheQueryProcessor::new(p, self.source.canonic_name()),
×
69
                    )),
×
70
                    TypedRasterQueryProcessor::F64(p) => TypedRasterQueryProcessor::F64(Box::new(
×
71
                        CacheQueryProcessor::new(p, self.source.canonic_name()),
×
72
                    )),
×
73
                };
74
                tracing::debug!(event = "query processor created");
1✔
75
                Ok(res_processor)
1✔
76
            }
77
            Err(err) => {
×
78
                tracing::debug!(event = "query processor failed");
×
79
                Err(err)
×
80
            }
81
        }
82
    }
1✔
83

84
    fn canonic_name(&self) -> CanonicOperatorName {
×
85
        self.source.canonic_name()
×
86
    }
×
87
}
88

89
impl InitializedVectorOperator for InitializedCacheOperator<Box<dyn InitializedVectorOperator>> {
90
    fn result_descriptor(&self) -> &crate::engine::VectorResultDescriptor {
×
91
        self.source.result_descriptor()
×
92
    }
×
93

94
    fn query_processor(&self) -> Result<crate::engine::TypedVectorQueryProcessor> {
×
95
        let processor_result = self.source.query_processor();
×
96
        match processor_result {
×
97
            Ok(p) => {
×
98
                let res_processor = match p {
×
99
                    crate::engine::TypedVectorQueryProcessor::Data(p) => {
×
100
                        crate::engine::TypedVectorQueryProcessor::Data(Box::new(
×
101
                            CacheQueryProcessor::new(p, self.source.canonic_name()),
×
102
                        ))
×
103
                    }
104
                    crate::engine::TypedVectorQueryProcessor::MultiPoint(p) => {
×
105
                        crate::engine::TypedVectorQueryProcessor::MultiPoint(Box::new(
×
106
                            CacheQueryProcessor::new(p, self.source.canonic_name()),
×
107
                        ))
×
108
                    }
109
                    crate::engine::TypedVectorQueryProcessor::MultiLineString(p) => {
×
110
                        crate::engine::TypedVectorQueryProcessor::MultiLineString(Box::new(
×
111
                            CacheQueryProcessor::new(p, self.source.canonic_name()),
×
112
                        ))
×
113
                    }
114
                    crate::engine::TypedVectorQueryProcessor::MultiPolygon(p) => {
×
115
                        crate::engine::TypedVectorQueryProcessor::MultiPolygon(Box::new(
×
116
                            CacheQueryProcessor::new(p, self.source.canonic_name()),
×
117
                        ))
×
118
                    }
119
                };
120
                tracing::debug!(event = "query processor created");
×
121

122
                Ok(res_processor)
×
123
            }
124
            Err(err) => {
×
125
                tracing::debug!(event = "query processor failed");
×
126
                Err(err)
×
127
            }
128
        }
129
    }
×
130

131
    fn canonic_name(&self) -> CanonicOperatorName {
×
132
        self.source.canonic_name()
×
133
    }
×
134
}
135

136
/// A cache operator that caches the results of its source operator
137
struct CacheQueryProcessor<P, E, Q>
138
where
139
    E: CacheElement + Send + Sync + 'static,
140
    P: QueryProcessor<Output = E, SpatialBounds = Q>,
141
{
142
    processor: P,
143
    cache_key: CanonicOperatorName,
144
}
145

146
impl<P, E, Q> CacheQueryProcessor<P, E, Q>
147
where
148
    E: CacheElement + Send + Sync + 'static,
149
    P: QueryProcessor<Output = E, SpatialBounds = Q> + Sized,
150
{
151
    pub fn new(processor: P, cache_key: CanonicOperatorName) -> Self {
1✔
152
        CacheQueryProcessor {
1✔
153
            processor,
1✔
154
            cache_key,
1✔
155
        }
1✔
156
    }
1✔
157
}
158

159
#[async_trait]
160
impl<P, S, E, Q> QueryProcessor for CacheQueryProcessor<P, E, Q>
161
where
162
    P: QueryProcessor<Output = E, SpatialBounds = Q> + Sized,
163
    E: CacheElement<CacheElementSubType = S, Query = QueryRectangle<Q>>
164
        + ResultStreamWrapper
165
        + Send
166
        + Sync
167
        + Clone
168
        + 'static,
169
    S: CacheElementSubType<CacheElementType = E> + Send + Sync + 'static,
170
    Q: AxisAlignedRectangle + Send + Sync + 'static,
171
    E::ResultStream: Stream<Item = Result<E>> + Send + Sync + 'static,
172
    SharedCache: AsyncCache<E>,
173
{
174
    type Output = E;
175
    type SpatialBounds = Q;
176

177
    async fn _query<'a>(
2✔
178
        &'a self,
2✔
179
        query: QueryRectangle<Self::SpatialBounds>,
2✔
180
        ctx: &'a dyn QueryContext,
2✔
181
    ) -> Result<BoxStream<'a, Result<Self::Output>>> {
2✔
182
        let shared_cache = ctx
2✔
183
            .extensions()
2✔
184
            .get::<Arc<SharedCache>>()
2✔
185
            .expect("`SharedCache` extension should be set during `ProContext` creation");
2✔
186

187
        let cache_result = shared_cache.query_cache::<S>(&self.cache_key, &query).await;
2✔
188

189
        if let Ok(Some(cache_result)) = cache_result {
2✔
190
            // cache hit
191
            log::debug!("cache hit for operator {}", self.cache_key);
×
192

193
            let wrapped_result_steam = E::wrap_result_stream(cache_result, ctx.chunk_byte_size());
×
194

×
195
            return Ok(wrapped_result_steam);
×
196
        }
2✔
197

2✔
198
        // cache miss
2✔
199
        log::debug!("cache miss for operator {}", self.cache_key);
2✔
200
        let source_stream = self.processor.query(query, ctx).await?;
2✔
201

202
        let query_id = shared_cache
2✔
203
            .insert_query::<S>(&self.cache_key, &query)
2✔
204
            .await;
×
205

206
        if let Err(e) = query_id {
2✔
207
            log::debug!("could not insert query into cache: {}", e);
×
208
            return Ok(source_stream);
×
209
        }
2✔
210

2✔
211
        let query_id = query_id.expect("query_id should be set because of the previous check");
2✔
212

2✔
213
        // lazily insert tiles into the cache as they are produced
2✔
214
        let (stream_event_sender, mut stream_event_receiver) = unbounded_channel();
2✔
215

2✔
216
        let cache_key = self.cache_key.clone();
2✔
217
        let tile_cache = shared_cache.clone();
2✔
218
        crate::util::spawn(async move {
2✔
219
            while let Some(event) = stream_event_receiver.recv().await {
10✔
220
                match event {
9✔
221
                    SourceStreamEvent::Element(tile) => {
8✔
222
                        let result = tile_cache
8✔
223
                            .insert_query_element::<S>(&cache_key, &query_id, tile)
8✔
224
                            .await;
×
225
                        log::trace!(
8✔
226
                            "inserted tile into cache for cache key {} and query id {}. result: {:?}",
×
227
                            cache_key,
228
                            query_id,
229
                            result
230
                        );
231
                    }
232
                    SourceStreamEvent::Abort => {
233
                        tile_cache.abort_query::<S>(&cache_key, &query_id).await;
×
234
                        log::debug!(
×
235
                            "aborted cache insertion for cache key {} and query id {}",
×
236
                            cache_key,
237
                            query_id
238
                        );
239
                    }
240
                    SourceStreamEvent::Finished => {
241
                        let result = tile_cache.finish_query::<S>(&cache_key, &query_id).await;
1✔
242
                        log::debug!(
1✔
243
                            "finished cache insertion for cache key {} and query id {}, result: {:?}",
×
244
                            cache_key,query_id,
245
                            result
246
                        );
247
                    }
248
                }
249
            }
250
        });
2✔
251

2✔
252
        let output_stream = CacheOutputStream {
2✔
253
            source: source_stream,
2✔
254
            stream_event_sender,
2✔
255
            finished: false,
2✔
256
            pristine: true,
2✔
257
        };
2✔
258

2✔
259
        Ok(Box::pin(output_stream))
2✔
260
    }
4✔
261
}
262

263
#[allow(clippy::large_enum_variant)] // TODO: Box instead?
264
enum SourceStreamEvent<E: CacheElement> {
265
    Element(E),
266
    Abort,
267
    Finished,
268
}
269

270
/// Custom stream that lazily puts the produced tile in the cache and finishes the cache entry when the source stream completes
271
#[pin_project(PinnedDrop, project = CacheOutputStreamProjection)]
18✔
272
struct CacheOutputStream<S, E>
273
where
274
    S: Stream<Item = Result<E>>,
275
    E: CacheElement,
276
{
277
    #[pin]
278
    source: S,
279
    stream_event_sender: UnboundedSender<SourceStreamEvent<E>>,
280
    finished: bool,
281
    pristine: bool,
282
}
283

284
impl<S, E> Stream for CacheOutputStream<S, E>
285
where
286
    S: Stream<Item = Result<E>>,
287
    E: CacheElement + Clone,
288
{
289
    type Item = Result<E>;
290

291
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
18✔
292
        let this = self.project();
18✔
293

294
        let next = ready!(this.source.poll_next(cx));
18✔
295

296
        if let Some(element) = &next {
18✔
297
            *this.pristine = false;
16✔
298
            if let Ok(element) = element {
16✔
299
                // ignore the result. The receiver shold never drop prematurely, but if it does we don't want to crash
300
                let r = this
16✔
301
                    .stream_event_sender
16✔
302
                    .send(SourceStreamEvent::Element(element.clone()));
16✔
303
                if let Err(e) = r {
16✔
304
                    log::warn!("could not send tile to cache: {}", e.to_string());
×
305
                }
16✔
306
            } else {
307
                // ignore the result. The receiver shold never drop prematurely, but if it does we don't want to crash
308
                let r = this.stream_event_sender.send(SourceStreamEvent::Abort);
×
309
                if let Err(e) = r {
×
310
                    log::warn!("could not send abort to cache: {}", e.to_string());
×
311
                }
×
312
            }
313
        } else {
314
            if *this.pristine {
2✔
315
                // ignore the result. The receiver shold never drop prematurely, but if it does we don't want to crash
316
                let r = this.stream_event_sender.send(SourceStreamEvent::Abort);
×
317
                if let Err(e) = r {
×
318
                    log::warn!("could not send abort to cache: {}", e.to_string());
×
319
                }
×
320
            } else {
321
                // ignore the result. The receiver shold never drop prematurely, but if it does we don't want to crash
322
                let r = this.stream_event_sender.send(SourceStreamEvent::Finished);
2✔
323
                if let Err(e) = r {
2✔
324
                    log::warn!("could not send finished to cache: {}", e.to_string());
×
325
                }
2✔
326
                log::debug!("stream finished, mark cache entry as finished.");
2✔
327
            }
328
            *this.finished = true;
2✔
329
        }
330

331
        Poll::Ready(next)
18✔
332
    }
18✔
333
}
334

335
/// On drop, trigger the removal of the cache entry if it hasn't been finished yet
336
#[pinned_drop]
×
337
impl<S, E> PinnedDrop for CacheOutputStream<S, E>
338
where
339
    S: Stream<Item = Result<E>>,
340
    E: CacheElement,
2✔
341
{
2✔
342
    fn drop(self: Pin<&mut Self>) {
2✔
343
        if !self.finished {
2✔
344
            // ignore the result. The receiver shold never drop prematurely, but if it does we don't want to crash
2✔
345
            let r = self.stream_event_sender.send(SourceStreamEvent::Abort);
2✔
346
            if let Err(e) = r {
2✔
347
                log::debug!("could not send abort to cache: {}", e.to_string());
2✔
348
            }
2✔
349
        }
2✔
350
    }
2✔
351
}
352

353
trait ResultStreamWrapper: CacheElement {
354
    fn wrap_result_stream<'a>(
355
        stream: Self::ResultStream,
356
        chunk_byte_size: ChunkByteSize,
357
    ) -> BoxStream<'a, Result<Self>>;
358
}
359

360
impl<G> ResultStreamWrapper for FeatureCollection<G>
361
where
362
    G: Geometry + ArrowTyped + Send + Sync + 'static,
363
    FeatureCollection<G>: CacheElement,
364
    <FeatureCollection<G> as CacheElement>::ResultStream: FusedStream + Send,
365
    <FeatureCollection<G> as CacheElement>::ResultStream: Stream<Item = Result<Self>>,
366
{
367
    fn wrap_result_stream<'a>(
×
368
        stream: Self::ResultStream,
×
369
        chunk_byte_size: ChunkByteSize,
×
370
    ) -> BoxStream<'a, Result<Self>> {
×
371
        Box::pin(FeatureCollectionChunkMerger::new(
×
372
            stream,
×
373
            chunk_byte_size.into(),
×
374
        ))
×
375
    }
×
376
}
377

378
impl<P> ResultStreamWrapper for RasterTile2D<P>
379
where
380
    P: 'static,
381
    RasterTile2D<P>: CacheElement,
382
    <RasterTile2D<P> as CacheElement>::ResultStream: Stream<Item = Result<Self>> + Send,
383
{
384
    fn wrap_result_stream<'a>(
×
385
        stream: Self::ResultStream,
×
386
        _chunk_byte_size: ChunkByteSize,
×
387
    ) -> BoxStream<'a, Result<Self>> {
×
388
        Box::pin(stream)
×
389
    }
×
390
}
391

392
#[cfg(test)]
mod tests {
    use futures::StreamExt;
    use geoengine_datatypes::{
        primitives::{SpatialPartition2D, SpatialResolution, TimeInterval},
        raster::TilesEqualIgnoringCacheHint,
        util::test::TestDefault,
    };

    use crate::{
        engine::{
            ChunkByteSize, MockExecutionContext, MockQueryContext, QueryContextExtensions,
            RasterOperator, WorkflowOperatorPath,
        },
        source::{GdalSource, GdalSourceParameters},
        util::gdal::add_ndvi_dataset,
    };

    use super::*;

    #[tokio::test]
    async fn it_caches() {
        let mut exe_ctx = MockExecutionContext::test_default();
        let ndvi_id = add_ndvi_dataset(&mut exe_ctx);

        let source = GdalSource {
            params: GdalSourceParameters { data: ndvi_id },
        }
        .boxed()
        .initialize(WorkflowOperatorPath::initialize_root(), &exe_ctx)
        .await
        .unwrap();

        let cached_op = InitializedCacheOperator::new(source);
        let processor = cached_op.query_processor().unwrap().get_u8().unwrap();

        let mut extensions = QueryContextExtensions::default();
        extensions.insert(Arc::new(SharedCache::test_default()));
        let query_ctx =
            MockQueryContext::new_with_query_extensions(ChunkByteSize::test_default(), extensions);

        // the same query is issued twice: first to fill the cache, then to read from it
        let query = QueryRectangle {
            spatial_bounds: SpatialPartition2D::new_unchecked(
                [-180., -90.].into(),
                [180., 90.].into(),
            ),
            time_interval: TimeInterval::default(),
            spatial_resolution: SpatialResolution::zero_point_one(),
        };

        let stream = processor.query(query, &query_ctx).await.unwrap();
        let tiles = stream
            .collect::<Vec<_>>()
            .await
            .into_iter()
            .collect::<Result<Vec<_>>>()
            .unwrap();

        // wait for the cache to be filled, which happens asynchronously
        tokio::time::sleep(std::time::Duration::from_millis(1000)).await;

        let stream_from_cache = processor.query(query, &query_ctx).await.unwrap();
        let tiles_from_cache = stream_from_cache
            .collect::<Vec<_>>()
            .await
            .into_iter()
            .collect::<Result<Vec<_>>>()
            .unwrap();

        // TODO: how to ensure the tiles are actually from the cache?
        assert!(tiles.tiles_equal_ignoring_cache_hint(&tiles_from_cache));
    }
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc