• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

geo-engine / geoengine / 3929938005

pending completion
3929938005

push

github

GitHub
Merge #713

84930 of 96741 relevant lines covered (87.79%)

79640.1 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

94.68
/operators/src/processing/temporal_raster_aggregation/subquery.rs
1
use super::aggregators::TemporalRasterPixelAggregator;
2
use crate::{
3
    adapters::{FoldTileAccu, SubQueryTileAggregator},
4
    util::Result,
5
};
6
use async_trait::async_trait;
7
use futures::TryFuture;
8
use geoengine_datatypes::{
9
    primitives::{RasterQueryRectangle, SpatialPartitioned, TimeInstance, TimeInterval, TimeStep},
10
    raster::{
11
        EmptyGrid2D, GeoTransform, GridIdx2D, GridIndexAccess, GridOrEmpty, GridOrEmpty2D,
12
        GridShapeAccess, Pixel, RasterTile2D, TileInformation, UpdateIndexedElementsParallel,
13
    },
14
};
15
use rayon::ThreadPool;
16
use std::{marker::PhantomData, sync::Arc};
17

18
/// A method to fold a tile into the accumulator.
19
pub async fn subquery_all_tiles_fold_fn<P: Pixel, F: TemporalRasterPixelAggregator<P> + 'static>(
105✔
20
    accu: TileAccumulator<P, F>,
105✔
21
    tile: RasterTile2D<P>,
105✔
22
) -> Result<TileAccumulator<P, F>> {
105✔
23
    crate::util::spawn_blocking_with_thread_pool(accu.pool.clone(), || {
105✔
24
        let mut accu = accu;
105✔
25
        accu.add_tile(tile)?;
105✔
26
        Ok(accu)
105✔
27
    })
105✔
28
    .await?
105✔
29
}
105✔
30

31
/// An accumulator for a time series of tiles in the same position.
32
#[derive(Debug, Clone)]
×
33
pub struct TileAccumulator<P: Pixel, F: TemporalRasterPixelAggregator<P>> {
34
    time: TimeInterval,
35
    tile_position: GridIdx2D,
36
    global_geo_transform: GeoTransform,
37
    state_grid: GridOrEmpty2D<F::PixelState>,
38
    prestine: bool,
39
    pool: Arc<ThreadPool>,
40
}
41

42
impl<P, F> TileAccumulator<P, F>
43
where
44
    P: Pixel,
45
    F: TemporalRasterPixelAggregator<P> + 'static,
46
{
47
    pub fn add_tile(&mut self, in_tile: RasterTile2D<P>) -> Result<()> {
105✔
48
        self.time = self.time.union(&in_tile.time)?;
105✔
49

50
        debug_assert!(self.state_grid.grid_shape() == in_tile.grid_shape());
105✔
51

52
        let in_tile_grid = match in_tile.grid_array {
105✔
53
            GridOrEmpty::Grid(g) => g,
80✔
54
            GridOrEmpty::Empty(_) => {
55
                self.prestine = false;
25✔
56
                return Ok(());
25✔
57
            }
58
        };
59

60
        match &mut self.state_grid {
47✔
61
            GridOrEmpty::Empty(_) if !self.prestine && !F::IGNORE_NO_DATA => {
47✔
62
                // every pixel is nodata we will keep it like this forever
6✔
63
            }
6✔
64

65
            GridOrEmpty::Empty(_) => {
41✔
66
                // TODO: handle case where this could stay empty
41✔
67

41✔
68
                let map_fn = |lin_idx: usize, _acc_values_option| {
246✔
69
                    let new_value_option = in_tile_grid.get_at_grid_index_unchecked(lin_idx);
246✔
70
                    F::initialize(new_value_option)
246✔
71
                };
246✔
72

41✔
73
                self.state_grid.update_indexed_elements_parallel(map_fn);
41✔
74
            }
41✔
75

76
            GridOrEmpty::Grid(g) => {
33✔
77
                let map_fn = |lin_idx: usize, acc_values_option: Option<F::PixelState>| {
198✔
78
                    let new_value_option = in_tile_grid.get_at_grid_index_unchecked(lin_idx);
198✔
79
                    F::aggregate(acc_values_option, new_value_option)
198✔
80
                };
198✔
81

33✔
82
                g.update_indexed_elements_parallel(map_fn);
33✔
83
            }
33✔
84
        }
85

86
        self.prestine = false;
80✔
87
        Ok(())
80✔
88
    }
105✔
89
}
90

91
#[async_trait]
92
impl<P, F> FoldTileAccu for TileAccumulator<P, F>
93
where
94
    P: Pixel,
95
    F: TemporalRasterPixelAggregator<P>,
96
{
97
    type RasterType = P;
98

99
    async fn into_tile(self) -> Result<RasterTile2D<Self::RasterType>> {
45✔
100
        let TileAccumulator {
101
            time,
45✔
102
            tile_position,
45✔
103
            global_geo_transform,
45✔
104
            state_grid,
45✔
105
            prestine: _,
45✔
106
            pool: _pool,
45✔
107
        } = self;
45✔
108

45✔
109
        Ok(RasterTile2D::new(
45✔
110
            time,
45✔
111
            tile_position,
45✔
112
            global_geo_transform,
45✔
113
            F::into_grid(state_grid)?,
45✔
114
        ))
115
    }
90✔
116

117
    fn thread_pool(&self) -> &Arc<ThreadPool> {
×
118
        &self.pool
×
119
    }
×
120
}
121

122
/// A subquery that aggregates a time series of tiles.
123
#[derive(Debug, Clone)]
×
124
pub struct TemporalRasterAggregationSubQuery<FoldFn, P: Pixel, F: TemporalRasterPixelAggregator<P>>
125
{
126
    pub fold_fn: FoldFn,
127
    pub step: TimeStep,
128
    pub step_reference: TimeInstance,
129
    pub _phantom_pixel_type: PhantomData<(P, F)>,
130
}
131

132
impl<'a, P, F, FoldM, FoldF> SubQueryTileAggregator<'a, P>
133
    for TemporalRasterAggregationSubQuery<FoldM, P, F>
134
where
135
    P: Pixel,
136
    F: TemporalRasterPixelAggregator<P> + 'static,
137
    FoldM: Send + Sync + 'static + Clone + Fn(TileAccumulator<P, F>, RasterTile2D<P>) -> FoldF,
138
    FoldF: Send + TryFuture<Ok = TileAccumulator<P, F>, Error = crate::error::Error>,
139
{
140
    type TileAccu = TileAccumulator<P, F>;
141
    type TileAccuFuture = futures::future::Ready<Result<Self::TileAccu>>;
142

143
    type FoldFuture = FoldF;
144

145
    type FoldMethod = FoldM;
146

147
    fn new_fold_accu(
45✔
148
        &self,
45✔
149
        tile_info: TileInformation,
45✔
150
        query_rect: RasterQueryRectangle,
45✔
151
        pool: &Arc<ThreadPool>,
45✔
152
    ) -> Self::TileAccuFuture {
45✔
153
        let accu = TileAccumulator {
45✔
154
            time: query_rect.time_interval,
45✔
155
            tile_position: tile_info.global_tile_position,
45✔
156
            global_geo_transform: tile_info.global_geo_transform,
45✔
157
            state_grid: EmptyGrid2D::new(tile_info.tile_size_in_pixels).into(),
45✔
158
            prestine: true,
45✔
159
            pool: pool.clone(),
45✔
160
        };
45✔
161

45✔
162
        futures::future::ok(accu)
45✔
163
    }
45✔
164

165
    fn tile_query_rectangle(
45✔
166
        &self,
45✔
167
        tile_info: TileInformation,
45✔
168
        query_rect: RasterQueryRectangle,
45✔
169
        start_time: TimeInstance,
45✔
170
    ) -> Result<Option<RasterQueryRectangle>> {
45✔
171
        let snapped_start = self.step.snap_relative(self.step_reference, start_time)?;
45✔
172
        Ok(Some(RasterQueryRectangle {
173
            spatial_bounds: tile_info.spatial_partition(),
45✔
174
            spatial_resolution: query_rect.spatial_resolution,
45✔
175
            time_interval: TimeInterval::new(snapped_start, (snapped_start + self.step)?)?,
45✔
176
        }))
177
    }
45✔
178

179
    fn fold_method(&self) -> Self::FoldMethod {
45✔
180
        self.fold_fn.clone()
45✔
181
    }
45✔
182
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc