• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

geo-engine / geoengine / 11911118784

19 Nov 2024 10:06AM UTC coverage: 90.448% (-0.2%) from 90.687%
11911118784

push

github

web-flow
Merge pull request #994 from geo-engine/workspace-dependencies

use workspace dependencies, update toolchain, use global lock in expression

9 of 11 new or added lines in 6 files covered. (81.82%)

369 existing lines in 74 files now uncovered.

132871 of 146904 relevant lines covered (90.45%)

54798.62 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

97.41
/operators/src/processing/bandwise_expression/mod.rs
1
use std::sync::Arc;
2

3
use crate::engine::{
4
    CanonicOperatorName, ExecutionContext, InitializedRasterOperator, InitializedSources, Operator,
5
    OperatorName, QueryContext, RasterOperator, RasterQueryProcessor, RasterResultDescriptor,
6
    ResultDescriptor, SingleRasterSource, TypedRasterQueryProcessor, WorkflowOperatorPath,
7
};
8

9
use crate::util::Result;
10
use async_trait::async_trait;
11
use futures::stream::BoxStream;
12
use futures::{StreamExt, TryStreamExt};
13
use geoengine_datatypes::primitives::RasterQueryRectangle;
14
use geoengine_datatypes::raster::{
15
    GridOrEmpty2D, MapElementsParallel, Pixel, RasterDataType, RasterTile2D,
16
};
17
use geoengine_expression::{
18
    DataType, ExpressionAst, ExpressionParser, LinkedExpression, Parameter,
19
};
20
use serde::{Deserialize, Serialize};
21

22
use super::expression::get_expression_dependencies;
23
use super::RasterExpressionError;
24

25
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
×
26
#[serde(rename_all = "camelCase")]
27
pub struct BandwiseExpressionParams {
28
    pub expression: String,
29
    pub output_type: RasterDataType,
30
    pub map_no_data: bool,
31
    // TODO: new unit for each band?
32
}
33

34
/// This `QueryProcessor` performs a unary expression on all bands of its input raster series.
35
pub type BandwiseExpression = Operator<BandwiseExpressionParams, SingleRasterSource>;
36

37
impl OperatorName for BandwiseExpression {
38
    const TYPE_NAME: &'static str = "BandwiseExpression";
39
}
40

41
#[typetag::serde]
×
42
#[async_trait]
43
impl RasterOperator for BandwiseExpression {
44
    async fn _initialize(
45
        self: Box<Self>,
46
        path: WorkflowOperatorPath,
47
        context: &dyn ExecutionContext,
48
    ) -> Result<Box<dyn InitializedRasterOperator>> {
1✔
49
        let name = CanonicOperatorName::from(&self);
1✔
50

51
        let source = self.sources.initialize_sources(path, context).await?.raster;
1✔
52

53
        let in_descriptor = source.result_descriptor();
1✔
54

1✔
55
        // TODO: ensure all bands have same measurement unit?
1✔
56

1✔
57
        let result_descriptor = in_descriptor.map_data_type(|_| self.params.output_type);
1✔
58

1✔
59
        let parameters = vec![Parameter::Number("x".into())];
1✔
60

61
        let expression = ExpressionParser::new(&parameters, DataType::Number)
1✔
62
            .map_err(RasterExpressionError::from)?
1✔
63
            .parse(
1✔
64
                "expression", // TODO: what is the name used for?
1✔
65
                &self.params.expression,
1✔
66
            )
1✔
67
            .map_err(RasterExpressionError::from)?;
1✔
68

69
        Ok(Box::new(InitializedBandwiseExpression {
1✔
70
            name,
1✔
71
            result_descriptor,
1✔
72
            source,
1✔
73
            expression,
1✔
74
            map_no_data: self.params.map_no_data,
1✔
75
        }))
1✔
76
    }
2✔
77

78
    span_fn!(BandwiseExpression);
79
}
80

81
pub struct InitializedBandwiseExpression {
82
    name: CanonicOperatorName,
83
    result_descriptor: RasterResultDescriptor,
84
    source: Box<dyn InitializedRasterOperator>,
85
    expression: ExpressionAst,
86
    map_no_data: bool,
87
}
88

89
impl InitializedRasterOperator for InitializedBandwiseExpression {
90
    fn result_descriptor(&self) -> &RasterResultDescriptor {
1✔
91
        &self.result_descriptor
1✔
92
    }
1✔
93

94
    fn query_processor(&self) -> Result<TypedRasterQueryProcessor> {
1✔
95
        let typed_raster_processor = self.source.query_processor()?.into_f64();
1✔
96

1✔
97
        let output_type = self.result_descriptor().data_type;
1✔
98

99
        // TODO: spawn a blocking task for the compilation process
100
        let expression_dependencies = get_expression_dependencies()
1✔
101
            .map_err(|source| RasterExpressionError::Dependencies { source })?;
1✔
102

103
        let expression = LinkedExpression::new(
1✔
104
            self.expression.name(),
1✔
105
            &self.expression.code(),
1✔
106
            expression_dependencies,
1✔
107
        )
1✔
108
        .map_err(RasterExpressionError::from)?;
1✔
109

110
        Ok(call_generic_raster_processor!(
×
111
            output_type,
1✔
112
            BandwiseExpressionProcessor::new(
1✔
113
                typed_raster_processor,
1✔
114
                self.result_descriptor.clone(),
1✔
115
                expression,
1✔
116
                self.map_no_data
1✔
117
            )
1✔
118
            .boxed()
1✔
119
        ))
120
    }
1✔
121

122
    fn canonic_name(&self) -> CanonicOperatorName {
×
123
        self.name.clone()
×
124
    }
×
125
}
126

127
pub(crate) struct BandwiseExpressionProcessor<TO> {
128
    source: Box<dyn RasterQueryProcessor<RasterType = f64>>,
129
    result_descriptor: RasterResultDescriptor,
130
    expression: Arc<LinkedExpression>,
131
    map_no_data: bool,
132
    phantom: std::marker::PhantomData<TO>,
133
}
134

135
impl<TO> BandwiseExpressionProcessor<TO>
136
where
137
    TO: Pixel,
138
{
139
    pub fn new(
1✔
140
        source: Box<dyn RasterQueryProcessor<RasterType = f64>>,
1✔
141
        result_descriptor: RasterResultDescriptor,
1✔
142
        expression: LinkedExpression,
1✔
143
        map_no_data: bool,
1✔
144
    ) -> Self {
1✔
145
        Self {
1✔
146
            source,
1✔
147
            result_descriptor,
1✔
148
            expression: Arc::new(expression),
1✔
149
            map_no_data,
1✔
150
            phantom: Default::default(),
1✔
151
        }
1✔
152
    }
1✔
153

154
    #[inline]
155
    fn compute_expression(
8✔
156
        raster: RasterTile2D<f64>,
8✔
157
        expression: &LinkedExpression,
8✔
158
        map_no_data: bool,
8✔
159
    ) -> Result<GridOrEmpty2D<TO>> {
8✔
160
        let expression = unsafe {
8✔
161
            // we have to "trust" that the function has the signature we expect
162
            expression
8✔
163
                .function_1::<Option<f64>>()
8✔
164
                .map_err(RasterExpressionError::from)?
8✔
165
        };
166

167
        let map_fn = |in_value: Option<f64>| {
32✔
168
            // TODO: could be a |in_value: T1| if map no data is false!
32✔
169
            if !map_no_data && in_value.is_none() {
32✔
170
                return None;
×
171
            }
32✔
172

32✔
173
            let result = expression(in_value);
32✔
174

32✔
175
            result.map(TO::from_)
32✔
176
        };
32✔
177

178
        let res = raster.grid_array.map_elements_parallel(map_fn);
8✔
179

8✔
180
        Result::Ok(res)
8✔
181
    }
8✔
182
}
183

184
#[async_trait]
185
impl<TO> RasterQueryProcessor for BandwiseExpressionProcessor<TO>
186
where
187
    TO: Pixel,
188
{
189
    type RasterType = TO;
190

191
    async fn raster_query<'a>(
192
        &'a self,
193
        query: RasterQueryRectangle,
194
        ctx: &'a dyn QueryContext,
195
    ) -> Result<BoxStream<'a, Result<RasterTile2D<TO>>>> {
1✔
196
        let stream = self
1✔
197
            .source
1✔
198
            .raster_query(query, ctx)
1✔
UNCOV
199
            .await?
×
200
            .and_then(move |tile| async move {
8✔
201
                let expression = self.expression.clone();
8✔
202
                let map_no_data = self.map_no_data;
8✔
203

8✔
204
                let time = tile.time;
8✔
205
                let tile_position = tile.tile_position;
8✔
206
                let band = tile.band;
8✔
207
                let global_geo_transform = tile.global_geo_transform;
8✔
208
                let cache_hint = tile.cache_hint;
8✔
209

210
                let out = crate::util::spawn_blocking_with_thread_pool(
8✔
211
                    ctx.thread_pool().clone(),
8✔
212
                    move || Self::compute_expression(tile, &expression, map_no_data),
8✔
213
                )
8✔
214
                .await??;
8✔
215

216
                Ok(RasterTile2D::new(
8✔
217
                    time,
8✔
218
                    tile_position,
8✔
219
                    band,
8✔
220
                    global_geo_transform,
8✔
221
                    out,
8✔
222
                    cache_hint,
8✔
223
                ))
8✔
224
            });
16✔
225

1✔
226
        Ok(stream.boxed())
1✔
227
    }
2✔
228

229
    fn raster_result_descriptor(&self) -> &RasterResultDescriptor {
1✔
230
        &self.result_descriptor
1✔
231
    }
1✔
232
}
233

234
#[cfg(test)]
mod tests {
    use geoengine_datatypes::{
        primitives::{CacheHint, SpatialPartition2D, SpatialResolution, TimeInterval},
        raster::{Grid, GridShape, MapElements, RenameBands, TilesEqualIgnoringCacheHint},
        spatial_reference::SpatialReference,
        util::test::TestDefault,
    };

    use crate::{
        engine::{
            MockExecutionContext, MockQueryContext, MultipleRasterSources, RasterBandDescriptors,
        },
        mock::{MockRasterSource, MockRasterSourceParams},
        processing::{RasterStacker, RasterStackerParams},
    };

    use super::*;

    /// Builds a 2x2 `u8` tile of band 0 at tile position `[-1, column]` that is valid in
    /// the time interval `[time.0, time.1)`.
    fn test_tile(time: (i64, i64), column: isize, values: Vec<u8>) -> RasterTile2D<u8> {
        RasterTile2D {
            time: TimeInterval::new_unchecked(time.0, time.1),
            tile_position: [-1, column].into(),
            band: 0,
            global_geo_transform: TestDefault::test_default(),
            grid_array: Grid::new([2, 2].into(), values).unwrap().into(),
            properties: Default::default(),
            cache_hint: CacheHint::default(),
        }
    }

    /// Wraps `data` in a boxed single-band `U8` mock raster source in EPSG:4326.
    fn mock_source(data: Vec<RasterTile2D<u8>>) -> Box<dyn RasterOperator> {
        MockRasterSource {
            params: MockRasterSourceParams {
                data,
                result_descriptor: RasterResultDescriptor {
                    data_type: RasterDataType::U8,
                    spatial_reference: SpatialReference::epsg_4326().into(),
                    time: None,
                    bbox: None,
                    resolution: None,
                    bands: RasterBandDescriptors::new_single_band(),
                },
            },
        }
        .boxed()
    }

    /// Stacks two single-band sources and checks that `x + 1` is applied to both bands.
    #[tokio::test]
    #[allow(clippy::too_many_lines)]
    async fn it_computes_bandwise_expression() {
        let first_source_tiles: Vec<RasterTile2D<u8>> = vec![
            test_tile((0, 5), 0, vec![0, 1, 2, 3]),
            test_tile((0, 5), 1, vec![4, 5, 6, 7]),
            test_tile((5, 10), 0, vec![8, 9, 10, 11]),
            test_tile((5, 10), 1, vec![12, 13, 14, 15]),
        ];

        let second_source_tiles: Vec<RasterTile2D<u8>> = vec![
            test_tile((0, 5), 0, vec![16, 17, 18, 19]),
            test_tile((0, 5), 1, vec![20, 21, 22, 23]),
            test_tile((5, 10), 0, vec![24, 25, 26, 27]),
            test_tile((5, 10), 1, vec![28, 29, 30, 31]),
        ];

        let stacker = RasterStacker {
            params: RasterStackerParams {
                rename_bands: RenameBands::Default,
            },
            sources: MultipleRasterSources {
                rasters: vec![
                    mock_source(first_source_tiles.clone()),
                    mock_source(second_source_tiles.clone()),
                ],
            },
        }
        .boxed();

        let operator = BandwiseExpression {
            params: BandwiseExpressionParams {
                expression: "x + 1".to_string(),
                output_type: RasterDataType::U8,
                map_no_data: false,
            },
            sources: SingleRasterSource { raster: stacker },
        }
        .boxed();

        let mut exe_ctx = MockExecutionContext::test_default();
        exe_ctx.tiling_specification.tile_size_in_pixels = GridShape {
            shape_array: [2, 2],
        };

        let query_rect = RasterQueryRectangle {
            spatial_bounds: SpatialPartition2D::new_unchecked((0., 1.).into(), (3., 0.).into()),
            time_interval: TimeInterval::new_unchecked(0, 10),
            spatial_resolution: SpatialResolution::one(),
            attributes: [0, 1].try_into().unwrap(),
        };

        let query_ctx = MockQueryContext::test_default();

        let initialized = operator
            .initialize(WorkflowOperatorPath::initialize_root(), &exe_ctx)
            .await
            .unwrap();

        let processor = initialized.query_processor().unwrap().get_u8().unwrap();

        let tile_results = processor
            .raster_query(query_rect, &query_ctx)
            .await
            .unwrap()
            .collect::<Vec<_>>()
            .await;
        let result = tile_results
            .into_iter()
            .collect::<Result<Vec<_>>>()
            .unwrap();

        // The stacker interleaves the sources: for each input tile, band 0 comes from the
        // first source and band 1 from the second one. Every pixel is incremented by one.
        let mut expected: Vec<RasterTile2D<u8>> =
            Vec::with_capacity(first_source_tiles.len() + second_source_tiles.len());
        for (band_0_tile, mut band_1_tile) in
            first_source_tiles.into_iter().zip(second_source_tiles)
        {
            band_1_tile.band = 1;

            for mut tile in [band_0_tile, band_1_tile] {
                tile.grid_array = tile.grid_array.map_elements(|in_value: u8| in_value + 1);
                expected.push(tile);
            }
        }

        assert!(expected.tiles_equal_ignoring_cache_hint(&result));
    }
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc