• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

geo-engine / geoengine / 5006008836

pending completion
5006008836

push

github

GitHub
Merge #785 #787

936 of 936 new or added lines in 50 files covered. (100.0%)

96010 of 107707 relevant lines covered (89.14%)

72676.46 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

92.99
/operators/src/processing/point_in_polygon.rs
1
mod tester;
2
mod wrapper;
3

4
use std::cmp::min;
5
use std::sync::Arc;
6

7
use futures::stream::BoxStream;
8
use futures::{StreamExt, TryStreamExt};
9
use geoengine_datatypes::dataset::DataId;
10
use geoengine_datatypes::primitives::VectorQueryRectangle;
11
use rayon::ThreadPool;
12
use serde::{Deserialize, Serialize};
13
use snafu::ensure;
14

15
use crate::adapters::FeatureCollectionChunkMerger;
16
use crate::engine::{
17
    CanonicOperatorName, ExecutionContext, InitializedSources, InitializedVectorOperator, Operator,
18
    OperatorName, QueryContext, TypedVectorQueryProcessor, VectorOperator, VectorQueryProcessor,
19
    VectorResultDescriptor, WorkflowOperatorPath,
20
};
21
use crate::engine::{OperatorData, QueryProcessor};
22
use crate::error;
23
use crate::util::Result;
24
use arrow::array::BooleanArray;
25
use async_trait::async_trait;
26
use geoengine_datatypes::collections::{
27
    FeatureCollectionInfos, FeatureCollectionModifications, GeometryCollection,
28
    MultiPointCollection, MultiPolygonCollection, VectorDataType,
29
};
30
pub use tester::PointInPolygonTester;
31
pub use wrapper::PointInPolygonTesterWithCollection;
32

33
/// The point in polygon filter requires two inputs in the following order:
34
/// 1. a `MultiPointCollection` source
35
/// 2. a `MultiPolygonCollection` source
36
/// Then, it filters the `MultiPointCollection`s so that only those features are retained that are in any polygon.
37
pub type PointInPolygonFilter = Operator<PointInPolygonFilterParams, PointInPolygonFilterSource>;
38

39
/// Supplies the operator's type name through `OperatorName::TYPE_NAME`.
impl OperatorName for PointInPolygonFilter {
    const TYPE_NAME: &'static str = "PointInPolygonFilter";
}
42

43
#[derive(Debug, Clone, Deserialize, Serialize)]
6✔
44
pub struct PointInPolygonFilterParams {}
45

46
#[derive(Debug, Clone, Deserialize, Serialize)]
6✔
47
pub struct PointInPolygonFilterSource {
48
    pub points: Box<dyn VectorOperator>,
49
    pub polygons: Box<dyn VectorOperator>,
50
}
51

52
impl OperatorData for PointInPolygonFilterSource {
53
    fn data_ids_collect(&self, data_ids: &mut Vec<DataId>) {
×
54
        self.points.data_ids_collect(data_ids);
×
55
        self.polygons.data_ids_collect(data_ids);
×
56
    }
×
57
}
58

59
struct InitializedPointInPolygonFilterSource {
60
    points: Box<dyn InitializedVectorOperator>,
61
    polygons: Box<dyn InitializedVectorOperator>,
62
}
63

64
#[async_trait]
65
impl InitializedSources<InitializedPointInPolygonFilterSource> for PointInPolygonFilterSource {
66
    async fn initialize_sources(
6✔
67
        self,
6✔
68
        path: WorkflowOperatorPath,
6✔
69
        context: &dyn ExecutionContext,
6✔
70
    ) -> Result<InitializedPointInPolygonFilterSource> {
6✔
71
        let points_path = path.clone_and_append(0);
6✔
72
        let polygons_path = path.clone_and_append(1);
6✔
73

6✔
74
        Ok(InitializedPointInPolygonFilterSource {
6✔
75
            points: self.points.initialize(points_path, context).await?,
6✔
76
            polygons: self.polygons.initialize(polygons_path, context).await?,
6✔
77
        })
78
    }
12✔
79
}
80

81
#[typetag::serde]
×
82
#[async_trait]
83
impl VectorOperator for PointInPolygonFilter {
84
    async fn _initialize(
6✔
85
        self: Box<Self>,
6✔
86
        path: WorkflowOperatorPath,
6✔
87
        context: &dyn ExecutionContext,
6✔
88
    ) -> Result<Box<dyn InitializedVectorOperator>> {
6✔
89
        let name = CanonicOperatorName::from(&self);
6✔
90

91
        let initialized_source = self.sources.initialize_sources(path, context).await?;
6✔
92

93
        let points_rd = initialized_source.points.result_descriptor();
6✔
94
        let polygons_rd = initialized_source.polygons.result_descriptor();
6✔
95

6✔
96
        ensure!(
6✔
97
            points_rd.data_type == VectorDataType::MultiPoint,
6✔
98
            error::InvalidType {
×
99
                expected: VectorDataType::MultiPoint.to_string(),
×
100
                found: points_rd.data_type.to_string(),
×
101
            }
×
102
        );
103
        ensure!(
6✔
104
            polygons_rd.data_type == VectorDataType::MultiPolygon,
6✔
105
            error::InvalidType {
×
106
                expected: VectorDataType::MultiPolygon.to_string(),
×
107
                found: polygons_rd.data_type.to_string(),
×
108
            }
×
109
        );
110

111
        ensure!(
6✔
112
            points_rd.spatial_reference == polygons_rd.spatial_reference,
6✔
113
            crate::error::InvalidSpatialReference {
1✔
114
                expected: points_rd.spatial_reference,
1✔
115
                found: polygons_rd.spatial_reference,
1✔
116
            }
1✔
117
        );
118

119
        // We use the result descriptor of the points because in the worst case no feature will be excluded.
120
        // We cannot use the polygon bbox because a `MultiPoint` could have one point within a polygon (and
121
        // thus be included in the result) and one point outside of the bbox of the polygons.
122
        let out_desc = initialized_source.points.result_descriptor().clone();
5✔
123

5✔
124
        let initialized_operator = InitializedPointInPolygonFilter {
5✔
125
            name,
5✔
126
            result_descriptor: out_desc,
5✔
127
            points: initialized_source.points,
5✔
128
            polygons: initialized_source.polygons,
5✔
129
        };
5✔
130

5✔
131
        Ok(initialized_operator.boxed())
5✔
132
    }
12✔
133

134
    span_fn!(PointInPolygonFilter);
×
135
}
136

137
pub struct InitializedPointInPolygonFilter {
138
    name: CanonicOperatorName,
139
    points: Box<dyn InitializedVectorOperator>,
140
    polygons: Box<dyn InitializedVectorOperator>,
141
    result_descriptor: VectorResultDescriptor,
142
}
143

144
impl InitializedVectorOperator for InitializedPointInPolygonFilter {
145
    fn query_processor(&self) -> Result<TypedVectorQueryProcessor> {
5✔
146
        let point_processor = self
5✔
147
            .points
5✔
148
            .query_processor()?
5✔
149
            .multi_point()
5✔
150
            .expect("checked in `PointInPolygonFilter` constructor");
5✔
151

152
        let polygon_processor = self
5✔
153
            .polygons
5✔
154
            .query_processor()?
5✔
155
            .multi_polygon()
5✔
156
            .expect("checked in `PointInPolygonFilter` constructor");
5✔
157

5✔
158
        Ok(TypedVectorQueryProcessor::MultiPoint(
5✔
159
            PointInPolygonFilterProcessor::new(point_processor, polygon_processor).boxed(),
5✔
160
        ))
5✔
161
    }
5✔
162

163
    fn result_descriptor(&self) -> &VectorResultDescriptor {
×
164
        &self.result_descriptor
×
165
    }
×
166

167
    fn canonic_name(&self) -> CanonicOperatorName {
×
168
        self.name.clone()
×
169
    }
×
170
}
171

172
pub struct PointInPolygonFilterProcessor {
173
    points: Box<dyn VectorQueryProcessor<VectorType = MultiPointCollection>>,
174
    polygons: Box<dyn VectorQueryProcessor<VectorType = MultiPolygonCollection>>,
175
}
176

177
impl PointInPolygonFilterProcessor {
178
    pub fn new(
5✔
179
        points: Box<dyn VectorQueryProcessor<VectorType = MultiPointCollection>>,
5✔
180
        polygons: Box<dyn VectorQueryProcessor<VectorType = MultiPolygonCollection>>,
5✔
181
    ) -> Self {
5✔
182
        Self { points, polygons }
5✔
183
    }
5✔
184

185
    fn filter_parallel(
10✔
186
        points: &Arc<MultiPointCollection>,
10✔
187
        polygons: &MultiPolygonCollection,
10✔
188
        thread_pool: &ThreadPool,
10✔
189
    ) -> Vec<bool> {
10✔
190
        debug_assert!(!points.is_empty());
10✔
191

192
        // TODO: parallelize over coordinate rather than features
193

194
        let tester = Arc::new(PointInPolygonTester::new(polygons)); // TODO: multithread
10✔
195

10✔
196
        let parallelism = thread_pool.current_num_threads();
10✔
197
        let chunk_size = (points.len() as f64 / parallelism as f64).ceil() as usize;
10✔
198

10✔
199
        let mut result = vec![false; points.len()];
10✔
200

10✔
201
        thread_pool.scope(|scope| {
10✔
202
            let num_features = points.len();
10✔
203
            let feature_offsets = points.feature_offsets();
10✔
204
            let time_intervals = points.time_intervals();
10✔
205
            let coordinates = points.coordinates();
10✔
206

207
            for (chunk_index, chunk_result) in result.chunks_mut(chunk_size).enumerate() {
20✔
208
                let feature_index_start = chunk_index * chunk_size;
20✔
209
                let features_index_end = min(feature_index_start + chunk_size, num_features);
20✔
210
                let tester = tester.clone();
20✔
211

20✔
212
                scope.spawn(move |_| {
20✔
213
                    for (
214
                        feature_index,
22✔
215
                        ((coordinates_start_index, coordinates_end_index), time_interval),
22✔
216
                    ) in two_tuple_windows(
20✔
217
                        feature_offsets[feature_index_start..=features_index_end]
20✔
218
                            .iter()
20✔
219
                            .map(|&c| c as usize),
42✔
220
                    )
20✔
221
                    .zip(time_intervals[feature_index_start..features_index_end].iter())
20✔
222
                    .enumerate()
20✔
223
                    {
22✔
224
                        let is_multi_point_in_polygon_collection = coordinates
22✔
225
                            [coordinates_start_index..coordinates_end_index]
22✔
226
                            .iter()
22✔
227
                            .any(|coordinate| {
22✔
228
                                tester.any_polygon_contains_coordinate(coordinate, time_interval)
22✔
229
                            });
22✔
230

22✔
231
                        chunk_result[feature_index] = is_multi_point_in_polygon_collection;
22✔
232
                    }
22✔
233
                });
20✔
234
            }
20✔
235
        });
10✔
236

10✔
237
        result
10✔
238
    }
10✔
239

240
    async fn filter_points(
10✔
241
        ctx: &dyn QueryContext,
10✔
242
        points: Arc<MultiPointCollection>,
10✔
243
        polygons: MultiPolygonCollection,
10✔
244
        initial_filter: &BooleanArray,
10✔
245
    ) -> Result<BooleanArray> {
10✔
246
        let thread_pool = ctx.thread_pool().clone();
10✔
247

10✔
248
        let thread_points = points.clone();
10✔
249
        let filter = crate::util::spawn_blocking(move || {
10✔
250
            Self::filter_parallel(&thread_points, &polygons, &thread_pool)
10✔
251
        })
10✔
252
        .await?;
10✔
253

254
        arrow::compute::or(initial_filter, &filter.into()).map_err(Into::into)
10✔
255
    }
10✔
256
}
257

258
#[async_trait]
259
impl VectorQueryProcessor for PointInPolygonFilterProcessor {
260
    type VectorType = MultiPointCollection;
261

262
    async fn vector_query<'a>(
6✔
263
        &'a self,
6✔
264
        query: VectorQueryRectangle,
6✔
265
        ctx: &'a dyn QueryContext,
6✔
266
    ) -> Result<BoxStream<'a, Result<Self::VectorType>>> {
6✔
267
        let filtered_stream =
6✔
268
            self.points
6✔
269
                .query(query, ctx)
6✔
270
                .await?
×
271
                .and_then(move |points| async move {
8✔
272
                    if points.is_empty() {
8✔
273
                        return Ok(points);
1✔
274
                    }
7✔
275

7✔
276
                    let initial_filter = BooleanArray::from(vec![false; points.len()]);
7✔
277
                    let arc_points = Arc::new(points);
7✔
278

279
                    let filter = self
7✔
280
                        .polygons
7✔
281
                        .query(query, ctx)
7✔
282
                        .await?
×
283
                        .fold(Ok(initial_filter), |filter, polygons| async {
11✔
284
                            let polygons = polygons?;
11✔
285

286
                            if polygons.is_empty() {
11✔
287
                                return filter;
1✔
288
                            }
10✔
289

10✔
290
                            Self::filter_points(ctx, arc_points.clone(), polygons, &filter?).await
10✔
291
                        })
11✔
292
                        .await?;
10✔
293

294
                    arc_points.filter(filter).map_err(Into::into)
7✔
295
                });
8✔
296

6✔
297
        Ok(
6✔
298
            FeatureCollectionChunkMerger::new(filtered_stream.fuse(), ctx.chunk_byte_size().into())
6✔
299
                .boxed(),
6✔
300
        )
6✔
301
    }
12✔
302
}
303

304
/// Pairs each item of an iterator with its predecessor, yielding `(previous, current)` tuples.
///
/// The first output is `(first, second)`, so `n` input items produce `n - 1` outputs and an
/// iterator with fewer than two items produces none.
fn two_tuple_windows<I, T>(mut iter: I) -> impl Iterator<Item = (T, T)>
where
    I: Iterator<Item = T>,
    T: Copy,
{
    let mut previous = iter.next();

    // `previous` is only `None` if `iter` was empty, in which case the closure never runs;
    // `replace` + `filter_map` express the pairing without an `unwrap`.
    iter.filter_map(move |current| previous.replace(current).map(|before| (before, current)))
}
20✔
319

320
#[cfg(test)]
321
mod tests {
322

323
    use super::*;
324
    use std::str::FromStr;
325

326
    use geoengine_datatypes::primitives::{
327
        BoundingBox2D, Coordinate2D, MultiPoint, MultiPolygon, SpatialResolution, TimeInterval,
328
    };
329
    use geoengine_datatypes::spatial_reference::SpatialReference;
330
    use geoengine_datatypes::util::test::TestDefault;
331

332
    use crate::engine::{ChunkByteSize, MockExecutionContext, MockQueryContext};
333
    use crate::error::Error;
334
    use crate::mock::MockFeatureCollectionSource;
335

336
    /// Probes coordinates close to the border of a square polygon from both sides.
    #[test]
    fn point_in_polygon_boundary_conditions() {
        // a single square from (0, 0) to (10, 10) with the default time interval
        let square = MultiPolygon::new(vec![vec![vec![
            (0.0, 0.0).into(),
            (10.0, 0.0).into(),
            (10.0, 10.0).into(),
            (0.0, 10.0).into(),
            (0.0, 0.0).into(),
        ]]])
        .unwrap();

        let polygons = MultiPolygonCollection::from_data(
            vec![square],
            vec![Default::default(); 1],
            Default::default(),
        )
        .unwrap();

        let tester = PointInPolygonTester::new(&polygons);

        // the algorithm is not stable for boundary cases directly on the edges

        let inside = [
            (0.000_001, 0.000_001),
            (0.000_001, 0.1),
            (0.1, 0.000_001),
            (9.9, 9.9),
            (10.0, 9.9),
            (9.9, 10.0),
        ];
        for (x, y) in inside {
            assert!(
                tester.any_polygon_contains_coordinate(
                    &Coordinate2D::new(x, y),
                    &Default::default()
                ),
                "expected ({}, {}) to be inside of the polygon",
                x,
                y
            );
        }

        let outside = [
            (-0.1, -0.1),
            (0.0, -0.1),
            (-0.1, 0.0),
            (10.1, 10.1),
            (10.1, 9.9),
            (9.9, 10.1),
        ];
        for (x, y) in outside {
            assert!(
                !tester.any_polygon_contains_coordinate(
                    &Coordinate2D::new(x, y),
                    &Default::default()
                ),
                "expected ({}, {}) to be outside of the polygon",
                x,
                y
            );
        }
    }
1✔
390

391
    #[tokio::test]
1✔
392
    async fn all() -> Result<()> {
1✔
393
        let points = MultiPointCollection::from_data(
1✔
394
            MultiPoint::many(vec![(0.001, 0.1), (1.0, 1.1), (2.0, 3.1)]).unwrap(),
1✔
395
            vec![TimeInterval::new_unchecked(0, 1); 3],
1✔
396
            Default::default(),
1✔
397
        )?;
1✔
398

399
        let point_source = MockFeatureCollectionSource::single(points.clone()).boxed();
1✔
400

401
        let polygon_source =
1✔
402
            MockFeatureCollectionSource::single(MultiPolygonCollection::from_data(
403
                vec![MultiPolygon::new(vec![vec![vec![
1✔
404
                    (0.0, 0.0).into(),
1✔
405
                    (10.0, 0.0).into(),
1✔
406
                    (10.0, 10.0).into(),
1✔
407
                    (0.0, 10.0).into(),
1✔
408
                    (0.0, 0.0).into(),
1✔
409
                ]]])?],
1✔
410
                vec![TimeInterval::new_unchecked(0, 1); 1],
1✔
411
                Default::default(),
1✔
412
            )?)
×
413
            .boxed();
1✔
414

415
        let operator = PointInPolygonFilter {
1✔
416
            params: PointInPolygonFilterParams {},
1✔
417
            sources: PointInPolygonFilterSource {
1✔
418
                points: point_source,
1✔
419
                polygons: polygon_source,
1✔
420
            },
1✔
421
        }
1✔
422
        .boxed()
1✔
423
        .initialize(
1✔
424
            WorkflowOperatorPath::initialize_root(),
1✔
425
            &MockExecutionContext::test_default(),
1✔
426
        )
1✔
427
        .await?;
×
428

429
        let query_processor = operator.query_processor()?.multi_point().unwrap();
1✔
430

1✔
431
        let query_rectangle = VectorQueryRectangle {
1✔
432
            spatial_bounds: BoundingBox2D::new((0., 0.).into(), (10., 10.).into()).unwrap(),
1✔
433
            time_interval: TimeInterval::default(),
1✔
434
            spatial_resolution: SpatialResolution::zero_point_one(),
1✔
435
        };
1✔
436
        let ctx = MockQueryContext::new(ChunkByteSize::MAX);
1✔
437

438
        let query = query_processor.query(query_rectangle, &ctx).await.unwrap();
1✔
439

440
        let result = query
1✔
441
            .map(Result::unwrap)
1✔
442
            .collect::<Vec<MultiPointCollection>>()
1✔
443
            .await;
1✔
444

445
        assert_eq!(result.len(), 1);
1✔
446

447
        assert_eq!(result[0], points);
1✔
448

449
        Ok(())
1✔
450
    }
451

452
    #[tokio::test]
1✔
453
    async fn none() -> Result<()> {
1✔
454
        let points = MultiPointCollection::from_data(
1✔
455
            MultiPoint::many(vec![(0.0, 0.1), (1.0, 1.1), (2.0, 3.1)]).unwrap(),
1✔
456
            vec![TimeInterval::new_unchecked(0, 1); 3],
1✔
457
            Default::default(),
1✔
458
        )?;
1✔
459

460
        let point_source = MockFeatureCollectionSource::single(points.clone()).boxed();
1✔
461

462
        let polygon_source = MockFeatureCollectionSource::single(
1✔
463
            MultiPolygonCollection::from_data(vec![], vec![], Default::default())?,
1✔
464
        )
465
        .boxed();
1✔
466

467
        let operator = PointInPolygonFilter {
1✔
468
            params: PointInPolygonFilterParams {},
1✔
469
            sources: PointInPolygonFilterSource {
1✔
470
                points: point_source,
1✔
471
                polygons: polygon_source,
1✔
472
            },
1✔
473
        }
1✔
474
        .boxed()
1✔
475
        .initialize(
1✔
476
            WorkflowOperatorPath::initialize_root(),
1✔
477
            &MockExecutionContext::test_default(),
1✔
478
        )
1✔
479
        .await?;
×
480

481
        let query_processor = operator.query_processor()?.multi_point().unwrap();
1✔
482

1✔
483
        let query_rectangle = VectorQueryRectangle {
1✔
484
            spatial_bounds: BoundingBox2D::new((0., 0.).into(), (10., 10.).into()).unwrap(),
1✔
485
            time_interval: TimeInterval::default(),
1✔
486
            spatial_resolution: SpatialResolution::zero_point_one(),
1✔
487
        };
1✔
488
        let ctx = MockQueryContext::new(ChunkByteSize::MAX);
1✔
489

490
        let query = query_processor.query(query_rectangle, &ctx).await.unwrap();
1✔
491

492
        let result = query
1✔
493
            .map(Result::unwrap)
1✔
494
            .collect::<Vec<MultiPointCollection>>()
1✔
495
            .await;
×
496

497
        assert_eq!(result.len(), 0);
1✔
498

499
        Ok(())
1✔
500
    }
501

502
    #[tokio::test]
1✔
503
    async fn time() -> Result<()> {
1✔
504
        let points = MultiPointCollection::from_data(
1✔
505
            MultiPoint::many(vec![(1.0, 1.1), (2.0, 2.1), (3.0, 3.1)]).unwrap(),
1✔
506
            vec![
1✔
507
                TimeInterval::new(0, 1)?,
1✔
508
                TimeInterval::new(5, 6)?,
1✔
509
                TimeInterval::new(0, 5)?,
1✔
510
            ],
511
            Default::default(),
1✔
512
        )?;
×
513

514
        let point_source = MockFeatureCollectionSource::single(points.clone()).boxed();
1✔
515

516
        let polygon = MultiPolygon::new(vec![vec![vec![
1✔
517
            (0.0, 0.0).into(),
1✔
518
            (10.0, 0.0).into(),
1✔
519
            (10.0, 10.0).into(),
1✔
520
            (0.0, 10.0).into(),
1✔
521
            (0.0, 0.0).into(),
1✔
522
        ]]])?;
1✔
523

524
        let polygon_source =
1✔
525
            MockFeatureCollectionSource::single(MultiPolygonCollection::from_data(
526
                vec![polygon.clone(), polygon],
1✔
527
                vec![TimeInterval::new(0, 1)?, TimeInterval::new(1, 2)?],
1✔
528
                Default::default(),
1✔
529
            )?)
×
530
            .boxed();
1✔
531

532
        let operator = PointInPolygonFilter {
1✔
533
            params: PointInPolygonFilterParams {},
1✔
534
            sources: PointInPolygonFilterSource {
1✔
535
                points: point_source,
1✔
536
                polygons: polygon_source,
1✔
537
            },
1✔
538
        }
1✔
539
        .boxed()
1✔
540
        .initialize(
1✔
541
            WorkflowOperatorPath::initialize_root(),
1✔
542
            &MockExecutionContext::test_default(),
1✔
543
        )
1✔
544
        .await?;
×
545

546
        let query_processor = operator.query_processor()?.multi_point().unwrap();
1✔
547

1✔
548
        let query_rectangle = VectorQueryRectangle {
1✔
549
            spatial_bounds: BoundingBox2D::new((0., 0.).into(), (10., 10.).into()).unwrap(),
1✔
550
            time_interval: TimeInterval::default(),
1✔
551
            spatial_resolution: SpatialResolution::zero_point_one(),
1✔
552
        };
1✔
553
        let ctx = MockQueryContext::new(ChunkByteSize::MAX);
1✔
554

555
        let query = query_processor.query(query_rectangle, &ctx).await.unwrap();
1✔
556

557
        let result = query
1✔
558
            .map(Result::unwrap)
1✔
559
            .collect::<Vec<MultiPointCollection>>()
1✔
560
            .await;
1✔
561

562
        assert_eq!(result.len(), 1);
1✔
563

564
        assert_eq!(result[0], points.filter(vec![true, false, true])?);
1✔
565

566
        Ok(())
1✔
567
    }
568

569
    // #[async::main]
570
    #[tokio::test]
1✔
571
    async fn multiple_inputs() -> Result<()> {
1✔
572
        let points1 = MultiPointCollection::from_data(
1✔
573
            MultiPoint::many(vec![(5.0, 5.1), (15.0, 15.1)]).unwrap(),
1✔
574
            vec![TimeInterval::new(0, 1)?; 2],
1✔
575
            Default::default(),
1✔
576
        )?;
×
577
        let points2 = MultiPointCollection::from_data(
1✔
578
            MultiPoint::many(vec![(6.0, 6.1), (16.0, 16.1)]).unwrap(),
1✔
579
            vec![TimeInterval::new(1, 2)?; 2],
1✔
580
            Default::default(),
1✔
581
        )?;
×
582

583
        let point_source =
1✔
584
            MockFeatureCollectionSource::multiple(vec![points1.clone(), points2.clone()]).boxed();
1✔
585

586
        let polygon1 = MultiPolygon::new(vec![vec![vec![
1✔
587
            (0.0, 0.0).into(),
1✔
588
            (10.0, 0.0).into(),
1✔
589
            (10.0, 10.0).into(),
1✔
590
            (0.0, 10.0).into(),
1✔
591
            (0.0, 0.0).into(),
1✔
592
        ]]])?;
1✔
593
        let polygon2 = MultiPolygon::new(vec![vec![vec![
1✔
594
            (10.0, 10.0).into(),
1✔
595
            (20.0, 10.0).into(),
1✔
596
            (20.0, 20.0).into(),
1✔
597
            (10.0, 20.0).into(),
1✔
598
            (10.0, 10.0).into(),
1✔
599
        ]]])?;
1✔
600

601
        let polygon_source = MockFeatureCollectionSource::multiple(vec![
1✔
602
            MultiPolygonCollection::from_data(
1✔
603
                vec![polygon1.clone()],
1✔
604
                vec![TimeInterval::new(0, 1)?],
1✔
605
                Default::default(),
1✔
606
            )?,
×
607
            MultiPolygonCollection::from_data(
608
                vec![polygon1, polygon2],
1✔
609
                vec![TimeInterval::new(1, 2)?, TimeInterval::new(1, 2)?],
1✔
610
                Default::default(),
1✔
611
            )?,
×
612
        ])
613
        .boxed();
1✔
614

615
        let operator = PointInPolygonFilter {
1✔
616
            params: PointInPolygonFilterParams {},
1✔
617
            sources: PointInPolygonFilterSource {
1✔
618
                points: point_source,
1✔
619
                polygons: polygon_source,
1✔
620
            },
1✔
621
        }
1✔
622
        .boxed()
1✔
623
        .initialize(
1✔
624
            WorkflowOperatorPath::initialize_root(),
1✔
625
            &MockExecutionContext::test_default(),
1✔
626
        )
1✔
627
        .await?;
×
628

629
        let query_processor = operator.query_processor()?.multi_point().unwrap();
1✔
630

1✔
631
        let query_rectangle = VectorQueryRectangle {
1✔
632
            spatial_bounds: BoundingBox2D::new((0., 0.).into(), (10., 10.).into()).unwrap(),
1✔
633
            time_interval: TimeInterval::default(),
1✔
634
            spatial_resolution: SpatialResolution::zero_point_one(),
1✔
635
        };
1✔
636

1✔
637
        let ctx_one_chunk = MockQueryContext::new(ChunkByteSize::MAX);
1✔
638
        let ctx_minimal_chunks = MockQueryContext::new(ChunkByteSize::MIN);
1✔
639

640
        let query = query_processor
1✔
641
            .query(query_rectangle, &ctx_minimal_chunks)
1✔
642
            .await
×
643
            .unwrap();
1✔
644

645
        let result = query
1✔
646
            .map(Result::unwrap)
1✔
647
            .collect::<Vec<MultiPointCollection>>()
1✔
648
            .await;
4✔
649

650
        assert_eq!(result.len(), 2);
1✔
651

652
        assert_eq!(result[0], points1.filter(vec![true, false])?);
1✔
653
        assert_eq!(result[1], points2);
1✔
654

655
        let query = query_processor
1✔
656
            .query(query_rectangle, &ctx_one_chunk)
1✔
657
            .await
×
658
            .unwrap();
1✔
659

660
        let result = query
1✔
661
            .map(Result::unwrap)
1✔
662
            .collect::<Vec<MultiPointCollection>>()
1✔
663
            .await;
4✔
664

665
        assert_eq!(result.len(), 1);
1✔
666

667
        assert_eq!(
1✔
668
            result[0],
1✔
669
            points1.filter(vec![true, false])?.append(&points2)?
1✔
670
        );
671

672
        Ok(())
1✔
673
    }
674

675
    #[tokio::test]
1✔
676
    async fn empty_points() {
1✔
677
        let point_collection =
1✔
678
            MultiPointCollection::from_data(vec![], vec![], Default::default()).unwrap();
1✔
679

1✔
680
        let polygon_collection = MultiPolygonCollection::from_data(
1✔
681
            vec![MultiPolygon::new(vec![vec![vec![
1✔
682
                (0.0, 0.0).into(),
1✔
683
                (10.0, 0.0).into(),
1✔
684
                (10.0, 10.0).into(),
1✔
685
                (0.0, 10.0).into(),
1✔
686
                (0.0, 0.0).into(),
1✔
687
            ]]])
1✔
688
            .unwrap()],
1✔
689
            vec![TimeInterval::default()],
1✔
690
            Default::default(),
1✔
691
        )
1✔
692
        .unwrap();
1✔
693

694
        let operator = PointInPolygonFilter {
1✔
695
            params: PointInPolygonFilterParams {},
1✔
696
            sources: PointInPolygonFilterSource {
1✔
697
                points: MockFeatureCollectionSource::single(point_collection).boxed(),
1✔
698
                polygons: MockFeatureCollectionSource::single(polygon_collection).boxed(),
1✔
699
            },
1✔
700
        }
1✔
701
        .boxed()
1✔
702
        .initialize(
1✔
703
            WorkflowOperatorPath::initialize_root(),
1✔
704
            &MockExecutionContext::test_default(),
1✔
705
        )
1✔
706
        .await
×
707
        .unwrap();
1✔
708

1✔
709
        let query_rectangle = VectorQueryRectangle {
1✔
710
            spatial_bounds: BoundingBox2D::new((-10., -10.).into(), (10., 10.).into()).unwrap(),
1✔
711
            time_interval: TimeInterval::default(),
1✔
712
            spatial_resolution: SpatialResolution::zero_point_one(),
1✔
713
        };
1✔
714

1✔
715
        let query_processor = operator.query_processor().unwrap().multi_point().unwrap();
1✔
716

1✔
717
        let query_context = MockQueryContext::test_default();
1✔
718

719
        let query = query_processor
1✔
720
            .query(query_rectangle, &query_context)
1✔
721
            .await
×
722
            .unwrap();
1✔
723

724
        let result = query
1✔
725
            .map(Result::unwrap)
1✔
726
            .collect::<Vec<MultiPointCollection>>()
1✔
727
            .await;
×
728

729
        assert_eq!(result.len(), 0);
1✔
730
    }
731

732
    #[tokio::test]
1✔
733
    async fn it_checks_sref() {
1✔
734
        let point_collection =
1✔
735
            MultiPointCollection::from_data(vec![], vec![], Default::default()).unwrap();
1✔
736

1✔
737
        let polygon_collection = MultiPolygonCollection::from_data(
1✔
738
            vec![MultiPolygon::new(vec![vec![vec![
1✔
739
                (0.0, 0.0).into(),
1✔
740
                (10.0, 0.0).into(),
1✔
741
                (10.0, 10.0).into(),
1✔
742
                (0.0, 10.0).into(),
1✔
743
                (0.0, 0.0).into(),
1✔
744
            ]]])
1✔
745
            .unwrap()],
1✔
746
            vec![TimeInterval::default()],
1✔
747
            Default::default(),
1✔
748
        )
1✔
749
        .unwrap();
1✔
750

751
        let operator = PointInPolygonFilter {
1✔
752
            params: PointInPolygonFilterParams {},
1✔
753
            sources: PointInPolygonFilterSource {
1✔
754
                points: MockFeatureCollectionSource::with_collections_and_sref(
1✔
755
                    vec![point_collection],
1✔
756
                    SpatialReference::epsg_4326(),
1✔
757
                )
1✔
758
                .boxed(),
1✔
759
                polygons: MockFeatureCollectionSource::with_collections_and_sref(
1✔
760
                    vec![polygon_collection],
1✔
761
                    SpatialReference::from_str("EPSG:3857").unwrap(),
1✔
762
                )
1✔
763
                .boxed(),
1✔
764
            },
1✔
765
        }
1✔
766
        .boxed()
1✔
767
        .initialize(
1✔
768
            WorkflowOperatorPath::initialize_root(),
1✔
769
            &MockExecutionContext::test_default(),
1✔
770
        )
1✔
771
        .await;
×
772

773
        assert!(matches!(
1✔
774
            operator,
1✔
775
            Err(Error::InvalidSpatialReference {
776
                expected: _,
777
                found: _,
778
            })
779
        ));
780
    }
781
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc