• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

geo-engine / geoengine / 10178074589

31 Jul 2024 09:34AM UTC coverage: 91.068% (+0.4%) from 90.682%
10178074589

push

github

web-flow
Merge pull request #973 from geo-engine/remove-XGB-update-toolchain

Remove-XGB-update-toolchain

81 of 88 new or added lines in 29 files covered. (92.05%)

456 existing lines in 119 files now uncovered.

131088 of 143945 relevant lines covered (91.07%)

53581.03 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

95.93
/operators/src/processing/raster_scaling.rs
1
use crate::engine::{
2
    CanonicOperatorName, ExecutionContext, InitializedRasterOperator, InitializedSources, Operator,
3
    OperatorName, RasterBandDescriptor, RasterOperator, RasterQueryProcessor,
4
    RasterResultDescriptor, SingleRasterSource, TypedRasterQueryProcessor, WorkflowOperatorPath,
5
};
6
use crate::util::Result;
7
use async_trait::async_trait;
8
use futures::{StreamExt, TryStreamExt};
9

10
use geoengine_datatypes::raster::{
11
    CheckedMulThenAddTransformation, CheckedSubThenDivTransformation, ElementScaling,
12
    ScalingTransformation,
13
};
14
use geoengine_datatypes::{
15
    primitives::Measurement,
16
    raster::{Pixel, RasterPropertiesKey, RasterTile2D},
17
};
18
use num::FromPrimitive;
19
use num_traits::AsPrimitive;
20
use rayon::ThreadPool;
21
use serde::{Deserialize, Serialize};
22
use std::marker::PhantomData;
23
use std::sync::Arc;
24

UNCOV
25
#[derive(Debug, Serialize, Deserialize, Clone)]
×
26
#[serde(rename_all = "camelCase")]
27
pub struct RasterScalingParams {
28
    slope: SlopeOffsetSelection,
29
    offset: SlopeOffsetSelection,
30
    output_measurement: Option<Measurement>,
31
    scaling_mode: ScalingMode,
32
}
33

UNCOV
34
#[derive(Debug, Serialize, Deserialize, Clone, Copy)]
×
35
#[serde(rename_all = "camelCase")]
36
pub enum ScalingMode {
37
    MulSlopeAddOffset,
38
    SubOffsetDivSlope,
39
}
40

UNCOV
41
#[derive(Debug, Serialize, Deserialize, Clone)]
×
42
#[serde(rename_all = "camelCase", tag = "type")]
43
enum SlopeOffsetSelection {
44
    Auto,
45
    MetadataKey(RasterPropertiesKey),
46
    Constant { value: f64 },
47
}
48

49
impl Default for SlopeOffsetSelection {
50
    fn default() -> Self {
×
51
        Self::Auto
×
52
    }
×
53
}
54

55
/// The raster scaling operator scales/unscales the values of a raster by a given scale factor and offset.
56
/// This is done by applying the following formulas to every pixel.
57
/// For unscaling the formula is:
58
///
59
/// `p_new = p_old * slope + offset`
60
///
61
/// For scaling the formula is:
62
///
63
/// `p_new = (p_old - offset) / slope`
64
///
65
/// `p_old` and `p_new` refer to the old and new pixel values,
66
/// The slope and offset values are either properties attached to the input raster or a fixed value.
67
///
68
/// An example for Meteosat Second Generation properties is:
69
///
70
/// - offset: `msg.calibration_offset`
71
/// - slope: `msg.calibration_slope`
72
pub type RasterScaling = Operator<RasterScalingParams, SingleRasterSource>;
73

74
impl OperatorName for RasterScaling {
    /// Type name under which the operator is registered.
    const TYPE_NAME: &'static str = "RasterScaling";
}
77

78
pub struct InitializedRasterScalingOperator {
79
    name: CanonicOperatorName,
80
    slope: SlopeOffsetSelection,
81
    offset: SlopeOffsetSelection,
82
    result_descriptor: RasterResultDescriptor,
83
    source: Box<dyn InitializedRasterOperator>,
84
    scaling_mode: ScalingMode,
85
}
86

87
#[typetag::serde]
×
88
#[async_trait]
89
impl RasterOperator for RasterScaling {
90
    async fn _initialize(
91
        self: Box<Self>,
92
        path: WorkflowOperatorPath,
93
        context: &dyn ExecutionContext,
94
    ) -> Result<Box<dyn InitializedRasterOperator>> {
2✔
95
        let name = CanonicOperatorName::from(&self);
2✔
96

2✔
97
        let input = self.sources.initialize_sources(path, context).await?;
2✔
98
        let in_desc = input.raster.result_descriptor();
2✔
99

2✔
100
        let out_desc = RasterResultDescriptor {
2✔
101
            spatial_reference: in_desc.spatial_reference,
2✔
102
            data_type: in_desc.data_type,
2✔
103

2✔
104
            bbox: in_desc.bbox,
2✔
105
            time: in_desc.time,
2✔
106
            resolution: in_desc.resolution,
2✔
107
            bands: in_desc
2✔
108
                .bands
2✔
109
                .iter()
2✔
110
                .map(|b| {
2✔
111
                    RasterBandDescriptor::new(
2✔
112
                        b.name.clone(),
2✔
113
                        self.params
2✔
114
                            .output_measurement
2✔
115
                            .clone()
2✔
116
                            .unwrap_or_else(|| b.measurement.clone()),
2✔
117
                    )
2✔
118
                })
2✔
119
                .collect::<Vec<_>>()
2✔
120
                .try_into()?,
2✔
121
        };
2✔
122

2✔
123
        let initialized_operator = InitializedRasterScalingOperator {
2✔
124
            name,
2✔
125
            slope: self.params.slope,
2✔
126
            offset: self.params.offset,
2✔
127
            result_descriptor: out_desc,
2✔
128
            source: input.raster,
2✔
129
            scaling_mode: self.params.scaling_mode,
2✔
130
        };
2✔
131

2✔
132
        Ok(initialized_operator.boxed())
2✔
133
    }
2✔
134

135
    span_fn!(RasterScaling);
136
}
137

138
impl InitializedRasterOperator for InitializedRasterScalingOperator {
139
    fn result_descriptor(&self) -> &RasterResultDescriptor {
2✔
140
        &self.result_descriptor
2✔
141
    }
2✔
142

143
    fn query_processor(&self) -> Result<TypedRasterQueryProcessor> {
2✔
144
        let slope = self.slope.clone();
2✔
145
        let offset = self.offset.clone();
2✔
146
        let source = self.source.query_processor()?;
2✔
147
        let scaling_mode = self.scaling_mode;
2✔
148

149
        let res = match scaling_mode {
2✔
150
            ScalingMode::SubOffsetDivSlope => {
151
                call_on_generic_raster_processor!(source, source_proc => { TypedRasterQueryProcessor::from(create_boxed_processor::<_,_, CheckedSubThenDivTransformation>(self.result_descriptor.clone(), slope, offset,  source_proc)) })
1✔
152
            }
153
            ScalingMode::MulSlopeAddOffset => {
154
                call_on_generic_raster_processor!(source, source_proc => { TypedRasterQueryProcessor::from(create_boxed_processor::<_,_, CheckedMulThenAddTransformation>(self.result_descriptor.clone(), slope, offset,  source_proc)) })
1✔
155
            }
156
        };
157

158
        Ok(res)
2✔
159
    }
2✔
160

161
    fn canonic_name(&self) -> CanonicOperatorName {
×
162
        self.name.clone()
×
163
    }
×
164
}
165

166
struct RasterTransformationProcessor<Q, P, S>
167
where
168
    Q: RasterQueryProcessor<RasterType = P>,
169
{
170
    source: Q,
171
    result_descriptor: RasterResultDescriptor,
172
    slope: SlopeOffsetSelection,
173
    offset: SlopeOffsetSelection,
174
    _transformation: PhantomData<S>,
175
}
176

177
fn create_boxed_processor<Q, P, S>(
2✔
178
    result_descriptor: RasterResultDescriptor,
2✔
179
    slope: SlopeOffsetSelection,
2✔
180
    offset: SlopeOffsetSelection,
2✔
181
    source: Q,
2✔
182
) -> Box<dyn RasterQueryProcessor<RasterType = P>>
2✔
183
where
2✔
184
    Q: RasterQueryProcessor<RasterType = P> + 'static,
2✔
185
    P: Pixel + FromPrimitive + 'static + Default,
2✔
186
    f64: AsPrimitive<P>,
2✔
187
    S: Send + Sync + 'static + ScalingTransformation<P>,
2✔
188
{
2✔
189
    RasterTransformationProcessor::<Q, P, S>::create(result_descriptor, slope, offset, source)
2✔
190
        .boxed()
2✔
191
}
2✔
192

193
impl<Q, P, S> RasterTransformationProcessor<Q, P, S>
194
where
195
    Q: RasterQueryProcessor<RasterType = P> + 'static,
196
    P: Pixel + FromPrimitive + 'static + Default,
197
    f64: AsPrimitive<P>,
198
    S: Send + Sync + 'static + ScalingTransformation<P>,
199
{
200
    pub fn create(
2✔
201
        result_descriptor: RasterResultDescriptor,
2✔
202
        slope: SlopeOffsetSelection,
2✔
203
        offset: SlopeOffsetSelection,
2✔
204
        source: Q,
2✔
205
    ) -> RasterTransformationProcessor<Q, P, S> {
2✔
206
        RasterTransformationProcessor {
2✔
207
            source,
2✔
208
            result_descriptor,
2✔
209
            slope,
2✔
210
            offset,
2✔
211
            _transformation: PhantomData,
2✔
212
        }
2✔
213
    }
2✔
214

215
    async fn scale_tile_async(
2✔
216
        &self,
2✔
217
        tile: RasterTile2D<P>,
2✔
218
        _pool: Arc<ThreadPool>,
2✔
219
    ) -> Result<RasterTile2D<P>> {
2✔
220
        // either use the provided metadata/constant or the default values from the properties
221
        let offset = match &self.offset {
2✔
222
            SlopeOffsetSelection::MetadataKey(key) => tile.properties.number_property::<P>(key)?,
×
223
            SlopeOffsetSelection::Constant { value } => value.as_(),
×
224
            SlopeOffsetSelection::Auto => tile.properties.offset().as_(),
2✔
225
        };
226

227
        let slope = match &self.slope {
2✔
228
            SlopeOffsetSelection::MetadataKey(key) => tile.properties.number_property::<P>(key)?,
×
229
            SlopeOffsetSelection::Constant { value } => value.as_(),
×
230
            SlopeOffsetSelection::Auto => tile.properties.scale().as_(),
2✔
231
        };
232

233
        let res_tile =
2✔
234
            crate::util::spawn_blocking(move || tile.transform_elements::<S>(slope, offset))
2✔
235
                .await?;
2✔
236

237
        Ok(res_tile)
2✔
238
    }
2✔
239
}
240

241
#[async_trait]
242
impl<Q, P, S> RasterQueryProcessor for RasterTransformationProcessor<Q, P, S>
243
where
244
    P: Pixel + FromPrimitive + 'static + Default,
245
    f64: AsPrimitive<P>,
246
    Q: RasterQueryProcessor<RasterType = P> + 'static,
247
    S: Send + Sync + 'static + ScalingTransformation<P>,
248
{
249
    type RasterType = P;
250

251
    async fn raster_query<'a>(
252
        &'a self,
253
        query: geoengine_datatypes::primitives::RasterQueryRectangle,
254
        ctx: &'a dyn crate::engine::QueryContext,
255
    ) -> Result<
256
        futures::stream::BoxStream<
257
            'a,
258
            Result<geoengine_datatypes::raster::RasterTile2D<Self::RasterType>>,
259
        >,
260
    > {
2✔
261
        let src = self.source.raster_query(query, ctx).await?;
2✔
262
        let rs = src.and_then(move |tile| self.scale_tile_async(tile, ctx.thread_pool().clone()));
2✔
263
        Ok(rs.boxed())
2✔
264
    }
2✔
265

266
    fn raster_result_descriptor(&self) -> &RasterResultDescriptor {
2✔
267
        &self.result_descriptor
2✔
268
    }
2✔
269
}
270

271
#[cfg(test)]
mod tests {

    use geoengine_datatypes::{
        primitives::{
            BandSelection, CacheHint, SpatialPartition2D, SpatialResolution, TimeInterval,
        },
        raster::{
            Grid2D, GridOrEmpty2D, GridShape, MaskedGrid2D, RasterDataType, RasterProperties,
            TileInformation, TilingSpecification,
        },
        spatial_reference::SpatialReference,
        util::test::TestDefault,
    };

    use crate::{
        engine::{ChunkByteSize, MockExecutionContext, RasterBandDescriptors},
        mock::{MockRasterSource, MockRasterSourceParams},
    };

    use super::*;

    /// Runs [`RasterScaling`] in `scaling_mode` over a single 2x2 `u8` tile holding `pixels`
    /// and returns the elements of the first result tile.
    ///
    /// The tile carries the properties scale = 2 and offset = 1, and both slope and offset
    /// are selected with `SlopeOffsetSelection::Auto`. The helper also checks the result
    /// descriptor (data type `U8`, unitless measurement) and the shape of the result grid.
    async fn scale_single_tile(pixels: Vec<u8>, scaling_mode: ScalingMode) -> Vec<Option<u8>> {
        let grid_shape = [2, 2].into();

        let tiling_specification = TilingSpecification {
            origin_coordinate: [0.0, 0.0].into(),
            tile_size_in_pixels: grid_shape,
        };

        let raster = MaskedGrid2D::from(Grid2D::new(grid_shape, pixels).unwrap());

        let ctx = MockExecutionContext::new_with_tiling_spec(tiling_specification);
        let query_ctx = ctx.mock_query_context(ChunkByteSize::test_default());

        let mut raster_props = RasterProperties::default();
        raster_props.set_scale(2.0);
        raster_props.set_offset(1.0);

        let raster_tile = RasterTile2D::new_with_tile_info_and_properties(
            TimeInterval::default(),
            TileInformation {
                global_geo_transform: TestDefault::test_default(),
                global_tile_position: [0, 0].into(),
                tile_size_in_pixels: grid_shape,
            },
            0,
            raster.into(),
            raster_props,
            CacheHint::default(),
        );

        let spatial_resolution = raster_tile.spatial_resolution();

        let mrs = MockRasterSource {
            params: MockRasterSourceParams {
                data: vec![raster_tile],
                result_descriptor: RasterResultDescriptor {
                    data_type: RasterDataType::U8,
                    spatial_reference: SpatialReference::epsg_4326().into(),
                    bbox: None,
                    time: None,
                    resolution: Some(spatial_resolution),
                    bands: RasterBandDescriptors::new_single_band(),
                },
            },
        }
        .boxed();

        let op = RasterScaling {
            params: RasterScalingParams {
                slope: SlopeOffsetSelection::Auto,
                offset: SlopeOffsetSelection::Auto,
                output_measurement: None,
                scaling_mode,
            },
            sources: SingleRasterSource { raster: mrs },
        }
        .boxed();

        let initialized_op = op
            .initialize(WorkflowOperatorPath::initialize_root(), &ctx)
            .await
            .unwrap();

        let result_descriptor = initialized_op.result_descriptor();

        assert_eq!(result_descriptor.data_type, RasterDataType::U8);
        assert_eq!(
            result_descriptor.bands[0].measurement,
            Measurement::Unitless
        );

        let query_processor = initialized_op.query_processor().unwrap();

        let query = geoengine_datatypes::primitives::RasterQueryRectangle {
            spatial_bounds: SpatialPartition2D::new((0., 0.).into(), (2., -2.).into()).unwrap(),
            spatial_resolution: SpatialResolution::one(),
            time_interval: TimeInterval::default(),
            attributes: BandSelection::first(),
        };

        let TypedRasterQueryProcessor::U8(typed_processor) = query_processor else {
            panic!("expected TypedRasterQueryProcessor::U8");
        };

        let stream = typed_processor
            .raster_query(query, &query_ctx)
            .await
            .unwrap();

        let results = stream.collect::<Vec<Result<RasterTile2D<u8>>>>().await;

        let result_tile = results.as_slice()[0].as_ref().unwrap();

        let result_grid = result_tile.grid_array.clone();

        match result_grid {
            GridOrEmpty2D::Grid(grid) => {
                assert_eq!(grid.shape(), &GridShape::new([2, 2]));

                grid.masked_element_deref_iterator().collect()
            }
            GridOrEmpty2D::Empty(_) => panic!("expected GridOrEmpty2D::Grid"),
        }
    }

    #[tokio::test]
    async fn test_unscale() {
        // p_new = p_old * 2 + 1
        let res = scale_single_tile(vec![7, 7, 7, 6], ScalingMode::MulSlopeAddOffset).await;

        let expected = vec![Some(15), Some(15), Some(15), Some(13)];

        assert_eq!(res, expected);
    }

    #[tokio::test]
    async fn test_scale() {
        // p_new = (p_old - 1) / 2
        let res = scale_single_tile(vec![15, 15, 15, 13], ScalingMode::SubOffsetDivSlope).await;

        let expected = vec![Some(7), Some(7), Some(7), Some(6)];

        assert_eq!(res, expected);
    }
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc