• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

geo-engine / geoengine / 12469296660

23 Dec 2024 03:15PM UTC coverage: 90.56% (-0.1%) from 90.695%
12469296660

push

github

web-flow
Merge pull request #998 from geo-engine/quota_log_wip

Quota and Data usage Logging

859 of 1214 new or added lines in 66 files covered. (70.76%)

3 existing lines in 2 files now uncovered.

133923 of 147883 relevant lines covered (90.56%)

54439.32 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

96.47
/operators/src/processing/point_in_polygon.rs
1
mod tester;
2
mod wrapper;
3

4
use std::cmp::min;
5
use std::sync::Arc;
6

7
use futures::stream::BoxStream;
8
use futures::{StreamExt, TryStreamExt};
9
use geoengine_datatypes::dataset::NamedData;
10
use geoengine_datatypes::primitives::CacheHint;
11
use geoengine_datatypes::primitives::VectorQueryRectangle;
12
use rayon::ThreadPool;
13
use serde::{Deserialize, Serialize};
14
use snafu::ensure;
15

16
use crate::adapters::FeatureCollectionChunkMerger;
17
use crate::engine::{
18
    CanonicOperatorName, ExecutionContext, InitializedSources, InitializedVectorOperator, Operator,
19
    OperatorName, QueryContext, TypedVectorQueryProcessor, VectorOperator, VectorQueryProcessor,
20
    VectorResultDescriptor, WorkflowOperatorPath,
21
};
22
use crate::engine::{OperatorData, QueryProcessor};
23
use crate::error::{self, Error};
24
use crate::util::Result;
25
use arrow::array::BooleanArray;
26
use async_trait::async_trait;
27
use geoengine_datatypes::collections::{
28
    FeatureCollectionInfos, FeatureCollectionModifications, GeometryCollection,
29
    MultiPointCollection, MultiPolygonCollection, VectorDataType,
30
};
31
pub use tester::PointInPolygonTester;
32
pub use wrapper::PointInPolygonTesterWithCollection;
33

34
/// The point in polygon filter requires two inputs in the following order:
35
/// 1. a `MultiPointCollection` source
36
/// 2. a `MultiPolygonCollection` source
37
///     Then, it filters the `MultiPolygonCollection`s so that only those features are retained that are in any polygon.
38
pub type PointInPolygonFilter = Operator<PointInPolygonFilterParams, PointInPolygonFilterSource>;
39

40
impl OperatorName for PointInPolygonFilter {
41
    const TYPE_NAME: &'static str = "PointInPolygonFilter";
42
}
43

44
#[derive(Debug, Clone, Deserialize, Serialize)]
45
pub struct PointInPolygonFilterParams {}
46

47
#[derive(Debug, Clone, Deserialize, Serialize)]
48
pub struct PointInPolygonFilterSource {
49
    pub points: Box<dyn VectorOperator>,
50
    pub polygons: Box<dyn VectorOperator>,
51
}
52

53
impl OperatorData for PointInPolygonFilterSource {
54
    fn data_names_collect(&self, data_names: &mut Vec<NamedData>) {
×
55
        self.points.data_names_collect(data_names);
×
56
        self.polygons.data_names_collect(data_names);
×
57
    }
×
58
}
59

60
struct InitializedPointInPolygonFilterSource {
61
    points: Box<dyn InitializedVectorOperator>,
62
    polygons: Box<dyn InitializedVectorOperator>,
63
}
64

65
#[async_trait]
66
impl InitializedSources<InitializedPointInPolygonFilterSource> for PointInPolygonFilterSource {
67
    async fn initialize_sources(
68
        self,
69
        path: WorkflowOperatorPath,
70
        context: &dyn ExecutionContext,
71
    ) -> Result<InitializedPointInPolygonFilterSource> {
6✔
72
        let points_path = path.clone_and_append(0);
6✔
73
        let polygons_path = path.clone_and_append(1);
6✔
74

6✔
75
        Ok(InitializedPointInPolygonFilterSource {
6✔
76
            points: self.points.initialize(points_path, context).await?,
6✔
77
            polygons: self.polygons.initialize(polygons_path, context).await?,
6✔
78
        })
79
    }
12✔
80
}
81

82
#[typetag::serde]
×
83
#[async_trait]
84
impl VectorOperator for PointInPolygonFilter {
85
    async fn _initialize(
86
        self: Box<Self>,
87
        path: WorkflowOperatorPath,
88
        context: &dyn ExecutionContext,
89
    ) -> Result<Box<dyn InitializedVectorOperator>> {
6✔
90
        let name = CanonicOperatorName::from(&self);
6✔
91

92
        let initialized_source = self
6✔
93
            .sources
6✔
94
            .initialize_sources(path.clone(), context)
6✔
95
            .await?;
6✔
96

97
        let points_rd = initialized_source.points.result_descriptor();
6✔
98
        let polygons_rd = initialized_source.polygons.result_descriptor();
6✔
99

6✔
100
        ensure!(
6✔
101
            points_rd.data_type == VectorDataType::MultiPoint,
6✔
102
            error::InvalidType {
×
103
                expected: VectorDataType::MultiPoint.to_string(),
×
104
                found: points_rd.data_type.to_string(),
×
105
            }
×
106
        );
107
        ensure!(
6✔
108
            polygons_rd.data_type == VectorDataType::MultiPolygon,
6✔
109
            error::InvalidType {
×
110
                expected: VectorDataType::MultiPolygon.to_string(),
×
111
                found: polygons_rd.data_type.to_string(),
×
112
            }
×
113
        );
114

115
        ensure!(
6✔
116
            points_rd.spatial_reference == polygons_rd.spatial_reference,
6✔
117
            crate::error::InvalidSpatialReference {
1✔
118
                expected: points_rd.spatial_reference,
1✔
119
                found: polygons_rd.spatial_reference,
1✔
120
            }
1✔
121
        );
122

123
        // We use the result descriptor of the points because in the worst case no feature will be excluded.
124
        // We cannot use the polygon bbox because a `MultiPoint` could have one point within a polygon (and
125
        // thus be included in the result) and one point outside of the bbox of the polygons.
126
        let out_desc = initialized_source.points.result_descriptor().clone();
5✔
127

5✔
128
        let initialized_operator = InitializedPointInPolygonFilter {
5✔
129
            name,
5✔
130
            path,
5✔
131
            result_descriptor: out_desc,
5✔
132
            points: initialized_source.points,
5✔
133
            polygons: initialized_source.polygons,
5✔
134
        };
5✔
135

5✔
136
        Ok(initialized_operator.boxed())
5✔
137
    }
12✔
138

139
    span_fn!(PointInPolygonFilter);
140
}
141

142
pub struct InitializedPointInPolygonFilter {
143
    name: CanonicOperatorName,
144
    path: WorkflowOperatorPath,
145
    points: Box<dyn InitializedVectorOperator>,
146
    polygons: Box<dyn InitializedVectorOperator>,
147
    result_descriptor: VectorResultDescriptor,
148
}
149

150
impl InitializedVectorOperator for InitializedPointInPolygonFilter {
151
    fn query_processor(&self) -> Result<TypedVectorQueryProcessor> {
5✔
152
        let point_processor = self
5✔
153
            .points
5✔
154
            .query_processor()?
5✔
155
            .multi_point()
5✔
156
            .expect("checked in `PointInPolygonFilter` constructor");
5✔
157

158
        let polygon_processor = self
5✔
159
            .polygons
5✔
160
            .query_processor()?
5✔
161
            .multi_polygon()
5✔
162
            .expect("checked in `PointInPolygonFilter` constructor");
5✔
163

5✔
164
        Ok(TypedVectorQueryProcessor::MultiPoint(
5✔
165
            PointInPolygonFilterProcessor::new(
5✔
166
                self.result_descriptor.clone(),
5✔
167
                point_processor,
5✔
168
                polygon_processor,
5✔
169
            )
5✔
170
            .boxed(),
5✔
171
        ))
5✔
172
    }
5✔
173

174
    fn result_descriptor(&self) -> &VectorResultDescriptor {
×
175
        &self.result_descriptor
×
176
    }
×
177

178
    fn canonic_name(&self) -> CanonicOperatorName {
×
179
        self.name.clone()
×
180
    }
×
181

NEW
182
    fn name(&self) -> &'static str {
×
NEW
183
        PointInPolygonFilter::TYPE_NAME
×
NEW
184
    }
×
185

NEW
186
    fn path(&self) -> WorkflowOperatorPath {
×
NEW
187
        self.path.clone()
×
NEW
188
    }
×
189

190
    fn boxed(self) -> Box<dyn InitializedVectorOperator>
5✔
191
    where
5✔
192
        Self: Sized + 'static,
5✔
193
    {
5✔
194
        Box::new(self)
5✔
195
    }
5✔
196
}
197
pub struct PointInPolygonFilterProcessor {
198
    result_descriptor: VectorResultDescriptor,
199
    points: Box<dyn VectorQueryProcessor<VectorType = MultiPointCollection>>,
200
    polygons: Box<dyn VectorQueryProcessor<VectorType = MultiPolygonCollection>>,
201
}
202

203
impl PointInPolygonFilterProcessor {
204
    pub fn new(
5✔
205
        result_descriptor: VectorResultDescriptor,
5✔
206
        points: Box<dyn VectorQueryProcessor<VectorType = MultiPointCollection>>,
5✔
207
        polygons: Box<dyn VectorQueryProcessor<VectorType = MultiPolygonCollection>>,
5✔
208
    ) -> Self {
5✔
209
        Self {
5✔
210
            result_descriptor,
5✔
211
            points,
5✔
212
            polygons,
5✔
213
        }
5✔
214
    }
5✔
215

216
    fn filter_parallel(
10✔
217
        points: &Arc<MultiPointCollection>,
10✔
218
        polygons: &MultiPolygonCollection,
10✔
219
        thread_pool: &ThreadPool,
10✔
220
    ) -> Vec<bool> {
10✔
221
        debug_assert!(!points.is_empty());
10✔
222

223
        // TODO: parallelize over coordinate rather than features
224

225
        let tester = Arc::new(PointInPolygonTester::new(polygons)); // TODO: multithread
10✔
226

10✔
227
        let parallelism = thread_pool.current_num_threads();
10✔
228
        let chunk_size = (points.len() as f64 / parallelism as f64).ceil() as usize;
10✔
229

10✔
230
        let mut result = vec![false; points.len()];
10✔
231

10✔
232
        thread_pool.scope(|scope| {
10✔
233
            let num_features = points.len();
10✔
234
            let feature_offsets = points.feature_offsets();
10✔
235
            let time_intervals = points.time_intervals();
10✔
236
            let coordinates = points.coordinates();
10✔
237

238
            for (chunk_index, chunk_result) in result.chunks_mut(chunk_size).enumerate() {
22✔
239
                let feature_index_start = chunk_index * chunk_size;
22✔
240
                let features_index_end = min(feature_index_start + chunk_size, num_features);
22✔
241
                let tester = tester.clone();
22✔
242

22✔
243
                scope.spawn(move |_| {
22✔
244
                    for (
245
                        feature_index,
22✔
246
                        ((coordinates_start_index, coordinates_end_index), time_interval),
22✔
247
                    ) in two_tuple_windows(
22✔
248
                        feature_offsets[feature_index_start..=features_index_end]
22✔
249
                            .iter()
22✔
250
                            .map(|&c| c as usize),
44✔
251
                    )
22✔
252
                    .zip(time_intervals[feature_index_start..features_index_end].iter())
22✔
253
                    .enumerate()
22✔
254
                    {
22✔
255
                        let is_multi_point_in_polygon_collection = coordinates
22✔
256
                            [coordinates_start_index..coordinates_end_index]
22✔
257
                            .iter()
22✔
258
                            .any(|coordinate| {
22✔
259
                                tester.any_polygon_contains_coordinate(coordinate, time_interval)
22✔
260
                            });
22✔
261

22✔
262
                        chunk_result[feature_index] = is_multi_point_in_polygon_collection;
22✔
263
                    }
22✔
264
                });
22✔
265
            }
22✔
266
        });
10✔
267

10✔
268
        result
10✔
269
    }
10✔
270

271
    async fn filter_points(
10✔
272
        ctx: &dyn QueryContext,
10✔
273
        points: Arc<MultiPointCollection>,
10✔
274
        polygons: MultiPolygonCollection,
10✔
275
        initial_filter: &BooleanArray,
10✔
276
    ) -> Result<BooleanArray> {
10✔
277
        let thread_pool = ctx.thread_pool().clone();
10✔
278

10✔
279
        let thread_points = points.clone();
10✔
280
        let filter = crate::util::spawn_blocking(move || {
10✔
281
            Self::filter_parallel(&thread_points, &polygons, &thread_pool)
10✔
282
        })
10✔
283
        .await?;
10✔
284

285
        arrow::compute::or(initial_filter, &filter.into()).map_err(Into::into)
10✔
286
    }
10✔
287
}
288

289
#[async_trait]
290
impl VectorQueryProcessor for PointInPolygonFilterProcessor {
291
    type VectorType = MultiPointCollection;
292

293
    async fn vector_query<'a>(
294
        &'a self,
295
        query: VectorQueryRectangle,
296
        ctx: &'a dyn QueryContext,
297
    ) -> Result<BoxStream<'a, Result<Self::VectorType>>> {
6✔
298
        let filtered_stream =
6✔
299
            self.points
6✔
300
                .query(query.clone(), ctx)
6✔
301
                .await?
6✔
302
                .and_then(move |points| {
8✔
303
                    let query: geoengine_datatypes::primitives::QueryRectangle<
8✔
304
                        geoengine_datatypes::primitives::BoundingBox2D,
8✔
305
                        geoengine_datatypes::primitives::ColumnSelection,
8✔
306
                    > = query.clone();
8✔
307
                    async move {
8✔
308
                        if points.is_empty() {
8✔
309
                            return Ok(points);
1✔
310
                        }
7✔
311

7✔
312
                        let initial_filter = BooleanArray::from(vec![false; points.len()]);
7✔
313
                        let arc_points = Arc::new(points);
7✔
314

315
                        let (filter, cache_hint) = self
7✔
316
                            .polygons
7✔
317
                            .query(query.clone(), ctx)
7✔
318
                            .await?
7✔
319
                            .try_fold(
7✔
320
                                (initial_filter, CacheHint::max_duration()),
7✔
321
                                |acc, polygons| {
11✔
322
                                    let arc_points = arc_points.clone();
11✔
323
                                    async move {
11✔
324
                                        let (filter, mut cache_hint) = acc;
11✔
325
                                        let polygons = polygons;
11✔
326

11✔
327
                                        cache_hint.merge_with(&polygons.cache_hint);
11✔
328

11✔
329
                                        if polygons.is_empty() {
11✔
330
                                            return Ok((filter, cache_hint));
1✔
331
                                        }
10✔
332

10✔
333
                                        Ok((
10✔
334
                                            Self::filter_points(
10✔
335
                                                ctx,
10✔
336
                                                arc_points.clone(),
10✔
337
                                                polygons,
10✔
338
                                                &filter,
10✔
339
                                            )
10✔
340
                                            .await?,
10✔
341
                                            cache_hint,
10✔
342
                                        ))
343
                                    }
11✔
344
                                },
11✔
345
                            )
7✔
346
                            .await?;
7✔
347

348
                        let mut new_points =
7✔
349
                            arc_points.filter(filter).map_err(Into::<Error>::into)?;
7✔
350
                        new_points.cache_hint = cache_hint;
7✔
351

7✔
352
                        Ok(new_points)
7✔
353
                    }
8✔
354
                });
8✔
355

6✔
356
        Ok(
6✔
357
            FeatureCollectionChunkMerger::new(filtered_stream.fuse(), ctx.chunk_byte_size().into())
6✔
358
                .boxed(),
6✔
359
        )
6✔
360
    }
12✔
361

362
    fn vector_result_descriptor(&self) -> &VectorResultDescriptor {
6✔
363
        &self.result_descriptor
6✔
364
    }
6✔
365
}
366

367
/// Loop through an iterator by yielding the current and previous tuple. Starts with the
368
/// (first, second) item, so the iterator must have more than one item to create an output.
369
fn two_tuple_windows<I, T>(mut iter: I) -> impl Iterator<Item = (T, T)>
22✔
370
where
22✔
371
    I: Iterator<Item = T>,
22✔
372
    T: Copy,
22✔
373
{
22✔
374
    let mut last = iter.next();
22✔
375

22✔
376
    iter.map(move |item| {
22✔
377
        let output = (
22✔
378
            last.expect("it should have a first tuple in a two-tuple window"),
22✔
379
            item,
22✔
380
        );
22✔
381
        last = Some(item);
22✔
382
        output
22✔
383
    })
22✔
384
}
22✔
385

386
#[cfg(test)]
387
mod tests {
388

389
    use super::*;
390
    use std::str::FromStr;
391

392
    use geoengine_datatypes::collections::ChunksEqualIgnoringCacheHint;
393
    use geoengine_datatypes::primitives::{
394
        BoundingBox2D, Coordinate2D, MultiPoint, MultiPolygon, SpatialResolution, TimeInterval,
395
    };
396
    use geoengine_datatypes::primitives::{CacheHint, ColumnSelection};
397
    use geoengine_datatypes::spatial_reference::SpatialReference;
398
    use geoengine_datatypes::util::test::TestDefault;
399

400
    use crate::engine::{ChunkByteSize, MockExecutionContext, MockQueryContext};
401
    use crate::error::Error;
402
    use crate::mock::MockFeatureCollectionSource;
403

404
    #[test]
405
    fn point_in_polygon_boundary_conditions() {
1✔
406
        let collection = MultiPolygonCollection::from_data(
1✔
407
            vec![MultiPolygon::new(vec![vec![vec![
1✔
408
                (0.0, 0.0).into(),
1✔
409
                (10.0, 0.0).into(),
1✔
410
                (10.0, 10.0).into(),
1✔
411
                (0.0, 10.0).into(),
1✔
412
                (0.0, 0.0).into(),
1✔
413
            ]]])
1✔
414
            .unwrap()],
1✔
415
            vec![Default::default(); 1],
1✔
416
            Default::default(),
1✔
417
            CacheHint::default(),
1✔
418
        )
1✔
419
        .unwrap();
1✔
420

1✔
421
        let tester = PointInPolygonTester::new(&collection);
1✔
422

1✔
423
        // the algorithm is not stable for boundary cases directly on the edges
1✔
424

1✔
425
        assert!(tester.any_polygon_contains_coordinate(
1✔
426
            &Coordinate2D::new(0.000_001, 0.000_001),
1✔
427
            &Default::default()
1✔
428
        ),);
1✔
429
        assert!(tester.any_polygon_contains_coordinate(
1✔
430
            &Coordinate2D::new(0.000_001, 0.1),
1✔
431
            &Default::default()
1✔
432
        ),);
1✔
433
        assert!(tester.any_polygon_contains_coordinate(
1✔
434
            &Coordinate2D::new(0.1, 0.000_001),
1✔
435
            &Default::default()
1✔
436
        ),);
1✔
437

438
        assert!(tester
1✔
439
            .any_polygon_contains_coordinate(&Coordinate2D::new(9.9, 9.9), &Default::default()),);
1✔
440
        assert!(tester
1✔
441
            .any_polygon_contains_coordinate(&Coordinate2D::new(10.0, 9.9), &Default::default()),);
1✔
442
        assert!(tester
1✔
443
            .any_polygon_contains_coordinate(&Coordinate2D::new(9.9, 10.0), &Default::default()),);
1✔
444

445
        assert!(!tester
1✔
446
            .any_polygon_contains_coordinate(&Coordinate2D::new(-0.1, -0.1), &Default::default()),);
1✔
447
        assert!(!tester
1✔
448
            .any_polygon_contains_coordinate(&Coordinate2D::new(0.0, -0.1), &Default::default()),);
1✔
449
        assert!(!tester
1✔
450
            .any_polygon_contains_coordinate(&Coordinate2D::new(-0.1, 0.0), &Default::default()),);
1✔
451

452
        assert!(!tester
1✔
453
            .any_polygon_contains_coordinate(&Coordinate2D::new(10.1, 10.1), &Default::default()),);
1✔
454
        assert!(!tester
1✔
455
            .any_polygon_contains_coordinate(&Coordinate2D::new(10.1, 9.9), &Default::default()),);
1✔
456
        assert!(!tester
1✔
457
            .any_polygon_contains_coordinate(&Coordinate2D::new(9.9, 10.1), &Default::default()),);
1✔
458
    }
1✔
459

460
    #[tokio::test]
461
    async fn all() -> Result<()> {
1✔
462
        let points = MultiPointCollection::from_data(
1✔
463
            MultiPoint::many(vec![(0.001, 0.1), (1.0, 1.1), (2.0, 3.1)]).unwrap(),
1✔
464
            vec![TimeInterval::new_unchecked(0, 1); 3],
1✔
465
            Default::default(),
1✔
466
            CacheHint::default(),
1✔
467
        )?;
1✔
468

1✔
469
        let point_source = MockFeatureCollectionSource::single(points.clone()).boxed();
1✔
470

1✔
471
        let polygon_source =
1✔
472
            MockFeatureCollectionSource::single(MultiPolygonCollection::from_data(
1✔
473
                vec![MultiPolygon::new(vec![vec![vec![
1✔
474
                    (0.0, 0.0).into(),
1✔
475
                    (10.0, 0.0).into(),
1✔
476
                    (10.0, 10.0).into(),
1✔
477
                    (0.0, 10.0).into(),
1✔
478
                    (0.0, 0.0).into(),
1✔
479
                ]]])?],
1✔
480
                vec![TimeInterval::new_unchecked(0, 1); 1],
1✔
481
                Default::default(),
1✔
482
                CacheHint::default(),
1✔
483
            )?)
1✔
484
            .boxed();
1✔
485

1✔
486
        let operator = PointInPolygonFilter {
1✔
487
            params: PointInPolygonFilterParams {},
1✔
488
            sources: PointInPolygonFilterSource {
1✔
489
                points: point_source,
1✔
490
                polygons: polygon_source,
1✔
491
            },
1✔
492
        }
1✔
493
        .boxed()
1✔
494
        .initialize(
1✔
495
            WorkflowOperatorPath::initialize_root(),
1✔
496
            &MockExecutionContext::test_default(),
1✔
497
        )
1✔
498
        .await?;
1✔
499

1✔
500
        let query_processor = operator.query_processor()?.multi_point().unwrap();
1✔
501

1✔
502
        let query_rectangle = VectorQueryRectangle {
1✔
503
            spatial_bounds: BoundingBox2D::new((0., 0.).into(), (10., 10.).into()).unwrap(),
1✔
504
            time_interval: TimeInterval::default(),
1✔
505
            spatial_resolution: SpatialResolution::zero_point_one(),
1✔
506
            attributes: ColumnSelection::all(),
1✔
507
        };
1✔
508
        let ctx = MockQueryContext::new(ChunkByteSize::MAX);
1✔
509

1✔
510
        let query = query_processor.query(query_rectangle, &ctx).await.unwrap();
1✔
511

1✔
512
        let result = query
1✔
513
            .map(Result::unwrap)
1✔
514
            .collect::<Vec<MultiPointCollection>>()
1✔
515
            .await;
1✔
516

1✔
517
        assert_eq!(result.len(), 1);
1✔
518

1✔
519
        assert!(result[0].chunks_equal_ignoring_cache_hint(&points));
1✔
520

1✔
521
        Ok(())
1✔
522
    }
1✔
523

524
    #[tokio::test]
525
    async fn empty() -> Result<()> {
1✔
526
        let points = MultiPointCollection::from_data(
1✔
527
            MultiPoint::many(vec![(0.0, 0.1), (1.0, 1.1), (2.0, 3.1)]).unwrap(),
1✔
528
            vec![TimeInterval::new_unchecked(0, 1); 3],
1✔
529
            Default::default(),
1✔
530
            CacheHint::default(),
1✔
531
        )?;
1✔
532

1✔
533
        let point_source = MockFeatureCollectionSource::single(points.clone()).boxed();
1✔
534

1✔
535
        let polygon_source =
1✔
536
            MockFeatureCollectionSource::single(MultiPolygonCollection::from_data(
1✔
537
                vec![],
1✔
538
                vec![],
1✔
539
                Default::default(),
1✔
540
                CacheHint::default(),
1✔
541
            )?)
1✔
542
            .boxed();
1✔
543

1✔
544
        let operator = PointInPolygonFilter {
1✔
545
            params: PointInPolygonFilterParams {},
1✔
546
            sources: PointInPolygonFilterSource {
1✔
547
                points: point_source,
1✔
548
                polygons: polygon_source,
1✔
549
            },
1✔
550
        }
1✔
551
        .boxed()
1✔
552
        .initialize(
1✔
553
            WorkflowOperatorPath::initialize_root(),
1✔
554
            &MockExecutionContext::test_default(),
1✔
555
        )
1✔
556
        .await?;
1✔
557

1✔
558
        let query_processor = operator.query_processor()?.multi_point().unwrap();
1✔
559

1✔
560
        let query_rectangle = VectorQueryRectangle {
1✔
561
            spatial_bounds: BoundingBox2D::new((0., 0.).into(), (10., 10.).into()).unwrap(),
1✔
562
            time_interval: TimeInterval::default(),
1✔
563
            spatial_resolution: SpatialResolution::zero_point_one(),
1✔
564
            attributes: ColumnSelection::all(),
1✔
565
        };
1✔
566
        let ctx = MockQueryContext::new(ChunkByteSize::MAX);
1✔
567

1✔
568
        let query = query_processor.query(query_rectangle, &ctx).await.unwrap();
1✔
569

1✔
570
        let result = query
1✔
571
            .map(Result::unwrap)
1✔
572
            .collect::<Vec<MultiPointCollection>>()
1✔
573
            .await;
1✔
574

1✔
575
        assert_eq!(result.len(), 1);
1✔
576
        assert!(result[0].chunks_equal_ignoring_cache_hint(&MultiPointCollection::empty()));
1✔
577

1✔
578
        Ok(())
1✔
579
    }
1✔
580

581
    #[tokio::test]
582
    async fn time() -> Result<()> {
1✔
583
        let points = MultiPointCollection::from_data(
1✔
584
            MultiPoint::many(vec![(1.0, 1.1), (2.0, 2.1), (3.0, 3.1)]).unwrap(),
1✔
585
            vec![
1✔
586
                TimeInterval::new(0, 1)?,
1✔
587
                TimeInterval::new(5, 6)?,
1✔
588
                TimeInterval::new(0, 5)?,
1✔
589
            ],
1✔
590
            Default::default(),
1✔
591
            CacheHint::default(),
1✔
592
        )?;
1✔
593

1✔
594
        let point_source = MockFeatureCollectionSource::single(points.clone()).boxed();
1✔
595

1✔
596
        let polygon = MultiPolygon::new(vec![vec![vec![
1✔
597
            (0.0, 0.0).into(),
1✔
598
            (10.0, 0.0).into(),
1✔
599
            (10.0, 10.0).into(),
1✔
600
            (0.0, 10.0).into(),
1✔
601
            (0.0, 0.0).into(),
1✔
602
        ]]])?;
1✔
603

1✔
604
        let polygon_source =
1✔
605
            MockFeatureCollectionSource::single(MultiPolygonCollection::from_data(
1✔
606
                vec![polygon.clone(), polygon],
1✔
607
                vec![TimeInterval::new(0, 1)?, TimeInterval::new(1, 2)?],
1✔
608
                Default::default(),
1✔
609
                CacheHint::default(),
1✔
610
            )?)
1✔
611
            .boxed();
1✔
612

1✔
613
        let operator = PointInPolygonFilter {
1✔
614
            params: PointInPolygonFilterParams {},
1✔
615
            sources: PointInPolygonFilterSource {
1✔
616
                points: point_source,
1✔
617
                polygons: polygon_source,
1✔
618
            },
1✔
619
        }
1✔
620
        .boxed()
1✔
621
        .initialize(
1✔
622
            WorkflowOperatorPath::initialize_root(),
1✔
623
            &MockExecutionContext::test_default(),
1✔
624
        )
1✔
625
        .await?;
1✔
626

1✔
627
        let query_processor = operator.query_processor()?.multi_point().unwrap();
1✔
628

1✔
629
        let query_rectangle = VectorQueryRectangle {
1✔
630
            spatial_bounds: BoundingBox2D::new((0., 0.).into(), (10., 10.).into()).unwrap(),
1✔
631
            time_interval: TimeInterval::default(),
1✔
632
            spatial_resolution: SpatialResolution::zero_point_one(),
1✔
633
            attributes: ColumnSelection::all(),
1✔
634
        };
1✔
635
        let ctx = MockQueryContext::new(ChunkByteSize::MAX);
1✔
636

1✔
637
        let query = query_processor.query(query_rectangle, &ctx).await.unwrap();
1✔
638

1✔
639
        let result = query
1✔
640
            .map(Result::unwrap)
1✔
641
            .collect::<Vec<MultiPointCollection>>()
1✔
642
            .await;
1✔
643

1✔
644
        assert_eq!(result.len(), 1);
1✔
645

1✔
646
        assert!(
1✔
647
            result[0].chunks_equal_ignoring_cache_hint(&points.filter(vec![true, false, true])?)
1✔
648
        );
1✔
649

1✔
650
        Ok(())
1✔
651
    }
1✔
652

653
    // #[async::main]
654
    #[tokio::test]
655
    async fn multiple_inputs() -> Result<()> {
1✔
656
        let points1 = MultiPointCollection::from_data(
1✔
657
            MultiPoint::many(vec![(5.0, 5.1), (15.0, 15.1)]).unwrap(),
1✔
658
            vec![TimeInterval::new(0, 1)?; 2],
1✔
659
            Default::default(),
1✔
660
            CacheHint::default(),
1✔
661
        )?;
1✔
662
        let points2 = MultiPointCollection::from_data(
1✔
663
            MultiPoint::many(vec![(6.0, 6.1), (16.0, 16.1)]).unwrap(),
1✔
664
            vec![TimeInterval::new(1, 2)?; 2],
1✔
665
            Default::default(),
1✔
666
            CacheHint::default(),
1✔
667
        )?;
1✔
668

1✔
669
        let point_source =
1✔
670
            MockFeatureCollectionSource::multiple(vec![points1.clone(), points2.clone()]).boxed();
1✔
671

1✔
672
        let polygon1 = MultiPolygon::new(vec![vec![vec![
1✔
673
            (0.0, 0.0).into(),
1✔
674
            (10.0, 0.0).into(),
1✔
675
            (10.0, 10.0).into(),
1✔
676
            (0.0, 10.0).into(),
1✔
677
            (0.0, 0.0).into(),
1✔
678
        ]]])?;
1✔
679
        let polygon2 = MultiPolygon::new(vec![vec![vec![
1✔
680
            (10.0, 10.0).into(),
1✔
681
            (20.0, 10.0).into(),
1✔
682
            (20.0, 20.0).into(),
1✔
683
            (10.0, 20.0).into(),
1✔
684
            (10.0, 10.0).into(),
1✔
685
        ]]])?;
1✔
686

1✔
687
        let polygon_source = MockFeatureCollectionSource::multiple(vec![
1✔
688
            MultiPolygonCollection::from_data(
1✔
689
                vec![polygon1.clone()],
1✔
690
                vec![TimeInterval::new(0, 1)?],
1✔
691
                Default::default(),
1✔
692
                CacheHint::default(),
1✔
693
            )?,
1✔
694
            MultiPolygonCollection::from_data(
1✔
695
                vec![polygon1, polygon2],
1✔
696
                vec![TimeInterval::new(1, 2)?, TimeInterval::new(1, 2)?],
1✔
697
                Default::default(),
1✔
698
                CacheHint::default(),
1✔
699
            )?,
1✔
700
        ])
1✔
701
        .boxed();
1✔
702

1✔
703
        let operator = PointInPolygonFilter {
1✔
704
            params: PointInPolygonFilterParams {},
1✔
705
            sources: PointInPolygonFilterSource {
1✔
706
                points: point_source,
1✔
707
                polygons: polygon_source,
1✔
708
            },
1✔
709
        }
1✔
710
        .boxed()
1✔
711
        .initialize(
1✔
712
            WorkflowOperatorPath::initialize_root(),
1✔
713
            &MockExecutionContext::test_default(),
1✔
714
        )
1✔
715
        .await?;
1✔
716

1✔
717
        let query_processor = operator.query_processor()?.multi_point().unwrap();
1✔
718

1✔
719
        let query_rectangle = VectorQueryRectangle {
1✔
720
            spatial_bounds: BoundingBox2D::new((0., 0.).into(), (10., 10.).into()).unwrap(),
1✔
721
            time_interval: TimeInterval::default(),
1✔
722
            spatial_resolution: SpatialResolution::zero_point_one(),
1✔
723
            attributes: ColumnSelection::all(),
1✔
724
        };
1✔
725

1✔
726
        let ctx_one_chunk = MockQueryContext::new(ChunkByteSize::MAX);
1✔
727
        let ctx_minimal_chunks = MockQueryContext::new(ChunkByteSize::MIN);
1✔
728

1✔
729
        let query = query_processor
1✔
730
            .query(query_rectangle.clone(), &ctx_minimal_chunks)
1✔
731
            .await
1✔
732
            .unwrap();
1✔
733

1✔
734
        let result = query
1✔
735
            .map(Result::unwrap)
1✔
736
            .collect::<Vec<MultiPointCollection>>()
1✔
737
            .await;
1✔
738

1✔
739
        assert_eq!(result.len(), 2);
1✔
740

1✔
741
        assert!(result[0].chunks_equal_ignoring_cache_hint(&points1.filter(vec![true, false])?));
1✔
742
        assert!(result[1].chunks_equal_ignoring_cache_hint(&points2));
1✔
743

1✔
744
        let query = query_processor
1✔
745
            .query(query_rectangle, &ctx_one_chunk)
1✔
746
            .await
1✔
747
            .unwrap();
1✔
748

1✔
749
        let result = query
1✔
750
            .map(Result::unwrap)
1✔
751
            .collect::<Vec<MultiPointCollection>>()
1✔
752
            .await;
1✔
753

1✔
754
        assert_eq!(result.len(), 1);
1✔
755

1✔
756
        assert!(result[0].chunks_equal_ignoring_cache_hint(
1✔
757
            &points1.filter(vec![true, false])?.append(&points2)?
1✔
758
        ));
1✔
759

1✔
760
        Ok(())
1✔
761
    }
1✔
762

763
    #[tokio::test]
764
    async fn empty_points() {
1✔
765
        let point_collection = MultiPointCollection::from_data(
1✔
766
            vec![],
1✔
767
            vec![],
1✔
768
            Default::default(),
1✔
769
            CacheHint::default(),
1✔
770
        )
1✔
771
        .unwrap();
1✔
772

1✔
773
        let polygon_collection = MultiPolygonCollection::from_data(
1✔
774
            vec![MultiPolygon::new(vec![vec![vec![
1✔
775
                (0.0, 0.0).into(),
1✔
776
                (10.0, 0.0).into(),
1✔
777
                (10.0, 10.0).into(),
1✔
778
                (0.0, 10.0).into(),
1✔
779
                (0.0, 0.0).into(),
1✔
780
            ]]])
1✔
781
            .unwrap()],
1✔
782
            vec![TimeInterval::default()],
1✔
783
            Default::default(),
1✔
784
            CacheHint::default(),
1✔
785
        )
1✔
786
        .unwrap();
1✔
787

1✔
788
        let operator = PointInPolygonFilter {
1✔
789
            params: PointInPolygonFilterParams {},
1✔
790
            sources: PointInPolygonFilterSource {
1✔
791
                points: MockFeatureCollectionSource::single(point_collection).boxed(),
1✔
792
                polygons: MockFeatureCollectionSource::single(polygon_collection).boxed(),
1✔
793
            },
1✔
794
        }
1✔
795
        .boxed()
1✔
796
        .initialize(
1✔
797
            WorkflowOperatorPath::initialize_root(),
1✔
798
            &MockExecutionContext::test_default(),
1✔
799
        )
1✔
800
        .await
1✔
801
        .unwrap();
1✔
802

1✔
803
        let query_rectangle = VectorQueryRectangle {
1✔
804
            spatial_bounds: BoundingBox2D::new((-10., -10.).into(), (10., 10.).into()).unwrap(),
1✔
805
            time_interval: TimeInterval::default(),
1✔
806
            spatial_resolution: SpatialResolution::zero_point_one(),
1✔
807
            attributes: ColumnSelection::all(),
1✔
808
        };
1✔
809

1✔
810
        let query_processor = operator.query_processor().unwrap().multi_point().unwrap();
1✔
811

1✔
812
        let query_context = MockQueryContext::test_default();
1✔
813

1✔
814
        let query = query_processor
1✔
815
            .query(query_rectangle, &query_context)
1✔
816
            .await
1✔
817
            .unwrap();
1✔
818

1✔
819
        let result = query
1✔
820
            .map(Result::unwrap)
1✔
821
            .collect::<Vec<MultiPointCollection>>()
1✔
822
            .await;
1✔
823

1✔
824
        assert_eq!(result.len(), 1);
1✔
825
        assert!(result[0].chunks_equal_ignoring_cache_hint(&MultiPointCollection::empty()));
1✔
826
    }
1✔
827

828
    #[tokio::test]
829
    async fn it_checks_sref() {
1✔
830
        let point_collection = MultiPointCollection::from_data(
1✔
831
            vec![],
1✔
832
            vec![],
1✔
833
            Default::default(),
1✔
834
            CacheHint::default(),
1✔
835
        )
1✔
836
        .unwrap();
1✔
837

1✔
838
        let polygon_collection = MultiPolygonCollection::from_data(
1✔
839
            vec![MultiPolygon::new(vec![vec![vec![
1✔
840
                (0.0, 0.0).into(),
1✔
841
                (10.0, 0.0).into(),
1✔
842
                (10.0, 10.0).into(),
1✔
843
                (0.0, 10.0).into(),
1✔
844
                (0.0, 0.0).into(),
1✔
845
            ]]])
1✔
846
            .unwrap()],
1✔
847
            vec![TimeInterval::default()],
1✔
848
            Default::default(),
1✔
849
            CacheHint::default(),
1✔
850
        )
1✔
851
        .unwrap();
1✔
852

1✔
853
        let operator = PointInPolygonFilter {
1✔
854
            params: PointInPolygonFilterParams {},
1✔
855
            sources: PointInPolygonFilterSource {
1✔
856
                points: MockFeatureCollectionSource::with_collections_and_sref(
1✔
857
                    vec![point_collection],
1✔
858
                    SpatialReference::epsg_4326(),
1✔
859
                )
1✔
860
                .boxed(),
1✔
861
                polygons: MockFeatureCollectionSource::with_collections_and_sref(
1✔
862
                    vec![polygon_collection],
1✔
863
                    SpatialReference::from_str("EPSG:3857").unwrap(),
1✔
864
                )
1✔
865
                .boxed(),
1✔
866
            },
1✔
867
        }
1✔
868
        .boxed()
1✔
869
        .initialize(
1✔
870
            WorkflowOperatorPath::initialize_root(),
1✔
871
            &MockExecutionContext::test_default(),
1✔
872
        )
1✔
873
        .await;
1✔
874

1✔
875
        assert!(matches!(
1✔
876
            operator,
1✔
877
            Err(Error::InvalidSpatialReference {
1✔
878
                expected: _,
1✔
879
                found: _,
1✔
880
            })
1✔
881
        ));
1✔
882
    }
1✔
883
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc