• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

geo-engine / geoengine / 3929938005

pending completion
3929938005

push

github

GitHub
Merge #713

84930 of 96741 relevant lines covered (87.79%)

79640.1 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

67.76
/operators/src/processing/temporal_raster_aggregation/first_last_subquery.rs
1
use crate::{
2
    adapters::{FoldTileAccu, FoldTileAccuMut, SubQueryTileAggregator},
3
    util::Result,
4
};
5
use async_trait::async_trait;
6
use futures::{future::BoxFuture, Future, FutureExt, TryFuture, TryFutureExt};
7
use geoengine_datatypes::{
8
    primitives::{
9
        QueryRectangle, RasterQueryRectangle, SpatialPartitioned, TimeInstance, TimeInterval,
10
        TimeStep,
11
    },
12
    raster::{EmptyGrid2D, Pixel, RasterTile2D, TileInformation},
13
};
14
use rayon::ThreadPool;
15
use std::{marker::PhantomData, sync::Arc};
16

17
/// Only outputs the first tile as accumulator.
18
pub fn first_tile_fold_fn<T>(
6✔
19
    acc: TemporalRasterAggregationTileAccu<T>,
6✔
20
    tile: RasterTile2D<T>,
6✔
21
) -> TemporalRasterAggregationTileAccu<T>
6✔
22
where
6✔
23
    T: Pixel,
6✔
24
{
6✔
25
    if acc.initial_state {
6✔
26
        let mut next_accu = tile;
2✔
27
        next_accu.time = acc.accu_tile.time;
2✔
28

2✔
29
        TemporalRasterAggregationTileAccu {
2✔
30
            accu_tile: next_accu,
2✔
31
            initial_state: false,
2✔
32
            pool: acc.pool,
2✔
33
        }
2✔
34
    } else {
35
        acc
4✔
36
    }
37
}
6✔
38

39
pub fn first_tile_fold_future<T>(
6✔
40
    accu: TemporalRasterAggregationTileAccu<T>,
6✔
41
    tile: RasterTile2D<T>,
6✔
42
) -> impl Future<Output = Result<TemporalRasterAggregationTileAccu<T>>>
6✔
43
where
6✔
44
    T: Pixel,
6✔
45
{
6✔
46
    crate::util::spawn_blocking(|| first_tile_fold_fn(accu, tile)).then(move |x| async move {
6✔
47
        match x {
6✔
48
            Ok(r) => Ok(r),
6✔
49
            Err(e) => Err(e.into()),
×
50
        }
51
    })
6✔
52
}
6✔
53

54
/// Only outputs the last tile as accumulator.
55
#[allow(clippy::needless_pass_by_value)]
56
pub fn last_tile_fold_fn<T>(
12✔
57
    acc: TemporalRasterAggregationTileAccu<T>,
12✔
58
    tile: RasterTile2D<T>,
12✔
59
) -> TemporalRasterAggregationTileAccu<T>
12✔
60
where
12✔
61
    T: Pixel,
12✔
62
{
12✔
63
    let mut next_accu = tile;
12✔
64
    next_accu.time = acc.accu_tile.time;
12✔
65

12✔
66
    TemporalRasterAggregationTileAccu {
12✔
67
        accu_tile: next_accu,
12✔
68
        initial_state: false,
12✔
69
        pool: acc.pool,
12✔
70
    }
12✔
71
}
12✔
72

73
pub fn last_tile_fold_future<T>(
12✔
74
    accu: TemporalRasterAggregationTileAccu<T>,
12✔
75
    tile: RasterTile2D<T>,
12✔
76
) -> impl Future<Output = Result<TemporalRasterAggregationTileAccu<T>>>
12✔
77
where
12✔
78
    T: Pixel,
12✔
79
{
12✔
80
    crate::util::spawn_blocking(|| last_tile_fold_fn(accu, tile)).then(move |x| async move {
12✔
81
        match x {
12✔
82
            Ok(r) => Ok(r),
12✔
83
            Err(e) => Err(e.into()),
×
84
        }
85
    })
12✔
86
}
12✔
87

88
#[derive(Debug, Clone)]
×
89
pub struct TemporalRasterAggregationTileAccu<T> {
90
    accu_tile: RasterTile2D<T>,
91
    initial_state: bool,
92
    pool: Arc<ThreadPool>,
93
}
94

95
#[async_trait]
96
impl<T: Pixel> FoldTileAccu for TemporalRasterAggregationTileAccu<T> {
97
    type RasterType = T;
98

99
    async fn into_tile(self) -> Result<RasterTile2D<Self::RasterType>> {
6✔
100
        Ok(self.accu_tile)
6✔
101
    }
6✔
102

103
    fn thread_pool(&self) -> &Arc<ThreadPool> {
×
104
        &self.pool
×
105
    }
×
106
}
107

108
impl<T: Pixel> FoldTileAccuMut for TemporalRasterAggregationTileAccu<T> {
109
    fn tile_mut(&mut self) -> &mut RasterTile2D<Self::RasterType> {
×
110
        &mut self.accu_tile
×
111
    }
×
112
}
113

114
#[derive(Debug, Clone)]
×
115
pub struct TemporalRasterAggregationSubQuery<F, T: Pixel> {
116
    pub fold_fn: F,
117
    pub step: TimeStep,
118
    pub step_reference: TimeInstance,
119
    pub _phantom_pixel_type: PhantomData<T>,
120
}
121

122
impl<'a, T, FoldM, FoldF> SubQueryTileAggregator<'a, T>
123
    for TemporalRasterAggregationSubQuery<FoldM, T>
124
where
125
    T: Pixel,
126
    FoldM: Send
127
        + Sync
128
        + 'static
129
        + Clone
130
        + Fn(TemporalRasterAggregationTileAccu<T>, RasterTile2D<T>) -> FoldF,
131
    FoldF: Send + TryFuture<Ok = TemporalRasterAggregationTileAccu<T>, Error = crate::error::Error>,
132
{
133
    type TileAccu = TemporalRasterAggregationTileAccu<T>;
134
    type TileAccuFuture = BoxFuture<'a, Result<Self::TileAccu>>;
135

136
    type FoldFuture = FoldF;
137

138
    type FoldMethod = FoldM;
139

140
    fn new_fold_accu(
×
141
        &self,
×
142
        tile_info: TileInformation,
×
143
        query_rect: RasterQueryRectangle,
×
144
        pool: &Arc<ThreadPool>,
×
145
    ) -> Self::TileAccuFuture {
×
146
        build_temporal_accu(query_rect, tile_info, pool.clone()).boxed()
×
147
    }
×
148

149
    fn tile_query_rectangle(
×
150
        &self,
×
151
        tile_info: TileInformation,
×
152
        query_rect: RasterQueryRectangle,
×
153
        start_time: TimeInstance,
×
154
    ) -> Result<Option<RasterQueryRectangle>> {
×
155
        let snapped_start = self.step.snap_relative(self.step_reference, start_time)?;
×
156
        Ok(Some(QueryRectangle {
157
            spatial_bounds: tile_info.spatial_partition(),
×
158
            spatial_resolution: query_rect.spatial_resolution,
×
159
            time_interval: TimeInterval::new(snapped_start, (snapped_start + self.step)?)?,
×
160
        }))
161
    }
×
162

163
    fn fold_method(&self) -> Self::FoldMethod {
×
164
        self.fold_fn.clone()
×
165
    }
×
166
}
167

168
fn build_temporal_accu<T: Pixel>(
×
169
    query_rect: RasterQueryRectangle,
×
170
    tile_info: TileInformation,
×
171
    pool: Arc<ThreadPool>,
×
172
) -> impl Future<Output = Result<TemporalRasterAggregationTileAccu<T>>> {
×
173
    crate::util::spawn_blocking(move || TemporalRasterAggregationTileAccu {
×
174
        accu_tile: RasterTile2D::new_with_tile_info(
×
175
            query_rect.time_interval,
×
176
            tile_info,
×
177
            EmptyGrid2D::new(tile_info.tile_size_in_pixels).into(),
×
178
        ),
×
179
        initial_state: true,
×
180
        pool,
×
181
    })
×
182
    .map_err(From::from)
×
183
}
×
184

185
#[derive(Debug, Clone)]
×
186
pub struct TemporalRasterAggregationSubQueryNoDataOnly<F, T: Pixel> {
187
    pub fold_fn: F,
188
    pub step: TimeStep,
189
    pub step_reference: TimeInstance,
190
    pub _phantom_pixel_type: PhantomData<T>,
191
}
192

193
impl<'a, T, FoldM, FoldF> SubQueryTileAggregator<'a, T>
194
    for TemporalRasterAggregationSubQueryNoDataOnly<FoldM, T>
195
where
196
    T: Pixel,
197
    FoldM: Send
198
        + Sync
199
        + 'static
200
        + Clone
201
        + Fn(TemporalRasterAggregationTileAccu<T>, RasterTile2D<T>) -> FoldF,
202
    FoldF: Send + TryFuture<Ok = TemporalRasterAggregationTileAccu<T>, Error = crate::error::Error>,
203
{
204
    type TileAccu = TemporalRasterAggregationTileAccu<T>;
205
    type TileAccuFuture = BoxFuture<'a, Result<Self::TileAccu>>;
206
    type FoldFuture = FoldF;
207

208
    type FoldMethod = FoldM;
209

210
    fn new_fold_accu(
6✔
211
        &self,
6✔
212
        tile_info: TileInformation,
6✔
213
        query_rect: RasterQueryRectangle,
6✔
214
        pool: &Arc<ThreadPool>,
6✔
215
    ) -> Self::TileAccuFuture {
6✔
216
        build_temporal_no_data_accu(query_rect, tile_info, pool.clone()).boxed()
6✔
217
    }
6✔
218

219
    fn tile_query_rectangle(
6✔
220
        &self,
6✔
221
        tile_info: TileInformation,
6✔
222
        query_rect: RasterQueryRectangle,
6✔
223
        start_time: TimeInstance,
6✔
224
    ) -> Result<Option<RasterQueryRectangle>> {
6✔
225
        let snapped_start = self.step.snap_relative(self.step_reference, start_time)?;
6✔
226
        Ok(Some(QueryRectangle {
227
            spatial_bounds: tile_info.spatial_partition(),
6✔
228
            spatial_resolution: query_rect.spatial_resolution,
6✔
229
            time_interval: TimeInterval::new(snapped_start, (snapped_start + self.step)?)?,
6✔
230
        }))
231
    }
6✔
232

233
    fn fold_method(&self) -> Self::FoldMethod {
6✔
234
        self.fold_fn.clone()
6✔
235
    }
6✔
236
}
237

238
fn build_temporal_no_data_accu<T: Pixel>(
6✔
239
    query_rect: RasterQueryRectangle,
6✔
240
    tile_info: TileInformation,
6✔
241
    pool: Arc<ThreadPool>,
6✔
242
) -> impl Future<Output = Result<TemporalRasterAggregationTileAccu<T>>> {
6✔
243
    crate::util::spawn_blocking(move || {
6✔
244
        let output_raster = EmptyGrid2D::new(tile_info.tile_size_in_pixels).into();
6✔
245

6✔
246
        TemporalRasterAggregationTileAccu {
6✔
247
            accu_tile: RasterTile2D::new_with_tile_info(
6✔
248
                query_rect.time_interval,
6✔
249
                tile_info,
6✔
250
                output_raster,
6✔
251
            ),
6✔
252
            initial_state: true,
6✔
253
            pool,
6✔
254
        }
6✔
255
    })
6✔
256
    .map_err(From::from)
6✔
257
}
6✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc