• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

geo-engine / geoengine / 7006568925

27 Nov 2023 02:07PM UTC coverage: 89.651% (+0.2%) from 89.498%
7006568925

push

github

web-flow
Merge pull request #888 from geo-engine/raster_stacks

raster stacking

4032 of 4274 new or added lines in 107 files covered. (94.34%)

12 existing lines in 8 files now uncovered.

113020 of 126066 relevant lines covered (89.65%)

59901.79 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

92.31
/operators/src/processing/raster_type_conversion.rs
1
use async_trait::async_trait;
2
use futures::{stream::BoxStream, StreamExt, TryFutureExt, TryStreamExt};
3
use geoengine_datatypes::{
4
    primitives::{BandSelection, RasterQueryRectangle, SpatialPartition2D},
5
    raster::{ConvertDataType, Pixel, RasterDataType, RasterTile2D},
6
};
7
use serde::{Deserialize, Serialize};
8
use snafu::ensure;
9

10
use crate::engine::{
11
    CanonicOperatorName, ExecutionContext, InitializedRasterOperator, InitializedSources, Operator,
12
    OperatorName, QueryContext, QueryProcessor, RasterOperator, RasterQueryProcessor,
13
    RasterResultDescriptor, SingleRasterSource, TypedRasterQueryProcessor, WorkflowOperatorPath,
14
};
15
use crate::util::Result;
16

17
/// Parameters of the `RasterTypeConversion` operator.
///
/// (De)serialized via serde with camelCase field names.
#[derive(Debug, Serialize, Deserialize, Clone)]
1✔
18
#[serde(rename_all = "camelCase")]
19
pub struct RasterTypeConversionParams {
20
    /// Raster data type that the operator's result descriptor (and thus its output) uses.
    output_data_type: RasterDataType,
21
}
22

23
/// This operator converts the type of raster data into another type. This may cause precision loss, e.g. `3.1_f32` converted to `u8` will result in `3_u8`.
24
/// If the value range of the target type is too small, the operator clips the values at the bounds of that range. For example, the `u32` value `10000_u32` converted to `u8` (value range `0..=255`) results in `255_u8`, since this is the highest value a `u8` can represent.
25
pub type RasterTypeConversion = Operator<RasterTypeConversionParams, SingleRasterSource>;
26

27
// Supplies the operator's `TYPE_NAME`; `_initialize` also reports this name in its
// multi-band error.
impl OperatorName for RasterTypeConversion {
28
    const TYPE_NAME: &'static str = "RasterTypeConversion";
29
}
30

31
/// Initialized form of the `RasterTypeConversion` operator.
///
/// Holds the canonic operator name, the output result descriptor (the source's
/// descriptor with the data type replaced by the requested output type) and the
/// initialized source operator.
pub struct InitializedRasterTypeConversionOperator {
32
    name: CanonicOperatorName,
33
    result_descriptor: RasterResultDescriptor,
34
    source: Box<dyn InitializedRasterOperator>,
35
}
36

37
// `RasterOperator` implementation: initializes the single raster source and derives the
// output result descriptor from it.
#[typetag::serde]
×
38
#[async_trait]
39
impl RasterOperator for RasterTypeConversion {
40
    /// Initializes the source and builds an `InitializedRasterTypeConversionOperator`.
    ///
    /// # Errors
    ///
    /// Propagates any error from initializing the source. Fails with
    /// `OperatorDoesNotSupportMultiBandsSourcesYet` if the source does not have
    /// exactly one band.
    async fn _initialize(
1✔
41
        self: Box<Self>,
1✔
42
        path: WorkflowOperatorPath,
1✔
43
        context: &dyn ExecutionContext,
1✔
44
    ) -> Result<Box<dyn InitializedRasterOperator>> {
1✔
45
        // The name is derived before `self.sources` is consumed below.
        let name = CanonicOperatorName::from(&self);
1✔
46

47
        let initialized_sources = self.sources.initialize_sources(path, context).await?;
1✔
48
        let in_desc = initialized_sources.raster.result_descriptor();
1✔
49

1✔
50
        // TODO: implement multi-band functionality and remove this check
1✔
51
        ensure!(
1✔
52
            in_desc.bands.len() == 1,
1✔
NEW
53
            crate::error::OperatorDoesNotSupportMultiBandsSourcesYet {
×
NEW
54
                operator: RasterTypeConversion::TYPE_NAME
×
NEW
55
            }
×
56
        );
57

58
        let out_data_type = self.params.output_data_type;
1✔
59

1✔
60
        // Only the data type changes; all other descriptor fields are taken from the source.
        let out_desc = RasterResultDescriptor {
1✔
61
            spatial_reference: in_desc.spatial_reference,
1✔
62
            data_type: out_data_type,
1✔
63
            bbox: in_desc.bbox,
1✔
64
            time: in_desc.time,
1✔
65
            resolution: in_desc.resolution,
1✔
66
            bands: in_desc.bands.clone(),
1✔
67
        };
1✔
68

1✔
69
        let initialized_operator = InitializedRasterTypeConversionOperator {
1✔
70
            name,
1✔
71
            result_descriptor: out_desc,
1✔
72
            source: initialized_sources.raster,
1✔
73
        };
1✔
74

1✔
75
        Ok(initialized_operator.boxed())
1✔
76
    }
2✔
77

78
    // NOTE(review): `span_fn!` is defined outside this file; presumably it generates the
    // tracing-span function required by `RasterOperator` — confirm.
    span_fn!(RasterTypeConversion);
×
79
}
80

81
// `InitializedRasterOperator` implementation: exposes the output descriptor, builds the
// converting query processor and returns the stored canonic name.
impl InitializedRasterOperator for InitializedRasterTypeConversionOperator {
82
    /// Returns the output descriptor, whose `data_type` is the requested output type.
    fn result_descriptor(&self) -> &RasterResultDescriptor {
1✔
83
        &self.result_descriptor
1✔
84
    }
1✔
85

86
    /// Builds the typed query processor that converts the source's tiles.
    ///
    /// # Errors
    ///
    /// Propagates any error from creating the source's query processor.
    fn query_processor(&self) -> Result<TypedRasterQueryProcessor> {
1✔
87
        let source = self.source.query_processor()?;
1✔
88
        let out_data_type = self.result_descriptor.data_type;
1✔
89

90
        // NOTE(review): both macros are defined outside this file. Presumably the outer one
        // dispatches on the source's pixel type and the inner one on `out_data_type`, so that
        // `create_boxed` is instantiated for the matching (PIn, POut) pair — confirm.
        let res_op = call_on_generic_raster_processor!(source, source_proc => {
1✔
91
            call_generic_raster_processor!(out_data_type,
1✔
92
                RasterTypeConversionQueryProcessor::create_boxed(source_proc)
1✔
93
            )
94
        });
95

96
        Ok(res_op)
1✔
97
    }
1✔
98

99
    /// Returns a clone of the canonic operator name captured at initialization.
    fn canonic_name(&self) -> CanonicOperatorName {
×
100
        self.name.clone()
×
101
    }
×
102
}
103

104
/// Query processor that wraps a source processor `Q` producing `PIn` pixels and yields
/// tiles with `POut` pixels.
///
/// `POut` is not stored in any field; it is carried only through `PhantomData`.
pub struct RasterTypeConversionQueryProcessor<
105
    Q: RasterQueryProcessor<RasterType = PIn>,
106
    PIn: Pixel,
107
    POut: Pixel,
108
> {
109
    query_processor: Q,
110
    _p_out: std::marker::PhantomData<POut>,
111
}
112

113
// Constructors. The `ConvertDataType` bound restricts them to pixel type pairs for which a
// tile conversion from `PIn` to `POut` exists.
impl<Q, PIn, POut> RasterTypeConversionQueryProcessor<Q, PIn, POut>
114
where
115
    Q: 'static + RasterQueryProcessor<RasterType = PIn>,
116
    RasterTile2D<PIn>: ConvertDataType<RasterTile2D<POut>>,
117
    PIn: Pixel,
118
    POut: Pixel,
119
{
120
    /// Wraps `query_processor`; the output pixel type is fixed by the `POut` type parameter.
    pub fn new(query_processor: Q) -> Self {
46✔
121
        Self {
46✔
122
            query_processor,
46✔
123
            _p_out: std::marker::PhantomData,
46✔
124
        }
46✔
125
    }
46✔
126

127
    /// Wraps `source` like [`Self::new`] and returns it as a boxed trait object with
    /// `RasterType = POut`.
    pub fn create_boxed(source: Q) -> Box<dyn RasterQueryProcessor<RasterType = POut>> {
1✔
128
        RasterTypeConversionQueryProcessor::new(source).boxed()
1✔
129
    }
1✔
130
}
131

132
// `QueryProcessor` implementation: forwards the query to the wrapped processor and converts
// every tile of the resulting stream to the `POut` pixel type.
#[async_trait]
133
impl<Q, PIn: Pixel, POut: Pixel> QueryProcessor for RasterTypeConversionQueryProcessor<Q, PIn, POut>
134
where
135
    RasterTile2D<PIn>: ConvertDataType<RasterTile2D<POut>>,
136
    Q: RasterQueryProcessor<RasterType = PIn>,
137
{
138
    type Output = RasterTile2D<POut>;
139
    type SpatialBounds = SpatialPartition2D;
140
    type Selection = BandSelection;
141

142
    /// Queries the source with the unchanged `query` and returns a stream of converted tiles.
    ///
    /// # Errors
    ///
    /// Propagates an error from starting the source query. Errors in the source stream are
    /// passed through unconverted; errors returned by `spawn_blocking` are converted with
    /// `Into`.
    async fn _query<'b>(
60✔
143
        &'b self,
60✔
144
        query: RasterQueryRectangle,
60✔
145
        ctx: &'b dyn QueryContext,
60✔
146
    ) -> Result<BoxStream<'b, Result<Self::Output>>> {
60✔
147
        let stream = self.query_processor.raster_query(query, ctx).await?;
60✔
148
        // `and_then` runs the closure only for `Ok` tiles.
        // NOTE(review): `crate::util::spawn_blocking` is defined elsewhere; presumably it runs
        // the conversion off the async executor threads — confirm.
        let converted_stream = stream.and_then(move |tile| {
112✔
149
            crate::util::spawn_blocking(|| tile.convert_data_type()).map_err(Into::into)
112✔
150
        });
112✔
151

60✔
152
        Ok(converted_stream.boxed())
60✔
153
    }
120✔
154
}
155

156
// Unit test for the type conversion operator, driven by a mock raster source.
#[cfg(test)]
157
mod tests {
158
    use geoengine_datatypes::{
159
        primitives::{CacheHint, Measurement, SpatialPartition2D, SpatialResolution, TimeInterval},
160
        raster::{
161
            Grid2D, GridOrEmpty2D, MaskedGrid2D, RasterDataType, TileInformation,
162
            TilingSpecification,
163
        },
164
        spatial_reference::SpatialReference,
165
        util::test::TestDefault,
166
    };
167

168
    use crate::{
169
        engine::{ChunkByteSize, MockExecutionContext, RasterBandDescriptors},
170
        mock::{MockRasterSource, MockRasterSourceParams},
171
    };
172

173
    use super::*;
174

175
    // Converts a single 2x2 `u8` tile to `f32` and checks the result descriptor and the
    // converted pixel values.
    #[tokio::test]
1✔
176
    #[allow(clippy::float_cmp)]
177
    async fn test_type_conversion() {
1✔
178
        let grid_shape = [2, 2].into();
1✔
179

1✔
180
        let tiling_specification = TilingSpecification {
1✔
181
            origin_coordinate: [0.0, 0.0].into(),
1✔
182
            tile_size_in_pixels: grid_shape,
1✔
183
        };
1✔
184

1✔
185
        // Input pixels of the single source tile.
        let raster: MaskedGrid2D<u8> = Grid2D::new(grid_shape, vec![7_u8, 7, 7, 6]).unwrap().into();
1✔
186

1✔
187
        let ctx = MockExecutionContext::new_with_tiling_spec(tiling_specification);
1✔
188
        let query_ctx = ctx.mock_query_context(ChunkByteSize::test_default());
1✔
189

1✔
190
        let raster_tile = RasterTile2D::new_with_tile_info(
1✔
191
            TimeInterval::default(),
1✔
192
            TileInformation {
1✔
193
                global_geo_transform: TestDefault::test_default(),
1✔
194
                global_tile_position: [0, 0].into(),
1✔
195
                tile_size_in_pixels: grid_shape,
1✔
196
            },
1✔
197
            0,
1✔
198
            raster.into(),
1✔
199
            CacheHint::default(),
1✔
200
        );
1✔
201

1✔
202
        // Mock source: a single-band `U8` raster that serves the tile built above.
        let mrs = MockRasterSource {
1✔
203
            params: MockRasterSourceParams {
1✔
204
                data: vec![raster_tile],
1✔
205
                result_descriptor: RasterResultDescriptor {
1✔
206
                    data_type: RasterDataType::U8,
1✔
207
                    spatial_reference: SpatialReference::epsg_4326().into(),
1✔
208
                    bbox: None,
1✔
209
                    time: None,
1✔
210
                    resolution: None,
1✔
211
                    bands: RasterBandDescriptors::new_single_band(),
1✔
212
                },
1✔
213
            },
1✔
214
        }
1✔
215
        .boxed();
1✔
216

1✔
217
        // Operator under test: convert the `U8` source to `F32`.
        let op = RasterTypeConversion {
1✔
218
            params: RasterTypeConversionParams {
1✔
219
                output_data_type: RasterDataType::F32,
1✔
220
            },
1✔
221
            sources: SingleRasterSource { raster: mrs },
1✔
222
        }
1✔
223
        .boxed();
1✔
224

225
        let initialized_op = op
1✔
226
            .initialize(WorkflowOperatorPath::initialize_root(), &ctx)
1✔
227
            .await
×
228
            .unwrap();
1✔
229

1✔
230
        let result_descriptor = initialized_op.result_descriptor();
1✔
231

1✔
232
        // The descriptor must report the new data type and an unchanged band measurement.
        assert_eq!(result_descriptor.data_type, RasterDataType::F32);
1✔
233
        assert_eq!(
1✔
234
            result_descriptor.bands[0].measurement,
1✔
235
            Measurement::Unitless
1✔
236
        );
1✔
237

238
        let query_processor = initialized_op.query_processor().unwrap();
1✔
239

1✔
240
        let query = geoengine_datatypes::primitives::RasterQueryRectangle {
1✔
241
            spatial_bounds: SpatialPartition2D::new((0., 0.).into(), (2., -2.).into()).unwrap(),
1✔
242
            spatial_resolution: SpatialResolution::one(),
1✔
243
            time_interval: TimeInterval::default(),
1✔
244
            attributes: BandSelection::first(),
1✔
245
        };
1✔
246

247
        // The output data type is `F32`, so the typed processor must be the `F32` variant.
        let TypedRasterQueryProcessor::F32(typed_processor) = query_processor else {
1✔
248
            panic!("expected TypedRasterQueryProcessor::F32");
×
249
        };
250

251
        let stream = typed_processor
1✔
252
            .raster_query(query, &query_ctx)
1✔
253
            .await
×
254
            .unwrap();
1✔
255

256
        let results = stream.collect::<Vec<Result<RasterTile2D<f32>>>>().await;
1✔
257

258
        let result_tile = results.as_slice()[0].as_ref().unwrap();
1✔
259

1✔
260
        let result_grid = result_tile.grid_array.clone();
1✔
261

1✔
262
        // The shape must be unchanged and the values must equal the `u8` inputs as floats.
        match result_grid {
1✔
263
            GridOrEmpty2D::Grid(masked_grid) => {
1✔
264
                assert_eq!(masked_grid.inner_grid.shape, [2, 2].into());
1✔
265
                assert_eq!(masked_grid.inner_grid.data, &[7., 7., 7., 6.]);
1✔
266
            }
267
            GridOrEmpty2D::Empty(_) => panic!("expected GridOrEmpty2D::Grid"),
×
268
        }
269
    }
270
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc