• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

geo-engine / geoengine / 5006008836

pending completion
5006008836

push

github

GitHub
Merge #785 #787

936 of 936 new or added lines in 50 files covered. (100.0%)

96010 of 107707 relevant lines covered (89.14%)

72676.46 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

93.49
/operators/src/processing/raster_scaling.rs
1
use crate::engine::{
2
    CanonicOperatorName, ExecutionContext, InitializedRasterOperator, InitializedSources, Operator,
3
    OperatorName, RasterOperator, RasterQueryProcessor, RasterResultDescriptor, SingleRasterSource,
4
    TypedRasterQueryProcessor, WorkflowOperatorPath,
5
};
6
use crate::util::Result;
7
use async_trait::async_trait;
8
use futures::{StreamExt, TryStreamExt};
9

10
use geoengine_datatypes::raster::{
11
    CheckedMulThenAddTransformation, CheckedSubThenDivTransformation, ElementScaling,
12
    ScalingTransformation,
13
};
14
use geoengine_datatypes::{
15
    primitives::Measurement,
16
    raster::{Pixel, RasterPropertiesKey, RasterTile2D},
17
};
18
use num::FromPrimitive;
19
use num_traits::AsPrimitive;
20
use rayon::ThreadPool;
21
use serde::{Deserialize, Serialize};
22
use std::marker::PhantomData;
23
use std::sync::Arc;
24

25
#[derive(Debug, Serialize, Deserialize, Clone)]
2✔
26
#[serde(rename_all = "camelCase")]
27
pub struct RasterScalingParams {
28
    slope: SlopeOffsetSelection,
29
    offset: SlopeOffsetSelection,
30
    output_measurement: Option<Measurement>,
31
    scaling_mode: ScalingMode,
32
}
33

34
#[derive(Debug, Serialize, Deserialize, Clone, Copy)]
2✔
35
#[serde(rename_all = "camelCase")]
36
pub enum ScalingMode {
37
    MulSlopeAddOffset,
38
    SubOffsetDivSlope,
39
}
40

41
#[derive(Debug, Serialize, Deserialize, Clone)]
4✔
42
#[serde(rename_all = "camelCase", tag = "type")]
43
enum SlopeOffsetSelection {
44
    Auto,
45
    MetadataKey(RasterPropertiesKey),
46
    Constant { value: f64 },
47
}
48

49
impl Default for SlopeOffsetSelection {
50
    fn default() -> Self {
×
51
        Self::Auto
×
52
    }
×
53
}
54

55
/// The raster scaling operator scales/unscales the values of a raster by a given scale factor and offset.
56
/// This is done by applying the following formulas to every pixel.
57
/// For unscaling the formula is:
58
///
59
/// `p_new = p_old * slope + offset`
60
///
61
/// For scaling the formula is:
62
///
63
/// `p_new = (p_old - offset) / slope`
64
///
65
/// `p_old` and `p_new` refer to the old and new pixel values,
66
/// The slope and offset values are either properties attached to the input raster or a fixed value.
67
///
68
/// An example for Meteosat Second Generation properties is:
69
///
70
/// - offset: `msg.calibration_offset`
71
/// - slope: `msg.calibration_slope`
72
pub type RasterScaling = Operator<RasterScalingParams, SingleRasterSource>;
73

74
/// Provides the type name of the [`RasterScaling`] operator.
impl OperatorName for RasterScaling {
    const TYPE_NAME: &'static str = "RasterScaling";
}
77

78
pub struct InitializedRasterScalingOperator {
79
    name: CanonicOperatorName,
80
    slope: SlopeOffsetSelection,
81
    offset: SlopeOffsetSelection,
82
    result_descriptor: RasterResultDescriptor,
83
    source: Box<dyn InitializedRasterOperator>,
84
    scaling_mode: ScalingMode,
85
}
86

87
#[typetag::serde]
×
88
#[async_trait]
89
impl RasterOperator for RasterScaling {
90
    async fn _initialize(
2✔
91
        self: Box<Self>,
2✔
92
        path: WorkflowOperatorPath,
2✔
93
        context: &dyn ExecutionContext,
2✔
94
    ) -> Result<Box<dyn InitializedRasterOperator>> {
2✔
95
        let name = CanonicOperatorName::from(&self);
2✔
96

97
        let input = self.sources.initialize_sources(path, context).await?;
2✔
98
        let in_desc = input.raster.result_descriptor();
2✔
99

2✔
100
        let out_desc = RasterResultDescriptor {
2✔
101
            spatial_reference: in_desc.spatial_reference,
2✔
102
            data_type: in_desc.data_type,
2✔
103
            measurement: self
2✔
104
                .params
2✔
105
                .output_measurement
2✔
106
                .unwrap_or_else(|| in_desc.measurement.clone()),
2✔
107
            bbox: in_desc.bbox,
2✔
108
            time: in_desc.time,
2✔
109
            resolution: in_desc.resolution,
2✔
110
        };
2✔
111

2✔
112
        let initialized_operator = InitializedRasterScalingOperator {
2✔
113
            name,
2✔
114
            slope: self.params.slope,
2✔
115
            offset: self.params.offset,
2✔
116
            result_descriptor: out_desc,
2✔
117
            source: input.raster,
2✔
118
            scaling_mode: self.params.scaling_mode,
2✔
119
        };
2✔
120

2✔
121
        Ok(initialized_operator.boxed())
2✔
122
    }
4✔
123

124
    span_fn!(RasterScaling);
×
125
}
126

127
impl InitializedRasterOperator for InitializedRasterScalingOperator {
128
    fn result_descriptor(&self) -> &RasterResultDescriptor {
2✔
129
        &self.result_descriptor
2✔
130
    }
2✔
131

132
    fn query_processor(&self) -> Result<TypedRasterQueryProcessor> {
2✔
133
        let slope = self.slope.clone();
2✔
134
        let offset = self.offset.clone();
2✔
135
        let source = self.source.query_processor()?;
2✔
136
        let scaling_mode = self.scaling_mode;
2✔
137

138
        let res = match scaling_mode {
2✔
139
            ScalingMode::SubOffsetDivSlope => {
140
                call_on_generic_raster_processor!(source, source_proc => { TypedRasterQueryProcessor::from(create_boxed_processor::<_,_, CheckedSubThenDivTransformation>(slope, offset,  source_proc)) })
1✔
141
            }
142
            ScalingMode::MulSlopeAddOffset => {
143
                call_on_generic_raster_processor!(source, source_proc => { TypedRasterQueryProcessor::from(create_boxed_processor::<_,_, CheckedMulThenAddTransformation>(slope, offset,  source_proc)) })
1✔
144
            }
145
        };
146

147
        Ok(res)
2✔
148
    }
2✔
149

150
    fn canonic_name(&self) -> CanonicOperatorName {
×
151
        self.name.clone()
×
152
    }
×
153
}
154

155
struct RasterTransformationProcessor<Q, P, S>
156
where
157
    Q: RasterQueryProcessor<RasterType = P>,
158
{
159
    source: Q,
160
    slope: SlopeOffsetSelection,
161
    offset: SlopeOffsetSelection,
162
    _transformation: PhantomData<S>,
163
}
164

165
fn create_boxed_processor<Q, P, S>(
2✔
166
    slope: SlopeOffsetSelection,
2✔
167
    offset: SlopeOffsetSelection,
2✔
168
    source: Q,
2✔
169
) -> Box<dyn RasterQueryProcessor<RasterType = P>>
2✔
170
where
2✔
171
    Q: RasterQueryProcessor<RasterType = P> + 'static,
2✔
172
    P: Pixel + FromPrimitive + 'static + Default,
2✔
173
    f64: AsPrimitive<P>,
2✔
174
    S: Send + Sync + 'static + ScalingTransformation<P>,
2✔
175
{
2✔
176
    RasterTransformationProcessor::<Q, P, S>::create(slope, offset, source).boxed()
2✔
177
}
2✔
178

179
impl<Q, P, S> RasterTransformationProcessor<Q, P, S>
180
where
181
    Q: RasterQueryProcessor<RasterType = P> + 'static,
182
    P: Pixel + FromPrimitive + 'static + Default,
183
    f64: AsPrimitive<P>,
184
    S: Send + Sync + 'static + ScalingTransformation<P>,
185
{
186
    pub fn create(
2✔
187
        slope: SlopeOffsetSelection,
2✔
188
        offset: SlopeOffsetSelection,
2✔
189
        source: Q,
2✔
190
    ) -> RasterTransformationProcessor<Q, P, S> {
2✔
191
        RasterTransformationProcessor {
2✔
192
            source,
2✔
193
            slope,
2✔
194
            offset,
2✔
195
            _transformation: PhantomData,
2✔
196
        }
2✔
197
    }
2✔
198

199
    async fn scale_tile_async(
2✔
200
        &self,
2✔
201
        tile: RasterTile2D<P>,
2✔
202
        _pool: Arc<ThreadPool>,
2✔
203
    ) -> Result<RasterTile2D<P>> {
2✔
204
        // either use the provided metadata/constant or the default values from the properties
205
        let offset = match &self.offset {
2✔
206
            SlopeOffsetSelection::MetadataKey(key) => tile.properties.number_property::<P>(key)?,
×
207
            SlopeOffsetSelection::Constant { value } => value.as_(),
×
208
            SlopeOffsetSelection::Auto => tile.properties.offset().as_(),
2✔
209
        };
210

211
        let slope = match &self.slope {
2✔
212
            SlopeOffsetSelection::MetadataKey(key) => tile.properties.number_property::<P>(key)?,
×
213
            SlopeOffsetSelection::Constant { value } => value.as_(),
×
214
            SlopeOffsetSelection::Auto => tile.properties.scale().as_(),
2✔
215
        };
216

217
        let res_tile =
2✔
218
            crate::util::spawn_blocking(move || tile.transform_elements::<S>(slope, offset))
2✔
219
                .await?;
2✔
220

221
        Ok(res_tile)
2✔
222
    }
2✔
223
}
224

225
#[async_trait]
226
impl<Q, P, S> RasterQueryProcessor for RasterTransformationProcessor<Q, P, S>
227
where
228
    P: Pixel + FromPrimitive + 'static + Default,
229
    f64: AsPrimitive<P>,
230
    Q: RasterQueryProcessor<RasterType = P> + 'static,
231
    S: Send + Sync + 'static + ScalingTransformation<P>,
232
{
233
    type RasterType = P;
234

235
    async fn raster_query<'a>(
2✔
236
        &'a self,
2✔
237
        query: geoengine_datatypes::primitives::RasterQueryRectangle,
2✔
238
        ctx: &'a dyn crate::engine::QueryContext,
2✔
239
    ) -> Result<
2✔
240
        futures::stream::BoxStream<
2✔
241
            'a,
2✔
242
            Result<geoengine_datatypes::raster::RasterTile2D<Self::RasterType>>,
2✔
243
        >,
2✔
244
    > {
2✔
245
        let src = self.source.raster_query(query, ctx).await?;
2✔
246
        let rs = src.and_then(move |tile| self.scale_tile_async(tile, ctx.thread_pool().clone()));
2✔
247
        Ok(rs.boxed())
2✔
248
    }
4✔
249
}
250

251
#[cfg(test)]
mod tests {

    use geoengine_datatypes::{
        primitives::{SpatialPartition2D, SpatialResolution, TimeInterval},
        raster::{
            Grid2D, GridOrEmpty2D, GridShape, MaskedGrid2D, RasterDataType, RasterProperties,
            TileInformation, TilingSpecification,
        },
        spatial_reference::SpatialReference,
        util::test::TestDefault,
    };

    use crate::{
        engine::{ChunkByteSize, MockExecutionContext},
        mock::{MockRasterSource, MockRasterSourceParams},
    };

    use super::*;

    /// Runs the `RasterScaling` operator in `scaling_mode` on a single 2x2 `u8`
    /// tile holding `pixels` and returns the (masked) values of the first
    /// result tile.
    ///
    /// The tile carries the properties scale = 2 and offset = 1, and both the
    /// slope and the offset selection are `Auto`, so these properties are used.
    /// The helper also checks that the result descriptor keeps the data type
    /// and the measurement of the source.
    async fn scale_single_tile(pixels: Vec<u8>, scaling_mode: ScalingMode) -> Vec<Option<u8>> {
        let grid_shape = [2, 2].into();

        let tiling_specification = TilingSpecification {
            origin_coordinate: [0.0, 0.0].into(),
            tile_size_in_pixels: grid_shape,
        };

        let raster = MaskedGrid2D::from(Grid2D::new(grid_shape, pixels).unwrap());

        let ctx = MockExecutionContext::new_with_tiling_spec(tiling_specification);
        let query_ctx = ctx.mock_query_context(ChunkByteSize::test_default());

        let mut raster_props = RasterProperties::default();
        raster_props.set_scale(2.0);
        raster_props.set_offset(1.0);

        let raster_tile = RasterTile2D::new_with_tile_info_and_properties(
            TimeInterval::default(),
            TileInformation {
                global_geo_transform: TestDefault::test_default(),
                global_tile_position: [0, 0].into(),
                tile_size_in_pixels: grid_shape,
            },
            raster.into(),
            raster_props,
        );

        let spatial_resolution = raster_tile.spatial_resolution();

        let mrs = MockRasterSource {
            params: MockRasterSourceParams {
                data: vec![raster_tile],
                result_descriptor: RasterResultDescriptor {
                    data_type: RasterDataType::U8,
                    spatial_reference: SpatialReference::epsg_4326().into(),
                    measurement: Measurement::Unitless,
                    bbox: None,
                    time: None,
                    resolution: Some(spatial_resolution),
                },
            },
        }
        .boxed();

        let op = RasterScaling {
            params: RasterScalingParams {
                slope: SlopeOffsetSelection::Auto,
                offset: SlopeOffsetSelection::Auto,
                output_measurement: None,
                scaling_mode,
            },
            sources: SingleRasterSource { raster: mrs },
        }
        .boxed();

        let initialized_op = op
            .initialize(WorkflowOperatorPath::initialize_root(), &ctx)
            .await
            .unwrap();

        let result_descriptor = initialized_op.result_descriptor();

        assert_eq!(result_descriptor.data_type, RasterDataType::U8);
        assert_eq!(result_descriptor.measurement, Measurement::Unitless);

        let query_processor = initialized_op.query_processor().unwrap();

        let query = geoengine_datatypes::primitives::RasterQueryRectangle {
            spatial_bounds: SpatialPartition2D::new((0., 0.).into(), (2., -2.).into()).unwrap(),
            spatial_resolution: SpatialResolution::one(),
            time_interval: TimeInterval::default(),
        };

        let TypedRasterQueryProcessor::U8(typed_processor) = query_processor else {
            panic!("expected TypedRasterQueryProcessor::U8");
        };

        let stream = typed_processor
            .raster_query(query, &query_ctx)
            .await
            .unwrap();

        let results = stream.collect::<Vec<Result<RasterTile2D<u8>>>>().await;

        let result_tile = results.as_slice()[0].as_ref().unwrap();

        let result_grid = result_tile.grid_array.clone();

        match result_grid {
            GridOrEmpty2D::Grid(grid) => {
                assert_eq!(grid.shape(), &GridShape::new([2, 2]));

                // collect into a local first so the iterator borrowing `grid`
                // is dropped before `grid` itself goes out of scope
                let values = grid.masked_element_deref_iterator().collect::<Vec<_>>();
                values
            }
            GridOrEmpty2D::Empty(_) => panic!("expected GridOrEmpty2D::Grid"),
        }
    }

    #[tokio::test]
    async fn test_unscale() {
        // p_new = p_old * 2 + 1
        let res = scale_single_tile(vec![7_u8, 7, 7, 6], ScalingMode::MulSlopeAddOffset).await;

        let expected = vec![Some(15), Some(15), Some(15), Some(13)];

        assert_eq!(res, expected);
    }

    #[tokio::test]
    async fn test_scale() {
        // p_new = (p_old - 1) / 2
        let res = scale_single_tile(vec![15_u8, 15, 15, 13], ScalingMode::SubOffsetDivSlope).await;

        let expected = vec![Some(7), Some(7), Some(7), Some(6)];

        assert_eq!(res, expected);
    }
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc