• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

geo-engine / geoengine / 7006568925

27 Nov 2023 02:07PM UTC coverage: 89.651% (+0.2%) from 89.498%
7006568925

push

github

web-flow
Merge pull request #888 from geo-engine/raster_stacks

raster stacking

4032 of 4274 new or added lines in 107 files covered. (94.34%)

12 existing lines in 8 files now uncovered.

113020 of 126066 relevant lines covered (89.65%)

59901.79 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

92.94
/operators/src/processing/raster_scaling.rs
1
use crate::engine::{
2
    CanonicOperatorName, ExecutionContext, InitializedRasterOperator, InitializedSources, Operator,
3
    OperatorName, RasterBandDescriptor, RasterBandDescriptors, RasterOperator,
4
    RasterQueryProcessor, RasterResultDescriptor, SingleRasterSource, TypedRasterQueryProcessor,
5
    WorkflowOperatorPath,
6
};
7
use crate::util::Result;
8
use async_trait::async_trait;
9
use futures::{StreamExt, TryStreamExt};
10

11
use geoengine_datatypes::raster::{
12
    CheckedMulThenAddTransformation, CheckedSubThenDivTransformation, ElementScaling,
13
    ScalingTransformation,
14
};
15
use geoengine_datatypes::{
16
    primitives::Measurement,
17
    raster::{Pixel, RasterPropertiesKey, RasterTile2D},
18
};
19
use num::FromPrimitive;
20
use num_traits::AsPrimitive;
21
use rayon::ThreadPool;
22
use serde::{Deserialize, Serialize};
23
use snafu::ensure;
24
use std::marker::PhantomData;
25
use std::sync::Arc;
26

27
#[derive(Debug, Serialize, Deserialize, Clone)]
2✔
28
#[serde(rename_all = "camelCase")]
29
pub struct RasterScalingParams {
30
    slope: SlopeOffsetSelection,
31
    offset: SlopeOffsetSelection,
32
    output_measurement: Option<Measurement>,
33
    scaling_mode: ScalingMode,
34
}
35

36
#[derive(Debug, Serialize, Deserialize, Clone, Copy)]
2✔
37
#[serde(rename_all = "camelCase")]
38
pub enum ScalingMode {
39
    MulSlopeAddOffset,
40
    SubOffsetDivSlope,
41
}
42

43
#[derive(Debug, Serialize, Deserialize, Clone)]
4✔
44
#[serde(rename_all = "camelCase", tag = "type")]
45
enum SlopeOffsetSelection {
46
    Auto,
47
    MetadataKey(RasterPropertiesKey),
48
    Constant { value: f64 },
49
}
50

51
impl Default for SlopeOffsetSelection {
52
    fn default() -> Self {
×
53
        Self::Auto
×
54
    }
×
55
}
56

57
/// The raster scaling operator scales/unscales the values of a raster by a given scale factor and offset.
58
/// This is done by applying the following formulas to every pixel.
59
/// For unscaling the formula is:
60
///
61
/// `p_new = p_old * slope + offset`
62
///
63
/// For scaling the formula is:
64
///
65
/// `p_new = (p_old - offset) / slope`
66
///
67
/// `p_old` and `p_new` refer to the old and new pixel values,
68
/// The slope and offset values are either properties attached to the input raster or a fixed value.
69
///
70
/// An example for Meteosat Second Generation properties is:
71
///
72
/// - offset: `msg.calibration_offset`
73
/// - slope: `msg.calibration_slope`
74
pub type RasterScaling = Operator<RasterScalingParams, SingleRasterSource>;
75

76
impl OperatorName for RasterScaling {
77
    const TYPE_NAME: &'static str = "RasterScaling";
78
}
79

80
pub struct InitializedRasterScalingOperator {
81
    name: CanonicOperatorName,
82
    slope: SlopeOffsetSelection,
83
    offset: SlopeOffsetSelection,
84
    result_descriptor: RasterResultDescriptor,
85
    source: Box<dyn InitializedRasterOperator>,
86
    scaling_mode: ScalingMode,
87
}
88

89
#[typetag::serde]
×
90
#[async_trait]
91
impl RasterOperator for RasterScaling {
92
    async fn _initialize(
2✔
93
        self: Box<Self>,
2✔
94
        path: WorkflowOperatorPath,
2✔
95
        context: &dyn ExecutionContext,
2✔
96
    ) -> Result<Box<dyn InitializedRasterOperator>> {
2✔
97
        let name = CanonicOperatorName::from(&self);
2✔
98

99
        let input = self.sources.initialize_sources(path, context).await?;
2✔
100
        let in_desc = input.raster.result_descriptor();
2✔
101

2✔
102
        // TODO: implement multi-band functionality and remove this check
2✔
103
        ensure!(
2✔
104
            in_desc.bands.len() == 1,
2✔
NEW
105
            crate::error::OperatorDoesNotSupportMultiBandsSourcesYet {
×
NEW
106
                operator: RasterScaling::TYPE_NAME
×
NEW
107
            }
×
108
        );
109

110
        let out_desc = RasterResultDescriptor {
2✔
111
            spatial_reference: in_desc.spatial_reference,
2✔
112
            data_type: in_desc.data_type,
2✔
113

2✔
114
            bbox: in_desc.bbox,
2✔
115
            time: in_desc.time,
2✔
116
            resolution: in_desc.resolution,
2✔
117
            bands: RasterBandDescriptors::new(vec![RasterBandDescriptor::new(
2✔
118
                in_desc.bands[0].name.clone(),
2✔
119
                self.params
2✔
120
                    .output_measurement
2✔
121
                    .unwrap_or_else(|| in_desc.bands[0].measurement.clone()),
2✔
122
            )])?,
2✔
123
        };
124

125
        let initialized_operator = InitializedRasterScalingOperator {
2✔
126
            name,
2✔
127
            slope: self.params.slope,
2✔
128
            offset: self.params.offset,
2✔
129
            result_descriptor: out_desc,
2✔
130
            source: input.raster,
2✔
131
            scaling_mode: self.params.scaling_mode,
2✔
132
        };
2✔
133

2✔
134
        Ok(initialized_operator.boxed())
2✔
135
    }
4✔
136

137
    span_fn!(RasterScaling);
×
138
}
139

140
impl InitializedRasterOperator for InitializedRasterScalingOperator {
141
    fn result_descriptor(&self) -> &RasterResultDescriptor {
2✔
142
        &self.result_descriptor
2✔
143
    }
2✔
144

145
    fn query_processor(&self) -> Result<TypedRasterQueryProcessor> {
2✔
146
        let slope = self.slope.clone();
2✔
147
        let offset = self.offset.clone();
2✔
148
        let source = self.source.query_processor()?;
2✔
149
        let scaling_mode = self.scaling_mode;
2✔
150

151
        let res = match scaling_mode {
2✔
152
            ScalingMode::SubOffsetDivSlope => {
153
                call_on_generic_raster_processor!(source, source_proc => { TypedRasterQueryProcessor::from(create_boxed_processor::<_,_, CheckedSubThenDivTransformation>(slope, offset,  source_proc)) })
1✔
154
            }
155
            ScalingMode::MulSlopeAddOffset => {
156
                call_on_generic_raster_processor!(source, source_proc => { TypedRasterQueryProcessor::from(create_boxed_processor::<_,_, CheckedMulThenAddTransformation>(slope, offset,  source_proc)) })
1✔
157
            }
158
        };
159

160
        Ok(res)
2✔
161
    }
2✔
162

163
    fn canonic_name(&self) -> CanonicOperatorName {
×
164
        self.name.clone()
×
165
    }
×
166
}
167

168
struct RasterTransformationProcessor<Q, P, S>
169
where
170
    Q: RasterQueryProcessor<RasterType = P>,
171
{
172
    source: Q,
173
    slope: SlopeOffsetSelection,
174
    offset: SlopeOffsetSelection,
175
    _transformation: PhantomData<S>,
176
}
177

178
fn create_boxed_processor<Q, P, S>(
2✔
179
    slope: SlopeOffsetSelection,
2✔
180
    offset: SlopeOffsetSelection,
2✔
181
    source: Q,
2✔
182
) -> Box<dyn RasterQueryProcessor<RasterType = P>>
2✔
183
where
2✔
184
    Q: RasterQueryProcessor<RasterType = P> + 'static,
2✔
185
    P: Pixel + FromPrimitive + 'static + Default,
2✔
186
    f64: AsPrimitive<P>,
2✔
187
    S: Send + Sync + 'static + ScalingTransformation<P>,
2✔
188
{
2✔
189
    RasterTransformationProcessor::<Q, P, S>::create(slope, offset, source).boxed()
2✔
190
}
2✔
191

192
impl<Q, P, S> RasterTransformationProcessor<Q, P, S>
193
where
194
    Q: RasterQueryProcessor<RasterType = P> + 'static,
195
    P: Pixel + FromPrimitive + 'static + Default,
196
    f64: AsPrimitive<P>,
197
    S: Send + Sync + 'static + ScalingTransformation<P>,
198
{
199
    pub fn create(
2✔
200
        slope: SlopeOffsetSelection,
2✔
201
        offset: SlopeOffsetSelection,
2✔
202
        source: Q,
2✔
203
    ) -> RasterTransformationProcessor<Q, P, S> {
2✔
204
        RasterTransformationProcessor {
2✔
205
            source,
2✔
206
            slope,
2✔
207
            offset,
2✔
208
            _transformation: PhantomData,
2✔
209
        }
2✔
210
    }
2✔
211

212
    async fn scale_tile_async(
2✔
213
        &self,
2✔
214
        tile: RasterTile2D<P>,
2✔
215
        _pool: Arc<ThreadPool>,
2✔
216
    ) -> Result<RasterTile2D<P>> {
2✔
217
        // either use the provided metadata/constant or the default values from the properties
218
        let offset = match &self.offset {
2✔
219
            SlopeOffsetSelection::MetadataKey(key) => tile.properties.number_property::<P>(key)?,
×
220
            SlopeOffsetSelection::Constant { value } => value.as_(),
×
221
            SlopeOffsetSelection::Auto => tile.properties.offset().as_(),
2✔
222
        };
223

224
        let slope = match &self.slope {
2✔
225
            SlopeOffsetSelection::MetadataKey(key) => tile.properties.number_property::<P>(key)?,
×
226
            SlopeOffsetSelection::Constant { value } => value.as_(),
×
227
            SlopeOffsetSelection::Auto => tile.properties.scale().as_(),
2✔
228
        };
229

230
        let res_tile =
2✔
231
            crate::util::spawn_blocking(move || tile.transform_elements::<S>(slope, offset))
2✔
232
                .await?;
2✔
233

234
        Ok(res_tile)
2✔
235
    }
2✔
236
}
237

238
#[async_trait]
239
impl<Q, P, S> RasterQueryProcessor for RasterTransformationProcessor<Q, P, S>
240
where
241
    P: Pixel + FromPrimitive + 'static + Default,
242
    f64: AsPrimitive<P>,
243
    Q: RasterQueryProcessor<RasterType = P> + 'static,
244
    S: Send + Sync + 'static + ScalingTransformation<P>,
245
{
246
    type RasterType = P;
247

248
    async fn raster_query<'a>(
2✔
249
        &'a self,
2✔
250
        query: geoengine_datatypes::primitives::RasterQueryRectangle,
2✔
251
        ctx: &'a dyn crate::engine::QueryContext,
2✔
252
    ) -> Result<
2✔
253
        futures::stream::BoxStream<
2✔
254
            'a,
2✔
255
            Result<geoengine_datatypes::raster::RasterTile2D<Self::RasterType>>,
2✔
256
        >,
2✔
257
    > {
2✔
258
        let src = self.source.raster_query(query, ctx).await?;
2✔
259
        let rs = src.and_then(move |tile| self.scale_tile_async(tile, ctx.thread_pool().clone()));
2✔
260
        Ok(rs.boxed())
2✔
261
    }
4✔
262
}
263

264
#[cfg(test)]
265
mod tests {
266

267
    use geoengine_datatypes::{
268
        primitives::{
269
            BandSelection, CacheHint, SpatialPartition2D, SpatialResolution, TimeInterval,
270
        },
271
        raster::{
272
            Grid2D, GridOrEmpty2D, GridShape, MaskedGrid2D, RasterDataType, RasterProperties,
273
            TileInformation, TilingSpecification,
274
        },
275
        spatial_reference::SpatialReference,
276
        util::test::TestDefault,
277
    };
278

279
    use crate::{
280
        engine::{ChunkByteSize, MockExecutionContext},
281
        mock::{MockRasterSource, MockRasterSourceParams},
282
    };
283

284
    use super::*;
285

286
    #[tokio::test]
1✔
287
    async fn test_unscale() {
1✔
288
        let grid_shape = [2, 2].into();
1✔
289

1✔
290
        let tiling_specification = TilingSpecification {
1✔
291
            origin_coordinate: [0.0, 0.0].into(),
1✔
292
            tile_size_in_pixels: grid_shape,
1✔
293
        };
1✔
294

1✔
295
        let raster = MaskedGrid2D::from(Grid2D::new(grid_shape, vec![7_u8, 7, 7, 6]).unwrap());
1✔
296

1✔
297
        let ctx = MockExecutionContext::new_with_tiling_spec(tiling_specification);
1✔
298
        let query_ctx = ctx.mock_query_context(ChunkByteSize::test_default());
1✔
299

1✔
300
        let mut raster_props = RasterProperties::default();
1✔
301
        raster_props.set_scale(2.0);
1✔
302
        raster_props.set_offset(1.0);
1✔
303

1✔
304
        let raster_tile = RasterTile2D::new_with_tile_info_and_properties(
1✔
305
            TimeInterval::default(),
1✔
306
            TileInformation {
1✔
307
                global_geo_transform: TestDefault::test_default(),
1✔
308
                global_tile_position: [0, 0].into(),
1✔
309
                tile_size_in_pixels: grid_shape,
1✔
310
            },
1✔
311
            0,
1✔
312
            raster.into(),
1✔
313
            raster_props,
1✔
314
            CacheHint::default(),
1✔
315
        );
1✔
316

1✔
317
        let spatial_resolution = raster_tile.spatial_resolution();
1✔
318

1✔
319
        let mrs = MockRasterSource {
1✔
320
            params: MockRasterSourceParams {
1✔
321
                data: vec![raster_tile],
1✔
322
                result_descriptor: RasterResultDescriptor {
1✔
323
                    data_type: RasterDataType::U8,
1✔
324
                    spatial_reference: SpatialReference::epsg_4326().into(),
1✔
325
                    bbox: None,
1✔
326
                    time: None,
1✔
327
                    resolution: Some(spatial_resolution),
1✔
328
                    bands: RasterBandDescriptors::new_single_band(),
1✔
329
                },
1✔
330
            },
1✔
331
        }
1✔
332
        .boxed();
1✔
333

1✔
334
        let scaling_mode = ScalingMode::MulSlopeAddOffset;
1✔
335

1✔
336
        let output_measurement = None;
1✔
337

1✔
338
        let op = RasterScaling {
1✔
339
            params: RasterScalingParams {
1✔
340
                slope: SlopeOffsetSelection::Auto,
1✔
341
                offset: SlopeOffsetSelection::Auto,
1✔
342
                output_measurement,
1✔
343
                scaling_mode,
1✔
344
            },
1✔
345
            sources: SingleRasterSource { raster: mrs },
1✔
346
        }
1✔
347
        .boxed();
1✔
348

349
        let initialized_op = op
1✔
350
            .initialize(WorkflowOperatorPath::initialize_root(), &ctx)
1✔
351
            .await
×
352
            .unwrap();
1✔
353

1✔
354
        let result_descriptor = initialized_op.result_descriptor();
1✔
355

1✔
356
        assert_eq!(result_descriptor.data_type, RasterDataType::U8);
1✔
357
        assert_eq!(
1✔
358
            result_descriptor.bands[0].measurement,
1✔
359
            Measurement::Unitless
1✔
360
        );
1✔
361

362
        let query_processor = initialized_op.query_processor().unwrap();
1✔
363

1✔
364
        let query = geoengine_datatypes::primitives::RasterQueryRectangle {
1✔
365
            spatial_bounds: SpatialPartition2D::new((0., 0.).into(), (2., -2.).into()).unwrap(),
1✔
366
            spatial_resolution: SpatialResolution::one(),
1✔
367
            time_interval: TimeInterval::default(),
1✔
368
            attributes: BandSelection::first(),
1✔
369
        };
1✔
370

371
        let TypedRasterQueryProcessor::U8(typed_processor) = query_processor else {
1✔
372
            panic!("expected TypedRasterQueryProcessor::U8");
×
373
        };
374

375
        let stream = typed_processor
1✔
376
            .raster_query(query, &query_ctx)
1✔
377
            .await
×
378
            .unwrap();
1✔
379

380
        let results = stream.collect::<Vec<Result<RasterTile2D<u8>>>>().await;
1✔
381

382
        let result_tile = results.as_slice()[0].as_ref().unwrap();
1✔
383

1✔
384
        let result_grid = result_tile.grid_array.clone();
1✔
385

1✔
386
        match result_grid {
1✔
387
            GridOrEmpty2D::Grid(grid) => {
1✔
388
                assert_eq!(grid.shape(), &GridShape::new([2, 2]));
1✔
389

390
                let res = grid.masked_element_deref_iterator().collect::<Vec<_>>();
1✔
391

1✔
392
                let expected = vec![Some(15), Some(15), Some(15), Some(13)];
1✔
393

1✔
394
                assert_eq!(res, expected);
1✔
395
            }
396
            GridOrEmpty2D::Empty(_) => panic!("expected GridOrEmpty2D::Grid"),
×
397
        }
398
    }
399

400
    #[tokio::test]
1✔
401
    async fn test_scale() {
1✔
402
        let grid_shape = [2, 2].into();
1✔
403

1✔
404
        let tiling_specification = TilingSpecification {
1✔
405
            origin_coordinate: [0.0, 0.0].into(),
1✔
406
            tile_size_in_pixels: grid_shape,
1✔
407
        };
1✔
408

1✔
409
        let raster = MaskedGrid2D::from(Grid2D::new(grid_shape, vec![15_u8, 15, 15, 13]).unwrap());
1✔
410

1✔
411
        let ctx = MockExecutionContext::new_with_tiling_spec(tiling_specification);
1✔
412
        let query_ctx = ctx.mock_query_context(ChunkByteSize::test_default());
1✔
413

1✔
414
        let mut raster_props = RasterProperties::default();
1✔
415
        raster_props.set_scale(2.0);
1✔
416
        raster_props.set_offset(1.0);
1✔
417

1✔
418
        let raster_tile = RasterTile2D::new_with_tile_info_and_properties(
1✔
419
            TimeInterval::default(),
1✔
420
            TileInformation {
1✔
421
                global_geo_transform: TestDefault::test_default(),
1✔
422
                global_tile_position: [0, 0].into(),
1✔
423
                tile_size_in_pixels: grid_shape,
1✔
424
            },
1✔
425
            0,
1✔
426
            raster.into(),
1✔
427
            raster_props,
1✔
428
            CacheHint::default(),
1✔
429
        );
1✔
430

1✔
431
        let spatial_resolution = raster_tile.spatial_resolution();
1✔
432

1✔
433
        let mrs = MockRasterSource {
1✔
434
            params: MockRasterSourceParams {
1✔
435
                data: vec![raster_tile],
1✔
436
                result_descriptor: RasterResultDescriptor {
1✔
437
                    data_type: RasterDataType::U8,
1✔
438
                    spatial_reference: SpatialReference::epsg_4326().into(),
1✔
439
                    bbox: None,
1✔
440
                    time: None,
1✔
441
                    resolution: Some(spatial_resolution),
1✔
442
                    bands: RasterBandDescriptors::new_single_band(),
1✔
443
                },
1✔
444
            },
1✔
445
        }
1✔
446
        .boxed();
1✔
447

1✔
448
        let scaling_mode = ScalingMode::SubOffsetDivSlope;
1✔
449

1✔
450
        let output_measurement = None;
1✔
451

1✔
452
        let params = RasterScalingParams {
1✔
453
            slope: SlopeOffsetSelection::Auto,
1✔
454
            offset: SlopeOffsetSelection::Auto,
1✔
455
            output_measurement,
1✔
456
            scaling_mode,
1✔
457
        };
1✔
458

1✔
459
        let op = RasterScaling {
1✔
460
            params,
1✔
461
            sources: SingleRasterSource { raster: mrs },
1✔
462
        }
1✔
463
        .boxed();
1✔
464

465
        let initialized_op = op
1✔
466
            .initialize(WorkflowOperatorPath::initialize_root(), &ctx)
1✔
467
            .await
×
468
            .unwrap();
1✔
469

1✔
470
        let result_descriptor = initialized_op.result_descriptor();
1✔
471

1✔
472
        assert_eq!(result_descriptor.data_type, RasterDataType::U8);
1✔
473
        assert_eq!(
1✔
474
            result_descriptor.bands[0].measurement,
1✔
475
            Measurement::Unitless
1✔
476
        );
1✔
477

478
        let query_processor = initialized_op.query_processor().unwrap();
1✔
479

1✔
480
        let query = geoengine_datatypes::primitives::RasterQueryRectangle {
1✔
481
            spatial_bounds: SpatialPartition2D::new((0., 0.).into(), (2., -2.).into()).unwrap(),
1✔
482
            spatial_resolution: SpatialResolution::one(),
1✔
483
            time_interval: TimeInterval::default(),
1✔
484
            attributes: BandSelection::first(),
1✔
485
        };
1✔
486

487
        let TypedRasterQueryProcessor::U8(typed_processor) = query_processor else {
1✔
488
            panic!("expected TypedRasterQueryProcessor::U8");
×
489
        };
490

491
        let stream = typed_processor
1✔
492
            .raster_query(query, &query_ctx)
1✔
493
            .await
×
494
            .unwrap();
1✔
495

496
        let results = stream.collect::<Vec<Result<RasterTile2D<u8>>>>().await;
1✔
497

498
        let result_tile = results.as_slice()[0].as_ref().unwrap();
1✔
499

1✔
500
        let result_grid = result_tile.grid_array.clone();
1✔
501

1✔
502
        match result_grid {
1✔
503
            GridOrEmpty2D::Grid(grid) => {
1✔
504
                assert_eq!(grid.shape(), &GridShape::new([2, 2]));
1✔
505

506
                let res = grid.masked_element_deref_iterator().collect::<Vec<_>>();
1✔
507

1✔
508
                let expected = vec![Some(7), Some(7), Some(7), Some(6)];
1✔
509

1✔
510
                assert_eq!(res, expected);
1✔
511
            }
512
            GridOrEmpty2D::Empty(_) => panic!("expected GridOrEmpty2D::Grid"),
×
513
        }
514
    }
515
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc