geo-engine / geoengine, build 12469296660

23 Dec 2024 03:15PM UTC, coverage: 90.56% (-0.1%) from 90.695%
Build: push · CI: github · Committer: web-flow
Commit: Merge pull request #998 from geo-engine/quota_log_wip ("Quota and Data usage Logging")

859 of 1214 new or added lines in 66 files covered (70.76%).
3 existing lines in 2 files are now uncovered.
133923 of 147883 relevant lines covered (90.56%).
54439.32 hits per line.
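The reported percentages follow directly from the raw line counts above; the following is a minimal, illustrative sketch (not part of the geo-engine code base) that recomputes them:

// Illustrative only: recompute the coverage percentages reported above
// from the raw covered/relevant line counts.
fn coverage_percent(covered: u64, relevant: u64) -> f64 {
    100.0 * covered as f64 / relevant as f64
}

fn main() {
    // overall coverage: 133923 of 147883 relevant lines
    println!("{:.2}%", coverage_percent(133_923, 147_883)); // prints 90.56%
    // new or added lines in this build: 859 of 1214
    println!("{:.2}%", coverage_percent(859, 1_214)); // prints 70.76%
}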

Source File: /operators/src/cache/cache_operator.rs (67.45% of lines covered)
use super::cache_chunks::CacheElementSpatialBounds;
use super::error::CacheError;
use super::shared_cache::CacheElement;
use crate::adapters::FeatureCollectionChunkMerger;
use crate::cache::shared_cache::{AsyncCache, SharedCache};
use crate::engine::{
    CanonicOperatorName, ChunkByteSize, InitializedRasterOperator, InitializedVectorOperator,
    QueryContext, QueryProcessor, RasterResultDescriptor, ResultDescriptor,
    TypedRasterQueryProcessor, WorkflowOperatorPath,
};
use crate::error::Error;
use crate::util::Result;
use async_trait::async_trait;
use futures::stream::{BoxStream, FusedStream};
use futures::{ready, Stream, StreamExt, TryStreamExt};
use geoengine_datatypes::collections::{FeatureCollection, FeatureCollectionInfos};
use geoengine_datatypes::primitives::{
    AxisAlignedRectangle, Geometry, QueryAttributeSelection, QueryRectangle, VectorQueryRectangle,
};
use geoengine_datatypes::raster::{Pixel, RasterTile2D};
use geoengine_datatypes::util::arrow::ArrowTyped;
use geoengine_datatypes::util::helpers::ge_report;
use pin_project::{pin_project, pinned_drop};
use std::pin::Pin;
use std::task::{Context, Poll};
use tokio::sync::mpsc::{unbounded_channel, UnboundedSender};

/// A cache operator that caches the results of its source operator
pub struct InitializedCacheOperator<S> {
    source: S,
}

impl<S> InitializedCacheOperator<S> {
    pub fn new(source: S) -> Self {
        Self { source }
    }
}

impl InitializedRasterOperator for InitializedCacheOperator<Box<dyn InitializedRasterOperator>> {
    fn result_descriptor(&self) -> &RasterResultDescriptor {
        self.source.result_descriptor()
    }

    fn query_processor(&self) -> Result<TypedRasterQueryProcessor> {
        let processor_result = self.source.query_processor();
        match processor_result {
            Ok(p) => {
                let res_processor = match p {
                    TypedRasterQueryProcessor::U8(p) => TypedRasterQueryProcessor::U8(Box::new(
                        CacheQueryProcessor::new(p, self.source.canonic_name()),
                    )),
                    TypedRasterQueryProcessor::U16(p) => TypedRasterQueryProcessor::U16(Box::new(
                        CacheQueryProcessor::new(p, self.source.canonic_name()),
                    )),
                    TypedRasterQueryProcessor::U32(p) => TypedRasterQueryProcessor::U32(Box::new(
                        CacheQueryProcessor::new(p, self.source.canonic_name()),
                    )),
                    TypedRasterQueryProcessor::U64(p) => TypedRasterQueryProcessor::U64(Box::new(
                        CacheQueryProcessor::new(p, self.source.canonic_name()),
                    )),
                    TypedRasterQueryProcessor::I8(p) => TypedRasterQueryProcessor::I8(Box::new(
                        CacheQueryProcessor::new(p, self.source.canonic_name()),
                    )),
                    TypedRasterQueryProcessor::I16(p) => TypedRasterQueryProcessor::I16(Box::new(
                        CacheQueryProcessor::new(p, self.source.canonic_name()),
                    )),
                    TypedRasterQueryProcessor::I32(p) => TypedRasterQueryProcessor::I32(Box::new(
                        CacheQueryProcessor::new(p, self.source.canonic_name()),
                    )),
                    TypedRasterQueryProcessor::I64(p) => TypedRasterQueryProcessor::I64(Box::new(
                        CacheQueryProcessor::new(p, self.source.canonic_name()),
                    )),
                    TypedRasterQueryProcessor::F32(p) => TypedRasterQueryProcessor::F32(Box::new(
                        CacheQueryProcessor::new(p, self.source.canonic_name()),
                    )),
                    TypedRasterQueryProcessor::F64(p) => TypedRasterQueryProcessor::F64(Box::new(
                        CacheQueryProcessor::new(p, self.source.canonic_name()),
                    )),
                };
                tracing::debug!(event = "query processor created");
                Ok(res_processor)
            }
            Err(err) => {
                tracing::debug!(event = "query processor failed");
                Err(err)
            }
        }
    }

    fn canonic_name(&self) -> CanonicOperatorName {
        self.source.canonic_name()
    }

    fn name(&self) -> &'static str {
        self.source.name()
    }

    fn path(&self) -> WorkflowOperatorPath {
        self.source.path()
    }
}

impl InitializedVectorOperator for InitializedCacheOperator<Box<dyn InitializedVectorOperator>> {
    fn result_descriptor(&self) -> &crate::engine::VectorResultDescriptor {
        self.source.result_descriptor()
    }

    fn query_processor(&self) -> Result<crate::engine::TypedVectorQueryProcessor> {
        let processor_result = self.source.query_processor();
        match processor_result {
            Ok(p) => {
                let res_processor = match p {
                    crate::engine::TypedVectorQueryProcessor::Data(p) => {
                        crate::engine::TypedVectorQueryProcessor::Data(Box::new(
                            CacheQueryProcessor::new(p, self.source.canonic_name()),
                        ))
                    }
                    crate::engine::TypedVectorQueryProcessor::MultiPoint(p) => {
                        crate::engine::TypedVectorQueryProcessor::MultiPoint(Box::new(
                            CacheQueryProcessor::new(p, self.source.canonic_name()),
                        ))
                    }
                    crate::engine::TypedVectorQueryProcessor::MultiLineString(p) => {
                        crate::engine::TypedVectorQueryProcessor::MultiLineString(Box::new(
                            CacheQueryProcessor::new(p, self.source.canonic_name()),
                        ))
                    }
                    crate::engine::TypedVectorQueryProcessor::MultiPolygon(p) => {
                        crate::engine::TypedVectorQueryProcessor::MultiPolygon(Box::new(
                            CacheQueryProcessor::new(p, self.source.canonic_name()),
                        ))
                    }
                };
                tracing::debug!(event = "query processor created");

                Ok(res_processor)
            }
            Err(err) => {
                tracing::debug!(event = "query processor failed");
                Err(err)
            }
        }
    }

    fn canonic_name(&self) -> CanonicOperatorName {
        self.source.canonic_name()
    }

    fn name(&self) -> &'static str {
        self.source.name()
    }

    fn path(&self) -> WorkflowOperatorPath {
        self.source.path()
    }
}

/// A query processor that caches the results of its source processor
struct CacheQueryProcessor<P, E, Q, U, R>
where
    E: CacheElement + Send + Sync + 'static,
    P: QueryProcessor<Output = E, SpatialBounds = Q, Selection = U, ResultDescription = R>,
{
    processor: P,
    cache_key: CanonicOperatorName,
}

impl<P, E, Q, U, R> CacheQueryProcessor<P, E, Q, U, R>
where
    E: CacheElement + Send + Sync + 'static,
    P: QueryProcessor<Output = E, SpatialBounds = Q, Selection = U, ResultDescription = R> + Sized,
{
    pub fn new(processor: P, cache_key: CanonicOperatorName) -> Self {
        CacheQueryProcessor {
            processor,
            cache_key,
        }
    }
}

#[async_trait]
impl<P, E, S, U, R> QueryProcessor for CacheQueryProcessor<P, E, S, U, R>
where
    P: QueryProcessor<Output = E, SpatialBounds = S, Selection = U, ResultDescription = R> + Sized,
    S: AxisAlignedRectangle + Send + Sync + 'static,
    U: QueryAttributeSelection,
    E: CacheElement<Query = QueryRectangle<S, U>>
        + Send
        + Sync
        + 'static
        + ResultStreamWrapper
        + Clone,
    E::ResultStream: Stream<Item = Result<E, CacheError>> + Send + Sync + 'static,
    SharedCache: AsyncCache<E>,
    R: ResultDescriptor<QueryRectangleSpatialBounds = S, QueryRectangleAttributeSelection = U>,
{
    type Output = E;
    type SpatialBounds = S;
    type Selection = U;
    type ResultDescription = R;

    async fn _query<'a>(
        &'a self,
        query: QueryRectangle<Self::SpatialBounds, Self::Selection>,
        ctx: &'a dyn QueryContext,
    ) -> Result<BoxStream<'a, Result<Self::Output>>> {
        let shared_cache = ctx
            .cache()
            .expect("`SharedCache` extension should be set during `ProContext` creation");

        let cache_result = shared_cache.query_cache(&self.cache_key, &query).await;

        if let Ok(Some(cache_result)) = cache_result {
            // cache hit
            log::debug!("cache hit for operator {}", self.cache_key);

            let wrapped_result_stream =
                E::wrap_result_stream(cache_result, ctx.chunk_byte_size(), query.clone());

            return Ok(wrapped_result_stream);
        }

        // cache miss
        log::debug!("cache miss for operator {}", self.cache_key);
        let source_stream = self.processor.query(query.clone(), ctx).await?;

        let query_id = shared_cache.insert_query(&self.cache_key, &query).await;

        if let Err(e) = query_id {
            log::debug!("could not insert query into cache: {}", e);
            return Ok(source_stream);
        }

        let query_id = query_id.expect("query_id should be set because of the previous check");

        // lazily insert tiles into the cache as they are produced
        let (stream_event_sender, mut stream_event_receiver) = unbounded_channel();

        let cache_key = self.cache_key.clone();
        let tile_cache = shared_cache.clone();
        crate::util::spawn(async move {
            while let Some(event) = stream_event_receiver.recv().await {
                match event {
                    SourceStreamEvent::Element(tile) => {
                        let result = tile_cache
                            .insert_query_element(&cache_key, &query_id, tile)
                            .await;
                        log::trace!(
                            "inserted tile into cache for cache key {} and query id {}. result: {:?}",
                            cache_key,
                            query_id,
                            result
                        );
                    }
                    SourceStreamEvent::Abort => {
                        tile_cache.abort_query(&cache_key, &query_id).await;
                        log::debug!(
                            "aborted cache insertion for cache key {} and query id {}",
                            cache_key,
                            query_id
                        );
                    }
                    SourceStreamEvent::Finished => {
                        let result = tile_cache.finish_query(&cache_key, &query_id).await;
                        log::debug!(
                            "finished cache insertion for cache key {} and query id {}, result: {:?}",
                            cache_key,
                            query_id,
                            result
                        );
                    }
                }
            }
        });

        let output_stream = CacheOutputStream {
            source: source_stream,
            stream_event_sender,
            finished: false,
            pristine: true,
        };

        Ok(Box::pin(output_stream))
    }

    fn result_descriptor(&self) -> &Self::ResultDescription {
        self.processor.result_descriptor()
    }
}

#[allow(clippy::large_enum_variant)] // TODO: Box instead?
enum SourceStreamEvent<E: CacheElement> {
    Element(E),
    Abort,
    Finished,
}

/// Custom stream that lazily puts the produced tile in the cache and finishes the cache entry when the source stream completes
#[pin_project(PinnedDrop, project = CacheOutputStreamProjection)]
struct CacheOutputStream<S, E>
where
    S: Stream<Item = Result<E>>,
    E: CacheElement + Clone,
{
    #[pin]
    source: S,
    stream_event_sender: UnboundedSender<SourceStreamEvent<E>>,
    finished: bool,
    pristine: bool,
}

impl<S, E> Stream for CacheOutputStream<S, E>
where
    S: Stream<Item = Result<E>>,
    E: CacheElement + Clone,
{
    type Item = Result<E>;

    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let this = self.project();

        let next = ready!(this.source.poll_next(cx));

        if let Some(element) = &next {
            *this.pristine = false;
            if let Ok(element) = element {
                // ignore the result. The receiver should never drop prematurely, but if it does we don't want to crash
                let r = this
                    .stream_event_sender
                    .send(SourceStreamEvent::Element(element.clone()));
                if let Err(e) = r {
                    log::warn!("could not send tile to cache: {}", ge_report(e));
                }
            } else {
                // ignore the result. The receiver should never drop prematurely, but if it does we don't want to crash
                let r = this.stream_event_sender.send(SourceStreamEvent::Abort);
                if let Err(e) = r {
                    log::warn!("could not send abort to cache: {}", ge_report(e));
                }
            }
        } else {
            if *this.pristine {
                // ignore the result. The receiver should never drop prematurely, but if it does we don't want to crash
                let r = this.stream_event_sender.send(SourceStreamEvent::Abort);
                if let Err(e) = r {
                    log::warn!("could not send abort to cache: {}", ge_report(e));
                }
            } else {
                // ignore the result. The receiver should never drop prematurely, but if it does we don't want to crash
                let r = this.stream_event_sender.send(SourceStreamEvent::Finished);
                if let Err(e) = r {
                    log::warn!("could not send finished to cache: {}", ge_report(e));
                }
                log::debug!("stream finished, mark cache entry as finished.");
            }
            *this.finished = true;
        }

        Poll::Ready(next)
    }
}

/// On drop, trigger the removal of the cache entry if it hasn't been finished yet
#[pinned_drop]
impl<S, E> PinnedDrop for CacheOutputStream<S, E>
where
    S: Stream<Item = Result<E>>,
    E: CacheElement + Clone,
{
    fn drop(self: Pin<&mut Self>) {
        if !self.finished {
            // ignore the result. The receiver should never drop prematurely, but if it does we don't want to crash
            let r = self.stream_event_sender.send(SourceStreamEvent::Abort);
            if let Err(e) = r {
                log::debug!("could not send abort to cache: {}", e.to_string());
            }
        }
    }
}

trait ResultStreamWrapper: CacheElement {
    fn wrap_result_stream<'a>(
        stream: Self::ResultStream,
        chunk_byte_size: ChunkByteSize,
        query: Self::Query,
    ) -> BoxStream<'a, Result<Self>>;
}

impl<G> ResultStreamWrapper for FeatureCollection<G>
where
    G: Geometry + ArrowTyped + Send + Sync + 'static,
    FeatureCollection<G>:
        CacheElement<Query = VectorQueryRectangle> + Send + Sync + CacheElementSpatialBounds,
    Self::ResultStream: FusedStream + Send + Sync,
{
    fn wrap_result_stream<'a>(
        stream: Self::ResultStream,
        chunk_byte_size: ChunkByteSize,
        query: Self::Query,
    ) -> BoxStream<'a, Result<Self>> {
        let filter_stream = stream.filter_map(move |result| {
            let query = query.clone();
            async move {
                result
                    .and_then(|collection| collection.filter_cache_element_entries(&query))
                    .map_err(|source| Error::CacheCantProduceResult {
                        source: source.into(),
                    })
                    .map(|fc| if fc.is_empty() { None } else { Some(fc) })
                    .transpose()
            }
        });

        let merger_stream =
            FeatureCollectionChunkMerger::new(filter_stream, chunk_byte_size.into());
        Box::pin(merger_stream)
    }
}

impl<P> ResultStreamWrapper for RasterTile2D<P>
where
    P: 'static + Pixel,
    RasterTile2D<P>: CacheElement,
    Self::ResultStream: Send + Sync,
{
    fn wrap_result_stream<'a>(
        stream: Self::ResultStream,
        _chunk_byte_size: ChunkByteSize,
        _query: Self::Query,
    ) -> BoxStream<'a, Result<Self>> {
        Box::pin(stream.map_err(|ce| Error::CacheCantProduceResult { source: ce.into() }))
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    use crate::{
        engine::{
            ChunkByteSize, MockExecutionContext, MockQueryContext, MultipleRasterSources,
            RasterOperator, SingleRasterSource, WorkflowOperatorPath,
        },
        processing::{Expression, ExpressionParams, RasterStacker, RasterStackerParams},
        source::{GdalSource, GdalSourceParameters},
        util::gdal::add_ndvi_dataset,
    };
    use futures::StreamExt;
    use geoengine_datatypes::{
        primitives::{BandSelection, SpatialPartition2D, SpatialResolution, TimeInterval},
        raster::{RasterDataType, RenameBands, TilesEqualIgnoringCacheHint},
        util::test::TestDefault,
    };
    use std::sync::Arc;

    #[tokio::test]
    async fn it_caches() {
        let mut exe_ctx = MockExecutionContext::test_default();

        let ndvi_id = add_ndvi_dataset(&mut exe_ctx);

        let operator = GdalSource {
            params: GdalSourceParameters {
                data: ndvi_id.clone(),
            },
        }
        .boxed()
        .initialize(WorkflowOperatorPath::initialize_root(), &exe_ctx)
        .await
        .unwrap();

        let cached_op = InitializedCacheOperator::new(operator);

        let processor = cached_op.query_processor().unwrap().get_u8().unwrap();

        let tile_cache = Arc::new(SharedCache::test_default());

        let query_ctx = MockQueryContext::new_with_query_extensions(
            ChunkByteSize::test_default(),
            Some(tile_cache),
            None,
            None,
        );

        let stream = processor
            .query(
                QueryRectangle {
                    spatial_bounds: SpatialPartition2D::new_unchecked(
                        [-180., -90.].into(),
                        [180., 90.].into(),
                    ),
                    time_interval: TimeInterval::default(),
                    spatial_resolution: SpatialResolution::zero_point_one(),
                    attributes: BandSelection::first(),
                },
                &query_ctx,
            )
            .await
            .unwrap();

        let tiles = stream.collect::<Vec<_>>().await;
        let tiles = tiles.into_iter().collect::<Result<Vec<_>>>().unwrap();

        // wait for the cache to be filled, which happens asynchronously
        tokio::time::sleep(std::time::Duration::from_millis(1000)).await;

        // delete the dataset to make sure the result is served from the cache
        exe_ctx.delete_meta_data(&ndvi_id);

        let stream_from_cache = processor
            .query(
                QueryRectangle {
                    spatial_bounds: SpatialPartition2D::new_unchecked(
                        [-180., -90.].into(),
                        [180., 90.].into(),
                    ),
                    time_interval: TimeInterval::default(),
                    spatial_resolution: SpatialResolution::zero_point_one(),
                    attributes: BandSelection::first(),
                },
                &query_ctx,
            )
            .await
            .unwrap();

        let tiles_from_cache = stream_from_cache.collect::<Vec<_>>().await;
        let tiles_from_cache = tiles_from_cache
            .into_iter()
            .collect::<Result<Vec<_>>>()
            .unwrap();

        assert!(tiles.tiles_equal_ignoring_cache_hint(&tiles_from_cache));
    }

    #[tokio::test]
    async fn it_reuses_bands_from_cache_entries() {
        let mut exe_ctx = MockExecutionContext::test_default();

        let ndvi_id = add_ndvi_dataset(&mut exe_ctx);

        let operator = RasterStacker {
            params: RasterStackerParams {
                rename_bands: RenameBands::Default,
            },
            sources: MultipleRasterSources {
                rasters: vec![
                    GdalSource {
                        params: GdalSourceParameters {
                            data: ndvi_id.clone(),
                        },
                    }
                    .boxed(),
                    Expression {
                        params: ExpressionParams {
                            expression: "2 * A".to_string(),
                            output_type: RasterDataType::U8,
                            output_band: None,
                            map_no_data: false,
                        },
                        sources: SingleRasterSource {
                            raster: GdalSource {
                                params: GdalSourceParameters {
                                    data: ndvi_id.clone(),
                                },
                            }
                            .boxed(),
                        },
                    }
                    .boxed(),
                ],
            },
        }
        .boxed()
        .initialize(WorkflowOperatorPath::initialize_root(), &exe_ctx)
        .await
        .unwrap();

        let cached_op = InitializedCacheOperator::new(operator);

        let processor = cached_op.query_processor().unwrap().get_u8().unwrap();

        let tile_cache = Arc::new(SharedCache::test_default());

        let query_ctx = MockQueryContext::new_with_query_extensions(
            ChunkByteSize::test_default(),
            Some(tile_cache),
            None,
            None,
        );

        // query the first two bands
        let stream = processor
            .query(
                QueryRectangle {
                    spatial_bounds: SpatialPartition2D::new_unchecked(
                        [-180., -90.].into(),
                        [180., 90.].into(),
                    ),
                    time_interval: TimeInterval::default(),
                    spatial_resolution: SpatialResolution::zero_point_one(),
                    attributes: BandSelection::new(vec![0, 1]).unwrap(),
                },
                &query_ctx,
            )
            .await
            .unwrap();

        let tiles = stream.collect::<Vec<_>>().await;
        let tiles = tiles.into_iter().collect::<Result<Vec<_>>>().unwrap();
        // only keep the second band for comparison
        let tiles = tiles
            .into_iter()
            .filter_map(|mut tile| {
                if tile.band == 1 {
                    tile.band = 0;
                    Some(tile)
                } else {
                    None
                }
            })
            .collect::<Vec<_>>();

        // wait for the cache to be filled, which happens asynchronously
        tokio::time::sleep(std::time::Duration::from_millis(1000)).await;

        // delete the dataset to make sure the result is served from the cache
        exe_ctx.delete_meta_data(&ndvi_id);

        // now query only the second band
        let stream_from_cache = processor
            .query(
                QueryRectangle {
                    spatial_bounds: SpatialPartition2D::new_unchecked(
                        [-180., -90.].into(),
                        [180., 90.].into(),
                    ),
                    time_interval: TimeInterval::default(),
                    spatial_resolution: SpatialResolution::zero_point_one(),
                    attributes: BandSelection::new_single(1),
                },
                &query_ctx,
            )
            .await
            .unwrap();

        let tiles_from_cache = stream_from_cache.collect::<Vec<_>>().await;
        let tiles_from_cache = tiles_from_cache
            .into_iter()
            .collect::<Result<Vec<_>>>()
            .unwrap();

        assert!(tiles.tiles_equal_ignoring_cache_hint(&tiles_from_cache));
    }
}