• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

geo-engine / geoengine / 12469296660

23 Dec 2024 03:15PM UTC coverage: 90.56% (-0.1%) from 90.695%
12469296660

push

github

web-flow
Merge pull request #998 from geo-engine/quota_log_wip

Quota and Data usage Logging

859 of 1214 new or added lines in 66 files covered. (70.76%)

3 existing lines in 2 files now uncovered.

133923 of 147883 relevant lines covered (90.56%)

54439.32 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

95.1
/operators/src/processing/raster_scaling.rs
1
use crate::engine::{
2
    CanonicOperatorName, ExecutionContext, InitializedRasterOperator, InitializedSources, Operator,
3
    OperatorName, RasterBandDescriptor, RasterOperator, RasterQueryProcessor,
4
    RasterResultDescriptor, SingleRasterSource, TypedRasterQueryProcessor, WorkflowOperatorPath,
5
};
6
use crate::util::Result;
7
use async_trait::async_trait;
8
use futures::{StreamExt, TryStreamExt};
9

10
use geoengine_datatypes::raster::{
11
    CheckedMulThenAddTransformation, CheckedSubThenDivTransformation, ElementScaling,
12
    ScalingTransformation,
13
};
14
use geoengine_datatypes::{
15
    primitives::Measurement,
16
    raster::{Pixel, RasterPropertiesKey, RasterTile2D},
17
};
18
use num::FromPrimitive;
19
use num_traits::AsPrimitive;
20
use rayon::ThreadPool;
21
use serde::{Deserialize, Serialize};
22
use std::marker::PhantomData;
23
use std::sync::Arc;
24

25
#[derive(Debug, Serialize, Deserialize, Clone)]
26
#[serde(rename_all = "camelCase")]
27
pub struct RasterScalingParams {
28
    slope: SlopeOffsetSelection,
29
    offset: SlopeOffsetSelection,
30
    output_measurement: Option<Measurement>,
31
    scaling_mode: ScalingMode,
32
}
33

34
#[derive(Debug, Serialize, Deserialize, Clone, Copy)]
35
#[serde(rename_all = "camelCase")]
36
pub enum ScalingMode {
37
    MulSlopeAddOffset,
38
    SubOffsetDivSlope,
39
}
40

41
#[derive(Debug, Serialize, Deserialize, Clone)]
42
#[serde(rename_all = "camelCase", tag = "type")]
43
enum SlopeOffsetSelection {
44
    Auto,
45
    MetadataKey(RasterPropertiesKey),
46
    Constant { value: f64 },
47
}
48

49
impl Default for SlopeOffsetSelection {
50
    fn default() -> Self {
×
51
        Self::Auto
×
52
    }
×
53
}
54

55
/// The raster scaling operator scales/unscales the values of a raster by a given scale factor and offset.
56
/// This is done by applying the following formulas to every pixel.
57
/// For unscaling the formula is:
58
///
59
/// `p_new = p_old * slope + offset`
60
///
61
/// For scaling the formula is:
62
///
63
/// `p_new = (p_old - offset) / slope`
64
///
65
/// `p_old` and `p_new` refer to the old and new pixel values,
66
/// The slope and offset values are either properties attached to the input raster or a fixed value.
67
///
68
/// An example for Meteosat Second Generation properties is:
69
///
70
/// - offset: `msg.calibration_offset`
71
/// - slope: `msg.calibration_slope`
72
pub type RasterScaling = Operator<RasterScalingParams, SingleRasterSource>;
73

74
impl OperatorName for RasterScaling {
75
    const TYPE_NAME: &'static str = "RasterScaling";
76
}
77

78
pub struct InitializedRasterScalingOperator {
79
    name: CanonicOperatorName,
80
    path: WorkflowOperatorPath,
81
    slope: SlopeOffsetSelection,
82
    offset: SlopeOffsetSelection,
83
    result_descriptor: RasterResultDescriptor,
84
    source: Box<dyn InitializedRasterOperator>,
85
    scaling_mode: ScalingMode,
86
}
87

88
#[typetag::serde]
×
89
#[async_trait]
90
impl RasterOperator for RasterScaling {
91
    async fn _initialize(
92
        self: Box<Self>,
93
        path: WorkflowOperatorPath,
94
        context: &dyn ExecutionContext,
95
    ) -> Result<Box<dyn InitializedRasterOperator>> {
2✔
96
        let name = CanonicOperatorName::from(&self);
2✔
97

98
        let input = self
2✔
99
            .sources
2✔
100
            .initialize_sources(path.clone(), context)
2✔
101
            .await?;
2✔
102
        let in_desc = input.raster.result_descriptor();
2✔
103

104
        let out_desc = RasterResultDescriptor {
2✔
105
            spatial_reference: in_desc.spatial_reference,
2✔
106
            data_type: in_desc.data_type,
2✔
107

2✔
108
            bbox: in_desc.bbox,
2✔
109
            time: in_desc.time,
2✔
110
            resolution: in_desc.resolution,
2✔
111
            bands: in_desc
2✔
112
                .bands
2✔
113
                .iter()
2✔
114
                .map(|b| {
2✔
115
                    RasterBandDescriptor::new(
2✔
116
                        b.name.clone(),
2✔
117
                        self.params
2✔
118
                            .output_measurement
2✔
119
                            .clone()
2✔
120
                            .unwrap_or_else(|| b.measurement.clone()),
2✔
121
                    )
2✔
122
                })
2✔
123
                .collect::<Vec<_>>()
2✔
124
                .try_into()?,
2✔
125
        };
126

127
        let initialized_operator = InitializedRasterScalingOperator {
2✔
128
            name,
2✔
129
            path,
2✔
130
            slope: self.params.slope,
2✔
131
            offset: self.params.offset,
2✔
132
            result_descriptor: out_desc,
2✔
133
            source: input.raster,
2✔
134
            scaling_mode: self.params.scaling_mode,
2✔
135
        };
2✔
136

2✔
137
        Ok(initialized_operator.boxed())
2✔
138
    }
4✔
139

140
    span_fn!(RasterScaling);
141
}
142

143
impl InitializedRasterOperator for InitializedRasterScalingOperator {
144
    fn result_descriptor(&self) -> &RasterResultDescriptor {
2✔
145
        &self.result_descriptor
2✔
146
    }
2✔
147

148
    fn query_processor(&self) -> Result<TypedRasterQueryProcessor> {
2✔
149
        let slope = self.slope.clone();
2✔
150
        let offset = self.offset.clone();
2✔
151
        let source = self.source.query_processor()?;
2✔
152
        let scaling_mode = self.scaling_mode;
2✔
153

154
        let res = match scaling_mode {
2✔
155
            ScalingMode::SubOffsetDivSlope => {
156
                call_on_generic_raster_processor!(source, source_proc => { TypedRasterQueryProcessor::from(create_boxed_processor::<_,_, CheckedSubThenDivTransformation>(self.result_descriptor.clone(), slope, offset,  source_proc)) })
1✔
157
            }
158
            ScalingMode::MulSlopeAddOffset => {
159
                call_on_generic_raster_processor!(source, source_proc => { TypedRasterQueryProcessor::from(create_boxed_processor::<_,_, CheckedMulThenAddTransformation>(self.result_descriptor.clone(), slope, offset,  source_proc)) })
1✔
160
            }
161
        };
162

163
        Ok(res)
2✔
164
    }
2✔
165

166
    fn canonic_name(&self) -> CanonicOperatorName {
×
167
        self.name.clone()
×
168
    }
×
169

NEW
170
    fn name(&self) -> &'static str {
×
NEW
171
        RasterScaling::TYPE_NAME
×
NEW
172
    }
×
173

NEW
174
    fn path(&self) -> WorkflowOperatorPath {
×
NEW
175
        self.path.clone()
×
NEW
176
    }
×
177
}
178

179
struct RasterTransformationProcessor<Q, P, S>
180
where
181
    Q: RasterQueryProcessor<RasterType = P>,
182
{
183
    source: Q,
184
    result_descriptor: RasterResultDescriptor,
185
    slope: SlopeOffsetSelection,
186
    offset: SlopeOffsetSelection,
187
    _transformation: PhantomData<S>,
188
}
189

190
fn create_boxed_processor<Q, P, S>(
2✔
191
    result_descriptor: RasterResultDescriptor,
2✔
192
    slope: SlopeOffsetSelection,
2✔
193
    offset: SlopeOffsetSelection,
2✔
194
    source: Q,
2✔
195
) -> Box<dyn RasterQueryProcessor<RasterType = P>>
2✔
196
where
2✔
197
    Q: RasterQueryProcessor<RasterType = P> + 'static,
2✔
198
    P: Pixel + FromPrimitive + 'static + Default,
2✔
199
    f64: AsPrimitive<P>,
2✔
200
    S: Send + Sync + 'static + ScalingTransformation<P>,
2✔
201
{
2✔
202
    RasterTransformationProcessor::<Q, P, S>::create(result_descriptor, slope, offset, source)
2✔
203
        .boxed()
2✔
204
}
2✔
205

206
impl<Q, P, S> RasterTransformationProcessor<Q, P, S>
207
where
208
    Q: RasterQueryProcessor<RasterType = P> + 'static,
209
    P: Pixel + FromPrimitive + 'static + Default,
210
    f64: AsPrimitive<P>,
211
    S: Send + Sync + 'static + ScalingTransformation<P>,
212
{
213
    pub fn create(
2✔
214
        result_descriptor: RasterResultDescriptor,
2✔
215
        slope: SlopeOffsetSelection,
2✔
216
        offset: SlopeOffsetSelection,
2✔
217
        source: Q,
2✔
218
    ) -> RasterTransformationProcessor<Q, P, S> {
2✔
219
        RasterTransformationProcessor {
2✔
220
            source,
2✔
221
            result_descriptor,
2✔
222
            slope,
2✔
223
            offset,
2✔
224
            _transformation: PhantomData,
2✔
225
        }
2✔
226
    }
2✔
227

228
    async fn scale_tile_async(
2✔
229
        &self,
2✔
230
        tile: RasterTile2D<P>,
2✔
231
        _pool: Arc<ThreadPool>,
2✔
232
    ) -> Result<RasterTile2D<P>> {
2✔
233
        // either use the provided metadata/constant or the default values from the properties
234
        let offset = match &self.offset {
2✔
235
            SlopeOffsetSelection::MetadataKey(key) => tile.properties.number_property::<P>(key)?,
×
236
            SlopeOffsetSelection::Constant { value } => value.as_(),
×
237
            SlopeOffsetSelection::Auto => tile.properties.offset().as_(),
2✔
238
        };
239

240
        let slope = match &self.slope {
2✔
241
            SlopeOffsetSelection::MetadataKey(key) => tile.properties.number_property::<P>(key)?,
×
242
            SlopeOffsetSelection::Constant { value } => value.as_(),
×
243
            SlopeOffsetSelection::Auto => tile.properties.scale().as_(),
2✔
244
        };
245

246
        let res_tile =
2✔
247
            crate::util::spawn_blocking(move || tile.transform_elements::<S>(slope, offset))
2✔
248
                .await?;
2✔
249

250
        Ok(res_tile)
2✔
251
    }
2✔
252
}
253

254
#[async_trait]
255
impl<Q, P, S> RasterQueryProcessor for RasterTransformationProcessor<Q, P, S>
256
where
257
    P: Pixel + FromPrimitive + 'static + Default,
258
    f64: AsPrimitive<P>,
259
    Q: RasterQueryProcessor<RasterType = P> + 'static,
260
    S: Send + Sync + 'static + ScalingTransformation<P>,
261
{
262
    type RasterType = P;
263

264
    async fn raster_query<'a>(
265
        &'a self,
266
        query: geoengine_datatypes::primitives::RasterQueryRectangle,
267
        ctx: &'a dyn crate::engine::QueryContext,
268
    ) -> Result<
269
        futures::stream::BoxStream<
270
            'a,
271
            Result<geoengine_datatypes::raster::RasterTile2D<Self::RasterType>>,
272
        >,
273
    > {
2✔
274
        let src = self.source.raster_query(query, ctx).await?;
2✔
275
        let rs = src.and_then(move |tile| self.scale_tile_async(tile, ctx.thread_pool().clone()));
2✔
276
        Ok(rs.boxed())
2✔
277
    }
4✔
278

279
    fn raster_result_descriptor(&self) -> &RasterResultDescriptor {
2✔
280
        &self.result_descriptor
2✔
281
    }
2✔
282
}
283

284
#[cfg(test)]
285
mod tests {
286

287
    use geoengine_datatypes::{
288
        primitives::{
289
            BandSelection, CacheHint, SpatialPartition2D, SpatialResolution, TimeInterval,
290
        },
291
        raster::{
292
            Grid2D, GridOrEmpty2D, GridShape, MaskedGrid2D, RasterDataType, RasterProperties,
293
            TileInformation, TilingSpecification,
294
        },
295
        spatial_reference::SpatialReference,
296
        util::test::TestDefault,
297
    };
298

299
    use crate::{
300
        engine::{ChunkByteSize, MockExecutionContext, RasterBandDescriptors},
301
        mock::{MockRasterSource, MockRasterSourceParams},
302
    };
303

304
    use super::*;
305

306
    #[tokio::test]
307
    async fn test_unscale() {
1✔
308
        let grid_shape = [2, 2].into();
1✔
309

1✔
310
        let tiling_specification = TilingSpecification {
1✔
311
            origin_coordinate: [0.0, 0.0].into(),
1✔
312
            tile_size_in_pixels: grid_shape,
1✔
313
        };
1✔
314

1✔
315
        let raster = MaskedGrid2D::from(Grid2D::new(grid_shape, vec![7_u8, 7, 7, 6]).unwrap());
1✔
316

1✔
317
        let ctx = MockExecutionContext::new_with_tiling_spec(tiling_specification);
1✔
318
        let query_ctx = ctx.mock_query_context(ChunkByteSize::test_default());
1✔
319

1✔
320
        let mut raster_props = RasterProperties::default();
1✔
321
        raster_props.set_scale(2.0);
1✔
322
        raster_props.set_offset(1.0);
1✔
323

1✔
324
        let raster_tile = RasterTile2D::new_with_tile_info_and_properties(
1✔
325
            TimeInterval::default(),
1✔
326
            TileInformation {
1✔
327
                global_geo_transform: TestDefault::test_default(),
1✔
328
                global_tile_position: [0, 0].into(),
1✔
329
                tile_size_in_pixels: grid_shape,
1✔
330
            },
1✔
331
            0,
1✔
332
            raster.into(),
1✔
333
            raster_props,
1✔
334
            CacheHint::default(),
1✔
335
        );
1✔
336

1✔
337
        let spatial_resolution = raster_tile.spatial_resolution();
1✔
338

1✔
339
        let mrs = MockRasterSource {
1✔
340
            params: MockRasterSourceParams {
1✔
341
                data: vec![raster_tile],
1✔
342
                result_descriptor: RasterResultDescriptor {
1✔
343
                    data_type: RasterDataType::U8,
1✔
344
                    spatial_reference: SpatialReference::epsg_4326().into(),
1✔
345
                    bbox: None,
1✔
346
                    time: None,
1✔
347
                    resolution: Some(spatial_resolution),
1✔
348
                    bands: RasterBandDescriptors::new_single_band(),
1✔
349
                },
1✔
350
            },
1✔
351
        }
1✔
352
        .boxed();
1✔
353

1✔
354
        let scaling_mode = ScalingMode::MulSlopeAddOffset;
1✔
355

1✔
356
        let output_measurement = None;
1✔
357

1✔
358
        let op = RasterScaling {
1✔
359
            params: RasterScalingParams {
1✔
360
                slope: SlopeOffsetSelection::Auto,
1✔
361
                offset: SlopeOffsetSelection::Auto,
1✔
362
                output_measurement,
1✔
363
                scaling_mode,
1✔
364
            },
1✔
365
            sources: SingleRasterSource { raster: mrs },
1✔
366
        }
1✔
367
        .boxed();
1✔
368

1✔
369
        let initialized_op = op
1✔
370
            .initialize(WorkflowOperatorPath::initialize_root(), &ctx)
1✔
371
            .await
1✔
372
            .unwrap();
1✔
373

1✔
374
        let result_descriptor = initialized_op.result_descriptor();
1✔
375

1✔
376
        assert_eq!(result_descriptor.data_type, RasterDataType::U8);
1✔
377
        assert_eq!(
1✔
378
            result_descriptor.bands[0].measurement,
1✔
379
            Measurement::Unitless
1✔
380
        );
1✔
381

1✔
382
        let query_processor = initialized_op.query_processor().unwrap();
1✔
383

1✔
384
        let query = geoengine_datatypes::primitives::RasterQueryRectangle {
1✔
385
            spatial_bounds: SpatialPartition2D::new((0., 0.).into(), (2., -2.).into()).unwrap(),
1✔
386
            spatial_resolution: SpatialResolution::one(),
1✔
387
            time_interval: TimeInterval::default(),
1✔
388
            attributes: BandSelection::first(),
1✔
389
        };
1✔
390

1✔
391
        let TypedRasterQueryProcessor::U8(typed_processor) = query_processor else {
1✔
392
            panic!("expected TypedRasterQueryProcessor::U8");
1✔
393
        };
1✔
394

1✔
395
        let stream = typed_processor
1✔
396
            .raster_query(query, &query_ctx)
1✔
397
            .await
1✔
398
            .unwrap();
1✔
399

1✔
400
        let results = stream.collect::<Vec<Result<RasterTile2D<u8>>>>().await;
1✔
401

1✔
402
        let result_tile = results.as_slice()[0].as_ref().unwrap();
1✔
403

1✔
404
        let result_grid = result_tile.grid_array.clone();
1✔
405

1✔
406
        match result_grid {
1✔
407
            GridOrEmpty2D::Grid(grid) => {
1✔
408
                assert_eq!(grid.shape(), &GridShape::new([2, 2]));
1✔
409

1✔
410
                let res = grid.masked_element_deref_iterator().collect::<Vec<_>>();
1✔
411

1✔
412
                let expected = vec![Some(15), Some(15), Some(15), Some(13)];
1✔
413

1✔
414
                assert_eq!(res, expected);
1✔
415
            }
1✔
416
            GridOrEmpty2D::Empty(_) => panic!("expected GridOrEmpty2D::Grid"),
1✔
417
        }
1✔
418
    }
1✔
419

420
    #[tokio::test]
421
    async fn test_scale() {
1✔
422
        let grid_shape = [2, 2].into();
1✔
423

1✔
424
        let tiling_specification = TilingSpecification {
1✔
425
            origin_coordinate: [0.0, 0.0].into(),
1✔
426
            tile_size_in_pixels: grid_shape,
1✔
427
        };
1✔
428

1✔
429
        let raster = MaskedGrid2D::from(Grid2D::new(grid_shape, vec![15_u8, 15, 15, 13]).unwrap());
1✔
430

1✔
431
        let ctx = MockExecutionContext::new_with_tiling_spec(tiling_specification);
1✔
432
        let query_ctx = ctx.mock_query_context(ChunkByteSize::test_default());
1✔
433

1✔
434
        let mut raster_props = RasterProperties::default();
1✔
435
        raster_props.set_scale(2.0);
1✔
436
        raster_props.set_offset(1.0);
1✔
437

1✔
438
        let raster_tile = RasterTile2D::new_with_tile_info_and_properties(
1✔
439
            TimeInterval::default(),
1✔
440
            TileInformation {
1✔
441
                global_geo_transform: TestDefault::test_default(),
1✔
442
                global_tile_position: [0, 0].into(),
1✔
443
                tile_size_in_pixels: grid_shape,
1✔
444
            },
1✔
445
            0,
1✔
446
            raster.into(),
1✔
447
            raster_props,
1✔
448
            CacheHint::default(),
1✔
449
        );
1✔
450

1✔
451
        let spatial_resolution = raster_tile.spatial_resolution();
1✔
452

1✔
453
        let mrs = MockRasterSource {
1✔
454
            params: MockRasterSourceParams {
1✔
455
                data: vec![raster_tile],
1✔
456
                result_descriptor: RasterResultDescriptor {
1✔
457
                    data_type: RasterDataType::U8,
1✔
458
                    spatial_reference: SpatialReference::epsg_4326().into(),
1✔
459
                    bbox: None,
1✔
460
                    time: None,
1✔
461
                    resolution: Some(spatial_resolution),
1✔
462
                    bands: RasterBandDescriptors::new_single_band(),
1✔
463
                },
1✔
464
            },
1✔
465
        }
1✔
466
        .boxed();
1✔
467

1✔
468
        let scaling_mode = ScalingMode::SubOffsetDivSlope;
1✔
469

1✔
470
        let output_measurement = None;
1✔
471

1✔
472
        let params = RasterScalingParams {
1✔
473
            slope: SlopeOffsetSelection::Auto,
1✔
474
            offset: SlopeOffsetSelection::Auto,
1✔
475
            output_measurement,
1✔
476
            scaling_mode,
1✔
477
        };
1✔
478

1✔
479
        let op = RasterScaling {
1✔
480
            params,
1✔
481
            sources: SingleRasterSource { raster: mrs },
1✔
482
        }
1✔
483
        .boxed();
1✔
484

1✔
485
        let initialized_op = op
1✔
486
            .initialize(WorkflowOperatorPath::initialize_root(), &ctx)
1✔
487
            .await
1✔
488
            .unwrap();
1✔
489

1✔
490
        let result_descriptor = initialized_op.result_descriptor();
1✔
491

1✔
492
        assert_eq!(result_descriptor.data_type, RasterDataType::U8);
1✔
493
        assert_eq!(
1✔
494
            result_descriptor.bands[0].measurement,
1✔
495
            Measurement::Unitless
1✔
496
        );
1✔
497

1✔
498
        let query_processor = initialized_op.query_processor().unwrap();
1✔
499

1✔
500
        let query = geoengine_datatypes::primitives::RasterQueryRectangle {
1✔
501
            spatial_bounds: SpatialPartition2D::new((0., 0.).into(), (2., -2.).into()).unwrap(),
1✔
502
            spatial_resolution: SpatialResolution::one(),
1✔
503
            time_interval: TimeInterval::default(),
1✔
504
            attributes: BandSelection::first(),
1✔
505
        };
1✔
506

1✔
507
        let TypedRasterQueryProcessor::U8(typed_processor) = query_processor else {
1✔
508
            panic!("expected TypedRasterQueryProcessor::U8");
1✔
509
        };
1✔
510

1✔
511
        let stream = typed_processor
1✔
512
            .raster_query(query, &query_ctx)
1✔
513
            .await
1✔
514
            .unwrap();
1✔
515

1✔
516
        let results = stream.collect::<Vec<Result<RasterTile2D<u8>>>>().await;
1✔
517

1✔
518
        let result_tile = results.as_slice()[0].as_ref().unwrap();
1✔
519

1✔
520
        let result_grid = result_tile.grid_array.clone();
1✔
521

1✔
522
        match result_grid {
1✔
523
            GridOrEmpty2D::Grid(grid) => {
1✔
524
                assert_eq!(grid.shape(), &GridShape::new([2, 2]));
1✔
525

1✔
526
                let res = grid.masked_element_deref_iterator().collect::<Vec<_>>();
1✔
527

1✔
528
                let expected = vec![Some(7), Some(7), Some(7), Some(6)];
1✔
529

1✔
530
                assert_eq!(res, expected);
1✔
531
            }
1✔
532
            GridOrEmpty2D::Empty(_) => panic!("expected GridOrEmpty2D::Grid"),
1✔
533
        }
1✔
534
    }
1✔
535
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc