• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

geo-engine / geoengine / 7006568925

27 Nov 2023 02:07PM UTC coverage: 89.651% (+0.2%) from 89.498%
7006568925

push

github

web-flow
Merge pull request #888 from geo-engine/raster_stacks

raster stacking

4032 of 4274 new or added lines in 107 files covered. (94.34%)

12 existing lines in 8 files now uncovered.

113020 of 126066 relevant lines covered (89.65%)

59901.79 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

93.68
/operators/src/processing/point_in_polygon.rs
1
mod tester;
2
mod wrapper;
3

4
use std::cmp::min;
5
use std::sync::Arc;
6

7
use futures::stream::BoxStream;
8
use futures::{StreamExt, TryStreamExt};
9
use geoengine_datatypes::dataset::NamedData;
10
use geoengine_datatypes::primitives::CacheHint;
11
use geoengine_datatypes::primitives::VectorQueryRectangle;
12
use rayon::ThreadPool;
13
use serde::{Deserialize, Serialize};
14
use snafu::ensure;
15

16
use crate::adapters::FeatureCollectionChunkMerger;
17
use crate::engine::{
18
    CanonicOperatorName, ExecutionContext, InitializedSources, InitializedVectorOperator, Operator,
19
    OperatorName, QueryContext, TypedVectorQueryProcessor, VectorOperator, VectorQueryProcessor,
20
    VectorResultDescriptor, WorkflowOperatorPath,
21
};
22
use crate::engine::{OperatorData, QueryProcessor};
23
use crate::error::{self, Error};
24
use crate::util::Result;
25
use arrow::array::BooleanArray;
26
use async_trait::async_trait;
27
use geoengine_datatypes::collections::{
28
    FeatureCollectionInfos, FeatureCollectionModifications, GeometryCollection,
29
    MultiPointCollection, MultiPolygonCollection, VectorDataType,
30
};
31
pub use tester::PointInPolygonTester;
32
pub use wrapper::PointInPolygonTesterWithCollection;
33

34
/// The point in polygon filter requires two inputs in the following order:
35
/// 1. a `MultiPointCollection` source
36
/// 2. a `MultiPolygonCollection` source
37
/// Then, it filters the `MultiPolygonCollection`s so that only those features are retained that are in any polygon.
38
pub type PointInPolygonFilter = Operator<PointInPolygonFilterParams, PointInPolygonFilterSource>;
39

40
impl OperatorName for PointInPolygonFilter {
41
    const TYPE_NAME: &'static str = "PointInPolygonFilter";
42
}
43

44
#[derive(Debug, Clone, Deserialize, Serialize)]
6✔
45
pub struct PointInPolygonFilterParams {}
46

47
#[derive(Debug, Clone, Deserialize, Serialize)]
6✔
48
pub struct PointInPolygonFilterSource {
49
    pub points: Box<dyn VectorOperator>,
50
    pub polygons: Box<dyn VectorOperator>,
51
}
52

53
impl OperatorData for PointInPolygonFilterSource {
54
    fn data_names_collect(&self, data_names: &mut Vec<NamedData>) {
×
55
        self.points.data_names_collect(data_names);
×
56
        self.polygons.data_names_collect(data_names);
×
57
    }
×
58
}
59

60
struct InitializedPointInPolygonFilterSource {
61
    points: Box<dyn InitializedVectorOperator>,
62
    polygons: Box<dyn InitializedVectorOperator>,
63
}
64

65
#[async_trait]
66
impl InitializedSources<InitializedPointInPolygonFilterSource> for PointInPolygonFilterSource {
67
    async fn initialize_sources(
6✔
68
        self,
6✔
69
        path: WorkflowOperatorPath,
6✔
70
        context: &dyn ExecutionContext,
6✔
71
    ) -> Result<InitializedPointInPolygonFilterSource> {
6✔
72
        let points_path = path.clone_and_append(0);
6✔
73
        let polygons_path = path.clone_and_append(1);
6✔
74

6✔
75
        Ok(InitializedPointInPolygonFilterSource {
6✔
76
            points: self.points.initialize(points_path, context).await?,
6✔
77
            polygons: self.polygons.initialize(polygons_path, context).await?,
6✔
78
        })
79
    }
12✔
80
}
81

82
#[typetag::serde]
×
83
#[async_trait]
84
impl VectorOperator for PointInPolygonFilter {
85
    async fn _initialize(
6✔
86
        self: Box<Self>,
6✔
87
        path: WorkflowOperatorPath,
6✔
88
        context: &dyn ExecutionContext,
6✔
89
    ) -> Result<Box<dyn InitializedVectorOperator>> {
6✔
90
        let name = CanonicOperatorName::from(&self);
6✔
91

92
        let initialized_source = self.sources.initialize_sources(path, context).await?;
6✔
93

94
        let points_rd = initialized_source.points.result_descriptor();
6✔
95
        let polygons_rd = initialized_source.polygons.result_descriptor();
6✔
96

6✔
97
        ensure!(
6✔
98
            points_rd.data_type == VectorDataType::MultiPoint,
6✔
99
            error::InvalidType {
×
100
                expected: VectorDataType::MultiPoint.to_string(),
×
101
                found: points_rd.data_type.to_string(),
×
102
            }
×
103
        );
104
        ensure!(
6✔
105
            polygons_rd.data_type == VectorDataType::MultiPolygon,
6✔
106
            error::InvalidType {
×
107
                expected: VectorDataType::MultiPolygon.to_string(),
×
108
                found: polygons_rd.data_type.to_string(),
×
109
            }
×
110
        );
111

112
        ensure!(
6✔
113
            points_rd.spatial_reference == polygons_rd.spatial_reference,
6✔
114
            crate::error::InvalidSpatialReference {
1✔
115
                expected: points_rd.spatial_reference,
1✔
116
                found: polygons_rd.spatial_reference,
1✔
117
            }
1✔
118
        );
119

120
        // We use the result descriptor of the points because in the worst case no feature will be excluded.
121
        // We cannot use the polygon bbox because a `MultiPoint` could have one point within a polygon (and
122
        // thus be included in the result) and one point outside of the bbox of the polygons.
123
        let out_desc = initialized_source.points.result_descriptor().clone();
5✔
124

5✔
125
        let initialized_operator = InitializedPointInPolygonFilter {
5✔
126
            name,
5✔
127
            result_descriptor: out_desc,
5✔
128
            points: initialized_source.points,
5✔
129
            polygons: initialized_source.polygons,
5✔
130
        };
5✔
131

5✔
132
        Ok(initialized_operator.boxed())
5✔
133
    }
12✔
134

135
    span_fn!(PointInPolygonFilter);
×
136
}
137

138
pub struct InitializedPointInPolygonFilter {
139
    name: CanonicOperatorName,
140
    points: Box<dyn InitializedVectorOperator>,
141
    polygons: Box<dyn InitializedVectorOperator>,
142
    result_descriptor: VectorResultDescriptor,
143
}
144

145
impl InitializedVectorOperator for InitializedPointInPolygonFilter {
146
    fn query_processor(&self) -> Result<TypedVectorQueryProcessor> {
5✔
147
        let point_processor = self
5✔
148
            .points
5✔
149
            .query_processor()?
5✔
150
            .multi_point()
5✔
151
            .expect("checked in `PointInPolygonFilter` constructor");
5✔
152

153
        let polygon_processor = self
5✔
154
            .polygons
5✔
155
            .query_processor()?
5✔
156
            .multi_polygon()
5✔
157
            .expect("checked in `PointInPolygonFilter` constructor");
5✔
158

5✔
159
        Ok(TypedVectorQueryProcessor::MultiPoint(
5✔
160
            PointInPolygonFilterProcessor::new(point_processor, polygon_processor).boxed(),
5✔
161
        ))
5✔
162
    }
5✔
163

164
    fn result_descriptor(&self) -> &VectorResultDescriptor {
×
165
        &self.result_descriptor
×
166
    }
×
167

168
    fn canonic_name(&self) -> CanonicOperatorName {
×
169
        self.name.clone()
×
170
    }
×
171
}
172

173
pub struct PointInPolygonFilterProcessor {
174
    points: Box<dyn VectorQueryProcessor<VectorType = MultiPointCollection>>,
175
    polygons: Box<dyn VectorQueryProcessor<VectorType = MultiPolygonCollection>>,
176
}
177

178
impl PointInPolygonFilterProcessor {
179
    pub fn new(
5✔
180
        points: Box<dyn VectorQueryProcessor<VectorType = MultiPointCollection>>,
5✔
181
        polygons: Box<dyn VectorQueryProcessor<VectorType = MultiPolygonCollection>>,
5✔
182
    ) -> Self {
5✔
183
        Self { points, polygons }
5✔
184
    }
5✔
185

186
    fn filter_parallel(
10✔
187
        points: &Arc<MultiPointCollection>,
10✔
188
        polygons: &MultiPolygonCollection,
10✔
189
        thread_pool: &ThreadPool,
10✔
190
    ) -> Vec<bool> {
10✔
191
        debug_assert!(!points.is_empty());
10✔
192

193
        // TODO: parallelize over coordinate rather than features
194

195
        let tester = Arc::new(PointInPolygonTester::new(polygons)); // TODO: multithread
10✔
196

10✔
197
        let parallelism = thread_pool.current_num_threads();
10✔
198
        let chunk_size = (points.len() as f64 / parallelism as f64).ceil() as usize;
10✔
199

10✔
200
        let mut result = vec![false; points.len()];
10✔
201

10✔
202
        thread_pool.scope(|scope| {
10✔
203
            let num_features = points.len();
10✔
204
            let feature_offsets = points.feature_offsets();
10✔
205
            let time_intervals = points.time_intervals();
10✔
206
            let coordinates = points.coordinates();
10✔
207

208
            for (chunk_index, chunk_result) in result.chunks_mut(chunk_size).enumerate() {
22✔
209
                let feature_index_start = chunk_index * chunk_size;
22✔
210
                let features_index_end = min(feature_index_start + chunk_size, num_features);
22✔
211
                let tester = tester.clone();
22✔
212

22✔
213
                scope.spawn(move |_| {
22✔
214
                    for (
215
                        feature_index,
22✔
216
                        ((coordinates_start_index, coordinates_end_index), time_interval),
22✔
217
                    ) in two_tuple_windows(
22✔
218
                        feature_offsets[feature_index_start..=features_index_end]
22✔
219
                            .iter()
22✔
220
                            .map(|&c| c as usize),
44✔
221
                    )
22✔
222
                    .zip(time_intervals[feature_index_start..features_index_end].iter())
22✔
223
                    .enumerate()
22✔
224
                    {
22✔
225
                        let is_multi_point_in_polygon_collection = coordinates
22✔
226
                            [coordinates_start_index..coordinates_end_index]
22✔
227
                            .iter()
22✔
228
                            .any(|coordinate| {
22✔
229
                                tester.any_polygon_contains_coordinate(coordinate, time_interval)
22✔
230
                            });
22✔
231

22✔
232
                        chunk_result[feature_index] = is_multi_point_in_polygon_collection;
22✔
233
                    }
22✔
234
                });
22✔
235
            }
22✔
236
        });
10✔
237

10✔
238
        result
10✔
239
    }
10✔
240

241
    async fn filter_points(
10✔
242
        ctx: &dyn QueryContext,
10✔
243
        points: Arc<MultiPointCollection>,
10✔
244
        polygons: MultiPolygonCollection,
10✔
245
        initial_filter: &BooleanArray,
10✔
246
    ) -> Result<BooleanArray> {
10✔
247
        let thread_pool = ctx.thread_pool().clone();
10✔
248

10✔
249
        let thread_points = points.clone();
10✔
250
        let filter = crate::util::spawn_blocking(move || {
10✔
251
            Self::filter_parallel(&thread_points, &polygons, &thread_pool)
10✔
252
        })
10✔
253
        .await?;
10✔
254

255
        arrow::compute::or(initial_filter, &filter.into()).map_err(Into::into)
10✔
256
    }
10✔
257
}
258

259
#[async_trait]
260
impl VectorQueryProcessor for PointInPolygonFilterProcessor {
261
    type VectorType = MultiPointCollection;
262

263
    async fn vector_query<'a>(
6✔
264
        &'a self,
6✔
265
        query: VectorQueryRectangle,
6✔
266
        ctx: &'a dyn QueryContext,
6✔
267
    ) -> Result<BoxStream<'a, Result<Self::VectorType>>> {
6✔
268
        let filtered_stream =
6✔
269
            self.points
6✔
270
                .query(query.clone(), ctx)
6✔
271
                .await?
×
272
                .and_then(move |points| {
8✔
273
                    let query: geoengine_datatypes::primitives::QueryRectangle<
8✔
274
                        geoengine_datatypes::primitives::BoundingBox2D,
8✔
275
                        geoengine_datatypes::primitives::ColumnSelection,
8✔
276
                    > = query.clone();
8✔
277
                    async move {
8✔
278
                        if points.is_empty() {
8✔
279
                            return Ok(points);
1✔
280
                        }
7✔
281

7✔
282
                        let initial_filter = BooleanArray::from(vec![false; points.len()]);
7✔
283
                        let arc_points = Arc::new(points);
7✔
284

285
                        let (filter, cache_hint) = self
7✔
286
                            .polygons
7✔
287
                            .query(query.clone(), ctx)
7✔
NEW
288
                            .await?
×
289
                            .try_fold(
7✔
290
                                (initial_filter, CacheHint::max_duration()),
7✔
291
                                |acc, polygons| {
11✔
292
                                    let arc_points = arc_points.clone();
11✔
293
                                    async move {
11✔
294
                                        let (filter, mut cache_hint) = acc;
11✔
295
                                        let polygons = polygons;
11✔
296

11✔
297
                                        cache_hint.merge_with(&polygons.cache_hint);
11✔
298

11✔
299
                                        if polygons.is_empty() {
11✔
300
                                            return Ok((filter, cache_hint));
1✔
301
                                        }
10✔
302

10✔
303
                                        Ok((
10✔
304
                                            Self::filter_points(
10✔
305
                                                ctx,
10✔
306
                                                arc_points.clone(),
10✔
307
                                                polygons,
10✔
308
                                                &filter,
10✔
309
                                            )
10✔
310
                                            .await?,
10✔
311
                                            cache_hint,
10✔
312
                                        ))
313
                                    }
11✔
314
                                },
11✔
315
                            )
7✔
316
                            .await?;
10✔
317

318
                        let mut new_points =
7✔
319
                            arc_points.filter(filter).map_err(Into::<Error>::into)?;
7✔
320
                        new_points.cache_hint = cache_hint;
7✔
321

7✔
322
                        Ok(new_points)
7✔
323
                    }
8✔
324
                });
8✔
325

6✔
326
        Ok(
6✔
327
            FeatureCollectionChunkMerger::new(filtered_stream.fuse(), ctx.chunk_byte_size().into())
6✔
328
                .boxed(),
6✔
329
        )
6✔
330
    }
12✔
331
}
332

333
/// Loop through an iterator by yielding the current and previous tuple. Starts with the
334
/// (first, second) item, so the iterator must have more than one item to create an output.
335
fn two_tuple_windows<I, T>(mut iter: I) -> impl Iterator<Item = (T, T)>
22✔
336
where
22✔
337
    I: Iterator<Item = T>,
22✔
338
    T: Copy,
22✔
339
{
22✔
340
    let mut last = iter.next();
22✔
341

22✔
342
    iter.map(move |item| {
22✔
343
        let output = (last.unwrap(), item);
22✔
344
        last = Some(item);
22✔
345
        output
22✔
346
    })
22✔
347
}
22✔
348

349
#[cfg(test)]
350
mod tests {
351

352
    use super::*;
353
    use std::str::FromStr;
354

355
    use geoengine_datatypes::collections::ChunksEqualIgnoringCacheHint;
356
    use geoengine_datatypes::primitives::{
357
        BoundingBox2D, Coordinate2D, MultiPoint, MultiPolygon, SpatialResolution, TimeInterval,
358
    };
359
    use geoengine_datatypes::primitives::{CacheHint, ColumnSelection};
360
    use geoengine_datatypes::spatial_reference::SpatialReference;
361
    use geoengine_datatypes::util::test::TestDefault;
362

363
    use crate::engine::{ChunkByteSize, MockExecutionContext, MockQueryContext};
364
    use crate::error::Error;
365
    use crate::mock::MockFeatureCollectionSource;
366

367
    #[test]
1✔
368
    fn point_in_polygon_boundary_conditions() {
1✔
369
        let collection = MultiPolygonCollection::from_data(
1✔
370
            vec![MultiPolygon::new(vec![vec![vec![
1✔
371
                (0.0, 0.0).into(),
1✔
372
                (10.0, 0.0).into(),
1✔
373
                (10.0, 10.0).into(),
1✔
374
                (0.0, 10.0).into(),
1✔
375
                (0.0, 0.0).into(),
1✔
376
            ]]])
1✔
377
            .unwrap()],
1✔
378
            vec![Default::default(); 1],
1✔
379
            Default::default(),
1✔
380
            CacheHint::default(),
1✔
381
        )
1✔
382
        .unwrap();
1✔
383

1✔
384
        let tester = PointInPolygonTester::new(&collection);
1✔
385

1✔
386
        // the algorithm is not stable for boundary cases directly on the edges
1✔
387

1✔
388
        assert!(tester.any_polygon_contains_coordinate(
1✔
389
            &Coordinate2D::new(0.000_001, 0.000_001),
1✔
390
            &Default::default()
1✔
391
        ),);
1✔
392
        assert!(tester.any_polygon_contains_coordinate(
1✔
393
            &Coordinate2D::new(0.000_001, 0.1),
1✔
394
            &Default::default()
1✔
395
        ),);
1✔
396
        assert!(tester.any_polygon_contains_coordinate(
1✔
397
            &Coordinate2D::new(0.1, 0.000_001),
1✔
398
            &Default::default()
1✔
399
        ),);
1✔
400

401
        assert!(tester
1✔
402
            .any_polygon_contains_coordinate(&Coordinate2D::new(9.9, 9.9), &Default::default()),);
1✔
403
        assert!(tester
1✔
404
            .any_polygon_contains_coordinate(&Coordinate2D::new(10.0, 9.9), &Default::default()),);
1✔
405
        assert!(tester
1✔
406
            .any_polygon_contains_coordinate(&Coordinate2D::new(9.9, 10.0), &Default::default()),);
1✔
407

408
        assert!(!tester
1✔
409
            .any_polygon_contains_coordinate(&Coordinate2D::new(-0.1, -0.1), &Default::default()),);
1✔
410
        assert!(!tester
1✔
411
            .any_polygon_contains_coordinate(&Coordinate2D::new(0.0, -0.1), &Default::default()),);
1✔
412
        assert!(!tester
1✔
413
            .any_polygon_contains_coordinate(&Coordinate2D::new(-0.1, 0.0), &Default::default()),);
1✔
414

415
        assert!(!tester
1✔
416
            .any_polygon_contains_coordinate(&Coordinate2D::new(10.1, 10.1), &Default::default()),);
1✔
417
        assert!(!tester
1✔
418
            .any_polygon_contains_coordinate(&Coordinate2D::new(10.1, 9.9), &Default::default()),);
1✔
419
        assert!(!tester
1✔
420
            .any_polygon_contains_coordinate(&Coordinate2D::new(9.9, 10.1), &Default::default()),);
1✔
421
    }
1✔
422

423
    #[tokio::test]
1✔
424
    async fn all() -> Result<()> {
1✔
425
        let points = MultiPointCollection::from_data(
1✔
426
            MultiPoint::many(vec![(0.001, 0.1), (1.0, 1.1), (2.0, 3.1)]).unwrap(),
1✔
427
            vec![TimeInterval::new_unchecked(0, 1); 3],
1✔
428
            Default::default(),
1✔
429
            CacheHint::default(),
1✔
430
        )?;
1✔
431

432
        let point_source = MockFeatureCollectionSource::single(points.clone()).boxed();
1✔
433

434
        let polygon_source =
1✔
435
            MockFeatureCollectionSource::single(MultiPolygonCollection::from_data(
436
                vec![MultiPolygon::new(vec![vec![vec![
1✔
437
                    (0.0, 0.0).into(),
1✔
438
                    (10.0, 0.0).into(),
1✔
439
                    (10.0, 10.0).into(),
1✔
440
                    (0.0, 10.0).into(),
1✔
441
                    (0.0, 0.0).into(),
1✔
442
                ]]])?],
1✔
443
                vec![TimeInterval::new_unchecked(0, 1); 1],
1✔
444
                Default::default(),
1✔
445
                CacheHint::default(),
1✔
446
            )?)
×
447
            .boxed();
1✔
448

449
        let operator = PointInPolygonFilter {
1✔
450
            params: PointInPolygonFilterParams {},
1✔
451
            sources: PointInPolygonFilterSource {
1✔
452
                points: point_source,
1✔
453
                polygons: polygon_source,
1✔
454
            },
1✔
455
        }
1✔
456
        .boxed()
1✔
457
        .initialize(
1✔
458
            WorkflowOperatorPath::initialize_root(),
1✔
459
            &MockExecutionContext::test_default(),
1✔
460
        )
1✔
461
        .await?;
×
462

463
        let query_processor = operator.query_processor()?.multi_point().unwrap();
1✔
464

1✔
465
        let query_rectangle = VectorQueryRectangle {
1✔
466
            spatial_bounds: BoundingBox2D::new((0., 0.).into(), (10., 10.).into()).unwrap(),
1✔
467
            time_interval: TimeInterval::default(),
1✔
468
            spatial_resolution: SpatialResolution::zero_point_one(),
1✔
469
            attributes: ColumnSelection::all(),
1✔
470
        };
1✔
471
        let ctx = MockQueryContext::new(ChunkByteSize::MAX);
1✔
472

473
        let query = query_processor.query(query_rectangle, &ctx).await.unwrap();
1✔
474

475
        let result = query
1✔
476
            .map(Result::unwrap)
1✔
477
            .collect::<Vec<MultiPointCollection>>()
1✔
478
            .await;
1✔
479

480
        assert_eq!(result.len(), 1);
1✔
481

482
        assert!(result[0].chunks_equal_ignoring_cache_hint(&points));
1✔
483

484
        Ok(())
1✔
485
    }
486

487
    #[tokio::test]
1✔
488
    async fn empty() -> Result<()> {
1✔
489
        let points = MultiPointCollection::from_data(
1✔
490
            MultiPoint::many(vec![(0.0, 0.1), (1.0, 1.1), (2.0, 3.1)]).unwrap(),
1✔
491
            vec![TimeInterval::new_unchecked(0, 1); 3],
1✔
492
            Default::default(),
1✔
493
            CacheHint::default(),
1✔
494
        )?;
1✔
495

496
        let point_source = MockFeatureCollectionSource::single(points.clone()).boxed();
1✔
497

498
        let polygon_source =
1✔
499
            MockFeatureCollectionSource::single(MultiPolygonCollection::from_data(
1✔
500
                vec![],
1✔
501
                vec![],
1✔
502
                Default::default(),
1✔
503
                CacheHint::default(),
1✔
504
            )?)
1✔
505
            .boxed();
1✔
506

507
        let operator = PointInPolygonFilter {
1✔
508
            params: PointInPolygonFilterParams {},
1✔
509
            sources: PointInPolygonFilterSource {
1✔
510
                points: point_source,
1✔
511
                polygons: polygon_source,
1✔
512
            },
1✔
513
        }
1✔
514
        .boxed()
1✔
515
        .initialize(
1✔
516
            WorkflowOperatorPath::initialize_root(),
1✔
517
            &MockExecutionContext::test_default(),
1✔
518
        )
1✔
519
        .await?;
×
520

521
        let query_processor = operator.query_processor()?.multi_point().unwrap();
1✔
522

1✔
523
        let query_rectangle = VectorQueryRectangle {
1✔
524
            spatial_bounds: BoundingBox2D::new((0., 0.).into(), (10., 10.).into()).unwrap(),
1✔
525
            time_interval: TimeInterval::default(),
1✔
526
            spatial_resolution: SpatialResolution::zero_point_one(),
1✔
527
            attributes: ColumnSelection::all(),
1✔
528
        };
1✔
529
        let ctx = MockQueryContext::new(ChunkByteSize::MAX);
1✔
530

531
        let query = query_processor.query(query_rectangle, &ctx).await.unwrap();
1✔
532

533
        let result = query
1✔
534
            .map(Result::unwrap)
1✔
535
            .collect::<Vec<MultiPointCollection>>()
1✔
536
            .await;
×
537

538
        assert_eq!(result.len(), 1);
1✔
539
        assert!(result[0].chunks_equal_ignoring_cache_hint(&MultiPointCollection::empty()));
1✔
540

541
        Ok(())
1✔
542
    }
543

544
    #[tokio::test]
1✔
545
    async fn time() -> Result<()> {
1✔
546
        let points = MultiPointCollection::from_data(
1✔
547
            MultiPoint::many(vec![(1.0, 1.1), (2.0, 2.1), (3.0, 3.1)]).unwrap(),
1✔
548
            vec![
1✔
549
                TimeInterval::new(0, 1)?,
1✔
550
                TimeInterval::new(5, 6)?,
1✔
551
                TimeInterval::new(0, 5)?,
1✔
552
            ],
553
            Default::default(),
1✔
554
            CacheHint::default(),
1✔
555
        )?;
×
556

557
        let point_source = MockFeatureCollectionSource::single(points.clone()).boxed();
1✔
558

559
        let polygon = MultiPolygon::new(vec![vec![vec![
1✔
560
            (0.0, 0.0).into(),
1✔
561
            (10.0, 0.0).into(),
1✔
562
            (10.0, 10.0).into(),
1✔
563
            (0.0, 10.0).into(),
1✔
564
            (0.0, 0.0).into(),
1✔
565
        ]]])?;
1✔
566

567
        let polygon_source =
1✔
568
            MockFeatureCollectionSource::single(MultiPolygonCollection::from_data(
569
                vec![polygon.clone(), polygon],
1✔
570
                vec![TimeInterval::new(0, 1)?, TimeInterval::new(1, 2)?],
1✔
571
                Default::default(),
1✔
572
                CacheHint::default(),
1✔
573
            )?)
×
574
            .boxed();
1✔
575

576
        let operator = PointInPolygonFilter {
1✔
577
            params: PointInPolygonFilterParams {},
1✔
578
            sources: PointInPolygonFilterSource {
1✔
579
                points: point_source,
1✔
580
                polygons: polygon_source,
1✔
581
            },
1✔
582
        }
1✔
583
        .boxed()
1✔
584
        .initialize(
1✔
585
            WorkflowOperatorPath::initialize_root(),
1✔
586
            &MockExecutionContext::test_default(),
1✔
587
        )
1✔
588
        .await?;
×
589

590
        let query_processor = operator.query_processor()?.multi_point().unwrap();
1✔
591

1✔
592
        let query_rectangle = VectorQueryRectangle {
1✔
593
            spatial_bounds: BoundingBox2D::new((0., 0.).into(), (10., 10.).into()).unwrap(),
1✔
594
            time_interval: TimeInterval::default(),
1✔
595
            spatial_resolution: SpatialResolution::zero_point_one(),
1✔
596
            attributes: ColumnSelection::all(),
1✔
597
        };
1✔
598
        let ctx = MockQueryContext::new(ChunkByteSize::MAX);
1✔
599

600
        let query = query_processor.query(query_rectangle, &ctx).await.unwrap();
1✔
601

602
        let result = query
1✔
603
            .map(Result::unwrap)
1✔
604
            .collect::<Vec<MultiPointCollection>>()
1✔
605
            .await;
1✔
606

607
        assert_eq!(result.len(), 1);
1✔
608

609
        assert!(
1✔
610
            result[0].chunks_equal_ignoring_cache_hint(&points.filter(vec![true, false, true])?)
1✔
611
        );
612

613
        Ok(())
1✔
614
    }
615

616
    // #[async::main]
617
    #[tokio::test]
1✔
618
    async fn multiple_inputs() -> Result<()> {
1✔
619
        let points1 = MultiPointCollection::from_data(
1✔
620
            MultiPoint::many(vec![(5.0, 5.1), (15.0, 15.1)]).unwrap(),
1✔
621
            vec![TimeInterval::new(0, 1)?; 2],
1✔
622
            Default::default(),
1✔
623
            CacheHint::default(),
1✔
624
        )?;
×
625
        let points2 = MultiPointCollection::from_data(
1✔
626
            MultiPoint::many(vec![(6.0, 6.1), (16.0, 16.1)]).unwrap(),
1✔
627
            vec![TimeInterval::new(1, 2)?; 2],
1✔
628
            Default::default(),
1✔
629
            CacheHint::default(),
1✔
630
        )?;
×
631

632
        let point_source =
1✔
633
            MockFeatureCollectionSource::multiple(vec![points1.clone(), points2.clone()]).boxed();
1✔
634

635
        let polygon1 = MultiPolygon::new(vec![vec![vec![
1✔
636
            (0.0, 0.0).into(),
1✔
637
            (10.0, 0.0).into(),
1✔
638
            (10.0, 10.0).into(),
1✔
639
            (0.0, 10.0).into(),
1✔
640
            (0.0, 0.0).into(),
1✔
641
        ]]])?;
1✔
642
        let polygon2 = MultiPolygon::new(vec![vec![vec![
1✔
643
            (10.0, 10.0).into(),
1✔
644
            (20.0, 10.0).into(),
1✔
645
            (20.0, 20.0).into(),
1✔
646
            (10.0, 20.0).into(),
1✔
647
            (10.0, 10.0).into(),
1✔
648
        ]]])?;
1✔
649

650
        let polygon_source = MockFeatureCollectionSource::multiple(vec![
1✔
651
            MultiPolygonCollection::from_data(
1✔
652
                vec![polygon1.clone()],
1✔
653
                vec![TimeInterval::new(0, 1)?],
1✔
654
                Default::default(),
1✔
655
                CacheHint::default(),
1✔
656
            )?,
×
657
            MultiPolygonCollection::from_data(
658
                vec![polygon1, polygon2],
1✔
659
                vec![TimeInterval::new(1, 2)?, TimeInterval::new(1, 2)?],
1✔
660
                Default::default(),
1✔
661
                CacheHint::default(),
1✔
662
            )?,
×
663
        ])
664
        .boxed();
1✔
665

666
        let operator = PointInPolygonFilter {
1✔
667
            params: PointInPolygonFilterParams {},
1✔
668
            sources: PointInPolygonFilterSource {
1✔
669
                points: point_source,
1✔
670
                polygons: polygon_source,
1✔
671
            },
1✔
672
        }
1✔
673
        .boxed()
1✔
674
        .initialize(
1✔
675
            WorkflowOperatorPath::initialize_root(),
1✔
676
            &MockExecutionContext::test_default(),
1✔
677
        )
1✔
678
        .await?;
×
679

680
        let query_processor = operator.query_processor()?.multi_point().unwrap();
1✔
681

1✔
682
        let query_rectangle = VectorQueryRectangle {
1✔
683
            spatial_bounds: BoundingBox2D::new((0., 0.).into(), (10., 10.).into()).unwrap(),
1✔
684
            time_interval: TimeInterval::default(),
1✔
685
            spatial_resolution: SpatialResolution::zero_point_one(),
1✔
686
            attributes: ColumnSelection::all(),
1✔
687
        };
1✔
688

1✔
689
        let ctx_one_chunk = MockQueryContext::new(ChunkByteSize::MAX);
1✔
690
        let ctx_minimal_chunks = MockQueryContext::new(ChunkByteSize::MIN);
1✔
691

692
        let query = query_processor
1✔
693
            .query(query_rectangle.clone(), &ctx_minimal_chunks)
1✔
694
            .await
×
695
            .unwrap();
1✔
696

697
        let result = query
1✔
698
            .map(Result::unwrap)
1✔
699
            .collect::<Vec<MultiPointCollection>>()
1✔
700
            .await;
4✔
701

702
        assert_eq!(result.len(), 2);
1✔
703

704
        assert!(result[0].chunks_equal_ignoring_cache_hint(&points1.filter(vec![true, false])?));
1✔
705
        assert!(result[1].chunks_equal_ignoring_cache_hint(&points2));
1✔
706

707
        let query = query_processor
1✔
708
            .query(query_rectangle, &ctx_one_chunk)
1✔
709
            .await
×
710
            .unwrap();
1✔
711

712
        let result = query
1✔
713
            .map(Result::unwrap)
1✔
714
            .collect::<Vec<MultiPointCollection>>()
1✔
715
            .await;
4✔
716

717
        assert_eq!(result.len(), 1);
1✔
718

719
        assert!(result[0].chunks_equal_ignoring_cache_hint(
1✔
720
            &points1.filter(vec![true, false])?.append(&points2)?
1✔
721
        ));
722

723
        Ok(())
1✔
724
    }
725

726
    #[tokio::test]
1✔
727
    async fn empty_points() {
1✔
728
        let point_collection = MultiPointCollection::from_data(
1✔
729
            vec![],
1✔
730
            vec![],
1✔
731
            Default::default(),
1✔
732
            CacheHint::default(),
1✔
733
        )
1✔
734
        .unwrap();
1✔
735

1✔
736
        let polygon_collection = MultiPolygonCollection::from_data(
1✔
737
            vec![MultiPolygon::new(vec![vec![vec![
1✔
738
                (0.0, 0.0).into(),
1✔
739
                (10.0, 0.0).into(),
1✔
740
                (10.0, 10.0).into(),
1✔
741
                (0.0, 10.0).into(),
1✔
742
                (0.0, 0.0).into(),
1✔
743
            ]]])
1✔
744
            .unwrap()],
1✔
745
            vec![TimeInterval::default()],
1✔
746
            Default::default(),
1✔
747
            CacheHint::default(),
1✔
748
        )
1✔
749
        .unwrap();
1✔
750

751
        let operator = PointInPolygonFilter {
1✔
752
            params: PointInPolygonFilterParams {},
1✔
753
            sources: PointInPolygonFilterSource {
1✔
754
                points: MockFeatureCollectionSource::single(point_collection).boxed(),
1✔
755
                polygons: MockFeatureCollectionSource::single(polygon_collection).boxed(),
1✔
756
            },
1✔
757
        }
1✔
758
        .boxed()
1✔
759
        .initialize(
1✔
760
            WorkflowOperatorPath::initialize_root(),
1✔
761
            &MockExecutionContext::test_default(),
1✔
762
        )
1✔
763
        .await
×
764
        .unwrap();
1✔
765

1✔
766
        let query_rectangle = VectorQueryRectangle {
1✔
767
            spatial_bounds: BoundingBox2D::new((-10., -10.).into(), (10., 10.).into()).unwrap(),
1✔
768
            time_interval: TimeInterval::default(),
1✔
769
            spatial_resolution: SpatialResolution::zero_point_one(),
1✔
770
            attributes: ColumnSelection::all(),
1✔
771
        };
1✔
772

1✔
773
        let query_processor = operator.query_processor().unwrap().multi_point().unwrap();
1✔
774

1✔
775
        let query_context = MockQueryContext::test_default();
1✔
776

777
        let query = query_processor
1✔
778
            .query(query_rectangle, &query_context)
1✔
779
            .await
×
780
            .unwrap();
1✔
781

782
        let result = query
1✔
783
            .map(Result::unwrap)
1✔
784
            .collect::<Vec<MultiPointCollection>>()
1✔
785
            .await;
×
786

787
        assert_eq!(result.len(), 1);
1✔
788
        assert!(result[0].chunks_equal_ignoring_cache_hint(&MultiPointCollection::empty()));
1✔
789
    }
790

791
    #[tokio::test]
1✔
792
    async fn it_checks_sref() {
1✔
793
        let point_collection = MultiPointCollection::from_data(
1✔
794
            vec![],
1✔
795
            vec![],
1✔
796
            Default::default(),
1✔
797
            CacheHint::default(),
1✔
798
        )
1✔
799
        .unwrap();
1✔
800

1✔
801
        let polygon_collection = MultiPolygonCollection::from_data(
1✔
802
            vec![MultiPolygon::new(vec![vec![vec![
1✔
803
                (0.0, 0.0).into(),
1✔
804
                (10.0, 0.0).into(),
1✔
805
                (10.0, 10.0).into(),
1✔
806
                (0.0, 10.0).into(),
1✔
807
                (0.0, 0.0).into(),
1✔
808
            ]]])
1✔
809
            .unwrap()],
1✔
810
            vec![TimeInterval::default()],
1✔
811
            Default::default(),
1✔
812
            CacheHint::default(),
1✔
813
        )
1✔
814
        .unwrap();
1✔
815

816
        let operator = PointInPolygonFilter {
1✔
817
            params: PointInPolygonFilterParams {},
1✔
818
            sources: PointInPolygonFilterSource {
1✔
819
                points: MockFeatureCollectionSource::with_collections_and_sref(
1✔
820
                    vec![point_collection],
1✔
821
                    SpatialReference::epsg_4326(),
1✔
822
                )
1✔
823
                .boxed(),
1✔
824
                polygons: MockFeatureCollectionSource::with_collections_and_sref(
1✔
825
                    vec![polygon_collection],
1✔
826
                    SpatialReference::from_str("EPSG:3857").unwrap(),
1✔
827
                )
1✔
828
                .boxed(),
1✔
829
            },
1✔
830
        }
1✔
831
        .boxed()
1✔
832
        .initialize(
1✔
833
            WorkflowOperatorPath::initialize_root(),
1✔
834
            &MockExecutionContext::test_default(),
1✔
835
        )
1✔
836
        .await;
×
837

838
        assert!(matches!(
1✔
839
            operator,
1✔
840
            Err(Error::InvalidSpatialReference {
841
                expected: _,
842
                found: _,
843
            })
844
        ));
845
    }
846
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc