• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

geo-engine / geoengine / 16189529302

10 Jul 2025 08:02AM UTC coverage: 88.731%. First build
16189529302

Pull #1061

github

web-flow
Merge 345a798f4 into b8910c811
Pull Request #1061: feat(operators): skip empty tiles and merge masks in onnx; remove trace/debug in release mode

98 of 133 new or added lines in 12 files covered. (73.68%)

111296 of 125431 relevant lines covered (88.73%)

79561.95 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

90.5
/operators/src/processing/expression/raster_query_processor.rs
1
use super::RasterExpressionError;
2
use crate::{
3
    engine::{BoxRasterQueryProcessor, QueryContext, QueryProcessor, RasterResultDescriptor},
4
    ge_tracing_removed_trace,
5
    util::Result,
6
};
7
use async_trait::async_trait;
8
use futures::{StreamExt, TryStreamExt, stream::BoxStream};
9
use geoengine_datatypes::{
10
    primitives::{
11
        BandSelection, CacheHint, RasterQueryRectangle, SpatialPartition2D, TimeInterval,
12
    },
13
    raster::{
14
        ConvertDataType, FromIndexFnParallel, GeoTransform, GridIdx2D, GridIndexAccess,
15
        GridOrEmpty, GridOrEmpty2D, GridShape2D, GridShapeAccess, MapElementsParallel, Pixel,
16
        RasterTile2D,
17
    },
18
};
19
use geoengine_expression::LinkedExpression;
20
use libloading::Symbol;
21
use num_traits::AsPrimitive;
22
use std::{marker::PhantomData, sync::Arc};
23

24
pub struct ExpressionInput<const N: usize> {
25
    pub raster: BoxRasterQueryProcessor<f64>,
26
}
27

28
pub struct ExpressionQueryProcessor<TO, Sources>
29
where
30
    TO: Pixel,
31
{
32
    pub sources: Sources,
33
    pub result_descriptor: RasterResultDescriptor,
34
    pub phantom_data: PhantomData<TO>,
35
    pub program: Arc<LinkedExpression>,
36
    pub map_no_data: bool,
37
}
38

39
impl<TO, Sources> ExpressionQueryProcessor<TO, Sources>
40
where
41
    TO: Pixel,
42
{
43
    pub fn new(
16✔
44
        program: LinkedExpression,
16✔
45
        sources: Sources,
16✔
46
        result_descriptor: RasterResultDescriptor,
16✔
47
        map_no_data: bool,
16✔
48
    ) -> Self {
16✔
49
        Self {
16✔
50
            sources,
16✔
51
            result_descriptor,
16✔
52
            program: Arc::new(program),
16✔
53
            phantom_data: PhantomData,
16✔
54
            map_no_data,
16✔
55
        }
16✔
56
    }
16✔
57
}
58

59
#[async_trait]
60
impl<TO, Tuple> QueryProcessor for ExpressionQueryProcessor<TO, Tuple>
61
where
62
    TO: Pixel,
63
    Tuple: ExpressionTupleProcessor<TO>,
64
{
65
    type Output = RasterTile2D<TO>;
66
    type SpatialBounds = SpatialPartition2D;
67
    type Selection = BandSelection;
68
    type ResultDescription = RasterResultDescriptor;
69

70
    async fn _query<'b>(
71
        &'b self,
72
        query: RasterQueryRectangle,
73
        ctx: &'b dyn QueryContext,
74
    ) -> Result<BoxStream<'b, Result<Self::Output>>> {
42✔
75
        // rewrite query to request all input bands from the source. They are all combined in the single output band by means of the expression.
76
        let source_query = RasterQueryRectangle {
21✔
77
            spatial_bounds: query.spatial_bounds,
21✔
78
            time_interval: query.time_interval,
21✔
79
            spatial_resolution: query.spatial_resolution,
21✔
80
            attributes: BandSelection::first_n(Tuple::num_bands()),
21✔
81
        };
21✔
82

83
        let stream =
21✔
84
            self.sources
21✔
85
                .zip_bands(source_query, ctx)
21✔
86
                .await?
21✔
87
                .and_then(move |rasters| async move {
67✔
88
                    if Tuple::all_empty(&rasters) {
67✔
89
                        return Ok(Tuple::empty_raster(&rasters));
16✔
90
                    }
51✔
91

92
                    let (
93
                        out_time,
51✔
94
                        out_tile_position,
51✔
95
                        out_global_geo_transform,
51✔
96
                        _output_grid_shape,
51✔
97
                        cache_hint,
51✔
98
                    ) = Tuple::metadata(&rasters);
51✔
99

100
                    let program = self.program.clone();
51✔
101
                    let map_no_data = self.map_no_data;
51✔
102

103
                    let out = crate::util::spawn_blocking_with_thread_pool(
51✔
104
                        ctx.thread_pool().clone(),
51✔
105
                        move || Tuple::compute_expression(rasters, &program, map_no_data),
51✔
106
                    )
107
                    .await??;
51✔
108

109
                    Ok(RasterTile2D::new(
51✔
110
                        out_time,
51✔
111
                        out_tile_position,
51✔
112
                        0,
51✔
113
                        out_global_geo_transform,
51✔
114
                        out,
51✔
115
                        cache_hint,
51✔
116
                    ))
51✔
117
                });
134✔
118

119
        Ok(stream.boxed())
21✔
120
    }
42✔
121

122
    fn result_descriptor(&self) -> &RasterResultDescriptor {
52✔
123
        &self.result_descriptor
52✔
124
    }
52✔
125
}
126

127
#[async_trait]
128
trait ExpressionTupleProcessor<TO: Pixel>: Send + Sync {
129
    type Tuple: Send + 'static;
130

131
    async fn zip_bands<'a>(
132
        &'a self,
133
        query: RasterQueryRectangle,
134
        ctx: &'a dyn QueryContext,
135
    ) -> Result<BoxStream<'a, Result<Self::Tuple>>>;
136

137
    fn all_empty(tuple: &Self::Tuple) -> bool;
138

139
    fn empty_raster(tuple: &Self::Tuple) -> RasterTile2D<TO>;
140

141
    fn metadata(
142
        tuple: &Self::Tuple,
143
    ) -> (
144
        TimeInterval,
145
        GridIdx2D,
146
        GeoTransform,
147
        GridShape2D,
148
        CacheHint,
149
    );
150

151
    fn compute_expression(
152
        tuple: Self::Tuple,
153
        program: &LinkedExpression,
154
        map_no_data: bool,
155
    ) -> Result<GridOrEmpty2D<TO>>;
156

157
    fn num_bands() -> u32;
158
}
159

160
#[async_trait]
161
impl<TO> ExpressionTupleProcessor<TO> for ExpressionInput<1>
162
where
163
    TO: Pixel,
164
    f64: AsPrimitive<TO>,
165
{
166
    type Tuple = RasterTile2D<f64>;
167

168
    #[inline]
169
    async fn zip_bands<'a>(
170
        &'a self,
171
        query: RasterQueryRectangle,
172
        ctx: &'a dyn QueryContext,
173
    ) -> Result<BoxStream<'a, Result<Self::Tuple>>> {
26✔
174
        self.raster.query(query, ctx).await
13✔
175
    }
26✔
176

177
    #[inline]
178
    fn all_empty(tuple: &Self::Tuple) -> bool {
37✔
179
        tuple.grid_array.is_empty()
37✔
180
    }
37✔
181

182
    #[inline]
183
    fn empty_raster(tuple: &Self::Tuple) -> RasterTile2D<TO> {
16✔
184
        tuple.clone().convert_data_type()
16✔
185
    }
16✔
186

187
    #[inline]
188
    fn metadata(
21✔
189
        tuple: &Self::Tuple,
21✔
190
    ) -> (
21✔
191
        TimeInterval,
21✔
192
        GridIdx2D,
21✔
193
        GeoTransform,
21✔
194
        GridShape2D,
21✔
195
        CacheHint,
21✔
196
    ) {
21✔
197
        let raster = &tuple;
21✔
198

199
        (
21✔
200
            raster.time,
21✔
201
            raster.tile_position,
21✔
202
            raster.global_geo_transform,
21✔
203
            raster.grid_shape(),
21✔
204
            raster.cache_hint,
21✔
205
        )
21✔
206
    }
21✔
207

208
    #[inline]
209
    fn compute_expression(
21✔
210
        raster: Self::Tuple,
21✔
211
        program: &LinkedExpression,
21✔
212
        map_no_data: bool,
21✔
213
    ) -> Result<GridOrEmpty2D<TO>> {
21✔
214
        let expression = unsafe {
21✔
215
            // we have to "trust" that the function has the signature we expect
216
            program
21✔
217
                .function_1::<Option<f64>>()
21✔
218
                .map_err(RasterExpressionError::from)?
21✔
219
        };
220

221
        let map_fn = |in_value: Option<f64>| {
2,097,230✔
222
            // TODO: could be a |in_value: T1| if map no data is false!
223
            if !map_no_data && in_value.is_none() {
2,097,230✔
224
                return None;
1,967,945✔
225
            }
129,285✔
226

227
            let result = expression(in_value);
129,285✔
228

229
            result.map(TO::from_)
129,285✔
230
        };
2,097,230✔
231

232
        let res = raster.grid_array.map_elements_parallel(map_fn);
21✔
233

234
        Result::Ok(res)
21✔
235
    }
21✔
236

237
    fn num_bands() -> u32 {
13✔
238
        1
13✔
239
    }
13✔
240
}
241

242
// TODO: implement this via macro for 2-8 sources
243
#[async_trait]
244
impl<TO> ExpressionTupleProcessor<TO> for ExpressionInput<2>
245
where
246
    TO: Pixel,
247
    f64: AsPrimitive<TO>,
248
{
249
    type Tuple = (RasterTile2D<f64>, RasterTile2D<f64>);
250

251
    #[inline]
252
    async fn zip_bands<'a>(
253
        &'a self,
254
        query: RasterQueryRectangle,
255
        ctx: &'a dyn QueryContext,
256
    ) -> Result<BoxStream<'a, Result<Self::Tuple>>> {
10✔
257
        // chunk up the stream to get all bands for a spatial tile at once
258
        let stream = self.raster.query(query, ctx).await?.chunks(2).map(|chunk| {
27✔
259
            if chunk.len() != 2 {
27✔
260
                // if there are not exactly two tiles, it should mean the last tile was an error and the chunker ended prematurely
261
                if let Some(Err(e)) = chunk.into_iter().next_back() {
×
262
                    return Err(e);
×
263
                }
×
264
                // if there is no error, the source did not produce all bands, which likely means a bug in an operator
265
                return Err(crate::error::Error::MustNotHappen {
×
266
                    message: "source did not produce all bands".to_string(),
×
267
                });
×
268
            }
27✔
269

270
            let [a, b]: [Result<RasterTile2D<f64>>; 2] = chunk
27✔
271
                .try_into()
27✔
272
                .expect("all chunks should be of length 2 because it was checked above");
27✔
273
            match (a, b) {
27✔
274
                (Ok(t0), Ok(t1)) => Ok((t0, t1)),
27✔
275
                (Err(e), _) | (_, Err(e)) => Err(e),
×
276
            }
277
        });
27✔
278

279
        Ok(stream.boxed())
5✔
280
    }
10✔
281

282
    #[inline]
283
    fn all_empty(tuple: &Self::Tuple) -> bool {
27✔
284
        tuple.0.grid_array.is_empty() && tuple.1.grid_array.is_empty()
27✔
285
    }
27✔
286

287
    #[inline]
288
    fn empty_raster(tuple: &Self::Tuple) -> RasterTile2D<TO> {
×
289
        tuple.0.clone().convert_data_type()
×
290
    }
×
291

292
    #[inline]
293
    fn metadata(
27✔
294
        tuple: &Self::Tuple,
27✔
295
    ) -> (
27✔
296
        TimeInterval,
27✔
297
        GridIdx2D,
27✔
298
        GeoTransform,
27✔
299
        GridShape2D,
27✔
300
        CacheHint,
27✔
301
    ) {
27✔
302
        let raster = &tuple.0;
27✔
303

304
        (
27✔
305
            raster.time,
27✔
306
            raster.tile_position,
27✔
307
            raster.global_geo_transform,
27✔
308
            raster.grid_shape(),
27✔
309
            tuple.0.cache_hint.merged(&tuple.1.cache_hint),
27✔
310
        )
27✔
311
    }
27✔
312

313
    #[inline]
314
    fn compute_expression(
27✔
315
        rasters: Self::Tuple,
27✔
316
        program: &LinkedExpression,
27✔
317
        map_no_data: bool,
27✔
318
    ) -> Result<GridOrEmpty2D<TO>> {
27✔
319
        let expression = unsafe {
27✔
320
            // we have to "trust" that the function has the signature we expect
321
            program
27✔
322
                .function_2::<Option<f64>, Option<f64>>()
27✔
323
                .map_err(RasterExpressionError::from)?
27✔
324
        };
325

326
        let map_fn = |lin_idx: usize| {
6,291,474✔
327
            let t0_value = rasters.0.get_at_grid_index_unchecked(lin_idx);
6,291,474✔
328
            let t1_value = rasters.1.get_at_grid_index_unchecked(lin_idx);
6,291,474✔
329

330
            if !map_no_data && (t0_value.is_none() || t1_value.is_none()) {
6,291,474✔
331
                return None;
6,221,747✔
332
            }
69,727✔
333

334
            let result = expression(t0_value, t1_value);
69,727✔
335

336
            result.map(TO::from_)
69,727✔
337
        };
6,291,474✔
338

339
        let grid_shape = rasters.0.grid_shape();
27✔
340
        let out = GridOrEmpty::from_index_fn_parallel(&grid_shape, map_fn);
27✔
341

342
        let out_2 = match out {
27✔
343
            GridOrEmpty::Empty(t) => GridOrEmpty::Empty(t),
12✔
344
            GridOrEmpty::Grid(t) => {
15✔
345
                if t.validity_mask.data.iter().all(|&t| !t) {
1,405,559✔
NEW
346
                    ge_tracing_removed_trace!("Converting empty expression output to EmptyGrid");
×
NEW
347
                    GridOrEmpty::new_empty_shape(t.grid_shape())
×
348
                } else {
349
                    GridOrEmpty::Grid(t)
15✔
350
                }
351
            }
352
        };
353

354
        Result::Ok(out_2)
27✔
355
    }
27✔
356

357
    fn num_bands() -> u32 {
5✔
358
        2
5✔
359
    }
5✔
360
}
361

362
type Function3 = fn(Option<f64>, Option<f64>, Option<f64>) -> Option<f64>;
363
type Function4 = fn(Option<f64>, Option<f64>, Option<f64>, Option<f64>) -> Option<f64>;
364
type Function5 = fn(Option<f64>, Option<f64>, Option<f64>, Option<f64>, Option<f64>) -> Option<f64>;
365
type Function6 =
366
    fn(Option<f64>, Option<f64>, Option<f64>, Option<f64>, Option<f64>, Option<f64>) -> Option<f64>;
367
type Function7 = fn(
368
    Option<f64>,
369
    Option<f64>,
370
    Option<f64>,
371
    Option<f64>,
372
    Option<f64>,
373
    Option<f64>,
374
    Option<f64>,
375
) -> Option<f64>;
376
type Function8 = fn(
377
    Option<f64>,
378
    Option<f64>,
379
    Option<f64>,
380
    Option<f64>,
381
    Option<f64>,
382
    Option<f64>,
383
    Option<f64>,
384
    Option<f64>,
385
) -> Option<f64>;
386

387
macro_rules! impl_expression_tuple_processor {
388
    ( $i:tt => $( $x:tt ),+ ) => {
389
        paste::paste! {
390
            impl_expression_tuple_processor!(
391
                @inner
392
                $i
393
                |
394
                $( $x ),*
395
                |
396
                $( [< pixel_ $x >] ),*
397
                |
398
                $( [< is_nodata_ $x >] ),*
399
                |
400
                [< Function $i >]
401
            );
402
        }
403
    };
404

405
    // We have `0, 1, 2, …` and `T0, T1, T2, …`
406
    (@inner $N:tt | $( $I:tt ),+ | $( $PIXEL:tt ),+ | $( $IS_NODATA:tt ),+ | $FN_T:ty ) => {
407
        #[async_trait]
408
        impl<TO> ExpressionTupleProcessor<TO> for ExpressionInput<$N>
409
        where
410
            TO: Pixel,
411
            f64: AsPrimitive<TO>,
412
        {
413
            type Tuple = [RasterTile2D<f64>; $N];
414

415
            #[inline]
416
            async fn zip_bands<'a>(
3✔
417
                &'a self,
418
                query: RasterQueryRectangle,
419
                ctx: &'a dyn QueryContext,
420
            ) -> Result<BoxStream<'a, Result<Self::Tuple>>> {
3✔
421
                // chunk up the stream to get all bands for a spatial tile at once
422
                let stream = self.raster.query(query, ctx).await?.chunks($N).map(|chunk| {
3✔
423
                    if chunk.len() != $N {
1✔
424
                        // if there are not exactly N tiles, it should mean the last tile was an error and the chunker ended prematurely
425
                        if let Some(Err(e)) = chunk.into_iter().last() {
×
426
                            return Err(e);
×
427
                        }
×
428
                        // if there is no error, the source did not produce all bands, which likely means a bug in an operator
429
                        return Err(crate::error::Error::MustNotHappen {
×
430
                            message: "source did not produce all bands".to_string(),
×
431
                        });
×
432
                    }
3✔
433

434
                    let mut ok_tiles = Vec::with_capacity($N);
3✔
435

436
                    for tile in chunk {
17✔
437
                        match tile {
14✔
438
                            Ok(tile) => ok_tiles.push(tile),
14✔
439
                            Err(e) => return Err(e),
×
440
                        }
441
                    }
442

443
                    let tuple: [RasterTile2D<f64>; $N] = ok_tiles
3✔
444
                        .try_into()
3✔
445
                        .expect("all chunks should be of the expected langth because it was checked above");
3✔
446

447
                    Ok(tuple)
3✔
448
                });
3✔
449

450
                Ok(stream.boxed())
3✔
451
            }
6✔
452

453
            #[inline]
454
            fn all_empty(tuple: &Self::Tuple) -> bool {
3✔
455
                $( tuple[$I].grid_array.is_empty() )&&*
×
456
            }
3✔
457

458
            #[inline]
459
            fn empty_raster(tuple: &Self::Tuple) -> RasterTile2D<TO> {
×
460
                tuple[0].clone().convert_data_type()
×
461
            }
×
462

463
            #[inline]
464
            fn metadata(tuple: &Self::Tuple) -> (TimeInterval, GridIdx2D, GeoTransform, GridShape2D, CacheHint) {
3✔
465
                let raster = &tuple[0];
3✔
466

467
                (
468
                    raster.time,
3✔
469
                    raster.tile_position,
3✔
470
                    raster.global_geo_transform,
3✔
471
                    raster.grid_shape(),
3✔
472
                    tuple.iter().fold(CacheHint::max_duration(), |acc, r| acc.merged(&r.cache_hint)),
14✔
473
                )
474
            }
3✔
475

476
            fn compute_expression(
3✔
477
                rasters: Self::Tuple,
3✔
478
                program: &LinkedExpression,
3✔
479
                map_no_data: bool,
3✔
480
            ) -> Result<GridOrEmpty2D<TO>> {
3✔
481
                let expression: Symbol<$FN_T> = unsafe {
3✔
482
                    // we have to "trust" that the function has the signature we expect
483
                    program.function_nary().map_err(RasterExpressionError::from)?
3✔
484
                };
485

486
                let map_fn = |lin_idx: usize| {
18✔
487
                    $(
488
                        let $PIXEL = rasters[$I].get_at_grid_index_unchecked(lin_idx);
18✔
489
                        let $IS_NODATA = $PIXEL.is_none();
18✔
490
                    )*
491

492
                    if !map_no_data && ( $($IS_NODATA)||* ) {
18✔
493
                        return None;
1✔
494
                    }
17✔
495

496
                    let result = expression(
17✔
497
                        $(
17✔
498
                            $PIXEL
17✔
499
                        ),*
17✔
500
                    );
17✔
501

502
                    result.map(TO::from_)
17✔
503
                };
18✔
504

505
                let grid_shape = rasters[0].grid_shape();
3✔
506
                let out = GridOrEmpty::from_index_fn_parallel(&grid_shape, map_fn);
3✔
507

508
                Result::Ok(out)
3✔
509
            }
3✔
510

511
            fn num_bands() -> u32 {
3✔
512
                $N
513
            }
3✔
514
        }
515
    };
516

517
    // For any input, generate `f64, bool`
518
    (@input_dtypes $x:tt) => {
519
        f64, bool
520
    };
521
}
522

523
impl_expression_tuple_processor!(3 => 0, 1, 2);
524
impl_expression_tuple_processor!(4 => 0, 1, 2, 3);
525
impl_expression_tuple_processor!(5 => 0, 1, 2, 3, 4);
526
impl_expression_tuple_processor!(6 => 0, 1, 2, 3, 4, 5);
527
impl_expression_tuple_processor!(7 => 0, 1, 2, 3, 4, 5, 6);
528
impl_expression_tuple_processor!(8 => 0, 1, 2, 3, 4, 5, 6, 7);
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc