• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

geo-engine / geoengine / 12469296660

23 Dec 2024 03:15PM UTC coverage: 90.56% (-0.1%) from 90.695%
12469296660

push

github

web-flow
Merge pull request #998 from geo-engine/quota_log_wip

Quota and Data usage Logging

859 of 1214 new or added lines in 66 files covered. (70.76%)

3 existing lines in 2 files now uncovered.

133923 of 147883 relevant lines covered (90.56%)

54439.32 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

93.21
/operators/src/processing/raster_type_conversion.rs
1
use async_trait::async_trait;
2
use futures::{stream::BoxStream, StreamExt, TryFutureExt, TryStreamExt};
3
use geoengine_datatypes::{
4
    primitives::{BandSelection, RasterQueryRectangle, SpatialPartition2D},
5
    raster::{ConvertDataType, Pixel, RasterDataType, RasterTile2D},
6
};
7
use serde::{Deserialize, Serialize};
8

9
use crate::engine::{
10
    CanonicOperatorName, ExecutionContext, InitializedRasterOperator, InitializedSources, Operator,
11
    OperatorName, QueryContext, QueryProcessor, RasterOperator, RasterQueryProcessor,
12
    RasterResultDescriptor, SingleRasterSource, TypedRasterQueryProcessor, WorkflowOperatorPath,
13
};
14
use crate::util::Result;
15

16
#[derive(Debug, Serialize, Deserialize, Clone)]
17
#[serde(rename_all = "camelCase")]
18
pub struct RasterTypeConversionParams {
19
    pub output_data_type: RasterDataType,
20
}
21

22
/// This operator converts the type of raster data into another type. This may cause precision loss as e.g. `3.1_f32` converted to `u8` will result in `3_u8`.
23
/// If the value range of the output type is too small, the operator clips the values at the bounds of that range. For example, when the `u32` value `10000_u32` is converted to `u8`, which has a value range of 0..256, the result is `255_u8`, since this is the highest value a `u8` can represent.
24
pub type RasterTypeConversion = Operator<RasterTypeConversionParams, SingleRasterSource>;
25

26
impl OperatorName for RasterTypeConversion {
27
    const TYPE_NAME: &'static str = "RasterTypeConversion";
28
}
29

30
pub struct InitializedRasterTypeConversionOperator {
31
    name: CanonicOperatorName,
32
    path: WorkflowOperatorPath,
33
    result_descriptor: RasterResultDescriptor,
34
    source: Box<dyn InitializedRasterOperator>,
35
}
36

37
#[typetag::serde]
×
38
#[async_trait]
39
impl RasterOperator for RasterTypeConversion {
40
    async fn _initialize(
41
        self: Box<Self>,
42
        path: WorkflowOperatorPath,
43
        context: &dyn ExecutionContext,
44
    ) -> Result<Box<dyn InitializedRasterOperator>> {
2✔
45
        let name = CanonicOperatorName::from(&self);
2✔
46

47
        let initialized_sources = self
2✔
48
            .sources
2✔
49
            .initialize_sources(path.clone(), context)
2✔
50
            .await?;
2✔
51
        let in_desc = initialized_sources.raster.result_descriptor();
2✔
52

2✔
53
        let out_data_type = self.params.output_data_type;
2✔
54

2✔
55
        let out_desc = RasterResultDescriptor {
2✔
56
            spatial_reference: in_desc.spatial_reference,
2✔
57
            data_type: out_data_type,
2✔
58
            bbox: in_desc.bbox,
2✔
59
            time: in_desc.time,
2✔
60
            resolution: in_desc.resolution,
2✔
61
            bands: in_desc.bands.clone(),
2✔
62
        };
2✔
63

2✔
64
        let initialized_operator = InitializedRasterTypeConversionOperator {
2✔
65
            name,
2✔
66
            path,
2✔
67
            result_descriptor: out_desc,
2✔
68
            source: initialized_sources.raster,
2✔
69
        };
2✔
70

2✔
71
        Ok(initialized_operator.boxed())
2✔
72
    }
4✔
73

74
    span_fn!(RasterTypeConversion);
75
}
76

77
impl InitializedRasterOperator for InitializedRasterTypeConversionOperator {
78
    fn result_descriptor(&self) -> &RasterResultDescriptor {
2✔
79
        &self.result_descriptor
2✔
80
    }
2✔
81

82
    fn query_processor(&self) -> Result<TypedRasterQueryProcessor> {
2✔
83
        let source = self.source.query_processor()?;
2✔
84
        let out_data_type = self.result_descriptor.data_type;
2✔
85

86
        let res_op = call_on_generic_raster_processor!(source, source_proc => {
2✔
87
            call_generic_raster_processor!(out_data_type,
2✔
88
                RasterTypeConversionQueryProcessor::create_boxed(source_proc)
×
89
            )
90
        });
91

92
        Ok(res_op)
2✔
93
    }
2✔
94

95
    fn canonic_name(&self) -> CanonicOperatorName {
×
96
        self.name.clone()
×
97
    }
×
98

NEW
99
    fn name(&self) -> &'static str {
×
NEW
100
        RasterTypeConversion::TYPE_NAME
×
NEW
101
    }
×
102

NEW
103
    fn path(&self) -> WorkflowOperatorPath {
×
NEW
104
        self.path.clone()
×
NEW
105
    }
×
106
}
107

108
pub struct RasterTypeConversionQueryProcessor<
109
    Q: RasterQueryProcessor<RasterType = PIn>,
110
    PIn: Pixel,
111
    POut: Pixel,
112
> {
113
    query_processor: Q,
114
    _p_out: std::marker::PhantomData<POut>,
115
}
116

117
impl<Q, PIn, POut> RasterTypeConversionQueryProcessor<Q, PIn, POut>
118
where
119
    Q: 'static + RasterQueryProcessor<RasterType = PIn>,
120
    RasterTile2D<PIn>: ConvertDataType<RasterTile2D<POut>>,
121
    PIn: Pixel,
122
    POut: Pixel,
123
{
124
    pub fn new(query_processor: Q) -> Self {
22✔
125
        Self {
22✔
126
            query_processor,
22✔
127
            _p_out: std::marker::PhantomData,
22✔
128
        }
22✔
129
    }
22✔
130

131
    pub fn create_boxed(source: Q) -> Box<dyn RasterQueryProcessor<RasterType = POut>> {
2✔
132
        RasterTypeConversionQueryProcessor::new(source).boxed()
2✔
133
    }
2✔
134
}
135

136
#[async_trait]
137
impl<Q, PIn: Pixel, POut: Pixel> QueryProcessor for RasterTypeConversionQueryProcessor<Q, PIn, POut>
138
where
139
    RasterTile2D<PIn>: ConvertDataType<RasterTile2D<POut>>,
140
    Q: RasterQueryProcessor<RasterType = PIn>,
141
{
142
    type Output = RasterTile2D<POut>;
143
    type SpatialBounds = SpatialPartition2D;
144
    type Selection = BandSelection;
145
    type ResultDescription = RasterResultDescriptor;
146

147
    async fn _query<'b>(
148
        &'b self,
149
        query: RasterQueryRectangle,
150
        ctx: &'b dyn QueryContext,
151
    ) -> Result<BoxStream<'b, Result<Self::Output>>> {
34✔
152
        let stream = self.query_processor.raster_query(query, ctx).await?;
34✔
153
        let converted_stream = stream.and_then(move |tile| {
154✔
154
            crate::util::spawn_blocking(|| tile.convert_data_type()).map_err(Into::into)
154✔
155
        });
154✔
156

34✔
157
        Ok(converted_stream.boxed())
34✔
158
    }
68✔
159

160
    fn result_descriptor(&self) -> &Self::ResultDescription {
70✔
161
        self.query_processor.raster_result_descriptor()
70✔
162
    }
70✔
163
}
164

165
#[cfg(test)]
166
mod tests {
167
    use geoengine_datatypes::{
168
        primitives::{CacheHint, Measurement, SpatialPartition2D, SpatialResolution, TimeInterval},
169
        raster::{
170
            Grid2D, GridOrEmpty2D, MaskedGrid2D, RasterDataType, TileInformation,
171
            TilingSpecification,
172
        },
173
        spatial_reference::SpatialReference,
174
        util::test::TestDefault,
175
    };
176

177
    use crate::{
178
        engine::{ChunkByteSize, MockExecutionContext, RasterBandDescriptors},
179
        mock::{MockRasterSource, MockRasterSourceParams},
180
    };
181

182
    use super::*;
183

184
    #[tokio::test]
185
    #[allow(clippy::float_cmp)]
186
    async fn test_type_conversion() {
1✔
187
        let grid_shape = [2, 2].into();
1✔
188

1✔
189
        let tiling_specification = TilingSpecification {
1✔
190
            origin_coordinate: [0.0, 0.0].into(),
1✔
191
            tile_size_in_pixels: grid_shape,
1✔
192
        };
1✔
193

1✔
194
        let raster: MaskedGrid2D<u8> = Grid2D::new(grid_shape, vec![7_u8, 7, 7, 6]).unwrap().into();
1✔
195

1✔
196
        let ctx = MockExecutionContext::new_with_tiling_spec(tiling_specification);
1✔
197
        let query_ctx = ctx.mock_query_context(ChunkByteSize::test_default());
1✔
198

1✔
199
        let raster_tile = RasterTile2D::new_with_tile_info(
1✔
200
            TimeInterval::default(),
1✔
201
            TileInformation {
1✔
202
                global_geo_transform: TestDefault::test_default(),
1✔
203
                global_tile_position: [0, 0].into(),
1✔
204
                tile_size_in_pixels: grid_shape,
1✔
205
            },
1✔
206
            0,
1✔
207
            raster.into(),
1✔
208
            CacheHint::default(),
1✔
209
        );
1✔
210

1✔
211
        let mrs = MockRasterSource {
1✔
212
            params: MockRasterSourceParams {
1✔
213
                data: vec![raster_tile],
1✔
214
                result_descriptor: RasterResultDescriptor {
1✔
215
                    data_type: RasterDataType::U8,
1✔
216
                    spatial_reference: SpatialReference::epsg_4326().into(),
1✔
217
                    bbox: None,
1✔
218
                    time: None,
1✔
219
                    resolution: None,
1✔
220
                    bands: RasterBandDescriptors::new_single_band(),
1✔
221
                },
1✔
222
            },
1✔
223
        }
1✔
224
        .boxed();
1✔
225

1✔
226
        let op = RasterTypeConversion {
1✔
227
            params: RasterTypeConversionParams {
1✔
228
                output_data_type: RasterDataType::F32,
1✔
229
            },
1✔
230
            sources: SingleRasterSource { raster: mrs },
1✔
231
        }
1✔
232
        .boxed();
1✔
233

1✔
234
        let initialized_op = op
1✔
235
            .initialize(WorkflowOperatorPath::initialize_root(), &ctx)
1✔
236
            .await
1✔
237
            .unwrap();
1✔
238

1✔
239
        let result_descriptor = initialized_op.result_descriptor();
1✔
240

1✔
241
        assert_eq!(result_descriptor.data_type, RasterDataType::F32);
1✔
242
        assert_eq!(
1✔
243
            result_descriptor.bands[0].measurement,
1✔
244
            Measurement::Unitless
1✔
245
        );
1✔
246

1✔
247
        let query_processor = initialized_op.query_processor().unwrap();
1✔
248

1✔
249
        let query = geoengine_datatypes::primitives::RasterQueryRectangle {
1✔
250
            spatial_bounds: SpatialPartition2D::new((0., 0.).into(), (2., -2.).into()).unwrap(),
1✔
251
            spatial_resolution: SpatialResolution::one(),
1✔
252
            time_interval: TimeInterval::default(),
1✔
253
            attributes: BandSelection::first(),
1✔
254
        };
1✔
255

1✔
256
        let TypedRasterQueryProcessor::F32(typed_processor) = query_processor else {
1✔
257
            panic!("expected TypedRasterQueryProcessor::F32");
1✔
258
        };
1✔
259

1✔
260
        let stream = typed_processor
1✔
261
            .raster_query(query, &query_ctx)
1✔
262
            .await
1✔
263
            .unwrap();
1✔
264

1✔
265
        let results = stream.collect::<Vec<Result<RasterTile2D<f32>>>>().await;
1✔
266

1✔
267
        let result_tile = results.as_slice()[0].as_ref().unwrap();
1✔
268

1✔
269
        let result_grid = result_tile.grid_array.clone();
1✔
270

1✔
271
        match result_grid {
1✔
272
            GridOrEmpty2D::Grid(masked_grid) => {
1✔
273
                assert_eq!(masked_grid.inner_grid.shape, [2, 2].into());
1✔
274
                assert_eq!(masked_grid.inner_grid.data, &[7., 7., 7., 6.]);
1✔
275
            }
1✔
276
            GridOrEmpty2D::Empty(_) => panic!("expected GridOrEmpty2D::Grid"),
1✔
277
        }
1✔
278
    }
1✔
279
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc