• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

geo-engine / geoengine / 12469296660

23 Dec 2024 03:15PM UTC coverage: 90.56% (-0.1%) from 90.695%
12469296660

push

github

web-flow
Merge pull request #998 from geo-engine/quota_log_wip

Quota and Data usage Logging

859 of 1214 new or added lines in 66 files covered. (70.76%)

3 existing lines in 2 files now uncovered.

133923 of 147883 relevant lines covered (90.56%)

54439.32 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

91.05
/operators/src/processing/raster_vector_join/mod.rs
1
mod aggregated;
2
mod aggregator;
3
mod non_aggregated;
4
mod util;
5

6
use crate::engine::{
7
    CanonicOperatorName, ExecutionContext, InitializedRasterOperator, InitializedVectorOperator,
8
    Operator, OperatorName, SingleVectorMultipleRasterSources, TypedRasterQueryProcessor,
9
    TypedVectorQueryProcessor, VectorColumnInfo, VectorOperator, VectorQueryProcessor,
10
    VectorResultDescriptor, WorkflowOperatorPath,
11
};
12
use crate::error::{self, ColumnNameConflict, Error};
13
use crate::processing::raster_vector_join::non_aggregated::RasterVectorJoinProcessor;
14
use crate::util::Result;
15

16
use crate::processing::raster_vector_join::aggregated::RasterVectorAggregateJoinProcessor;
17
use async_trait::async_trait;
18
use geoengine_datatypes::collections::VectorDataType;
19
use geoengine_datatypes::primitives::FeatureDataType;
20
use geoengine_datatypes::raster::{Pixel, RasterDataType, RenameBands};
21
use serde::{Deserialize, Serialize};
22
use snafu::ensure;
23

24
use self::aggregator::{
25
    Aggregator, FirstValueFloatAggregator, FirstValueIntAggregator, MeanValueAggregator,
26
    TypedAggregator,
27
};
28

29
/// An operator that attaches raster values to vector data
30
pub type RasterVectorJoin = Operator<RasterVectorJoinParams, SingleVectorMultipleRasterSources>;
31

32
impl OperatorName for RasterVectorJoin {
33
    const TYPE_NAME: &'static str = "RasterVectorJoin";
34
}
35

36
const MAX_NUMBER_OF_RASTER_INPUTS: usize = 8;
37

38
/// The parameter spec for `RasterVectorJoin`
39
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
40
#[serde(rename_all = "camelCase")]
41
pub struct RasterVectorJoinParams {
42
    /// The names of the new columns are derived from the names of the raster bands.
43
    /// This parameter specifies how to perform the derivation.
44
    pub names: ColumnNames,
45

46
    /// Specifies which method is used for aggregating values for a feature
47
    pub feature_aggregation: FeatureAggregationMethod,
48

49
    /// Whether NO DATA values should be ignored in aggregating the joined feature data
50
    /// `false` by default
51
    #[serde(default)]
52
    pub feature_aggregation_ignore_no_data: bool,
53

54
    /// Specifies which method is used for aggregating values over time
55
    pub temporal_aggregation: TemporalAggregationMethod,
56

57
    /// Whether NO DATA values should be ignored in aggregating the joined temporal data
58
    /// `false` by default
59
    #[serde(default)]
60
    pub temporal_aggregation_ignore_no_data: bool,
61
}
62

63
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
64
#[serde(rename_all = "camelCase", tag = "type", content = "values")]
65
pub enum ColumnNames {
66
    Default, // use the input band name and append " (n)" to the band name for the `n`-th conflict,
67
    Suffix(Vec<String>), // A suffix for every input, to be appended to the original band names
68
    Names(Vec<String>), // A column name for each band, to be used instead of the original band names
69
}
70

71
impl From<ColumnNames> for RenameBands {
72
    fn from(column_names: ColumnNames) -> Self {
5✔
73
        match column_names {
5✔
74
            ColumnNames::Default => RenameBands::Default,
4✔
75
            ColumnNames::Suffix(suffixes) => RenameBands::Suffix(suffixes),
×
76
            ColumnNames::Names(names) => RenameBands::Rename(names),
1✔
77
        }
78
    }
5✔
79
}
80

81
/// How to aggregate the values for the geometries inside a feature e.g.
82
/// the mean of all the raster values corresponding to the individual
83
/// points inside a `MultiPoint` feature.
84
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, Copy)]
85
#[serde(rename_all = "camelCase")]
86
pub enum FeatureAggregationMethod {
87
    First,
88
    Mean,
89
}
90

91
/// How to aggregate the values over time
92
/// If there are multiple rasters valid during the validity of a feature
93
/// the feature is either split into multiple (None-aggregation) or the
94
/// values are aggregated
95
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, Copy)]
96
#[serde(rename_all = "camelCase")]
97
pub enum TemporalAggregationMethod {
98
    None,
99
    First,
100
    Mean,
101
}
102

103
#[allow(clippy::too_many_lines)]
104
#[typetag::serde]
2✔
105
#[async_trait]
106
impl VectorOperator for RasterVectorJoin {
107
    async fn _initialize(
108
        mut self: Box<Self>,
109
        path: WorkflowOperatorPath,
110
        context: &dyn ExecutionContext,
111
    ) -> Result<Box<dyn InitializedVectorOperator>> {
6✔
112
        ensure!(
6✔
113
            (1..=MAX_NUMBER_OF_RASTER_INPUTS).contains(&self.sources.rasters.len()),
6✔
114
            error::InvalidNumberOfRasterInputs {
×
115
                expected: 1..MAX_NUMBER_OF_RASTER_INPUTS,
×
116
                found: self.sources.rasters.len()
×
117
            }
×
118
        );
119

120
        let name = CanonicOperatorName::from(&self);
6✔
121

122
        let vector_source = self
6✔
123
            .sources
6✔
124
            .vector
6✔
125
            .initialize(path.clone_and_append(0), context)
6✔
126
            .await?;
6✔
127

128
        let vector_rd = vector_source.result_descriptor();
6✔
129

6✔
130
        ensure!(
6✔
131
            vector_rd.data_type != VectorDataType::Data,
6✔
132
            error::InvalidType {
×
133
                expected: format!(
×
134
                    "{}, {} or {}",
×
135
                    VectorDataType::MultiPoint,
×
136
                    VectorDataType::MultiLineString,
×
137
                    VectorDataType::MultiPolygon
×
138
                ),
×
139
                found: VectorDataType::Data.to_string()
×
140
            },
×
141
        );
142

143
        let raster_sources = futures::future::try_join_all(
6✔
144
            self.sources
6✔
145
                .rasters
6✔
146
                .into_iter()
6✔
147
                .enumerate()
6✔
148
                .map(|(i, op)| op.initialize(path.clone_and_append(i as u8 + 1), context)),
7✔
149
        )
6✔
150
        .await?;
6✔
151

152
        let source_descriptors = raster_sources
6✔
153
            .iter()
6✔
154
            .map(InitializedRasterOperator::result_descriptor)
6✔
155
            .collect::<Vec<_>>();
6✔
156

6✔
157
        let spatial_reference = vector_rd.spatial_reference;
6✔
158

159
        for other_spatial_reference in source_descriptors
7✔
160
            .iter()
6✔
161
            .map(|source_descriptor| source_descriptor.spatial_reference)
7✔
162
        {
163
            ensure!(
7✔
164
                spatial_reference == other_spatial_reference,
7✔
165
                crate::error::InvalidSpatialReference {
1✔
166
                    expected: spatial_reference,
1✔
167
                    found: other_spatial_reference,
1✔
168
                }
1✔
169
            );
170
        }
171

172
        let raster_sources_bands = source_descriptors
5✔
173
            .iter()
5✔
174
            .map(|rd| rd.bands.count() as usize)
6✔
175
            .collect::<Vec<_>>();
5✔
176

5✔
177
        let rename_bands: RenameBands = self.params.names.clone().into();
5✔
178

179
        let new_column_names = rename_bands.apply(
5✔
180
            source_descriptors
5✔
181
                .iter()
5✔
182
                .map(|d| d.bands.iter().map(|b| b.name.clone()).collect())
9✔
183
                .collect(),
5✔
184
        )?;
5✔
185

186
        for name in vector_rd.columns.keys() {
5✔
187
            ensure!(
×
188
                !new_column_names.contains(name),
×
189
                ColumnNameConflict { name }
×
190
            );
191
        }
192

193
        let params = self.params;
5✔
194

5✔
195
        let result_descriptor = vector_rd.map_columns(|columns| {
5✔
196
            let mut columns = columns.clone();
5✔
197
            let mut new_column_name_idx = 0;
5✔
198

199
            for source_descriptor in &source_descriptors {
11✔
200
                let feature_data_type = match params.temporal_aggregation {
6✔
201
                    TemporalAggregationMethod::First | TemporalAggregationMethod::None => {
202
                        match source_descriptor.data_type {
4✔
203
                            RasterDataType::U8
204
                            | RasterDataType::U16
205
                            | RasterDataType::U32
206
                            | RasterDataType::U64
207
                            | RasterDataType::I8
208
                            | RasterDataType::I16
209
                            | RasterDataType::I32
210
                            | RasterDataType::I64 => FeatureDataType::Int,
4✔
211
                            RasterDataType::F32 | RasterDataType::F64 => FeatureDataType::Float,
×
212
                        }
213
                    }
214
                    TemporalAggregationMethod::Mean => FeatureDataType::Float,
2✔
215
                };
216

217
                for band in source_descriptor.bands.iter() {
9✔
218
                    let column_name = new_column_names[new_column_name_idx].clone();
9✔
219
                    new_column_name_idx += 1;
9✔
220

9✔
221
                    columns.insert(
9✔
222
                        column_name,
9✔
223
                        VectorColumnInfo {
9✔
224
                            data_type: feature_data_type,
9✔
225
                            measurement: band.measurement.clone(),
9✔
226
                        },
9✔
227
                    );
9✔
228
                }
9✔
229
            }
230
            columns
5✔
231
        });
5✔
232

5✔
233
        Ok(InitializedRasterVectorJoin {
5✔
234
            name,
5✔
235
            path,
5✔
236
            result_descriptor,
5✔
237
            vector_source,
5✔
238
            raster_sources,
5✔
239
            raster_sources_bands,
5✔
240
            state: params,
5✔
241
            new_column_names,
5✔
242
        }
5✔
243
        .boxed())
5✔
244
    }
12✔
245

246
    span_fn!(RasterVectorJoin);
247
}
248

249
pub struct RasterInput {
250
    pub processor: TypedRasterQueryProcessor,
251
    pub column_names: Vec<String>,
252
}
253

254
pub struct InitializedRasterVectorJoin {
255
    name: CanonicOperatorName,
256
    path: WorkflowOperatorPath,
257
    result_descriptor: VectorResultDescriptor,
258
    vector_source: Box<dyn InitializedVectorOperator>,
259
    raster_sources: Vec<Box<dyn InitializedRasterOperator>>,
260
    raster_sources_bands: Vec<usize>,
261
    state: RasterVectorJoinParams,
262
    new_column_names: Vec<String>,
263
}
264

265
impl InitializedVectorOperator for InitializedRasterVectorJoin {
266
    fn result_descriptor(&self) -> &VectorResultDescriptor {
2✔
267
        &self.result_descriptor
2✔
268
    }
2✔
269

270
    fn query_processor(&self) -> Result<TypedVectorQueryProcessor> {
4✔
271
        let mut raster_inputs = vec![];
4✔
272

4✔
273
        let mut names = self.new_column_names.clone();
4✔
274
        for (raster_source, num_bands) in self
4✔
275
            .raster_sources
4✔
276
            .iter()
4✔
277
            .zip(self.raster_sources_bands.iter())
4✔
278
        {
279
            let processor = raster_source.query_processor()?;
4✔
280
            let column_names = names.drain(0..*num_bands).collect::<Vec<_>>();
4✔
281

4✔
282
            raster_inputs.push(RasterInput {
4✔
283
                processor,
4✔
284
                column_names,
4✔
285
            });
4✔
286
        }
287

288
        Ok(match self.vector_source.query_processor()? {
4✔
289
            TypedVectorQueryProcessor::Data(_) => unreachable!(),
×
290
            TypedVectorQueryProcessor::MultiPoint(points) => {
4✔
291
                TypedVectorQueryProcessor::MultiPoint(match self.state.temporal_aggregation {
4✔
292
                    TemporalAggregationMethod::None => RasterVectorJoinProcessor::new(
×
293
                        points,
×
294
                        self.result_descriptor.clone(),
×
295
                        raster_inputs,
×
296
                        self.state.feature_aggregation,
×
297
                        self.state.feature_aggregation_ignore_no_data,
×
298
                    )
×
299
                    .boxed(),
×
300
                    TemporalAggregationMethod::First | TemporalAggregationMethod::Mean => {
301
                        RasterVectorAggregateJoinProcessor::new(
4✔
302
                            points,
4✔
303
                            self.result_descriptor.clone(),
4✔
304
                            raster_inputs,
4✔
305
                            self.state.feature_aggregation,
4✔
306
                            self.state.feature_aggregation_ignore_no_data,
4✔
307
                            self.state.temporal_aggregation,
4✔
308
                            self.state.temporal_aggregation_ignore_no_data,
4✔
309
                        )
4✔
310
                        .boxed()
4✔
311
                    }
312
                })
313
            }
314
            TypedVectorQueryProcessor::MultiPolygon(polygons) => {
×
315
                TypedVectorQueryProcessor::MultiPolygon(match self.state.temporal_aggregation {
×
316
                    TemporalAggregationMethod::None => RasterVectorJoinProcessor::new(
×
317
                        polygons,
×
318
                        self.result_descriptor.clone(),
×
319
                        raster_inputs,
×
320
                        self.state.feature_aggregation,
×
321
                        self.state.feature_aggregation_ignore_no_data,
×
322
                    )
×
323
                    .boxed(),
×
324
                    TemporalAggregationMethod::First | TemporalAggregationMethod::Mean => {
325
                        RasterVectorAggregateJoinProcessor::new(
×
326
                            polygons,
×
327
                            self.result_descriptor.clone(),
×
328
                            raster_inputs,
×
329
                            self.state.feature_aggregation,
×
330
                            self.state.feature_aggregation_ignore_no_data,
×
331
                            self.state.temporal_aggregation,
×
332
                            self.state.temporal_aggregation_ignore_no_data,
×
333
                        )
×
334
                        .boxed()
×
335
                    }
336
                })
337
            }
338
            TypedVectorQueryProcessor::MultiLineString(_) => return Err(Error::NotYetImplemented),
×
339
        })
340
    }
4✔
341

342
    fn canonic_name(&self) -> CanonicOperatorName {
×
343
        self.name.clone()
×
344
    }
×
345

NEW
346
    fn name(&self) -> &'static str {
×
NEW
347
        RasterVectorJoin::TYPE_NAME
×
NEW
348
    }
×
349

NEW
350
    fn path(&self) -> WorkflowOperatorPath {
×
NEW
351
        self.path.clone()
×
NEW
352
    }
×
353
}
354

355
pub fn create_feature_aggregator<P: Pixel>(
31✔
356
    number_of_features: usize,
31✔
357
    aggregation: FeatureAggregationMethod,
31✔
358
    ignore_no_data: bool,
31✔
359
) -> TypedAggregator {
31✔
360
    match aggregation {
31✔
361
        FeatureAggregationMethod::First => match P::TYPE {
16✔
362
            RasterDataType::U8
363
            | RasterDataType::U16
364
            | RasterDataType::U32
365
            | RasterDataType::U64
366
            | RasterDataType::I8
367
            | RasterDataType::I16
368
            | RasterDataType::I32
369
            | RasterDataType::I64 => {
370
                FirstValueIntAggregator::new(number_of_features, ignore_no_data).into_typed()
16✔
371
            }
372
            RasterDataType::F32 | RasterDataType::F64 => {
373
                FirstValueFloatAggregator::new(number_of_features, ignore_no_data).into_typed()
×
374
            }
375
        },
376
        FeatureAggregationMethod::Mean => {
377
            MeanValueAggregator::new(number_of_features, ignore_no_data).into_typed()
15✔
378
        }
379
    }
380
}
31✔
381

382
#[cfg(test)]
383
mod tests {
384
    use super::*;
385
    use std::str::FromStr;
386

387
    use crate::engine::{
388
        ChunkByteSize, MockExecutionContext, MockQueryContext, QueryProcessor,
389
        RasterBandDescriptor, RasterBandDescriptors, RasterOperator, RasterResultDescriptor,
390
    };
391
    use crate::mock::{MockFeatureCollectionSource, MockRasterSource, MockRasterSourceParams};
392
    use crate::source::{GdalSource, GdalSourceParameters};
393
    use crate::util::gdal::add_ndvi_dataset;
394
    use futures::StreamExt;
395
    use geoengine_datatypes::collections::{FeatureCollectionInfos, MultiPointCollection};
396
    use geoengine_datatypes::dataset::NamedData;
397
    use geoengine_datatypes::primitives::{
398
        BoundingBox2D, ColumnSelection, DataRef, DateTime, FeatureDataRef, MultiPoint,
399
        SpatialResolution, TimeInterval, VectorQueryRectangle,
400
    };
401
    use geoengine_datatypes::primitives::{CacheHint, Measurement};
402
    use geoengine_datatypes::raster::RasterTile2D;
403
    use geoengine_datatypes::spatial_reference::SpatialReference;
404
    use geoengine_datatypes::util::{gdal::hide_gdal_errors, test::TestDefault};
405
    use serde_json::json;
406

407
    #[test]
408
    fn serialization() {
1✔
409
        let raster_vector_join = RasterVectorJoin {
1✔
410
            params: RasterVectorJoinParams {
1✔
411
                names: ColumnNames::Names(vec!["foo".to_string(), "bar".to_string()]),
1✔
412
                feature_aggregation: FeatureAggregationMethod::First,
1✔
413
                feature_aggregation_ignore_no_data: false,
1✔
414
                temporal_aggregation: TemporalAggregationMethod::Mean,
1✔
415
                temporal_aggregation_ignore_no_data: false,
1✔
416
            },
1✔
417
            sources: SingleVectorMultipleRasterSources {
1✔
418
                vector: MockFeatureCollectionSource::<MultiPoint>::multiple(vec![]).boxed(),
1✔
419
                rasters: vec![],
1✔
420
            },
1✔
421
        };
1✔
422

1✔
423
        let serialized = json!({
1✔
424
            "type": "RasterVectorJoin",
1✔
425
            "params": {
1✔
426
                "names": {
1✔
427
                    "type": "names",
1✔
428
                    "values": ["foo", "bar"],
1✔
429
                },
1✔
430
                "featureAggregation": "first",
1✔
431
                "temporalAggregation": "mean",
1✔
432
            },
1✔
433
            "sources": {
1✔
434
                "vector": {
1✔
435
                    "type": "MockFeatureCollectionSourceMultiPoint",
1✔
436
                    "params": {
1✔
437
                        "collections": [],
1✔
438
                        "spatialReference": "EPSG:4326",
1✔
439
                        "measurements": {},
1✔
440
                    }
1✔
441
                },
1✔
442
                "rasters": [],
1✔
443
            },
1✔
444
        })
1✔
445
        .to_string();
1✔
446

1✔
447
        let deserialized: RasterVectorJoin = serde_json::from_str(&serialized).unwrap();
1✔
448

1✔
449
        assert_eq!(deserialized.params, raster_vector_join.params);
1✔
450
    }
1✔
451

452
    fn ndvi_source(name: NamedData) -> Box<dyn RasterOperator> {
4✔
453
        let gdal_source = GdalSource {
4✔
454
            params: GdalSourceParameters { data: name },
4✔
455
        };
4✔
456

4✔
457
        gdal_source.boxed()
4✔
458
    }
4✔
459

460
    #[tokio::test]
461
    async fn ndvi_time_point() {
1✔
462
        let point_source = MockFeatureCollectionSource::single(
1✔
463
            MultiPointCollection::from_data(
1✔
464
                MultiPoint::many(vec![
1✔
465
                    (-13.95, 20.05),
1✔
466
                    (-14.05, 20.05),
1✔
467
                    (-13.95, 19.95),
1✔
468
                    (-14.05, 19.95),
1✔
469
                ])
1✔
470
                .unwrap(),
1✔
471
                vec![
1✔
472
                    TimeInterval::new(
1✔
473
                        DateTime::new_utc(2014, 1, 1, 0, 0, 0),
1✔
474
                        DateTime::new_utc(2014, 1, 1, 0, 0, 0),
1✔
475
                    )
1✔
476
                    .unwrap();
1✔
477
                    4
1✔
478
                ],
1✔
479
                Default::default(),
1✔
480
                CacheHint::default(),
1✔
481
            )
1✔
482
            .unwrap(),
1✔
483
        )
1✔
484
        .boxed();
1✔
485

1✔
486
        let mut exe_ctc = MockExecutionContext::test_default();
1✔
487
        let ndvi_id = add_ndvi_dataset(&mut exe_ctc);
1✔
488

1✔
489
        let operator = RasterVectorJoin {
1✔
490
            params: RasterVectorJoinParams {
1✔
491
                names: ColumnNames::Default,
1✔
492
                feature_aggregation: FeatureAggregationMethod::First,
1✔
493
                feature_aggregation_ignore_no_data: false,
1✔
494
                temporal_aggregation: TemporalAggregationMethod::First,
1✔
495
                temporal_aggregation_ignore_no_data: false,
1✔
496
            },
1✔
497
            sources: SingleVectorMultipleRasterSources {
1✔
498
                vector: point_source,
1✔
499
                rasters: vec![ndvi_source(ndvi_id.clone())],
1✔
500
            },
1✔
501
        };
1✔
502

1✔
503
        let operator = operator
1✔
504
            .boxed()
1✔
505
            .initialize(WorkflowOperatorPath::initialize_root(), &exe_ctc)
1✔
506
            .await
1✔
507
            .unwrap();
1✔
508

1✔
509
        let query_processor = operator.query_processor().unwrap().multi_point().unwrap();
1✔
510

1✔
511
        let result = query_processor
1✔
512
            .query(
1✔
513
                VectorQueryRectangle {
1✔
514
                    spatial_bounds: BoundingBox2D::new((-180., -90.).into(), (180., 90.).into())
1✔
515
                        .unwrap(),
1✔
516
                    time_interval: TimeInterval::default(),
1✔
517
                    spatial_resolution: SpatialResolution::new(0.1, 0.1).unwrap(),
1✔
518
                    attributes: ColumnSelection::all(),
1✔
519
                },
1✔
520
                &MockQueryContext::new(ChunkByteSize::MIN),
1✔
521
            )
1✔
522
            .await
1✔
523
            .unwrap()
1✔
524
            .map(Result::unwrap)
1✔
525
            .collect::<Vec<MultiPointCollection>>()
1✔
526
            .await;
1✔
527

1✔
528
        assert_eq!(result.len(), 1);
1✔
529

1✔
530
        let FeatureDataRef::Int(data) = result[0].data("ndvi").unwrap() else {
1✔
531
            unreachable!();
1✔
532
        };
1✔
533

1✔
534
        // these values are taken from loading the tiff in QGIS
1✔
535
        assert_eq!(data.as_ref(), &[54, 55, 51, 55]);
1✔
536
    }
1✔
537

538
    #[tokio::test]
539
    #[allow(clippy::float_cmp)]
540
    async fn ndvi_time_range() {
1✔
541
        let point_source = MockFeatureCollectionSource::single(
1✔
542
            MultiPointCollection::from_data(
1✔
543
                MultiPoint::many(vec![
1✔
544
                    (-13.95, 20.05),
1✔
545
                    (-14.05, 20.05),
1✔
546
                    (-13.95, 19.95),
1✔
547
                    (-14.05, 19.95),
1✔
548
                ])
1✔
549
                .unwrap(),
1✔
550
                vec![
1✔
551
                    TimeInterval::new(
1✔
552
                        DateTime::new_utc(2014, 1, 1, 0, 0, 0),
1✔
553
                        DateTime::new_utc(2014, 3, 1, 0, 0, 0),
1✔
554
                    )
1✔
555
                    .unwrap();
1✔
556
                    4
1✔
557
                ],
1✔
558
                Default::default(),
1✔
559
                CacheHint::default(),
1✔
560
            )
1✔
561
            .unwrap(),
1✔
562
        )
1✔
563
        .boxed();
1✔
564

1✔
565
        let mut exe_ctc = MockExecutionContext::test_default();
1✔
566
        let ndvi_id = add_ndvi_dataset(&mut exe_ctc);
1✔
567

1✔
568
        let operator = RasterVectorJoin {
1✔
569
            params: RasterVectorJoinParams {
1✔
570
                names: ColumnNames::Default,
1✔
571
                feature_aggregation: FeatureAggregationMethod::First,
1✔
572
                feature_aggregation_ignore_no_data: false,
1✔
573
                temporal_aggregation: TemporalAggregationMethod::Mean,
1✔
574
                temporal_aggregation_ignore_no_data: false,
1✔
575
            },
1✔
576
            sources: SingleVectorMultipleRasterSources {
1✔
577
                vector: point_source,
1✔
578
                rasters: vec![ndvi_source(ndvi_id.clone())],
1✔
579
            },
1✔
580
        };
1✔
581

1✔
582
        let operator = operator
1✔
583
            .boxed()
1✔
584
            .initialize(WorkflowOperatorPath::initialize_root(), &exe_ctc)
1✔
585
            .await
1✔
586
            .unwrap();
1✔
587

1✔
588
        let query_processor = operator.query_processor().unwrap().multi_point().unwrap();
1✔
589

1✔
590
        let result = query_processor
1✔
591
            .query(
1✔
592
                VectorQueryRectangle {
1✔
593
                    spatial_bounds: BoundingBox2D::new((-180., -90.).into(), (180., 90.).into())
1✔
594
                        .unwrap(),
1✔
595
                    time_interval: TimeInterval::default(),
1✔
596
                    spatial_resolution: SpatialResolution::new(0.1, 0.1).unwrap(),
1✔
597
                    attributes: ColumnSelection::all(),
1✔
598
                },
1✔
599
                &MockQueryContext::new(ChunkByteSize::MIN),
1✔
600
            )
1✔
601
            .await
1✔
602
            .unwrap()
1✔
603
            .map(Result::unwrap)
1✔
604
            .collect::<Vec<MultiPointCollection>>()
1✔
605
            .await;
1✔
606

1✔
607
        assert_eq!(result.len(), 1);
1✔
608

1✔
609
        let FeatureDataRef::Float(data) = result[0].data("ndvi").unwrap() else {
1✔
610
            unreachable!();
1✔
611
        };
1✔
612

1✔
613
        // these values are taken from loading the tiff in QGIS
1✔
614
        assert_eq!(
1✔
615
            data.as_ref(),
1✔
616
            &[
1✔
617
                (54. + 52.) / 2.,
1✔
618
                (55. + 55.) / 2.,
1✔
619
                (51. + 50.) / 2.,
1✔
620
                (55. + 53.) / 2.,
1✔
621
            ]
1✔
622
        );
1✔
623
    }
1✔
624

625
    #[tokio::test]
626
    #[allow(clippy::float_cmp)]
627
    async fn ndvi_with_default_time() {
1✔
628
        hide_gdal_errors();
1✔
629

1✔
630
        let point_source = MockFeatureCollectionSource::single(
1✔
631
            MultiPointCollection::from_data(
1✔
632
                MultiPoint::many(vec![
1✔
633
                    (-13.95, 20.05),
1✔
634
                    (-14.05, 20.05),
1✔
635
                    (-13.95, 19.95),
1✔
636
                    (-14.05, 19.95),
1✔
637
                ])
1✔
638
                .unwrap(),
1✔
639
                vec![TimeInterval::default(); 4],
1✔
640
                Default::default(),
1✔
641
                CacheHint::default(),
1✔
642
            )
1✔
643
            .unwrap(),
1✔
644
        )
1✔
645
        .boxed();
1✔
646

1✔
647
        let mut exe_ctc = MockExecutionContext::test_default();
1✔
648
        let ndvi_id = add_ndvi_dataset(&mut exe_ctc);
1✔
649

1✔
650
        let operator = RasterVectorJoin {
1✔
651
            params: RasterVectorJoinParams {
1✔
652
                names: ColumnNames::Default,
1✔
653
                feature_aggregation: FeatureAggregationMethod::First,
1✔
654
                feature_aggregation_ignore_no_data: false,
1✔
655
                temporal_aggregation: TemporalAggregationMethod::Mean,
1✔
656
                temporal_aggregation_ignore_no_data: false,
1✔
657
            },
1✔
658
            sources: SingleVectorMultipleRasterSources {
1✔
659
                vector: point_source,
1✔
660
                rasters: vec![ndvi_source(ndvi_id.clone())],
1✔
661
            },
1✔
662
        };
1✔
663

1✔
664
        let operator = operator
1✔
665
            .boxed()
1✔
666
            .initialize(WorkflowOperatorPath::initialize_root(), &exe_ctc)
1✔
667
            .await
1✔
668
            .unwrap();
1✔
669

1✔
670
        let query_processor = operator.query_processor().unwrap().multi_point().unwrap();
1✔
671

1✔
672
        let result = query_processor
1✔
673
            .query(
1✔
674
                VectorQueryRectangle {
1✔
675
                    spatial_bounds: BoundingBox2D::new((-180., -90.).into(), (180., 90.).into())
1✔
676
                        .unwrap(),
1✔
677
                    time_interval: TimeInterval::default(),
1✔
678
                    spatial_resolution: SpatialResolution::new(0.1, 0.1).unwrap(),
1✔
679
                    attributes: ColumnSelection::all(),
1✔
680
                },
1✔
681
                &MockQueryContext::new(ChunkByteSize::MIN),
1✔
682
            )
1✔
683
            .await
1✔
684
            .unwrap()
1✔
685
            .map(Result::unwrap)
1✔
686
            .collect::<Vec<MultiPointCollection>>()
1✔
687
            .await;
1✔
688

1✔
689
        assert_eq!(result.len(), 1);
1✔
690

1✔
691
        let FeatureDataRef::Float(data) = result[0].data("ndvi").unwrap() else {
1✔
692
            unreachable!();
1✔
693
        };
1✔
694

1✔
695
        assert_eq!(data.as_ref(), &[0., 0., 0., 0.]);
1✔
696

1✔
697
        assert_eq!(data.nulls(), vec![true, true, true, true]);
1✔
698
    }
1✔
699

700
    #[tokio::test]
701
    async fn it_checks_sref() {
1✔
702
        let point_source = MockFeatureCollectionSource::with_collections_and_sref(
1✔
703
            vec![MultiPointCollection::from_data(
1✔
704
                MultiPoint::many(vec![
1✔
705
                    (-13.95, 20.05),
1✔
706
                    (-14.05, 20.05),
1✔
707
                    (-13.95, 19.95),
1✔
708
                    (-14.05, 19.95),
1✔
709
                ])
1✔
710
                .unwrap(),
1✔
711
                vec![TimeInterval::default(); 4],
1✔
712
                Default::default(),
1✔
713
                CacheHint::default(),
1✔
714
            )
1✔
715
            .unwrap()],
1✔
716
            SpatialReference::from_str("EPSG:3857").unwrap(),
1✔
717
        )
1✔
718
        .boxed();
1✔
719

1✔
720
        let mut exe_ctc = MockExecutionContext::test_default();
1✔
721
        let ndvi_id = add_ndvi_dataset(&mut exe_ctc);
1✔
722

1✔
723
        let operator = RasterVectorJoin {
1✔
724
            params: RasterVectorJoinParams {
1✔
725
                names: ColumnNames::Default,
1✔
726
                feature_aggregation: FeatureAggregationMethod::First,
1✔
727
                feature_aggregation_ignore_no_data: false,
1✔
728
                temporal_aggregation: TemporalAggregationMethod::Mean,
1✔
729
                temporal_aggregation_ignore_no_data: false,
1✔
730
            },
1✔
731
            sources: SingleVectorMultipleRasterSources {
1✔
732
                vector: point_source,
1✔
733
                rasters: vec![ndvi_source(ndvi_id.clone())],
1✔
734
            },
1✔
735
        }
1✔
736
        .boxed()
1✔
737
        .initialize(WorkflowOperatorPath::initialize_root(), &exe_ctc)
1✔
738
        .await;
1✔
739

1✔
740
        assert!(matches!(
1✔
741
            operator,
1✔
742
            Err(Error::InvalidSpatialReference {
1✔
743
                expected: _,
1✔
744
                found: _,
1✔
745
            })
1✔
746
        ));
1✔
747
    }
1✔
748

749
    #[tokio::test]
750
    #[allow(clippy::too_many_lines)]
751
    async fn it_includes_bands_in_result_descriptor() {
1✔
752
        let point_source = MockFeatureCollectionSource::with_collections_and_sref(
1✔
753
            vec![MultiPointCollection::from_data(
1✔
754
                MultiPoint::many(vec![
1✔
755
                    (-13.95, 20.05),
1✔
756
                    (-14.05, 20.05),
1✔
757
                    (-13.95, 19.95),
1✔
758
                    (-14.05, 19.95),
1✔
759
                ])
1✔
760
                .unwrap(),
1✔
761
                vec![TimeInterval::default(); 4],
1✔
762
                Default::default(),
1✔
763
                CacheHint::default(),
1✔
764
            )
1✔
765
            .unwrap()],
1✔
766
            SpatialReference::from_str("EPSG:4326").unwrap(),
1✔
767
        )
1✔
768
        .boxed();
1✔
769

1✔
770
        let raster_source = MockRasterSource {
1✔
771
            params: MockRasterSourceParams {
1✔
772
                data: Vec::<RasterTile2D<u8>>::new(),
1✔
773
                result_descriptor: RasterResultDescriptor {
1✔
774
                    data_type: RasterDataType::U8,
1✔
775
                    spatial_reference: SpatialReference::epsg_4326().into(),
1✔
776
                    time: None,
1✔
777
                    bbox: None,
1✔
778
                    resolution: None,
1✔
779
                    bands: RasterBandDescriptors::new(vec![
1✔
780
                        RasterBandDescriptor::new_unitless("band_0".into()),
1✔
781
                        RasterBandDescriptor::new_unitless("band_1".into()),
1✔
782
                    ])
1✔
783
                    .unwrap(),
1✔
784
                },
1✔
785
            },
1✔
786
        }
1✔
787
        .boxed();
1✔
788

1✔
789
        let raster_source2 = MockRasterSource {
1✔
790
            params: MockRasterSourceParams {
1✔
791
                data: Vec::<RasterTile2D<u8>>::new(),
1✔
792
                result_descriptor: RasterResultDescriptor {
1✔
793
                    data_type: RasterDataType::U8,
1✔
794
                    spatial_reference: SpatialReference::epsg_4326().into(),
1✔
795
                    time: None,
1✔
796
                    bbox: None,
1✔
797
                    resolution: None,
1✔
798
                    bands: RasterBandDescriptors::new(vec![
1✔
799
                        RasterBandDescriptor::new_unitless("band_0".into()),
1✔
800
                        RasterBandDescriptor::new_unitless("band_1".into()),
1✔
801
                        RasterBandDescriptor::new_unitless("band_2".into()),
1✔
802
                    ])
1✔
803
                    .unwrap(),
1✔
804
                },
1✔
805
            },
1✔
806
        }
1✔
807
        .boxed();
1✔
808

1✔
809
        let exe_ctc = MockExecutionContext::test_default();
1✔
810

1✔
811
        let join = RasterVectorJoin {
1✔
812
            params: RasterVectorJoinParams {
1✔
813
                names: ColumnNames::Default,
1✔
814
                feature_aggregation: FeatureAggregationMethod::First,
1✔
815
                feature_aggregation_ignore_no_data: false,
1✔
816
                temporal_aggregation: TemporalAggregationMethod::None,
1✔
817
                temporal_aggregation_ignore_no_data: false,
1✔
818
            },
1✔
819
            sources: SingleVectorMultipleRasterSources {
1✔
820
                vector: point_source,
1✔
821
                rasters: vec![raster_source, raster_source2],
1✔
822
            },
1✔
823
        }
1✔
824
        .boxed()
1✔
825
        .initialize(WorkflowOperatorPath::initialize_root(), &exe_ctc)
1✔
826
        .await
1✔
827
        .unwrap();
1✔
828

1✔
829
        assert_eq!(
1✔
830
            join.result_descriptor(),
1✔
831
            &VectorResultDescriptor {
1✔
832
                data_type: VectorDataType::MultiPoint,
1✔
833
                spatial_reference: SpatialReference::epsg_4326().into(),
1✔
834
                columns: [
1✔
835
                    (
1✔
836
                        "band_0".to_string(),
1✔
837
                        VectorColumnInfo {
1✔
838
                            data_type: FeatureDataType::Int,
1✔
839
                            measurement: Measurement::Unitless
1✔
840
                        }
1✔
841
                    ),
1✔
842
                    (
1✔
843
                        "band_1".to_string(),
1✔
844
                        VectorColumnInfo {
1✔
845
                            data_type: FeatureDataType::Int,
1✔
846
                            measurement: Measurement::Unitless
1✔
847
                        }
1✔
848
                    ),
1✔
849
                    (
1✔
850
                        "band_0 (1)".to_string(),
1✔
851
                        VectorColumnInfo {
1✔
852
                            data_type: FeatureDataType::Int,
1✔
853
                            measurement: Measurement::Unitless
1✔
854
                        }
1✔
855
                    ),
1✔
856
                    (
1✔
857
                        "band_1 (1)".to_string(),
1✔
858
                        VectorColumnInfo {
1✔
859
                            data_type: FeatureDataType::Int,
1✔
860
                            measurement: Measurement::Unitless
1✔
861
                        }
1✔
862
                    ),
1✔
863
                    (
1✔
864
                        "band_2".to_string(),
1✔
865
                        VectorColumnInfo {
1✔
866
                            data_type: FeatureDataType::Int,
1✔
867
                            measurement: Measurement::Unitless
1✔
868
                        }
1✔
869
                    )
1✔
870
                ]
1✔
871
                .into_iter()
1✔
872
                .collect(),
1✔
873
                time: None,
1✔
874
                bbox: None
1✔
875
            }
1✔
876
        );
1✔
877
    }
1✔
878
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc