• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

geo-engine / geoengine / 4145938473

pending completion
4145938473

push

github

GitHub
Merge #717

77 of 77 new or added lines in 5 files covered. (100.0%)

90401 of 102701 relevant lines covered (88.02%)

75798.34 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

93.69
/operators/src/processing/raster_scaling.rs
1
use crate::engine::{
2
    CreateSpan, ExecutionContext, InitializedRasterOperator, Operator, OperatorName,
3
    RasterOperator, RasterQueryProcessor, RasterResultDescriptor, SingleRasterSource,
4
    TypedRasterQueryProcessor,
5
};
6
use crate::util::Result;
7
use async_trait::async_trait;
8
use futures::{StreamExt, TryStreamExt};
9

10
use geoengine_datatypes::raster::{
11
    ElementScaling, ScaleTransformation, ScalingTransformation, UnscaleTransformation,
12
};
13
use geoengine_datatypes::{
14
    primitives::Measurement,
15
    raster::{Pixel, RasterProperties, RasterPropertiesKey, RasterTile2D},
16
};
17
use num::FromPrimitive;
18
use num_traits::AsPrimitive;
19
use rayon::ThreadPool;
20
use serde::{Deserialize, Serialize};
21
use std::marker::PhantomData;
22
use std::sync::Arc;
23
use tracing::{span, Level};
24

25
#[derive(Debug, Serialize, Deserialize, Clone)]
×
26
#[serde(rename_all = "camelCase")]
27
pub struct RasterScalingParams {
28
    slope_key_or_value: Option<PropertiesKeyOrValue>,
29
    offset_key_or_value: Option<PropertiesKeyOrValue>,
30
    output_measurement: Option<Measurement>,
31
    scaling_mode: ScalingMode,
32
}
33

34
#[derive(Debug, Serialize, Deserialize, Clone, Copy)]
×
35
#[serde(rename_all = "camelCase")]
36
pub enum ScalingMode {
37
    Scale,
38
    Unscale,
39
}
40

41
#[derive(Debug, Serialize, Deserialize, Clone)]
×
42
#[serde(rename_all = "camelCase", tag = "type")]
43
enum PropertiesKeyOrValue {
44
    MetadataKey(RasterPropertiesKey),
45
    Constant { value: f64 },
46
}
47

48
/// The raster scaling operator scales/unscales the values of a raster by a given scale factor and offset.
49
/// This is done by applying the following formulas to every pixel.
50
/// For unscaling the formula is:
51
///
52
/// `p_new = p_old * slope + offset`
53
///
54
/// For scaling the formula is:
55
///
56
/// `p_new = (p_old - offset) / slope`
57
///
58
/// `p_old` and `p_new` refer to the old and new pixel values,
59
/// The slope and offset values are either properties attached to the input raster or a fixed value.
60
///
61
/// An example for Meteosat Second Generation properties is:
62
///
63
/// - offset: `msg.calibration_offset`
64
/// - slope: `msg.calibration_slope`
65
pub type RasterScaling = Operator<RasterScalingParams, SingleRasterSource>;
66

67
impl OperatorName for RasterScaling {
    /// Type name under which this operator is identified.
    const TYPE_NAME: &'static str = "RasterScaling";
}
70

71
pub struct InitializedRasterScalingOperator {
72
    slope_key_or_value: Option<PropertiesKeyOrValue>,
73
    offset_key_or_value: Option<PropertiesKeyOrValue>,
74
    result_descriptor: RasterResultDescriptor,
75
    source: Box<dyn InitializedRasterOperator>,
76
    scaling_mode: ScalingMode,
77
}
78

79
#[typetag::serde]
×
80
#[async_trait]
81
impl RasterOperator for RasterScaling {
82
    async fn _initialize(
2✔
83
        self: Box<Self>,
2✔
84
        context: &dyn ExecutionContext,
2✔
85
    ) -> Result<Box<dyn InitializedRasterOperator>> {
2✔
86
        let input = self.sources.raster.initialize(context).await?;
2✔
87
        let in_desc = input.result_descriptor();
2✔
88

2✔
89
        let out_desc = RasterResultDescriptor {
2✔
90
            spatial_reference: in_desc.spatial_reference,
2✔
91
            data_type: in_desc.data_type,
2✔
92
            measurement: self
2✔
93
                .params
2✔
94
                .output_measurement
2✔
95
                .unwrap_or_else(|| in_desc.measurement.clone()),
2✔
96
            bbox: in_desc.bbox,
2✔
97
            time: in_desc.time,
2✔
98
            resolution: in_desc.resolution,
2✔
99
        };
2✔
100

2✔
101
        let initialized_operator = InitializedRasterScalingOperator {
2✔
102
            slope_key_or_value: self.params.slope_key_or_value,
2✔
103
            offset_key_or_value: self.params.offset_key_or_value,
2✔
104
            result_descriptor: out_desc,
2✔
105
            source: input,
2✔
106
            scaling_mode: self.params.scaling_mode,
2✔
107
        };
2✔
108

2✔
109
        Ok(initialized_operator.boxed())
2✔
110
    }
4✔
111

112
    span_fn!(RasterScaling);
×
113
}
114

115
impl InitializedRasterOperator for InitializedRasterScalingOperator {
116
    fn result_descriptor(&self) -> &RasterResultDescriptor {
2✔
117
        &self.result_descriptor
2✔
118
    }
2✔
119

120
    fn query_processor(&self) -> Result<TypedRasterQueryProcessor> {
2✔
121
        let slope_key_or_value = self.slope_key_or_value.clone();
2✔
122
        let offset_key_or_value = self.offset_key_or_value.clone();
2✔
123
        let source = self.source.query_processor()?;
2✔
124
        let scaling_mode = self.scaling_mode;
2✔
125

126
        let res = match scaling_mode {
2✔
127
            ScalingMode::Scale => {
128
                call_on_generic_raster_processor!(source, source_proc => { TypedRasterQueryProcessor::from(create_boxed_processor::<_,_, ScaleTransformation>(slope_key_or_value, offset_key_or_value,  source_proc)) })
1✔
129
            }
130
            ScalingMode::Unscale => {
131
                call_on_generic_raster_processor!(source, source_proc => { TypedRasterQueryProcessor::from(create_boxed_processor::<_,_, UnscaleTransformation>(slope_key_or_value, offset_key_or_value,  source_proc)) })
1✔
132
            }
133
        };
134

135
        Ok(res)
2✔
136
    }
2✔
137
}
138

139
struct RasterTransformationProcessor<Q, P, S>
140
where
141
    Q: RasterQueryProcessor<RasterType = P>,
142
{
143
    source: Q,
144
    slope_key_or_value: Option<PropertiesKeyOrValue>,
145
    offset_key_or_value: Option<PropertiesKeyOrValue>,
146
    _transformation: PhantomData<S>,
147
}
148

149
fn create_boxed_processor<Q, P, S>(
2✔
150
    slope_key_or_value: Option<PropertiesKeyOrValue>,
2✔
151
    offset_key_or_value: Option<PropertiesKeyOrValue>,
2✔
152
    source: Q,
2✔
153
) -> Box<dyn RasterQueryProcessor<RasterType = P>>
2✔
154
where
2✔
155
    Q: RasterQueryProcessor<RasterType = P> + 'static,
2✔
156
    P: Pixel + FromPrimitive + 'static + Default,
2✔
157
    f64: AsPrimitive<P>,
2✔
158
    S: Send + Sync + 'static + ScalingTransformation<P>,
2✔
159
{
2✔
160
    RasterTransformationProcessor::<Q, P, S>::create(
2✔
161
        slope_key_or_value,
2✔
162
        offset_key_or_value,
2✔
163
        source,
2✔
164
    )
2✔
165
    .boxed()
2✔
166
}
2✔
167

168
impl<Q, P, S> RasterTransformationProcessor<Q, P, S>
169
where
170
    Q: RasterQueryProcessor<RasterType = P> + 'static,
171
    P: Pixel + FromPrimitive + 'static + Default,
172
    f64: AsPrimitive<P>,
173
    S: Send + Sync + 'static + ScalingTransformation<P>,
174
{
175
    pub fn create(
2✔
176
        slope_key_or_value: Option<PropertiesKeyOrValue>,
2✔
177
        offset_key_or_value: Option<PropertiesKeyOrValue>,
2✔
178
        source: Q,
2✔
179
    ) -> RasterTransformationProcessor<Q, P, S> {
2✔
180
        RasterTransformationProcessor {
2✔
181
            source,
2✔
182
            slope_key_or_value,
2✔
183
            offset_key_or_value,
2✔
184
            _transformation: PhantomData,
2✔
185
        }
2✔
186
    }
2✔
187

188
    async fn scale_tile_async(
2✔
189
        &self,
2✔
190
        tile: RasterTile2D<P>,
2✔
191
        _pool: Arc<ThreadPool>,
2✔
192
    ) -> Result<RasterTile2D<P>> {
2✔
193
        // either use the provided metadata/constant or the default values from the properties
194
        let offset = if let Some(offset_key_or_value) = &self.offset_key_or_value {
2✔
195
            Self::prop_value(offset_key_or_value, &tile.properties)?
×
196
        } else {
197
            tile.properties.offset().as_()
2✔
198
        };
199

200
        let slope = if let Some(slope_key_or_value) = &self.slope_key_or_value {
2✔
201
            Self::prop_value(slope_key_or_value, &tile.properties)?
×
202
        } else {
203
            tile.properties.scale().as_()
2✔
204
        };
205

206
        let res_tile =
2✔
207
            crate::util::spawn_blocking(move || tile.transform_elements::<S>(slope, offset))
2✔
208
                .await?;
2✔
209

210
        Ok(res_tile)
2✔
211
    }
2✔
212

213
    fn prop_value(prop_key_or_value: &PropertiesKeyOrValue, props: &RasterProperties) -> Result<P> {
×
214
        let value = match prop_key_or_value {
×
215
            PropertiesKeyOrValue::MetadataKey(key) => props.number_property::<P>(key)?,
×
216
            PropertiesKeyOrValue::Constant { value } => value.as_(),
×
217
        };
218
        Ok(value)
×
219
    }
×
220
}
221

222
#[async_trait]
223
impl<Q, P, S> RasterQueryProcessor for RasterTransformationProcessor<Q, P, S>
224
where
225
    P: Pixel + FromPrimitive + 'static + Default,
226
    f64: AsPrimitive<P>,
227
    Q: RasterQueryProcessor<RasterType = P> + 'static,
228
    S: Send + Sync + 'static + ScalingTransformation<P>,
229
{
230
    type RasterType = P;
231

232
    async fn raster_query<'a>(
2✔
233
        &'a self,
2✔
234
        query: geoengine_datatypes::primitives::RasterQueryRectangle,
2✔
235
        ctx: &'a dyn crate::engine::QueryContext,
2✔
236
    ) -> Result<
2✔
237
        futures::stream::BoxStream<
2✔
238
            'a,
2✔
239
            Result<geoengine_datatypes::raster::RasterTile2D<Self::RasterType>>,
2✔
240
        >,
2✔
241
    > {
2✔
242
        let src = self.source.raster_query(query, ctx).await?;
2✔
243
        let rs = src.and_then(move |tile| self.scale_tile_async(tile, ctx.thread_pool().clone()));
2✔
244
        Ok(rs.boxed())
2✔
245
    }
4✔
246
}
247

248
#[cfg(test)]
mod tests {

    use geoengine_datatypes::{
        primitives::{SpatialPartition2D, SpatialResolution, TimeInterval},
        raster::{
            Grid2D, GridOrEmpty2D, GridShape, MaskedGrid2D, RasterDataType, TileInformation,
            TilingSpecification,
        },
        spatial_reference::SpatialReference,
        util::test::TestDefault,
    };

    use crate::{
        engine::{ChunkByteSize, MockExecutionContext},
        mock::{MockRasterSource, MockRasterSourceParams},
    };

    use super::*;

    /// Runs the operator in `scaling_mode` over one 2x2 `u8` tile filled with `input`.
    ///
    /// The tile carries the properties scale = 2 and offset = 1, and the operator is
    /// configured without explicit slope/offset, so these property values are used.
    /// Also checks that data type and measurement of the result descriptor are
    /// taken over from the source. Returns the pixels of the first result tile.
    async fn run_scaling(scaling_mode: ScalingMode, input: Vec<u8>) -> Vec<Option<u8>> {
        let grid_shape = [2, 2].into();

        let tiling_specification = TilingSpecification {
            origin_coordinate: [0.0, 0.0].into(),
            tile_size_in_pixels: grid_shape,
        };

        let raster = MaskedGrid2D::from(Grid2D::new(grid_shape, input).unwrap());

        let ctx = MockExecutionContext::new_with_tiling_spec(tiling_specification);
        let query_ctx = ctx.mock_query_context(ChunkByteSize::test_default());

        let mut raster_props = RasterProperties::default();
        raster_props.set_scale(2.0);
        raster_props.set_offset(1.0);

        let raster_tile = RasterTile2D::new_with_tile_info_and_properties(
            TimeInterval::default(),
            TileInformation {
                global_geo_transform: TestDefault::test_default(),
                global_tile_position: [0, 0].into(),
                tile_size_in_pixels: grid_shape,
            },
            raster.into(),
            raster_props,
        );

        let spatial_resolution = raster_tile.spatial_resolution();

        let mrs = MockRasterSource {
            params: MockRasterSourceParams {
                data: vec![raster_tile],
                result_descriptor: RasterResultDescriptor {
                    data_type: RasterDataType::U8,
                    spatial_reference: SpatialReference::epsg_4326().into(),
                    measurement: Measurement::Unitless,
                    bbox: None,
                    time: None,
                    resolution: Some(spatial_resolution),
                },
            },
        }
        .boxed();

        let op = RasterScaling {
            params: RasterScalingParams {
                slope_key_or_value: None,
                offset_key_or_value: None,
                output_measurement: None,
                scaling_mode,
            },
            sources: SingleRasterSource { raster: mrs },
        }
        .boxed();

        let initialized_op = op.initialize(&ctx).await.unwrap();

        let result_descriptor = initialized_op.result_descriptor();

        assert_eq!(result_descriptor.data_type, RasterDataType::U8);
        assert_eq!(result_descriptor.measurement, Measurement::Unitless);

        let query_processor = initialized_op.query_processor().unwrap();

        let query = geoengine_datatypes::primitives::RasterQueryRectangle {
            spatial_bounds: SpatialPartition2D::new((0., 0.).into(), (2., -2.).into()).unwrap(),
            spatial_resolution: SpatialResolution::one(),
            time_interval: TimeInterval::default(),
        };

        let TypedRasterQueryProcessor::U8(typed_processor) = query_processor else {
            panic!("expected TypedRasterQueryProcessor::U8");
        };

        let stream = typed_processor
            .raster_query(query, &query_ctx)
            .await
            .unwrap();

        let results = stream.collect::<Vec<Result<RasterTile2D<u8>>>>().await;

        let result_tile = results.as_slice()[0].as_ref().unwrap();

        let result_grid = result_tile.grid_array.clone();

        match result_grid {
            GridOrEmpty2D::Grid(grid) => {
                assert_eq!(grid.shape(), &GridShape::new([2, 2]));

                grid.masked_element_deref_iterator().collect::<Vec<_>>()
            }
            GridOrEmpty2D::Empty(_) => panic!("expected GridOrEmpty2D::Grid"),
        }
    }

    #[tokio::test]
    async fn test_unscale() {
        // p_new = p_old * 2 + 1
        let res = run_scaling(ScalingMode::Unscale, vec![7_u8, 7, 7, 6]).await;

        let expected = vec![Some(15), Some(15), Some(15), Some(13)];

        assert_eq!(res, expected);
    }

    #[tokio::test]
    async fn test_scale() {
        // p_new = (p_old - 1) / 2
        let res = run_scaling(ScalingMode::Scale, vec![15_u8, 15, 15, 13]).await;

        let expected = vec![Some(7), Some(7), Some(7), Some(6)];

        assert_eq!(res, expected);
    }
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc