• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

geo-engine / geoengine / 7006568925

27 Nov 2023 02:07PM UTC coverage: 89.651% (+0.2%) from 89.498%
7006568925

push

github

web-flow
Merge pull request #888 from geo-engine/raster_stacks

raster stacking

4032 of 4274 new or added lines in 107 files covered. (94.34%)

12 existing lines in 8 files now uncovered.

113020 of 126066 relevant lines covered (89.65%)

59901.79 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

56.95
/operators/src/pro/cache/cache_operator.rs
1
use super::cache_chunks::CacheElementSpatialBounds;
2
use super::error::CacheError;
3
use super::shared_cache::CacheElement;
4
use crate::adapters::FeatureCollectionChunkMerger;
5
use crate::engine::{
6
    CanonicOperatorName, ChunkByteSize, InitializedRasterOperator, InitializedVectorOperator,
7
    QueryContext, QueryProcessor, RasterResultDescriptor, TypedRasterQueryProcessor,
8
};
9
use crate::error::Error;
10
use crate::pro::cache::shared_cache::{AsyncCache, SharedCache};
11
use crate::util::Result;
12
use async_trait::async_trait;
13
use futures::stream::{BoxStream, FusedStream};
14
use futures::{ready, Stream, StreamExt, TryStreamExt};
15
use geoengine_datatypes::collections::{FeatureCollection, FeatureCollectionInfos};
16
use geoengine_datatypes::primitives::{
17
    AxisAlignedRectangle, Geometry, QueryAttributeSelection, QueryRectangle, VectorQueryRectangle,
18
};
19
use geoengine_datatypes::raster::{Pixel, RasterTile2D};
20
use geoengine_datatypes::util::arrow::ArrowTyped;
21
use pin_project::{pin_project, pinned_drop};
22
use snafu::ensure;
23
use std::pin::Pin;
24
use std::sync::Arc;
25
use std::task::{Context, Poll};
26
use tokio::sync::mpsc::{unbounded_channel, UnboundedSender};
27

28
/// A cache operator that caches the results of its source operator.
///
/// It owns an already initialized source operator. When asked for a query processor, it wraps
/// the source's processor in a caching processor.
pub struct InitializedCacheOperator<S> {
    /// The wrapped, already initialized source operator.
    source: S,
}

impl<S> InitializedCacheOperator<S> {
    /// Wraps `source` so that the results of its query processor are cached.
    pub fn new(source: S) -> Self {
        InitializedCacheOperator { source }
    }
}
38

39
impl InitializedRasterOperator for InitializedCacheOperator<Box<dyn InitializedRasterOperator>> {
40
    fn result_descriptor(&self) -> &RasterResultDescriptor {
×
41
        self.source.result_descriptor()
×
42
    }
×
43

44
    fn query_processor(&self) -> Result<TypedRasterQueryProcessor> {
1✔
45
        // TODO: implement multi-band functionality and remove this check
1✔
46
        ensure!(
1✔
47
            self.source.result_descriptor().bands.len() == 1,
1✔
NEW
48
            crate::error::OperatorDoesNotSupportMultiBandsSourcesYet {
×
NEW
49
                operator: "CacheOperator"
×
NEW
50
            }
×
51
        );
52

53
        let processor_result = self.source.query_processor();
1✔
54
        match processor_result {
1✔
55
            Ok(p) => {
1✔
56
                let res_processor = match p {
1✔
57
                    TypedRasterQueryProcessor::U8(p) => TypedRasterQueryProcessor::U8(Box::new(
1✔
58
                        CacheQueryProcessor::new(p, self.source.canonic_name()),
1✔
59
                    )),
1✔
60
                    TypedRasterQueryProcessor::U16(p) => TypedRasterQueryProcessor::U16(Box::new(
×
61
                        CacheQueryProcessor::new(p, self.source.canonic_name()),
×
62
                    )),
×
63
                    TypedRasterQueryProcessor::U32(p) => TypedRasterQueryProcessor::U32(Box::new(
×
64
                        CacheQueryProcessor::new(p, self.source.canonic_name()),
×
65
                    )),
×
66
                    TypedRasterQueryProcessor::U64(p) => TypedRasterQueryProcessor::U64(Box::new(
×
67
                        CacheQueryProcessor::new(p, self.source.canonic_name()),
×
68
                    )),
×
69
                    TypedRasterQueryProcessor::I8(p) => TypedRasterQueryProcessor::I8(Box::new(
×
70
                        CacheQueryProcessor::new(p, self.source.canonic_name()),
×
71
                    )),
×
72
                    TypedRasterQueryProcessor::I16(p) => TypedRasterQueryProcessor::I16(Box::new(
×
73
                        CacheQueryProcessor::new(p, self.source.canonic_name()),
×
74
                    )),
×
75
                    TypedRasterQueryProcessor::I32(p) => TypedRasterQueryProcessor::I32(Box::new(
×
76
                        CacheQueryProcessor::new(p, self.source.canonic_name()),
×
77
                    )),
×
78
                    TypedRasterQueryProcessor::I64(p) => TypedRasterQueryProcessor::I64(Box::new(
×
79
                        CacheQueryProcessor::new(p, self.source.canonic_name()),
×
80
                    )),
×
81
                    TypedRasterQueryProcessor::F32(p) => TypedRasterQueryProcessor::F32(Box::new(
×
82
                        CacheQueryProcessor::new(p, self.source.canonic_name()),
×
83
                    )),
×
84
                    TypedRasterQueryProcessor::F64(p) => TypedRasterQueryProcessor::F64(Box::new(
×
85
                        CacheQueryProcessor::new(p, self.source.canonic_name()),
×
86
                    )),
×
87
                };
88
                tracing::debug!(event = "query processor created");
1✔
89
                Ok(res_processor)
1✔
90
            }
91
            Err(err) => {
×
92
                tracing::debug!(event = "query processor failed");
×
93
                Err(err)
×
94
            }
95
        }
96
    }
1✔
97

98
    fn canonic_name(&self) -> CanonicOperatorName {
×
99
        self.source.canonic_name()
×
100
    }
×
101
}
102

103
impl InitializedVectorOperator for InitializedCacheOperator<Box<dyn InitializedVectorOperator>> {
104
    fn result_descriptor(&self) -> &crate::engine::VectorResultDescriptor {
×
105
        self.source.result_descriptor()
×
106
    }
×
107

108
    fn query_processor(&self) -> Result<crate::engine::TypedVectorQueryProcessor> {
×
109
        let processor_result = self.source.query_processor();
×
110
        match processor_result {
×
111
            Ok(p) => {
×
112
                let res_processor = match p {
×
113
                    crate::engine::TypedVectorQueryProcessor::Data(p) => {
×
114
                        crate::engine::TypedVectorQueryProcessor::Data(Box::new(
×
115
                            CacheQueryProcessor::new(p, self.source.canonic_name()),
×
116
                        ))
×
117
                    }
118
                    crate::engine::TypedVectorQueryProcessor::MultiPoint(p) => {
×
119
                        crate::engine::TypedVectorQueryProcessor::MultiPoint(Box::new(
×
120
                            CacheQueryProcessor::new(p, self.source.canonic_name()),
×
121
                        ))
×
122
                    }
123
                    crate::engine::TypedVectorQueryProcessor::MultiLineString(p) => {
×
124
                        crate::engine::TypedVectorQueryProcessor::MultiLineString(Box::new(
×
125
                            CacheQueryProcessor::new(p, self.source.canonic_name()),
×
126
                        ))
×
127
                    }
128
                    crate::engine::TypedVectorQueryProcessor::MultiPolygon(p) => {
×
129
                        crate::engine::TypedVectorQueryProcessor::MultiPolygon(Box::new(
×
130
                            CacheQueryProcessor::new(p, self.source.canonic_name()),
×
131
                        ))
×
132
                    }
133
                };
134
                tracing::debug!(event = "query processor created");
×
135

136
                Ok(res_processor)
×
137
            }
138
            Err(err) => {
×
139
                tracing::debug!(event = "query processor failed");
×
140
                Err(err)
×
141
            }
142
        }
143
    }
×
144

145
    fn canonic_name(&self) -> CanonicOperatorName {
×
146
        self.source.canonic_name()
×
147
    }
×
148
}
149

150
/// A cache operator that caches the results of its source operator
151
struct CacheQueryProcessor<P, E, Q, U>
152
where
153
    E: CacheElement + Send + Sync + 'static,
154
    P: QueryProcessor<Output = E, SpatialBounds = Q, Selection = U>,
155
{
156
    processor: P,
157
    cache_key: CanonicOperatorName,
158
}
159

160
impl<P, E, Q, U> CacheQueryProcessor<P, E, Q, U>
161
where
162
    E: CacheElement + Send + Sync + 'static,
163
    P: QueryProcessor<Output = E, SpatialBounds = Q, Selection = U> + Sized,
164
{
165
    pub fn new(processor: P, cache_key: CanonicOperatorName) -> Self {
1✔
166
        CacheQueryProcessor {
1✔
167
            processor,
1✔
168
            cache_key,
1✔
169
        }
1✔
170
    }
1✔
171
}
172

173
#[async_trait]
174
impl<P, E, S, U> QueryProcessor for CacheQueryProcessor<P, E, S, U>
175
where
176
    P: QueryProcessor<Output = E, SpatialBounds = S, Selection = U> + Sized,
177
    S: AxisAlignedRectangle + Send + Sync + 'static,
178
    U: QueryAttributeSelection,
179
    E: CacheElement<Query = QueryRectangle<S, U>>
180
        + Send
181
        + Sync
182
        + 'static
183
        + ResultStreamWrapper
184
        + Clone,
185
    E::ResultStream: Stream<Item = Result<E, CacheError>> + Send + Sync + 'static,
186
    SharedCache: AsyncCache<E>,
187
{
188
    type Output = E;
189
    type SpatialBounds = S;
190
    type Selection = U;
191

192
    async fn _query<'a>(
2✔
193
        &'a self,
2✔
194
        query: QueryRectangle<Self::SpatialBounds, Self::Selection>,
2✔
195
        ctx: &'a dyn QueryContext,
2✔
196
    ) -> Result<BoxStream<'a, Result<Self::Output>>> {
2✔
197
        let shared_cache = ctx
2✔
198
            .extensions()
2✔
199
            .get::<Arc<SharedCache>>()
2✔
200
            .expect("`SharedCache` extension should be set during `ProContext` creation");
2✔
201

202
        let cache_result = shared_cache.query_cache(&self.cache_key, &query).await;
2✔
203

204
        if let Ok(Some(cache_result)) = cache_result {
2✔
205
            // cache hit
206
            log::debug!("cache hit for operator {}", self.cache_key);
×
207

208
            let wrapped_result_steam =
×
NEW
209
                E::wrap_result_stream(cache_result, ctx.chunk_byte_size(), query.clone());
×
210

×
211
            return Ok(wrapped_result_steam);
×
212
        }
2✔
213

2✔
214
        // cache miss
2✔
215
        log::debug!("cache miss for operator {}", self.cache_key);
2✔
216
        let source_stream = self.processor.query(query.clone(), ctx).await?;
2✔
217

218
        let query_id = shared_cache.insert_query(&self.cache_key, &query).await;
2✔
219

220
        if let Err(e) = query_id {
2✔
221
            log::debug!("could not insert query into cache: {}", e);
×
222
            return Ok(source_stream);
×
223
        }
2✔
224

2✔
225
        let query_id = query_id.expect("query_id should be set because of the previous check");
2✔
226

2✔
227
        // lazily insert tiles into the cache as they are produced
2✔
228
        let (stream_event_sender, mut stream_event_receiver) = unbounded_channel();
2✔
229

2✔
230
        let cache_key = self.cache_key.clone();
2✔
231
        let tile_cache = shared_cache.clone();
2✔
232
        crate::util::spawn(async move {
2✔
233
            while let Some(event) = stream_event_receiver.recv().await {
10✔
234
                match event {
9✔
235
                    SourceStreamEvent::Element(tile) => {
8✔
236
                        let result = tile_cache
8✔
237
                            .insert_query_element(&cache_key, &query_id, tile)
8✔
238
                            .await;
8✔
239
                        log::trace!(
8✔
240
                            "inserted tile into cache for cache key {} and query id {}. result: {:?}",
×
241
                            cache_key,
242
                            query_id,
243
                            result
244
                        );
245
                    }
246
                    SourceStreamEvent::Abort => {
247
                        tile_cache.abort_query(&cache_key, &query_id).await;
×
248
                        log::debug!(
×
249
                            "aborted cache insertion for cache key {} and query id {}",
×
250
                            cache_key,
251
                            query_id
252
                        );
253
                    }
254
                    SourceStreamEvent::Finished => {
255
                        let result = tile_cache.finish_query(&cache_key, &query_id).await;
1✔
256
                        log::debug!(
1✔
257
                            "finished cache insertion for cache key {} and query id {}, result: {:?}",
×
258
                            cache_key,query_id,
259
                            result
260
                        );
261
                    }
262
                }
263
            }
264
        });
2✔
265

2✔
266
        let output_stream = CacheOutputStream {
2✔
267
            source: source_stream,
2✔
268
            stream_event_sender,
2✔
269
            finished: false,
2✔
270
            pristine: true,
2✔
271
        };
2✔
272

2✔
273
        Ok(Box::pin(output_stream))
2✔
274
    }
4✔
275
}
276

277
#[allow(clippy::large_enum_variant)] // TODO: Box instead?
278
enum SourceStreamEvent<E: CacheElement> {
279
    Element(E),
280
    Abort,
281
    Finished,
282
}
283

284
/// Custom stream that lazily puts the produced tile in the cache and finishes the cache entry when the source stream completes
285
#[pin_project(PinnedDrop, project = CacheOutputStreamProjection)]
18✔
286
struct CacheOutputStream<S, E>
287
where
288
    S: Stream<Item = Result<E>>,
289
    E: CacheElement + Clone,
290
{
291
    #[pin]
292
    source: S,
293
    stream_event_sender: UnboundedSender<SourceStreamEvent<E>>,
294
    finished: bool,
295
    pristine: bool,
296
}
297

298
impl<S, E> Stream for CacheOutputStream<S, E>
299
where
300
    S: Stream<Item = Result<E>>,
301
    E: CacheElement + Clone,
302
{
303
    type Item = Result<E>;
304

305
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
18✔
306
        let this = self.project();
18✔
307

308
        let next = ready!(this.source.poll_next(cx));
18✔
309

310
        if let Some(element) = &next {
18✔
311
            *this.pristine = false;
16✔
312
            if let Ok(element) = element {
16✔
313
                // ignore the result. The receiver shold never drop prematurely, but if it does we don't want to crash
314
                let r = this
16✔
315
                    .stream_event_sender
16✔
316
                    .send(SourceStreamEvent::Element(element.clone()));
16✔
317
                if let Err(e) = r {
16✔
318
                    log::warn!("could not send tile to cache: {}", e.to_string());
×
319
                }
16✔
320
            } else {
321
                // ignore the result. The receiver shold never drop prematurely, but if it does we don't want to crash
322
                let r = this.stream_event_sender.send(SourceStreamEvent::Abort);
×
323
                if let Err(e) = r {
×
324
                    log::warn!("could not send abort to cache: {}", e.to_string());
×
325
                }
×
326
            }
327
        } else {
328
            if *this.pristine {
2✔
329
                // ignore the result. The receiver shold never drop prematurely, but if it does we don't want to crash
330
                let r = this.stream_event_sender.send(SourceStreamEvent::Abort);
×
331
                if let Err(e) = r {
×
332
                    log::warn!("could not send abort to cache: {}", e.to_string());
×
333
                }
×
334
            } else {
335
                // ignore the result. The receiver shold never drop prematurely, but if it does we don't want to crash
336
                let r = this.stream_event_sender.send(SourceStreamEvent::Finished);
2✔
337
                if let Err(e) = r {
2✔
338
                    log::warn!("could not send finished to cache: {}", e.to_string());
×
339
                }
2✔
340
                log::debug!("stream finished, mark cache entry as finished.");
2✔
341
            }
342
            *this.finished = true;
2✔
343
        }
344

345
        Poll::Ready(next)
18✔
346
    }
18✔
347
}
348

349
/// On drop, trigger the removal of the cache entry if it hasn't been finished yet
350
#[pinned_drop]
×
351
impl<S, E> PinnedDrop for CacheOutputStream<S, E>
352
where
353
    S: Stream<Item = Result<E>>,
354
    E: CacheElement + Clone,
2✔
355
{
2✔
356
    fn drop(self: Pin<&mut Self>) {
2✔
357
        if !self.finished {
2✔
358
            // ignore the result. The receiver shold never drop prematurely, but if it does we don't want to crash
2✔
359
            let r = self.stream_event_sender.send(SourceStreamEvent::Abort);
2✔
360
            if let Err(e) = r {
2✔
361
                log::debug!("could not send abort to cache: {}", e.to_string());
2✔
362
            }
2✔
363
        }
2✔
364
    }
2✔
365
}
366

367
trait ResultStreamWrapper: CacheElement {
368
    fn wrap_result_stream<'a>(
369
        stream: Self::ResultStream,
370
        chunk_byte_size: ChunkByteSize,
371
        query: Self::Query,
372
    ) -> BoxStream<'a, Result<Self>>;
373
}
374

375
impl<G> ResultStreamWrapper for FeatureCollection<G>
376
where
377
    G: Geometry + ArrowTyped + Send + Sync + 'static,
378
    FeatureCollection<G>:
379
        CacheElement<Query = VectorQueryRectangle> + Send + Sync + CacheElementSpatialBounds,
380
    Self::ResultStream: FusedStream + Send + Sync,
381
{
382
    fn wrap_result_stream<'a>(
×
383
        stream: Self::ResultStream,
×
384
        chunk_byte_size: ChunkByteSize,
×
385
        query: Self::Query,
×
386
    ) -> BoxStream<'a, Result<Self>> {
×
NEW
387
        let filter_stream = stream.filter_map(move |result| {
×
NEW
388
            let query = query.clone();
×
NEW
389
            async move {
×
NEW
390
                result
×
NEW
391
                    .and_then(|collection| collection.filter_cache_element_entries(&query))
×
NEW
392
                    .map_err(|source| Error::CacheCantProduceResult {
×
NEW
393
                        source: source.into(),
×
NEW
394
                    })
×
NEW
395
                    .map(|fc| if fc.is_empty() { None } else { Some(fc) })
×
NEW
396
                    .transpose()
×
NEW
397
            }
×
398
        });
×
399

×
400
        let merger_stream =
×
401
            FeatureCollectionChunkMerger::new(filter_stream, chunk_byte_size.into());
×
402
        Box::pin(merger_stream)
×
403
    }
×
404
}
405

406
impl<P> ResultStreamWrapper for RasterTile2D<P>
407
where
408
    P: 'static + Pixel,
409
    RasterTile2D<P>: CacheElement,
410
    Self::ResultStream: Send + Sync,
411
{
412
    fn wrap_result_stream<'a>(
×
413
        stream: Self::ResultStream,
×
414
        _chunk_byte_size: ChunkByteSize,
×
415
        _query: Self::Query,
×
416
    ) -> BoxStream<'a, Result<Self>> {
×
417
        Box::pin(stream.map_err(|ce| Error::CacheCantProduceResult { source: ce.into() }))
×
418
    }
×
419
}
420

421
#[cfg(test)]
mod tests {
    use futures::StreamExt;
    use geoengine_datatypes::{
        primitives::{BandSelection, SpatialPartition2D, SpatialResolution, TimeInterval},
        raster::TilesEqualIgnoringCacheHint,
        util::test::TestDefault,
    };

    use crate::{
        engine::{
            ChunkByteSize, MockExecutionContext, MockQueryContext, QueryContextExtensions,
            RasterOperator, WorkflowOperatorPath,
        },
        source::{GdalSource, GdalSourceParameters},
        util::gdal::add_ndvi_dataset,
    };

    use super::*;

    /// Querying the same rectangle twice yields equal tiles; the second query is expected to
    /// be answered from the cache.
    #[tokio::test]
    async fn it_caches() {
        let mut exe_ctx = MockExecutionContext::test_default();
        let ndvi_id = add_ndvi_dataset(&mut exe_ctx);

        let operator = GdalSource {
            params: GdalSourceParameters { data: ndvi_id },
        }
        .boxed()
        .initialize(WorkflowOperatorPath::initialize_root(), &exe_ctx)
        .await
        .unwrap();

        let cached_op = InitializedCacheOperator::new(operator);
        let processor = cached_op.query_processor().unwrap().get_u8().unwrap();

        let mut extensions = QueryContextExtensions::default();
        extensions.insert(Arc::new(SharedCache::test_default()));
        let query_ctx =
            MockQueryContext::new_with_query_extensions(ChunkByteSize::test_default(), extensions);

        // both queries ask for exactly the same data
        let query = QueryRectangle {
            spatial_bounds: SpatialPartition2D::new_unchecked(
                [-180., -90.].into(),
                [180., 90.].into(),
            ),
            time_interval: TimeInterval::default(),
            spatial_resolution: SpatialResolution::zero_point_one(),
            attributes: BandSelection::first(),
        };

        let tiles = processor
            .query(query.clone(), &query_ctx)
            .await
            .unwrap()
            .collect::<Vec<_>>()
            .await
            .into_iter()
            .collect::<Result<Vec<_>>>()
            .unwrap();

        // wait for the cache to be filled, which happens asynchronously
        tokio::time::sleep(std::time::Duration::from_millis(1000)).await;

        let tiles_from_cache = processor
            .query(query, &query_ctx)
            .await
            .unwrap()
            .collect::<Vec<_>>()
            .await
            .into_iter()
            .collect::<Result<Vec<_>>>()
            .unwrap();

        // TODO: how to ensure the tiles are actually from the cache?

        assert!(tiles.tiles_equal_ignoring_cache_hint(&tiles_from_cache));
    }
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc