• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

geo-engine / geoengine / 11911118784

19 Nov 2024 10:06AM UTC coverage: 90.448% (-0.2%) from 90.687%
11911118784

push

github

web-flow
Merge pull request #994 from geo-engine/workspace-dependencies

use workspace dependencies, update toolchain, use global lock in expression

9 of 11 new or added lines in 6 files covered. (81.82%)

369 existing lines in 74 files now uncovered.

132871 of 146904 relevant lines covered (90.45%)

54798.62 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

96.69
/operators/src/processing/point_in_polygon.rs
1
mod tester;
2
mod wrapper;
3

4
use std::cmp::min;
5
use std::sync::Arc;
6

7
use futures::stream::BoxStream;
8
use futures::{StreamExt, TryStreamExt};
9
use geoengine_datatypes::dataset::NamedData;
10
use geoengine_datatypes::primitives::CacheHint;
11
use geoengine_datatypes::primitives::VectorQueryRectangle;
12
use rayon::ThreadPool;
13
use serde::{Deserialize, Serialize};
14
use snafu::ensure;
15

16
use crate::adapters::FeatureCollectionChunkMerger;
17
use crate::engine::{
18
    CanonicOperatorName, ExecutionContext, InitializedSources, InitializedVectorOperator, Operator,
19
    OperatorName, QueryContext, TypedVectorQueryProcessor, VectorOperator, VectorQueryProcessor,
20
    VectorResultDescriptor, WorkflowOperatorPath,
21
};
22
use crate::engine::{OperatorData, QueryProcessor};
23
use crate::error::{self, Error};
24
use crate::util::Result;
25
use arrow::array::BooleanArray;
26
use async_trait::async_trait;
27
use geoengine_datatypes::collections::{
28
    FeatureCollectionInfos, FeatureCollectionModifications, GeometryCollection,
29
    MultiPointCollection, MultiPolygonCollection, VectorDataType,
30
};
31
pub use tester::PointInPolygonTester;
32
pub use wrapper::PointInPolygonTesterWithCollection;
33

34
/// The point in polygon filter requires two inputs in the following order:
35
/// 1. a `MultiPointCollection` source
36
/// 2. a `MultiPolygonCollection` source
37
///     Then, it filters the `MultiPolygonCollection`s so that only those features are retained that are in any polygon.
38
pub type PointInPolygonFilter = Operator<PointInPolygonFilterParams, PointInPolygonFilterSource>;
39

40
impl OperatorName for PointInPolygonFilter {
41
    const TYPE_NAME: &'static str = "PointInPolygonFilter";
42
}
43

44
#[derive(Debug, Clone, Deserialize, Serialize)]
×
45
pub struct PointInPolygonFilterParams {}
46

47
#[derive(Debug, Clone, Deserialize, Serialize)]
×
48
pub struct PointInPolygonFilterSource {
49
    pub points: Box<dyn VectorOperator>,
50
    pub polygons: Box<dyn VectorOperator>,
51
}
52

53
impl OperatorData for PointInPolygonFilterSource {
54
    fn data_names_collect(&self, data_names: &mut Vec<NamedData>) {
×
55
        self.points.data_names_collect(data_names);
×
56
        self.polygons.data_names_collect(data_names);
×
57
    }
×
58
}
59

60
struct InitializedPointInPolygonFilterSource {
61
    points: Box<dyn InitializedVectorOperator>,
62
    polygons: Box<dyn InitializedVectorOperator>,
63
}
64

65
#[async_trait]
66
impl InitializedSources<InitializedPointInPolygonFilterSource> for PointInPolygonFilterSource {
67
    async fn initialize_sources(
68
        self,
69
        path: WorkflowOperatorPath,
70
        context: &dyn ExecutionContext,
71
    ) -> Result<InitializedPointInPolygonFilterSource> {
6✔
72
        let points_path = path.clone_and_append(0);
6✔
73
        let polygons_path = path.clone_and_append(1);
6✔
74

6✔
75
        Ok(InitializedPointInPolygonFilterSource {
6✔
76
            points: self.points.initialize(points_path, context).await?,
6✔
77
            polygons: self.polygons.initialize(polygons_path, context).await?,
6✔
78
        })
79
    }
12✔
80
}
81

82
#[typetag::serde]
×
83
#[async_trait]
84
impl VectorOperator for PointInPolygonFilter {
85
    async fn _initialize(
86
        self: Box<Self>,
87
        path: WorkflowOperatorPath,
88
        context: &dyn ExecutionContext,
89
    ) -> Result<Box<dyn InitializedVectorOperator>> {
6✔
90
        let name = CanonicOperatorName::from(&self);
6✔
91

92
        let initialized_source = self.sources.initialize_sources(path, context).await?;
6✔
93

94
        let points_rd = initialized_source.points.result_descriptor();
6✔
95
        let polygons_rd = initialized_source.polygons.result_descriptor();
6✔
96

6✔
97
        ensure!(
6✔
98
            points_rd.data_type == VectorDataType::MultiPoint,
6✔
UNCOV
99
            error::InvalidType {
×
100
                expected: VectorDataType::MultiPoint.to_string(),
×
101
                found: points_rd.data_type.to_string(),
×
102
            }
×
103
        );
104
        ensure!(
6✔
105
            polygons_rd.data_type == VectorDataType::MultiPolygon,
6✔
UNCOV
106
            error::InvalidType {
×
107
                expected: VectorDataType::MultiPolygon.to_string(),
×
108
                found: polygons_rd.data_type.to_string(),
×
109
            }
×
110
        );
111

112
        ensure!(
6✔
113
            points_rd.spatial_reference == polygons_rd.spatial_reference,
6✔
114
            crate::error::InvalidSpatialReference {
1✔
115
                expected: points_rd.spatial_reference,
1✔
116
                found: polygons_rd.spatial_reference,
1✔
117
            }
1✔
118
        );
119

120
        // We use the result descriptor of the points because in the worst case no feature will be excluded.
121
        // We cannot use the polygon bbox because a `MultiPoint` could have one point within a polygon (and
122
        // thus be included in the result) and one point outside of the bbox of the polygons.
123
        let out_desc = initialized_source.points.result_descriptor().clone();
5✔
124

5✔
125
        let initialized_operator = InitializedPointInPolygonFilter {
5✔
126
            name,
5✔
127
            result_descriptor: out_desc,
5✔
128
            points: initialized_source.points,
5✔
129
            polygons: initialized_source.polygons,
5✔
130
        };
5✔
131

5✔
132
        Ok(initialized_operator.boxed())
5✔
133
    }
12✔
134

135
    span_fn!(PointInPolygonFilter);
136
}
137

138
pub struct InitializedPointInPolygonFilter {
139
    name: CanonicOperatorName,
140
    points: Box<dyn InitializedVectorOperator>,
141
    polygons: Box<dyn InitializedVectorOperator>,
142
    result_descriptor: VectorResultDescriptor,
143
}
144

145
impl InitializedVectorOperator for InitializedPointInPolygonFilter {
146
    fn query_processor(&self) -> Result<TypedVectorQueryProcessor> {
5✔
147
        let point_processor = self
5✔
148
            .points
5✔
149
            .query_processor()?
5✔
150
            .multi_point()
5✔
151
            .expect("checked in `PointInPolygonFilter` constructor");
5✔
152

153
        let polygon_processor = self
5✔
154
            .polygons
5✔
155
            .query_processor()?
5✔
156
            .multi_polygon()
5✔
157
            .expect("checked in `PointInPolygonFilter` constructor");
5✔
158

5✔
159
        Ok(TypedVectorQueryProcessor::MultiPoint(
5✔
160
            PointInPolygonFilterProcessor::new(
5✔
161
                self.result_descriptor.clone(),
5✔
162
                point_processor,
5✔
163
                polygon_processor,
5✔
164
            )
5✔
165
            .boxed(),
5✔
166
        ))
5✔
167
    }
5✔
168

169
    fn result_descriptor(&self) -> &VectorResultDescriptor {
×
170
        &self.result_descriptor
×
171
    }
×
172

173
    fn canonic_name(&self) -> CanonicOperatorName {
×
174
        self.name.clone()
×
175
    }
×
176
}
177

178
pub struct PointInPolygonFilterProcessor {
179
    result_descriptor: VectorResultDescriptor,
180
    points: Box<dyn VectorQueryProcessor<VectorType = MultiPointCollection>>,
181
    polygons: Box<dyn VectorQueryProcessor<VectorType = MultiPolygonCollection>>,
182
}
183

184
impl PointInPolygonFilterProcessor {
185
    pub fn new(
5✔
186
        result_descriptor: VectorResultDescriptor,
5✔
187
        points: Box<dyn VectorQueryProcessor<VectorType = MultiPointCollection>>,
5✔
188
        polygons: Box<dyn VectorQueryProcessor<VectorType = MultiPolygonCollection>>,
5✔
189
    ) -> Self {
5✔
190
        Self {
5✔
191
            result_descriptor,
5✔
192
            points,
5✔
193
            polygons,
5✔
194
        }
5✔
195
    }
5✔
196

197
    fn filter_parallel(
10✔
198
        points: &Arc<MultiPointCollection>,
10✔
199
        polygons: &MultiPolygonCollection,
10✔
200
        thread_pool: &ThreadPool,
10✔
201
    ) -> Vec<bool> {
10✔
202
        debug_assert!(!points.is_empty());
10✔
203

204
        // TODO: parallelize over coordinate rather than features
205

206
        let tester = Arc::new(PointInPolygonTester::new(polygons)); // TODO: multithread
10✔
207

10✔
208
        let parallelism = thread_pool.current_num_threads();
10✔
209
        let chunk_size = (points.len() as f64 / parallelism as f64).ceil() as usize;
10✔
210

10✔
211
        let mut result = vec![false; points.len()];
10✔
212

10✔
213
        thread_pool.scope(|scope| {
10✔
214
            let num_features = points.len();
10✔
215
            let feature_offsets = points.feature_offsets();
10✔
216
            let time_intervals = points.time_intervals();
10✔
217
            let coordinates = points.coordinates();
10✔
218

219
            for (chunk_index, chunk_result) in result.chunks_mut(chunk_size).enumerate() {
22✔
220
                let feature_index_start = chunk_index * chunk_size;
22✔
221
                let features_index_end = min(feature_index_start + chunk_size, num_features);
22✔
222
                let tester = tester.clone();
22✔
223

22✔
224
                scope.spawn(move |_| {
22✔
225
                    for (
226
                        feature_index,
22✔
227
                        ((coordinates_start_index, coordinates_end_index), time_interval),
22✔
228
                    ) in two_tuple_windows(
22✔
229
                        feature_offsets[feature_index_start..=features_index_end]
22✔
230
                            .iter()
22✔
231
                            .map(|&c| c as usize),
44✔
232
                    )
22✔
233
                    .zip(time_intervals[feature_index_start..features_index_end].iter())
22✔
234
                    .enumerate()
22✔
235
                    {
22✔
236
                        let is_multi_point_in_polygon_collection = coordinates
22✔
237
                            [coordinates_start_index..coordinates_end_index]
22✔
238
                            .iter()
22✔
239
                            .any(|coordinate| {
22✔
240
                                tester.any_polygon_contains_coordinate(coordinate, time_interval)
22✔
241
                            });
22✔
242

22✔
243
                        chunk_result[feature_index] = is_multi_point_in_polygon_collection;
22✔
244
                    }
22✔
245
                });
22✔
246
            }
22✔
247
        });
10✔
248

10✔
249
        result
10✔
250
    }
10✔
251

252
    async fn filter_points(
10✔
253
        ctx: &dyn QueryContext,
10✔
254
        points: Arc<MultiPointCollection>,
10✔
255
        polygons: MultiPolygonCollection,
10✔
256
        initial_filter: &BooleanArray,
10✔
257
    ) -> Result<BooleanArray> {
10✔
258
        let thread_pool = ctx.thread_pool().clone();
10✔
259

10✔
260
        let thread_points = points.clone();
10✔
261
        let filter = crate::util::spawn_blocking(move || {
10✔
262
            Self::filter_parallel(&thread_points, &polygons, &thread_pool)
10✔
263
        })
10✔
264
        .await?;
10✔
265

266
        arrow::compute::or(initial_filter, &filter.into()).map_err(Into::into)
10✔
267
    }
10✔
268
}
269

270
#[async_trait]
271
impl VectorQueryProcessor for PointInPolygonFilterProcessor {
272
    type VectorType = MultiPointCollection;
273

274
    async fn vector_query<'a>(
275
        &'a self,
276
        query: VectorQueryRectangle,
277
        ctx: &'a dyn QueryContext,
278
    ) -> Result<BoxStream<'a, Result<Self::VectorType>>> {
6✔
279
        let filtered_stream =
6✔
280
            self.points
6✔
281
                .query(query.clone(), ctx)
6✔
UNCOV
282
                .await?
×
283
                .and_then(move |points| {
8✔
284
                    let query: geoengine_datatypes::primitives::QueryRectangle<
8✔
285
                        geoengine_datatypes::primitives::BoundingBox2D,
8✔
286
                        geoengine_datatypes::primitives::ColumnSelection,
8✔
287
                    > = query.clone();
8✔
288
                    async move {
8✔
289
                        if points.is_empty() {
8✔
290
                            return Ok(points);
1✔
291
                        }
7✔
292

7✔
293
                        let initial_filter = BooleanArray::from(vec![false; points.len()]);
7✔
294
                        let arc_points = Arc::new(points);
7✔
295

296
                        let (filter, cache_hint) = self
7✔
297
                            .polygons
7✔
298
                            .query(query.clone(), ctx)
7✔
UNCOV
299
                            .await?
×
300
                            .try_fold(
7✔
301
                                (initial_filter, CacheHint::max_duration()),
7✔
302
                                |acc, polygons| {
11✔
303
                                    let arc_points = arc_points.clone();
11✔
304
                                    async move {
11✔
305
                                        let (filter, mut cache_hint) = acc;
11✔
306
                                        let polygons = polygons;
11✔
307

11✔
308
                                        cache_hint.merge_with(&polygons.cache_hint);
11✔
309

11✔
310
                                        if polygons.is_empty() {
11✔
311
                                            return Ok((filter, cache_hint));
1✔
312
                                        }
10✔
313

10✔
314
                                        Ok((
10✔
315
                                            Self::filter_points(
10✔
316
                                                ctx,
10✔
317
                                                arc_points.clone(),
10✔
318
                                                polygons,
10✔
319
                                                &filter,
10✔
320
                                            )
10✔
321
                                            .await?,
10✔
322
                                            cache_hint,
10✔
323
                                        ))
324
                                    }
11✔
325
                                },
11✔
326
                            )
7✔
327
                            .await?;
10✔
328

329
                        let mut new_points =
7✔
330
                            arc_points.filter(filter).map_err(Into::<Error>::into)?;
7✔
331
                        new_points.cache_hint = cache_hint;
7✔
332

7✔
333
                        Ok(new_points)
7✔
334
                    }
8✔
335
                });
8✔
336

6✔
337
        Ok(
6✔
338
            FeatureCollectionChunkMerger::new(filtered_stream.fuse(), ctx.chunk_byte_size().into())
6✔
339
                .boxed(),
6✔
340
        )
6✔
341
    }
12✔
342

343
    fn vector_result_descriptor(&self) -> &VectorResultDescriptor {
6✔
344
        &self.result_descriptor
6✔
345
    }
6✔
346
}
347

348
/// Loop through an iterator by yielding the current and previous tuple. Starts with the
349
/// (first, second) item, so the iterator must have more than one item to create an output.
350
fn two_tuple_windows<I, T>(mut iter: I) -> impl Iterator<Item = (T, T)>
22✔
351
where
22✔
352
    I: Iterator<Item = T>,
22✔
353
    T: Copy,
22✔
354
{
22✔
355
    let mut last = iter.next();
22✔
356

22✔
357
    iter.map(move |item| {
22✔
358
        let output = (
22✔
359
            last.expect("it should have a first tuple in a two-tuple window"),
22✔
360
            item,
22✔
361
        );
22✔
362
        last = Some(item);
22✔
363
        output
22✔
364
    })
22✔
365
}
22✔
366

367
#[cfg(test)]
368
mod tests {
369

370
    use super::*;
371
    use std::str::FromStr;
372

373
    use geoengine_datatypes::collections::ChunksEqualIgnoringCacheHint;
374
    use geoengine_datatypes::primitives::{
375
        BoundingBox2D, Coordinate2D, MultiPoint, MultiPolygon, SpatialResolution, TimeInterval,
376
    };
377
    use geoengine_datatypes::primitives::{CacheHint, ColumnSelection};
378
    use geoengine_datatypes::spatial_reference::SpatialReference;
379
    use geoengine_datatypes::util::test::TestDefault;
380

381
    use crate::engine::{ChunkByteSize, MockExecutionContext, MockQueryContext};
382
    use crate::error::Error;
383
    use crate::mock::MockFeatureCollectionSource;
384

385
    #[test]
386
    fn point_in_polygon_boundary_conditions() {
1✔
387
        let collection = MultiPolygonCollection::from_data(
1✔
388
            vec![MultiPolygon::new(vec![vec![vec![
1✔
389
                (0.0, 0.0).into(),
1✔
390
                (10.0, 0.0).into(),
1✔
391
                (10.0, 10.0).into(),
1✔
392
                (0.0, 10.0).into(),
1✔
393
                (0.0, 0.0).into(),
1✔
394
            ]]])
1✔
395
            .unwrap()],
1✔
396
            vec![Default::default(); 1],
1✔
397
            Default::default(),
1✔
398
            CacheHint::default(),
1✔
399
        )
1✔
400
        .unwrap();
1✔
401

1✔
402
        let tester = PointInPolygonTester::new(&collection);
1✔
403

1✔
404
        // the algorithm is not stable for boundary cases directly on the edges
1✔
405

1✔
406
        assert!(tester.any_polygon_contains_coordinate(
1✔
407
            &Coordinate2D::new(0.000_001, 0.000_001),
1✔
408
            &Default::default()
1✔
409
        ),);
1✔
410
        assert!(tester.any_polygon_contains_coordinate(
1✔
411
            &Coordinate2D::new(0.000_001, 0.1),
1✔
412
            &Default::default()
1✔
413
        ),);
1✔
414
        assert!(tester.any_polygon_contains_coordinate(
1✔
415
            &Coordinate2D::new(0.1, 0.000_001),
1✔
416
            &Default::default()
1✔
417
        ),);
1✔
418

419
        assert!(tester
1✔
420
            .any_polygon_contains_coordinate(&Coordinate2D::new(9.9, 9.9), &Default::default()),);
1✔
421
        assert!(tester
1✔
422
            .any_polygon_contains_coordinate(&Coordinate2D::new(10.0, 9.9), &Default::default()),);
1✔
423
        assert!(tester
1✔
424
            .any_polygon_contains_coordinate(&Coordinate2D::new(9.9, 10.0), &Default::default()),);
1✔
425

426
        assert!(!tester
1✔
427
            .any_polygon_contains_coordinate(&Coordinate2D::new(-0.1, -0.1), &Default::default()),);
1✔
428
        assert!(!tester
1✔
429
            .any_polygon_contains_coordinate(&Coordinate2D::new(0.0, -0.1), &Default::default()),);
1✔
430
        assert!(!tester
1✔
431
            .any_polygon_contains_coordinate(&Coordinate2D::new(-0.1, 0.0), &Default::default()),);
1✔
432

433
        assert!(!tester
1✔
434
            .any_polygon_contains_coordinate(&Coordinate2D::new(10.1, 10.1), &Default::default()),);
1✔
435
        assert!(!tester
1✔
436
            .any_polygon_contains_coordinate(&Coordinate2D::new(10.1, 9.9), &Default::default()),);
1✔
437
        assert!(!tester
1✔
438
            .any_polygon_contains_coordinate(&Coordinate2D::new(9.9, 10.1), &Default::default()),);
1✔
439
    }
1✔
440

441
    #[tokio::test]
442
    async fn all() -> Result<()> {
1✔
443
        let points = MultiPointCollection::from_data(
1✔
444
            MultiPoint::many(vec![(0.001, 0.1), (1.0, 1.1), (2.0, 3.1)]).unwrap(),
1✔
445
            vec![TimeInterval::new_unchecked(0, 1); 3],
1✔
446
            Default::default(),
1✔
447
            CacheHint::default(),
1✔
448
        )?;
1✔
449

1✔
450
        let point_source = MockFeatureCollectionSource::single(points.clone()).boxed();
1✔
451

1✔
452
        let polygon_source =
1✔
453
            MockFeatureCollectionSource::single(MultiPolygonCollection::from_data(
1✔
454
                vec![MultiPolygon::new(vec![vec![vec![
1✔
455
                    (0.0, 0.0).into(),
1✔
456
                    (10.0, 0.0).into(),
1✔
457
                    (10.0, 10.0).into(),
1✔
458
                    (0.0, 10.0).into(),
1✔
459
                    (0.0, 0.0).into(),
1✔
460
                ]]])?],
1✔
461
                vec![TimeInterval::new_unchecked(0, 1); 1],
1✔
462
                Default::default(),
1✔
463
                CacheHint::default(),
1✔
464
            )?)
1✔
465
            .boxed();
1✔
466

1✔
467
        let operator = PointInPolygonFilter {
1✔
468
            params: PointInPolygonFilterParams {},
1✔
469
            sources: PointInPolygonFilterSource {
1✔
470
                points: point_source,
1✔
471
                polygons: polygon_source,
1✔
472
            },
1✔
473
        }
1✔
474
        .boxed()
1✔
475
        .initialize(
1✔
476
            WorkflowOperatorPath::initialize_root(),
1✔
477
            &MockExecutionContext::test_default(),
1✔
478
        )
1✔
479
        .await?;
1✔
480

1✔
481
        let query_processor = operator.query_processor()?.multi_point().unwrap();
1✔
482

1✔
483
        let query_rectangle = VectorQueryRectangle {
1✔
484
            spatial_bounds: BoundingBox2D::new((0., 0.).into(), (10., 10.).into()).unwrap(),
1✔
485
            time_interval: TimeInterval::default(),
1✔
486
            spatial_resolution: SpatialResolution::zero_point_one(),
1✔
487
            attributes: ColumnSelection::all(),
1✔
488
        };
1✔
489
        let ctx = MockQueryContext::new(ChunkByteSize::MAX);
1✔
490

1✔
491
        let query = query_processor.query(query_rectangle, &ctx).await.unwrap();
1✔
492

1✔
493
        let result = query
1✔
494
            .map(Result::unwrap)
1✔
495
            .collect::<Vec<MultiPointCollection>>()
1✔
496
            .await;
1✔
497

1✔
498
        assert_eq!(result.len(), 1);
1✔
499

1✔
500
        assert!(result[0].chunks_equal_ignoring_cache_hint(&points));
1✔
501

1✔
502
        Ok(())
1✔
503
    }
1✔
504

505
    #[tokio::test]
506
    async fn empty() -> Result<()> {
1✔
507
        let points = MultiPointCollection::from_data(
1✔
508
            MultiPoint::many(vec![(0.0, 0.1), (1.0, 1.1), (2.0, 3.1)]).unwrap(),
1✔
509
            vec![TimeInterval::new_unchecked(0, 1); 3],
1✔
510
            Default::default(),
1✔
511
            CacheHint::default(),
1✔
512
        )?;
1✔
513

1✔
514
        let point_source = MockFeatureCollectionSource::single(points.clone()).boxed();
1✔
515

1✔
516
        let polygon_source =
1✔
517
            MockFeatureCollectionSource::single(MultiPolygonCollection::from_data(
1✔
518
                vec![],
1✔
519
                vec![],
1✔
520
                Default::default(),
1✔
521
                CacheHint::default(),
1✔
522
            )?)
1✔
523
            .boxed();
1✔
524

1✔
525
        let operator = PointInPolygonFilter {
1✔
526
            params: PointInPolygonFilterParams {},
1✔
527
            sources: PointInPolygonFilterSource {
1✔
528
                points: point_source,
1✔
529
                polygons: polygon_source,
1✔
530
            },
1✔
531
        }
1✔
532
        .boxed()
1✔
533
        .initialize(
1✔
534
            WorkflowOperatorPath::initialize_root(),
1✔
535
            &MockExecutionContext::test_default(),
1✔
536
        )
1✔
537
        .await?;
1✔
538

1✔
539
        let query_processor = operator.query_processor()?.multi_point().unwrap();
1✔
540

1✔
541
        let query_rectangle = VectorQueryRectangle {
1✔
542
            spatial_bounds: BoundingBox2D::new((0., 0.).into(), (10., 10.).into()).unwrap(),
1✔
543
            time_interval: TimeInterval::default(),
1✔
544
            spatial_resolution: SpatialResolution::zero_point_one(),
1✔
545
            attributes: ColumnSelection::all(),
1✔
546
        };
1✔
547
        let ctx = MockQueryContext::new(ChunkByteSize::MAX);
1✔
548

1✔
549
        let query = query_processor.query(query_rectangle, &ctx).await.unwrap();
1✔
550

1✔
551
        let result = query
1✔
552
            .map(Result::unwrap)
1✔
553
            .collect::<Vec<MultiPointCollection>>()
1✔
554
            .await;
1✔
555

1✔
556
        assert_eq!(result.len(), 1);
1✔
557
        assert!(result[0].chunks_equal_ignoring_cache_hint(&MultiPointCollection::empty()));
1✔
558

1✔
559
        Ok(())
1✔
560
    }
1✔
561

562
    #[tokio::test]
563
    async fn time() -> Result<()> {
1✔
564
        let points = MultiPointCollection::from_data(
1✔
565
            MultiPoint::many(vec![(1.0, 1.1), (2.0, 2.1), (3.0, 3.1)]).unwrap(),
1✔
566
            vec![
1✔
567
                TimeInterval::new(0, 1)?,
1✔
568
                TimeInterval::new(5, 6)?,
1✔
569
                TimeInterval::new(0, 5)?,
1✔
570
            ],
1✔
571
            Default::default(),
1✔
572
            CacheHint::default(),
1✔
573
        )?;
1✔
574

1✔
575
        let point_source = MockFeatureCollectionSource::single(points.clone()).boxed();
1✔
576

1✔
577
        let polygon = MultiPolygon::new(vec![vec![vec![
1✔
578
            (0.0, 0.0).into(),
1✔
579
            (10.0, 0.0).into(),
1✔
580
            (10.0, 10.0).into(),
1✔
581
            (0.0, 10.0).into(),
1✔
582
            (0.0, 0.0).into(),
1✔
583
        ]]])?;
1✔
584

1✔
585
        let polygon_source =
1✔
586
            MockFeatureCollectionSource::single(MultiPolygonCollection::from_data(
1✔
587
                vec![polygon.clone(), polygon],
1✔
588
                vec![TimeInterval::new(0, 1)?, TimeInterval::new(1, 2)?],
1✔
589
                Default::default(),
1✔
590
                CacheHint::default(),
1✔
591
            )?)
1✔
592
            .boxed();
1✔
593

1✔
594
        let operator = PointInPolygonFilter {
1✔
595
            params: PointInPolygonFilterParams {},
1✔
596
            sources: PointInPolygonFilterSource {
1✔
597
                points: point_source,
1✔
598
                polygons: polygon_source,
1✔
599
            },
1✔
600
        }
1✔
601
        .boxed()
1✔
602
        .initialize(
1✔
603
            WorkflowOperatorPath::initialize_root(),
1✔
604
            &MockExecutionContext::test_default(),
1✔
605
        )
1✔
606
        .await?;
1✔
607

1✔
608
        let query_processor = operator.query_processor()?.multi_point().unwrap();
1✔
609

1✔
610
        let query_rectangle = VectorQueryRectangle {
1✔
611
            spatial_bounds: BoundingBox2D::new((0., 0.).into(), (10., 10.).into()).unwrap(),
1✔
612
            time_interval: TimeInterval::default(),
1✔
613
            spatial_resolution: SpatialResolution::zero_point_one(),
1✔
614
            attributes: ColumnSelection::all(),
1✔
615
        };
1✔
616
        let ctx = MockQueryContext::new(ChunkByteSize::MAX);
1✔
617

1✔
618
        let query = query_processor.query(query_rectangle, &ctx).await.unwrap();
1✔
619

1✔
620
        let result = query
1✔
621
            .map(Result::unwrap)
1✔
622
            .collect::<Vec<MultiPointCollection>>()
1✔
623
            .await;
1✔
624

1✔
625
        assert_eq!(result.len(), 1);
1✔
626

1✔
627
        assert!(
1✔
628
            result[0].chunks_equal_ignoring_cache_hint(&points.filter(vec![true, false, true])?)
1✔
629
        );
1✔
630

1✔
631
        Ok(())
1✔
632
    }
1✔
633

634
    // #[async::main]
635
    #[tokio::test]
636
    async fn multiple_inputs() -> Result<()> {
1✔
637
        let points1 = MultiPointCollection::from_data(
1✔
638
            MultiPoint::many(vec![(5.0, 5.1), (15.0, 15.1)]).unwrap(),
1✔
639
            vec![TimeInterval::new(0, 1)?; 2],
1✔
640
            Default::default(),
1✔
641
            CacheHint::default(),
1✔
642
        )?;
1✔
643
        let points2 = MultiPointCollection::from_data(
1✔
644
            MultiPoint::many(vec![(6.0, 6.1), (16.0, 16.1)]).unwrap(),
1✔
645
            vec![TimeInterval::new(1, 2)?; 2],
1✔
646
            Default::default(),
1✔
647
            CacheHint::default(),
1✔
648
        )?;
1✔
649

1✔
650
        let point_source =
1✔
651
            MockFeatureCollectionSource::multiple(vec![points1.clone(), points2.clone()]).boxed();
1✔
652

1✔
653
        let polygon1 = MultiPolygon::new(vec![vec![vec![
1✔
654
            (0.0, 0.0).into(),
1✔
655
            (10.0, 0.0).into(),
1✔
656
            (10.0, 10.0).into(),
1✔
657
            (0.0, 10.0).into(),
1✔
658
            (0.0, 0.0).into(),
1✔
659
        ]]])?;
1✔
660
        let polygon2 = MultiPolygon::new(vec![vec![vec![
1✔
661
            (10.0, 10.0).into(),
1✔
662
            (20.0, 10.0).into(),
1✔
663
            (20.0, 20.0).into(),
1✔
664
            (10.0, 20.0).into(),
1✔
665
            (10.0, 10.0).into(),
1✔
666
        ]]])?;
1✔
667

1✔
668
        let polygon_source = MockFeatureCollectionSource::multiple(vec![
1✔
669
            MultiPolygonCollection::from_data(
1✔
670
                vec![polygon1.clone()],
1✔
671
                vec![TimeInterval::new(0, 1)?],
1✔
672
                Default::default(),
1✔
673
                CacheHint::default(),
1✔
674
            )?,
1✔
675
            MultiPolygonCollection::from_data(
1✔
676
                vec![polygon1, polygon2],
1✔
677
                vec![TimeInterval::new(1, 2)?, TimeInterval::new(1, 2)?],
1✔
678
                Default::default(),
1✔
679
                CacheHint::default(),
1✔
680
            )?,
1✔
681
        ])
1✔
682
        .boxed();
1✔
683

1✔
684
        let operator = PointInPolygonFilter {
1✔
685
            params: PointInPolygonFilterParams {},
1✔
686
            sources: PointInPolygonFilterSource {
1✔
687
                points: point_source,
1✔
688
                polygons: polygon_source,
1✔
689
            },
1✔
690
        }
1✔
691
        .boxed()
1✔
692
        .initialize(
1✔
693
            WorkflowOperatorPath::initialize_root(),
1✔
694
            &MockExecutionContext::test_default(),
1✔
695
        )
1✔
696
        .await?;
1✔
697

1✔
698
        let query_processor = operator.query_processor()?.multi_point().unwrap();
1✔
699

1✔
700
        let query_rectangle = VectorQueryRectangle {
1✔
701
            spatial_bounds: BoundingBox2D::new((0., 0.).into(), (10., 10.).into()).unwrap(),
1✔
702
            time_interval: TimeInterval::default(),
1✔
703
            spatial_resolution: SpatialResolution::zero_point_one(),
1✔
704
            attributes: ColumnSelection::all(),
1✔
705
        };
1✔
706

1✔
707
        let ctx_one_chunk = MockQueryContext::new(ChunkByteSize::MAX);
1✔
708
        let ctx_minimal_chunks = MockQueryContext::new(ChunkByteSize::MIN);
1✔
709

1✔
710
        let query = query_processor
1✔
711
            .query(query_rectangle.clone(), &ctx_minimal_chunks)
1✔
712
            .await
1✔
713
            .unwrap();
1✔
714

1✔
715
        let result = query
1✔
716
            .map(Result::unwrap)
1✔
717
            .collect::<Vec<MultiPointCollection>>()
1✔
718
            .await;
4✔
719

1✔
720
        assert_eq!(result.len(), 2);
1✔
721

1✔
722
        assert!(result[0].chunks_equal_ignoring_cache_hint(&points1.filter(vec![true, false])?));
1✔
723
        assert!(result[1].chunks_equal_ignoring_cache_hint(&points2));
1✔
724

1✔
725
        let query = query_processor
1✔
726
            .query(query_rectangle, &ctx_one_chunk)
1✔
727
            .await
1✔
728
            .unwrap();
1✔
729

1✔
730
        let result = query
1✔
731
            .map(Result::unwrap)
1✔
732
            .collect::<Vec<MultiPointCollection>>()
1✔
733
            .await;
4✔
734

1✔
735
        assert_eq!(result.len(), 1);
1✔
736

1✔
737
        assert!(result[0].chunks_equal_ignoring_cache_hint(
1✔
738
            &points1.filter(vec![true, false])?.append(&points2)?
1✔
739
        ));
1✔
740

1✔
741
        Ok(())
1✔
742
    }
1✔
743

744
    #[tokio::test]
745
    async fn empty_points() {
1✔
746
        let point_collection = MultiPointCollection::from_data(
1✔
747
            vec![],
1✔
748
            vec![],
1✔
749
            Default::default(),
1✔
750
            CacheHint::default(),
1✔
751
        )
1✔
752
        .unwrap();
1✔
753

1✔
754
        let polygon_collection = MultiPolygonCollection::from_data(
1✔
755
            vec![MultiPolygon::new(vec![vec![vec![
1✔
756
                (0.0, 0.0).into(),
1✔
757
                (10.0, 0.0).into(),
1✔
758
                (10.0, 10.0).into(),
1✔
759
                (0.0, 10.0).into(),
1✔
760
                (0.0, 0.0).into(),
1✔
761
            ]]])
1✔
762
            .unwrap()],
1✔
763
            vec![TimeInterval::default()],
1✔
764
            Default::default(),
1✔
765
            CacheHint::default(),
1✔
766
        )
1✔
767
        .unwrap();
1✔
768

1✔
769
        let operator = PointInPolygonFilter {
1✔
770
            params: PointInPolygonFilterParams {},
1✔
771
            sources: PointInPolygonFilterSource {
1✔
772
                points: MockFeatureCollectionSource::single(point_collection).boxed(),
1✔
773
                polygons: MockFeatureCollectionSource::single(polygon_collection).boxed(),
1✔
774
            },
1✔
775
        }
1✔
776
        .boxed()
1✔
777
        .initialize(
1✔
778
            WorkflowOperatorPath::initialize_root(),
1✔
779
            &MockExecutionContext::test_default(),
1✔
780
        )
1✔
781
        .await
1✔
782
        .unwrap();
1✔
783

1✔
784
        let query_rectangle = VectorQueryRectangle {
1✔
785
            spatial_bounds: BoundingBox2D::new((-10., -10.).into(), (10., 10.).into()).unwrap(),
1✔
786
            time_interval: TimeInterval::default(),
1✔
787
            spatial_resolution: SpatialResolution::zero_point_one(),
1✔
788
            attributes: ColumnSelection::all(),
1✔
789
        };
1✔
790

1✔
791
        let query_processor = operator.query_processor().unwrap().multi_point().unwrap();
1✔
792

1✔
793
        let query_context = MockQueryContext::test_default();
1✔
794

1✔
795
        let query = query_processor
1✔
796
            .query(query_rectangle, &query_context)
1✔
797
            .await
1✔
798
            .unwrap();
1✔
799

1✔
800
        let result = query
1✔
801
            .map(Result::unwrap)
1✔
802
            .collect::<Vec<MultiPointCollection>>()
1✔
803
            .await;
1✔
804

1✔
805
        assert_eq!(result.len(), 1);
1✔
806
        assert!(result[0].chunks_equal_ignoring_cache_hint(&MultiPointCollection::empty()));
1✔
807
    }
1✔
808

809
    #[tokio::test]
810
    async fn it_checks_sref() {
1✔
811
        let point_collection = MultiPointCollection::from_data(
1✔
812
            vec![],
1✔
813
            vec![],
1✔
814
            Default::default(),
1✔
815
            CacheHint::default(),
1✔
816
        )
1✔
817
        .unwrap();
1✔
818

1✔
819
        let polygon_collection = MultiPolygonCollection::from_data(
1✔
820
            vec![MultiPolygon::new(vec![vec![vec![
1✔
821
                (0.0, 0.0).into(),
1✔
822
                (10.0, 0.0).into(),
1✔
823
                (10.0, 10.0).into(),
1✔
824
                (0.0, 10.0).into(),
1✔
825
                (0.0, 0.0).into(),
1✔
826
            ]]])
1✔
827
            .unwrap()],
1✔
828
            vec![TimeInterval::default()],
1✔
829
            Default::default(),
1✔
830
            CacheHint::default(),
1✔
831
        )
1✔
832
        .unwrap();
1✔
833

1✔
834
        let operator = PointInPolygonFilter {
1✔
835
            params: PointInPolygonFilterParams {},
1✔
836
            sources: PointInPolygonFilterSource {
1✔
837
                points: MockFeatureCollectionSource::with_collections_and_sref(
1✔
838
                    vec![point_collection],
1✔
839
                    SpatialReference::epsg_4326(),
1✔
840
                )
1✔
841
                .boxed(),
1✔
842
                polygons: MockFeatureCollectionSource::with_collections_and_sref(
1✔
843
                    vec![polygon_collection],
1✔
844
                    SpatialReference::from_str("EPSG:3857").unwrap(),
1✔
845
                )
1✔
846
                .boxed(),
1✔
847
            },
1✔
848
        }
1✔
849
        .boxed()
1✔
850
        .initialize(
1✔
851
            WorkflowOperatorPath::initialize_root(),
1✔
852
            &MockExecutionContext::test_default(),
1✔
853
        )
1✔
854
        .await;
1✔
855

1✔
856
        assert!(matches!(
1✔
857
            operator,
1✔
858
            Err(Error::InvalidSpatialReference {
1✔
859
                expected: _,
1✔
860
                found: _,
1✔
861
            })
1✔
862
        ));
1✔
863
    }
1✔
864
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc