• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

geo-engine / geoengine / 3929938005

pending completion
3929938005

push

github

GitHub
Merge #713

84930 of 96741 relevant lines covered (87.79%)

79640.1 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

91.67
/operators/src/processing/expression/query_processor.rs
1
use std::{marker::PhantomData, sync::Arc};
2

3
use async_trait::async_trait;
4
use futures::{stream::BoxStream, StreamExt, TryStreamExt};
5
use geoengine_datatypes::{
6
    primitives::{RasterQueryRectangle, SpatialPartition2D, TimeInterval},
7
    raster::{
8
        ConvertDataType, FromIndexFnParallel, GeoTransform, GridIdx2D, GridIndexAccess,
9
        GridOrEmpty, GridOrEmpty2D, GridShape2D, GridShapeAccess, MapElementsParallel, Pixel,
10
        RasterTile2D,
11
    },
12
};
13
use libloading::Symbol;
14
use num_traits::AsPrimitive;
15

16
use crate::{
17
    adapters::{QueryWrapper, RasterArrayTimeAdapter, RasterTimeAdapter},
18
    engine::{BoxRasterQueryProcessor, QueryContext, QueryProcessor},
19
    util::Result,
20
};
21

22
use super::compiled::LinkedExpression;
23

24
/// Query processor that evaluates a compiled expression on raster tiles.
///
/// `TO` is the pixel type of the produced tiles and `Sources` holds the
/// input processor(s) the expression is evaluated on.
pub struct ExpressionQueryProcessor<TO, Sources>
25
where
26
    TO: Pixel,
27
{
28
    /// The input source(s) that deliver the tiles the expression reads.
    pub sources: Sources,
29
    /// Marker that ties the processor to the output pixel type `TO`
    /// without storing a value of that type.
    pub phantom_data: PhantomData<TO>,
30
    /// The linked expression, kept behind an `Arc` so that it can be shared.
    pub program: Arc<LinkedExpression>,
31
    /// If `false`, a missing (`None`) input pixel yields a missing output
    /// pixel without calling the expression; if `true`, the expression is
    /// called with `None` inputs as well.
    pub map_no_data: bool,
32
}
33

34
/// Constructor for [`ExpressionQueryProcessor`].
impl<TO, Sources> ExpressionQueryProcessor<TO, Sources>
35
where
36
    TO: Pixel,
37
{
38
    /// Creates a processor from a linked expression and its input source(s).
    ///
    /// The `program` is wrapped into an `Arc`; `map_no_data` is stored as is.
    pub fn new(program: LinkedExpression, sources: Sources, map_no_data: bool) -> Self {
10✔
39
        Self {
10✔
40
            sources,
10✔
41
            // shared ownership: the query clones this handle for every tile it computes
            program: Arc::new(program),
10✔
42
            phantom_data: PhantomData::default(),
10✔
43
            map_no_data,
10✔
44
        }
10✔
45
    }
10✔
46
}
47

48
/// Query implementation: requests the input tiles from the sources and maps
/// each delivered input (a tile or a group of tiles) to one output tile.
#[async_trait]
49
impl<TO, Tuple> QueryProcessor for ExpressionQueryProcessor<TO, Tuple>
50
where
51
    TO: Pixel,
52
    Tuple: ExpressionTupleProcessor<TO>,
53
{
54
    type Output = RasterTile2D<TO>;
55
    type SpatialBounds = SpatialPartition2D;
56

57
    /// Returns a stream with one output tile per item of the source stream.
    ///
    /// Errors of the source query and of the expression computation are
    /// propagated as `Err`.
    async fn _query<'b>(
13✔
58
        &'b self,
13✔
59
        query: RasterQueryRectangle,
13✔
60
        ctx: &'b dyn QueryContext,
13✔
61
    ) -> Result<BoxStream<'b, Result<Self::Output>>> {
13✔
62
        let stream = self
13✔
63
            .sources
13✔
64
            .queries(query, ctx)
13✔
65
            .await?
×
66
            .and_then(move |rasters| async move {
39✔
67
                // shortcut: if every input tile is empty, skip the expression entirely
                if Tuple::all_empty(&rasters) {
39✔
68
                    return Ok(Tuple::empty_raster(&rasters));
×
69
                }
39✔
70

39✔
71
                // the grid shape is part of the metadata but not needed here
                let (out_time, out_tile_position, out_global_geo_transform, _output_grid_shape) =
39✔
72
                    Tuple::metadata(&rasters);
39✔
73

39✔
74
                // owned copies, since the closure below is `move` and takes ownership
                let program = self.program.clone();
39✔
75
                let map_no_data = self.map_no_data;
39✔
76

77
                // run the per-pixel computation on the thread pool of the query context
                // NOTE(review): the first `?` presumably handles a failure of the spawned
                // task, the second one the `Result` of `compute_expression` - confirm
                let out = crate::util::spawn_blocking_with_thread_pool(
39✔
78
                    ctx.thread_pool().clone(),
39✔
79
                    move || Tuple::compute_expression(rasters, &program, map_no_data),
39✔
80
                )
39✔
81
                .await??;
41✔
82

83
                Ok(RasterTile2D::new(
39✔
84
                    out_time,
39✔
85
                    out_tile_position,
39✔
86
                    out_global_geo_transform,
39✔
87
                    out,
39✔
88
                ))
39✔
89
            });
39✔
90

13✔
91
        Ok(stream.boxed())
13✔
92
    }
26✔
93
}
94

95
/// Abstraction over the input source(s) of an expression.
///
/// Implementations exist in this file for a single source, a pair of sources
/// and arrays of three to eight sources.
#[async_trait]
96
trait ExpressionTupleProcessor<TO: Pixel>: Send + Sync {
97
    /// The input tile(s) delivered for one output tile.
    type Tuple: Send + 'static;
98

99
    /// Queries the source(s) and returns a stream of input tile groups.
    async fn queries<'a>(
100
        &'a self,
101
        query: RasterQueryRectangle,
102
        ctx: &'a dyn QueryContext,
103
    ) -> Result<BoxStream<'a, Result<Self::Tuple>>>;
104

105
    /// Returns `true` if the grids of all input tiles are empty.
    fn all_empty(tuple: &Self::Tuple) -> bool;
106

107
    /// Creates the output tile for the case that all inputs are empty.
    fn empty_raster(tuple: &Self::Tuple) -> RasterTile2D<TO>;
108

109
    /// Returns time, tile position, global geo transform and grid shape
    /// that are used for the output tile.
    fn metadata(tuple: &Self::Tuple) -> (TimeInterval, GridIdx2D, GeoTransform, GridShape2D);
110

111
    /// Evaluates `program` for the pixels of the input tile(s).
    ///
    /// If `map_no_data` is `false`, a missing input pixel directly results in
    /// a missing output pixel.
    fn compute_expression(
112
        tuple: Self::Tuple,
113
        program: &LinkedExpression,
114
        map_no_data: bool,
115
    ) -> Result<GridOrEmpty2D<TO>>;
116
}
117

118
/// Single-source variant: the expression has exactly one raster input.
#[async_trait]
119
impl<TO, T1> ExpressionTupleProcessor<TO> for BoxRasterQueryProcessor<T1>
120
where
121
    TO: Pixel,
122
    T1: Pixel + AsPrimitive<TO>,
123
{
124
    type Tuple = RasterTile2D<T1>;
125

126
    /// Forwards the query to the single source and boxes its stream.
    #[inline]
127
    async fn queries<'a>(
7✔
128
        &'a self,
7✔
129
        query: RasterQueryRectangle,
7✔
130
        ctx: &'a dyn QueryContext,
7✔
131
    ) -> Result<BoxStream<'a, Result<Self::Tuple>>> {
7✔
132
        let stream = self.query(query, ctx).await?;
7✔
133

134
        Ok(stream.boxed())
7✔
135
    }
14✔
136

137
    /// Returns `true` if the grid of the input tile is empty.
    #[inline]
138
    fn all_empty(tuple: &Self::Tuple) -> bool {
11✔
139
        tuple.grid_array.is_empty()
11✔
140
    }
11✔
141

142
    /// Clones the input tile and converts it to the output pixel type.
    #[inline]
143
    fn empty_raster(tuple: &Self::Tuple) -> RasterTile2D<TO> {
×
144
        tuple.clone().convert_data_type()
×
145
    }
×
146

147
    /// Takes the output metadata from the input tile.
    #[inline]
148
    fn metadata(tuple: &Self::Tuple) -> (TimeInterval, GridIdx2D, GeoTransform, GridShape2D) {
11✔
149
        let raster = &tuple;
11✔
150

11✔
151
        (
11✔
152
            raster.time,
11✔
153
            raster.tile_position,
11✔
154
            raster.global_geo_transform,
11✔
155
            raster.grid_shape(),
11✔
156
        )
11✔
157
    }
11✔
158

159
    /// Applies the one-argument expression to every element of the tile's grid.
    #[inline]
160
    fn compute_expression(
11✔
161
        raster: Self::Tuple,
11✔
162
        program: &LinkedExpression,
11✔
163
        map_no_data: bool,
11✔
164
    ) -> Result<GridOrEmpty2D<TO>> {
11✔
165
        let expression = unsafe {
11✔
166
            // SAFETY (not checked here): we have to "trust" that the linked function
            // has the signature we expect, i.e., it takes a single `Option<f64>`
            program.function_1::<Option<f64>>()?
11✔
167
        };
168

169
        // maps one optional input pixel to one optional output pixel
        let map_fn = |in_value: Option<T1>| {
66✔
170
            // TODO: could be a |in_value: T1| if map no data is false!
66✔
171
            // without `map_no_data`, a missing input never reaches the expression
            if !map_no_data && in_value.is_none() {
66✔
172
                return None;
1✔
173
            }
65✔
174

65✔
175
            // the expression is called with an `Option<f64>`, so the pixel is cast first
            let result = expression(in_value.map(AsPrimitive::as_));
65✔
176

65✔
177
            // convert the result of the expression to the output pixel type
            result.map(TO::from_)
65✔
178
        };
66✔
179

180
        // NOTE(review): runs in parallel judging by the method name - confirm
        let res = raster.grid_array.map_elements_parallel(map_fn);
11✔
181

11✔
182
        Result::Ok(res)
11✔
183
    }
11✔
184
}
186

187
// Two-source variant: the expression has two raster inputs, which may have
// different pixel types (`T1` and `T2`).
// TODO: implement this via macro as well; the variants for 3-8 sources are
//       already generated by `impl_expression_tuple_processor!` below
188
#[async_trait]
189
impl<TO, T1, T2> ExpressionTupleProcessor<TO>
190
    for (BoxRasterQueryProcessor<T1>, BoxRasterQueryProcessor<T2>)
191
where
192
    TO: Pixel,
193
    T1: Pixel + AsPrimitive<TO>,
194
    T2: Pixel,
195
{
196
    type Tuple = (RasterTile2D<T1>, RasterTile2D<T2>);
197

198
    /// Wraps both sources together with the query context and combines them
    /// into one stream of tile pairs via `RasterTimeAdapter`.
    // NOTE(review): presumably the adapter aligns the tiles of both sources
    // in time - confirm in `crate::adapters`
    #[inline]
199
    async fn queries<'a>(
4✔
200
        &'a self,
4✔
201
        query: RasterQueryRectangle,
4✔
202
        ctx: &'a dyn QueryContext,
4✔
203
    ) -> Result<BoxStream<'a, Result<Self::Tuple>>> {
4✔
204
        let source_a = QueryWrapper { p: &self.0, ctx };
4✔
205

4✔
206
        let source_b = QueryWrapper { p: &self.1, ctx };
4✔
207

4✔
208
        Ok(Box::pin(RasterTimeAdapter::new(source_a, source_b, query)))
4✔
209
    }
4✔
210

211
    /// Returns `true` only if the grids of both tiles are empty.
    #[inline]
212
    fn all_empty(tuple: &Self::Tuple) -> bool {
26✔
213
        tuple.0.grid_array.is_empty() && tuple.1.grid_array.is_empty()
26✔
214
    }
26✔
215

216
    /// Clones the first tile and converts it to the output pixel type.
    #[inline]
217
    fn empty_raster(tuple: &Self::Tuple) -> RasterTile2D<TO> {
×
218
        tuple.0.clone().convert_data_type()
×
219
    }
×
220

221
    /// Takes the output metadata from the first tile.
    // NOTE(review): assumes that both tiles agree on time, position, geo
    // transform and shape - confirm against the adapter
    #[inline]
222
    fn metadata(tuple: &Self::Tuple) -> (TimeInterval, GridIdx2D, GeoTransform, GridShape2D) {
26✔
223
        let raster = &tuple.0;
26✔
224

26✔
225
        (
26✔
226
            raster.time,
26✔
227
            raster.tile_position,
26✔
228
            raster.global_geo_transform,
26✔
229
            raster.grid_shape(),
26✔
230
        )
26✔
231
    }
26✔
232

233
    /// Evaluates the two-argument expression for every linear index of the
    /// first tile's grid shape.
    #[inline]
234
    fn compute_expression(
26✔
235
        rasters: Self::Tuple,
26✔
236
        program: &LinkedExpression,
26✔
237
        map_no_data: bool,
26✔
238
    ) -> Result<GridOrEmpty2D<TO>> {
26✔
239
        let expression = unsafe {
26✔
240
            // SAFETY (not checked here): we have to "trust" that the linked function
            // has the signature we expect, i.e., it takes two `Option<f64>`s
            program.function_2::<Option<f64>, Option<f64>>()?
26✔
241
        };
242

243
        // computes the output pixel for one linear grid index
        let map_fn = |lin_idx: usize| {
6,205,926✔
244
            // NOTE(review): "unchecked" suggests that `lin_idx` is not bounds-checked;
            // it stems from `grid_shape` of the first tile below - confirm both tiles match
            let t0_value = rasters.0.get_at_grid_index_unchecked(lin_idx);
6,205,926✔
245
            let t1_value = rasters.1.get_at_grid_index_unchecked(lin_idx);
6,205,926✔
246

6,205,926✔
247
            // without `map_no_data`, one missing input suffices for a missing output
            if !map_no_data && (t0_value.is_none() || t1_value.is_none()) {
6,271,937✔
248
                return None;
6,208,624✔
249
            }
74,721✔
250

74,721✔
251
            // the expression is called with `Option<f64>`s, so both pixels are cast first
            let result = expression(
74,721✔
252
                t0_value.map(AsPrimitive::as_),
74,721✔
253
                t1_value.map(AsPrimitive::as_),
74,721✔
254
            );
74,721✔
255

74,721✔
256
            // convert the result of the expression to the output pixel type
            result.map(TO::from_)
74,721✔
257
        };
6,283,345✔
258

259
        // the output grid has the shape of the first input tile
        let grid_shape = rasters.0.grid_shape();
26✔
260
        let out = GridOrEmpty::from_index_fn_parallel(&grid_shape, map_fn);
26✔
261

26✔
262
        Result::Ok(out)
26✔
263
    }
26✔
264
}
266

267
// Function pointer types for linked expressions with 3 to 8 inputs.
// Every input and the output is an `Option<f64>`.
// `impl_expression_tuple_processor!` selects the alias matching its arity
// by pasting `Function` and the number of sources.

/// Signature of a linked expression with three inputs.
type Function3 = fn(Option<f64>, Option<f64>, Option<f64>) -> Option<f64>;
268
/// Signature of a linked expression with four inputs.
type Function4 = fn(Option<f64>, Option<f64>, Option<f64>, Option<f64>) -> Option<f64>;
269
/// Signature of a linked expression with five inputs.
type Function5 = fn(Option<f64>, Option<f64>, Option<f64>, Option<f64>, Option<f64>) -> Option<f64>;
270
/// Signature of a linked expression with six inputs.
type Function6 =
271
    fn(Option<f64>, Option<f64>, Option<f64>, Option<f64>, Option<f64>, Option<f64>) -> Option<f64>;
272
/// Signature of a linked expression with seven inputs.
type Function7 = fn(
273
    Option<f64>,
274
    Option<f64>,
275
    Option<f64>,
276
    Option<f64>,
277
    Option<f64>,
278
    Option<f64>,
279
    Option<f64>,
280
) -> Option<f64>;
281
/// Signature of a linked expression with eight inputs.
type Function8 = fn(
282
    Option<f64>,
283
    Option<f64>,
284
    Option<f64>,
285
    Option<f64>,
286
    Option<f64>,
287
    Option<f64>,
288
    Option<f64>,
289
    Option<f64>,
290
) -> Option<f64>;
291

292
// Generates an `ExpressionTupleProcessor` implementation for an array of
// `$i` sources that all share the same pixel type.
//
// Usage: `impl_expression_tuple_processor!(3 => 0, 1, 2);`
// The number before `=>` is the array length, the list after it contains
// the indices of the array elements.
macro_rules! impl_expression_tuple_processor {
293
    // Entry point: derives one `pixel_<index>` and one `is_nodata_<index>`
    // identifier per index as well as the function type `Function<arity>`
    // and forwards everything to the `@inner` arm.
    ( $i:tt => $( $x:tt ),+ ) => {
294
        paste::paste! {
295
            impl_expression_tuple_processor!(
296
                @inner
297
                $i
298
                |
299
                $( $x ),*
300
                |
301
                $( [< pixel_ $x >] ),*
302
                |
303
                $( [< is_nodata_ $x >] ),*
304
                |
305
                [< Function $i >]
306
            );
307
        }
308
    };
309

310
    // We have the arity `$N`, the indices `0, 1, 2, …`, the identifiers
    // `pixel_0, pixel_1, …` and `is_nodata_0, is_nodata_1, …`,
    // and the function pointer type `$FN_T`
    (@inner $N:tt | $( $I:tt ),+ | $( $PIXEL:tt ),+ | $( $IS_NODATA:tt ),+ | $FN_T:ty ) => {
312
        #[async_trait]
313
        impl<TO, T1> ExpressionTupleProcessor<TO> for [BoxRasterQueryProcessor<T1>; $N]
314
        where
315
            TO: Pixel,
316
            T1 : Pixel + AsPrimitive<TO>
317
        {
318
            type Tuple = [RasterTile2D<T1>; $N];
319

320
            // Wraps every source together with the query context and combines
            // them into one stream of tile arrays via `RasterArrayTimeAdapter`.
            #[inline]
321
            async fn queries<'a>(
2✔
322
                &'a self,
2✔
323
                query: RasterQueryRectangle,
2✔
324
                ctx: &'a dyn QueryContext,
2✔
325
            ) -> Result<BoxStream<'a, Result<Self::Tuple>>> {
2✔
326
                let sources = [$( QueryWrapper { p: &self[$I], ctx } ),*];
327

328
                Ok(Box::pin(RasterArrayTimeAdapter::new(sources, query)))
329
            }
330

331
            // `true` only if the grids of all tiles are empty
            #[inline]
332
            fn all_empty(tuple: &Self::Tuple) -> bool {
333
                $( tuple[$I].grid_array.is_empty() )&&*
2✔
334
            }
2✔
335

336
            // clones the first tile and converts it to the output pixel type
            #[inline]
337
            fn empty_raster(tuple: &Self::Tuple) -> RasterTile2D<TO> {
×
338
                tuple[0].clone().convert_data_type()
×
339
            }
×
340

341
            // the output metadata is taken from the first tile
            // NOTE(review): assumes that all tiles agree on it - confirm against the adapter
            #[inline]
342
            fn metadata(tuple: &Self::Tuple) -> (TimeInterval, GridIdx2D, GeoTransform, GridShape2D) {
2✔
343
                let raster = &tuple[0];
2✔
344

2✔
345
                (
2✔
346
                    raster.time,
2✔
347
                    raster.tile_position,
2✔
348
                    raster.global_geo_transform,
2✔
349
                    raster.grid_shape(),
2✔
350
                )
2✔
351
            }
2✔
352

353
            // evaluates the n-ary expression for every linear index of the
            // first tile's grid shape
            fn compute_expression(
2✔
354
                rasters: Self::Tuple,
2✔
355
                program: &LinkedExpression,
2✔
356
                map_no_data: bool,
2✔
357
            ) -> Result<GridOrEmpty2D<TO>> {
2✔
358
                let expression: Symbol<$FN_T> = unsafe {
2✔
359
                    // SAFETY (not checked here): we have to "trust" that the linked
                    // function has the signature we expect, i.e., `$FN_T`
                    program.function_nary()?
2✔
360
                };
361

362
                // computes the output pixel for one linear grid index
                let map_fn = |lin_idx: usize| {
2✔
363
                    // one `pixel_<index>` and one `is_nodata_<index>` binding per source
                    $(
364
                        let $PIXEL = rasters[$I].get_at_grid_index_unchecked(lin_idx);
365
                        let $IS_NODATA = $PIXEL.is_none();
366
                    )*
367

368
                    // without `map_no_data`, one missing input suffices for a missing output
                    if !map_no_data && ( $($IS_NODATA)||* ) {
369
                        return None;
370
                    }
371

372
                    // the expression is called with `Option<f64>`s, so all pixels are cast first
                    let result = expression(
373
                        $(
374
                            $PIXEL.map(AsPrimitive::as_)
375
                        ),*
376
                    );
377

378
                    // convert the result of the expression to the output pixel type
                    result.map(TO::from_)
379
                };
380

381
                // the output grid has the shape of the first input tile
                let grid_shape = rasters[0].grid_shape();
2✔
382
                let out = GridOrEmpty::from_index_fn_parallel(&grid_shape, map_fn);
2✔
383

2✔
384
                Result::Ok(out)
2✔
385
            }
2✔
386
        }
387
    };
388

389
    // For any input, generate `f64, bool`
    // NOTE(review): no other arm of this macro invokes `@input_dtypes`;
    // it looks like a leftover - confirm before removing
    (@input_dtypes $x:tt) => {
391
        f64, bool
392
    };
393
}
395

396
// Generate the implementations for arrays of 3 to 8 sources.
// The number before `=>` is the array length and must match the number of
// indices listed after it.
impl_expression_tuple_processor!(3 => 0, 1, 2);
6✔
397
impl_expression_tuple_processor!(4 => 0, 1, 2, 3);
×
398
impl_expression_tuple_processor!(5 => 0, 1, 2, 3, 4);
×
399
impl_expression_tuple_processor!(6 => 0, 1, 2, 3, 4, 5);
×
400
impl_expression_tuple_processor!(7 => 0, 1, 2, 3, 4, 5, 6);
×
401
impl_expression_tuple_processor!(8 => 0, 1, 2, 3, 4, 5, 6, 7);
6✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc