• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

geo-engine / geoengine / 22061625775

16 Feb 2026 11:51AM UTC coverage: 88.379%. First build
22061625775

Pull #1120

github

web-flow
Merge 3d98d327a into 078c12706
Pull Request #1120: fix: Cache and Stacker should keep band order for subsets

319 of 357 new or added lines in 10 files covered. (89.36%)

116037 of 131295 relevant lines covered (88.38%)

499205.29 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

65.24
/operators/src/cache/cache_operator.rs
1
use super::cache_chunks::CacheElementSpatialBounds;
2
use super::error::CacheError;
3
use super::shared_cache::CacheElement;
4
use crate::adapters::FeatureCollectionChunkMerger;
5
use crate::cache::shared_cache::{AsyncCache, SharedCache};
6
use crate::engine::{
7
    CanonicOperatorName, ChunkByteSize, InitializedRasterOperator, InitializedVectorOperator,
8
    QueryContext, QueryProcessor, RasterQueryProcessor, RasterResultDescriptor, ResultDescriptor,
9
    TypedRasterQueryProcessor, WorkflowOperatorPath,
10
};
11
use crate::error::Error;
12
use crate::util::Result;
13
use async_trait::async_trait;
14
use futures::stream::{BoxStream, FusedStream};
15
use futures::{Stream, StreamExt, ready};
16
use geoengine_datatypes::collections::{FeatureCollection, FeatureCollectionInfos};
17
use geoengine_datatypes::primitives::{
18
    BandSelection, Geometry, QueryAttributeSelection, QueryRectangle, RasterQueryRectangle,
19
    VectorQueryRectangle,
20
};
21
use geoengine_datatypes::raster::{GridBoundingBox2D, Pixel, RasterTile2D};
22
use geoengine_datatypes::util::arrow::ArrowTyped;
23
use geoengine_datatypes::util::helpers::ge_report;
24
use pin_project::{pin_project, pinned_drop};
25
use std::pin::Pin;
26
use std::task::{Context, Poll};
27
use tokio::sync::mpsc::{UnboundedSender, unbounded_channel};
28

29
/// Initialized operator that wraps another initialized operator so that the query
/// processors it hands out are backed by the shared cache.
pub struct InitializedCacheOperator<S> {
    /// The wrapped, already initialized source operator.
    source: S,
}

impl<S> InitializedCacheOperator<S> {
35
    pub fn new(source: S) -> Self {
3✔
36
        Self { source }
3✔
37
    }
3✔
38
}
39

40
impl InitializedRasterOperator for InitializedCacheOperator<Box<dyn InitializedRasterOperator>> {
41
    fn result_descriptor(&self) -> &RasterResultDescriptor {
×
42
        self.source.result_descriptor()
×
43
    }
×
44

45
    fn query_processor(&self) -> Result<TypedRasterQueryProcessor> {
3✔
46
        let processor_result = self.source.query_processor();
3✔
47
        match processor_result {
3✔
48
            Ok(p) => {
3✔
49
                let res_processor = match p {
3✔
50
                    TypedRasterQueryProcessor::U8(p) => TypedRasterQueryProcessor::U8(Box::new(
3✔
51
                        CacheQueryProcessor::new(p, self.source.canonic_name()),
3✔
52
                    )),
3✔
53
                    TypedRasterQueryProcessor::U16(p) => TypedRasterQueryProcessor::U16(Box::new(
×
54
                        CacheQueryProcessor::new(p, self.source.canonic_name()),
×
55
                    )),
×
56
                    TypedRasterQueryProcessor::U32(p) => TypedRasterQueryProcessor::U32(Box::new(
×
57
                        CacheQueryProcessor::new(p, self.source.canonic_name()),
×
58
                    )),
×
59
                    TypedRasterQueryProcessor::U64(p) => TypedRasterQueryProcessor::U64(Box::new(
×
60
                        CacheQueryProcessor::new(p, self.source.canonic_name()),
×
61
                    )),
×
62
                    TypedRasterQueryProcessor::I8(p) => TypedRasterQueryProcessor::I8(Box::new(
×
63
                        CacheQueryProcessor::new(p, self.source.canonic_name()),
×
64
                    )),
×
65
                    TypedRasterQueryProcessor::I16(p) => TypedRasterQueryProcessor::I16(Box::new(
×
66
                        CacheQueryProcessor::new(p, self.source.canonic_name()),
×
67
                    )),
×
68
                    TypedRasterQueryProcessor::I32(p) => TypedRasterQueryProcessor::I32(Box::new(
×
69
                        CacheQueryProcessor::new(p, self.source.canonic_name()),
×
70
                    )),
×
71
                    TypedRasterQueryProcessor::I64(p) => TypedRasterQueryProcessor::I64(Box::new(
×
72
                        CacheQueryProcessor::new(p, self.source.canonic_name()),
×
73
                    )),
×
74
                    TypedRasterQueryProcessor::F32(p) => TypedRasterQueryProcessor::F32(Box::new(
×
75
                        CacheQueryProcessor::new(p, self.source.canonic_name()),
×
76
                    )),
×
77
                    TypedRasterQueryProcessor::F64(p) => TypedRasterQueryProcessor::F64(Box::new(
×
78
                        CacheQueryProcessor::new(p, self.source.canonic_name()),
×
79
                    )),
×
80
                };
81
                tracing::debug!(event = "query processor created");
3✔
82
                Ok(res_processor)
3✔
83
            }
84
            Err(err) => {
×
85
                tracing::debug!(event = "query processor failed");
×
86
                Err(err)
×
87
            }
88
        }
89
    }
3✔
90

91
    fn canonic_name(&self) -> CanonicOperatorName {
×
92
        self.source.canonic_name()
×
93
    }
×
94

95
    fn name(&self) -> &'static str {
×
96
        self.source.name()
×
97
    }
×
98

99
    fn path(&self) -> WorkflowOperatorPath {
×
100
        self.source.path()
×
101
    }
×
102

103
    fn optimize(
×
104
        &self,
×
105
        resolution: geoengine_datatypes::primitives::SpatialResolution,
×
106
    ) -> Result<Box<dyn crate::engine::RasterOperator>, crate::optimization::OptimizationError>
×
107
    {
108
        self.source.optimize(resolution)
×
109
    }
×
110
}
111

112
impl InitializedVectorOperator for InitializedCacheOperator<Box<dyn InitializedVectorOperator>> {
113
    fn result_descriptor(&self) -> &crate::engine::VectorResultDescriptor {
×
114
        self.source.result_descriptor()
×
115
    }
×
116

117
    fn query_processor(&self) -> Result<crate::engine::TypedVectorQueryProcessor> {
×
118
        let processor_result = self.source.query_processor();
×
119
        match processor_result {
×
120
            Ok(p) => {
×
121
                let res_processor = match p {
×
122
                    crate::engine::TypedVectorQueryProcessor::Data(p) => {
×
123
                        crate::engine::TypedVectorQueryProcessor::Data(Box::new(
×
124
                            CacheQueryProcessor::new(p, self.source.canonic_name()),
×
125
                        ))
×
126
                    }
127
                    crate::engine::TypedVectorQueryProcessor::MultiPoint(p) => {
×
128
                        crate::engine::TypedVectorQueryProcessor::MultiPoint(Box::new(
×
129
                            CacheQueryProcessor::new(p, self.source.canonic_name()),
×
130
                        ))
×
131
                    }
132
                    crate::engine::TypedVectorQueryProcessor::MultiLineString(p) => {
×
133
                        crate::engine::TypedVectorQueryProcessor::MultiLineString(Box::new(
×
134
                            CacheQueryProcessor::new(p, self.source.canonic_name()),
×
135
                        ))
×
136
                    }
137
                    crate::engine::TypedVectorQueryProcessor::MultiPolygon(p) => {
×
138
                        crate::engine::TypedVectorQueryProcessor::MultiPolygon(Box::new(
×
139
                            CacheQueryProcessor::new(p, self.source.canonic_name()),
×
140
                        ))
×
141
                    }
142
                };
143
                tracing::debug!(event = "query processor created");
×
144

145
                Ok(res_processor)
×
146
            }
147
            Err(err) => {
×
148
                tracing::debug!(event = "query processor failed");
×
149
                Err(err)
×
150
            }
151
        }
152
    }
×
153

154
    fn canonic_name(&self) -> CanonicOperatorName {
×
155
        self.source.canonic_name()
×
156
    }
×
157

158
    fn name(&self) -> &'static str {
×
159
        self.source.name()
×
160
    }
×
161

162
    fn path(&self) -> WorkflowOperatorPath {
×
163
        self.source.path()
×
164
    }
×
165

166
    fn optimize(
×
167
        &self,
×
168
        resolution: geoengine_datatypes::primitives::SpatialResolution,
×
169
    ) -> Result<Box<dyn crate::engine::VectorOperator>, crate::optimization::OptimizationError>
×
170
    {
171
        self.source.optimize(resolution)
×
172
    }
×
173
}
174

175
/// A cache operator that caches the results of its source operator
176
struct CacheQueryProcessor<P, E, Q, U, R>
177
where
178
    E: CacheElement + Send + Sync + 'static,
179
    P: QueryProcessor<Output = E, SpatialBounds = Q, Selection = U, ResultDescription = R>,
180
{
181
    processor: P,
182
    cache_key: CanonicOperatorName,
183
}
184

185
impl<P, E, Q, U, R> CacheQueryProcessor<P, E, Q, U, R>
186
where
187
    E: CacheElement + Send + Sync + 'static,
188
    P: QueryProcessor<Output = E, SpatialBounds = Q, Selection = U, ResultDescription = R> + Sized,
189
{
190
    pub fn new(processor: P, cache_key: CanonicOperatorName) -> Self {
3✔
191
        CacheQueryProcessor {
3✔
192
            processor,
3✔
193
            cache_key,
3✔
194
        }
3✔
195
    }
3✔
196
}
197

198
#[async_trait]
199
impl<P, E, S, U, R> QueryProcessor for CacheQueryProcessor<P, E, S, U, R>
200
where
201
    P: QueryProcessor<Output = E, SpatialBounds = S, Selection = U, ResultDescription = R> + Sized,
202
    S: Clone + Send + Sync + 'static,
203
    U: QueryAttributeSelection,
204
    E: CacheElement<Query = QueryRectangle<S, U>>
205
        + Send
206
        + Sync
207
        + 'static
208
        + ResultStreamWrapper
209
        + Clone,
210
    E::ResultStream: Stream<Item = Result<E, CacheError>> + Send + Sync + 'static,
211
    SharedCache: AsyncCache<E>,
212
    R: ResultDescriptor<QueryRectangleSpatialBounds = S, QueryRectangleAttributeSelection = U>,
213
{
214
    type Output = E;
215
    type SpatialBounds = S;
216
    type Selection = U;
217
    type ResultDescription = R;
218

219
    async fn _query<'a>(
220
        &'a self,
221
        query: QueryRectangle<Self::SpatialBounds, Self::Selection>,
222
        ctx: &'a dyn QueryContext,
223
    ) -> Result<BoxStream<'a, Result<Self::Output>>> {
7✔
224
        let shared_cache = ctx
225
            .cache()
226
            .expect("`SharedCache` extension should be set during `ProContext` creation");
227

228
        let cache_result = shared_cache.query_cache(&self.cache_key, &query).await;
229

230
        if let Ok(Some(cache_result)) = cache_result {
231
            // cache hit
232
            tracing::debug!("cache hit for operator {}", self.cache_key);
233

234
            let wrapped_result_steam =
235
                E::wrap_result_stream(cache_result, ctx.chunk_byte_size(), query.clone());
236

237
            return Ok(wrapped_result_steam);
238
        }
239

240
        // cache miss
241
        tracing::debug!("cache miss for operator {}", self.cache_key);
242
        let source_stream = self.processor.query(query.clone(), ctx).await?;
243

244
        let query_id = shared_cache.insert_query(&self.cache_key, &query).await;
245

246
        if let Err(e) = query_id {
247
            tracing::debug!("could not insert query into cache: {e}");
248
            return Ok(source_stream);
249
        }
250

251
        let query_id = query_id.expect("query_id should be set because of the previous check");
252

253
        // lazily insert tiles into the cache as they are produced
254
        let (stream_event_sender, mut stream_event_receiver) = unbounded_channel();
255

256
        let cache_key = self.cache_key.clone();
257
        let tile_cache = shared_cache.clone();
258
        crate::util::spawn(async move {
7✔
259
            while let Some(event) = stream_event_receiver.recv().await {
230✔
260
                match event {
227✔
261
                    SourceStreamEvent::Element(tile) => {
224✔
262
                        let result = tile_cache
224✔
263
                            .insert_query_element(&cache_key, &query_id, tile)
224✔
264
                            .await;
224✔
265
                        tracing::trace!(
220✔
266
                            "inserted tile into cache for cache key {cache_key} and query id {query_id}. result: {result:?}"
×
267
                        );
268
                    }
269
                    SourceStreamEvent::Abort => {
270
                        tile_cache.abort_query(&cache_key, &query_id).await;
×
271
                        tracing::debug!(
×
272
                            "aborted cache insertion for cache key {cache_key} and query id {query_id}"
×
273
                        );
274
                    }
275
                    SourceStreamEvent::Finished => {
276
                        let result = tile_cache.finish_query(&cache_key, &query_id).await;
3✔
277
                        tracing::debug!(
3✔
278
                            "finished cache insertion for cache key {cache_key} and query id {query_id}, result: {result:?}"
×
279
                        );
280
                    }
281
                }
282
            }
283
        });
3✔
284

285
        let output_stream = CacheOutputStream {
286
            source: source_stream,
287
            stream_event_sender,
288
            finished: false,
289
            pristine: true,
290
        };
291

292
        Ok(Box::pin(output_stream))
293
    }
7✔
294

295
    fn result_descriptor(&self) -> &Self::ResultDescription {
14✔
296
        self.processor.result_descriptor()
14✔
297
    }
14✔
298
}
299

300
#[async_trait]
301
impl<T, P> RasterQueryProcessor
302
    for CacheQueryProcessor<
303
        P,
304
        RasterTile2D<T>,
305
        GridBoundingBox2D,
306
        BandSelection,
307
        RasterResultDescriptor,
308
    >
309
where
310
    P: RasterQueryProcessor<RasterType = T> + Sized,
311
    SharedCache: AsyncCache<RasterTile2D<T>>,
312
    T: Pixel + Send + Sync + 'static,
313
    RasterTile2D<T>: CacheElement<Query = RasterQueryRectangle> + Send + Sync + 'static,
314
    <RasterTile2D<T> as CacheElement>::ResultStream:
315
        Stream<Item = Result<RasterTile2D<T>, CacheError>> + Send + Sync + 'static,
316
{
317
    type RasterType = T;
318

319
    async fn _time_query<'a>(
320
        &'a self,
321
        query: geoengine_datatypes::primitives::TimeInterval,
322
        ctx: &'a dyn QueryContext,
323
    ) -> Result<BoxStream<'a, Result<geoengine_datatypes::primitives::TimeInterval>>> {
×
324
        self.processor.time_query(query, ctx).await // TODO: investigate if we can use caching here?
325
    }
×
326
}
327

328
#[allow(clippy::large_enum_variant)] // TODO: Box instead?
329
enum SourceStreamEvent<E: CacheElement> {
330
    Element(E),
331
    Abort,
332
    Finished,
333
}
334

335
/// Custom stream that lazily puts the produced tile in the cache and finishes the cache entry when the source stream completes
336
#[pin_project(PinnedDrop, project = CacheOutputStreamProjection)]
337
struct CacheOutputStream<S, E>
338
where
339
    S: Stream<Item = Result<E>>,
340
    E: CacheElement + Clone,
341
{
342
    #[pin]
343
    source: S,
344
    stream_event_sender: UnboundedSender<SourceStreamEvent<E>>,
345
    finished: bool,
346
    pristine: bool,
347
}
348

349
impl<S, E> Stream for CacheOutputStream<S, E>
350
where
351
    S: Stream<Item = Result<E>>,
352
    E: CacheElement + Clone,
353
{
354
    type Item = Result<E>;
355

356
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
706✔
357
        let this = self.project();
706✔
358

359
        let next = ready!(this.source.poll_next(cx));
706✔
360

361
        if let Some(element) = &next {
295✔
362
            *this.pristine = false;
288✔
363
            if let Ok(element) = element {
288✔
364
                // ignore the result. The receiver shold never drop prematurely, but if it does we don't want to crash
365
                let r = this
288✔
366
                    .stream_event_sender
288✔
367
                    .send(SourceStreamEvent::Element(element.clone()));
288✔
368
                if let Err(e) = r {
288✔
369
                    tracing::warn!("could not send tile to cache: {}", ge_report(e));
×
370
                }
288✔
371
            } else {
372
                // ignore the result. The receiver shold never drop prematurely, but if it does we don't want to crash
373
                let r = this.stream_event_sender.send(SourceStreamEvent::Abort);
×
374
                if let Err(e) = r {
×
375
                    tracing::warn!("could not send abort to cache: {}", ge_report(e));
×
376
                }
×
377
            }
378
        } else {
379
            if *this.pristine {
7✔
380
                // ignore the result. The receiver shold never drop prematurely, but if it does we don't want to crash
381
                let r = this.stream_event_sender.send(SourceStreamEvent::Abort);
×
382
                if let Err(e) = r {
×
383
                    tracing::warn!("could not send abort to cache: {}", ge_report(e));
×
384
                }
×
385
            } else {
386
                // ignore the result. The receiver shold never drop prematurely, but if it does we don't want to crash
387
                let r = this.stream_event_sender.send(SourceStreamEvent::Finished);
7✔
388
                if let Err(e) = r {
7✔
389
                    tracing::warn!("could not send finished to cache: {}", ge_report(e));
×
390
                }
7✔
391
                tracing::debug!("stream finished, mark cache entry as finished.");
7✔
392
            }
393
            *this.finished = true;
7✔
394
        }
395

396
        Poll::Ready(next)
295✔
397
    }
706✔
398
}
399

400
/// On drop, trigger the removal of the cache entry if it hasn't been finished yet
401
#[pinned_drop]
402
impl<S, E> PinnedDrop for CacheOutputStream<S, E>
403
where
404
    S: Stream<Item = Result<E>>,
405
    E: CacheElement + Clone,
7✔
406
{
7✔
407
    fn drop(self: Pin<&mut Self>) {
7✔
408
        if !self.finished {
7✔
409
            // ignore the result. The receiver shold never drop prematurely, but if it does we don't want to crash
410
            let r = self.stream_event_sender.send(SourceStreamEvent::Abort);
×
411
            if let Err(e) = r {
×
412
                tracing::debug!("could not send abort to cache: {e}");
×
413
            }
×
414
        }
7✔
415
    }
7✔
416
}
417

418
trait ResultStreamWrapper: CacheElement {
419
    fn wrap_result_stream<'a>(
420
        stream: Self::ResultStream,
421
        chunk_byte_size: ChunkByteSize,
422
        query: Self::Query,
423
    ) -> BoxStream<'a, Result<Self>>;
424
}
425

426
impl<G> ResultStreamWrapper for FeatureCollection<G>
427
where
428
    G: Geometry + ArrowTyped + Send + Sync + 'static,
429
    FeatureCollection<G>:
430
        CacheElement<Query = VectorQueryRectangle> + Send + Sync + CacheElementSpatialBounds,
431
    Self::ResultStream: FusedStream + Send + Sync,
432
{
433
    fn wrap_result_stream<'a>(
×
434
        stream: Self::ResultStream,
×
435
        chunk_byte_size: ChunkByteSize,
×
436
        query: Self::Query,
×
437
    ) -> BoxStream<'a, Result<Self>> {
×
438
        let filter_stream = stream.filter_map(move |result| {
×
439
            let query = query.clone();
×
440
            async move {
×
441
                result
×
442
                    .and_then(|collection| collection.filter_cache_element_entries(&query))
×
443
                    .map_err(|source| Error::CacheCantProduceResult {
×
444
                        source: source.into(),
×
445
                    })
×
446
                    .map(|fc| if fc.is_empty() { None } else { Some(fc) })
×
447
                    .transpose()
×
448
            }
×
449
        });
×
450

451
        let merger_stream =
×
452
            FeatureCollectionChunkMerger::new(filter_stream, chunk_byte_size.into());
×
453
        Box::pin(merger_stream)
×
454
    }
×
455
}
456

457
impl<P> ResultStreamWrapper for RasterTile2D<P>
458
where
459
    P: 'static + Pixel,
460
    RasterTile2D<P>: CacheElement<Query = RasterQueryRectangle>,
461
    Self::ResultStream: Send + Sync,
462
{
463
    fn wrap_result_stream<'a>(
×
464
        stream: Self::ResultStream,
×
465
        _chunk_byte_size: ChunkByteSize,
×
NEW
466
        query: Self::Query,
×
467
    ) -> BoxStream<'a, Result<Self>> {
×
NEW
468
        let query_bands = query.attributes().clone();
×
469

NEW
470
        let band_filter_stream = stream.filter_map(move |result| {
×
NEW
471
            let qb = query_bands.clone();
×
NEW
472
            async move {
×
NEW
473
                match result {
×
NEW
474
                    Ok(tile) => {
×
NEW
475
                        let tile_band = tile.band;
×
NEW
476
                        if qb.contains(tile_band) {
×
NEW
477
                            Some(Ok(tile))
×
478
                        } else {
NEW
479
                            None
×
480
                        }
481
                    }
NEW
482
                    Err(e) => Some(Err(Error::CacheCantProduceResult { source: e.into() })),
×
483
                }
NEW
484
            }
×
NEW
485
        });
×
486

NEW
487
        Box::pin(band_filter_stream)
×
488
    }
×
489
}
490

491
#[cfg(test)]
mod tests {
    use super::*;

    use crate::{
        engine::{
            ChunkByteSize, MockExecutionContext, MultipleRasterSources, RasterOperator,
            SingleRasterSource, WorkflowOperatorPath,
        },
        processing::{Expression, ExpressionParams, RasterStacker, RasterStackerParams},
        source::{GdalSource, GdalSourceParameters},
        util::gdal::add_ndvi_dataset,
    };
    use futures::{StreamExt, TryStreamExt};
    use geoengine_datatypes::{
        primitives::{BandSelection, RasterQueryRectangle, TimeInterval},
        raster::{GridBoundingBox2D, RasterDataType, RenameBands, TilesEqualIgnoringCacheHint},
        util::test::TestDefault,
    };
    use std::sync::Arc;

    #[tokio::test]
    async fn it_caches() {
        let mut exe_ctx = MockExecutionContext::test_default();

        let ndvi_id = add_ndvi_dataset(&mut exe_ctx);

        let operator = GdalSource {
            params: GdalSourceParameters::new(ndvi_id.clone()),
        }
        .boxed()
        .initialize(WorkflowOperatorPath::initialize_root(), &exe_ctx)
        .await
        .unwrap();

        let cached_op = InitializedCacheOperator::new(operator);

        let processor = cached_op.query_processor().unwrap().get_u8().unwrap();

        let tile_cache = Arc::new(SharedCache::test_default());

        let query_ctx = exe_ctx.mock_query_context_with_query_extensions(
            ChunkByteSize::test_default(),
            Some(tile_cache),
            None,
            None,
        );

        let stream = processor
            .query(
                RasterQueryRectangle::new(
                    GridBoundingBox2D::new([-90, -180], [89, 179]).unwrap(),
                    TimeInterval::default(),
                    BandSelection::first(),
                ),
                &query_ctx,
            )
            .await
            .unwrap();

        let tiles = stream.collect::<Vec<_>>().await;
        let tiles = tiles.into_iter().collect::<Result<Vec<_>>>().unwrap();

        // wait for the cache to be filled, which happens asynchronously
        tokio::time::sleep(std::time::Duration::from_millis(1000)).await;

        // delete the dataset to make sure the result is served from the cache
        exe_ctx.delete_meta_data(&ndvi_id);

        let stream_from_cache = processor
            .query(
                RasterQueryRectangle::new(
                    GridBoundingBox2D::new([-90, -180], [89, 179]).unwrap(),
                    TimeInterval::default(),
                    BandSelection::first(),
                ),
                &query_ctx,
            )
            .await
            .unwrap();

        let tiles_from_cache = stream_from_cache.collect::<Vec<_>>().await;
        let tiles_from_cache = tiles_from_cache
            .into_iter()
            .collect::<Result<Vec<_>>>()
            .unwrap();

        assert!(tiles.tiles_equal_ignoring_cache_hint(&tiles_from_cache));
    }

    #[tokio::test]
    async fn it_reuses_bands_from_cache_entries() {
        let mut exe_ctx = MockExecutionContext::test_default();

        let ndvi_id = add_ndvi_dataset(&mut exe_ctx);

        let operator = RasterStacker {
            params: RasterStackerParams {
                rename_bands: RenameBands::Default,
            },
            sources: MultipleRasterSources {
                rasters: vec![
                    GdalSource {
                        params: GdalSourceParameters {
                            data: ndvi_id.clone(),
                            overview_level: None,
                        },
                    }
                    .boxed(),
                    Expression {
                        params: ExpressionParams {
                            expression: "2 * A".to_string(),
                            output_type: RasterDataType::U8,
                            output_band: None,
                            map_no_data: false,
                        },
                        sources: SingleRasterSource {
                            raster: GdalSource {
                                params: GdalSourceParameters {
                                    data: ndvi_id.clone(),
                                    overview_level: None,
                                },
                            }
                            .boxed(),
                        },
                    }
                    .boxed(),
                ],
            },
        }
        .boxed()
        .initialize(WorkflowOperatorPath::initialize_root(), &exe_ctx)
        .await
        .unwrap();

        let cached_op = InitializedCacheOperator::new(operator);

        let processor = cached_op.query_processor().unwrap().get_u8().unwrap();

        let tile_cache = Arc::new(SharedCache::test_default());

        let query_ctx = exe_ctx.mock_query_context_with_query_extensions(
            ChunkByteSize::test_default(),
            Some(tile_cache),
            None,
            None,
        );

        // query the first two bands
        let stream = processor
            .query(
                RasterQueryRectangle::new(
                    GridBoundingBox2D::new([-90, -180], [89, 179]).unwrap(),
                    TimeInterval::default(),
                    BandSelection::new(vec![0, 1]).unwrap(),
                ),
                &query_ctx,
            )
            .await
            .unwrap();

        let tiles = stream.collect::<Vec<_>>().await;
        let tiles = tiles.into_iter().collect::<Result<Vec<_>>>().unwrap();
        // only keep the second band for comparison
        let tiles = tiles
            .into_iter()
            .filter(|tile| tile.band == 1)
            .collect::<Vec<_>>();

        // wait for the cache to be filled, which happens asynchronously
        tokio::time::sleep(std::time::Duration::from_millis(1000)).await;

        // delete the dataset to make sure the result is served from the cache
        exe_ctx.delete_meta_data(&ndvi_id);

        // now query only the second band
        let stream_from_cache = processor
            .query(
                RasterQueryRectangle::new(
                    GridBoundingBox2D::new([-90, -180], [89, 179]).unwrap(),
                    TimeInterval::default(),
                    BandSelection::new_single(1),
                ),
                &query_ctx,
            )
            .await
            .unwrap();

        let tiles_from_cache = stream_from_cache.collect::<Vec<_>>().await;
        let tiles_from_cache = tiles_from_cache
            .into_iter()
            .collect::<Result<Vec<_>>>()
            .unwrap();

        assert!(tiles.tiles_equal_ignoring_cache_hint(&tiles_from_cache));
    }

    #[allow(clippy::too_many_lines)]
    #[tokio::test]
    async fn it_reuses_multi_band_subset() {
        let mut exe_ctx = MockExecutionContext::test_default();

        let ndvi_id = add_ndvi_dataset(&mut exe_ctx);

        let source_operator_a = GdalSource {
            params: GdalSourceParameters::new(ndvi_id.clone()),
        };

        let source_operator_b = GdalSource {
            params: GdalSourceParameters::new(ndvi_id.clone()),
        };

        let stacked_operator = RasterStacker {
            params: RasterStackerParams {
                rename_bands: RenameBands::Rename(vec!["band_0".to_string(), "band_1".to_string()]),
            },
            sources: MultipleRasterSources {
                rasters: vec![source_operator_a.boxed(), source_operator_b.boxed()],
            },
        };

        let operator = stacked_operator
            .boxed()
            .initialize(WorkflowOperatorPath::initialize_root(), &exe_ctx)
            .await
            .unwrap();

        let cached_op = InitializedCacheOperator::new(operator);

        let processor = cached_op.query_processor().unwrap().get_u8().unwrap();

        let tile_cache = Arc::new(SharedCache::test_default());

        let query_ctx = exe_ctx.mock_query_context_with_query_extensions(
            ChunkByteSize::test_default(),
            Some(tile_cache),
            None,
            None,
        );

        // query the first two bands
        let stream = processor
            .query(
                RasterQueryRectangle::new(
                    GridBoundingBox2D::new([-90, -180], [89, 179]).unwrap(),
                    TimeInterval::default(),
                    BandSelection::new(vec![0, 1]).unwrap(),
                ),
                &query_ctx,
            )
            .await
            .unwrap();

        // now count the tiles per band by folding over the stream
        let tile_per_band_count: (usize, usize) = stream
            .fold((0usize, 0usize), |mut acc, tile_result| async move {
                match tile_result {
                    Ok(tile) => {
                        if tile.band == 0 {
                            acc.0 += 1;
                        } else if tile.band == 1 {
                            acc.1 += 1;
                        }
                    }
                    Err(e) => {
                        panic!("Error in tile stream: {e}")
                    }
                }
                acc
            })
            .await;

        assert!(
            tile_per_band_count.0 > 0,
            "There should be tiles for band 0"
        );
        assert_eq!(
            tile_per_band_count.0, tile_per_band_count.1,
            "Both bands should have the same number of tiles"
        );

        // wait for the cache to be filled, which happens asynchronously
        tokio::time::sleep(std::time::Duration::from_millis(1000)).await;

        // delete the dataset to make sure the result is served from the cache
        exe_ctx.delete_meta_data(&ndvi_id);

        // now query only the first band
        let stream_from_cache_0 = processor
            .query(
                RasterQueryRectangle::new(
                    GridBoundingBox2D::new([-90, -180], [89, 179]).unwrap(),
                    TimeInterval::default(),
                    BandSelection::first_n(1),
                ),
                &query_ctx,
            )
            .await
            .unwrap();

        let band_0_count = stream_from_cache_0
            .inspect_ok(|tile| {
                assert_eq!(tile.band, 0, "Only band 0 should be present in the result");
            })
            .count()
            .await;

        assert!(
            band_0_count > 0,
            "There should be tiles for band 0 in the cache result"
        );
        assert_eq!(
            band_0_count, tile_per_band_count.0,
            "The number of tiles for band 0 should match the previous query"
        );

        // now query only the second band
        let stream_from_cache_1 = processor
            .query(
                RasterQueryRectangle::new(
                    GridBoundingBox2D::new([-90, -180], [89, 179]).unwrap(),
                    TimeInterval::default(),
                    BandSelection::new_single(1),
                ),
                &query_ctx,
            )
            .await
            .unwrap();

        let band_1_count = stream_from_cache_1
            .inspect_ok(|tile| {
                assert_eq!(tile.band, 1, "Only band 1 should be present in the result");
            })
            .count()
            .await;

        assert!(
            band_1_count > 0,
            "There should be tiles for band 1 in the cache result"
        );
        assert_eq!(
            band_1_count, tile_per_band_count.1,
            "The number of tiles for band 1 should match the previous query"
        );
    }
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc