• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

geo-engine / geoengine / 3929938005

pending completion
3929938005

push

github

GitHub
Merge #713

84930 of 96741 relevant lines covered (87.79%)

79640.1 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

96.45
/operators/src/processing/raster_scaling.rs
1
use crate::engine::{
2
    CreateSpan, ExecutionContext, InitializedRasterOperator, Operator, OperatorName,
3
    RasterOperator, RasterQueryProcessor, RasterResultDescriptor, SingleRasterSource,
4
    TypedRasterQueryProcessor,
5
};
6
use crate::util::Result;
7
use async_trait::async_trait;
8
use futures::{StreamExt, TryStreamExt};
9

10
use geoengine_datatypes::raster::{
11
    ElementScaling, ScaleTransformation, ScalingTransformation, UnscaleTransformation,
12
};
13
use geoengine_datatypes::{
14
    primitives::Measurement,
15
    raster::{Pixel, RasterProperties, RasterPropertiesKey, RasterTile2D},
16
};
17
use num::FromPrimitive;
18
use num_traits::AsPrimitive;
19
use rayon::ThreadPool;
20
use serde::{Deserialize, Serialize};
21
use std::marker::PhantomData;
22
use std::sync::Arc;
23
use tracing::{span, Level};
24

25
#[derive(Debug, Serialize, Deserialize, Clone)]
×
26
#[serde(rename_all = "camelCase")]
27
pub struct RasterScalingParams {
28
    slope: PropertiesKeyOrValue,
29
    offset: PropertiesKeyOrValue,
30
    output_measurement: Option<Measurement>,
31
    scaling_mode: ScalingMode,
32
}
33

34
#[derive(Debug, Serialize, Deserialize, Clone, Copy)]
×
35
#[serde(rename_all = "camelCase")]
36
pub enum ScalingMode {
37
    Scale,
38
    Unscale,
39
}
40

41
#[derive(Debug, Serialize, Deserialize, Clone)]
4✔
42
#[serde(rename_all = "camelCase", tag = "type")]
43
enum PropertiesKeyOrValue {
44
    MetadataKey(RasterPropertiesKey),
45
    Constant { value: f64 },
46
}
47

48
/// The raster scaling operator scales/unscales the values of a raster by a given scale factor and offset.
49
/// This is done by applying the following formulas to every pixel.
50
/// For unscaling the formula is:
51
///
52
/// `p_new = p_old * slope + offset`
53
///
54
/// For scaling the formula is:
55
///
56
/// `p_new = (p_old - offset) / slope`
57
///
58
/// `p_old` and `p_new` refer to the old and new pixel values,
59
/// The slope and offset values are either properties attached to the input raster or a fixed value.
60
///
61
/// An example for Meteosat Second Generation properties is:
62
///
63
/// - offset: `msg.calibration_offset`
64
/// - slope: `msg.calibration_slope`
65
pub type RasterScaling = Operator<RasterScalingParams, SingleRasterSource>;
66

67
impl OperatorName for RasterScaling {
68
    const TYPE_NAME: &'static str = "RasterScaling";
69
}
70

71
pub struct InitializedRasterScalingOperator {
72
    slope: PropertiesKeyOrValue,
73
    offset: PropertiesKeyOrValue,
74
    result_descriptor: RasterResultDescriptor,
75
    source: Box<dyn InitializedRasterOperator>,
76
    scaling_mode: ScalingMode,
77
}
78

79
#[typetag::serde]
×
80
#[async_trait]
81
impl RasterOperator for RasterScaling {
82
    async fn _initialize(
2✔
83
        self: Box<Self>,
2✔
84
        context: &dyn ExecutionContext,
2✔
85
    ) -> Result<Box<dyn InitializedRasterOperator>> {
2✔
86
        let input = self.sources.raster.initialize(context).await?;
2✔
87
        let in_desc = input.result_descriptor();
2✔
88

2✔
89
        let out_desc = RasterResultDescriptor {
2✔
90
            spatial_reference: in_desc.spatial_reference,
2✔
91
            data_type: in_desc.data_type,
2✔
92
            measurement: self
2✔
93
                .params
2✔
94
                .output_measurement
2✔
95
                .unwrap_or_else(|| in_desc.measurement.clone()),
2✔
96
            bbox: in_desc.bbox,
2✔
97
            time: in_desc.time,
2✔
98
            resolution: in_desc.resolution,
2✔
99
        };
2✔
100

2✔
101
        let initialized_operator = InitializedRasterScalingOperator {
2✔
102
            slope: self.params.slope,
2✔
103
            offset: self.params.offset,
2✔
104
            result_descriptor: out_desc,
2✔
105
            source: input,
2✔
106
            scaling_mode: self.params.scaling_mode,
2✔
107
        };
2✔
108

2✔
109
        Ok(initialized_operator.boxed())
2✔
110
    }
4✔
111

112
    span_fn!(RasterScaling);
×
113
}
114

115
impl InitializedRasterOperator for InitializedRasterScalingOperator {
116
    fn result_descriptor(&self) -> &RasterResultDescriptor {
2✔
117
        &self.result_descriptor
2✔
118
    }
2✔
119

120
    fn query_processor(&self) -> Result<TypedRasterQueryProcessor> {
2✔
121
        let slope = self.slope.clone();
2✔
122
        let offset = self.offset.clone();
2✔
123
        let source = self.source.query_processor()?;
2✔
124
        let scaling_mode = self.scaling_mode;
2✔
125

126
        let res = match scaling_mode {
2✔
127
            ScalingMode::Scale => {
128
                call_on_generic_raster_processor!(source, source_proc => { TypedRasterQueryProcessor::from(create_boxed_processor::<_,_, ScaleTransformation>(slope, offset,  source_proc)) })
1✔
129
            }
130
            ScalingMode::Unscale => {
131
                call_on_generic_raster_processor!(source, source_proc => { TypedRasterQueryProcessor::from(create_boxed_processor::<_,_, UnscaleTransformation>(slope, offset,  source_proc)) })
1✔
132
            }
133
        };
134

135
        Ok(res)
2✔
136
    }
2✔
137
}
138

139
struct RasterTransformationProcessor<Q, P, S>
140
where
141
    Q: RasterQueryProcessor<RasterType = P>,
142
{
143
    source: Q,
144
    slope: PropertiesKeyOrValue,
145
    offset: PropertiesKeyOrValue,
146
    _transformation: PhantomData<S>,
147
}
148

149
fn create_boxed_processor<Q, P, S>(
2✔
150
    slope: PropertiesKeyOrValue,
2✔
151
    offset: PropertiesKeyOrValue,
2✔
152
    source: Q,
2✔
153
) -> Box<dyn RasterQueryProcessor<RasterType = P>>
2✔
154
where
2✔
155
    Q: RasterQueryProcessor<RasterType = P> + 'static,
2✔
156
    P: Pixel + FromPrimitive + 'static + Default,
2✔
157
    f64: AsPrimitive<P>,
2✔
158
    S: Send + Sync + 'static + ScalingTransformation<P>,
2✔
159
{
2✔
160
    RasterTransformationProcessor::<Q, P, S>::create(slope, offset, source).boxed()
2✔
161
}
2✔
162

163
impl<Q, P, S> RasterTransformationProcessor<Q, P, S>
164
where
165
    Q: RasterQueryProcessor<RasterType = P> + 'static,
166
    P: Pixel + FromPrimitive + 'static + Default,
167
    f64: AsPrimitive<P>,
168
    S: Send + Sync + 'static + ScalingTransformation<P>,
169
{
170
    pub fn create(
2✔
171
        slope: PropertiesKeyOrValue,
2✔
172
        offset: PropertiesKeyOrValue,
2✔
173
        source: Q,
2✔
174
    ) -> RasterTransformationProcessor<Q, P, S> {
2✔
175
        RasterTransformationProcessor {
2✔
176
            source,
2✔
177
            slope,
2✔
178
            offset,
2✔
179
            _transformation: PhantomData,
2✔
180
        }
2✔
181
    }
2✔
182

183
    async fn scale_tile_async(
2✔
184
        &self,
2✔
185
        tile: RasterTile2D<P>,
2✔
186
        _pool: Arc<ThreadPool>,
2✔
187
    ) -> Result<RasterTile2D<P>> {
2✔
188
        let offset = Self::prop_value(&self.offset, &tile.properties)?;
2✔
189
        let slope = Self::prop_value(&self.slope, &tile.properties)?;
2✔
190

191
        let res_tile =
2✔
192
            crate::util::spawn_blocking(move || tile.transform_elements::<S>(slope, offset))
2✔
193
                .await?;
2✔
194

195
        Ok(res_tile)
2✔
196
    }
2✔
197

198
    fn prop_value(prop_key_or_value: &PropertiesKeyOrValue, props: &RasterProperties) -> Result<P> {
4✔
199
        let value = match prop_key_or_value {
4✔
200
            PropertiesKeyOrValue::MetadataKey(key) => props.number_property::<P>(key)?,
4✔
201
            PropertiesKeyOrValue::Constant { value } => value.as_(),
×
202
        };
203
        Ok(value)
4✔
204
    }
4✔
205
}
206

207
#[async_trait]
208
impl<Q, P, S> RasterQueryProcessor for RasterTransformationProcessor<Q, P, S>
209
where
210
    P: Pixel + FromPrimitive + 'static + Default,
211
    f64: AsPrimitive<P>,
212
    Q: RasterQueryProcessor<RasterType = P> + 'static,
213
    S: Send + Sync + 'static + ScalingTransformation<P>,
214
{
215
    type RasterType = P;
216

217
    async fn raster_query<'a>(
2✔
218
        &'a self,
2✔
219
        query: geoengine_datatypes::primitives::RasterQueryRectangle,
2✔
220
        ctx: &'a dyn crate::engine::QueryContext,
2✔
221
    ) -> Result<
2✔
222
        futures::stream::BoxStream<
2✔
223
            'a,
2✔
224
            Result<geoengine_datatypes::raster::RasterTile2D<Self::RasterType>>,
2✔
225
        >,
2✔
226
    > {
2✔
227
        let src = self.source.raster_query(query, ctx).await?;
2✔
228
        let rs = src.and_then(move |tile| self.scale_tile_async(tile, ctx.thread_pool().clone()));
2✔
229
        Ok(rs.boxed())
2✔
230
    }
4✔
231
}
232

233
#[cfg(test)]
234
mod tests {
235

236
    use geoengine_datatypes::{
237
        primitives::{SpatialPartition2D, SpatialResolution, TimeInterval},
238
        raster::{
239
            Grid2D, GridOrEmpty2D, GridShape, MaskedGrid2D, RasterDataType, TileInformation,
240
            TilingSpecification,
241
        },
242
        spatial_reference::SpatialReference,
243
        util::test::TestDefault,
244
    };
245

246
    use crate::{
247
        engine::{ChunkByteSize, MockExecutionContext},
248
        mock::{MockRasterSource, MockRasterSourceParams},
249
    };
250

251
    use super::*;
252

253
    #[tokio::test]
1✔
254
    async fn test_unscale() {
1✔
255
        let grid_shape = [2, 2].into();
1✔
256

1✔
257
        let tiling_specification = TilingSpecification {
1✔
258
            origin_coordinate: [0.0, 0.0].into(),
1✔
259
            tile_size_in_pixels: grid_shape,
1✔
260
        };
1✔
261

1✔
262
        let raster = MaskedGrid2D::from(Grid2D::new(grid_shape, vec![7_u8, 7, 7, 6]).unwrap());
1✔
263

1✔
264
        let ctx = MockExecutionContext::new_with_tiling_spec(tiling_specification);
1✔
265
        let query_ctx = ctx.mock_query_context(ChunkByteSize::test_default());
1✔
266

1✔
267
        let mut raster_props = RasterProperties::default();
1✔
268
        raster_props.set_scale(2.0);
1✔
269
        raster_props.set_offset(1.0);
1✔
270

1✔
271
        let raster_tile = RasterTile2D::new_with_tile_info_and_properties(
1✔
272
            TimeInterval::default(),
1✔
273
            TileInformation {
1✔
274
                global_geo_transform: TestDefault::test_default(),
1✔
275
                global_tile_position: [0, 0].into(),
1✔
276
                tile_size_in_pixels: grid_shape,
1✔
277
            },
1✔
278
            raster.into(),
1✔
279
            raster_props,
1✔
280
        );
1✔
281

1✔
282
        let spatial_resolution = raster_tile.spatial_resolution();
1✔
283

1✔
284
        let mrs = MockRasterSource {
1✔
285
            params: MockRasterSourceParams {
1✔
286
                data: vec![raster_tile],
1✔
287
                result_descriptor: RasterResultDescriptor {
1✔
288
                    data_type: RasterDataType::U8,
1✔
289
                    spatial_reference: SpatialReference::epsg_4326().into(),
1✔
290
                    measurement: Measurement::Unitless,
1✔
291
                    bbox: None,
1✔
292
                    time: None,
1✔
293
                    resolution: Some(spatial_resolution),
1✔
294
                },
1✔
295
            },
1✔
296
        }
1✔
297
        .boxed();
1✔
298

1✔
299
        let slope = PropertiesKeyOrValue::MetadataKey(RasterPropertiesKey {
1✔
300
            domain: None,
1✔
301
            key: "scale".to_string(),
1✔
302
        });
1✔
303
        let offset = PropertiesKeyOrValue::MetadataKey(RasterPropertiesKey {
1✔
304
            domain: None,
1✔
305
            key: "offset".to_string(),
1✔
306
        });
1✔
307

1✔
308
        let scaling_mode = ScalingMode::Unscale;
1✔
309

1✔
310
        let output_measurement = None;
1✔
311

1✔
312
        let op = RasterScaling {
1✔
313
            params: RasterScalingParams {
1✔
314
                slope,
1✔
315
                offset,
1✔
316
                output_measurement,
1✔
317
                scaling_mode,
1✔
318
            },
1✔
319
            sources: SingleRasterSource { raster: mrs },
1✔
320
        }
1✔
321
        .boxed();
1✔
322

323
        let initialized_op = op.initialize(&ctx).await.unwrap();
1✔
324

1✔
325
        let result_descriptor = initialized_op.result_descriptor();
1✔
326

1✔
327
        assert_eq!(result_descriptor.data_type, RasterDataType::U8);
1✔
328
        assert_eq!(result_descriptor.measurement, Measurement::Unitless);
1✔
329

330
        let query_processor = initialized_op.query_processor().unwrap();
1✔
331

1✔
332
        let query = geoengine_datatypes::primitives::RasterQueryRectangle {
1✔
333
            spatial_bounds: SpatialPartition2D::new((0., 0.).into(), (2., -2.).into()).unwrap(),
1✔
334
            spatial_resolution: SpatialResolution::one(),
1✔
335
            time_interval: TimeInterval::default(),
1✔
336
        };
1✔
337

338
        let TypedRasterQueryProcessor::U8(typed_processor) = query_processor else {
1✔
339
            panic!("expected TypedRasterQueryProcessor::U8");
×
340
        };
341

342
        let stream = typed_processor
1✔
343
            .raster_query(query, &query_ctx)
1✔
344
            .await
×
345
            .unwrap();
1✔
346

347
        let results = stream.collect::<Vec<Result<RasterTile2D<u8>>>>().await;
1✔
348

349
        let result_tile = results.as_slice()[0].as_ref().unwrap();
1✔
350

1✔
351
        let result_grid = result_tile.grid_array.clone();
1✔
352

1✔
353
        match result_grid {
1✔
354
            GridOrEmpty2D::Grid(grid) => {
1✔
355
                assert_eq!(grid.shape(), &GridShape::new([2, 2]));
1✔
356

357
                let res = grid.masked_element_deref_iterator().collect::<Vec<_>>();
1✔
358

1✔
359
                let expected = vec![Some(15), Some(15), Some(15), Some(13)];
1✔
360

1✔
361
                assert_eq!(res, expected);
1✔
362
            }
363
            GridOrEmpty2D::Empty(_) => panic!("expected GridOrEmpty2D::Grid"),
×
364
        }
365
    }
366

367
    #[tokio::test]
1✔
368
    async fn test_scale() {
1✔
369
        let grid_shape = [2, 2].into();
1✔
370

1✔
371
        let tiling_specification = TilingSpecification {
1✔
372
            origin_coordinate: [0.0, 0.0].into(),
1✔
373
            tile_size_in_pixels: grid_shape,
1✔
374
        };
1✔
375

1✔
376
        let raster = MaskedGrid2D::from(Grid2D::new(grid_shape, vec![15_u8, 15, 15, 13]).unwrap());
1✔
377

1✔
378
        let ctx = MockExecutionContext::new_with_tiling_spec(tiling_specification);
1✔
379
        let query_ctx = ctx.mock_query_context(ChunkByteSize::test_default());
1✔
380

1✔
381
        let mut raster_props = RasterProperties::default();
1✔
382
        raster_props.set_scale(2.0);
1✔
383
        raster_props.set_offset(1.0);
1✔
384

1✔
385
        let raster_tile = RasterTile2D::new_with_tile_info_and_properties(
1✔
386
            TimeInterval::default(),
1✔
387
            TileInformation {
1✔
388
                global_geo_transform: TestDefault::test_default(),
1✔
389
                global_tile_position: [0, 0].into(),
1✔
390
                tile_size_in_pixels: grid_shape,
1✔
391
            },
1✔
392
            raster.into(),
1✔
393
            raster_props,
1✔
394
        );
1✔
395

1✔
396
        let spatial_resolution = raster_tile.spatial_resolution();
1✔
397

1✔
398
        let mrs = MockRasterSource {
1✔
399
            params: MockRasterSourceParams {
1✔
400
                data: vec![raster_tile],
1✔
401
                result_descriptor: RasterResultDescriptor {
1✔
402
                    data_type: RasterDataType::U8,
1✔
403
                    spatial_reference: SpatialReference::epsg_4326().into(),
1✔
404
                    measurement: Measurement::Unitless,
1✔
405
                    bbox: None,
1✔
406
                    time: None,
1✔
407
                    resolution: Some(spatial_resolution),
1✔
408
                },
1✔
409
            },
1✔
410
        }
1✔
411
        .boxed();
1✔
412

1✔
413
        let slope = PropertiesKeyOrValue::MetadataKey(RasterPropertiesKey {
1✔
414
            domain: None,
1✔
415
            key: "scale".to_string(),
1✔
416
        });
1✔
417
        let offset = PropertiesKeyOrValue::MetadataKey(RasterPropertiesKey {
1✔
418
            domain: None,
1✔
419
            key: "offset".to_string(),
1✔
420
        });
1✔
421

1✔
422
        let scaling_mode = ScalingMode::Scale;
1✔
423

1✔
424
        let output_measurement = None;
1✔
425

1✔
426
        let params = RasterScalingParams {
1✔
427
            slope,
1✔
428
            offset,
1✔
429
            output_measurement,
1✔
430
            scaling_mode,
1✔
431
        };
1✔
432

1✔
433
        let op = RasterScaling {
1✔
434
            params,
1✔
435
            sources: SingleRasterSource { raster: mrs },
1✔
436
        }
1✔
437
        .boxed();
1✔
438

439
        let initialized_op = op.initialize(&ctx).await.unwrap();
1✔
440

1✔
441
        let result_descriptor = initialized_op.result_descriptor();
1✔
442

1✔
443
        assert_eq!(result_descriptor.data_type, RasterDataType::U8);
1✔
444
        assert_eq!(result_descriptor.measurement, Measurement::Unitless);
1✔
445

446
        let query_processor = initialized_op.query_processor().unwrap();
1✔
447

1✔
448
        let query = geoengine_datatypes::primitives::RasterQueryRectangle {
1✔
449
            spatial_bounds: SpatialPartition2D::new((0., 0.).into(), (2., -2.).into()).unwrap(),
1✔
450
            spatial_resolution: SpatialResolution::one(),
1✔
451
            time_interval: TimeInterval::default(),
1✔
452
        };
1✔
453

454
        let TypedRasterQueryProcessor::U8(typed_processor) = query_processor else {
1✔
455
            panic!("expected TypedRasterQueryProcessor::U8");
×
456
        };
457

458
        let stream = typed_processor
1✔
459
            .raster_query(query, &query_ctx)
1✔
460
            .await
×
461
            .unwrap();
1✔
462

463
        let results = stream.collect::<Vec<Result<RasterTile2D<u8>>>>().await;
1✔
464

465
        let result_tile = results.as_slice()[0].as_ref().unwrap();
1✔
466

1✔
467
        let result_grid = result_tile.grid_array.clone();
1✔
468

1✔
469
        match result_grid {
1✔
470
            GridOrEmpty2D::Grid(grid) => {
1✔
471
                assert_eq!(grid.shape(), &GridShape::new([2, 2]));
1✔
472

473
                let res = grid.masked_element_deref_iterator().collect::<Vec<_>>();
1✔
474

1✔
475
                let expected = vec![Some(7), Some(7), Some(7), Some(6)];
1✔
476

1✔
477
                assert_eq!(res, expected);
1✔
478
            }
479
            GridOrEmpty2D::Empty(_) => panic!("expected GridOrEmpty2D::Grid"),
×
480
        }
481
    }
482
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc