geo-engine / geoengine · build 16189529302

10 Jul 2025 08:02AM UTC · coverage: 88.731% · first build

Pull Request #1061 (github · web-flow): Merge 345a798f4 into b8910c811
feat(operators): skip empty tiles and merge masks in onnx; remove trace/debug in release mode

98 of 133 new or added lines in 12 files covered (73.68%)
111296 of 125431 relevant lines covered (88.73%)
79561.95 hits per line
Source File: /operators/src/cache/cache_operator.rs

use super::cache_chunks::CacheElementSpatialBounds;
use super::error::CacheError;
use super::shared_cache::CacheElement;
use crate::adapters::FeatureCollectionChunkMerger;
use crate::cache::shared_cache::{AsyncCache, SharedCache};
use crate::engine::{
    CanonicOperatorName, ChunkByteSize, InitializedRasterOperator, InitializedVectorOperator,
    QueryContext, QueryProcessor, RasterResultDescriptor, ResultDescriptor,
    TypedRasterQueryProcessor, WorkflowOperatorPath,
};
use crate::error::Error;
use crate::util::Result;
use crate::{ge_tracing_removed_debug, ge_tracing_removed_trace};
use async_trait::async_trait;
use futures::stream::{BoxStream, FusedStream};
use futures::{Stream, StreamExt, TryStreamExt, ready};
use geoengine_datatypes::collections::{FeatureCollection, FeatureCollectionInfos};
use geoengine_datatypes::primitives::{
    AxisAlignedRectangle, Geometry, QueryAttributeSelection, QueryRectangle, VectorQueryRectangle,
};
use geoengine_datatypes::raster::{Pixel, RasterTile2D};
use geoengine_datatypes::util::arrow::ArrowTyped;
use geoengine_datatypes::util::helpers::ge_report;
use pin_project::{pin_project, pinned_drop};
use std::pin::Pin;
use std::task::{Context, Poll};
use tokio::sync::mpsc::{UnboundedSender, unbounded_channel};

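// `ge_tracing_removed_debug!` and `ge_tracing_removed_trace!` presumably stand in for
// `tracing` debug/trace calls that are compiled out of release builds (the PR is titled
// "remove trace/debug in release mode"); see the macro definitions for the exact behavior.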
/// A cache operator that caches the results of its source operator
pub struct InitializedCacheOperator<S> {
    source: S,
}

impl<S> InitializedCacheOperator<S> {
    pub fn new(source: S) -> Self {
        Self { source }
    }
}

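// The wrapper is applied to an already-initialized operator, e.g.
// `InitializedCacheOperator::new(operator)`, as done in the tests at the bottom of this file.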
impl InitializedRasterOperator for InitializedCacheOperator<Box<dyn InitializedRasterOperator>> {
    fn result_descriptor(&self) -> &RasterResultDescriptor {
        self.source.result_descriptor()
    }

    fn query_processor(&self) -> Result<TypedRasterQueryProcessor> {
        let processor_result = self.source.query_processor();
        match processor_result {
            Ok(p) => {
                let res_processor = match p {
                    TypedRasterQueryProcessor::U8(p) => TypedRasterQueryProcessor::U8(Box::new(
                        CacheQueryProcessor::new(p, self.source.canonic_name()),
                    )),
                    TypedRasterQueryProcessor::U16(p) => TypedRasterQueryProcessor::U16(Box::new(
                        CacheQueryProcessor::new(p, self.source.canonic_name()),
                    )),
                    TypedRasterQueryProcessor::U32(p) => TypedRasterQueryProcessor::U32(Box::new(
                        CacheQueryProcessor::new(p, self.source.canonic_name()),
                    )),
                    TypedRasterQueryProcessor::U64(p) => TypedRasterQueryProcessor::U64(Box::new(
                        CacheQueryProcessor::new(p, self.source.canonic_name()),
                    )),
                    TypedRasterQueryProcessor::I8(p) => TypedRasterQueryProcessor::I8(Box::new(
                        CacheQueryProcessor::new(p, self.source.canonic_name()),
                    )),
                    TypedRasterQueryProcessor::I16(p) => TypedRasterQueryProcessor::I16(Box::new(
                        CacheQueryProcessor::new(p, self.source.canonic_name()),
                    )),
                    TypedRasterQueryProcessor::I32(p) => TypedRasterQueryProcessor::I32(Box::new(
                        CacheQueryProcessor::new(p, self.source.canonic_name()),
                    )),
                    TypedRasterQueryProcessor::I64(p) => TypedRasterQueryProcessor::I64(Box::new(
                        CacheQueryProcessor::new(p, self.source.canonic_name()),
                    )),
                    TypedRasterQueryProcessor::F32(p) => TypedRasterQueryProcessor::F32(Box::new(
                        CacheQueryProcessor::new(p, self.source.canonic_name()),
                    )),
                    TypedRasterQueryProcessor::F64(p) => TypedRasterQueryProcessor::F64(Box::new(
                        CacheQueryProcessor::new(p, self.source.canonic_name()),
                    )),
                };
                ge_tracing_removed_debug!(event = "query processor created");
                Ok(res_processor)
            }
            Err(err) => {
                ge_tracing_removed_debug!(event = "query processor failed");
                Err(err)
            }
        }
    }

    fn canonic_name(&self) -> CanonicOperatorName {
        self.source.canonic_name()
    }

    fn name(&self) -> &'static str {
        self.source.name()
    }

    fn path(&self) -> WorkflowOperatorPath {
        self.source.path()
    }
}

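// The `InitializedVectorOperator` impl below mirrors the raster impl above: all methods
// delegate to the wrapped source, and `query_processor` wraps each
// `TypedVectorQueryProcessor` variant in a `CacheQueryProcessor` keyed by the source's
// canonic name.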
impl InitializedVectorOperator for InitializedCacheOperator<Box<dyn InitializedVectorOperator>> {
    fn result_descriptor(&self) -> &crate::engine::VectorResultDescriptor {
        self.source.result_descriptor()
    }

    fn query_processor(&self) -> Result<crate::engine::TypedVectorQueryProcessor> {
        let processor_result = self.source.query_processor();
        match processor_result {
            Ok(p) => {
                let res_processor = match p {
                    crate::engine::TypedVectorQueryProcessor::Data(p) => {
                        crate::engine::TypedVectorQueryProcessor::Data(Box::new(
                            CacheQueryProcessor::new(p, self.source.canonic_name()),
                        ))
                    }
                    crate::engine::TypedVectorQueryProcessor::MultiPoint(p) => {
                        crate::engine::TypedVectorQueryProcessor::MultiPoint(Box::new(
                            CacheQueryProcessor::new(p, self.source.canonic_name()),
                        ))
                    }
                    crate::engine::TypedVectorQueryProcessor::MultiLineString(p) => {
                        crate::engine::TypedVectorQueryProcessor::MultiLineString(Box::new(
                            CacheQueryProcessor::new(p, self.source.canonic_name()),
                        ))
                    }
                    crate::engine::TypedVectorQueryProcessor::MultiPolygon(p) => {
                        crate::engine::TypedVectorQueryProcessor::MultiPolygon(Box::new(
                            CacheQueryProcessor::new(p, self.source.canonic_name()),
                        ))
                    }
                };
                ge_tracing_removed_debug!(event = "query processor created");

                Ok(res_processor)
            }
            Err(err) => {
                ge_tracing_removed_debug!(event = "query processor failed");
                Err(err)
            }
        }
    }

    fn canonic_name(&self) -> CanonicOperatorName {
        self.source.canonic_name()
    }

    fn name(&self) -> &'static str {
        self.source.name()
    }

    fn path(&self) -> WorkflowOperatorPath {
        self.source.path()
    }
}

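// Type parameters: `P` is the wrapped query processor, `E` the cached element type
// (a raster tile or a feature collection), `Q` the spatial bounds, `U` the attribute
// selection, and `R` the result descriptor. Cache entries are keyed by the source
// operator's `CanonicOperatorName`.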
/// A query processor that caches the results of its source query processor
struct CacheQueryProcessor<P, E, Q, U, R>
where
    E: CacheElement + Send + Sync + 'static,
    P: QueryProcessor<Output = E, SpatialBounds = Q, Selection = U, ResultDescription = R>,
{
    processor: P,
    cache_key: CanonicOperatorName,
}

impl<P, E, Q, U, R> CacheQueryProcessor<P, E, Q, U, R>
where
    E: CacheElement + Send + Sync + 'static,
    P: QueryProcessor<Output = E, SpatialBounds = Q, Selection = U, ResultDescription = R> + Sized,
{
    pub fn new(processor: P, cache_key: CanonicOperatorName) -> Self {
        CacheQueryProcessor {
            processor,
            cache_key,
        }
    }
}

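// The impl below requires `SharedCache: AsyncCache<E>`, which is what lets the same
// processor cache both raster tiles and vector feature collections.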
#[async_trait]
impl<P, E, S, U, R> QueryProcessor for CacheQueryProcessor<P, E, S, U, R>
where
    P: QueryProcessor<Output = E, SpatialBounds = S, Selection = U, ResultDescription = R> + Sized,
    S: AxisAlignedRectangle + Send + Sync + 'static,
    U: QueryAttributeSelection,
    E: CacheElement<Query = QueryRectangle<S, U>>
        + Send
        + Sync
        + 'static
        + ResultStreamWrapper
        + Clone,
    E::ResultStream: Stream<Item = Result<E, CacheError>> + Send + Sync + 'static,
    SharedCache: AsyncCache<E>,
    R: ResultDescriptor<QueryRectangleSpatialBounds = S, QueryRectangleAttributeSelection = U>,
{
    type Output = E;
    type SpatialBounds = S;
    type Selection = U;
    type ResultDescription = R;

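    // Query flow: look the query up in the shared cache; on a hit, wrap the cached stream
    // so that it fits the requested query; on a miss, query the source and tee every
    // produced element into the cache via a spawned task.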
    async fn _query<'a>(
        &'a self,
        query: QueryRectangle<Self::SpatialBounds, Self::Selection>,
        ctx: &'a dyn QueryContext,
    ) -> Result<BoxStream<'a, Result<Self::Output>>> {
        let shared_cache = ctx
            .cache()
            .expect("`SharedCache` extension should be set during `ProContext` creation");

        let cache_result = shared_cache.query_cache(&self.cache_key, &query).await;

        if let Ok(Some(cache_result)) = cache_result {
            // cache hit
            ge_tracing_removed_debug!("cache hit for operator {}", self.cache_key);

            let wrapped_result_stream =
                E::wrap_result_stream(cache_result, ctx.chunk_byte_size(), query.clone());

            return Ok(wrapped_result_stream);
        }

        // cache miss
        ge_tracing_removed_debug!("cache miss for operator {}", self.cache_key);
        let source_stream = self.processor.query(query.clone(), ctx).await?;

        let query_id = shared_cache.insert_query(&self.cache_key, &query).await;

        if let Err(e) = query_id {
            ge_tracing_removed_debug!("could not insert query into cache: {e}");
            return Ok(source_stream);
        }

        let query_id = query_id.expect("query_id should be set because of the previous check");

        // lazily insert tiles into the cache as they are produced
        let (stream_event_sender, mut stream_event_receiver) = unbounded_channel();

        let cache_key = self.cache_key.clone();
        let tile_cache = shared_cache.clone();
        crate::util::spawn(async move {
            while let Some(event) = stream_event_receiver.recv().await {
                match event {
                    SourceStreamEvent::Element(tile) => {
                        let result = tile_cache
                            .insert_query_element(&cache_key, &query_id, tile)
                            .await;
                        ge_tracing_removed_trace!(
                            "inserted tile into cache for cache key {cache_key} and query id {query_id}. result: {result:?}"
                        );
                    }
                    SourceStreamEvent::Abort => {
                        tile_cache.abort_query(&cache_key, &query_id).await;
                        ge_tracing_removed_debug!(
                            "aborted cache insertion for cache key {cache_key} and query id {query_id}"
                        );
                    }
                    SourceStreamEvent::Finished => {
                        let result = tile_cache.finish_query(&cache_key, &query_id).await;
                        ge_tracing_removed_debug!(
                            "finished cache insertion for cache key {cache_key} and query id {query_id}, result: {result:?}"
                        );
                    }
                }
            }
        });

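        // The stream handed back to the caller forwards each successful element to the
        // task above over the unbounded channel (so `poll_next` never blocks on the cache)
        // while passing the element through unchanged.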
        let output_stream = CacheOutputStream {
            source: source_stream,
            stream_event_sender,
            finished: false,
            pristine: true,
        };

        Ok(Box::pin(output_stream))
    }

    fn result_descriptor(&self) -> &Self::ResultDescription {
        self.processor.result_descriptor()
    }
}

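/// Events sent from the `CacheOutputStream` to the cache-insertion task.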
#[allow(clippy::large_enum_variant)] // TODO: Box instead?
enum SourceStreamEvent<E: CacheElement> {
    Element(E),
    Abort,
    Finished,
}

/// Custom stream that lazily puts the produced tile in the cache and finishes the cache entry when the source stream completes
#[pin_project(PinnedDrop, project = CacheOutputStreamProjection)]
struct CacheOutputStream<S, E>
where
    S: Stream<Item = Result<E>>,
    E: CacheElement + Clone,
{
    #[pin]
    source: S,
    stream_event_sender: UnboundedSender<SourceStreamEvent<E>>,
    finished: bool,
    pristine: bool,
}

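// `pristine` tracks whether the stream has produced any element yet: a stream that ends
// without producing anything aborts the cache entry instead of finishing it. `finished`
// records that the source stream ran to completion, so the `PinnedDrop` impl below only
// aborts the cache entry when the stream is dropped early.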
impl<S, E> Stream for CacheOutputStream<S, E>
where
    S: Stream<Item = Result<E>>,
    E: CacheElement + Clone,
{
    type Item = Result<E>;

    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let this = self.project();

        let next = ready!(this.source.poll_next(cx));

        if let Some(element) = &next {
            *this.pristine = false;
            if let Ok(element) = element {
                // ignore the result. The receiver should never drop prematurely, but if it does we don't want to crash
                let r = this
                    .stream_event_sender
                    .send(SourceStreamEvent::Element(element.clone()));
                if let Err(e) = r {
                    log::warn!("could not send tile to cache: {}", ge_report(e));
                }
            } else {
                // ignore the result. The receiver should never drop prematurely, but if it does we don't want to crash
                let r = this.stream_event_sender.send(SourceStreamEvent::Abort);
                if let Err(e) = r {
                    log::warn!("could not send abort to cache: {}", ge_report(e));
                }
            }
        } else {
            if *this.pristine {
                // ignore the result. The receiver should never drop prematurely, but if it does we don't want to crash
                let r = this.stream_event_sender.send(SourceStreamEvent::Abort);
                if let Err(e) = r {
                    log::warn!("could not send abort to cache: {}", ge_report(e));
                }
            } else {
                // ignore the result. The receiver should never drop prematurely, but if it does we don't want to crash
                let r = this.stream_event_sender.send(SourceStreamEvent::Finished);
                if let Err(e) = r {
                    log::warn!("could not send finished to cache: {}", ge_report(e));
                }
                log::debug!("stream finished, mark cache entry as finished.");
            }
            *this.finished = true;
        }

        Poll::Ready(next)
    }
}

/// On drop, trigger the removal of the cache entry if it hasn't been finished yet
#[pinned_drop]
impl<S, E> PinnedDrop for CacheOutputStream<S, E>
where
    S: Stream<Item = Result<E>>,
    E: CacheElement + Clone,
{
    fn drop(self: Pin<&mut Self>) {
        if !self.finished {
            // ignore the result. The receiver should never drop prematurely, but if it does we don't want to crash
            let r = self.stream_event_sender.send(SourceStreamEvent::Abort);
            if let Err(e) = r {
                log::debug!("could not send abort to cache: {e}");
            }
        }
    }
}

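/// Adapts a stream of cached elements back into the result stream of the operator,
/// given the original query and chunk byte size.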
trait ResultStreamWrapper: CacheElement {
    fn wrap_result_stream<'a>(
        stream: Self::ResultStream,
        chunk_byte_size: ChunkByteSize,
        query: Self::Query,
    ) -> BoxStream<'a, Result<Self>>;
}

impl<G> ResultStreamWrapper for FeatureCollection<G>
where
    G: Geometry + ArrowTyped + Send + Sync + 'static,
    FeatureCollection<G>:
        CacheElement<Query = VectorQueryRectangle> + Send + Sync + CacheElementSpatialBounds,
    Self::ResultStream: FusedStream + Send + Sync,
{
    fn wrap_result_stream<'a>(
        stream: Self::ResultStream,
        chunk_byte_size: ChunkByteSize,
        query: Self::Query,
    ) -> BoxStream<'a, Result<Self>> {
        let filter_stream = stream.filter_map(move |result| {
            let query = query.clone();
            async move {
                result
                    .and_then(|collection| collection.filter_cache_element_entries(&query))
                    .map_err(|source| Error::CacheCantProduceResult {
                        source: source.into(),
                    })
                    .map(|fc| if fc.is_empty() { None } else { Some(fc) })
                    .transpose()
            }
        });

        let merger_stream =
            FeatureCollectionChunkMerger::new(filter_stream, chunk_byte_size.into());
        Box::pin(merger_stream)
    }
}

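// Raster tiles, unlike vector chunks, are returned from the cache as-is: no filtering or
// re-chunking is applied, only the error type is converted.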
impl<P> ResultStreamWrapper for RasterTile2D<P>
where
    P: 'static + Pixel,
    RasterTile2D<P>: CacheElement,
    Self::ResultStream: Send + Sync,
{
    fn wrap_result_stream<'a>(
        stream: Self::ResultStream,
        _chunk_byte_size: ChunkByteSize,
        _query: Self::Query,
    ) -> BoxStream<'a, Result<Self>> {
        Box::pin(stream.map_err(|ce| Error::CacheCantProduceResult { source: ce.into() }))
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    use crate::{
        engine::{
            ChunkByteSize, MockExecutionContext, MockQueryContext, MultipleRasterSources,
            RasterOperator, SingleRasterSource, WorkflowOperatorPath,
        },
        processing::{Expression, ExpressionParams, RasterStacker, RasterStackerParams},
        source::{GdalSource, GdalSourceParameters},
        util::gdal::add_ndvi_dataset,
    };
    use futures::StreamExt;
    use geoengine_datatypes::{
        primitives::{BandSelection, SpatialPartition2D, SpatialResolution, TimeInterval},
        raster::{RasterDataType, RenameBands, TilesEqualIgnoringCacheHint},
        util::test::TestDefault,
    };
    use std::sync::Arc;

    #[tokio::test]
    async fn it_caches() {
        let mut exe_ctx = MockExecutionContext::test_default();

        let ndvi_id = add_ndvi_dataset(&mut exe_ctx);

        let operator = GdalSource {
            params: GdalSourceParameters {
                data: ndvi_id.clone(),
            },
        }
        .boxed()
        .initialize(WorkflowOperatorPath::initialize_root(), &exe_ctx)
        .await
        .unwrap();

        let cached_op = InitializedCacheOperator::new(operator);

        let processor = cached_op.query_processor().unwrap().get_u8().unwrap();

        let tile_cache = Arc::new(SharedCache::test_default());

        let query_ctx = MockQueryContext::new_with_query_extensions(
            ChunkByteSize::test_default(),
            Some(tile_cache),
            None,
            None,
        );

        let stream = processor
            .query(
                QueryRectangle {
                    spatial_bounds: SpatialPartition2D::new_unchecked(
                        [-180., -90.].into(),
                        [180., 90.].into(),
                    ),
                    time_interval: TimeInterval::default(),
                    spatial_resolution: SpatialResolution::zero_point_one(),
                    attributes: BandSelection::first(),
                },
                &query_ctx,
            )
            .await
            .unwrap();

        let tiles = stream.collect::<Vec<_>>().await;
        let tiles = tiles.into_iter().collect::<Result<Vec<_>>>().unwrap();

        // wait for the cache to be filled, which happens asynchronously
        tokio::time::sleep(std::time::Duration::from_millis(1000)).await;

        // delete the dataset to make sure the result is served from the cache
        exe_ctx.delete_meta_data(&ndvi_id);

        let stream_from_cache = processor
            .query(
                QueryRectangle {
                    spatial_bounds: SpatialPartition2D::new_unchecked(
                        [-180., -90.].into(),
                        [180., 90.].into(),
                    ),
                    time_interval: TimeInterval::default(),
                    spatial_resolution: SpatialResolution::zero_point_one(),
                    attributes: BandSelection::first(),
                },
                &query_ctx,
            )
            .await
            .unwrap();

        let tiles_from_cache = stream_from_cache.collect::<Vec<_>>().await;
        let tiles_from_cache = tiles_from_cache
            .into_iter()
            .collect::<Result<Vec<_>>>()
            .unwrap();

        assert!(tiles.tiles_equal_ignoring_cache_hint(&tiles_from_cache));
    }

    #[tokio::test]
    async fn it_reuses_bands_from_cache_entries() {
        let mut exe_ctx = MockExecutionContext::test_default();

        let ndvi_id = add_ndvi_dataset(&mut exe_ctx);

        let operator = RasterStacker {
            params: RasterStackerParams {
                rename_bands: RenameBands::Default,
            },
            sources: MultipleRasterSources {
                rasters: vec![
                    GdalSource {
                        params: GdalSourceParameters {
                            data: ndvi_id.clone(),
                        },
                    }
                    .boxed(),
                    Expression {
                        params: ExpressionParams {
                            expression: "2 * A".to_string(),
                            output_type: RasterDataType::U8,
                            output_band: None,
                            map_no_data: false,
                        },
                        sources: SingleRasterSource {
                            raster: GdalSource {
                                params: GdalSourceParameters {
                                    data: ndvi_id.clone(),
                                },
                            }
                            .boxed(),
                        },
                    }
                    .boxed(),
                ],
            },
        }
        .boxed()
        .initialize(WorkflowOperatorPath::initialize_root(), &exe_ctx)
        .await
        .unwrap();

        let cached_op = InitializedCacheOperator::new(operator);

        let processor = cached_op.query_processor().unwrap().get_u8().unwrap();

        let tile_cache = Arc::new(SharedCache::test_default());

        let query_ctx = MockQueryContext::new_with_query_extensions(
            ChunkByteSize::test_default(),
            Some(tile_cache),
            None,
            None,
        );

        // query the first two bands
        let stream = processor
            .query(
                QueryRectangle {
                    spatial_bounds: SpatialPartition2D::new_unchecked(
                        [-180., -90.].into(),
                        [180., 90.].into(),
                    ),
                    time_interval: TimeInterval::default(),
                    spatial_resolution: SpatialResolution::zero_point_one(),
                    attributes: BandSelection::new(vec![0, 1]).unwrap(),
                },
                &query_ctx,
            )
            .await
            .unwrap();

        let tiles = stream.collect::<Vec<_>>().await;
        let tiles = tiles.into_iter().collect::<Result<Vec<_>>>().unwrap();
        // only keep the second band for comparison
        let tiles = tiles
            .into_iter()
            .filter_map(|mut tile| {
                if tile.band == 1 {
                    tile.band = 0;
                    Some(tile)
                } else {
                    None
                }
            })
            .collect::<Vec<_>>();

        // wait for the cache to be filled, which happens asynchronously
        tokio::time::sleep(std::time::Duration::from_millis(1000)).await;

        // delete the dataset to make sure the result is served from the cache
        exe_ctx.delete_meta_data(&ndvi_id);

        // now query only the second band
        let stream_from_cache = processor
            .query(
                QueryRectangle {
                    spatial_bounds: SpatialPartition2D::new_unchecked(
                        [-180., -90.].into(),
                        [180., 90.].into(),
                    ),
                    time_interval: TimeInterval::default(),
                    spatial_resolution: SpatialResolution::zero_point_one(),
                    attributes: BandSelection::new_single(1),
                },
                &query_ctx,
            )
            .await
            .unwrap();

        let tiles_from_cache = stream_from_cache.collect::<Vec<_>>().await;
        let tiles_from_cache = tiles_from_cache
            .into_iter()
            .collect::<Result<Vec<_>>>()
            .unwrap();

        assert!(tiles.tiles_equal_ignoring_cache_hint(&tiles_from_cache));
    }
}