• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

geo-engine / geoengine / 9209766736

23 May 2024 01:55PM UTC coverage: 90.571% (-0.004%) from 90.575%
9209766736

push

github

web-flow
Merge pull request #948 from geo-engine/percentiles

Compute Percentile Estimates

309 of 348 new or added lines in 4 files covered. (88.79%)

12 existing lines in 9 files now uncovered.

131051 of 144694 relevant lines covered (90.57%)

53422.61 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

92.42
/operators/src/processing/temporal_raster_aggregation/subquery.rs
1
use super::aggregators::{GlobalStateTemporalRasterPixelAggregator, TemporalRasterPixelAggregator};
2
use crate::{
3
    adapters::{FoldTileAccu, SubQueryTileAggregator},
4
    util::Result,
5
};
6
use async_trait::async_trait;
7
use futures::TryFuture;
8
use geoengine_datatypes::{
9
    primitives::{
10
        CacheHint, RasterQueryRectangle, SpatialPartitioned, TimeInstance, TimeInterval, TimeStep,
11
    },
12
    raster::{
13
        EmptyGrid2D, GeoTransform, GridIdx2D, GridIndexAccess, GridOrEmpty, GridOrEmpty2D,
14
        GridShapeAccess, Pixel, RasterTile2D, TileInformation, UpdateIndexedElementsParallel,
15
    },
16
};
17
use rayon::ThreadPool;
18
use std::{marker::PhantomData, sync::Arc};
19

20
/// A method to fold a tile into the accumulator.
21
pub async fn subquery_all_tiles_fold_fn<P: Pixel, F: TemporalRasterPixelAggregator<P> + 'static>(
121✔
22
    accu: TileAccumulator<P, F>,
121✔
23
    tile: RasterTile2D<P>,
121✔
24
) -> Result<TileAccumulator<P, F>> {
121✔
25
    crate::util::spawn_blocking_with_thread_pool(accu.pool.clone(), || {
121✔
26
        let mut accu = accu;
121✔
27
        accu.add_tile(tile);
121✔
28
        Ok(accu)
121✔
29
    })
121✔
30
    .await?
121✔
31
}
121✔
32

33
/// A method to fold a tile into the accumulator.
34
pub async fn subquery_all_tiles_global_state_fold_fn<
8✔
35
    P: Pixel,
8✔
36
    F: GlobalStateTemporalRasterPixelAggregator<P> + 'static,
8✔
37
>(
8✔
38
    accu: GlobalStateTileAccumulator<P, F>,
8✔
39
    tile: RasterTile2D<P>,
8✔
40
) -> Result<GlobalStateTileAccumulator<P, F>> {
8✔
41
    crate::util::spawn_blocking_with_thread_pool(accu.pool.clone(), || {
8✔
42
        let mut accu = accu;
8✔
43
        accu.add_tile(tile);
8✔
44
        Ok(accu)
8✔
45
    })
8✔
46
    .await?
8✔
47
}
8✔
48

49
/// An accumulator for a time series of tiles in the same position.
50
#[derive(Debug, Clone)]
×
51
pub struct TileAccumulator<P: Pixel, F: TemporalRasterPixelAggregator<P>> {
52
    time: TimeInterval,
53
    tile_position: GridIdx2D,
54
    global_geo_transform: GeoTransform,
55
    state_grid: GridOrEmpty2D<F::PixelState>,
56
    prestine: bool,
57
    pool: Arc<ThreadPool>,
58
    cache_hint: CacheHint,
59
}
60

61
/// An accumulator for a time series of tiles in the same position.
NEW
62
#[derive(Debug, Clone)]
×
63
pub struct GlobalStateTileAccumulator<P: Pixel, F: GlobalStateTemporalRasterPixelAggregator<P>> {
64
    aggregator: Arc<F>,
65
    time: TimeInterval,
66
    tile_position: GridIdx2D,
67
    global_geo_transform: GeoTransform,
68
    state_grid: GridOrEmpty2D<F::PixelState>,
69
    prestine: bool,
70
    pool: Arc<ThreadPool>,
71
    cache_hint: CacheHint,
72
}
73

74
impl<P, F> TileAccumulator<P, F>
75
where
76
    P: Pixel,
77
    F: TemporalRasterPixelAggregator<P> + 'static,
78
{
79
    pub fn add_tile(&mut self, in_tile: RasterTile2D<P>) {
121✔
80
        // Add a tile to the accumulator, which represents the aggregate over a time interval that contains the in_tile.
81
        // TODO: for tiles which are only partially contained in the aggregate time interval, investigate whether the pixels have to be scaled (e.g. if the pixel is a count, assume uniform distribution and divide it by the fraction of time that is contained in the aggregate)".
82

83
        // The tile must intersect the time of the query otherwise it includes wrong data
84
        debug_assert!(
85
            self.time.intersects(&in_tile.time),
121✔
86
            "Tile time {:?} does not intersect the accumulator/query time {:?}",
×
87
            in_tile.time,
88
            self.time
89
        );
90

91
        debug_assert!(self.state_grid.grid_shape() == in_tile.grid_shape());
121✔
92

93
        let in_tile_grid = match in_tile.grid_array {
121✔
94
            GridOrEmpty::Grid(g) => g,
96✔
95
            GridOrEmpty::Empty(_) => {
96
                self.prestine = false;
25✔
97
                return;
25✔
98
            }
99
        };
100

101
        match &mut self.state_grid {
55✔
102
            GridOrEmpty::Empty(_) if !self.prestine && !F::IGNORE_NO_DATA => {
55✔
103
                // every pixel is nodata we will keep it like this forever
6✔
104
            }
6✔
105

106
            GridOrEmpty::Empty(_) => {
49✔
107
                // TODO: handle case where this could stay empty
49✔
108

49✔
109
                let map_fn = |lin_idx: usize,
49✔
110
                              _acc_values_option: Option<F::PixelState>|
111
                 -> Option<F::PixelState> {
294✔
112
                    let new_value_option = in_tile_grid.get_at_grid_index_unchecked(lin_idx);
294✔
113
                    F::initialize(new_value_option)
294✔
114
                };
294✔
115

49✔
116
                self.state_grid.update_indexed_elements_parallel(map_fn);
49✔
117
            }
49✔
118

119
            GridOrEmpty::Grid(g) => {
41✔
120
                let map_fn = |lin_idx: usize, acc_values_option: Option<F::PixelState>| {
246✔
121
                    let new_value_option = in_tile_grid.get_at_grid_index_unchecked(lin_idx);
246✔
122
                    F::aggregate(acc_values_option, new_value_option)
246✔
123
                };
246✔
124

41✔
125
                g.update_indexed_elements_parallel(map_fn);
41✔
126
            }
41✔
127
        }
128

129
        self.prestine = false;
96✔
130
        self.cache_hint.merge_with(&in_tile.cache_hint);
96✔
131
    }
121✔
132
}
133

134
impl<P, F> GlobalStateTileAccumulator<P, F>
135
where
136
    P: Pixel,
137
    F: GlobalStateTemporalRasterPixelAggregator<P> + 'static,
138
{
139
    pub fn add_tile(&mut self, in_tile: RasterTile2D<P>) {
8✔
140
        // Add a tile to the accumulator, which represents the aggregate over a time interval that contains the in_tile.
141
        // TODO: for tiles which are only partially contained in the aggregate time interval, investigate whether the pixels have to be scaled (e.g. if the pixel is a count, assume uniform distribution and divide it by the fraction of time that is contained in the aggregate)".
142

143
        // The tile must intersect the time of the query otherwise it includes wrong data
144
        debug_assert!(
145
            self.time.intersects(&in_tile.time),
8✔
NEW
146
            "Tile time {:?} does not intersect the accumulator/query time {:?}",
×
147
            in_tile.time,
148
            self.time
149
        );
150

151
        debug_assert!(self.state_grid.grid_shape() == in_tile.grid_shape());
8✔
152

153
        let in_tile_grid = match in_tile.grid_array {
8✔
154
            GridOrEmpty::Grid(g) => g,
8✔
155
            GridOrEmpty::Empty(_) => {
NEW
156
                self.prestine = false;
×
NEW
157
                return;
×
158
            }
159
        };
160

161
        match &mut self.state_grid {
2✔
162
            GridOrEmpty::Empty(_) if !self.prestine && !F::IGNORE_NO_DATA => {
2✔
NEW
163
                // every pixel is nodata we will keep it like this forever
×
NEW
164
            }
×
165

166
            GridOrEmpty::Empty(_) => {
2✔
167
                // TODO: handle case where this could stay empty
2✔
168

2✔
169
                let aggregator = self.aggregator.clone();
2✔
170
                let map_fn = move |lin_idx: usize,
2✔
171
                                   _acc_values_option: Option<F::PixelState>|
172
                      -> Option<F::PixelState> {
12✔
173
                    let new_value_option = in_tile_grid.get_at_grid_index_unchecked(lin_idx);
12✔
174
                    aggregator.initialize(new_value_option)
12✔
175
                };
12✔
176

2✔
177
                self.state_grid.update_indexed_elements_parallel(map_fn);
2✔
178
            }
2✔
179

180
            GridOrEmpty::Grid(g) => {
6✔
181
                let aggregator = self.aggregator.clone();
6✔
182
                let map_fn = move |lin_idx: usize, acc_values_option: Option<F::PixelState>| {
36✔
183
                    let new_value_option = in_tile_grid.get_at_grid_index_unchecked(lin_idx);
36✔
184
                    aggregator.aggregate(acc_values_option, new_value_option)
36✔
185
                };
36✔
186

6✔
187
                g.update_indexed_elements_parallel(map_fn);
6✔
188
            }
6✔
189
        }
190

191
        self.prestine = false;
8✔
192
        self.cache_hint.merge_with(&in_tile.cache_hint);
8✔
193
    }
8✔
194
}
195

196
#[async_trait]
197
impl<P, F> FoldTileAccu for TileAccumulator<P, F>
198
where
199
    P: Pixel,
200
    F: TemporalRasterPixelAggregator<P>,
201
{
202
    type RasterType = P;
203

204
    async fn into_tile(self) -> Result<RasterTile2D<Self::RasterType>> {
53✔
205
        let TileAccumulator {
206
            time,
53✔
207
            tile_position,
53✔
208
            global_geo_transform,
53✔
209
            state_grid,
53✔
210
            prestine: _,
53✔
211
            pool: _pool,
53✔
212
            cache_hint,
53✔
213
        } = self;
53✔
214

53✔
215
        Ok(RasterTile2D::new(
53✔
216
            time,
53✔
217
            tile_position,
53✔
218
            0,
53✔
219
            global_geo_transform,
53✔
220
            F::into_grid(state_grid)?,
53✔
221
            cache_hint,
53✔
222
        ))
223
    }
159✔
224

225
    fn thread_pool(&self) -> &Arc<ThreadPool> {
×
226
        &self.pool
×
227
    }
×
228
}
229

230
#[async_trait]
231
impl<P, F> FoldTileAccu for GlobalStateTileAccumulator<P, F>
232
where
233
    P: Pixel,
234
    F: GlobalStateTemporalRasterPixelAggregator<P>,
235
{
236
    type RasterType = P;
237

238
    async fn into_tile(self) -> Result<RasterTile2D<Self::RasterType>> {
2✔
239
        let Self {
240
            aggregator,
2✔
241
            time,
2✔
242
            tile_position,
2✔
243
            global_geo_transform,
2✔
244
            state_grid,
2✔
245
            prestine: _,
2✔
246
            pool: _pool,
2✔
247
            cache_hint,
2✔
248
        } = self;
2✔
249

2✔
250
        Ok(RasterTile2D::new(
2✔
251
            time,
2✔
252
            tile_position,
2✔
253
            0,
2✔
254
            global_geo_transform,
2✔
255
            aggregator.to_grid(state_grid)?,
2✔
256
            cache_hint,
2✔
257
        ))
258
    }
6✔
259

NEW
260
    fn thread_pool(&self) -> &Arc<ThreadPool> {
×
NEW
261
        &self.pool
×
NEW
262
    }
×
263
}
264

265
/// A subquery that aggregates a time series of tiles.
266
#[derive(Debug, Clone)]
×
267
pub struct TemporalRasterAggregationSubQuery<FoldFn, P: Pixel, F: TemporalRasterPixelAggregator<P>>
268
{
269
    pub fold_fn: FoldFn,
270
    pub step: TimeStep,
271
    pub step_reference: TimeInstance,
272
    pub _phantom_pixel_type: PhantomData<(P, F)>,
273
}
274

275
/// A subquery that aggregates a time series of tiles.
NEW
276
#[derive(Debug, Clone)]
×
277
pub struct GlobalStateTemporalRasterAggregationSubQuery<
278
    FoldFn,
279
    P: Pixel,
280
    F: GlobalStateTemporalRasterPixelAggregator<P>,
281
> {
282
    pub aggregator: Arc<F>,
283
    pub fold_fn: FoldFn,
284
    pub step: TimeStep,
285
    pub step_reference: TimeInstance,
286
    pub _phantom_pixel_type: PhantomData<(P, F)>,
287
}
288

289
impl<'a, P, F, FoldM, FoldF> SubQueryTileAggregator<'a, P>
290
    for TemporalRasterAggregationSubQuery<FoldM, P, F>
291
where
292
    P: Pixel,
293
    F: TemporalRasterPixelAggregator<P> + 'static,
294
    FoldM: Send + Sync + 'static + Clone + Fn(TileAccumulator<P, F>, RasterTile2D<P>) -> FoldF,
295
    FoldF: Send + TryFuture<Ok = TileAccumulator<P, F>, Error = crate::error::Error>,
296
{
297
    type TileAccu = TileAccumulator<P, F>;
298
    type TileAccuFuture = futures::future::Ready<Result<Self::TileAccu>>;
299

300
    type FoldFuture = FoldF;
301

302
    type FoldMethod = FoldM;
303

304
    fn new_fold_accu(
53✔
305
        &self,
53✔
306
        tile_info: TileInformation,
53✔
307
        query_rect: RasterQueryRectangle,
53✔
308
        pool: &Arc<ThreadPool>,
53✔
309
    ) -> Self::TileAccuFuture {
53✔
310
        let accu = TileAccumulator {
53✔
311
            time: query_rect.time_interval,
53✔
312
            tile_position: tile_info.global_tile_position,
53✔
313
            global_geo_transform: tile_info.global_geo_transform,
53✔
314
            state_grid: EmptyGrid2D::new(tile_info.tile_size_in_pixels).into(),
53✔
315
            prestine: true,
53✔
316
            pool: pool.clone(),
53✔
317
            cache_hint: CacheHint::max_duration(),
53✔
318
        };
53✔
319

53✔
320
        futures::future::ok(accu)
53✔
321
    }
53✔
322

323
    fn tile_query_rectangle(
53✔
324
        &self,
53✔
325
        tile_info: TileInformation,
53✔
326
        query_rect: RasterQueryRectangle,
53✔
327
        start_time: TimeInstance,
53✔
328
        band_idx: u32,
53✔
329
    ) -> Result<Option<RasterQueryRectangle>> {
53✔
330
        let snapped_start = self.step.snap_relative(self.step_reference, start_time)?;
53✔
331
        Ok(Some(RasterQueryRectangle {
332
            spatial_bounds: tile_info.spatial_partition(),
53✔
333
            spatial_resolution: query_rect.spatial_resolution,
53✔
334
            time_interval: TimeInterval::new(snapped_start, (snapped_start + self.step)?)?,
53✔
335
            attributes: band_idx.into(),
53✔
336
        }))
337
    }
53✔
338

339
    fn fold_method(&self) -> Self::FoldMethod {
53✔
340
        self.fold_fn.clone()
53✔
341
    }
53✔
342
}
343

344
impl<'a, P, F, FoldM, FoldF> SubQueryTileAggregator<'a, P>
345
    for GlobalStateTemporalRasterAggregationSubQuery<FoldM, P, F>
346
where
347
    P: Pixel,
348
    F: GlobalStateTemporalRasterPixelAggregator<P> + 'static,
349
    FoldM: Send
350
        + Sync
351
        + 'static
352
        + Clone
353
        + Fn(GlobalStateTileAccumulator<P, F>, RasterTile2D<P>) -> FoldF,
354
    FoldF: Send + TryFuture<Ok = GlobalStateTileAccumulator<P, F>, Error = crate::error::Error>,
355
{
356
    type TileAccu = GlobalStateTileAccumulator<P, F>;
357
    type TileAccuFuture = futures::future::Ready<Result<Self::TileAccu>>;
358

359
    type FoldFuture = FoldF;
360

361
    type FoldMethod = FoldM;
362

363
    fn new_fold_accu(
2✔
364
        &self,
2✔
365
        tile_info: TileInformation,
2✔
366
        query_rect: RasterQueryRectangle,
2✔
367
        pool: &Arc<ThreadPool>,
2✔
368
    ) -> Self::TileAccuFuture {
2✔
369
        let accu = GlobalStateTileAccumulator {
2✔
370
            aggregator: self.aggregator.clone(),
2✔
371
            time: query_rect.time_interval,
2✔
372
            tile_position: tile_info.global_tile_position,
2✔
373
            global_geo_transform: tile_info.global_geo_transform,
2✔
374
            state_grid: EmptyGrid2D::new(tile_info.tile_size_in_pixels).into(),
2✔
375
            prestine: true,
2✔
376
            pool: pool.clone(),
2✔
377
            cache_hint: CacheHint::max_duration(),
2✔
378
        };
2✔
379

2✔
380
        futures::future::ok(accu)
2✔
381
    }
2✔
382

383
    fn tile_query_rectangle(
2✔
384
        &self,
2✔
385
        tile_info: TileInformation,
2✔
386
        query_rect: RasterQueryRectangle,
2✔
387
        start_time: TimeInstance,
2✔
388
        band_idx: u32,
2✔
389
    ) -> Result<Option<RasterQueryRectangle>> {
2✔
390
        let snapped_start = self.step.snap_relative(self.step_reference, start_time)?;
2✔
391
        Ok(Some(RasterQueryRectangle {
392
            spatial_bounds: tile_info.spatial_partition(),
2✔
393
            spatial_resolution: query_rect.spatial_resolution,
2✔
394
            time_interval: TimeInterval::new(snapped_start, (snapped_start + self.step)?)?,
2✔
395
            attributes: band_idx.into(),
2✔
396
        }))
397
    }
2✔
398

399
    fn fold_method(&self) -> Self::FoldMethod {
2✔
400
        self.fold_fn.clone()
2✔
401
    }
2✔
402
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc