• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

geo-engine / geoengine / 11911118784

19 Nov 2024 10:06AM UTC coverage: 90.448% (-0.2%) from 90.687%
11911118784

push

github

web-flow
Merge pull request #994 from geo-engine/workspace-dependencies

use workspace dependencies, update toolchain, use global lock in expression

9 of 11 new or added lines in 6 files covered. (81.82%)

369 existing lines in 74 files now uncovered.

132871 of 146904 relevant lines covered (90.45%)

54798.62 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

67.5
/operators/src/processing/temporal_raster_aggregation/first_last_subquery.rs
1
use crate::{
2
    adapters::{FoldTileAccu, FoldTileAccuMut, SubQueryTileAggregator},
3
    util::Result,
4
};
5
use async_trait::async_trait;
6
use futures::{future::BoxFuture, Future, FutureExt, TryFuture, TryFutureExt};
7
use geoengine_datatypes::{
8
    primitives::{
9
        CacheHint, QueryRectangle, RasterQueryRectangle, SpatialPartitioned, TimeInstance,
10
        TimeInterval, TimeStep,
11
    },
12
    raster::{EmptyGrid2D, Pixel, RasterTile2D, TileInformation},
13
};
14
use rayon::ThreadPool;
15
use std::{marker::PhantomData, sync::Arc};
16

17
/// Only outputs the first tile as accumulator.
18
pub fn first_tile_fold_fn<T>(
6✔
19
    acc: TemporalRasterAggregationTileAccu<T>,
6✔
20
    tile: RasterTile2D<T>,
6✔
21
) -> TemporalRasterAggregationTileAccu<T>
6✔
22
where
6✔
23
    T: Pixel,
6✔
24
{
6✔
25
    if acc.initial_state {
6✔
26
        let mut next_accu = tile;
2✔
27
        next_accu.time = acc.accu_tile.time;
2✔
28

2✔
29
        TemporalRasterAggregationTileAccu {
2✔
30
            accu_tile: next_accu,
2✔
31
            initial_state: false,
2✔
32
            pool: acc.pool,
2✔
33
        }
2✔
34
    } else {
35
        acc
4✔
36
    }
37
}
6✔
38

39
pub fn first_tile_fold_future<T>(
6✔
40
    accu: TemporalRasterAggregationTileAccu<T>,
6✔
41
    tile: RasterTile2D<T>,
6✔
42
) -> impl Future<Output = Result<TemporalRasterAggregationTileAccu<T>>>
6✔
43
where
6✔
44
    T: Pixel,
6✔
45
{
6✔
46
    crate::util::spawn_blocking(|| first_tile_fold_fn(accu, tile)).then(move |x| async move {
6✔
47
        match x {
6✔
48
            Ok(r) => Ok(r),
6✔
UNCOV
49
            Err(e) => Err(e.into()),
×
50
        }
51
    })
12✔
52
}
6✔
53

54
/// Only outputs the last tile as accumulator.
55
#[allow(clippy::needless_pass_by_value)]
56
pub fn last_tile_fold_fn<T>(
12✔
57
    acc: TemporalRasterAggregationTileAccu<T>,
12✔
58
    tile: RasterTile2D<T>,
12✔
59
) -> TemporalRasterAggregationTileAccu<T>
12✔
60
where
12✔
61
    T: Pixel,
12✔
62
{
12✔
63
    let mut next_accu = tile;
12✔
64
    next_accu.time = acc.accu_tile.time;
12✔
65

12✔
66
    TemporalRasterAggregationTileAccu {
12✔
67
        accu_tile: next_accu,
12✔
68
        initial_state: false,
12✔
69
        pool: acc.pool,
12✔
70
    }
12✔
71
}
12✔
72

73
pub fn last_tile_fold_future<T>(
12✔
74
    accu: TemporalRasterAggregationTileAccu<T>,
12✔
75
    tile: RasterTile2D<T>,
12✔
76
) -> impl Future<Output = Result<TemporalRasterAggregationTileAccu<T>>>
12✔
77
where
12✔
78
    T: Pixel,
12✔
79
{
12✔
80
    crate::util::spawn_blocking(|| last_tile_fold_fn(accu, tile)).then(move |x| async move {
12✔
81
        match x {
12✔
82
            Ok(r) => Ok(r),
12✔
UNCOV
83
            Err(e) => Err(e.into()),
×
84
        }
85
    })
24✔
86
}
12✔
87

88
/// Accumulator that is threaded through the first/last tile fold functions of this file.
#[derive(Debug, Clone)]
89
pub struct TemporalRasterAggregationTileAccu<T> {
90
    /// Tile returned by `into_tile` and exposed mutably through `tile_mut`.
    accu_tile: RasterTile2D<T>,
91
    /// `true` for a freshly built accumulator; both fold functions set it to `false`.
    initial_state: bool,
92
    /// Thread pool handed out by `thread_pool`.
    pool: Arc<ThreadPool>,
93
}
94

95
#[async_trait]
96
impl<T: Pixel> FoldTileAccu for TemporalRasterAggregationTileAccu<T> {
97
    type RasterType = T;
98

99
    async fn into_tile(self) -> Result<RasterTile2D<Self::RasterType>> {
6✔
100
        Ok(self.accu_tile)
6✔
101
    }
12✔
102

103
    fn thread_pool(&self) -> &Arc<ThreadPool> {
×
104
        &self.pool
×
105
    }
×
106
}
107

108
impl<T: Pixel> FoldTileAccuMut for TemporalRasterAggregationTileAccu<T> {
109
    fn tile_mut(&mut self) -> &mut RasterTile2D<Self::RasterType> {
×
110
        &mut self.accu_tile
×
111
    }
×
112
}
113

114
/// Sub-query aggregator configuration whose initial accumulator is created by
/// `build_temporal_accu`.
#[derive(Debug, Clone)]
115
pub struct TemporalRasterAggregationSubQuery<F, T: Pixel> {
116
    /// Fold function; `fold_method` returns a clone of it.
    pub fold_fn: F,
117
    /// Step used to snap the start time and added to the snapped start to form the end of
    /// each tile query's time interval.
    pub step: TimeStep,
118
    /// Reference instant passed to `step.snap_relative` together with the start time.
    pub step_reference: TimeInstance,
119
    /// Marker that ties the pixel type `T` to the struct without storing a value of it.
    pub _phantom_pixel_type: PhantomData<T>,
120
}
121

122
impl<'a, T, FoldM, FoldF> SubQueryTileAggregator<'a, T>
123
    for TemporalRasterAggregationSubQuery<FoldM, T>
124
where
125
    T: Pixel,
126
    FoldM: Send
127
        + Sync
128
        + 'static
129
        + Clone
130
        + Fn(TemporalRasterAggregationTileAccu<T>, RasterTile2D<T>) -> FoldF,
131
    FoldF: Send + TryFuture<Ok = TemporalRasterAggregationTileAccu<T>, Error = crate::error::Error>,
132
{
133
    type TileAccu = TemporalRasterAggregationTileAccu<T>;
134
    type TileAccuFuture = BoxFuture<'a, Result<Self::TileAccu>>;
135

136
    type FoldFuture = FoldF;
137

138
    type FoldMethod = FoldM;
139

140
    fn new_fold_accu(
×
141
        &self,
×
142
        tile_info: TileInformation,
×
143
        query_rect: RasterQueryRectangle,
×
144
        pool: &Arc<ThreadPool>,
×
145
    ) -> Self::TileAccuFuture {
×
146
        build_temporal_accu(&query_rect, tile_info, pool.clone()).boxed()
×
147
    }
×
148

149
    fn tile_query_rectangle(
×
150
        &self,
×
151
        tile_info: TileInformation,
×
152
        query_rect: RasterQueryRectangle,
×
153
        start_time: TimeInstance,
×
154
        band_idx: u32,
×
155
    ) -> Result<Option<RasterQueryRectangle>> {
×
156
        let snapped_start = self.step.snap_relative(self.step_reference, start_time)?;
×
157
        Ok(Some(QueryRectangle {
158
            spatial_bounds: tile_info.spatial_partition(),
×
159
            spatial_resolution: query_rect.spatial_resolution,
×
160
            time_interval: TimeInterval::new(snapped_start, (snapped_start + self.step)?)?,
×
161
            attributes: band_idx.into(),
×
162
        }))
163
    }
×
164

165
    fn fold_method(&self) -> Self::FoldMethod {
×
166
        self.fold_fn.clone()
×
167
    }
×
168
}
169

170
fn build_temporal_accu<T: Pixel>(
×
171
    query_rect: &RasterQueryRectangle,
×
172
    tile_info: TileInformation,
×
173
    pool: Arc<ThreadPool>,
×
174
) -> impl Future<Output = Result<TemporalRasterAggregationTileAccu<T>>> {
×
175
    let time_interval = query_rect.time_interval;
×
176
    crate::util::spawn_blocking(move || TemporalRasterAggregationTileAccu {
×
177
        accu_tile: RasterTile2D::new_with_tile_info(
×
178
            time_interval,
×
179
            tile_info,
×
180
            0,
×
181
            EmptyGrid2D::new(tile_info.tile_size_in_pixels).into(),
×
182
            CacheHint::max_duration(),
×
183
        ),
×
184

×
185
        initial_state: true,
×
186
        pool,
×
187
    })
×
188
    .map_err(From::from)
×
189
}
×
190

191
/// Sub-query aggregator configuration whose initial accumulator is created by
/// `build_temporal_no_data_accu`.
#[derive(Debug, Clone)]
192
pub struct TemporalRasterAggregationSubQueryNoDataOnly<F, T: Pixel> {
193
    /// Fold function; `fold_method` returns a clone of it.
    pub fold_fn: F,
194
    /// Step used to snap the start time and added to the snapped start to form the end of
    /// each tile query's time interval.
    pub step: TimeStep,
195
    /// Reference instant passed to `step.snap_relative` together with the start time.
    pub step_reference: TimeInstance,
196
    /// Marker that ties the pixel type `T` to the struct without storing a value of it.
    pub _phantom_pixel_type: PhantomData<T>,
197
}
198

199
impl<'a, T, FoldM, FoldF> SubQueryTileAggregator<'a, T>
200
    for TemporalRasterAggregationSubQueryNoDataOnly<FoldM, T>
201
where
202
    T: Pixel,
203
    FoldM: Send
204
        + Sync
205
        + 'static
206
        + Clone
207
        + Fn(TemporalRasterAggregationTileAccu<T>, RasterTile2D<T>) -> FoldF,
208
    FoldF: Send + TryFuture<Ok = TemporalRasterAggregationTileAccu<T>, Error = crate::error::Error>,
209
{
210
    type TileAccu = TemporalRasterAggregationTileAccu<T>;
211
    type TileAccuFuture = BoxFuture<'a, Result<Self::TileAccu>>;
212
    type FoldFuture = FoldF;
213

214
    type FoldMethod = FoldM;
215

216
    fn new_fold_accu(
6✔
217
        &self,
6✔
218
        tile_info: TileInformation,
6✔
219
        query_rect: RasterQueryRectangle,
6✔
220
        pool: &Arc<ThreadPool>,
6✔
221
    ) -> Self::TileAccuFuture {
6✔
222
        build_temporal_no_data_accu(&query_rect, tile_info, pool.clone()).boxed()
6✔
223
    }
6✔
224

225
    fn tile_query_rectangle(
6✔
226
        &self,
6✔
227
        tile_info: TileInformation,
6✔
228
        query_rect: RasterQueryRectangle,
6✔
229
        start_time: TimeInstance,
6✔
230
        band_idx: u32,
6✔
231
    ) -> Result<Option<RasterQueryRectangle>> {
6✔
232
        let snapped_start = self.step.snap_relative(self.step_reference, start_time)?;
6✔
233
        Ok(Some(QueryRectangle {
234
            spatial_bounds: tile_info.spatial_partition(),
6✔
235
            spatial_resolution: query_rect.spatial_resolution,
6✔
236
            time_interval: TimeInterval::new(snapped_start, (snapped_start + self.step)?)?,
6✔
237
            attributes: band_idx.into(),
6✔
238
        }))
239
    }
6✔
240

241
    fn fold_method(&self) -> Self::FoldMethod {
6✔
242
        self.fold_fn.clone()
6✔
243
    }
6✔
244
}
245

246
fn build_temporal_no_data_accu<T: Pixel>(
6✔
247
    query_rect: &RasterQueryRectangle,
6✔
248
    tile_info: TileInformation,
6✔
249
    pool: Arc<ThreadPool>,
6✔
250
) -> impl Future<Output = Result<TemporalRasterAggregationTileAccu<T>>> {
6✔
251
    let time_interval = query_rect.time_interval;
6✔
252
    crate::util::spawn_blocking(move || {
6✔
253
        let output_raster = EmptyGrid2D::new(tile_info.tile_size_in_pixels).into();
6✔
254

6✔
255
        TemporalRasterAggregationTileAccu {
6✔
256
            accu_tile: RasterTile2D::new_with_tile_info(
6✔
257
                time_interval,
6✔
258
                tile_info,
6✔
259
                0,
6✔
260
                output_raster,
6✔
261
                CacheHint::max_duration(),
6✔
262
            ),
6✔
263
            initial_state: true,
6✔
264
            pool,
6✔
265
        }
6✔
266
    })
6✔
267
    .map_err(From::from)
6✔
268
}
6✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc