• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

geo-engine / geoengine / 7006568925

27 Nov 2023 02:07PM UTC coverage: 89.651% (+0.2%) from 89.498%
7006568925

push

github

web-flow
Merge pull request #888 from geo-engine/raster_stacks

raster stacking

4032 of 4274 new or added lines in 107 files covered. (94.34%)

12 existing lines in 8 files now uncovered.

113020 of 126066 relevant lines covered (89.65%)

59901.79 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

89.41
/operators/src/adapters/raster_subquery/raster_subquery_adapter.rs
1
use crate::adapters::sparse_tiles_fill_adapter::FillerTileCacheExpirationStrategy;
2
use crate::adapters::SparseTilesFillAdapter;
3
use crate::engine::{QueryContext, QueryProcessor, RasterQueryProcessor};
4
use crate::error;
5
use crate::util::Result;
6
use futures::future::BoxFuture;
7
use futures::{
8
    ready,
9
    stream::{BoxStream, TryFold},
10
    FutureExt, TryFuture, TryStreamExt,
11
};
12
use futures::{stream::FusedStream, Future};
13
use futures::{Stream, StreamExt, TryFutureExt};
14
use geoengine_datatypes::primitives::{BandSelection, CacheHint};
15
use geoengine_datatypes::primitives::{
16
    RasterQueryRectangle, SpatialPartition2D, SpatialPartitioned,
17
};
18
use geoengine_datatypes::raster::{EmptyGrid2D, GridBoundingBox2D, GridBounds, GridStep};
19
use geoengine_datatypes::{
20
    primitives::TimeInstance,
21
    raster::{Blit, Pixel, RasterTile2D, TileInformation},
22
};
23
use geoengine_datatypes::{primitives::TimeInterval, raster::TilingSpecification};
24

25
use pin_project::pin_project;
26
use rayon::ThreadPool;
27

28
use std::marker::PhantomData;
29
use std::sync::Arc;
30
use std::task::Poll;
31

32
use std::pin::Pin;
33

34
use async_trait::async_trait;
35

36
#[async_trait]
37
pub trait FoldTileAccu {
38
    type RasterType: Pixel;
39
    async fn into_tile(self) -> Result<RasterTile2D<Self::RasterType>>;
40
    fn thread_pool(&self) -> &Arc<ThreadPool>;
41
}
42

43
pub trait FoldTileAccuMut: FoldTileAccu {
44
    fn tile_mut(&mut self) -> &mut RasterTile2D<Self::RasterType>;
45
}
46

47
pub type RasterFold<'a, T, FoldFuture, FoldMethod, FoldTileAccu> =
48
    TryFold<BoxStream<'a, Result<RasterTile2D<T>>>, FoldFuture, FoldTileAccu, FoldMethod>;
49

50
type QueryAccuFuture<'a, T, A> = BoxFuture<'a, Result<(BoxStream<'a, Result<RasterTile2D<T>>>, A)>>;
51

52
type IntoTileFuture<'a, T> = BoxFuture<'a, Result<RasterTile2D<T>>>;
53

54
/// This adapter generates a tile stream using sub-queries.
55
/// This is done using a `TileSubQuery`.
56
/// The sub-query is resolved for each produced tile.
57

58
#[pin_project(project=StateInnerProjection)]
1,434✔
59
#[derive(Debug, Clone)]
×
60
enum StateInner<A, B, C, D> {
61
    CreateNextQuery,
62
    RunningQuery {
63
        #[pin]
64
        query_with_accu: A,
65
    },
66
    RunningFold(#[pin] B),
67
    RunningIntoTile(#[pin] D),
68
    ReturnResult(Option<C>),
69
    Ended,
70
}
71

72
/// This type is needed to stop Clippy from complaining about a very complex type in the `RasterSubQueryAdapter` struct.
73
type StateInnerType<'a, P, FoldFuture, FoldMethod, TileAccu> = StateInner<
74
    QueryAccuFuture<'a, P, TileAccu>,
75
    RasterFold<'a, P, FoldFuture, FoldMethod, TileAccu>,
76
    RasterTile2D<P>,
77
    IntoTileFuture<'a, P>,
78
>;
79

80
/// This adapter generates a tile stream using sub-queries.
81
/// This is done using a `TileSubQuery`.
82
/// The sub-query is resolved for each produced tile.
83
#[pin_project(project = RasterSubQueryAdapterProjection)]
987✔
84
pub struct RasterSubQueryAdapter<'a, PixelType, RasterProcessorType, SubQuery>
85
where
86
    PixelType: Pixel,
87
    RasterProcessorType: RasterQueryProcessor<RasterType = PixelType>,
88
    SubQuery: SubQueryTileAggregator<'a, PixelType>,
89
{
90
    /// The `RasterQueryProcessor` to answer the sub-queries
91
    source_processor: &'a RasterProcessorType,
92
    /// The `QueryContext` to use for sub-queries
93
    query_ctx: &'a dyn QueryContext,
94
    /// The `QueryRectangle` the adapter is queried with
95
    query_rect_to_answer: RasterQueryRectangle,
96
    /// The `GridBoundingBox2D` that defines the tile grid space of the query.
97
    grid_bounds: GridBoundingBox2D,
98
    // the selected bands from the source
99
    bands: Vec<usize>,
100
    // the band being currently processed
101
    current_band_index: usize,
102

103
    /// The `SubQuery` defines what this adapter does.
104
    sub_query: SubQuery,
105

106
    /// This `TimeInterval` is the time currently worked on
107
    current_time_start: TimeInstance,
108
    current_time_end: Option<TimeInstance>,
109
    /// The `TileInformation` of the tile currently worked on
110
    current_tile_spec: TileInformation,
111

112
    /// The current state of the adapter
113
    #[pin]
114
    state: StateInnerType<
115
        'a,
116
        PixelType,
117
        SubQuery::FoldFuture,
118
        SubQuery::FoldMethod,
119
        SubQuery::TileAccu,
120
    >,
121
}
122

123
impl<'a, PixelType, RasterProcessor, SubQuery>
124
    RasterSubQueryAdapter<'a, PixelType, RasterProcessor, SubQuery>
125
where
126
    PixelType: Pixel,
127
    RasterProcessor: RasterQueryProcessor<RasterType = PixelType>,
128
    SubQuery: SubQueryTileAggregator<'a, PixelType>,
129
{
130
    pub fn new(
31✔
131
        source_processor: &'a RasterProcessor,
31✔
132
        query_rect_to_answer: RasterQueryRectangle,
31✔
133
        tiling_spec: TilingSpecification,
31✔
134
        query_ctx: &'a dyn QueryContext,
31✔
135
        sub_query: SubQuery,
31✔
136
    ) -> Self {
31✔
137
        debug_assert!(query_rect_to_answer.spatial_resolution.y > 0.);
31✔
138

139
        let tiling_strat = tiling_spec.strategy(
31✔
140
            query_rect_to_answer.spatial_resolution.x,
31✔
141
            -query_rect_to_answer.spatial_resolution.y,
31✔
142
        );
31✔
143

31✔
144
        let grid_bounds = tiling_strat.tile_grid_box(query_rect_to_answer.spatial_partition());
31✔
145

31✔
146
        let first_tile_spec = TileInformation {
31✔
147
            global_geo_transform: tiling_strat.geo_transform,
31✔
148
            global_tile_position: grid_bounds.min_index(),
31✔
149
            tile_size_in_pixels: tiling_strat.tile_size_in_pixels,
31✔
150
        };
31✔
151

31✔
152
        Self {
31✔
153
            current_tile_spec: first_tile_spec,
31✔
154
            current_time_end: None,
31✔
155
            current_time_start: query_rect_to_answer.time_interval.start(),
31✔
156
            current_band_index: 0,
31✔
157
            grid_bounds,
31✔
158
            bands: query_rect_to_answer.attributes.as_vec(),
31✔
159
            query_ctx,
31✔
160
            query_rect_to_answer,
31✔
161
            source_processor,
31✔
162
            state: StateInner::CreateNextQuery,
31✔
163
            sub_query,
31✔
164
        }
31✔
165
    }
31✔
166

167
    /// Wrap the `RasterSubQueryAdapter` with a filter and a `SparseTilesFillAdapter` to produce a `Stream` compatible with `RasterQueryProcessor`.
168
    /// Set the `cache_expiration` to unlimited if the filler tiles will always be empty.
169
    pub fn filter_and_fill(
8✔
170
        self,
8✔
171
        cache_expiration: FillerTileCacheExpirationStrategy,
8✔
172
    ) -> BoxStream<'a, Result<RasterTile2D<PixelType>>>
8✔
173
    where
8✔
174
        Self: Stream<Item = Result<Option<RasterTile2D<PixelType>>>> + 'a,
8✔
175
    {
8✔
176
        let grid_bounds = self.grid_bounds.clone();
8✔
177
        let global_geo_transform = self.current_tile_spec.global_geo_transform;
8✔
178
        let tile_shape = self.current_tile_spec.tile_size_in_pixels;
8✔
179
        let bands = self.bands.len();
8✔
180

8✔
181
        let s = self.filter_map(|x| async move {
8✔
182
            match x {
96✔
183
                Ok(Some(t)) => Some(Ok(t)),
92✔
184
                Ok(None) => None,
4✔
185
                Err(e) => Some(Err(e)),
×
186
            }
187
        });
96✔
188

8✔
189
        let s_filled = SparseTilesFillAdapter::new(
8✔
190
            s,
8✔
191
            grid_bounds,
8✔
192
            bands,
8✔
193
            global_geo_transform,
8✔
194
            tile_shape,
8✔
195
            cache_expiration,
8✔
196
        );
8✔
197
        s_filled.boxed()
8✔
198
    }
8✔
199

200
    /// Wrap `RasterSubQueryAdapter` to flatten the inner option.
201
    ///
202
    /// # Panics
    ///
    /// Polling the returned stream panics with `msg` if the adapter yields `Ok(None)`.
203
    pub(crate) fn expect(self, msg: &'static str) -> BoxStream<'a, Result<RasterTile2D<PixelType>>>
21✔
204
    where
21✔
205
        Self: Stream<Item = Result<Option<RasterTile2D<PixelType>>>> + 'a,
21✔
206
    {
21✔
207
        self.map(|r| r.map(|o| o.expect(msg))).boxed()
59✔
208
    }
21✔
209
}
210

211
impl<'a, PixelType, RasterProcessorType, SubQuery> FusedStream
212
    for RasterSubQueryAdapter<'a, PixelType, RasterProcessorType, SubQuery>
213
where
214
    PixelType: Pixel,
215
    RasterProcessorType: QueryProcessor<
216
        Output = RasterTile2D<PixelType>,
217
        SpatialBounds = SpatialPartition2D,
218
        Selection = BandSelection,
219
    >,
220
    SubQuery: SubQueryTileAggregator<'a, PixelType> + 'static,
221
{
222
    fn is_terminated(&self) -> bool {
×
223
        matches!(self.state, StateInner::Ended)
×
224
    }
×
225
}
226

227
impl<'a, PixelType, RasterProcessorType, SubQuery> Stream
228
    for RasterSubQueryAdapter<'a, PixelType, RasterProcessorType, SubQuery>
229
where
230
    PixelType: Pixel,
231
    RasterProcessorType: QueryProcessor<
232
        Output = RasterTile2D<PixelType>,
233
        SpatialBounds = SpatialPartition2D,
234
        Selection = BandSelection,
235
    >,
236
    SubQuery: SubQueryTileAggregator<'a, PixelType> + 'static,
237
{
238
    type Item = Result<Option<RasterTile2D<PixelType>>>;
239

240
    /**************************************************************************************************************************************
241
     * This method uses the `StateInner` enum to keep track of the current state
242
     *
243
     * There are two cases aka transition flows that are valid:
244
     *  a) CreateNextQuery -> ReturnResult
245
     *  b) CreateNextQuery -> RunningQuery -> RunningFold -> RunningIntoTile -> ReturnResult
246
     *
247
     * In case a) no valid `QueryRectangle` is produced. Therefore, all async steps are skipped and None is produced instead of a tile.
248
     * In case b) a valid `QueryRectangle` for the target tile is produced and a stream is queried and folded to produce a new tile.
249
     *
250
     * When all tiles are queried the state transitions from ReturnResult to Ended.
251
     *
252
     * In case an error occurs, the state is set to Ended AND the method returns `Poll::Ready(Some(Err(e)))`.
253
     *************************************************************************************************************************************/
254
    #[allow(clippy::too_many_lines)]
255
    fn poll_next(
987✔
256
        self: Pin<&mut Self>,
987✔
257
        cx: &mut std::task::Context<'_>,
987✔
258
    ) -> std::task::Poll<Option<Self::Item>> {
987✔
259
        let mut this = self.project();
987✔
260

261
        // check if we ended in a previous call
262
        if matches!(*this.state, StateInner::Ended) {
987✔
263
            return Poll::Ready(None);
30✔
264
        }
957✔
265

266
        // first generate a new query
267
        if matches!(*this.state, StateInner::CreateNextQuery) {
957✔
268
            match this.sub_query.tile_query_rectangle(
163✔
269
                *this.current_tile_spec,
163✔
270
                this.query_rect_to_answer.clone(),
163✔
271
                *this.current_time_start,
163✔
272
                this.bands[*this.current_band_index],
163✔
273
            ) {
163✔
274
                Ok(Some(tile_query_rectangle)) => {
159✔
275
                    let tile_query_stream_fut = this
159✔
276
                        .source_processor
159✔
277
                        .raster_query(tile_query_rectangle.clone(), *this.query_ctx);
159✔
278

159✔
279
                    let tile_folding_accu_fut = this.sub_query.new_fold_accu(
159✔
280
                        *this.current_tile_spec,
159✔
281
                        tile_query_rectangle,
159✔
282
                        this.query_ctx.thread_pool(),
159✔
283
                    );
159✔
284

159✔
285
                    let joined_future =
159✔
286
                        async { futures::try_join!(tile_query_stream_fut, tile_folding_accu_fut) }
263✔
287
                            .boxed();
159✔
288

159✔
289
                    this.state.set(StateInner::RunningQuery {
159✔
290
                        query_with_accu: joined_future,
159✔
291
                    });
159✔
292
                }
159✔
293
                Ok(None) => this.state.set(StateInner::ReturnResult(None)),
4✔
294
                Err(e) => {
×
295
                    this.state.set(StateInner::Ended);
×
296
                    return Poll::Ready(Some(Err(e)));
×
297
                }
298
            }
299
        }
794✔
300

301
        // A query was issued, so we check whether it is finished
302
        // To work in this scope we first check if the state is the one we expect. We want to set the state in this scope so we can not borrow it here!
303
        if matches!(*this.state, StateInner::RunningQuery { query_with_accu: _ }) {
957✔
304
            // The state is pinned. Project it to get access to the query stored in the context.
305
            let rq_res = if let StateInnerProjection::RunningQuery { query_with_accu } =
263✔
306
                this.state.as_mut().project()
263✔
307
            {
308
                ready!(query_with_accu.poll(cx))
263✔
309
            } else {
310
                // we already checked that the state is `StateInner::RunningQuery` so this case can not happen.
311
                unreachable!()
×
312
            };
313

314
            match rq_res {
159✔
315
                Ok((query, tile_folding_accu)) => {
159✔
316
                    let tile_folding_stream =
159✔
317
                        query.try_fold(tile_folding_accu, this.sub_query.fold_method());
159✔
318

159✔
319
                    this.state.set(StateInner::RunningFold(tile_folding_stream));
159✔
320
                }
159✔
321
                Err(e) => {
×
322
                    this.state.set(StateInner::Ended);
×
323
                    return Poll::Ready(Some(Err(e)));
×
324
                }
325
            };
326
        }
694✔
327

328
        // We are waiting for/expecting the result of the fold.
329
        // This block uses the same check and project pattern as above.
330
        if matches!(*this.state, StateInner::RunningFold(_)) {
853✔
331
            let rf_res =
159✔
332
                if let StateInnerProjection::RunningFold(fold) = this.state.as_mut().project() {
809✔
333
                    ready!(fold.poll(cx))
809✔
334
                } else {
335
                    unreachable!()
×
336
                };
337

338
            match rf_res {
159✔
339
                Ok(tile_accu) => {
159✔
340
                    let tile = tile_accu.into_tile();
159✔
341
                    this.state.set(StateInner::RunningIntoTile(tile));
159✔
342
                }
159✔
343
                Err(e) => {
×
344
                    this.state.set(StateInner::Ended);
×
345
                    return Poll::Ready(Some(Err(e)));
×
346
                }
347
            }
348
        }
44✔
349

350
        // We are waiting for/expecting the result of `into_tile` method.
351
        // This block uses the same check and project pattern as above.
352
        if matches!(*this.state, StateInner::RunningIntoTile(_)) {
203✔
353
            let rf_res = if let StateInnerProjection::RunningIntoTile(fold) =
199✔
354
                this.state.as_mut().project()
199✔
355
            {
356
                ready!(fold.poll(cx))
199✔
357
            } else {
358
                unreachable!()
×
359
            };
360

361
            match rf_res {
159✔
362
                Ok(mut tile) => {
159✔
363
                    // set the tile band to the running index, that is because output bands always start at zero and are consecutive, independent of the input bands
159✔
364
                    tile.band = *this.current_band_index;
159✔
365
                    this.state.set(StateInner::ReturnResult(Some(tile)));
159✔
366
                }
159✔
367
                Err(e) => {
×
368
                    this.state.set(StateInner::Ended);
×
369
                    return Poll::Ready(Some(Err(e)));
×
370
                }
371
            }
372
        }
4✔
373

374
        // At this stage we are in ReturnResult state. Either from a running fold or because the tile query rect was not valid.
375
        // This block uses the check and project pattern as above.
376
        let tile_option = if let StateInnerProjection::ReturnResult(tile_option) =
163✔
377
            this.state.as_mut().project()
163✔
378
        {
379
            tile_option.take()
163✔
380
        } else {
381
            unreachable!()
×
382
        };
383
        // In the next poll we need to produce a new tile (if nothing else happens)
384
        this.state.set(StateInner::CreateNextQuery);
163✔
385

386
        // If there is a tile, set the current_time_end option.
387
        if let Some(tile) = &tile_option {
163✔
388
            debug_assert!(*this.current_time_start >= tile.time.start());
159✔
389
            *this.current_time_end = Some(tile.time.end());
159✔
390
        };
4✔
391

392
        // now do progress
393

394
        let next_tile_pos = if *this.current_band_index + 1 < this.bands.len() {
163✔
395
            // there is still another band to process for the current tile position
NEW
396
            *this.current_band_index += 1;
×
NEW
397
            Some(this.current_tile_spec.global_tile_position)
×
398
        } else {
399
            // all bands for the current tile are processed, we can go to the next tile in space, if there is one
400
            *this.current_band_index = 0;
163✔
401
            this.grid_bounds
163✔
402
                .inc_idx_unchecked(this.current_tile_spec.global_tile_position, 1)
163✔
403
        };
404

405
        // if the grid idx wraps around, set the next query time instance to the end time instance of the last round
406
        match (next_tile_pos, *this.current_time_end) {
163✔
407
            (Some(idx), _) => {
117✔
408
                // update the spatial index
117✔
409
                this.current_tile_spec.global_tile_position = idx;
117✔
410
            }
117✔
411
            (None, None) => {
412
                // end the stream since we never received a tile from any subquery. Should only happen if we end the first grid iteration.
413
                // NOTE: this assumes that the input operator produces no data tiles for queries where time and space are valid but no data is available.
414
                debug_assert!(&tile_option.is_none());
1✔
415
                debug_assert!(
416
                    *this.current_time_start == this.query_rect_to_answer.time_interval.start()
1✔
417
                );
418
                this.state.set(StateInner::Ended);
1✔
419
            }
420
            (None, Some(end_time)) if end_time == *this.current_time_start => {
45✔
421
                // Only for time instants: reset the spatial idx to the first tile of the grid AND increase the request time by 1.
×
422
                this.current_tile_spec.global_tile_position = this.grid_bounds.min_index();
×
423
                *this.current_time_start = end_time + 1;
×
424
                *this.current_time_end = None;
×
425

×
426
                // check if the next time to request is inside the bounds we want to answer.
×
427
                if *this.current_time_start >= this.query_rect_to_answer.time_interval.end() {
×
428
                    this.state.set(StateInner::Ended);
×
429
                }
×
430
            }
431
            (None, Some(end_time)) => {
45✔
432
                // reset the spatial idx to the first tile of the grid AND move the requested time to the last known time.
45✔
433
                this.current_tile_spec.global_tile_position = this.grid_bounds.min_index();
45✔
434
                *this.current_time_start = end_time;
45✔
435
                *this.current_time_end = None;
45✔
436

45✔
437
                // check if the next time to request is inside the bounds we want to answer.
45✔
438
                if *this.current_time_start >= this.query_rect_to_answer.time_interval.end() {
45✔
439
                    this.state.set(StateInner::Ended);
30✔
440
                }
30✔
441
            }
442
        };
443

444
        Poll::Ready(Some(Ok(tile_option)))
163✔
445
    }
987✔
446
}
447

448
/// This trait defines the behavior of the `RasterSubQueryAdapter`.
449
pub trait SubQueryTileAggregator<'a, T>: Send
450
where
451
    T: Pixel,
452
{
453
    type FoldFuture: Send + TryFuture<Ok = Self::TileAccu, Error = error::Error>;
454
    type FoldMethod: 'a
455
        + Send
456
        + Sync
457
        + Clone
458
        + Fn(Self::TileAccu, RasterTile2D<T>) -> Self::FoldFuture;
459
    type TileAccu: FoldTileAccu<RasterType = T> + Clone + Send;
460
    type TileAccuFuture: Send + Future<Output = Result<Self::TileAccu>>;
461

462
    /// This method generates a new accumulator which is used to fold the `Stream` of `RasterTile2D` of a sub-query.
463
    fn new_fold_accu(
464
        &self,
465
        tile_info: TileInformation,
466
        query_rect: RasterQueryRectangle,
467
        pool: &Arc<ThreadPool>,
468
    ) -> Self::TileAccuFuture;
469

470
    /// This method generates `Some(QueryRectangle)` for a tile-specific sub-query or `None` if the `query_rect` cannot be translated.
471
    /// In the latter case an `EmptyTile` will be produced for the sub query instead of querying the source.
472
    fn tile_query_rectangle(
473
        &self,
474
        tile_info: TileInformation,
475
        query_rect: RasterQueryRectangle,
476
        start_time: TimeInstance,
477
        band: usize,
478
    ) -> Result<Option<RasterQueryRectangle>>;
479

480
    /// This method generates the method which combines the accumulator and each tile of the sub-query stream in the `TryFold` stream adapter.
481
    fn fold_method(&self) -> Self::FoldMethod;
482

483
    fn into_raster_subquery_adapter<S>(
21✔
484
        self,
21✔
485
        source: &'a S,
21✔
486
        query: RasterQueryRectangle,
21✔
487
        ctx: &'a dyn QueryContext,
21✔
488
        tiling_specification: TilingSpecification,
21✔
489
    ) -> RasterSubQueryAdapter<'a, T, S, Self>
21✔
490
    where
21✔
491
        S: RasterQueryProcessor<RasterType = T>,
21✔
492
        Self: Sized,
21✔
493
    {
21✔
494
        RasterSubQueryAdapter::<'a, T, S, Self>::new(source, query, tiling_specification, ctx, self)
21✔
495
    }
21✔
496
}
497

498
#[derive(Clone, Debug)]
×
499
pub struct RasterTileAccu2D<T> {
500
    pub tile: RasterTile2D<T>,
501
    pub pool: Arc<ThreadPool>,
502
}
503

504
impl<T> RasterTileAccu2D<T> {
505
    pub fn new(tile: RasterTile2D<T>, pool: Arc<ThreadPool>) -> Self {
8✔
506
        RasterTileAccu2D { tile, pool }
8✔
507
    }
8✔
508
}
509

510
#[async_trait]
511
impl<T: Pixel> FoldTileAccu for RasterTileAccu2D<T> {
512
    type RasterType = T;
513

514
    async fn into_tile(self) -> Result<RasterTile2D<Self::RasterType>> {
4✔
515
        Ok(self.tile)
4✔
516
    }
4✔
517

518
    fn thread_pool(&self) -> &Arc<ThreadPool> {
×
519
        &self.pool
×
520
    }
×
521
}
522

523
impl<T: Pixel> FoldTileAccuMut for RasterTileAccu2D<T> {
524
    fn tile_mut(&mut self) -> &mut RasterTile2D<T> {
×
525
        &mut self.tile
×
526
    }
×
527
}
528

529
#[derive(Debug, Clone)]
×
530
pub struct TileSubQueryIdentity<F, T> {
531
    fold_fn: F,
532
    _phantom_pixel_type: PhantomData<T>,
533
}
534

535
impl<'a, T, FoldM, FoldF> SubQueryTileAggregator<'a, T> for TileSubQueryIdentity<FoldM, T>
536
where
537
    T: Pixel,
538
    FoldM: Send + Sync + 'a + Clone + Fn(RasterTileAccu2D<T>, RasterTile2D<T>) -> FoldF,
539
    FoldF: Send + TryFuture<Ok = RasterTileAccu2D<T>, Error = error::Error>,
540
{
541
    type FoldFuture = FoldF;
542

543
    type FoldMethod = FoldM;
544

545
    type TileAccu = RasterTileAccu2D<T>;
546
    type TileAccuFuture = BoxFuture<'a, Result<Self::TileAccu>>;
547

548
    fn new_fold_accu(
4✔
549
        &self,
4✔
550
        tile_info: TileInformation,
4✔
551
        query_rect: RasterQueryRectangle,
4✔
552
        pool: &Arc<ThreadPool>,
4✔
553
    ) -> Self::TileAccuFuture {
4✔
554
        identity_accu(tile_info, &query_rect, pool.clone()).boxed()
4✔
555
    }
4✔
556

557
    fn tile_query_rectangle(
4✔
558
        &self,
4✔
559
        tile_info: TileInformation,
4✔
560
        query_rect: RasterQueryRectangle,
4✔
561
        start_time: TimeInstance,
4✔
562
        band: usize,
4✔
563
    ) -> Result<Option<RasterQueryRectangle>> {
4✔
564
        Ok(Some(RasterQueryRectangle {
4✔
565
            spatial_bounds: tile_info.spatial_partition(),
4✔
566
            time_interval: TimeInterval::new_instant(start_time)?,
4✔
567
            spatial_resolution: query_rect.spatial_resolution,
4✔
568
            attributes: band.into(),
4✔
569
        }))
570
    }
4✔
571

572
    fn fold_method(&self) -> Self::FoldMethod {
4✔
573
        self.fold_fn.clone()
4✔
574
    }
4✔
575
}
576

577
pub fn identity_accu<T: Pixel>(
4✔
578
    tile_info: TileInformation,
4✔
579
    query_rect: &RasterQueryRectangle,
4✔
580
    pool: Arc<ThreadPool>,
4✔
581
) -> impl Future<Output = Result<RasterTileAccu2D<T>>> {
4✔
582
    let time_interval = query_rect.time_interval;
4✔
583
    crate::util::spawn_blocking(move || {
4✔
584
        let output_raster = EmptyGrid2D::new(tile_info.tile_size_in_pixels).into();
4✔
585
        let output_tile = RasterTile2D::new_with_tile_info(
4✔
586
            time_interval,
4✔
587
            tile_info,
4✔
588
            0,
4✔
589
            output_raster,
4✔
590
            CacheHint::max_duration(),
4✔
591
        );
4✔
592
        RasterTileAccu2D::new(output_tile, pool)
4✔
593
    })
4✔
594
    .map_err(From::from)
4✔
595
}
4✔
596

597
pub fn fold_by_blit_impl<T>(
4✔
598
    accu: RasterTileAccu2D<T>,
4✔
599
    tile: RasterTile2D<T>,
4✔
600
) -> Result<RasterTileAccu2D<T>>
4✔
601
where
4✔
602
    T: Pixel,
4✔
603
{
4✔
604
    let mut accu_tile = accu.tile;
4✔
605
    let pool = accu.pool;
4✔
606
    let t_union = accu_tile.time.union(&tile.time)?;
4✔
607

608
    accu_tile.time = t_union;
4✔
609

4✔
610
    if tile.grid_array.is_empty() && accu_tile.grid_array.is_empty() {
4✔
611
        // only skip if both tiles are empty. There might be valid data in one otherwise.
612
        return Ok(RasterTileAccu2D::new(accu_tile, pool));
×
613
    }
4✔
614

4✔
615
    let mut materialized_tile = accu_tile.into_materialized_tile();
4✔
616

4✔
617
    materialized_tile.blit(tile)?;
4✔
618

619
    Ok(RasterTileAccu2D::new(materialized_tile.into(), pool))
4✔
620
}
4✔
621

622
#[allow(dead_code)]
623
pub fn fold_by_blit_future<T>(
4✔
624
    accu: RasterTileAccu2D<T>,
4✔
625
    tile: RasterTile2D<T>,
4✔
626
) -> impl Future<Output = Result<RasterTileAccu2D<T>>>
4✔
627
where
4✔
628
    T: Pixel,
4✔
629
{
4✔
630
    crate::util::spawn_blocking(|| fold_by_blit_impl(accu, tile)).then(|x| async move {
4✔
631
        match x {
4✔
632
            Ok(r) => r,
4✔
633
            Err(e) => Err(e.into()),
×
634
        }
635
    })
4✔
636
}
4✔
637

638
#[cfg(test)]
639
mod tests {
640
    use geoengine_datatypes::{
641
        primitives::{SpatialPartition2D, SpatialResolution, TimeInterval},
642
        raster::{Grid, GridShape, RasterDataType, TilesEqualIgnoringCacheHint},
643
        spatial_reference::SpatialReference,
644
        util::test::TestDefault,
645
    };
646

647
    use super::*;
648
    use crate::engine::{
649
        MockExecutionContext, MockQueryContext, RasterBandDescriptors, RasterOperator,
650
        RasterResultDescriptor, WorkflowOperatorPath,
651
    };
652
    use crate::mock::{MockRasterSource, MockRasterSourceParams};
653
    use futures::StreamExt;
654

655
    #[tokio::test]
1✔
656
    async fn identity() {
1✔
657
        let data: Vec<RasterTile2D<u8>> = vec![
1✔
658
            RasterTile2D {
1✔
659
                time: TimeInterval::new_unchecked(0, 5),
1✔
660
                tile_position: [-1, 0].into(),
1✔
661
                band: 0,
1✔
662
                global_geo_transform: TestDefault::test_default(),
1✔
663
                grid_array: Grid::new([2, 2].into(), vec![1, 2, 3, 4]).unwrap().into(),
1✔
664
                properties: Default::default(),
1✔
665
                cache_hint: CacheHint::default(),
1✔
666
            },
1✔
667
            RasterTile2D {
1✔
668
                time: TimeInterval::new_unchecked(0, 5),
1✔
669
                tile_position: [-1, 1].into(),
1✔
670
                band: 0,
1✔
671
                global_geo_transform: TestDefault::test_default(),
1✔
672
                grid_array: Grid::new([2, 2].into(), vec![7, 8, 9, 10]).unwrap().into(),
1✔
673
                properties: Default::default(),
1✔
674
                cache_hint: CacheHint::default(),
1✔
675
            },
1✔
676
            RasterTile2D {
1✔
677
                time: TimeInterval::new_unchecked(5, 10),
1✔
678
                tile_position: [-1, 0].into(),
1✔
679
                band: 0,
1✔
680
                global_geo_transform: TestDefault::test_default(),
1✔
681
                grid_array: Grid::new([2, 2].into(), vec![13, 14, 15, 16])
1✔
682
                    .unwrap()
1✔
683
                    .into(),
1✔
684
                properties: Default::default(),
1✔
685
                cache_hint: CacheHint::default(),
1✔
686
            },
1✔
687
            RasterTile2D {
1✔
688
                time: TimeInterval::new_unchecked(5, 10),
1✔
689
                tile_position: [-1, 1].into(),
1✔
690
                band: 0,
1✔
691
                global_geo_transform: TestDefault::test_default(),
1✔
692
                grid_array: Grid::new([2, 2].into(), vec![19, 20, 21, 22])
1✔
693
                    .unwrap()
1✔
694
                    .into(),
1✔
695
                properties: Default::default(),
1✔
696
                cache_hint: CacheHint::default(),
1✔
697
            },
1✔
698
        ];
1✔
699

1✔
700
        let mrs1 = MockRasterSource {
1✔
701
            params: MockRasterSourceParams {
1✔
702
                data: data.clone(),
1✔
703
                result_descriptor: RasterResultDescriptor {
1✔
704
                    data_type: RasterDataType::U8,
1✔
705
                    spatial_reference: SpatialReference::epsg_4326().into(),
1✔
706
                    time: None,
1✔
707
                    bbox: None,
1✔
708
                    resolution: None,
1✔
709
                    bands: RasterBandDescriptors::new_single_band(),
1✔
710
                },
1✔
711
            },
1✔
712
        }
1✔
713
        .boxed();
1✔
714

1✔
715
        let mut exe_ctx = MockExecutionContext::test_default();
1✔
716
        exe_ctx.tiling_specification.tile_size_in_pixels = GridShape {
1✔
717
            shape_array: [2, 2],
1✔
718
        };
1✔
719

1✔
720
        let query_rect = RasterQueryRectangle {
1✔
721
            spatial_bounds: SpatialPartition2D::new_unchecked((0., 1.).into(), (3., 0.).into()),
1✔
722
            time_interval: TimeInterval::new_unchecked(0, 10),
1✔
723
            spatial_resolution: SpatialResolution::one(),
1✔
724
            attributes: BandSelection::first(),
1✔
725
        };
1✔
726

1✔
727
        let query_ctx = MockQueryContext::test_default();
1✔
728
        let tiling_strat = exe_ctx.tiling_specification;
1✔
729

730
        let op = mrs1
1✔
731
            .initialize(WorkflowOperatorPath::initialize_root(), &exe_ctx)
1✔
732
            .await
×
733
            .unwrap();
1✔
734

1✔
735
        let qp = op.query_processor().unwrap().get_u8().unwrap();
1✔
736

1✔
737
        let a = RasterSubQueryAdapter::new(
1✔
738
            &qp,
1✔
739
            query_rect,
1✔
740
            tiling_strat,
1✔
741
            &query_ctx,
1✔
742
            TileSubQueryIdentity {
1✔
743
                fold_fn: fold_by_blit_future,
1✔
744
                _phantom_pixel_type: PhantomData,
1✔
745
            },
1✔
746
        );
1✔
747
        let res = a
1✔
748
            .map(Result::unwrap)
1✔
749
            .map(Option::unwrap)
1✔
750
            .collect::<Vec<RasterTile2D<u8>>>()
1✔
751
            .await;
8✔
752
        assert!(data.tiles_equal_ignoring_cache_hint(&res));
1✔
753
    }
754
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc