• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

geo-engine / geoengine / 11911118784

19 Nov 2024 10:06AM UTC coverage: 90.448% (-0.2%) from 90.687%
11911118784

push

github

web-flow
Merge pull request #994 from geo-engine/workspace-dependencies

use workspace dependencies, update toolchain, use global lock in expression

9 of 11 new or added lines in 6 files covered. (81.82%)

369 existing lines in 74 files now uncovered.

132871 of 146904 relevant lines covered (90.45%)

54798.62 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

90.93
/operators/src/adapters/raster_subquery/raster_subquery_adapter.rs
1
use crate::adapters::sparse_tiles_fill_adapter::{
2
    FillerTileCacheExpirationStrategy, FillerTimeBounds,
3
};
4
use crate::adapters::SparseTilesFillAdapter;
5
use crate::engine::{QueryContext, QueryProcessor, RasterQueryProcessor, RasterResultDescriptor};
6
use crate::error;
7
use crate::util::Result;
8
use futures::future::BoxFuture;
9
use futures::{
10
    ready,
11
    stream::{BoxStream, TryFold},
12
    FutureExt, TryFuture, TryStreamExt,
13
};
14
use futures::{stream::FusedStream, Future};
15
use futures::{Stream, StreamExt, TryFutureExt};
16
use geoengine_datatypes::primitives::{BandSelection, CacheHint};
17
use geoengine_datatypes::primitives::{
18
    RasterQueryRectangle, SpatialPartition2D, SpatialPartitioned,
19
};
20
use geoengine_datatypes::raster::{EmptyGrid2D, GridBoundingBox2D, GridBounds, GridStep};
21
use geoengine_datatypes::{
22
    primitives::TimeInstance,
23
    raster::{Blit, Pixel, RasterTile2D, TileInformation},
24
};
25
use geoengine_datatypes::{primitives::TimeInterval, raster::TilingSpecification};
26

27
use pin_project::pin_project;
28
use rayon::ThreadPool;
29

30
use std::marker::PhantomData;
31
use std::sync::Arc;
32
use std::task::Poll;
33

34
use std::pin::Pin;
35

36
use async_trait::async_trait;
37

38
#[async_trait]
39
pub trait FoldTileAccu {
40
    type RasterType: Pixel;
41
    async fn into_tile(self) -> Result<RasterTile2D<Self::RasterType>>;
42
    fn thread_pool(&self) -> &Arc<ThreadPool>;
43
}
44

45
pub trait FoldTileAccuMut: FoldTileAccu {
46
    fn tile_mut(&mut self) -> &mut RasterTile2D<Self::RasterType>;
47
}
48

49
pub type RasterFold<'a, T, FoldFuture, FoldMethod, FoldTileAccu> =
50
    TryFold<BoxStream<'a, Result<RasterTile2D<T>>>, FoldFuture, FoldTileAccu, FoldMethod>;
51

52
type QueryAccuFuture<'a, T, A> = BoxFuture<'a, Result<(BoxStream<'a, Result<RasterTile2D<T>>>, A)>>;
53

54
type IntoTileFuture<'a, T> = BoxFuture<'a, Result<RasterTile2D<T>>>;
55

56
/// This adapter allows to generate a tile stream using sub-querys.
57
/// This is done using a `TileSubQuery`.
58
/// The sub-query is resolved for each produced tile.
59

60
#[pin_project(project=StateInnerProjection)]
1,818✔
61
#[derive(Debug, Clone)]
62
enum StateInner<A, B, C, D> {
63
    CreateNextQuery,
64
    RunningQuery {
65
        #[pin]
66
        query_with_accu: A,
67
    },
68
    RunningFold(#[pin] B),
69
    RunningIntoTile(#[pin] D),
70
    ReturnResult(Option<C>),
71
    Ended,
72
}
73

74
/// This type is needed to stop Clippy from complaining about a very complex type in the `RasterSubQueryAdapter` struct.
75
type StateInnerType<'a, P, FoldFuture, FoldMethod, TileAccu> = StateInner<
76
    QueryAccuFuture<'a, P, TileAccu>,
77
    RasterFold<'a, P, FoldFuture, FoldMethod, TileAccu>,
78
    RasterTile2D<P>,
79
    IntoTileFuture<'a, P>,
80
>;
81

82
/// This adapter allows to generate a tile stream using sub-querys.
83
/// This is done using a `TileSubQuery`.
84
/// The sub-query is resolved for each produced tile.
85
#[pin_project(project = RasterSubQueryAdapterProjection)]
1,248✔
86
pub struct RasterSubQueryAdapter<'a, PixelType, RasterProcessorType, SubQuery>
87
where
88
    PixelType: Pixel,
89
    RasterProcessorType: RasterQueryProcessor<RasterType = PixelType>,
90
    SubQuery: SubQueryTileAggregator<'a, PixelType>,
91
{
92
    /// The `RasterQueryProcessor` to answer the sub-queries
93
    source_processor: &'a RasterProcessorType,
94
    /// The `QueryContext` to use for sub-queries
95
    query_ctx: &'a dyn QueryContext,
96
    /// The `QueryRectangle` the adapter is queried with
97
    query_rect_to_answer: RasterQueryRectangle,
98
    /// The `GridBoundingBox2D` that defines the tile grid space of the query.
99
    grid_bounds: GridBoundingBox2D,
100
    // the selected bands from the source
101
    bands: Vec<u32>,
102
    // the band being currently processed
103
    current_band_index: u32,
104

105
    /// The `SubQuery` defines what this adapter does.
106
    sub_query: SubQuery,
107

108
    /// This `TimeInterval` is the time currently worked on
109
    current_time_start: TimeInstance,
110
    current_time_end: Option<TimeInstance>,
111
    /// The `GridIdx2D` currently worked on
112
    current_tile_spec: TileInformation,
113

114
    /// This current state of the adapter
115
    #[pin]
116
    state: StateInnerType<
117
        'a,
118
        PixelType,
119
        SubQuery::FoldFuture,
120
        SubQuery::FoldMethod,
121
        SubQuery::TileAccu,
122
    >,
123
}
124

125
impl<'a, PixelType, RasterProcessor, SubQuery>
126
    RasterSubQueryAdapter<'a, PixelType, RasterProcessor, SubQuery>
127
where
128
    PixelType: Pixel,
129
    RasterProcessor: RasterQueryProcessor<RasterType = PixelType>,
130
    SubQuery: SubQueryTileAggregator<'a, PixelType>,
131
{
132
    pub fn new(
35✔
133
        source_processor: &'a RasterProcessor,
35✔
134
        query_rect_to_answer: RasterQueryRectangle,
35✔
135
        tiling_spec: TilingSpecification,
35✔
136
        query_ctx: &'a dyn QueryContext,
35✔
137
        sub_query: SubQuery,
35✔
138
    ) -> Self {
35✔
139
        debug_assert!(query_rect_to_answer.spatial_resolution.y > 0.);
35✔
140

141
        let tiling_strat = tiling_spec.strategy(
35✔
142
            query_rect_to_answer.spatial_resolution.x,
35✔
143
            -query_rect_to_answer.spatial_resolution.y,
35✔
144
        );
35✔
145

35✔
146
        let grid_bounds = tiling_strat.tile_grid_box(query_rect_to_answer.spatial_partition());
35✔
147

35✔
148
        let first_tile_spec = TileInformation {
35✔
149
            global_geo_transform: tiling_strat.geo_transform,
35✔
150
            global_tile_position: grid_bounds.min_index(),
35✔
151
            tile_size_in_pixels: tiling_strat.tile_size_in_pixels,
35✔
152
        };
35✔
153

35✔
154
        Self {
35✔
155
            current_tile_spec: first_tile_spec,
35✔
156
            current_time_end: None,
35✔
157
            current_time_start: query_rect_to_answer.time_interval.start(),
35✔
158
            current_band_index: 0,
35✔
159
            grid_bounds,
35✔
160
            bands: query_rect_to_answer.attributes.as_vec(),
35✔
161
            query_ctx,
35✔
162
            query_rect_to_answer,
35✔
163
            source_processor,
35✔
164
            state: StateInner::CreateNextQuery,
35✔
165
            sub_query,
35✔
166
        }
35✔
167
    }
35✔
168

169
    /// Wrap the `RasterSubQueryAdapter` with a filter and a `SparseTilesFillAdapter` to produce a `Stream` compatible with `RasterQueryProcessor`.
170
    /// Set the `cache_expiration` to unlimited, if the filler tiles will alway be empty.
171
    pub fn filter_and_fill(
11✔
172
        self,
11✔
173
        cache_expiration: FillerTileCacheExpirationStrategy,
11✔
174
    ) -> BoxStream<'a, Result<RasterTile2D<PixelType>>>
11✔
175
    where
11✔
176
        Self: Stream<Item = Result<Option<RasterTile2D<PixelType>>>> + 'a,
11✔
177
    {
11✔
178
        let grid_bounds = self.grid_bounds.clone();
11✔
179
        let global_geo_transform = self.current_tile_spec.global_geo_transform;
11✔
180
        let tile_shape = self.current_tile_spec.tile_size_in_pixels;
11✔
181
        let num_bands = self.bands.len() as u32;
11✔
182
        let query_time_bounds = self.query_rect_to_answer.time_interval;
11✔
183

11✔
184
        let s = self.filter_map(|x| async move {
136✔
185
            match x {
136✔
186
                Ok(Some(t)) => Some(Ok(t)),
132✔
187
                Ok(None) => None,
4✔
UNCOV
188
                Err(e) => Some(Err(e)),
×
189
            }
190
        });
272✔
191

11✔
192
        let s_filled = SparseTilesFillAdapter::new(
11✔
193
            s,
11✔
194
            grid_bounds,
11✔
195
            num_bands,
11✔
196
            global_geo_transform,
11✔
197
            tile_shape,
11✔
198
            cache_expiration,
11✔
199
            query_time_bounds,
11✔
200
            FillerTimeBounds::from(query_time_bounds), // operator should at least fill the query rect. Adapter will handle overflow at start / end gracefully.
11✔
201
        );
11✔
202
        s_filled.boxed()
11✔
203
    }
11✔
204

205
    /// Wrap `RasterSubQueryAdapter` to flatten the inner option.
206
    ///
207
    /// SAFETY: This call will cause panics if there is a None result!
208
    pub(crate) fn expect(self, msg: &'static str) -> BoxStream<'a, Result<RasterTile2D<PixelType>>>
22✔
209
    where
22✔
210
        Self: Stream<Item = Result<Option<RasterTile2D<PixelType>>>> + 'a,
22✔
211
    {
22✔
212
        self.map(|r| r.map(|o| o.expect(msg))).boxed()
61✔
213
    }
22✔
214
}
215

216
impl<'a, PixelType, RasterProcessorType, SubQuery> FusedStream
217
    for RasterSubQueryAdapter<'a, PixelType, RasterProcessorType, SubQuery>
218
where
219
    PixelType: Pixel,
220
    RasterProcessorType: QueryProcessor<
221
        Output = RasterTile2D<PixelType>,
222
        SpatialBounds = SpatialPartition2D,
223
        Selection = BandSelection,
224
        ResultDescription = RasterResultDescriptor,
225
    >,
226
    SubQuery: SubQueryTileAggregator<'a, PixelType> + 'static,
227
{
228
    fn is_terminated(&self) -> bool {
×
229
        matches!(self.state, StateInner::Ended)
×
230
    }
×
231
}
232

233
impl<'a, PixelType, RasterProcessorType, SubQuery> Stream
234
    for RasterSubQueryAdapter<'a, PixelType, RasterProcessorType, SubQuery>
235
where
236
    PixelType: Pixel,
237
    RasterProcessorType: QueryProcessor<
238
        Output = RasterTile2D<PixelType>,
239
        SpatialBounds = SpatialPartition2D,
240
        Selection = BandSelection,
241
        ResultDescription = RasterResultDescriptor,
242
    >,
243
    SubQuery: SubQueryTileAggregator<'a, PixelType> + 'static,
244
{
245
    type Item = Result<Option<RasterTile2D<PixelType>>>;
246

247
    /**************************************************************************************************************************************
248
     * This method uses the `StateInner` enum to keep track of the current state
249
     *
250
     * There are two cases aka transition flows that are valid:
251
     *  a) CreateNextQuery -> ReturnResult
252
     *  b) CreateNextQuery -> RunningQuery -> RunningFold -> ReturnResult
253
     *
254
     * In case a) a valid `QueryRectangle` for the target tile is produced and a stream is queryed and folded to produce a new tile.
255
     * In case b) no valid `QueryRectange` is produced. Therefore, all async steps are skipped and None is produced instead of a tile.
256
     *
257
     * When all tiles are queried the state transitions from ReturnResult to Ended.
258
     *
259
     * In case an Error occures the state is set to Ended AND the method returns Poll::Ready(Some(Err))).
260
     *************************************************************************************************************************************/
261
    #[allow(clippy::too_many_lines)]
262
    fn poll_next(
1,248✔
263
        self: Pin<&mut Self>,
1,248✔
264
        cx: &mut std::task::Context<'_>,
1,248✔
265
    ) -> std::task::Poll<Option<Self::Item>> {
1,248✔
266
        let mut this = self.project();
1,248✔
267

268
        // check if we ended in a previous call
269
        if matches!(*this.state, StateInner::Ended) {
1,248✔
270
            return Poll::Ready(None);
33✔
271
        }
1,215✔
272

273
        // first generate a new query
274
        if matches!(*this.state, StateInner::CreateNextQuery) {
1,215✔
275
            match this.sub_query.tile_query_rectangle(
205✔
276
                *this.current_tile_spec,
205✔
277
                this.query_rect_to_answer.clone(),
205✔
278
                *this.current_time_start,
205✔
279
                this.bands[*this.current_band_index as usize],
205✔
280
            ) {
205✔
281
                Ok(Some(tile_query_rectangle)) => {
201✔
282
                    let tile_query_stream_fut = this
201✔
283
                        .source_processor
201✔
284
                        .raster_query(tile_query_rectangle.clone(), *this.query_ctx);
201✔
285

201✔
286
                    let tile_folding_accu_fut = this.sub_query.new_fold_accu(
201✔
287
                        *this.current_tile_spec,
201✔
288
                        tile_query_rectangle,
201✔
289
                        this.query_ctx.thread_pool(),
201✔
290
                    );
201✔
291

201✔
292
                    let joined_future =
201✔
293
                        async { futures::try_join!(tile_query_stream_fut, tile_folding_accu_fut) }
201✔
294
                            .boxed();
201✔
295

201✔
296
                    this.state.set(StateInner::RunningQuery {
201✔
297
                        query_with_accu: joined_future,
201✔
298
                    });
201✔
299
                }
201✔
300
                Ok(None) => this.state.set(StateInner::ReturnResult(None)),
4✔
301
                Err(e) => {
×
302
                    this.state.set(StateInner::Ended);
×
303
                    return Poll::Ready(Some(Err(e)));
×
304
                }
305
            }
306
        }
1,010✔
307

308
        // A query was issued, so we check whether it is finished
309
        // To work in this scope we first check if the state is the one we expect. We want to set the state in this scope so we can not borrow it here!
310
        if matches!(*this.state, StateInner::RunningQuery { query_with_accu: _ }) {
1,215✔
311
            // The state is pinned. Project it to get access to the query stored in the context.
312
            let rq_res = if let StateInnerProjection::RunningQuery { query_with_accu } =
339✔
313
                this.state.as_mut().project()
339✔
314
            {
315
                ready!(query_with_accu.poll(cx))
339✔
316
            } else {
317
                // we already checked that the state is `StateInner::RunningQuery` so this case can not happen.
318
                unreachable!()
×
319
            };
320

321
            match rq_res {
201✔
322
                Ok((query, tile_folding_accu)) => {
201✔
323
                    let tile_folding_stream =
201✔
324
                        query.try_fold(tile_folding_accu, this.sub_query.fold_method());
201✔
325

201✔
326
                    this.state.set(StateInner::RunningFold(tile_folding_stream));
201✔
327
                }
201✔
328
                Err(e) => {
×
329
                    this.state.set(StateInner::Ended);
×
330
                    return Poll::Ready(Some(Err(e)));
×
331
                }
332
            };
333
        }
876✔
334

335
        // We are waiting for/expecting the result of the fold.
336
        // This block uses the same check and project pattern as above.
337
        if matches!(*this.state, StateInner::RunningFold(_)) {
1,077✔
338
            let rf_res =
201✔
339
                if let StateInnerProjection::RunningFold(fold) = this.state.as_mut().project() {
993✔
340
                    ready!(fold.poll(cx))
993✔
341
                } else {
342
                    unreachable!()
×
343
                };
344

345
            match rf_res {
201✔
346
                Ok(tile_accu) => {
201✔
347
                    let tile = tile_accu.into_tile();
201✔
348
                    this.state.set(StateInner::RunningIntoTile(tile));
201✔
349
                }
201✔
350
                Err(e) => {
×
351
                    this.state.set(StateInner::Ended);
×
352
                    return Poll::Ready(Some(Err(e)));
×
353
                }
354
            }
355
        }
84✔
356

357
        // We are waiting for/expecting the result of `into_tile` method.
358
        // This block uses the same check and project pattern as above.
359
        if matches!(*this.state, StateInner::RunningIntoTile(_)) {
285✔
360
            let rf_res = if let StateInnerProjection::RunningIntoTile(fold) =
281✔
361
                this.state.as_mut().project()
281✔
362
            {
363
                ready!(fold.poll(cx))
281✔
364
            } else {
365
                unreachable!()
×
366
            };
367

368
            match rf_res {
201✔
369
                Ok(mut tile) => {
201✔
370
                    // set the tile band to the running index, that is because output bands always start at zero and are consecutive, independent of the input bands
201✔
371
                    tile.band = *this.current_band_index;
201✔
372
                    this.state.set(StateInner::ReturnResult(Some(tile)));
201✔
373
                }
201✔
374
                Err(e) => {
×
375
                    this.state.set(StateInner::Ended);
×
376
                    return Poll::Ready(Some(Err(e)));
×
377
                }
378
            }
379
        }
4✔
380

381
        // At this stage we are in ReturnResult state. Either from a running fold or because the tile query rect was not valid.
382
        // This block uses the check and project pattern as above.
383
        let tile_option = if let StateInnerProjection::ReturnResult(tile_option) =
205✔
384
            this.state.as_mut().project()
205✔
385
        {
386
            tile_option.take()
205✔
387
        } else {
388
            unreachable!()
×
389
        };
390
        // In the next poll we need to produce a new tile (if nothing else happens)
391
        this.state.set(StateInner::CreateNextQuery);
205✔
392

393
        // If there is a tile, set the current_time_end option.
394
        if let Some(tile) = &tile_option {
205✔
395
            debug_assert!(*this.current_time_start >= tile.time.start());
201✔
396
            *this.current_time_end = Some(tile.time.end());
201✔
397
        };
4✔
398

399
        // now do progress
400

401
        let next_tile_pos = if *this.current_band_index + 1 < this.bands.len() as u32 {
205✔
402
            // there is still another band to process for the current tile position
403
            *this.current_band_index += 1;
16✔
404
            Some(this.current_tile_spec.global_tile_position)
16✔
405
        } else {
406
            // all bands for the current tile are processed, we can go to the next tile in space, if there is one
407
            *this.current_band_index = 0;
189✔
408
            this.grid_bounds
189✔
409
                .inc_idx_unchecked(this.current_tile_spec.global_tile_position, 1)
189✔
410
        };
411

412
        // if the grid idx wraps around set the ne query time instance to the end time instance of the last round
413
        match (next_tile_pos, *this.current_time_end) {
205✔
414
            (Some(idx), _) => {
152✔
415
                // update the spatial index
152✔
416
                this.current_tile_spec.global_tile_position = idx;
152✔
417
            }
152✔
418
            (None, None) => {
419
                // end the stream since we never recieved a tile from any subquery. Should only happen if we end the first grid iteration.
420
                // NOTE: this assumes that the input operator produces no data tiles for queries where time and space are valid but no data is avalable.
421
                debug_assert!(&tile_option.is_none());
1✔
422
                debug_assert!(
1✔
423
                    *this.current_time_start == this.query_rect_to_answer.time_interval.start()
1✔
424
                );
425
                this.state.set(StateInner::Ended);
1✔
426
            }
427
            (None, Some(end_time)) if end_time == *this.current_time_start => {
52✔
428
                // Only for time instants: reset the spatial idx to the first tile of the grid AND increase the request time by 1.
×
429
                this.current_tile_spec.global_tile_position = this.grid_bounds.min_index();
×
430
                *this.current_time_start = end_time + 1;
×
431
                *this.current_time_end = None;
×
432

×
433
                // check if the next time to request is inside the bounds we are want to answer.
×
434
                if *this.current_time_start >= this.query_rect_to_answer.time_interval.end() {
×
435
                    this.state.set(StateInner::Ended);
×
436
                }
×
437
            }
438
            (None, Some(end_time)) => {
52✔
439
                // reset the spatial idx to the first tile of the grid AND move the requested time to the last known time.
52✔
440
                this.current_tile_spec.global_tile_position = this.grid_bounds.min_index();
52✔
441
                *this.current_time_start = end_time;
52✔
442
                *this.current_time_end = None;
52✔
443

52✔
444
                // check if the next time to request is inside the bounds we are want to answer.
52✔
445
                if *this.current_time_start >= this.query_rect_to_answer.time_interval.end() {
52✔
446
                    this.state.set(StateInner::Ended);
34✔
447
                }
34✔
448
            }
449
        };
450

451
        Poll::Ready(Some(Ok(tile_option)))
205✔
452
    }
1,248✔
453
}
454

455
/// This trait defines the behavior of the `RasterOverlapAdapter`.
456
pub trait SubQueryTileAggregator<'a, T>: Send
457
where
458
    T: Pixel,
459
{
460
    type FoldFuture: Send + TryFuture<Ok = Self::TileAccu, Error = error::Error>;
461
    type FoldMethod: 'a
462
        + Send
463
        + Sync
464
        + Clone
465
        + Fn(Self::TileAccu, RasterTile2D<T>) -> Self::FoldFuture;
466
    type TileAccu: FoldTileAccu<RasterType = T> + Clone + Send;
467
    type TileAccuFuture: Send + Future<Output = Result<Self::TileAccu>>;
468

469
    /// This method generates a new accumulator which is used to fold the `Stream` of `RasterTile2D` of a sub-query.
470
    fn new_fold_accu(
471
        &self,
472
        tile_info: TileInformation,
473
        query_rect: RasterQueryRectangle,
474
        pool: &Arc<ThreadPool>,
475
    ) -> Self::TileAccuFuture;
476

477
    /// This method generates `Some(QueryRectangle)` for a tile-specific sub-query or `None` if the `query_rect` cannot be translated.
478
    /// In the latter case an `EmptyTile` will be produced for the sub query instead of querying the source.
479
    fn tile_query_rectangle(
480
        &self,
481
        tile_info: TileInformation,
482
        query_rect: RasterQueryRectangle,
483
        start_time: TimeInstance,
484
        band_idx: u32,
485
    ) -> Result<Option<RasterQueryRectangle>>;
486

487
    /// This method generates the method which combines the accumulator and each tile of the sub-query stream in the `TryFold` stream adapter.
488
    fn fold_method(&self) -> Self::FoldMethod;
489

490
    fn into_raster_subquery_adapter<S>(
22✔
491
        self,
22✔
492
        source: &'a S,
22✔
493
        query: RasterQueryRectangle,
22✔
494
        ctx: &'a dyn QueryContext,
22✔
495
        tiling_specification: TilingSpecification,
22✔
496
    ) -> RasterSubQueryAdapter<'a, T, S, Self>
22✔
497
    where
22✔
498
        S: RasterQueryProcessor<RasterType = T>,
22✔
499
        Self: Sized,
22✔
500
    {
22✔
501
        RasterSubQueryAdapter::<'a, T, S, Self>::new(source, query, tiling_specification, ctx, self)
22✔
502
    }
22✔
503
}
504

505
#[derive(Clone, Debug)]
506
pub struct RasterTileAccu2D<T> {
507
    pub tile: RasterTile2D<T>,
508
    pub pool: Arc<ThreadPool>,
509
}
510

511
impl<T> RasterTileAccu2D<T> {
512
    pub fn new(tile: RasterTile2D<T>, pool: Arc<ThreadPool>) -> Self {
8✔
513
        RasterTileAccu2D { tile, pool }
8✔
514
    }
8✔
515
}
516

517
#[async_trait]
518
impl<T: Pixel> FoldTileAccu for RasterTileAccu2D<T> {
519
    type RasterType = T;
520

521
    async fn into_tile(self) -> Result<RasterTile2D<Self::RasterType>> {
4✔
522
        Ok(self.tile)
4✔
523
    }
8✔
524

525
    fn thread_pool(&self) -> &Arc<ThreadPool> {
×
526
        &self.pool
×
527
    }
×
528
}
529

530
impl<T: Pixel> FoldTileAccuMut for RasterTileAccu2D<T> {
531
    fn tile_mut(&mut self) -> &mut RasterTile2D<T> {
×
532
        &mut self.tile
×
533
    }
×
534
}
535

536
/// A `SubQueryTileAggregator` that queries the source with the spatial partition of each
/// output tile and folds the resulting tiles with the fold function `F`.
#[derive(Debug, Clone)]
pub struct TileSubQueryIdentity<F, T> {
    /// The function used to fold each source tile into the accumulator.
    fold_fn: F,
    /// Marker for the pixel type `T`; no value of `T` is stored.
    _phantom_pixel_type: PhantomData<T>,
}
541

542
impl<'a, T, FoldM, FoldF> SubQueryTileAggregator<'a, T> for TileSubQueryIdentity<FoldM, T>
543
where
544
    T: Pixel,
545
    FoldM: Send + Sync + 'a + Clone + Fn(RasterTileAccu2D<T>, RasterTile2D<T>) -> FoldF,
546
    FoldF: Send + TryFuture<Ok = RasterTileAccu2D<T>, Error = error::Error>,
547
{
548
    type FoldFuture = FoldF;
549

550
    type FoldMethod = FoldM;
551

552
    type TileAccu = RasterTileAccu2D<T>;
553
    type TileAccuFuture = BoxFuture<'a, Result<Self::TileAccu>>;
554

555
    fn new_fold_accu(
4✔
556
        &self,
4✔
557
        tile_info: TileInformation,
4✔
558
        query_rect: RasterQueryRectangle,
4✔
559
        pool: &Arc<ThreadPool>,
4✔
560
    ) -> Self::TileAccuFuture {
4✔
561
        identity_accu(tile_info, &query_rect, pool.clone()).boxed()
4✔
562
    }
4✔
563

564
    fn tile_query_rectangle(
4✔
565
        &self,
4✔
566
        tile_info: TileInformation,
4✔
567
        query_rect: RasterQueryRectangle,
4✔
568
        start_time: TimeInstance,
4✔
569
        band_idx: u32,
4✔
570
    ) -> Result<Option<RasterQueryRectangle>> {
4✔
571
        Ok(Some(RasterQueryRectangle {
4✔
572
            spatial_bounds: tile_info.spatial_partition(),
4✔
573
            time_interval: TimeInterval::new_instant(start_time)?,
4✔
574
            spatial_resolution: query_rect.spatial_resolution,
4✔
575
            attributes: band_idx.into(),
4✔
576
        }))
577
    }
4✔
578

579
    fn fold_method(&self) -> Self::FoldMethod {
4✔
580
        self.fold_fn.clone()
4✔
581
    }
4✔
582
}
583

584
pub fn identity_accu<T: Pixel>(
4✔
585
    tile_info: TileInformation,
4✔
586
    query_rect: &RasterQueryRectangle,
4✔
587
    pool: Arc<ThreadPool>,
4✔
588
) -> impl Future<Output = Result<RasterTileAccu2D<T>>> {
4✔
589
    let time_interval = query_rect.time_interval;
4✔
590
    crate::util::spawn_blocking(move || {
4✔
591
        let output_raster = EmptyGrid2D::new(tile_info.tile_size_in_pixels).into();
4✔
592
        let output_tile = RasterTile2D::new_with_tile_info(
4✔
593
            time_interval,
4✔
594
            tile_info,
4✔
595
            0,
4✔
596
            output_raster,
4✔
597
            CacheHint::max_duration(),
4✔
598
        );
4✔
599
        RasterTileAccu2D::new(output_tile, pool)
4✔
600
    })
4✔
601
    .map_err(From::from)
4✔
602
}
4✔
603

604
pub fn fold_by_blit_impl<T>(
4✔
605
    accu: RasterTileAccu2D<T>,
4✔
606
    tile: RasterTile2D<T>,
4✔
607
) -> Result<RasterTileAccu2D<T>>
4✔
608
where
4✔
609
    T: Pixel,
4✔
610
{
4✔
611
    let mut accu_tile = accu.tile;
4✔
612
    let pool = accu.pool;
4✔
613
    let t_union = accu_tile.time.union(&tile.time)?;
4✔
614

615
    accu_tile.time = t_union;
4✔
616

4✔
617
    if tile.grid_array.is_empty() && accu_tile.grid_array.is_empty() {
4✔
618
        // only skip if both tiles are empty. There might be valid data in one otherwise.
619
        return Ok(RasterTileAccu2D::new(accu_tile, pool));
×
620
    }
4✔
621

4✔
622
    let mut materialized_tile = accu_tile.into_materialized_tile();
4✔
623

4✔
624
    materialized_tile.blit(tile)?;
4✔
625

626
    Ok(RasterTileAccu2D::new(materialized_tile.into(), pool))
4✔
627
}
4✔
628

629
#[allow(dead_code)]
630
pub fn fold_by_blit_future<T>(
4✔
631
    accu: RasterTileAccu2D<T>,
4✔
632
    tile: RasterTile2D<T>,
4✔
633
) -> impl Future<Output = Result<RasterTileAccu2D<T>>>
4✔
634
where
4✔
635
    T: Pixel,
4✔
636
{
4✔
637
    crate::util::spawn_blocking(|| fold_by_blit_impl(accu, tile)).then(|x| async move {
4✔
638
        match x {
4✔
639
            Ok(r) => r,
4✔
UNCOV
640
            Err(e) => Err(e.into()),
×
641
        }
642
    })
8✔
643
}
4✔
644

645
#[cfg(test)]
mod tests {
    use geoengine_datatypes::{
        primitives::{SpatialPartition2D, SpatialResolution, TimeInterval},
        raster::{Grid, GridShape, RasterDataType, TilesEqualIgnoringCacheHint},
        spatial_reference::SpatialReference,
        util::test::TestDefault,
    };

    use super::*;
    use crate::engine::{
        MockExecutionContext, MockQueryContext, RasterBandDescriptors, RasterOperator,
        RasterResultDescriptor, WorkflowOperatorPath,
    };
    use crate::mock::{MockRasterSource, MockRasterSourceParams};
    use futures::StreamExt;

    /// Builds a 2x2 `u8` tile of band 0 at tile position `[-1, column]` with the given values.
    fn tile(time: TimeInterval, column: isize, values: Vec<u8>) -> RasterTile2D<u8> {
        RasterTile2D {
            time,
            tile_position: [-1, column].into(),
            band: 0,
            global_geo_transform: TestDefault::test_default(),
            grid_array: Grid::new([2, 2].into(), values).unwrap().into(),
            properties: Default::default(),
            cache_hint: CacheHint::default(),
        }
    }

    /// The identity sub-query with a blit fold must reproduce the source tiles.
    #[tokio::test]
    async fn identity() {
        // two time steps with two neighbouring tiles each
        let data: Vec<RasterTile2D<u8>> = vec![
            tile(TimeInterval::new_unchecked(0, 5), 0, vec![1, 2, 3, 4]),
            tile(TimeInterval::new_unchecked(0, 5), 1, vec![7, 8, 9, 10]),
            tile(TimeInterval::new_unchecked(5, 10), 0, vec![13, 14, 15, 16]),
            tile(TimeInterval::new_unchecked(5, 10), 1, vec![19, 20, 21, 22]),
        ];

        let mrs1 = MockRasterSource {
            params: MockRasterSourceParams {
                data: data.clone(),
                result_descriptor: RasterResultDescriptor {
                    data_type: RasterDataType::U8,
                    spatial_reference: SpatialReference::epsg_4326().into(),
                    time: None,
                    bbox: None,
                    resolution: None,
                    bands: RasterBandDescriptors::new_single_band(),
                },
            },
        }
        .boxed();

        let mut exe_ctx = MockExecutionContext::test_default();
        exe_ctx.tiling_specification.tile_size_in_pixels = GridShape {
            shape_array: [2, 2],
        };

        let query_rect = RasterQueryRectangle {
            spatial_bounds: SpatialPartition2D::new_unchecked((0., 1.).into(), (3., 0.).into()),
            time_interval: TimeInterval::new_unchecked(0, 10),
            spatial_resolution: SpatialResolution::one(),
            attributes: BandSelection::first(),
        };

        let query_ctx = MockQueryContext::test_default();
        let tiling_strat = exe_ctx.tiling_specification;

        let op = mrs1
            .initialize(WorkflowOperatorPath::initialize_root(), &exe_ctx)
            .await
            .unwrap();

        let qp = op.query_processor().unwrap().get_u8().unwrap();

        let a = RasterSubQueryAdapter::new(
            &qp,
            query_rect,
            tiling_strat,
            &query_ctx,
            TileSubQueryIdentity {
                fold_fn: fold_by_blit_future,
                _phantom_pixel_type: PhantomData,
            },
        );
        let res = a
            .map(Result::unwrap)
            .map(Option::unwrap)
            .collect::<Vec<RasterTile2D<u8>>>()
            .await;
        assert!(data.tiles_equal_ignoring_cache_hint(&res));
    }
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc