• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

geo-engine / geoengine / 12469296660

23 Dec 2024 03:15PM UTC coverage: 90.56% (-0.1%) from 90.695%
12469296660

push

github

web-flow
Merge pull request #998 from geo-engine/quota_log_wip

Quota and Data usage Logging

859 of 1214 new or added lines in 66 files covered. (70.76%)

3 existing lines in 2 files now uncovered.

133923 of 147883 relevant lines covered (90.56%)

54439.32 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

96.23
/operators/src/processing/bandwise_expression/mod.rs
1
use std::sync::Arc;
2

3
use crate::engine::{
4
    CanonicOperatorName, ExecutionContext, InitializedRasterOperator, InitializedSources, Operator,
5
    OperatorName, QueryContext, RasterOperator, RasterQueryProcessor, RasterResultDescriptor,
6
    ResultDescriptor, SingleRasterSource, TypedRasterQueryProcessor, WorkflowOperatorPath,
7
};
8

9
use crate::util::Result;
10
use async_trait::async_trait;
11
use futures::stream::BoxStream;
12
use futures::{StreamExt, TryStreamExt};
13
use geoengine_datatypes::primitives::RasterQueryRectangle;
14
use geoengine_datatypes::raster::{
15
    GridOrEmpty2D, MapElementsParallel, Pixel, RasterDataType, RasterTile2D,
16
};
17
use geoengine_expression::{
18
    DataType, ExpressionAst, ExpressionParser, LinkedExpression, Parameter,
19
};
20
use serde::{Deserialize, Serialize};
21

22
use super::expression::get_expression_dependencies;
23
use super::RasterExpressionError;
24

25
/// Parameters of the [`BandwiseExpression`] operator.
///
/// Field names are (de)serialized in `camelCase`.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
26
#[serde(rename_all = "camelCase")]
27
pub struct BandwiseExpressionParams {
28
    /// Source text of the expression. It is parsed with a single numeric parameter named `x`.
    pub expression: String,
29
    /// Data type of the output raster; it replaces the data type of the source's result descriptor.
    pub output_type: RasterDataType,
30
    /// If `false`, pixels without a value (`None`) stay `None` and the expression is not
    /// evaluated for them. If `true`, the expression is evaluated for those pixels as well.
    pub map_no_data: bool,
31
    // TODO: new unit for each band?
32
}
33

34
/// Operator that applies a unary expression (with the single parameter `x`) to the pixels of
/// every tile, and therefore every band, delivered by its single raster source.
35
pub type BandwiseExpression = Operator<BandwiseExpressionParams, SingleRasterSource>;
36

37
impl OperatorName for BandwiseExpression {
38
    /// Type name of this operator; also returned by `InitializedBandwiseExpression::name`.
    const TYPE_NAME: &'static str = "BandwiseExpression";
39
}
40

41
#[typetag::serde]
×
42
#[async_trait]
43
impl RasterOperator for BandwiseExpression {
44
    async fn _initialize(
45
        self: Box<Self>,
46
        path: WorkflowOperatorPath,
47
        context: &dyn ExecutionContext,
48
    ) -> Result<Box<dyn InitializedRasterOperator>> {
1✔
49
        let name = CanonicOperatorName::from(&self);
1✔
50

51
        let source = self
1✔
52
            .sources
1✔
53
            .initialize_sources(path.clone(), context)
1✔
54
            .await?
1✔
55
            .raster;
56

57
        let in_descriptor = source.result_descriptor();
1✔
58

1✔
59
        // TODO: ensure all bands have same measurement unit?
1✔
60

1✔
61
        let result_descriptor = in_descriptor.map_data_type(|_| self.params.output_type);
1✔
62

1✔
63
        let parameters = vec![Parameter::Number("x".into())];
1✔
64

65
        let expression = ExpressionParser::new(&parameters, DataType::Number)
1✔
66
            .map_err(RasterExpressionError::from)?
1✔
67
            .parse(
1✔
68
                "expression", // TODO: what is the name used for?
1✔
69
                &self.params.expression,
1✔
70
            )
1✔
71
            .map_err(RasterExpressionError::from)?;
1✔
72

73
        Ok(Box::new(InitializedBandwiseExpression {
1✔
74
            name,
1✔
75
            path,
1✔
76
            result_descriptor,
1✔
77
            source,
1✔
78
            expression,
1✔
79
            map_no_data: self.params.map_no_data,
1✔
80
        }))
1✔
81
    }
2✔
82

83
    span_fn!(BandwiseExpression);
84
}
85

86
/// Initialized form of [`BandwiseExpression`], created by `BandwiseExpression::_initialize`.
pub struct InitializedBandwiseExpression {
87
    /// Canonic name derived from the uninitialized operator.
    name: CanonicOperatorName,
88
    /// Workflow path that was passed to the operator on initialization.
    path: WorkflowOperatorPath,
89
    /// Result descriptor of the source with the data type replaced by the requested output type.
    result_descriptor: RasterResultDescriptor,
90
    /// The initialized raster source.
    source: Box<dyn InitializedRasterOperator>,
91
    /// Parsed expression; it is linked into a callable function in `query_processor`.
    expression: ExpressionAst,
92
    /// Whether the expression is also evaluated for pixels without a value.
    map_no_data: bool,
93
}
94

95
impl InitializedRasterOperator for InitializedBandwiseExpression {
96
    fn result_descriptor(&self) -> &RasterResultDescriptor {
1✔
97
        &self.result_descriptor
1✔
98
    }
1✔
99

100
    fn query_processor(&self) -> Result<TypedRasterQueryProcessor> {
1✔
101
        let typed_raster_processor = self.source.query_processor()?.into_f64();
1✔
102

1✔
103
        let output_type = self.result_descriptor().data_type;
1✔
104

105
        // TODO: spawn a blocking task for the compilation process
106
        let expression_dependencies = get_expression_dependencies()
1✔
107
            .map_err(|source| RasterExpressionError::Dependencies { source })?;
1✔
108

109
        let expression = LinkedExpression::new(
1✔
110
            self.expression.name(),
1✔
111
            &self.expression.code(),
1✔
112
            expression_dependencies,
1✔
113
        )
1✔
114
        .map_err(RasterExpressionError::from)?;
1✔
115

116
        Ok(call_generic_raster_processor!(
×
117
            output_type,
1✔
118
            BandwiseExpressionProcessor::new(
1✔
119
                typed_raster_processor,
1✔
120
                self.result_descriptor.clone(),
1✔
121
                expression,
1✔
122
                self.map_no_data
1✔
123
            )
1✔
124
            .boxed()
1✔
125
        ))
126
    }
1✔
127

128
    fn canonic_name(&self) -> CanonicOperatorName {
×
129
        self.name.clone()
×
130
    }
×
131

NEW
132
    fn name(&self) -> &'static str {
×
NEW
133
        BandwiseExpression::TYPE_NAME
×
NEW
134
    }
×
135

NEW
136
    fn path(&self) -> WorkflowOperatorPath {
×
NEW
137
        self.path.clone()
×
NEW
138
    }
×
139
}
140

141
/// Query processor that evaluates a linked expression on every pixel of the tiles of its
/// `f64` source and produces tiles with pixel type `TO`.
pub(crate) struct BandwiseExpressionProcessor<TO> {
142
    /// Source processor; its pixels are always `f64`.
    source: Box<dyn RasterQueryProcessor<RasterType = f64>>,
143
    /// Descriptor returned by `raster_result_descriptor`.
    result_descriptor: RasterResultDescriptor,
144
    /// Linked expression, reference-counted so that each tile computation can hold its own handle.
    expression: Arc<LinkedExpression>,
145
    /// Whether the expression is also evaluated for pixels without a value.
    map_no_data: bool,
146
    /// Ties the output pixel type `TO` to the processor without storing a value of it.
    phantom: std::marker::PhantomData<TO>,
147
}
148

149
impl<TO> BandwiseExpressionProcessor<TO>
150
where
151
    TO: Pixel,
152
{
153
    pub fn new(
1✔
154
        source: Box<dyn RasterQueryProcessor<RasterType = f64>>,
1✔
155
        result_descriptor: RasterResultDescriptor,
1✔
156
        expression: LinkedExpression,
1✔
157
        map_no_data: bool,
1✔
158
    ) -> Self {
1✔
159
        Self {
1✔
160
            source,
1✔
161
            result_descriptor,
1✔
162
            expression: Arc::new(expression),
1✔
163
            map_no_data,
1✔
164
            phantom: Default::default(),
1✔
165
        }
1✔
166
    }
1✔
167

168
    #[inline]
169
    fn compute_expression(
8✔
170
        raster: RasterTile2D<f64>,
8✔
171
        expression: &LinkedExpression,
8✔
172
        map_no_data: bool,
8✔
173
    ) -> Result<GridOrEmpty2D<TO>> {
8✔
174
        let expression = unsafe {
8✔
175
            // we have to "trust" that the function has the signature we expect
176
            expression
8✔
177
                .function_1::<Option<f64>>()
8✔
178
                .map_err(RasterExpressionError::from)?
8✔
179
        };
180

181
        let map_fn = |in_value: Option<f64>| {
32✔
182
            // TODO: could be a |in_value: T1| if map no data is false!
32✔
183
            if !map_no_data && in_value.is_none() {
32✔
184
                return None;
×
185
            }
32✔
186

32✔
187
            let result = expression(in_value);
32✔
188

32✔
189
            result.map(TO::from_)
32✔
190
        };
32✔
191

192
        let res = raster.grid_array.map_elements_parallel(map_fn);
8✔
193

8✔
194
        Result::Ok(res)
8✔
195
    }
8✔
196
}
197

198
#[async_trait]
199
impl<TO> RasterQueryProcessor for BandwiseExpressionProcessor<TO>
200
where
201
    TO: Pixel,
202
{
203
    type RasterType = TO;
204

205
    async fn raster_query<'a>(
206
        &'a self,
207
        query: RasterQueryRectangle,
208
        ctx: &'a dyn QueryContext,
209
    ) -> Result<BoxStream<'a, Result<RasterTile2D<TO>>>> {
1✔
210
        let stream = self
1✔
211
            .source
1✔
212
            .raster_query(query, ctx)
1✔
213
            .await?
1✔
214
            .and_then(move |tile| async move {
8✔
215
                let expression = self.expression.clone();
8✔
216
                let map_no_data = self.map_no_data;
8✔
217

8✔
218
                let time = tile.time;
8✔
219
                let tile_position = tile.tile_position;
8✔
220
                let band = tile.band;
8✔
221
                let global_geo_transform = tile.global_geo_transform;
8✔
222
                let cache_hint = tile.cache_hint;
8✔
223

224
                let out = crate::util::spawn_blocking_with_thread_pool(
8✔
225
                    ctx.thread_pool().clone(),
8✔
226
                    move || Self::compute_expression(tile, &expression, map_no_data),
8✔
227
                )
8✔
228
                .await??;
8✔
229

230
                Ok(RasterTile2D::new(
8✔
231
                    time,
8✔
232
                    tile_position,
8✔
233
                    band,
8✔
234
                    global_geo_transform,
8✔
235
                    out,
8✔
236
                    cache_hint,
8✔
237
                ))
8✔
238
            });
16✔
239

1✔
240
        Ok(stream.boxed())
1✔
241
    }
2✔
242

243
    fn raster_result_descriptor(&self) -> &RasterResultDescriptor {
1✔
244
        &self.result_descriptor
1✔
245
    }
1✔
246
}
247

248
#[cfg(test)]
mod tests {
    use geoengine_datatypes::{
        primitives::{CacheHint, SpatialPartition2D, SpatialResolution, TimeInterval},
        raster::{Grid, GridShape, MapElements, RenameBands, TilesEqualIgnoringCacheHint},
        spatial_reference::SpatialReference,
        util::test::TestDefault,
    };

    use crate::{
        engine::{
            MockExecutionContext, MockQueryContext, MultipleRasterSources, RasterBandDescriptors,
        },
        mock::{MockRasterSource, MockRasterSourceParams},
        processing::{RasterStacker, RasterStackerParams},
    };

    use super::*;

    /// Builds a 2x2 `u8` tile of band 0 in tile row -1 and the given tile column.
    fn tile(time: TimeInterval, column: isize, values: Vec<u8>) -> RasterTile2D<u8> {
        RasterTile2D {
            time,
            tile_position: [-1, column].into(),
            band: 0,
            global_geo_transform: TestDefault::test_default(),
            grid_array: Grid::new([2, 2].into(), values).unwrap().into(),
            properties: Default::default(),
            cache_hint: CacheHint::default(),
        }
    }

    /// Wraps `data` in a mock raster source with a single-band `U8` result descriptor.
    fn single_band_u8_source(data: Vec<RasterTile2D<u8>>) -> Box<dyn RasterOperator> {
        MockRasterSource {
            params: MockRasterSourceParams {
                data,
                result_descriptor: RasterResultDescriptor {
                    data_type: RasterDataType::U8,
                    spatial_reference: SpatialReference::epsg_4326().into(),
                    time: None,
                    bbox: None,
                    resolution: None,
                    bands: RasterBandDescriptors::new_single_band(),
                },
            },
        }
        .boxed()
    }

    /// Stacks two single-band sources into one two-band raster and checks that `x + 1` is
    /// applied to every pixel of both bands.
    #[tokio::test]
    async fn it_computes_bandwise_expression() {
        let data: Vec<RasterTile2D<u8>> = vec![
            tile(TimeInterval::new_unchecked(0, 5), 0, vec![0, 1, 2, 3]),
            tile(TimeInterval::new_unchecked(0, 5), 1, vec![4, 5, 6, 7]),
            tile(TimeInterval::new_unchecked(5, 10), 0, vec![8, 9, 10, 11]),
            tile(TimeInterval::new_unchecked(5, 10), 1, vec![12, 13, 14, 15]),
        ];

        let data2: Vec<RasterTile2D<u8>> = vec![
            tile(TimeInterval::new_unchecked(0, 5), 0, vec![16, 17, 18, 19]),
            tile(TimeInterval::new_unchecked(0, 5), 1, vec![20, 21, 22, 23]),
            tile(TimeInterval::new_unchecked(5, 10), 0, vec![24, 25, 26, 27]),
            tile(TimeInterval::new_unchecked(5, 10), 1, vec![28, 29, 30, 31]),
        ];

        let mrs1 = single_band_u8_source(data.clone());
        let mrs2 = single_band_u8_source(data2.clone());

        let stacker = RasterStacker {
            params: RasterStackerParams {
                rename_bands: RenameBands::Default,
            },
            sources: MultipleRasterSources {
                rasters: vec![mrs1, mrs2],
            },
        }
        .boxed();

        let expression = BandwiseExpression {
            params: BandwiseExpressionParams {
                expression: "x + 1".to_string(),
                output_type: RasterDataType::U8,
                map_no_data: false,
            },
            sources: SingleRasterSource { raster: stacker },
        }
        .boxed();

        let mut exe_ctx = MockExecutionContext::test_default();
        exe_ctx.tiling_specification.tile_size_in_pixels = GridShape {
            shape_array: [2, 2],
        };

        let query_rect = RasterQueryRectangle {
            spatial_bounds: SpatialPartition2D::new_unchecked((0., 1.).into(), (3., 0.).into()),
            time_interval: TimeInterval::new_unchecked(0, 10),
            spatial_resolution: SpatialResolution::one(),
            attributes: [0, 1].try_into().unwrap(),
        };

        let query_ctx = MockQueryContext::test_default();

        let op = expression
            .initialize(WorkflowOperatorPath::initialize_root(), &exe_ctx)
            .await
            .unwrap();

        let qp = op.query_processor().unwrap().get_u8().unwrap();

        let tile_stream = qp.raster_query(query_rect, &query_ctx).await.unwrap();
        let result = tile_stream
            .collect::<Vec<_>>()
            .await
            .into_iter()
            .collect::<Result<Vec<_>>>()
            .unwrap();

        // Tiles are expected interleaved by band (band 0, then band 1, for every input tile),
        // each with all pixel values incremented by one.
        let mut expected: Vec<RasterTile2D<u8>> = Vec::new();
        for (first_band, mut second_band) in data.into_iter().zip(data2) {
            second_band.band = 1;

            for mut band_tile in vec![first_band, second_band] {
                band_tile.grid_array = band_tile
                    .grid_array
                    .map_elements(|in_value: u8| in_value + 1);
                expected.push(band_tile);
            }
        }

        assert!(expected.tiles_equal_ignoring_cache_hint(&result));
    }
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc