• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

geo-engine / geoengine / 5641674846

24 Jul 2023 06:42AM UTC coverage: 88.938% (-0.2%) from 89.184%
5641674846

Pull #833

github

web-flow
Merge 054f347f4 into 8c287ecf7
Pull Request #833: Shared-cache

1301 of 1301 new or added lines in 15 files covered. (100.0%)

105810 of 118970 relevant lines covered (88.94%)

60854.04 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

63.42
/operators/src/pro/cache/cache_operator.rs
1
use std::pin::Pin;
2
use std::sync::Arc;
3
use std::task::{Context, Poll};
4

5
use crate::engine::{
6
    CanonicOperatorName, InitializedRasterOperator, InitializedVectorOperator, QueryContext,
7
    QueryProcessor, RasterResultDescriptor, TypedRasterQueryProcessor,
8
};
9
use crate::pro::cache::shared_cache::{AsyncCache, SharedCache};
10
use crate::util::Result;
11
use async_trait::async_trait;
12
use futures::stream::BoxStream;
13
use futures::{ready, Stream};
14
use geoengine_datatypes::primitives::{AxisAlignedRectangle, QueryRectangle};
15
use pin_project::{pin_project, pinned_drop};
16
use tokio::sync::mpsc::{unbounded_channel, UnboundedSender};
17

18
use super::shared_cache::{CacheElement, CacheElementSubType};
19

20
/// A cache operator that caches the results of its source operator
pub struct InitializedCacheOperator<S> {
    /// the wrapped source operator whose results are cached
    source: S,
}

impl<S> InitializedCacheOperator<S> {
    /// Wraps `source` so that the results it produces can be served from the shared cache.
    pub fn new(source: S) -> Self {
        InitializedCacheOperator { source }
    }
}
30

31
impl InitializedRasterOperator for InitializedCacheOperator<Box<dyn InitializedRasterOperator>> {
32
    fn result_descriptor(&self) -> &RasterResultDescriptor {
×
33
        self.source.result_descriptor()
×
34
    }
×
35

36
    fn query_processor(&self) -> Result<TypedRasterQueryProcessor> {
1✔
37
        let processor_result = self.source.query_processor();
1✔
38
        match processor_result {
1✔
39
            Ok(p) => {
1✔
40
                let res_processor = match p {
1✔
41
                    TypedRasterQueryProcessor::U8(p) => TypedRasterQueryProcessor::U8(Box::new(
1✔
42
                        CacheQueryProcessor::new(p, self.source.canonic_name()),
1✔
43
                    )),
1✔
44
                    TypedRasterQueryProcessor::U16(p) => TypedRasterQueryProcessor::U16(Box::new(
×
45
                        CacheQueryProcessor::new(p, self.source.canonic_name()),
×
46
                    )),
×
47
                    TypedRasterQueryProcessor::U32(p) => TypedRasterQueryProcessor::U32(Box::new(
×
48
                        CacheQueryProcessor::new(p, self.source.canonic_name()),
×
49
                    )),
×
50
                    TypedRasterQueryProcessor::U64(p) => TypedRasterQueryProcessor::U64(Box::new(
×
51
                        CacheQueryProcessor::new(p, self.source.canonic_name()),
×
52
                    )),
×
53
                    TypedRasterQueryProcessor::I8(p) => TypedRasterQueryProcessor::I8(Box::new(
×
54
                        CacheQueryProcessor::new(p, self.source.canonic_name()),
×
55
                    )),
×
56
                    TypedRasterQueryProcessor::I16(p) => TypedRasterQueryProcessor::I16(Box::new(
×
57
                        CacheQueryProcessor::new(p, self.source.canonic_name()),
×
58
                    )),
×
59
                    TypedRasterQueryProcessor::I32(p) => TypedRasterQueryProcessor::I32(Box::new(
×
60
                        CacheQueryProcessor::new(p, self.source.canonic_name()),
×
61
                    )),
×
62
                    TypedRasterQueryProcessor::I64(p) => TypedRasterQueryProcessor::I64(Box::new(
×
63
                        CacheQueryProcessor::new(p, self.source.canonic_name()),
×
64
                    )),
×
65
                    TypedRasterQueryProcessor::F32(p) => TypedRasterQueryProcessor::F32(Box::new(
×
66
                        CacheQueryProcessor::new(p, self.source.canonic_name()),
×
67
                    )),
×
68
                    TypedRasterQueryProcessor::F64(p) => TypedRasterQueryProcessor::F64(Box::new(
×
69
                        CacheQueryProcessor::new(p, self.source.canonic_name()),
×
70
                    )),
×
71
                };
72
                tracing::debug!(event = "query processor created");
1✔
73
                Ok(res_processor)
1✔
74
            }
75
            Err(err) => {
×
76
                tracing::debug!(event = "query processor failed");
×
77
                Err(err)
×
78
            }
79
        }
80
    }
1✔
81

82
    fn canonic_name(&self) -> CanonicOperatorName {
×
83
        self.source.canonic_name()
×
84
    }
×
85
}
86

87
impl InitializedVectorOperator for InitializedCacheOperator<Box<dyn InitializedVectorOperator>> {
88
    fn result_descriptor(&self) -> &crate::engine::VectorResultDescriptor {
×
89
        self.source.result_descriptor()
×
90
    }
×
91

92
    fn query_processor(&self) -> Result<crate::engine::TypedVectorQueryProcessor> {
×
93
        let processor_result = self.source.query_processor();
×
94
        match processor_result {
×
95
            Ok(p) => {
×
96
                let res_processor = match p {
×
97
                    crate::engine::TypedVectorQueryProcessor::Data(p) => {
×
98
                        crate::engine::TypedVectorQueryProcessor::Data(Box::new(
×
99
                            CacheQueryProcessor::new(p, self.source.canonic_name()),
×
100
                        ))
×
101
                    }
102
                    crate::engine::TypedVectorQueryProcessor::MultiPoint(p) => {
×
103
                        crate::engine::TypedVectorQueryProcessor::MultiPoint(Box::new(
×
104
                            CacheQueryProcessor::new(p, self.source.canonic_name()),
×
105
                        ))
×
106
                    }
107
                    crate::engine::TypedVectorQueryProcessor::MultiLineString(p) => {
×
108
                        crate::engine::TypedVectorQueryProcessor::MultiLineString(Box::new(
×
109
                            CacheQueryProcessor::new(p, self.source.canonic_name()),
×
110
                        ))
×
111
                    }
112
                    crate::engine::TypedVectorQueryProcessor::MultiPolygon(p) => {
×
113
                        crate::engine::TypedVectorQueryProcessor::MultiPolygon(Box::new(
×
114
                            CacheQueryProcessor::new(p, self.source.canonic_name()),
×
115
                        ))
×
116
                    }
117
                };
118
                tracing::debug!(event = "query processor created");
×
119

120
                Ok(res_processor)
×
121
            }
122
            Err(err) => {
×
123
                tracing::debug!(event = "query processor failed");
×
124
                Err(err)
×
125
            }
126
        }
127
    }
×
128

129
    fn canonic_name(&self) -> CanonicOperatorName {
×
130
        self.source.canonic_name()
×
131
    }
×
132
}
133

134
/// A cache operator that caches the results of its source operator
135
struct CacheQueryProcessor<P, E, Q>
136
where
137
    E: CacheElement + Send + Sync + 'static,
138
    P: QueryProcessor<Output = E, SpatialBounds = Q>,
139
{
140
    processor: P,
141
    cache_key: CanonicOperatorName,
142
}
143

144
impl<P, E, Q> CacheQueryProcessor<P, E, Q>
145
where
146
    E: CacheElement + Send + Sync + 'static,
147
    P: QueryProcessor<Output = E, SpatialBounds = Q> + Sized,
148
{
149
    pub fn new(processor: P, cache_key: CanonicOperatorName) -> Self {
1✔
150
        CacheQueryProcessor {
1✔
151
            processor,
1✔
152
            cache_key,
1✔
153
        }
1✔
154
    }
1✔
155
}
156

157
#[async_trait]
158
impl<P, S, E, Q> QueryProcessor for CacheQueryProcessor<P, E, Q>
159
where
160
    P: QueryProcessor<Output = E, SpatialBounds = Q> + Sized,
161
    E: CacheElement<CacheElementSubType = S, Query = QueryRectangle<Q>>
162
        + Send
163
        + Sync
164
        + Clone
165
        + 'static,
166
    S: CacheElementSubType<CacheElementType = E> + Send + Sync + 'static,
167
    Q: AxisAlignedRectangle + Send + Sync + 'static,
168
    E::ResultStream: Stream<Item = Result<E>> + Send + Sync + 'static,
169
    SharedCache: AsyncCache<E>,
170
{
171
    type Output = E;
172
    type SpatialBounds = Q;
173

174
    async fn _query<'a>(
2✔
175
        &'a self,
2✔
176
        query: QueryRectangle<Self::SpatialBounds>,
2✔
177
        ctx: &'a dyn QueryContext,
2✔
178
    ) -> Result<BoxStream<'a, Result<Self::Output>>> {
2✔
179
        let shared_cache = ctx
2✔
180
            .extensions()
2✔
181
            .get::<Arc<SharedCache>>()
2✔
182
            .expect("`TileCache` extension should be set during `ProContext` creation");
2✔
183

184
        let cache_result = shared_cache.query_cache::<S>(&self.cache_key, &query).await;
2✔
185

186
        if let Ok(Some(cache_result)) = cache_result {
2✔
187
            // cache hit
188
            log::debug!("cache hit for operator {}", self.cache_key);
×
189

190
            return Ok(Box::pin(cache_result));
×
191
        }
2✔
192

2✔
193
        // cache miss
2✔
194
        log::debug!("cache miss for operator {}", self.cache_key);
2✔
195
        let source_stream = self.processor.query(query, ctx).await?;
2✔
196

197
        let query_id = shared_cache
2✔
198
            .insert_query::<S>(&self.cache_key, &query)
2✔
199
            .await;
×
200

201
        if let Err(e) = query_id {
2✔
202
            log::debug!("could not insert query into cache: {}", e);
×
203
            return Ok(source_stream);
×
204
        }
2✔
205

2✔
206
        let query_id = query_id.expect("query_id should be set because of the previous check");
2✔
207

2✔
208
        // lazily insert tiles into the cache as they are produced
2✔
209
        let (stream_event_sender, mut stream_event_receiver) = unbounded_channel();
2✔
210

2✔
211
        let cache_key = self.cache_key.clone();
2✔
212
        let tile_cache = shared_cache.clone();
2✔
213
        crate::util::spawn(async move {
2✔
214
            while let Some(event) = stream_event_receiver.recv().await {
10✔
215
                match event {
9✔
216
                    SourceStreamEvent::Element(tile) => {
8✔
217
                        let result = tile_cache
8✔
218
                            .insert_query_element::<S>(&cache_key, &query_id, tile)
8✔
219
                            .await;
×
220
                        log::trace!(
8✔
221
                            "inserted tile into cache for cache key {} and query id {}. result: {:?}",
×
222
                            cache_key,
223
                            query_id,
224
                            result
225
                        );
226
                    }
227
                    SourceStreamEvent::Abort => {
228
                        tile_cache.abort_query::<S>(&cache_key, &query_id).await;
×
229
                        log::debug!(
×
230
                            "aborted cache insertion for cache key {} and query id {}",
×
231
                            cache_key,
232
                            query_id
233
                        );
234
                    }
235
                    SourceStreamEvent::Finished => {
236
                        let result = tile_cache.finish_query::<S>(&cache_key, &query_id).await;
1✔
237
                        log::debug!(
1✔
238
                            "finished cache insertion for cache key {} and query id {}, result: {:?}",
×
239
                            cache_key,query_id,
240
                            result
241
                        );
242
                    }
243
                }
244
            }
245
        });
2✔
246

2✔
247
        let output_stream = CacheOutputStream {
2✔
248
            source: source_stream,
2✔
249
            stream_event_sender,
2✔
250
            finished: false,
2✔
251
            pristine: true,
2✔
252
        };
2✔
253

2✔
254
        Ok(Box::pin(output_stream))
2✔
255
    }
4✔
256
}
257

258
/// Events sent from [`CacheOutputStream`] to the background task that fills the cache.
#[allow(clippy::large_enum_variant)] // TODO: Box instead?
enum SourceStreamEvent<E: CacheElement> {
    /// an element produced by the source stream that should be inserted into the cache
    Element(E),
    /// the cache entry must be discarded (element error, empty stream, or premature drop)
    Abort,
    /// the source stream completed successfully; the cache entry can be finalized
    Finished,
}
264

265
/// Custom stream that lazily puts the produced tile in the cache and finishes the cache entry when the source stream completes
#[pin_project(PinnedDrop, project = CacheOutputStreamProjection)]
struct CacheOutputStream<S, E>
where
    S: Stream<Item = Result<E>>,
    E: CacheElement,
{
    /// the underlying source stream whose elements are forwarded to the caller
    #[pin]
    source: S,
    /// channel to the background task that performs the cache insertions
    stream_event_sender: UnboundedSender<SourceStreamEvent<E>>,
    /// set once the source stream has run to completion (checked on drop)
    finished: bool,
    /// true while the stream has not yet produced any element
    pristine: bool,
}
278

279
impl<S, E> Stream for CacheOutputStream<S, E>
280
where
281
    S: Stream<Item = Result<E>>,
282
    E: CacheElement + Clone,
283
{
284
    type Item = Result<E>;
285

286
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
18✔
287
        let this = self.project();
18✔
288

289
        let next = ready!(this.source.poll_next(cx));
18✔
290

291
        if let Some(element) = &next {
18✔
292
            *this.pristine = false;
16✔
293
            if let Ok(element) = element {
16✔
294
                // ignore the result. The receiver shold never drop prematurely, but if it does we don't want to crash
295
                let r = this
16✔
296
                    .stream_event_sender
16✔
297
                    .send(SourceStreamEvent::Element(element.clone()));
16✔
298
                if let Err(e) = r {
16✔
299
                    log::warn!("could not send tile to cache: {}", e.to_string());
×
300
                }
16✔
301
            } else {
302
                // ignore the result. The receiver shold never drop prematurely, but if it does we don't want to crash
303
                let r = this.stream_event_sender.send(SourceStreamEvent::Abort);
×
304
                if let Err(e) = r {
×
305
                    log::warn!("could not send abort to cache: {}", e.to_string());
×
306
                }
×
307
            }
308
        } else {
309
            if *this.pristine {
2✔
310
                // ignore the result. The receiver shold never drop prematurely, but if it does we don't want to crash
311
                let r = this.stream_event_sender.send(SourceStreamEvent::Abort);
×
312
                if let Err(e) = r {
×
313
                    log::warn!("could not send abort to cache: {}", e.to_string());
×
314
                }
×
315
            } else {
316
                // ignore the result. The receiver shold never drop prematurely, but if it does we don't want to crash
317
                let r = this.stream_event_sender.send(SourceStreamEvent::Finished);
2✔
318
                if let Err(e) = r {
2✔
319
                    log::warn!("could not send finished to cache: {}", e.to_string());
×
320
                }
2✔
321
                log::debug!("stream finished, mark cache entry as finished.");
2✔
322
            }
323
            *this.finished = true;
2✔
324
        }
325

326
        Poll::Ready(next)
18✔
327
    }
18✔
328
}
329

330
/// On drop, trigger the removal of the cache entry if it hasn't been finished yet
331
#[pinned_drop]
×
332
impl<S, E> PinnedDrop for CacheOutputStream<S, E>
333
where
334
    S: Stream<Item = Result<E>>,
335
    E: CacheElement,
2✔
336
{
2✔
337
    fn drop(self: Pin<&mut Self>) {
2✔
338
        if !self.finished {
2✔
339
            // ignore the result. The receiver shold never drop prematurely, but if it does we don't want to crash
2✔
340
            let r = self.stream_event_sender.send(SourceStreamEvent::Abort);
2✔
341
            if let Err(e) = r {
2✔
342
                log::debug!("could not send abort to cache: {}", e.to_string());
2✔
343
            }
2✔
344
        }
2✔
345
    }
2✔
346
}
347

348
#[cfg(test)]
mod tests {
    use futures::StreamExt;
    use geoengine_datatypes::{
        primitives::{SpatialPartition2D, SpatialResolution, TimeInterval},
        raster::TilesEqualIgnoringCacheHint,
        util::test::TestDefault,
    };

    use crate::{
        engine::{
            ChunkByteSize, MockExecutionContext, MockQueryContext, QueryContextExtensions,
            RasterOperator, WorkflowOperatorPath,
        },
        source::{GdalSource, GdalSourceParameters},
        util::gdal::add_ndvi_dataset,
    };

    use super::*;

    /// the whole-world query rectangle used for both the initial and the cached query
    fn world_query() -> QueryRectangle<SpatialPartition2D> {
        QueryRectangle {
            spatial_bounds: SpatialPartition2D::new_unchecked(
                [-180., -90.].into(),
                [180., 90.].into(),
            ),
            time_interval: TimeInterval::default(),
            spatial_resolution: SpatialResolution::zero_point_one(),
        }
    }

    #[tokio::test]
    async fn it_caches() {
        let mut exe_ctx = MockExecutionContext::test_default();
        let ndvi_id = add_ndvi_dataset(&mut exe_ctx);

        let operator = GdalSource {
            params: GdalSourceParameters { data: ndvi_id },
        }
        .boxed()
        .initialize(WorkflowOperatorPath::initialize_root(), &exe_ctx)
        .await
        .unwrap();

        let cached_op = InitializedCacheOperator::new(operator);
        let processor = cached_op.query_processor().unwrap().get_u8().unwrap();

        // provide a shared cache via the query context extensions
        let tile_cache = Arc::new(SharedCache::test_default());
        let mut extensions = QueryContextExtensions::default();
        extensions.insert(tile_cache);
        let query_ctx =
            MockQueryContext::new_with_query_extensions(ChunkByteSize::test_default(), extensions);

        // first query: cache miss, elements come from the source
        let stream = processor.query(world_query(), &query_ctx).await.unwrap();
        let tiles = stream
            .collect::<Vec<_>>()
            .await
            .into_iter()
            .collect::<Result<Vec<_>>>()
            .unwrap();

        // wait for the cache to be filled, which happens asynchronously
        tokio::time::sleep(std::time::Duration::from_millis(1000)).await;

        // second query: should be answered from the cache
        let stream_from_cache = processor.query(world_query(), &query_ctx).await.unwrap();
        let tiles_from_cache = stream_from_cache
            .collect::<Vec<_>>()
            .await
            .into_iter()
            .collect::<Result<Vec<_>>>()
            .unwrap();

        // TODO: how to ensure the tiles are actually from the cache?

        assert!(tiles.tiles_equal_ignoring_cache_hint(&tiles_from_cache));
    }
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc