• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

geo-engine / geoengine / 14513006992

17 Apr 2025 09:53AM UTC coverage: 90.021%. First build
14513006992

Pull #1048

github

web-flow
Merge ff954054c into 1076a6163
Pull Request #1048: 1.86

32 of 77 new or added lines in 36 files covered. (41.56%)

126708 of 140754 relevant lines covered (90.02%)

57184.51 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

91.48
/operators/src/processing/expression/raster_query_processor.rs
1
use super::RasterExpressionError;
2
use crate::{
3
    engine::{BoxRasterQueryProcessor, QueryContext, QueryProcessor, RasterResultDescriptor},
4
    util::Result,
5
};
6
use async_trait::async_trait;
7
use futures::{StreamExt, TryStreamExt, stream::BoxStream};
8
use geoengine_datatypes::{
9
    primitives::{
10
        BandSelection, CacheHint, RasterQueryRectangle, SpatialPartition2D, TimeInterval,
11
    },
12
    raster::{
13
        ConvertDataType, FromIndexFnParallel, GeoTransform, GridIdx2D, GridIndexAccess,
14
        GridOrEmpty, GridOrEmpty2D, GridShape2D, GridShapeAccess, MapElementsParallel, Pixel,
15
        RasterTile2D,
16
    },
17
};
18
use geoengine_expression::LinkedExpression;
19
use libloading::Symbol;
20
use num_traits::AsPrimitive;
21
use std::{marker::PhantomData, sync::Arc};
22

23
pub struct ExpressionInput<const N: usize> {
24
    pub raster: BoxRasterQueryProcessor<f64>,
25
}
26

27
pub struct ExpressionQueryProcessor<TO, Sources>
28
where
29
    TO: Pixel,
30
{
31
    pub sources: Sources,
32
    pub result_descriptor: RasterResultDescriptor,
33
    pub phantom_data: PhantomData<TO>,
34
    pub program: Arc<LinkedExpression>,
35
    pub map_no_data: bool,
36
}
37

38
impl<TO, Sources> ExpressionQueryProcessor<TO, Sources>
39
where
40
    TO: Pixel,
41
{
42
    pub fn new(
16✔
43
        program: LinkedExpression,
16✔
44
        sources: Sources,
16✔
45
        result_descriptor: RasterResultDescriptor,
16✔
46
        map_no_data: bool,
16✔
47
    ) -> Self {
16✔
48
        Self {
16✔
49
            sources,
16✔
50
            result_descriptor,
16✔
51
            program: Arc::new(program),
16✔
52
            phantom_data: PhantomData,
16✔
53
            map_no_data,
16✔
54
        }
16✔
55
    }
16✔
56
}
57

58
#[async_trait]
59
impl<TO, Tuple> QueryProcessor for ExpressionQueryProcessor<TO, Tuple>
60
where
61
    TO: Pixel,
62
    Tuple: ExpressionTupleProcessor<TO>,
63
{
64
    type Output = RasterTile2D<TO>;
65
    type SpatialBounds = SpatialPartition2D;
66
    type Selection = BandSelection;
67
    type ResultDescription = RasterResultDescriptor;
68

69
    async fn _query<'b>(
70
        &'b self,
71
        query: RasterQueryRectangle,
72
        ctx: &'b dyn QueryContext,
73
    ) -> Result<BoxStream<'b, Result<Self::Output>>> {
21✔
74
        // rewrite query to request all input bands from the source. They are all combined in the single output band by means of the expression.
75
        let source_query = RasterQueryRectangle {
21✔
76
            spatial_bounds: query.spatial_bounds,
21✔
77
            time_interval: query.time_interval,
21✔
78
            spatial_resolution: query.spatial_resolution,
21✔
79
            attributes: BandSelection::first_n(Tuple::num_bands()),
21✔
80
        };
21✔
81

82
        let stream =
21✔
83
            self.sources
21✔
84
                .zip_bands(source_query, ctx)
21✔
85
                .await?
21✔
86
                .and_then(move |rasters| async move {
67✔
87
                    if Tuple::all_empty(&rasters) {
67✔
88
                        return Ok(Tuple::empty_raster(&rasters));
16✔
89
                    }
51✔
90

51✔
91
                    let (
51✔
92
                        out_time,
51✔
93
                        out_tile_position,
51✔
94
                        out_global_geo_transform,
51✔
95
                        _output_grid_shape,
51✔
96
                        cache_hint,
51✔
97
                    ) = Tuple::metadata(&rasters);
51✔
98

51✔
99
                    let program = self.program.clone();
51✔
100
                    let map_no_data = self.map_no_data;
51✔
101

102
                    let out = crate::util::spawn_blocking_with_thread_pool(
51✔
103
                        ctx.thread_pool().clone(),
51✔
104
                        move || Tuple::compute_expression(rasters, &program, map_no_data),
51✔
105
                    )
51✔
106
                    .await??;
51✔
107

108
                    Ok(RasterTile2D::new(
51✔
109
                        out_time,
51✔
110
                        out_tile_position,
51✔
111
                        0,
51✔
112
                        out_global_geo_transform,
51✔
113
                        out,
51✔
114
                        cache_hint,
51✔
115
                    ))
51✔
116
                });
134✔
117

21✔
118
        Ok(stream.boxed())
21✔
119
    }
42✔
120

121
    fn result_descriptor(&self) -> &RasterResultDescriptor {
52✔
122
        &self.result_descriptor
52✔
123
    }
52✔
124
}
125

126
#[async_trait]
127
trait ExpressionTupleProcessor<TO: Pixel>: Send + Sync {
128
    type Tuple: Send + 'static;
129

130
    async fn zip_bands<'a>(
131
        &'a self,
132
        query: RasterQueryRectangle,
133
        ctx: &'a dyn QueryContext,
134
    ) -> Result<BoxStream<'a, Result<Self::Tuple>>>;
135

136
    fn all_empty(tuple: &Self::Tuple) -> bool;
137

138
    fn empty_raster(tuple: &Self::Tuple) -> RasterTile2D<TO>;
139

140
    fn metadata(
141
        tuple: &Self::Tuple,
142
    ) -> (
143
        TimeInterval,
144
        GridIdx2D,
145
        GeoTransform,
146
        GridShape2D,
147
        CacheHint,
148
    );
149

150
    fn compute_expression(
151
        tuple: Self::Tuple,
152
        program: &LinkedExpression,
153
        map_no_data: bool,
154
    ) -> Result<GridOrEmpty2D<TO>>;
155

156
    fn num_bands() -> u32;
157
}
158

159
#[async_trait]
160
impl<TO> ExpressionTupleProcessor<TO> for ExpressionInput<1>
161
where
162
    TO: Pixel,
163
    f64: AsPrimitive<TO>,
164
{
165
    type Tuple = RasterTile2D<f64>;
166

167
    #[inline]
168
    async fn zip_bands<'a>(
169
        &'a self,
170
        query: RasterQueryRectangle,
171
        ctx: &'a dyn QueryContext,
172
    ) -> Result<BoxStream<'a, Result<Self::Tuple>>> {
13✔
173
        self.raster.query(query, ctx).await
13✔
174
    }
26✔
175

176
    #[inline]
177
    fn all_empty(tuple: &Self::Tuple) -> bool {
37✔
178
        tuple.grid_array.is_empty()
37✔
179
    }
37✔
180

181
    #[inline]
182
    fn empty_raster(tuple: &Self::Tuple) -> RasterTile2D<TO> {
16✔
183
        tuple.clone().convert_data_type()
16✔
184
    }
16✔
185

186
    #[inline]
187
    fn metadata(
21✔
188
        tuple: &Self::Tuple,
21✔
189
    ) -> (
21✔
190
        TimeInterval,
21✔
191
        GridIdx2D,
21✔
192
        GeoTransform,
21✔
193
        GridShape2D,
21✔
194
        CacheHint,
21✔
195
    ) {
21✔
196
        let raster = &tuple;
21✔
197

21✔
198
        (
21✔
199
            raster.time,
21✔
200
            raster.tile_position,
21✔
201
            raster.global_geo_transform,
21✔
202
            raster.grid_shape(),
21✔
203
            raster.cache_hint,
21✔
204
        )
21✔
205
    }
21✔
206

207
    #[inline]
208
    fn compute_expression(
21✔
209
        raster: Self::Tuple,
21✔
210
        program: &LinkedExpression,
21✔
211
        map_no_data: bool,
21✔
212
    ) -> Result<GridOrEmpty2D<TO>> {
21✔
213
        let expression = unsafe {
21✔
214
            // we have to "trust" that the function has the signature we expect
215
            program
21✔
216
                .function_1::<Option<f64>>()
21✔
217
                .map_err(RasterExpressionError::from)?
21✔
218
        };
219

220
        let map_fn = |in_value: Option<f64>| {
2,097,230✔
221
            // TODO: could be a |in_value: T1| if map no data is false!
2,097,230✔
222
            if !map_no_data && in_value.is_none() {
2,097,230✔
223
                return None;
1,967,945✔
224
            }
129,285✔
225

129,285✔
226
            let result = expression(in_value);
129,285✔
227

129,285✔
228
            result.map(TO::from_)
129,285✔
229
        };
2,097,230✔
230

231
        let res = raster.grid_array.map_elements_parallel(map_fn);
21✔
232

21✔
233
        Result::Ok(res)
21✔
234
    }
21✔
235

236
    fn num_bands() -> u32 {
13✔
237
        1
13✔
238
    }
13✔
239
}
240

241
// TODO: implement this via macro for 2-8 sources
242
#[async_trait]
243
impl<TO> ExpressionTupleProcessor<TO> for ExpressionInput<2>
244
where
245
    TO: Pixel,
246
    f64: AsPrimitive<TO>,
247
{
248
    type Tuple = (RasterTile2D<f64>, RasterTile2D<f64>);
249

250
    #[inline]
251
    async fn zip_bands<'a>(
252
        &'a self,
253
        query: RasterQueryRectangle,
254
        ctx: &'a dyn QueryContext,
255
    ) -> Result<BoxStream<'a, Result<Self::Tuple>>> {
5✔
256
        // chunk up the stream to get all bands for a spatial tile at once
257
        let stream = self.raster.query(query, ctx).await?.chunks(2).map(|chunk| {
27✔
258
            if chunk.len() != 2 {
27✔
259
                // if there are not exactly two tiles, it should mean the last tile was an error and the chunker ended prematurely
NEW
260
                if let Some(Err(e)) = chunk.into_iter().next_back() {
×
261
                    return Err(e);
×
262
                }
×
263
                // if there is no error, the source did not produce all bands, which likely means a bug in an operator
×
264
                return Err(crate::error::Error::MustNotHappen {
×
265
                    message: "source did not produce all bands".to_string(),
×
266
                });
×
267
            }
27✔
268

27✔
269
            let [a, b]: [Result<RasterTile2D<f64>>; 2] = chunk
27✔
270
                .try_into()
27✔
271
                .expect("all chunks should be of length 2 because it was checked above");
27✔
272
            match (a, b) {
27✔
273
                (Ok(t0), Ok(t1)) => Ok((t0, t1)),
27✔
274
                (Err(e), _) | (_, Err(e)) => Err(e),
×
275
            }
276
        });
27✔
277

5✔
278
        Ok(stream.boxed())
5✔
279
    }
10✔
280

281
    #[inline]
282
    fn all_empty(tuple: &Self::Tuple) -> bool {
27✔
283
        tuple.0.grid_array.is_empty() && tuple.1.grid_array.is_empty()
27✔
284
    }
27✔
285

286
    #[inline]
287
    fn empty_raster(tuple: &Self::Tuple) -> RasterTile2D<TO> {
×
288
        tuple.0.clone().convert_data_type()
×
289
    }
×
290

291
    #[inline]
292
    fn metadata(
27✔
293
        tuple: &Self::Tuple,
27✔
294
    ) -> (
27✔
295
        TimeInterval,
27✔
296
        GridIdx2D,
27✔
297
        GeoTransform,
27✔
298
        GridShape2D,
27✔
299
        CacheHint,
27✔
300
    ) {
27✔
301
        let raster = &tuple.0;
27✔
302

27✔
303
        (
27✔
304
            raster.time,
27✔
305
            raster.tile_position,
27✔
306
            raster.global_geo_transform,
27✔
307
            raster.grid_shape(),
27✔
308
            tuple.0.cache_hint.merged(&tuple.1.cache_hint),
27✔
309
        )
27✔
310
    }
27✔
311

312
    #[inline]
313
    fn compute_expression(
27✔
314
        rasters: Self::Tuple,
27✔
315
        program: &LinkedExpression,
27✔
316
        map_no_data: bool,
27✔
317
    ) -> Result<GridOrEmpty2D<TO>> {
27✔
318
        let expression = unsafe {
27✔
319
            // we have to "trust" that the function has the signature we expect
320
            program
27✔
321
                .function_2::<Option<f64>, Option<f64>>()
27✔
322
                .map_err(RasterExpressionError::from)?
27✔
323
        };
324

325
        let map_fn = |lin_idx: usize| {
6,291,474✔
326
            let t0_value = rasters.0.get_at_grid_index_unchecked(lin_idx);
6,291,474✔
327
            let t1_value = rasters.1.get_at_grid_index_unchecked(lin_idx);
6,291,474✔
328

6,291,474✔
329
            if !map_no_data && (t0_value.is_none() || t1_value.is_none()) {
6,291,474✔
330
                return None;
6,221,747✔
331
            }
69,727✔
332

69,727✔
333
            let result = expression(t0_value, t1_value);
69,727✔
334

69,727✔
335
            result.map(TO::from_)
69,727✔
336
        };
6,291,474✔
337

338
        let grid_shape = rasters.0.grid_shape();
27✔
339
        let out = GridOrEmpty::from_index_fn_parallel(&grid_shape, map_fn);
27✔
340

27✔
341
        Result::Ok(out)
27✔
342
    }
27✔
343

344
    fn num_bands() -> u32 {
5✔
345
        2
5✔
346
    }
5✔
347
}
348

349
type Function3 = fn(Option<f64>, Option<f64>, Option<f64>) -> Option<f64>;
350
type Function4 = fn(Option<f64>, Option<f64>, Option<f64>, Option<f64>) -> Option<f64>;
351
type Function5 = fn(Option<f64>, Option<f64>, Option<f64>, Option<f64>, Option<f64>) -> Option<f64>;
352
type Function6 =
353
    fn(Option<f64>, Option<f64>, Option<f64>, Option<f64>, Option<f64>, Option<f64>) -> Option<f64>;
354
type Function7 = fn(
355
    Option<f64>,
356
    Option<f64>,
357
    Option<f64>,
358
    Option<f64>,
359
    Option<f64>,
360
    Option<f64>,
361
    Option<f64>,
362
) -> Option<f64>;
363
type Function8 = fn(
364
    Option<f64>,
365
    Option<f64>,
366
    Option<f64>,
367
    Option<f64>,
368
    Option<f64>,
369
    Option<f64>,
370
    Option<f64>,
371
    Option<f64>,
372
) -> Option<f64>;
373

374
macro_rules! impl_expression_tuple_processor {
375
    ( $i:tt => $( $x:tt ),+ ) => {
376
        paste::paste! {
377
            impl_expression_tuple_processor!(
378
                @inner
379
                $i
380
                |
381
                $( $x ),*
382
                |
383
                $( [< pixel_ $x >] ),*
384
                |
385
                $( [< is_nodata_ $x >] ),*
386
                |
387
                [< Function $i >]
388
            );
389
        }
390
    };
391

392
    // We have `0, 1, 2, …` and `T0, T1, T2, …`
393
    (@inner $N:tt | $( $I:tt ),+ | $( $PIXEL:tt ),+ | $( $IS_NODATA:tt ),+ | $FN_T:ty ) => {
394
        #[async_trait]
395
        impl<TO> ExpressionTupleProcessor<TO> for ExpressionInput<$N>
396
        where
397
            TO: Pixel,
398
            f64: AsPrimitive<TO>,
399
        {
400
            type Tuple = [RasterTile2D<f64>; $N];
401

402
            #[inline]
403
            async fn zip_bands<'a>(
3✔
404
                &'a self,
3✔
405
                query: RasterQueryRectangle,
3✔
406
                ctx: &'a dyn QueryContext,
3✔
407
            ) -> Result<BoxStream<'a, Result<Self::Tuple>>> {
3✔
408
                // chunk up the stream to get all bands for a spatial tile at once
409
                let stream = self.raster.query(query, ctx).await?.chunks($N).map(|chunk| {
3✔
410
                    if chunk.len() != $N {
1✔
411
                        // if there are not exactly N tiles, it should mean the last tile was an error and the chunker ended prematurely
412
                        if let Some(Err(e)) = chunk.into_iter().last() {
×
413
                            return Err(e);
×
414
                        }
×
415
                        // if there is no error, the source did not produce all bands, which likely means a bug in an operator
×
416
                        return Err(crate::error::Error::MustNotHappen {
×
417
                            message: "source did not produce all bands".to_string(),
×
418
                        });
×
419
                    }
3✔
420

3✔
421
                    let mut ok_tiles = Vec::with_capacity($N);
3✔
422

423
                    for tile in chunk {
17✔
424
                        match tile {
14✔
425
                            Ok(tile) => ok_tiles.push(tile),
14✔
426
                            Err(e) => return Err(e),
×
427
                        }
428
                    }
429

430
                    let tuple: [RasterTile2D<f64>; $N] = ok_tiles
3✔
431
                        .try_into()
3✔
432
                        .expect("all chunks should be of the expected langth because it was checked above");
3✔
433

3✔
434
                    Ok(tuple)
3✔
435
                });
3✔
436

3✔
437
                Ok(stream.boxed())
3✔
438
            }
6✔
439

440
            #[inline]
441
            fn all_empty(tuple: &Self::Tuple) -> bool {
3✔
442
                $( tuple[$I].grid_array.is_empty() )&&*
×
443
            }
3✔
444

445
            #[inline]
446
            fn empty_raster(tuple: &Self::Tuple) -> RasterTile2D<TO> {
×
447
                tuple[0].clone().convert_data_type()
×
448
            }
×
449

450
            #[inline]
451
            fn metadata(tuple: &Self::Tuple) -> (TimeInterval, GridIdx2D, GeoTransform, GridShape2D, CacheHint) {
3✔
452
                let raster = &tuple[0];
3✔
453

3✔
454
                (
3✔
455
                    raster.time,
3✔
456
                    raster.tile_position,
3✔
457
                    raster.global_geo_transform,
3✔
458
                    raster.grid_shape(),
3✔
459
                    tuple.iter().fold(CacheHint::max_duration(), |acc, r| acc.merged(&r.cache_hint)),
14✔
460
                )
3✔
461
            }
3✔
462

463
            fn compute_expression(
3✔
464
                rasters: Self::Tuple,
3✔
465
                program: &LinkedExpression,
3✔
466
                map_no_data: bool,
3✔
467
            ) -> Result<GridOrEmpty2D<TO>> {
3✔
468
                let expression: Symbol<$FN_T> = unsafe {
3✔
469
                    // we have to "trust" that the function has the signature we expect
470
                    program.function_nary().map_err(RasterExpressionError::from)?
3✔
471
                };
472

473
                let map_fn = |lin_idx: usize| {
18✔
474
                    $(
18✔
475
                        let $PIXEL = rasters[$I].get_at_grid_index_unchecked(lin_idx);
18✔
476
                        let $IS_NODATA = $PIXEL.is_none();
18✔
477
                    )*
18✔
478

18✔
479
                    if !map_no_data && ( $($IS_NODATA)||* ) {
18✔
480
                        return None;
1✔
481
                    }
17✔
482

17✔
483
                    let result = expression(
17✔
484
                        $(
17✔
485
                            $PIXEL
17✔
486
                        ),*
17✔
487
                    );
17✔
488

17✔
489
                    result.map(TO::from_)
17✔
490
                };
18✔
491

492
                let grid_shape = rasters[0].grid_shape();
3✔
493
                let out = GridOrEmpty::from_index_fn_parallel(&grid_shape, map_fn);
3✔
494

3✔
495
                Result::Ok(out)
3✔
496
            }
3✔
497

498
            fn num_bands() -> u32 {
3✔
499
                $N
3✔
500
            }
3✔
501
        }
502
    };
503

504
    // For any input, generate `f64, bool`
505
    (@input_dtypes $x:tt) => {
506
        f64, bool
507
    };
508
}
509

510
impl_expression_tuple_processor!(3 => 0, 1, 2);
511
impl_expression_tuple_processor!(4 => 0, 1, 2, 3);
512
impl_expression_tuple_processor!(5 => 0, 1, 2, 3, 4);
513
impl_expression_tuple_processor!(6 => 0, 1, 2, 3, 4, 5);
514
impl_expression_tuple_processor!(7 => 0, 1, 2, 3, 4, 5, 6);
515
impl_expression_tuple_processor!(8 => 0, 1, 2, 3, 4, 5, 6, 7);
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc