• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

geo-engine / geoengine / 16167706152

09 Jul 2025 11:08AM UTC coverage: 88.738% (-1.0%) from 89.762%
16167706152

push

github

web-flow
refactor: Updates-2025-07-02 (#1062)

* rust 1.88

* clippy auto-fixes

* manual clippy fixes

* update deps

* cargo update

* update onnx

* cargo fmt

* update sqlfluff

121 of 142 new or added lines in 29 files covered. (85.21%)

300 existing lines in 88 files now uncovered.

111259 of 125379 relevant lines covered (88.74%)

77910.92 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

93.81
/operators/src/processing/point_in_polygon.rs
1
mod tester;
2
mod wrapper;
3

4
use std::cmp::min;
5
use std::sync::Arc;
6

7
use futures::stream::BoxStream;
8
use futures::{StreamExt, TryStreamExt};
9
use geoengine_datatypes::dataset::NamedData;
10
use geoengine_datatypes::primitives::CacheHint;
11
use geoengine_datatypes::primitives::VectorQueryRectangle;
12
use rayon::ThreadPool;
13
use serde::{Deserialize, Serialize};
14
use snafu::ensure;
15

16
use crate::adapters::FeatureCollectionChunkMerger;
17
use crate::engine::{
18
    CanonicOperatorName, ExecutionContext, InitializedSources, InitializedVectorOperator, Operator,
19
    OperatorName, QueryContext, TypedVectorQueryProcessor, VectorOperator, VectorQueryProcessor,
20
    VectorResultDescriptor, WorkflowOperatorPath,
21
};
22
use crate::engine::{OperatorData, QueryProcessor};
23
use crate::error::{self, Error};
24
use crate::util::Result;
25
use arrow::array::BooleanArray;
26
use async_trait::async_trait;
27
use geoengine_datatypes::collections::{
28
    FeatureCollectionInfos, FeatureCollectionModifications, GeometryCollection,
29
    MultiPointCollection, MultiPolygonCollection, VectorDataType,
30
};
31
pub use tester::PointInPolygonTester;
32
pub use wrapper::PointInPolygonTesterWithCollection;
33

34
/// The point in polygon filter requires two inputs in the following order:
35
/// 1. a `MultiPointCollection` source
36
/// 2. a `MultiPolygonCollection` source
37
///
38
/// Then, it filters the `MultiPolygonCollection`s so that only those features are retained that are in any polygon.
39
pub type PointInPolygonFilter = Operator<PointInPolygonFilterParams, PointInPolygonFilterSource>;
40

41
impl OperatorName for PointInPolygonFilter {
42
    const TYPE_NAME: &'static str = "PointInPolygonFilter";
43
}
44

45
#[derive(Debug, Clone, Deserialize, Serialize)]
46
pub struct PointInPolygonFilterParams {}
47

48
#[derive(Debug, Clone, Deserialize, Serialize)]
49
pub struct PointInPolygonFilterSource {
50
    pub points: Box<dyn VectorOperator>,
51
    pub polygons: Box<dyn VectorOperator>,
52
}
53

54
impl OperatorData for PointInPolygonFilterSource {
55
    fn data_names_collect(&self, data_names: &mut Vec<NamedData>) {
×
56
        self.points.data_names_collect(data_names);
×
57
        self.polygons.data_names_collect(data_names);
×
58
    }
×
59
}
60

61
struct InitializedPointInPolygonFilterSource {
62
    points: Box<dyn InitializedVectorOperator>,
63
    polygons: Box<dyn InitializedVectorOperator>,
64
}
65

66
#[async_trait]
67
impl InitializedSources<InitializedPointInPolygonFilterSource> for PointInPolygonFilterSource {
68
    async fn initialize_sources(
69
        self,
70
        path: WorkflowOperatorPath,
71
        context: &dyn ExecutionContext,
72
    ) -> Result<InitializedPointInPolygonFilterSource> {
12✔
73
        let points_path = path.clone_and_append(0);
6✔
74
        let polygons_path = path.clone_and_append(1);
6✔
75

76
        Ok(InitializedPointInPolygonFilterSource {
77
            points: self.points.initialize(points_path, context).await?,
6✔
78
            polygons: self.polygons.initialize(polygons_path, context).await?,
6✔
79
        })
80
    }
12✔
81
}
82

83
#[typetag::serde]
×
84
#[async_trait]
85
impl VectorOperator for PointInPolygonFilter {
86
    async fn _initialize(
87
        self: Box<Self>,
88
        path: WorkflowOperatorPath,
89
        context: &dyn ExecutionContext,
90
    ) -> Result<Box<dyn InitializedVectorOperator>> {
12✔
91
        let name = CanonicOperatorName::from(&self);
6✔
92

93
        let initialized_source = self
6✔
94
            .sources
6✔
95
            .initialize_sources(path.clone(), context)
6✔
96
            .await?;
6✔
97

98
        let points_rd = initialized_source.points.result_descriptor();
6✔
99
        let polygons_rd = initialized_source.polygons.result_descriptor();
6✔
100

101
        ensure!(
6✔
102
            points_rd.data_type == VectorDataType::MultiPoint,
6✔
103
            error::InvalidType {
×
104
                expected: VectorDataType::MultiPoint.to_string(),
×
105
                found: points_rd.data_type.to_string(),
×
106
            }
×
107
        );
108
        ensure!(
6✔
109
            polygons_rd.data_type == VectorDataType::MultiPolygon,
6✔
110
            error::InvalidType {
×
111
                expected: VectorDataType::MultiPolygon.to_string(),
×
112
                found: polygons_rd.data_type.to_string(),
×
113
            }
×
114
        );
115

116
        ensure!(
6✔
117
            points_rd.spatial_reference == polygons_rd.spatial_reference,
6✔
118
            crate::error::InvalidSpatialReference {
1✔
119
                expected: points_rd.spatial_reference,
1✔
120
                found: polygons_rd.spatial_reference,
1✔
121
            }
1✔
122
        );
123

124
        // We use the result descriptor of the points because in the worst case no feature will be excluded.
125
        // We cannot use the polygon bbox because a `MultiPoint` could have one point within a polygon (and
126
        // thus be included in the result) and one point outside of the bbox of the polygons.
127
        let out_desc = initialized_source.points.result_descriptor().clone();
5✔
128

129
        let initialized_operator = InitializedPointInPolygonFilter {
5✔
130
            name,
5✔
131
            path,
5✔
132
            result_descriptor: out_desc,
5✔
133
            points: initialized_source.points,
5✔
134
            polygons: initialized_source.polygons,
5✔
135
        };
5✔
136

137
        Ok(initialized_operator.boxed())
5✔
138
    }
12✔
139

140
    span_fn!(PointInPolygonFilter);
141
}
142

143
pub struct InitializedPointInPolygonFilter {
144
    name: CanonicOperatorName,
145
    path: WorkflowOperatorPath,
146
    points: Box<dyn InitializedVectorOperator>,
147
    polygons: Box<dyn InitializedVectorOperator>,
148
    result_descriptor: VectorResultDescriptor,
149
}
150

151
impl InitializedVectorOperator for InitializedPointInPolygonFilter {
152
    fn query_processor(&self) -> Result<TypedVectorQueryProcessor> {
5✔
153
        let point_processor = self
5✔
154
            .points
5✔
155
            .query_processor()?
5✔
156
            .multi_point()
5✔
157
            .expect("checked in `PointInPolygonFilter` constructor");
5✔
158

159
        let polygon_processor = self
5✔
160
            .polygons
5✔
161
            .query_processor()?
5✔
162
            .multi_polygon()
5✔
163
            .expect("checked in `PointInPolygonFilter` constructor");
5✔
164

165
        Ok(TypedVectorQueryProcessor::MultiPoint(
5✔
166
            PointInPolygonFilterProcessor::new(
5✔
167
                self.result_descriptor.clone(),
5✔
168
                point_processor,
5✔
169
                polygon_processor,
5✔
170
            )
5✔
171
            .boxed(),
5✔
172
        ))
5✔
173
    }
5✔
174

175
    fn result_descriptor(&self) -> &VectorResultDescriptor {
×
176
        &self.result_descriptor
×
177
    }
×
178

179
    fn canonic_name(&self) -> CanonicOperatorName {
×
180
        self.name.clone()
×
181
    }
×
182

183
    fn name(&self) -> &'static str {
×
184
        PointInPolygonFilter::TYPE_NAME
×
185
    }
×
186

187
    fn path(&self) -> WorkflowOperatorPath {
×
188
        self.path.clone()
×
189
    }
×
190

191
    fn boxed(self) -> Box<dyn InitializedVectorOperator>
5✔
192
    where
5✔
193
        Self: Sized + 'static,
5✔
194
    {
195
        Box::new(self)
5✔
196
    }
5✔
197
}
198
pub struct PointInPolygonFilterProcessor {
199
    result_descriptor: VectorResultDescriptor,
200
    points: Box<dyn VectorQueryProcessor<VectorType = MultiPointCollection>>,
201
    polygons: Box<dyn VectorQueryProcessor<VectorType = MultiPolygonCollection>>,
202
}
203

204
impl PointInPolygonFilterProcessor {
205
    pub fn new(
5✔
206
        result_descriptor: VectorResultDescriptor,
5✔
207
        points: Box<dyn VectorQueryProcessor<VectorType = MultiPointCollection>>,
5✔
208
        polygons: Box<dyn VectorQueryProcessor<VectorType = MultiPolygonCollection>>,
5✔
209
    ) -> Self {
5✔
210
        Self {
5✔
211
            result_descriptor,
5✔
212
            points,
5✔
213
            polygons,
5✔
214
        }
5✔
215
    }
5✔
216

217
    fn filter_parallel(
10✔
218
        points: &Arc<MultiPointCollection>,
10✔
219
        polygons: &MultiPolygonCollection,
10✔
220
        thread_pool: &ThreadPool,
10✔
221
    ) -> Vec<bool> {
10✔
222
        debug_assert!(!points.is_empty());
10✔
223

224
        // TODO: parallelize over coordinate rather than features
225

226
        let tester = Arc::new(PointInPolygonTester::new(polygons)); // TODO: multithread
10✔
227

228
        let parallelism = thread_pool.current_num_threads();
10✔
229
        let chunk_size = (points.len() as f64 / parallelism as f64).ceil() as usize;
10✔
230

231
        let mut result = vec![false; points.len()];
10✔
232

233
        thread_pool.scope(|scope| {
10✔
234
            let num_features = points.len();
10✔
235
            let feature_offsets = points.feature_offsets();
10✔
236
            let time_intervals = points.time_intervals();
10✔
237
            let coordinates = points.coordinates();
10✔
238

239
            for (chunk_index, chunk_result) in result.chunks_mut(chunk_size).enumerate() {
22✔
240
                let feature_index_start = chunk_index * chunk_size;
22✔
241
                let features_index_end = min(feature_index_start + chunk_size, num_features);
22✔
242
                let tester = tester.clone();
22✔
243

244
                scope.spawn(move |_| {
22✔
245
                    for (
246
                        feature_index,
22✔
247
                        ((coordinates_start_index, coordinates_end_index), time_interval),
22✔
248
                    ) in two_tuple_windows(
22✔
249
                        feature_offsets[feature_index_start..=features_index_end]
22✔
250
                            .iter()
22✔
251
                            .map(|&c| c as usize),
44✔
252
                    )
253
                    .zip(time_intervals[feature_index_start..features_index_end].iter())
22✔
254
                    .enumerate()
22✔
255
                    {
256
                        let is_multi_point_in_polygon_collection = coordinates
22✔
257
                            [coordinates_start_index..coordinates_end_index]
22✔
258
                            .iter()
22✔
259
                            .any(|coordinate| {
22✔
260
                                tester.any_polygon_contains_coordinate(coordinate, time_interval)
22✔
261
                            });
22✔
262

263
                        chunk_result[feature_index] = is_multi_point_in_polygon_collection;
22✔
264
                    }
265
                });
22✔
266
            }
267
        });
10✔
268

269
        result
10✔
270
    }
10✔
271

272
    async fn filter_points(
10✔
273
        ctx: &dyn QueryContext,
10✔
274
        points: Arc<MultiPointCollection>,
10✔
275
        polygons: MultiPolygonCollection,
10✔
276
        initial_filter: &BooleanArray,
10✔
277
    ) -> Result<BooleanArray> {
10✔
278
        let thread_pool = ctx.thread_pool().clone();
10✔
279

280
        let thread_points = points.clone();
10✔
281
        let filter = crate::util::spawn_blocking(move || {
10✔
282
            Self::filter_parallel(&thread_points, &polygons, &thread_pool)
10✔
283
        })
10✔
284
        .await?;
10✔
285

286
        arrow::compute::or(initial_filter, &filter.into()).map_err(Into::into)
10✔
287
    }
10✔
288
}
289

290
#[async_trait]
291
impl VectorQueryProcessor for PointInPolygonFilterProcessor {
292
    type VectorType = MultiPointCollection;
293

294
    async fn vector_query<'a>(
295
        &'a self,
296
        query: VectorQueryRectangle,
297
        ctx: &'a dyn QueryContext,
298
    ) -> Result<BoxStream<'a, Result<Self::VectorType>>> {
12✔
299
        let filtered_stream =
6✔
300
            self.points
6✔
301
                .query(query.clone(), ctx)
6✔
302
                .await?
6✔
303
                .and_then(move |points| {
8✔
304
                    let query: geoengine_datatypes::primitives::QueryRectangle<
8✔
305
                        geoengine_datatypes::primitives::BoundingBox2D,
8✔
306
                        geoengine_datatypes::primitives::ColumnSelection,
8✔
307
                    > = query.clone();
8✔
308
                    async move {
8✔
309
                        if points.is_empty() {
8✔
310
                            return Ok(points);
1✔
311
                        }
7✔
312

313
                        let initial_filter = BooleanArray::from(vec![false; points.len()]);
7✔
314
                        let arc_points = Arc::new(points);
7✔
315

316
                        let (filter, cache_hint) = self
7✔
317
                            .polygons
7✔
318
                            .query(query.clone(), ctx)
7✔
319
                            .await?
7✔
320
                            .try_fold(
7✔
321
                                (initial_filter, CacheHint::max_duration()),
7✔
322
                                |acc, polygons| {
11✔
323
                                    let arc_points = arc_points.clone();
11✔
324
                                    async move {
11✔
325
                                        let (filter, mut cache_hint) = acc;
11✔
326
                                        let polygons = polygons;
11✔
327

328
                                        cache_hint.merge_with(&polygons.cache_hint);
11✔
329

330
                                        if polygons.is_empty() {
11✔
331
                                            return Ok((filter, cache_hint));
1✔
332
                                        }
10✔
333

334
                                        Ok((
335
                                            Self::filter_points(
10✔
336
                                                ctx,
10✔
337
                                                arc_points.clone(),
10✔
338
                                                polygons,
10✔
339
                                                &filter,
10✔
340
                                            )
10✔
341
                                            .await?,
10✔
342
                                            cache_hint,
10✔
343
                                        ))
344
                                    }
11✔
345
                                },
11✔
346
                            )
347
                            .await?;
7✔
348

349
                        let mut new_points =
7✔
350
                            arc_points.filter(filter).map_err(Into::<Error>::into)?;
7✔
351
                        new_points.cache_hint = cache_hint;
7✔
352

353
                        Ok(new_points)
7✔
354
                    }
8✔
355
                });
8✔
356

357
        Ok(
6✔
358
            FeatureCollectionChunkMerger::new(filtered_stream.fuse(), ctx.chunk_byte_size().into())
6✔
359
                .boxed(),
6✔
360
        )
6✔
361
    }
12✔
362

363
    fn vector_result_descriptor(&self) -> &VectorResultDescriptor {
6✔
364
        &self.result_descriptor
6✔
365
    }
6✔
366
}
367

368
/// Loop through an iterator by yielding the current and previous tuple. Starts with the
369
/// (first, second) item, so the iterator must have more than one item to create an output.
370
fn two_tuple_windows<I, T>(mut iter: I) -> impl Iterator<Item = (T, T)>
22✔
371
where
22✔
372
    I: Iterator<Item = T>,
22✔
373
    T: Copy,
22✔
374
{
375
    let mut last = iter.next();
22✔
376

377
    iter.map(move |item| {
22✔
378
        let output = (
22✔
379
            last.expect("it should have a first tuple in a two-tuple window"),
22✔
380
            item,
22✔
381
        );
22✔
382
        last = Some(item);
22✔
383
        output
22✔
384
    })
22✔
385
}
22✔
386

387
#[cfg(test)]
388
mod tests {
389

390
    use super::*;
391
    use std::str::FromStr;
392

393
    use geoengine_datatypes::collections::ChunksEqualIgnoringCacheHint;
394
    use geoengine_datatypes::primitives::{
395
        BoundingBox2D, Coordinate2D, MultiPoint, MultiPolygon, SpatialResolution, TimeInterval,
396
    };
397
    use geoengine_datatypes::primitives::{CacheHint, ColumnSelection};
398
    use geoengine_datatypes::spatial_reference::SpatialReference;
399
    use geoengine_datatypes::util::test::TestDefault;
400

401
    use crate::engine::{ChunkByteSize, MockExecutionContext, MockQueryContext};
402
    use crate::error::Error;
403
    use crate::mock::MockFeatureCollectionSource;
404

405
    #[test]
406
    fn point_in_polygon_boundary_conditions() {
1✔
407
        let collection = MultiPolygonCollection::from_data(
1✔
408
            vec![
1✔
409
                MultiPolygon::new(vec![vec![vec![
1✔
410
                    (0.0, 0.0).into(),
1✔
411
                    (10.0, 0.0).into(),
1✔
412
                    (10.0, 10.0).into(),
1✔
413
                    (0.0, 10.0).into(),
1✔
414
                    (0.0, 0.0).into(),
1✔
415
                ]]])
416
                .unwrap(),
1✔
417
            ],
418
            vec![Default::default(); 1],
1✔
419
            Default::default(),
1✔
420
            CacheHint::default(),
1✔
421
        )
422
        .unwrap();
1✔
423

424
        let tester = PointInPolygonTester::new(&collection);
1✔
425

426
        // the algorithm is not stable for boundary cases directly on the edges
427

428
        assert!(tester.any_polygon_contains_coordinate(
1✔
429
            &Coordinate2D::new(0.000_001, 0.000_001),
1✔
430
            &Default::default()
1✔
431
        ),);
432
        assert!(tester.any_polygon_contains_coordinate(
1✔
433
            &Coordinate2D::new(0.000_001, 0.1),
1✔
434
            &Default::default()
1✔
435
        ),);
436
        assert!(tester.any_polygon_contains_coordinate(
1✔
437
            &Coordinate2D::new(0.1, 0.000_001),
1✔
438
            &Default::default()
1✔
439
        ),);
440

441
        assert!(
1✔
442
            tester
1✔
443
                .any_polygon_contains_coordinate(&Coordinate2D::new(9.9, 9.9), &Default::default()),
1✔
444
        );
445
        assert!(
1✔
446
            tester.any_polygon_contains_coordinate(
1✔
447
                &Coordinate2D::new(10.0, 9.9),
1✔
448
                &Default::default()
1✔
449
            ),
450
        );
451
        assert!(
1✔
452
            tester.any_polygon_contains_coordinate(
1✔
453
                &Coordinate2D::new(9.9, 10.0),
1✔
454
                &Default::default()
1✔
455
            ),
456
        );
457

458
        assert!(
1✔
459
            !tester.any_polygon_contains_coordinate(
1✔
460
                &Coordinate2D::new(-0.1, -0.1),
1✔
461
                &Default::default()
1✔
462
            ),
1✔
463
        );
464
        assert!(
1✔
465
            !tester.any_polygon_contains_coordinate(
1✔
466
                &Coordinate2D::new(0.0, -0.1),
1✔
467
                &Default::default()
1✔
468
            ),
1✔
469
        );
470
        assert!(
1✔
471
            !tester.any_polygon_contains_coordinate(
1✔
472
                &Coordinate2D::new(-0.1, 0.0),
1✔
473
                &Default::default()
1✔
474
            ),
1✔
475
        );
476

477
        assert!(
1✔
478
            !tester.any_polygon_contains_coordinate(
1✔
479
                &Coordinate2D::new(10.1, 10.1),
1✔
480
                &Default::default()
1✔
481
            ),
1✔
482
        );
483
        assert!(
1✔
484
            !tester.any_polygon_contains_coordinate(
1✔
485
                &Coordinate2D::new(10.1, 9.9),
1✔
486
                &Default::default()
1✔
487
            ),
1✔
488
        );
489
        assert!(
1✔
490
            !tester.any_polygon_contains_coordinate(
1✔
491
                &Coordinate2D::new(9.9, 10.1),
1✔
492
                &Default::default()
1✔
493
            ),
1✔
494
        );
495
    }
1✔
496

497
    #[tokio::test]
498
    async fn all() -> Result<()> {
1✔
499
        let points = MultiPointCollection::from_data(
1✔
500
            MultiPoint::many(vec![(0.001, 0.1), (1.0, 1.1), (2.0, 3.1)]).unwrap(),
1✔
501
            vec![TimeInterval::new_unchecked(0, 1); 3],
1✔
502
            Default::default(),
1✔
503
            CacheHint::default(),
1✔
UNCOV
504
        )?;
×
505

506
        let point_source = MockFeatureCollectionSource::single(points.clone()).boxed();
1✔
507

508
        let polygon_source =
1✔
509
            MockFeatureCollectionSource::single(MultiPolygonCollection::from_data(
1✔
510
                vec![MultiPolygon::new(vec![vec![vec![
1✔
511
                    (0.0, 0.0).into(),
1✔
512
                    (10.0, 0.0).into(),
1✔
513
                    (10.0, 10.0).into(),
1✔
514
                    (0.0, 10.0).into(),
1✔
515
                    (0.0, 0.0).into(),
1✔
UNCOV
516
                ]]])?],
×
517
                vec![TimeInterval::new_unchecked(0, 1); 1],
1✔
518
                Default::default(),
1✔
519
                CacheHint::default(),
1✔
UNCOV
520
            )?)
×
521
            .boxed();
1✔
522

523
        let operator = PointInPolygonFilter {
1✔
524
            params: PointInPolygonFilterParams {},
1✔
525
            sources: PointInPolygonFilterSource {
1✔
526
                points: point_source,
1✔
527
                polygons: polygon_source,
1✔
528
            },
1✔
529
        }
1✔
530
        .boxed()
1✔
531
        .initialize(
1✔
532
            WorkflowOperatorPath::initialize_root(),
1✔
533
            &MockExecutionContext::test_default(),
1✔
534
        )
1✔
535
        .await?;
1✔
536

537
        let query_processor = operator.query_processor()?.multi_point().unwrap();
1✔
538

539
        let query_rectangle = VectorQueryRectangle {
1✔
540
            spatial_bounds: BoundingBox2D::new((0., 0.).into(), (10., 10.).into()).unwrap(),
1✔
541
            time_interval: TimeInterval::default(),
1✔
542
            spatial_resolution: SpatialResolution::zero_point_one(),
1✔
543
            attributes: ColumnSelection::all(),
1✔
544
        };
1✔
545
        let ctx = MockQueryContext::new(ChunkByteSize::MAX);
1✔
546

547
        let query = query_processor.query(query_rectangle, &ctx).await.unwrap();
1✔
548

549
        let result = query
1✔
550
            .map(Result::unwrap)
1✔
551
            .collect::<Vec<MultiPointCollection>>()
1✔
552
            .await;
1✔
553

554
        assert_eq!(result.len(), 1);
1✔
555

556
        assert!(result[0].chunks_equal_ignoring_cache_hint(&points));
1✔
557

558
        Ok(())
2✔
559
    }
1✔
560

561
    #[tokio::test]
562
    async fn empty() -> Result<()> {
1✔
563
        let points = MultiPointCollection::from_data(
1✔
564
            MultiPoint::many(vec![(0.0, 0.1), (1.0, 1.1), (2.0, 3.1)]).unwrap(),
1✔
565
            vec![TimeInterval::new_unchecked(0, 1); 3],
1✔
566
            Default::default(),
1✔
567
            CacheHint::default(),
1✔
UNCOV
568
        )?;
×
569

570
        let point_source = MockFeatureCollectionSource::single(points.clone()).boxed();
1✔
571

572
        let polygon_source =
1✔
573
            MockFeatureCollectionSource::single(MultiPolygonCollection::from_data(
1✔
574
                vec![],
1✔
575
                vec![],
1✔
576
                Default::default(),
1✔
577
                CacheHint::default(),
1✔
UNCOV
578
            )?)
×
579
            .boxed();
1✔
580

581
        let operator = PointInPolygonFilter {
1✔
582
            params: PointInPolygonFilterParams {},
1✔
583
            sources: PointInPolygonFilterSource {
1✔
584
                points: point_source,
1✔
585
                polygons: polygon_source,
1✔
586
            },
1✔
587
        }
1✔
588
        .boxed()
1✔
589
        .initialize(
1✔
590
            WorkflowOperatorPath::initialize_root(),
1✔
591
            &MockExecutionContext::test_default(),
1✔
592
        )
1✔
593
        .await?;
1✔
594

595
        let query_processor = operator.query_processor()?.multi_point().unwrap();
1✔
596

597
        let query_rectangle = VectorQueryRectangle {
1✔
598
            spatial_bounds: BoundingBox2D::new((0., 0.).into(), (10., 10.).into()).unwrap(),
1✔
599
            time_interval: TimeInterval::default(),
1✔
600
            spatial_resolution: SpatialResolution::zero_point_one(),
1✔
601
            attributes: ColumnSelection::all(),
1✔
602
        };
1✔
603
        let ctx = MockQueryContext::new(ChunkByteSize::MAX);
1✔
604

605
        let query = query_processor.query(query_rectangle, &ctx).await.unwrap();
1✔
606

607
        let result = query
1✔
608
            .map(Result::unwrap)
1✔
609
            .collect::<Vec<MultiPointCollection>>()
1✔
610
            .await;
1✔
611

612
        assert_eq!(result.len(), 1);
1✔
613
        assert!(result[0].chunks_equal_ignoring_cache_hint(&MultiPointCollection::empty()));
1✔
614

615
        Ok(())
2✔
616
    }
1✔
617

618
    #[tokio::test]
619
    async fn time() -> Result<()> {
1✔
620
        let points = MultiPointCollection::from_data(
1✔
621
            MultiPoint::many(vec![(1.0, 1.1), (2.0, 2.1), (3.0, 3.1)]).unwrap(),
1✔
622
            vec![
1✔
623
                TimeInterval::new(0, 1)?,
1✔
624
                TimeInterval::new(5, 6)?,
1✔
625
                TimeInterval::new(0, 5)?,
1✔
626
            ],
627
            Default::default(),
1✔
628
            CacheHint::default(),
1✔
UNCOV
629
        )?;
×
630

631
        let point_source = MockFeatureCollectionSource::single(points.clone()).boxed();
1✔
632

633
        let polygon = MultiPolygon::new(vec![vec![vec![
1✔
634
            (0.0, 0.0).into(),
1✔
635
            (10.0, 0.0).into(),
1✔
636
            (10.0, 10.0).into(),
1✔
637
            (0.0, 10.0).into(),
1✔
638
            (0.0, 0.0).into(),
1✔
UNCOV
639
        ]]])?;
×
640

641
        let polygon_source =
1✔
642
            MockFeatureCollectionSource::single(MultiPolygonCollection::from_data(
1✔
643
                vec![polygon.clone(), polygon],
1✔
644
                vec![TimeInterval::new(0, 1)?, TimeInterval::new(1, 2)?],
1✔
645
                Default::default(),
1✔
646
                CacheHint::default(),
1✔
UNCOV
647
            )?)
×
648
            .boxed();
1✔
649

650
        let operator = PointInPolygonFilter {
1✔
651
            params: PointInPolygonFilterParams {},
1✔
652
            sources: PointInPolygonFilterSource {
1✔
653
                points: point_source,
1✔
654
                polygons: polygon_source,
1✔
655
            },
1✔
656
        }
1✔
657
        .boxed()
1✔
658
        .initialize(
1✔
659
            WorkflowOperatorPath::initialize_root(),
1✔
660
            &MockExecutionContext::test_default(),
1✔
661
        )
1✔
662
        .await?;
1✔
663

664
        let query_processor = operator.query_processor()?.multi_point().unwrap();
1✔
665

666
        let query_rectangle = VectorQueryRectangle {
1✔
667
            spatial_bounds: BoundingBox2D::new((0., 0.).into(), (10., 10.).into()).unwrap(),
1✔
668
            time_interval: TimeInterval::default(),
1✔
669
            spatial_resolution: SpatialResolution::zero_point_one(),
1✔
670
            attributes: ColumnSelection::all(),
1✔
671
        };
1✔
672
        let ctx = MockQueryContext::new(ChunkByteSize::MAX);
1✔
673

674
        let query = query_processor.query(query_rectangle, &ctx).await.unwrap();
1✔
675

676
        let result = query
1✔
677
            .map(Result::unwrap)
1✔
678
            .collect::<Vec<MultiPointCollection>>()
1✔
679
            .await;
1✔
680

681
        assert_eq!(result.len(), 1);
1✔
682

683
        assert!(
1✔
684
            result[0].chunks_equal_ignoring_cache_hint(&points.filter(vec![true, false, true])?)
1✔
685
        );
686

687
        Ok(())
2✔
688
    }
1✔
689

690
    // #[async::main]
691
    #[tokio::test]
692
    async fn multiple_inputs() -> Result<()> {
1✔
693
        let points1 = MultiPointCollection::from_data(
1✔
694
            MultiPoint::many(vec![(5.0, 5.1), (15.0, 15.1)]).unwrap(),
1✔
695
            vec![TimeInterval::new(0, 1)?; 2],
1✔
696
            Default::default(),
1✔
697
            CacheHint::default(),
1✔
UNCOV
698
        )?;
×
699
        let points2 = MultiPointCollection::from_data(
1✔
700
            MultiPoint::many(vec![(6.0, 6.1), (16.0, 16.1)]).unwrap(),
1✔
701
            vec![TimeInterval::new(1, 2)?; 2],
1✔
702
            Default::default(),
1✔
703
            CacheHint::default(),
1✔
UNCOV
704
        )?;
×
705

706
        let point_source =
1✔
707
            MockFeatureCollectionSource::multiple(vec![points1.clone(), points2.clone()]).boxed();
1✔
708

709
        let polygon1 = MultiPolygon::new(vec![vec![vec![
1✔
710
            (0.0, 0.0).into(),
1✔
711
            (10.0, 0.0).into(),
1✔
712
            (10.0, 10.0).into(),
1✔
713
            (0.0, 10.0).into(),
1✔
714
            (0.0, 0.0).into(),
1✔
UNCOV
715
        ]]])?;
×
716
        let polygon2 = MultiPolygon::new(vec![vec![vec![
1✔
717
            (10.0, 10.0).into(),
1✔
718
            (20.0, 10.0).into(),
1✔
719
            (20.0, 20.0).into(),
1✔
720
            (10.0, 20.0).into(),
1✔
721
            (10.0, 10.0).into(),
1✔
UNCOV
722
        ]]])?;
×
723

724
        let polygon_source = MockFeatureCollectionSource::multiple(vec![
1✔
725
            MultiPolygonCollection::from_data(
1✔
726
                vec![polygon1.clone()],
1✔
727
                vec![TimeInterval::new(0, 1)?],
1✔
728
                Default::default(),
1✔
729
                CacheHint::default(),
1✔
UNCOV
730
            )?,
×
731
            MultiPolygonCollection::from_data(
1✔
732
                vec![polygon1, polygon2],
1✔
733
                vec![TimeInterval::new(1, 2)?, TimeInterval::new(1, 2)?],
1✔
734
                Default::default(),
1✔
735
                CacheHint::default(),
1✔
UNCOV
736
            )?,
×
737
        ])
738
        .boxed();
1✔
739

740
        let operator = PointInPolygonFilter {
1✔
741
            params: PointInPolygonFilterParams {},
1✔
742
            sources: PointInPolygonFilterSource {
1✔
743
                points: point_source,
1✔
744
                polygons: polygon_source,
1✔
745
            },
1✔
746
        }
1✔
747
        .boxed()
1✔
748
        .initialize(
1✔
749
            WorkflowOperatorPath::initialize_root(),
1✔
750
            &MockExecutionContext::test_default(),
1✔
751
        )
1✔
752
        .await?;
1✔
753

754
        let query_processor = operator.query_processor()?.multi_point().unwrap();
1✔
755

756
        let query_rectangle = VectorQueryRectangle {
1✔
757
            spatial_bounds: BoundingBox2D::new((0., 0.).into(), (10., 10.).into()).unwrap(),
1✔
758
            time_interval: TimeInterval::default(),
1✔
759
            spatial_resolution: SpatialResolution::zero_point_one(),
1✔
760
            attributes: ColumnSelection::all(),
1✔
761
        };
1✔
762

763
        let ctx_one_chunk = MockQueryContext::new(ChunkByteSize::MAX);
1✔
764
        let ctx_minimal_chunks = MockQueryContext::new(ChunkByteSize::MIN);
1✔
765

766
        let query = query_processor
1✔
767
            .query(query_rectangle.clone(), &ctx_minimal_chunks)
1✔
768
            .await
1✔
769
            .unwrap();
1✔
770

771
        let result = query
1✔
772
            .map(Result::unwrap)
1✔
773
            .collect::<Vec<MultiPointCollection>>()
1✔
774
            .await;
1✔
775

776
        assert_eq!(result.len(), 2);
1✔
777

778
        assert!(result[0].chunks_equal_ignoring_cache_hint(&points1.filter(vec![true, false])?));
1✔
779
        assert!(result[1].chunks_equal_ignoring_cache_hint(&points2));
1✔
780

781
        let query = query_processor
1✔
782
            .query(query_rectangle, &ctx_one_chunk)
1✔
783
            .await
1✔
784
            .unwrap();
1✔
785

786
        let result = query
1✔
787
            .map(Result::unwrap)
1✔
788
            .collect::<Vec<MultiPointCollection>>()
1✔
789
            .await;
1✔
790

791
        assert_eq!(result.len(), 1);
1✔
792

793
        assert!(result[0].chunks_equal_ignoring_cache_hint(
1✔
794
            &points1.filter(vec![true, false])?.append(&points2)?
1✔
795
        ));
796

797
        Ok(())
2✔
798
    }
1✔
799

800
    #[tokio::test]
801
    async fn empty_points() {
1✔
802
        let point_collection = MultiPointCollection::from_data(
1✔
803
            vec![],
1✔
804
            vec![],
1✔
805
            Default::default(),
1✔
806
            CacheHint::default(),
1✔
807
        )
808
        .unwrap();
1✔
809

810
        let polygon_collection = MultiPolygonCollection::from_data(
1✔
811
            vec![
1✔
812
                MultiPolygon::new(vec![vec![vec![
1✔
813
                    (0.0, 0.0).into(),
1✔
814
                    (10.0, 0.0).into(),
1✔
815
                    (10.0, 10.0).into(),
1✔
816
                    (0.0, 10.0).into(),
1✔
817
                    (0.0, 0.0).into(),
1✔
818
                ]]])
819
                .unwrap(),
1✔
820
            ],
821
            vec![TimeInterval::default()],
1✔
822
            Default::default(),
1✔
823
            CacheHint::default(),
1✔
824
        )
825
        .unwrap();
1✔
826

827
        let operator = PointInPolygonFilter {
1✔
828
            params: PointInPolygonFilterParams {},
1✔
829
            sources: PointInPolygonFilterSource {
1✔
830
                points: MockFeatureCollectionSource::single(point_collection).boxed(),
1✔
831
                polygons: MockFeatureCollectionSource::single(polygon_collection).boxed(),
1✔
832
            },
1✔
833
        }
1✔
834
        .boxed()
1✔
835
        .initialize(
1✔
836
            WorkflowOperatorPath::initialize_root(),
1✔
837
            &MockExecutionContext::test_default(),
1✔
838
        )
1✔
839
        .await
1✔
840
        .unwrap();
1✔
841

842
        let query_rectangle = VectorQueryRectangle {
1✔
843
            spatial_bounds: BoundingBox2D::new((-10., -10.).into(), (10., 10.).into()).unwrap(),
1✔
844
            time_interval: TimeInterval::default(),
1✔
845
            spatial_resolution: SpatialResolution::zero_point_one(),
1✔
846
            attributes: ColumnSelection::all(),
1✔
847
        };
1✔
848

849
        let query_processor = operator.query_processor().unwrap().multi_point().unwrap();
1✔
850

851
        let query_context = MockQueryContext::test_default();
1✔
852

853
        let query = query_processor
1✔
854
            .query(query_rectangle, &query_context)
1✔
855
            .await
1✔
856
            .unwrap();
1✔
857

858
        let result = query
1✔
859
            .map(Result::unwrap)
1✔
860
            .collect::<Vec<MultiPointCollection>>()
1✔
861
            .await;
1✔
862

863
        assert_eq!(result.len(), 1);
1✔
864
        assert!(result[0].chunks_equal_ignoring_cache_hint(&MultiPointCollection::empty()));
1✔
865
    }
1✔
866

867
    #[tokio::test]
868
    async fn it_checks_sref() {
1✔
869
        let point_collection = MultiPointCollection::from_data(
1✔
870
            vec![],
1✔
871
            vec![],
1✔
872
            Default::default(),
1✔
873
            CacheHint::default(),
1✔
874
        )
875
        .unwrap();
1✔
876

877
        let polygon_collection = MultiPolygonCollection::from_data(
1✔
878
            vec![
1✔
879
                MultiPolygon::new(vec![vec![vec![
1✔
880
                    (0.0, 0.0).into(),
1✔
881
                    (10.0, 0.0).into(),
1✔
882
                    (10.0, 10.0).into(),
1✔
883
                    (0.0, 10.0).into(),
1✔
884
                    (0.0, 0.0).into(),
1✔
885
                ]]])
886
                .unwrap(),
1✔
887
            ],
888
            vec![TimeInterval::default()],
1✔
889
            Default::default(),
1✔
890
            CacheHint::default(),
1✔
891
        )
892
        .unwrap();
1✔
893

894
        let operator = PointInPolygonFilter {
1✔
895
            params: PointInPolygonFilterParams {},
1✔
896
            sources: PointInPolygonFilterSource {
1✔
897
                points: MockFeatureCollectionSource::with_collections_and_sref(
1✔
898
                    vec![point_collection],
1✔
899
                    SpatialReference::epsg_4326(),
1✔
900
                )
1✔
901
                .boxed(),
1✔
902
                polygons: MockFeatureCollectionSource::with_collections_and_sref(
1✔
903
                    vec![polygon_collection],
1✔
904
                    SpatialReference::from_str("EPSG:3857").unwrap(),
1✔
905
                )
1✔
906
                .boxed(),
1✔
907
            },
1✔
908
        }
1✔
909
        .boxed()
1✔
910
        .initialize(
1✔
911
            WorkflowOperatorPath::initialize_root(),
1✔
912
            &MockExecutionContext::test_default(),
1✔
913
        )
1✔
914
        .await;
1✔
915

916
        assert!(matches!(
1✔
917
            operator,
1✔
918
            Err(Error::InvalidSpatialReference {
1✔
919
                expected: _,
1✔
920
                found: _,
1✔
921
            })
1✔
922
        ));
1✔
923
    }
1✔
924
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc