• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

geo-engine / geoengine / 6022770913

30 Aug 2023 08:59AM UTC coverage: 89.934% (+0.09%) from 89.84%
6022770913

push

github

web-flow
Merge pull request #864 from geo-engine/compressed-vector-cache

Compressed-vector-cache

569 of 569 new or added lines in 6 files covered. (100.0%)

106288 of 118185 relevant lines covered (89.93%)

61263.93 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

57.39
/operators/src/pro/cache/cache_operator.rs
1
use super::cache_chunks::CacheElementSpatialBounds;
2
use super::error::CacheError;
3
use super::shared_cache::CacheElement;
4
use crate::adapters::FeatureCollectionChunkMerger;
5
use crate::engine::{
6
    CanonicOperatorName, ChunkByteSize, InitializedRasterOperator, InitializedVectorOperator,
7
    QueryContext, QueryProcessor, RasterResultDescriptor, TypedRasterQueryProcessor,
8
};
9
use crate::error::Error;
10
use crate::pro::cache::shared_cache::{AsyncCache, SharedCache};
11
use crate::util::Result;
12
use async_trait::async_trait;
13
use futures::stream::{BoxStream, FusedStream};
14
use futures::{ready, Stream, StreamExt, TryStreamExt};
15
use geoengine_datatypes::collections::{FeatureCollection, FeatureCollectionInfos};
16
use geoengine_datatypes::primitives::{
17
    AxisAlignedRectangle, Geometry, QueryRectangle, VectorQueryRectangle,
18
};
19
use geoengine_datatypes::raster::{Pixel, RasterTile2D};
20
use geoengine_datatypes::util::arrow::ArrowTyped;
21
use pin_project::{pin_project, pinned_drop};
22
use std::pin::Pin;
23
use std::sync::Arc;
24
use std::task::{Context, Poll};
25
use tokio::sync::mpsc::{unbounded_channel, UnboundedSender};
26

27
/// Operator wrapper that serves the results of its `source` operator through the cache.
///
/// The type is generic over the wrapped operator; this file implements the
/// raster and the vector operator traits for the respective boxed sources.
pub struct InitializedCacheOperator<S> {
    /// The wrapped operator whose query results are cached.
    source: S,
}
31

32
impl<S> InitializedCacheOperator<S> {
33
    pub fn new(source: S) -> Self {
1✔
34
        Self { source }
1✔
35
    }
1✔
36
}
37

38
impl InitializedRasterOperator for InitializedCacheOperator<Box<dyn InitializedRasterOperator>> {
39
    fn result_descriptor(&self) -> &RasterResultDescriptor {
×
40
        self.source.result_descriptor()
×
41
    }
×
42

43
    fn query_processor(&self) -> Result<TypedRasterQueryProcessor> {
1✔
44
        let processor_result = self.source.query_processor();
1✔
45
        match processor_result {
1✔
46
            Ok(p) => {
1✔
47
                let res_processor = match p {
1✔
48
                    TypedRasterQueryProcessor::U8(p) => TypedRasterQueryProcessor::U8(Box::new(
1✔
49
                        CacheQueryProcessor::new(p, self.source.canonic_name()),
1✔
50
                    )),
1✔
51
                    TypedRasterQueryProcessor::U16(p) => TypedRasterQueryProcessor::U16(Box::new(
×
52
                        CacheQueryProcessor::new(p, self.source.canonic_name()),
×
53
                    )),
×
54
                    TypedRasterQueryProcessor::U32(p) => TypedRasterQueryProcessor::U32(Box::new(
×
55
                        CacheQueryProcessor::new(p, self.source.canonic_name()),
×
56
                    )),
×
57
                    TypedRasterQueryProcessor::U64(p) => TypedRasterQueryProcessor::U64(Box::new(
×
58
                        CacheQueryProcessor::new(p, self.source.canonic_name()),
×
59
                    )),
×
60
                    TypedRasterQueryProcessor::I8(p) => TypedRasterQueryProcessor::I8(Box::new(
×
61
                        CacheQueryProcessor::new(p, self.source.canonic_name()),
×
62
                    )),
×
63
                    TypedRasterQueryProcessor::I16(p) => TypedRasterQueryProcessor::I16(Box::new(
×
64
                        CacheQueryProcessor::new(p, self.source.canonic_name()),
×
65
                    )),
×
66
                    TypedRasterQueryProcessor::I32(p) => TypedRasterQueryProcessor::I32(Box::new(
×
67
                        CacheQueryProcessor::new(p, self.source.canonic_name()),
×
68
                    )),
×
69
                    TypedRasterQueryProcessor::I64(p) => TypedRasterQueryProcessor::I64(Box::new(
×
70
                        CacheQueryProcessor::new(p, self.source.canonic_name()),
×
71
                    )),
×
72
                    TypedRasterQueryProcessor::F32(p) => TypedRasterQueryProcessor::F32(Box::new(
×
73
                        CacheQueryProcessor::new(p, self.source.canonic_name()),
×
74
                    )),
×
75
                    TypedRasterQueryProcessor::F64(p) => TypedRasterQueryProcessor::F64(Box::new(
×
76
                        CacheQueryProcessor::new(p, self.source.canonic_name()),
×
77
                    )),
×
78
                };
79
                tracing::debug!(event = "query processor created");
1✔
80
                Ok(res_processor)
1✔
81
            }
82
            Err(err) => {
×
83
                tracing::debug!(event = "query processor failed");
×
84
                Err(err)
×
85
            }
86
        }
87
    }
1✔
88

89
    fn canonic_name(&self) -> CanonicOperatorName {
×
90
        self.source.canonic_name()
×
91
    }
×
92
}
93

94
impl InitializedVectorOperator for InitializedCacheOperator<Box<dyn InitializedVectorOperator>> {
95
    fn result_descriptor(&self) -> &crate::engine::VectorResultDescriptor {
×
96
        self.source.result_descriptor()
×
97
    }
×
98

99
    fn query_processor(&self) -> Result<crate::engine::TypedVectorQueryProcessor> {
×
100
        let processor_result = self.source.query_processor();
×
101
        match processor_result {
×
102
            Ok(p) => {
×
103
                let res_processor = match p {
×
104
                    crate::engine::TypedVectorQueryProcessor::Data(p) => {
×
105
                        crate::engine::TypedVectorQueryProcessor::Data(Box::new(
×
106
                            CacheQueryProcessor::new(p, self.source.canonic_name()),
×
107
                        ))
×
108
                    }
109
                    crate::engine::TypedVectorQueryProcessor::MultiPoint(p) => {
×
110
                        crate::engine::TypedVectorQueryProcessor::MultiPoint(Box::new(
×
111
                            CacheQueryProcessor::new(p, self.source.canonic_name()),
×
112
                        ))
×
113
                    }
114
                    crate::engine::TypedVectorQueryProcessor::MultiLineString(p) => {
×
115
                        crate::engine::TypedVectorQueryProcessor::MultiLineString(Box::new(
×
116
                            CacheQueryProcessor::new(p, self.source.canonic_name()),
×
117
                        ))
×
118
                    }
119
                    crate::engine::TypedVectorQueryProcessor::MultiPolygon(p) => {
×
120
                        crate::engine::TypedVectorQueryProcessor::MultiPolygon(Box::new(
×
121
                            CacheQueryProcessor::new(p, self.source.canonic_name()),
×
122
                        ))
×
123
                    }
124
                };
125
                tracing::debug!(event = "query processor created");
×
126

127
                Ok(res_processor)
×
128
            }
129
            Err(err) => {
×
130
                tracing::debug!(event = "query processor failed");
×
131
                Err(err)
×
132
            }
133
        }
134
    }
×
135

136
    fn canonic_name(&self) -> CanonicOperatorName {
×
137
        self.source.canonic_name()
×
138
    }
×
139
}
140

141
/// A cache operator that caches the results of its source operator
142
struct CacheQueryProcessor<P, E, Q>
143
where
144
    E: CacheElement + Send + Sync + 'static,
145
    P: QueryProcessor<Output = E, SpatialBounds = Q>,
146
{
147
    processor: P,
148
    cache_key: CanonicOperatorName,
149
}
150

151
impl<P, E, Q> CacheQueryProcessor<P, E, Q>
152
where
153
    E: CacheElement + Send + Sync + 'static,
154
    P: QueryProcessor<Output = E, SpatialBounds = Q> + Sized,
155
{
156
    pub fn new(processor: P, cache_key: CanonicOperatorName) -> Self {
1✔
157
        CacheQueryProcessor {
1✔
158
            processor,
1✔
159
            cache_key,
1✔
160
        }
1✔
161
    }
1✔
162
}
163

164
#[async_trait]
165
impl<P, E, S> QueryProcessor for CacheQueryProcessor<P, E, S>
166
where
167
    P: QueryProcessor<Output = E, SpatialBounds = S> + Sized,
168
    S: AxisAlignedRectangle + Send + Sync + 'static,
169
    E: CacheElement<Query = QueryRectangle<S>>
170
        + Send
171
        + Sync
172
        + 'static
173
        + ResultStreamWrapper
174
        + Clone,
175
    E::ResultStream: Stream<Item = Result<E, CacheError>> + Send + Sync + 'static,
176
    SharedCache: AsyncCache<E>,
177
{
178
    type Output = E;
179
    type SpatialBounds = S;
180

181
    async fn _query<'a>(
2✔
182
        &'a self,
2✔
183
        query: QueryRectangle<Self::SpatialBounds>,
2✔
184
        ctx: &'a dyn QueryContext,
2✔
185
    ) -> Result<BoxStream<'a, Result<Self::Output>>> {
2✔
186
        let shared_cache = ctx
2✔
187
            .extensions()
2✔
188
            .get::<Arc<SharedCache>>()
2✔
189
            .expect("`SharedCache` extension should be set during `ProContext` creation");
2✔
190

191
        let cache_result = shared_cache.query_cache(&self.cache_key, &query).await;
2✔
192

193
        if let Ok(Some(cache_result)) = cache_result {
2✔
194
            // cache hit
195
            log::debug!("cache hit for operator {}", self.cache_key);
×
196

197
            let wrapped_result_steam =
×
198
                E::wrap_result_stream(cache_result, ctx.chunk_byte_size(), query);
×
199

×
200
            return Ok(wrapped_result_steam);
×
201
        }
2✔
202

2✔
203
        // cache miss
2✔
204
        log::debug!("cache miss for operator {}", self.cache_key);
2✔
205
        let source_stream = self.processor.query(query, ctx).await?;
2✔
206

207
        let query_id = shared_cache.insert_query(&self.cache_key, &query).await;
2✔
208

209
        if let Err(e) = query_id {
2✔
210
            log::debug!("could not insert query into cache: {}", e);
×
211
            return Ok(source_stream);
×
212
        }
2✔
213

2✔
214
        let query_id = query_id.expect("query_id should be set because of the previous check");
2✔
215

2✔
216
        // lazily insert tiles into the cache as they are produced
2✔
217
        let (stream_event_sender, mut stream_event_receiver) = unbounded_channel();
2✔
218

2✔
219
        let cache_key = self.cache_key.clone();
2✔
220
        let tile_cache = shared_cache.clone();
2✔
221
        crate::util::spawn(async move {
2✔
222
            while let Some(event) = stream_event_receiver.recv().await {
10✔
223
                match event {
9✔
224
                    SourceStreamEvent::Element(tile) => {
8✔
225
                        let result = tile_cache
8✔
226
                            .insert_query_element(&cache_key, &query_id, tile)
8✔
227
                            .await;
8✔
228
                        log::trace!(
8✔
229
                            "inserted tile into cache for cache key {} and query id {}. result: {:?}",
×
230
                            cache_key,
231
                            query_id,
232
                            result
233
                        );
234
                    }
235
                    SourceStreamEvent::Abort => {
236
                        tile_cache.abort_query(&cache_key, &query_id).await;
×
237
                        log::debug!(
×
238
                            "aborted cache insertion for cache key {} and query id {}",
×
239
                            cache_key,
240
                            query_id
241
                        );
242
                    }
243
                    SourceStreamEvent::Finished => {
244
                        let result = tile_cache.finish_query(&cache_key, &query_id).await;
1✔
245
                        log::debug!(
1✔
246
                            "finished cache insertion for cache key {} and query id {}, result: {:?}",
×
247
                            cache_key,query_id,
248
                            result
249
                        );
250
                    }
251
                }
252
            }
253
        });
2✔
254

2✔
255
        let output_stream = CacheOutputStream {
2✔
256
            source: source_stream,
2✔
257
            stream_event_sender,
2✔
258
            finished: false,
2✔
259
            pristine: true,
2✔
260
        };
2✔
261

2✔
262
        Ok(Box::pin(output_stream))
2✔
263
    }
4✔
264
}
265

266
#[allow(clippy::large_enum_variant)] // TODO: Box instead?
267
enum SourceStreamEvent<E: CacheElement> {
268
    Element(E),
269
    Abort,
270
    Finished,
271
}
272

273
/// Custom stream that lazily puts the produced tile in the cache and finishes the cache entry when the source stream completes
274
#[pin_project(PinnedDrop, project = CacheOutputStreamProjection)]
18✔
275
struct CacheOutputStream<S, E>
276
where
277
    S: Stream<Item = Result<E>>,
278
    E: CacheElement + Clone,
279
{
280
    #[pin]
281
    source: S,
282
    stream_event_sender: UnboundedSender<SourceStreamEvent<E>>,
283
    finished: bool,
284
    pristine: bool,
285
}
286

287
impl<S, E> Stream for CacheOutputStream<S, E>
288
where
289
    S: Stream<Item = Result<E>>,
290
    E: CacheElement + Clone,
291
{
292
    type Item = Result<E>;
293

294
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
18✔
295
        let this = self.project();
18✔
296

297
        let next = ready!(this.source.poll_next(cx));
18✔
298

299
        if let Some(element) = &next {
18✔
300
            *this.pristine = false;
16✔
301
            if let Ok(element) = element {
16✔
302
                // ignore the result. The receiver shold never drop prematurely, but if it does we don't want to crash
303
                let r = this
16✔
304
                    .stream_event_sender
16✔
305
                    .send(SourceStreamEvent::Element(element.clone()));
16✔
306
                if let Err(e) = r {
16✔
307
                    log::warn!("could not send tile to cache: {}", e.to_string());
×
308
                }
16✔
309
            } else {
310
                // ignore the result. The receiver shold never drop prematurely, but if it does we don't want to crash
311
                let r = this.stream_event_sender.send(SourceStreamEvent::Abort);
×
312
                if let Err(e) = r {
×
313
                    log::warn!("could not send abort to cache: {}", e.to_string());
×
314
                }
×
315
            }
316
        } else {
317
            if *this.pristine {
2✔
318
                // ignore the result. The receiver shold never drop prematurely, but if it does we don't want to crash
319
                let r = this.stream_event_sender.send(SourceStreamEvent::Abort);
×
320
                if let Err(e) = r {
×
321
                    log::warn!("could not send abort to cache: {}", e.to_string());
×
322
                }
×
323
            } else {
324
                // ignore the result. The receiver shold never drop prematurely, but if it does we don't want to crash
325
                let r = this.stream_event_sender.send(SourceStreamEvent::Finished);
2✔
326
                if let Err(e) = r {
2✔
327
                    log::warn!("could not send finished to cache: {}", e.to_string());
×
328
                }
2✔
329
                log::debug!("stream finished, mark cache entry as finished.");
2✔
330
            }
331
            *this.finished = true;
2✔
332
        }
333

334
        Poll::Ready(next)
18✔
335
    }
18✔
336
}
337

338
/// On drop, trigger the removal of the cache entry if it hasn't been finished yet
339
#[pinned_drop]
×
340
impl<S, E> PinnedDrop for CacheOutputStream<S, E>
341
where
342
    S: Stream<Item = Result<E>>,
343
    E: CacheElement + Clone,
2✔
344
{
2✔
345
    fn drop(self: Pin<&mut Self>) {
2✔
346
        if !self.finished {
2✔
347
            // ignore the result. The receiver shold never drop prematurely, but if it does we don't want to crash
2✔
348
            let r = self.stream_event_sender.send(SourceStreamEvent::Abort);
2✔
349
            if let Err(e) = r {
2✔
350
                log::debug!("could not send abort to cache: {}", e.to_string());
2✔
351
            }
2✔
352
        }
2✔
353
    }
2✔
354
}
355

356
trait ResultStreamWrapper: CacheElement {
357
    fn wrap_result_stream<'a>(
358
        stream: Self::ResultStream,
359
        chunk_byte_size: ChunkByteSize,
360
        query: Self::Query,
361
    ) -> BoxStream<'a, Result<Self>>;
362
}
363

364
impl<G> ResultStreamWrapper for FeatureCollection<G>
365
where
366
    G: Geometry + ArrowTyped + Send + Sync + 'static,
367
    FeatureCollection<G>:
368
        CacheElement<Query = VectorQueryRectangle> + Send + Sync + CacheElementSpatialBounds,
369
    Self::ResultStream: FusedStream + Send + Sync,
370
{
371
    fn wrap_result_stream<'a>(
×
372
        stream: Self::ResultStream,
×
373
        chunk_byte_size: ChunkByteSize,
×
374
        query: Self::Query,
×
375
    ) -> BoxStream<'a, Result<Self>> {
×
376
        let filter_stream = stream.filter_map(move |result| async move {
×
377
            result
×
378
                .and_then(|collection| collection.filter_cache_element_entries(&query))
×
379
                .map_err(|source| Error::CacheCantProduceResult {
×
380
                    source: source.into(),
×
381
                })
×
382
                .map(|fc| if fc.is_empty() { None } else { Some(fc) })
×
383
                .transpose()
×
384
        });
×
385

×
386
        let merger_stream =
×
387
            FeatureCollectionChunkMerger::new(filter_stream, chunk_byte_size.into());
×
388
        Box::pin(merger_stream)
×
389
    }
×
390
}
391

392
impl<P> ResultStreamWrapper for RasterTile2D<P>
393
where
394
    P: 'static + Pixel,
395
    RasterTile2D<P>: CacheElement,
396
    Self::ResultStream: Send + Sync,
397
{
398
    fn wrap_result_stream<'a>(
×
399
        stream: Self::ResultStream,
×
400
        _chunk_byte_size: ChunkByteSize,
×
401
        _query: Self::Query,
×
402
    ) -> BoxStream<'a, Result<Self>> {
×
403
        Box::pin(stream.map_err(|ce| Error::CacheCantProduceResult { source: ce.into() }))
×
404
    }
×
405
}
406

407
#[cfg(test)]
mod tests {
    use futures::StreamExt;
    use geoengine_datatypes::{
        primitives::{SpatialPartition2D, SpatialResolution, TimeInterval},
        raster::TilesEqualIgnoringCacheHint,
        util::test::TestDefault,
    };

    use crate::{
        engine::{
            ChunkByteSize, MockExecutionContext, MockQueryContext, QueryContextExtensions,
            RasterOperator, WorkflowOperatorPath,
        },
        source::{GdalSource, GdalSourceParameters},
        util::gdal::add_ndvi_dataset,
    };

    use super::*;

    /// Queries the same rectangle twice through the cache operator and checks
    /// that both queries yield equal tiles (ignoring the cache hint).
    #[tokio::test]
    async fn it_caches() {
        let mut exe_ctx = MockExecutionContext::test_default();

        let ndvi_id = add_ndvi_dataset(&mut exe_ctx);

        let source_operator = GdalSource {
            params: GdalSourceParameters { data: ndvi_id },
        }
        .boxed()
        .initialize(WorkflowOperatorPath::initialize_root(), &exe_ctx)
        .await
        .unwrap();

        let processor = InitializedCacheOperator::new(source_operator)
            .query_processor()
            .unwrap()
            .get_u8()
            .unwrap();

        let mut extensions = QueryContextExtensions::default();
        extensions.insert(Arc::new(SharedCache::test_default()));

        let query_ctx =
            MockQueryContext::new_with_query_extensions(ChunkByteSize::test_default(), extensions);

        // Both queries use the identical rectangle.
        let world_query = || QueryRectangle {
            spatial_bounds: SpatialPartition2D::new_unchecked(
                [-180., -90.].into(),
                [180., 90.].into(),
            ),
            time_interval: TimeInterval::default(),
            spatial_resolution: SpatialResolution::zero_point_one(),
        };

        let tiles = processor
            .query(world_query(), &query_ctx)
            .await
            .unwrap()
            .collect::<Vec<_>>()
            .await
            .into_iter()
            .collect::<Result<Vec<_>>>()
            .unwrap();

        // wait for the cache to be filled, which happens asynchronously
        tokio::time::sleep(std::time::Duration::from_millis(1000)).await;

        let tiles_from_cache = processor
            .query(world_query(), &query_ctx)
            .await
            .unwrap()
            .collect::<Vec<_>>()
            .await
            .into_iter()
            .collect::<Result<Vec<_>>>()
            .unwrap();

        // TODO: how to ensure the tiles are actually from the cache?

        assert!(tiles.tiles_equal_ignoring_cache_hint(&tiles_from_cache));
    }
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc