• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

geo-engine / geoengine / 10618683645

29 Aug 2024 03:43PM UTC coverage: 91.108% (-0.009%) from 91.117%
10618683645

push

github

web-flow
Merge pull request #955 from 1lutz/detailed-error-log

More detailed error messages

203 of 267 new or added lines in 35 files covered. (76.03%)

20 existing lines in 9 files now uncovered.

133154 of 146149 relevant lines covered (91.11%)

52778.13 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

73.39
/operators/src/pro/cache/cache_operator.rs
1
use super::cache_chunks::CacheElementSpatialBounds;
2
use super::error::CacheError;
3
use super::shared_cache::CacheElement;
4
use crate::adapters::FeatureCollectionChunkMerger;
5
use crate::engine::{
6
    CanonicOperatorName, ChunkByteSize, InitializedRasterOperator, InitializedVectorOperator,
7
    QueryContext, QueryProcessor, RasterResultDescriptor, ResultDescriptor,
8
    TypedRasterQueryProcessor,
9
};
10
use crate::error::Error;
11
use crate::pro::cache::shared_cache::{AsyncCache, SharedCache};
12
use crate::util::Result;
13
use async_trait::async_trait;
14
use futures::stream::{BoxStream, FusedStream};
15
use futures::{ready, Stream, StreamExt, TryStreamExt};
16
use geoengine_datatypes::collections::{FeatureCollection, FeatureCollectionInfos};
17
use geoengine_datatypes::primitives::{
18
    AxisAlignedRectangle, Geometry, QueryAttributeSelection, QueryRectangle, VectorQueryRectangle,
19
};
20
use geoengine_datatypes::raster::{Pixel, RasterTile2D};
21
use geoengine_datatypes::util::arrow::ArrowTyped;
22
use geoengine_datatypes::util::helpers::ge_report;
23
use pin_project::{pin_project, pinned_drop};
24
use std::pin::Pin;
25
use std::sync::Arc;
26
use std::task::{Context, Poll};
27
use tokio::sync::mpsc::{unbounded_channel, UnboundedSender};
28

29
/// Wrapper around an already initialized operator that adds result caching.
///
/// The raster and vector operator implementations for this type hand out query
/// processors that are wrapped in a `CacheQueryProcessor`, keyed by the canonic
/// name of the wrapped operator.
pub struct InitializedCacheOperator<S> {
    /// The wrapped operator whose results are cached.
    source: S,
}

impl<S> InitializedCacheOperator<S> {
    /// Wraps `source` in a cache operator.
    pub fn new(source: S) -> Self {
        InitializedCacheOperator { source }
    }
}
39

40
impl InitializedRasterOperator for InitializedCacheOperator<Box<dyn InitializedRasterOperator>> {
41
    fn result_descriptor(&self) -> &RasterResultDescriptor {
×
42
        self.source.result_descriptor()
×
43
    }
×
44

45
    fn query_processor(&self) -> Result<TypedRasterQueryProcessor> {
2✔
46
        let processor_result = self.source.query_processor();
2✔
47
        match processor_result {
2✔
48
            Ok(p) => {
2✔
49
                let res_processor = match p {
2✔
50
                    TypedRasterQueryProcessor::U8(p) => TypedRasterQueryProcessor::U8(Box::new(
2✔
51
                        CacheQueryProcessor::new(p, self.source.canonic_name()),
2✔
52
                    )),
2✔
53
                    TypedRasterQueryProcessor::U16(p) => TypedRasterQueryProcessor::U16(Box::new(
×
54
                        CacheQueryProcessor::new(p, self.source.canonic_name()),
×
55
                    )),
×
56
                    TypedRasterQueryProcessor::U32(p) => TypedRasterQueryProcessor::U32(Box::new(
×
57
                        CacheQueryProcessor::new(p, self.source.canonic_name()),
×
58
                    )),
×
59
                    TypedRasterQueryProcessor::U64(p) => TypedRasterQueryProcessor::U64(Box::new(
×
60
                        CacheQueryProcessor::new(p, self.source.canonic_name()),
×
61
                    )),
×
62
                    TypedRasterQueryProcessor::I8(p) => TypedRasterQueryProcessor::I8(Box::new(
×
63
                        CacheQueryProcessor::new(p, self.source.canonic_name()),
×
64
                    )),
×
65
                    TypedRasterQueryProcessor::I16(p) => TypedRasterQueryProcessor::I16(Box::new(
×
66
                        CacheQueryProcessor::new(p, self.source.canonic_name()),
×
67
                    )),
×
68
                    TypedRasterQueryProcessor::I32(p) => TypedRasterQueryProcessor::I32(Box::new(
×
69
                        CacheQueryProcessor::new(p, self.source.canonic_name()),
×
70
                    )),
×
71
                    TypedRasterQueryProcessor::I64(p) => TypedRasterQueryProcessor::I64(Box::new(
×
72
                        CacheQueryProcessor::new(p, self.source.canonic_name()),
×
73
                    )),
×
74
                    TypedRasterQueryProcessor::F32(p) => TypedRasterQueryProcessor::F32(Box::new(
×
75
                        CacheQueryProcessor::new(p, self.source.canonic_name()),
×
76
                    )),
×
77
                    TypedRasterQueryProcessor::F64(p) => TypedRasterQueryProcessor::F64(Box::new(
×
78
                        CacheQueryProcessor::new(p, self.source.canonic_name()),
×
79
                    )),
×
80
                };
81
                tracing::debug!(event = "query processor created");
2✔
82
                Ok(res_processor)
2✔
83
            }
84
            Err(err) => {
×
85
                tracing::debug!(event = "query processor failed");
×
86
                Err(err)
×
87
            }
88
        }
89
    }
2✔
90

91
    fn canonic_name(&self) -> CanonicOperatorName {
×
92
        self.source.canonic_name()
×
93
    }
×
94
}
95

96
impl InitializedVectorOperator for InitializedCacheOperator<Box<dyn InitializedVectorOperator>> {
97
    fn result_descriptor(&self) -> &crate::engine::VectorResultDescriptor {
×
98
        self.source.result_descriptor()
×
99
    }
×
100

101
    fn query_processor(&self) -> Result<crate::engine::TypedVectorQueryProcessor> {
×
102
        let processor_result = self.source.query_processor();
×
103
        match processor_result {
×
104
            Ok(p) => {
×
105
                let res_processor = match p {
×
106
                    crate::engine::TypedVectorQueryProcessor::Data(p) => {
×
107
                        crate::engine::TypedVectorQueryProcessor::Data(Box::new(
×
108
                            CacheQueryProcessor::new(p, self.source.canonic_name()),
×
109
                        ))
×
110
                    }
111
                    crate::engine::TypedVectorQueryProcessor::MultiPoint(p) => {
×
112
                        crate::engine::TypedVectorQueryProcessor::MultiPoint(Box::new(
×
113
                            CacheQueryProcessor::new(p, self.source.canonic_name()),
×
114
                        ))
×
115
                    }
116
                    crate::engine::TypedVectorQueryProcessor::MultiLineString(p) => {
×
117
                        crate::engine::TypedVectorQueryProcessor::MultiLineString(Box::new(
×
118
                            CacheQueryProcessor::new(p, self.source.canonic_name()),
×
119
                        ))
×
120
                    }
121
                    crate::engine::TypedVectorQueryProcessor::MultiPolygon(p) => {
×
122
                        crate::engine::TypedVectorQueryProcessor::MultiPolygon(Box::new(
×
123
                            CacheQueryProcessor::new(p, self.source.canonic_name()),
×
124
                        ))
×
125
                    }
126
                };
127
                tracing::debug!(event = "query processor created");
×
128

129
                Ok(res_processor)
×
130
            }
131
            Err(err) => {
×
132
                tracing::debug!(event = "query processor failed");
×
133
                Err(err)
×
134
            }
135
        }
136
    }
×
137

138
    fn canonic_name(&self) -> CanonicOperatorName {
×
139
        self.source.canonic_name()
×
140
    }
×
141
}
142

143
/// A cache operator that caches the results of its source operator
144
struct CacheQueryProcessor<P, E, Q, U, R>
145
where
146
    E: CacheElement + Send + Sync + 'static,
147
    P: QueryProcessor<Output = E, SpatialBounds = Q, Selection = U, ResultDescription = R>,
148
{
149
    processor: P,
150
    cache_key: CanonicOperatorName,
151
}
152

153
impl<P, E, Q, U, R> CacheQueryProcessor<P, E, Q, U, R>
154
where
155
    E: CacheElement + Send + Sync + 'static,
156
    P: QueryProcessor<Output = E, SpatialBounds = Q, Selection = U, ResultDescription = R> + Sized,
157
{
158
    pub fn new(processor: P, cache_key: CanonicOperatorName) -> Self {
2✔
159
        CacheQueryProcessor {
2✔
160
            processor,
2✔
161
            cache_key,
2✔
162
        }
2✔
163
    }
2✔
164
}
165

166
#[async_trait]
167
impl<P, E, S, U, R> QueryProcessor for CacheQueryProcessor<P, E, S, U, R>
168
where
169
    P: QueryProcessor<Output = E, SpatialBounds = S, Selection = U, ResultDescription = R> + Sized,
170
    S: AxisAlignedRectangle + Send + Sync + 'static,
171
    U: QueryAttributeSelection,
172
    E: CacheElement<Query = QueryRectangle<S, U>>
173
        + Send
174
        + Sync
175
        + 'static
176
        + ResultStreamWrapper
177
        + Clone,
178
    E::ResultStream: Stream<Item = Result<E, CacheError>> + Send + Sync + 'static,
179
    SharedCache: AsyncCache<E>,
180
    R: ResultDescriptor<QueryRectangleSpatialBounds = S, QueryRectangleAttributeSelection = U>,
181
{
182
    type Output = E;
183
    type SpatialBounds = S;
184
    type Selection = U;
185
    type ResultDescription = R;
186

187
    async fn _query<'a>(
188
        &'a self,
189
        query: QueryRectangle<Self::SpatialBounds, Self::Selection>,
190
        ctx: &'a dyn QueryContext,
191
    ) -> Result<BoxStream<'a, Result<Self::Output>>> {
4✔
192
        let shared_cache = ctx
4✔
193
            .extensions()
4✔
194
            .get::<Arc<SharedCache>>()
4✔
195
            .expect("`SharedCache` extension should be set during `ProContext` creation");
4✔
196

4✔
197
        let cache_result = shared_cache.query_cache(&self.cache_key, &query).await;
4✔
198

4✔
199
        if let Ok(Some(cache_result)) = cache_result {
4✔
200
            // cache hit
4✔
201
            log::debug!("cache hit for operator {}", self.cache_key);
4✔
202

4✔
203
            let wrapped_result_steam =
4✔
204
                E::wrap_result_stream(cache_result, ctx.chunk_byte_size(), query.clone());
×
205

×
206
            return Ok(wrapped_result_steam);
×
207
        }
4✔
208

4✔
209
        // cache miss
4✔
210
        log::debug!("cache miss for operator {}", self.cache_key);
4✔
211
        let source_stream = self.processor.query(query.clone(), ctx).await?;
4✔
212

4✔
213
        let query_id = shared_cache.insert_query(&self.cache_key, &query).await;
4✔
214

4✔
215
        if let Err(e) = query_id {
4✔
216
            log::debug!("could not insert query into cache: {}", e);
4✔
217
            return Ok(source_stream);
4✔
218
        }
4✔
219

4✔
220
        let query_id = query_id.expect("query_id should be set because of the previous check");
4✔
221

4✔
222
        // lazily insert tiles into the cache as they are produced
4✔
223
        let (stream_event_sender, mut stream_event_receiver) = unbounded_channel();
4✔
224

4✔
225
        let cache_key = self.cache_key.clone();
4✔
226
        let tile_cache = shared_cache.clone();
4✔
227
        crate::util::spawn(async move {
4✔
228
            while let Some(event) = stream_event_receiver.recv().await {
34✔
229
                match event {
32✔
230
                    SourceStreamEvent::Element(tile) => {
30✔
231
                        let result = tile_cache
30✔
232
                            .insert_query_element(&cache_key, &query_id, tile)
30✔
233
                            .await;
29✔
234
                        log::trace!(
29✔
235
                            "inserted tile into cache for cache key {} and query id {}. result: {:?}",
4✔
236
                            cache_key,
4✔
237
                            query_id,
4✔
238
                            result
4✔
239
                        );
4✔
240
                    }
4✔
241
                    SourceStreamEvent::Abort => {
4✔
242
                        tile_cache.abort_query(&cache_key, &query_id).await;
4✔
243
                        log::debug!(
4✔
244
                            "aborted cache insertion for cache key {} and query id {}",
4✔
245
                            cache_key,
4✔
246
                            query_id
4✔
247
                        );
4✔
248
                    }
4✔
249
                    SourceStreamEvent::Finished => {
4✔
250
                        let result = tile_cache.finish_query(&cache_key, &query_id).await;
4✔
251
                        log::debug!(
4✔
252
                            "finished cache insertion for cache key {} and query id {}, result: {:?}",
4✔
253
                            cache_key,query_id,
4✔
254
                            result
4✔
255
                        );
4✔
256
                    }
4✔
257
                }
4✔
258
            }
4✔
259
        });
4✔
260

4✔
261
        let output_stream = CacheOutputStream {
4✔
262
            source: source_stream,
4✔
263
            stream_event_sender,
4✔
264
            finished: false,
4✔
265
            pristine: true,
4✔
266
        };
4✔
267

4✔
268
        Ok(Box::pin(output_stream))
4✔
269
    }
4✔
270

271
    fn result_descriptor(&self) -> &Self::ResultDescription {
8✔
272
        self.processor.result_descriptor()
8✔
273
    }
8✔
274
}
275

276
#[allow(clippy::large_enum_variant)] // TODO: Box instead?
277
enum SourceStreamEvent<E: CacheElement> {
278
    Element(E),
279
    Abort,
280
    Finished,
281
}
282

283
/// Custom stream that lazily puts the produced tile in the cache and finishes the cache entry when the source stream completes
284
#[pin_project(PinnedDrop, project = CacheOutputStreamProjection)]
60✔
285
struct CacheOutputStream<S, E>
286
where
287
    S: Stream<Item = Result<E>>,
288
    E: CacheElement + Clone,
289
{
290
    #[pin]
291
    source: S,
292
    stream_event_sender: UnboundedSender<SourceStreamEvent<E>>,
293
    finished: bool,
294
    pristine: bool,
295
}
296

297
impl<S, E> Stream for CacheOutputStream<S, E>
298
where
299
    S: Stream<Item = Result<E>>,
300
    E: CacheElement + Clone,
301
{
302
    type Item = Result<E>;
303

304
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
60✔
305
        let this = self.project();
60✔
306

307
        let next = ready!(this.source.poll_next(cx));
60✔
308

309
        if let Some(element) = &next {
44✔
310
            *this.pristine = false;
40✔
311
            if let Ok(element) = element {
40✔
312
                // ignore the result. The receiver shold never drop prematurely, but if it does we don't want to crash
313
                let r = this
40✔
314
                    .stream_event_sender
40✔
315
                    .send(SourceStreamEvent::Element(element.clone()));
40✔
316
                if let Err(e) = r {
40✔
NEW
317
                    log::warn!("could not send tile to cache: {}", ge_report(e));
×
318
                }
40✔
319
            } else {
320
                // ignore the result. The receiver shold never drop prematurely, but if it does we don't want to crash
321
                let r = this.stream_event_sender.send(SourceStreamEvent::Abort);
×
322
                if let Err(e) = r {
×
NEW
323
                    log::warn!("could not send abort to cache: {}", ge_report(e));
×
324
                }
×
325
            }
326
        } else {
327
            if *this.pristine {
4✔
328
                // ignore the result. The receiver shold never drop prematurely, but if it does we don't want to crash
329
                let r = this.stream_event_sender.send(SourceStreamEvent::Abort);
×
330
                if let Err(e) = r {
×
NEW
331
                    log::warn!("could not send abort to cache: {}", ge_report(e));
×
332
                }
×
333
            } else {
334
                // ignore the result. The receiver shold never drop prematurely, but if it does we don't want to crash
335
                let r = this.stream_event_sender.send(SourceStreamEvent::Finished);
4✔
336
                if let Err(e) = r {
4✔
NEW
337
                    log::warn!("could not send finished to cache: {}", ge_report(e));
×
338
                }
4✔
339
                log::debug!("stream finished, mark cache entry as finished.");
4✔
340
            }
341
            *this.finished = true;
4✔
342
        }
343

344
        Poll::Ready(next)
44✔
345
    }
60✔
346
}
347

348
/// On drop, trigger the removal of the cache entry if it hasn't been finished yet
349
#[pinned_drop]
×
350
impl<S, E> PinnedDrop for CacheOutputStream<S, E>
351
where
352
    S: Stream<Item = Result<E>>,
353
    E: CacheElement + Clone,
4✔
354
{
4✔
355
    fn drop(self: Pin<&mut Self>) {
4✔
356
        if !self.finished {
4✔
357
            // ignore the result. The receiver shold never drop prematurely, but if it does we don't want to crash
358
            let r = self.stream_event_sender.send(SourceStreamEvent::Abort);
×
359
            if let Err(e) = r {
×
360
                log::debug!("could not send abort to cache: {}", e.to_string());
×
361
            }
×
362
        }
4✔
363
    }
4✔
364
}
365

366
trait ResultStreamWrapper: CacheElement {
367
    fn wrap_result_stream<'a>(
368
        stream: Self::ResultStream,
369
        chunk_byte_size: ChunkByteSize,
370
        query: Self::Query,
371
    ) -> BoxStream<'a, Result<Self>>;
372
}
373

374
impl<G> ResultStreamWrapper for FeatureCollection<G>
375
where
376
    G: Geometry + ArrowTyped + Send + Sync + 'static,
377
    FeatureCollection<G>:
378
        CacheElement<Query = VectorQueryRectangle> + Send + Sync + CacheElementSpatialBounds,
379
    Self::ResultStream: FusedStream + Send + Sync,
380
{
381
    fn wrap_result_stream<'a>(
×
382
        stream: Self::ResultStream,
×
383
        chunk_byte_size: ChunkByteSize,
×
384
        query: Self::Query,
×
385
    ) -> BoxStream<'a, Result<Self>> {
×
386
        let filter_stream = stream.filter_map(move |result| {
×
387
            let query = query.clone();
×
388
            async move {
×
389
                result
×
390
                    .and_then(|collection| collection.filter_cache_element_entries(&query))
×
391
                    .map_err(|source| Error::CacheCantProduceResult {
×
392
                        source: source.into(),
×
393
                    })
×
394
                    .map(|fc| if fc.is_empty() { None } else { Some(fc) })
×
395
                    .transpose()
×
396
            }
×
397
        });
×
398

×
399
        let merger_stream =
×
400
            FeatureCollectionChunkMerger::new(filter_stream, chunk_byte_size.into());
×
401
        Box::pin(merger_stream)
×
402
    }
×
403
}
404

405
impl<P> ResultStreamWrapper for RasterTile2D<P>
406
where
407
    P: 'static + Pixel,
408
    RasterTile2D<P>: CacheElement,
409
    Self::ResultStream: Send + Sync,
410
{
411
    fn wrap_result_stream<'a>(
×
412
        stream: Self::ResultStream,
×
413
        _chunk_byte_size: ChunkByteSize,
×
414
        _query: Self::Query,
×
415
    ) -> BoxStream<'a, Result<Self>> {
×
416
        Box::pin(stream.map_err(|ce| Error::CacheCantProduceResult { source: ce.into() }))
×
417
    }
×
418
}
419

420
#[cfg(test)]
mod tests {
    use futures::StreamExt;
    use geoengine_datatypes::{
        primitives::{BandSelection, SpatialPartition2D, SpatialResolution, TimeInterval},
        raster::{RasterDataType, RenameBands, TilesEqualIgnoringCacheHint},
        util::test::TestDefault,
    };

    use crate::{
        engine::{
            ChunkByteSize, MockExecutionContext, MockQueryContext, MultipleRasterSources,
            QueryContextExtensions, RasterOperator, SingleRasterSource, WorkflowOperatorPath,
        },
        processing::{Expression, ExpressionParams, RasterStacker, RasterStackerParams},
        source::{GdalSource, GdalSourceParameters},
        util::gdal::add_ndvi_dataset,
    };

    use super::*;

    /// Creates a query context whose extensions contain a fresh, empty `SharedCache`.
    fn query_context_with_empty_cache() -> MockQueryContext {
        let mut extensions = QueryContextExtensions::default();
        extensions.insert(Arc::new(SharedCache::test_default()));

        MockQueryContext::new_with_query_extensions(ChunkByteSize::test_default(), extensions)
    }

    /// Builds the query used by all tests: the whole world in 0.1 degree resolution
    /// with the default time interval, restricted to the given bands.
    fn whole_world_query(
        attributes: BandSelection,
    ) -> QueryRectangle<SpatialPartition2D, BandSelection> {
        QueryRectangle {
            spatial_bounds: SpatialPartition2D::new_unchecked(
                [-180., -90.].into(),
                [180., 90.].into(),
            ),
            time_interval: TimeInterval::default(),
            spatial_resolution: SpatialResolution::zero_point_one(),
            attributes,
        }
    }

    /// A second, identical query must be answered from the cache, even after the
    /// underlying dataset has been removed.
    #[tokio::test]
    async fn it_caches() {
        let mut exe_ctx = MockExecutionContext::test_default();
        let ndvi_id = add_ndvi_dataset(&mut exe_ctx);

        let source_operator = GdalSource {
            params: GdalSourceParameters {
                data: ndvi_id.clone(),
            },
        }
        .boxed()
        .initialize(WorkflowOperatorPath::initialize_root(), &exe_ctx)
        .await
        .unwrap();

        let cached_op = InitializedCacheOperator::new(source_operator);
        let processor = cached_op.query_processor().unwrap().get_u8().unwrap();

        let query_ctx = query_context_with_empty_cache();

        let fresh_stream = processor
            .query(whole_world_query(BandSelection::first()), &query_ctx)
            .await
            .unwrap();

        let fresh_results = fresh_stream.collect::<Vec<_>>().await;
        let tiles = fresh_results
            .into_iter()
            .collect::<Result<Vec<_>>>()
            .unwrap();

        // wait for the cache to be filled, which happens asynchronously
        tokio::time::sleep(std::time::Duration::from_secs(1)).await;

        // delete the dataset to make sure the result is served from the cache
        exe_ctx.delete_meta_data(&ndvi_id);

        let cached_stream = processor
            .query(whole_world_query(BandSelection::first()), &query_ctx)
            .await
            .unwrap();

        let cached_results = cached_stream.collect::<Vec<_>>().await;
        let tiles_from_cache = cached_results
            .into_iter()
            .collect::<Result<Vec<_>>>()
            .unwrap();

        assert!(tiles.tiles_equal_ignoring_cache_hint(&tiles_from_cache));
    }

    /// A query for a single band must be answered from a cache entry that was
    /// created by a query for several bands.
    #[tokio::test]
    async fn it_reuses_bands_from_cache_entries() {
        let mut exe_ctx = MockExecutionContext::test_default();
        let ndvi_id = add_ndvi_dataset(&mut exe_ctx);

        let first_band = GdalSource {
            params: GdalSourceParameters {
                data: ndvi_id.clone(),
            },
        }
        .boxed();

        let second_band = Expression {
            params: ExpressionParams {
                expression: "2 * A".to_string(),
                output_type: RasterDataType::U8,
                output_band: None,
                map_no_data: false,
            },
            sources: SingleRasterSource {
                raster: GdalSource {
                    params: GdalSourceParameters {
                        data: ndvi_id.clone(),
                    },
                }
                .boxed(),
            },
        }
        .boxed();

        let stacked_operator = RasterStacker {
            params: RasterStackerParams {
                rename_bands: RenameBands::Default,
            },
            sources: MultipleRasterSources {
                rasters: vec![first_band, second_band],
            },
        }
        .boxed()
        .initialize(WorkflowOperatorPath::initialize_root(), &exe_ctx)
        .await
        .unwrap();

        let cached_op = InitializedCacheOperator::new(stacked_operator);
        let processor = cached_op.query_processor().unwrap().get_u8().unwrap();

        let query_ctx = query_context_with_empty_cache();

        // query the first two bands
        let both_bands_stream = processor
            .query(
                whole_world_query(BandSelection::new(vec![0, 1]).unwrap()),
                &query_ctx,
            )
            .await
            .unwrap();

        let both_bands_results = both_bands_stream.collect::<Vec<_>>().await;
        let both_bands_tiles = both_bands_results
            .into_iter()
            .collect::<Result<Vec<_>>>()
            .unwrap();

        // only keep the second band for comparison and renumber it to band 0
        let tiles = both_bands_tiles
            .into_iter()
            .filter(|tile| tile.band == 1)
            .map(|mut tile| {
                tile.band = 0;
                tile
            })
            .collect::<Vec<_>>();

        // wait for the cache to be filled, which happens asynchronously
        tokio::time::sleep(std::time::Duration::from_secs(1)).await;

        // delete the dataset to make sure the result is served from the cache
        exe_ctx.delete_meta_data(&ndvi_id);

        // now query only the second band
        let cached_stream = processor
            .query(whole_world_query(BandSelection::new_single(1)), &query_ctx)
            .await
            .unwrap();

        let cached_results = cached_stream.collect::<Vec<_>>().await;
        let tiles_from_cache = cached_results
            .into_iter()
            .collect::<Result<Vec<_>>>()
            .unwrap();

        assert!(tiles.tiles_equal_ignoring_cache_hint(&tiles_from_cache));
    }
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc