• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

geo-engine / geoengine / 5006008836

pending completion
5006008836

push

github

GitHub
Merge #785 #787

936 of 936 new or added lines in 50 files covered. (100.0%)

96010 of 107707 relevant lines covered (89.14%)

72676.46 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

91.71
/operators/src/processing/circle_merging_quadtree/operator.rs
1
use std::collections::{HashMap, HashSet};
2

3
use async_trait::async_trait;
4
use futures::stream::{BoxStream, FuturesUnordered};
5
use futures::StreamExt;
6
use geoengine_datatypes::collections::{
7
    BuilderProvider, GeoFeatureCollectionRowBuilder, MultiPointCollection, VectorDataType,
8
};
9
use geoengine_datatypes::primitives::{
10
    BoundingBox2D, Circle, FeatureDataType, FeatureDataValue, Measurement, MultiPoint,
11
    MultiPointAccess, VectorQueryRectangle,
12
};
13
use serde::{Deserialize, Serialize};
14
use snafu::ensure;
15

16
use crate::adapters::FeatureCollectionStreamExt;
17
use crate::engine::{
18
    CanonicOperatorName, ExecutionContext, InitializedVectorOperator, Operator, QueryContext,
19
    QueryProcessor, SingleVectorSource, TypedVectorQueryProcessor, VectorColumnInfo,
20
    VectorOperator, VectorQueryProcessor, VectorResultDescriptor,
21
};
22
use crate::engine::{InitializedSources, OperatorName, WorkflowOperatorPath};
23
use crate::error::{self, Error};
24
use crate::processing::circle_merging_quadtree::aggregates::MeanAggregator;
25
use crate::processing::circle_merging_quadtree::circle_of_points::CircleOfPoints;
26
use crate::processing::circle_merging_quadtree::circle_radius_model::LogScaledRadius;
27
use crate::util::Result;
28

29
use super::aggregates::{AttributeAggregate, AttributeAggregateType, StringSampler};
30
use super::circle_radius_model::CircleRadiusModel;
31
use super::grid::Grid;
32
use super::quadtree::CircleMergingQuadtree;
33

34
#[derive(Debug, Serialize, Deserialize, Clone)]
4✔
35
#[serde(rename_all = "camelCase")]
36
pub struct VisualPointClusteringParams {
37
    pub min_radius_px: f64,
38
    pub delta_px: f64,
39
    radius_column: String,
40
    count_column: String,
41
    column_aggregates: HashMap<String, AttributeAggregateDef>,
42
}
43

44
#[derive(Debug, Serialize, Deserialize, Clone)]
9✔
45
#[serde(rename_all = "camelCase")]
46
pub struct AttributeAggregateDef {
47
    pub column_name: String,
48
    pub aggregate_type: AttributeAggregateType,
49
    // if measurement is unset, it will be taken from the source result descriptor
50
    pub measurement: Option<Measurement>,
51
}
52

53
pub type VisualPointClustering = Operator<VisualPointClusteringParams, SingleVectorSource>;
54

55
impl OperatorName for VisualPointClustering {
56
    const TYPE_NAME: &'static str = "VisualPointClustering";
57
}
58

59
#[typetag::serde]
×
60
#[async_trait]
61
#[allow(clippy::too_many_lines)]
62
impl VectorOperator for VisualPointClustering {
63
    async fn _initialize(
4✔
64
        mut self: Box<Self>,
4✔
65
        path: WorkflowOperatorPath,
4✔
66
        context: &dyn ExecutionContext,
4✔
67
    ) -> Result<Box<dyn InitializedVectorOperator>> {
4✔
68
        ensure!(
4✔
69
            self.params.min_radius_px > 0.0,
4✔
70
            error::InputMustBeGreaterThanZero {
×
71
                scope: "VisualPointClustering",
×
72
                name: "min_radius_px"
×
73
            }
×
74
        );
75
        ensure!(
4✔
76
            self.params.delta_px >= 0.0,
4✔
77
            error::InputMustBeZeroOrPositive {
×
78
                scope: "VisualPointClustering",
×
79
                name: "delta_px"
×
80
            }
×
81
        );
82
        ensure!(!self.params.radius_column.is_empty(), error::EmptyInput);
4✔
83
        ensure!(
4✔
84
            self.params.count_column != self.params.radius_column,
4✔
85
            error::DuplicateOutputColumns
×
86
        );
87

88
        let name = CanonicOperatorName::from(&self);
4✔
89

90
        let radius_model = LogScaledRadius::new(self.params.min_radius_px, self.params.delta_px)?;
4✔
91

92
        let initialized_sources = self.sources.initialize_sources(path, context).await?;
4✔
93
        let vector_source = initialized_sources.vector;
4✔
94

4✔
95
        ensure!(
4✔
96
            vector_source.result_descriptor().data_type == VectorDataType::MultiPoint,
4✔
97
            error::InvalidType {
×
98
                expected: VectorDataType::MultiPoint.to_string(),
×
99
                found: vector_source.result_descriptor().data_type.to_string(),
×
100
            }
×
101
        );
102

103
        // check that all input columns exist
104
        for column_name in self.params.column_aggregates.keys() {
4✔
105
            let column_name_exists = vector_source
3✔
106
                .result_descriptor()
3✔
107
                .columns
3✔
108
                .contains_key(column_name);
3✔
109

3✔
110
            ensure!(
3✔
111
                column_name_exists,
3✔
112
                error::MissingInputColumn {
×
113
                    name: column_name.clone()
×
114
                }
×
115
            );
116
        }
117

118
        // check that there are no duplicates in the output columns
119
        let output_names: HashSet<&String> = self
4✔
120
            .params
4✔
121
            .column_aggregates
4✔
122
            .values()
4✔
123
            .map(|def| &def.column_name)
4✔
124
            .collect();
4✔
125
        ensure!(
4✔
126
            output_names.len() == self.params.column_aggregates.len(),
4✔
127
            error::DuplicateOutputColumns
×
128
        );
129

130
        // create schema for [`ResultDescriptor`]
131
        let mut new_columns: HashMap<String, VectorColumnInfo> =
4✔
132
            HashMap::with_capacity(self.params.column_aggregates.len());
4✔
133
        for attribute_aggregate_def in self.params.column_aggregates.values_mut() {
4✔
134
            if attribute_aggregate_def.measurement.is_none() {
3✔
135
                // take it from source measurement
3✔
136

3✔
137
                attribute_aggregate_def.measurement = vector_source
3✔
138
                    .result_descriptor()
3✔
139
                    .column_measurement(&attribute_aggregate_def.column_name)
3✔
140
                    .cloned();
3✔
141
            }
3✔
142

143
            let data_type = match attribute_aggregate_def.aggregate_type {
3✔
144
                AttributeAggregateType::MeanNumber => FeatureDataType::Float,
2✔
145
                AttributeAggregateType::StringSample => FeatureDataType::Text,
1✔
146
                AttributeAggregateType::Null => {
147
                    return Err(Error::InvalidType {
×
148
                        expected: "not null".to_string(),
×
149
                        found: "null".to_string(),
×
150
                    })
×
151
                }
152
            };
153

154
            new_columns.insert(
3✔
155
                attribute_aggregate_def.column_name.clone(),
3✔
156
                VectorColumnInfo {
3✔
157
                    data_type,
3✔
158
                    measurement: attribute_aggregate_def.measurement.clone().into(),
3✔
159
                },
3✔
160
            );
3✔
161
        }
162

163
        // check that output schema does not interfere with count and radius columns
164
        ensure!(
4✔
165
            !new_columns.contains_key(&self.params.radius_column)
4✔
166
                && !new_columns.contains_key(&self.params.count_column),
4✔
167
            error::DuplicateOutputColumns
×
168
        );
169

170
        let in_desc = vector_source.result_descriptor();
4✔
171

4✔
172
        Ok(InitializedVisualPointClustering {
4✔
173
            name,
4✔
174
            result_descriptor: VectorResultDescriptor {
4✔
175
                data_type: VectorDataType::MultiPoint,
4✔
176
                spatial_reference: in_desc.spatial_reference,
4✔
177
                columns: new_columns,
4✔
178
                time: in_desc.time,
4✔
179
                bbox: in_desc.bbox,
4✔
180
            },
4✔
181
            vector_source,
4✔
182
            radius_model,
4✔
183
            radius_column: self.params.radius_column,
4✔
184
            count_column: self.params.count_column,
4✔
185
            attribute_mapping: self.params.column_aggregates,
4✔
186
        }
4✔
187
        .boxed())
4✔
188
    }
8✔
189

190
    span_fn!(VisualPointClustering);
×
191
}
192

193
pub struct InitializedVisualPointClustering {
194
    name: CanonicOperatorName,
195
    result_descriptor: VectorResultDescriptor,
196
    vector_source: Box<dyn InitializedVectorOperator>,
197
    radius_model: LogScaledRadius,
198
    radius_column: String,
199
    count_column: String,
200
    attribute_mapping: HashMap<String, AttributeAggregateDef>,
201
}
202

203
impl InitializedVectorOperator for InitializedVisualPointClustering {
204
    fn query_processor(&self) -> Result<TypedVectorQueryProcessor> {
205
        match self.vector_source.query_processor()? {
4✔
206
            TypedVectorQueryProcessor::MultiPoint(source) => {
4✔
207
                Ok(TypedVectorQueryProcessor::MultiPoint(
4✔
208
                    VisualPointClusteringProcessor::new(
4✔
209
                        source,
4✔
210
                        self.radius_model,
4✔
211
                        self.radius_column.clone(),
4✔
212
                        self.count_column.clone(),
4✔
213
                        self.result_descriptor.clone(),
4✔
214
                        self.attribute_mapping.clone(),
4✔
215
                    )
4✔
216
                    .boxed(),
4✔
217
                ))
4✔
218
            }
219
            TypedVectorQueryProcessor::MultiLineString(_) => Err(error::Error::InvalidVectorType {
×
220
                expected: "MultiPoint".to_owned(),
×
221
                found: "MultiLineString".to_owned(),
×
222
            }),
×
223
            TypedVectorQueryProcessor::MultiPolygon(_) => Err(error::Error::InvalidVectorType {
×
224
                expected: "MultiPoint".to_owned(),
×
225
                found: "MultiPolygon".to_owned(),
×
226
            }),
×
227
            TypedVectorQueryProcessor::Data(_) => Err(error::Error::InvalidVectorType {
×
228
                expected: "MultiPoint".to_owned(),
×
229
                found: "Data".to_owned(),
×
230
            }),
×
231
        }
232
    }
4✔
233

234
    fn result_descriptor(&self) -> &VectorResultDescriptor {
×
235
        &self.result_descriptor
×
236
    }
×
237

238
    fn canonic_name(&self) -> CanonicOperatorName {
×
239
        self.name.clone()
×
240
    }
×
241
}
242

243
pub struct VisualPointClusteringProcessor {
244
    source: Box<dyn VectorQueryProcessor<VectorType = MultiPointCollection>>,
245
    radius_model: LogScaledRadius,
246
    radius_column: String,
247
    count_column: String,
248
    result_descriptor: VectorResultDescriptor,
249
    attribute_mapping: HashMap<String, AttributeAggregateDef>,
250
}
251

252
impl VisualPointClusteringProcessor {
253
    fn new(
4✔
254
        source: Box<dyn VectorQueryProcessor<VectorType = MultiPointCollection>>,
4✔
255
        radius_model: LogScaledRadius,
4✔
256
        radius_column: String,
4✔
257
        count_column: String,
4✔
258
        result_descriptor: VectorResultDescriptor,
4✔
259
        attribute_mapping: HashMap<String, AttributeAggregateDef>,
4✔
260
    ) -> Self {
4✔
261
        Self {
4✔
262
            source,
4✔
263
            radius_model,
4✔
264
            radius_column,
4✔
265
            count_column,
4✔
266
            result_descriptor,
4✔
267
            attribute_mapping,
4✔
268
        }
4✔
269
    }
4✔
270

271
    fn create_point_collection(
4✔
272
        circles_of_points: impl Iterator<Item = CircleOfPoints>,
4✔
273
        radius_column: &str,
4✔
274
        count_column: &str,
4✔
275
        resolution: f64,
4✔
276
        columns: &HashMap<String, FeatureDataType>,
4✔
277
    ) -> Result<MultiPointCollection> {
4✔
278
        let mut builder = MultiPointCollection::builder();
4✔
279

280
        for (column, &data_type) in columns {
7✔
281
            builder.add_column(column.clone(), data_type)?;
3✔
282
        }
283

284
        builder.add_column(radius_column.to_string(), FeatureDataType::Float)?;
4✔
285
        builder.add_column(count_column.to_string(), FeatureDataType::Int)?;
4✔
286

287
        let mut builder = builder.finish_header();
4✔
288

289
        for circle_of_points in circles_of_points {
13✔
290
            builder.push_geometry(MultiPoint::new(vec![circle_of_points.circle.center()])?);
9✔
291
            builder.push_time_interval(circle_of_points.time_aggregate);
9✔
292

9✔
293
            builder.push_data(
9✔
294
                radius_column,
9✔
295
                FeatureDataValue::Float(circle_of_points.circle.radius() / resolution),
9✔
296
            )?;
9✔
297
            builder.push_data(
9✔
298
                count_column,
9✔
299
                FeatureDataValue::Int(circle_of_points.number_of_points() as i64),
9✔
300
            )?;
9✔
301

302
            for (column, data_type) in circle_of_points.attribute_aggregates {
16✔
303
                match data_type {
7✔
304
                    AttributeAggregate::MeanNumber(mean_aggregator) => {
3✔
305
                        builder
3✔
306
                            .push_data(&column, FeatureDataValue::Float(mean_aggregator.mean))?;
3✔
307
                    }
308
                    AttributeAggregate::StringSample(string_sampler) => {
2✔
309
                        builder.push_data(
2✔
310
                            &column,
2✔
311
                            FeatureDataValue::Text(string_sampler.strings.join(", ")),
2✔
312
                        )?;
2✔
313
                    }
314
                    AttributeAggregate::Null => {
315
                        builder.push_null(&column)?;
2✔
316
                    }
317
                };
318
            }
319

320
            builder.finish_row();
9✔
321
        }
322

323
        builder.build().map_err(Into::into)
4✔
324
    }
4✔
325

326
    fn aggregate_from_feature_data(
327
        feature_data: FeatureDataValue,
328
        aggregate_type: AttributeAggregateType,
329
    ) -> AttributeAggregate {
330
        match (feature_data, aggregate_type) {
20✔
331
            (
332
                FeatureDataValue::Category(value) | FeatureDataValue::NullableCategory(Some(value)),
×
333
                AttributeAggregateType::MeanNumber,
334
            ) => AttributeAggregate::MeanNumber(MeanAggregator::from_value(f64::from(value))),
×
335
            (
336
                FeatureDataValue::Float(value) | FeatureDataValue::NullableFloat(Some(value)),
10✔
337
                AttributeAggregateType::MeanNumber,
338
            ) => AttributeAggregate::MeanNumber(MeanAggregator::from_value(value)),
10✔
339
            (
340
                FeatureDataValue::Int(value) | FeatureDataValue::NullableInt(Some(value)),
1✔
341
                AttributeAggregateType::MeanNumber,
342
            ) => AttributeAggregate::MeanNumber(MeanAggregator::from_value(value as f64)),
1✔
343
            (
344
                FeatureDataValue::Text(value) | FeatureDataValue::NullableText(Some(value)),
3✔
345
                AttributeAggregateType::StringSample,
346
            ) => AttributeAggregate::StringSample(StringSampler::from_value(value)),
3✔
347
            _ => AttributeAggregate::Null,
6✔
348
        }
349
    }
20✔
350
}
351

352
struct GridFoldState {
353
    grid: Grid<LogScaledRadius>,
354
    column_mapping: HashMap<String, AttributeAggregateDef>,
355
}
356

357
#[async_trait]
358
impl QueryProcessor for VisualPointClusteringProcessor {
359
    type Output = MultiPointCollection;
360
    type SpatialBounds = BoundingBox2D;
361

362
    async fn _query<'a>(
4✔
363
        &'a self,
4✔
364
        query: VectorQueryRectangle,
4✔
365
        ctx: &'a dyn QueryContext,
4✔
366
    ) -> Result<BoxStream<'a, Result<Self::Output>>> {
4✔
367
        // we aggregate all points into one collection
368

369
        let column_schema = self
4✔
370
            .result_descriptor
4✔
371
            .columns
4✔
372
            .iter()
4✔
373
            .map(|(name, column_info)| (name.clone(), column_info.data_type))
4✔
374
            .collect();
4✔
375

4✔
376
        let joint_resolution = f64::max(query.spatial_resolution.x, query.spatial_resolution.y);
4✔
377
        let scaled_radius_model = self.radius_model.with_scaled_radii(joint_resolution)?;
4✔
378

379
        let initial_grid_fold_state = Result::<GridFoldState>::Ok(GridFoldState {
4✔
380
            grid: Grid::new(query.spatial_bounds, scaled_radius_model),
4✔
381
            column_mapping: self.attribute_mapping.clone(),
4✔
382
        });
4✔
383

384
        let grid_future = self.source.query(query, ctx).await?.fold(
4✔
385
            initial_grid_fold_state,
4✔
386
            |state, feature_collection| async move {
4✔
387
                // TODO: worker thread
388

389
                let GridFoldState {
390
                    mut grid,
4✔
391
                    column_mapping,
4✔
392
                } = state?;
4✔
393

394
                let feature_collection = feature_collection?;
4✔
395

396
                for feature in &feature_collection {
34✔
397
                    // TODO: pre-aggregate multi-points differently?
398
                    for coordinate in feature.geometry.points() {
30✔
399
                        let circle =
30✔
400
                            Circle::from_coordinate(coordinate, grid.radius_model().min_radius());
30✔
401

30✔
402
                        let mut attribute_aggregates = HashMap::with_capacity(column_mapping.len());
30✔
403

404
                        for (
405
                            src_column,
20✔
406
                            AttributeAggregateDef {
20✔
407
                                column_name: tgt_column,
20✔
408
                                aggregate_type,
20✔
409
                                measurement: _,
410
                            },
411
                        ) in &column_mapping
50✔
412
                        {
20✔
413
                            let attribute_aggregate =
20✔
414
                                if let Some(feature_data) = feature.get(src_column) {
20✔
415
                                    Self::aggregate_from_feature_data(feature_data, *aggregate_type)
20✔
416
                                } else {
417
                                    AttributeAggregate::Null
×
418
                                };
419

420
                            attribute_aggregates.insert(tgt_column.clone(), attribute_aggregate);
20✔
421
                        }
422

423
                        grid.insert(CircleOfPoints::new_with_one_point(
30✔
424
                            circle,
30✔
425
                            feature.time_interval,
30✔
426
                            attribute_aggregates,
30✔
427
                        ));
30✔
428
                    }
429
                }
430

431
                Ok(GridFoldState {
4✔
432
                    grid,
4✔
433
                    column_mapping,
4✔
434
                })
4✔
435
            },
4✔
436
        );
4✔
437

4✔
438
        let stream = FuturesUnordered::new();
4✔
439
        stream.push(grid_future);
4✔
440

4✔
441
        let stream = stream.map(move |grid| {
4✔
442
            let GridFoldState {
443
                grid,
4✔
444
                column_mapping: _,
445
            } = grid?;
4✔
446

447
            let mut cmq = CircleMergingQuadtree::new(query.spatial_bounds, *grid.radius_model(), 1);
4✔
448

449
            // TODO: worker thread
450
            for circle_of_points in grid.drain() {
9✔
451
                cmq.insert_circle(circle_of_points);
9✔
452
            }
9✔
453

454
            Self::create_point_collection(
4✔
455
                cmq.into_iter(),
4✔
456
                &self.radius_column,
4✔
457
                &self.count_column,
4✔
458
                joint_resolution,
4✔
459
                &column_schema,
4✔
460
            )
4✔
461
        });
4✔
462

4✔
463
        Ok(stream.merge_chunks(ctx.chunk_byte_size().into()).boxed())
4✔
464
    }
8✔
465
}
466

467
#[cfg(test)]
mod tests {
    use geoengine_datatypes::primitives::FeatureData;
    use geoengine_datatypes::primitives::SpatialResolution;
    use geoengine_datatypes::primitives::TimeInterval;
    use geoengine_datatypes::util::test::TestDefault;

    use crate::{
        engine::{MockExecutionContext, MockQueryContext},
        mock::MockFeatureCollectionSource,
    };

    use super::*;

    /// Creates an aggregate mapping for exactly one source column without a measurement.
    fn single_aggregate(
        source_column: &str,
        target_column: &str,
        aggregate_type: AttributeAggregateType,
    ) -> HashMap<String, AttributeAggregateDef> {
        let mut mapping = HashMap::new();
        mapping.insert(
            source_column.to_string(),
            AttributeAggregateDef {
                column_name: target_column.to_string(),
                aggregate_type,
                measurement: None,
            },
        );
        mapping
    }

    /// Runs the operator on `input` with the parameters shared by all tests
    /// (8 px minimum radius, 1 px delta, columns `radius` and `count`) and queries the
    /// whole world with a resolution of 0.1.
    async fn cluster(
        input: MultiPointCollection,
        column_aggregates: HashMap<String, AttributeAggregateDef>,
    ) -> Vec<MultiPointCollection> {
        let operator = VisualPointClustering {
            params: VisualPointClusteringParams {
                min_radius_px: 8.,
                delta_px: 1.,
                radius_column: "radius".to_string(),
                count_column: "count".to_string(),
                column_aggregates,
            },
            sources: SingleVectorSource {
                vector: MockFeatureCollectionSource::single(input).boxed(),
            },
        };

        let execution_context = MockExecutionContext::test_default();

        let initialized_operator = operator
            .boxed()
            .initialize(WorkflowOperatorPath::initialize_root(), &execution_context)
            .await
            .unwrap();

        let query_processor = initialized_operator
            .query_processor()
            .unwrap()
            .multi_point()
            .unwrap();

        let query_context = MockQueryContext::test_default();

        let qrect = VectorQueryRectangle {
            spatial_bounds: BoundingBox2D::new((-180., -90.).into(), (180., 90.).into()).unwrap(),
            time_interval: TimeInterval::default(),
            spatial_resolution: SpatialResolution::new(0.1, 0.1).unwrap(),
        };

        let query = query_processor.query(qrect, &query_context).await.unwrap();

        // bind the result so that the stream is dropped before the contexts it borrows
        let result: Vec<MultiPointCollection> = query.map(Result::unwrap).collect().await;
        result
    }

    #[tokio::test]
    async fn simple_test() {
        let mut coordinates = vec![(0.0, 0.1); 9];
        coordinates.push((50.0, 50.1));

        let input = MultiPointCollection::from_data(
            MultiPoint::many(coordinates).unwrap(),
            vec![TimeInterval::default(); 10],
            HashMap::default(),
        )
        .unwrap();

        let result = cluster(input, HashMap::new()).await;

        let expected = MultiPointCollection::from_slices(
            &[(0.0, 0.099_999_999_999_999_99), (50.0, 50.1)],
            &[TimeInterval::default(); 2],
            &[
                ("count", FeatureData::Int(vec![9, 1])),
                (
                    "radius",
                    FeatureData::Float(vec![10.197_224_577_336_218, 8.]),
                ),
            ],
        )
        .unwrap();

        assert_eq!(result.len(), 1);
        assert_eq!(result[0], expected);
    }

    #[tokio::test]
    async fn simple_test_with_aggregate() {
        let mut coordinates = vec![(0.0, 0.1); 9];
        coordinates.push((50.0, 50.1));

        let input = MultiPointCollection::from_slices(
            &MultiPoint::many(coordinates).unwrap(),
            &[TimeInterval::default(); 10],
            &[(
                "foo",
                FeatureData::Float(vec![1., 2., 3., 4., 5., 6., 7., 8., 9., 10.]),
            )],
        )
        .unwrap();

        let result = cluster(
            input,
            single_aggregate("foo", "bar", AttributeAggregateType::MeanNumber),
        )
        .await;

        let expected = MultiPointCollection::from_slices(
            &[(0.0, 0.099_999_999_999_999_99), (50.0, 50.1)],
            &[TimeInterval::default(); 2],
            &[
                ("count", FeatureData::Int(vec![9, 1])),
                (
                    "radius",
                    FeatureData::Float(vec![10.197_224_577_336_218, 8.]),
                ),
                ("bar", FeatureData::Float(vec![5., 10.])),
            ],
        )
        .unwrap();

        assert_eq!(result.len(), 1);
        assert_eq!(result[0], expected);
    }

    #[tokio::test]
    async fn aggregate_of_null() {
        let mut coordinates = vec![(0.0, 0.1); 2];
        coordinates.extend_from_slice(&[(50.0, 50.1); 2]);

        let input = MultiPointCollection::from_slices(
            &MultiPoint::many(coordinates).unwrap(),
            &[TimeInterval::default(); 4],
            &[(
                "foo",
                FeatureData::NullableInt(vec![Some(1), None, None, None]),
            )],
        )
        .unwrap();

        let result = cluster(
            input,
            single_aggregate("foo", "foo", AttributeAggregateType::MeanNumber),
        )
        .await;

        let expected = MultiPointCollection::from_slices(
            &[(0.0, 0.1), (50.0, 50.1)],
            &[TimeInterval::default(); 2],
            &[
                ("count", FeatureData::Int(vec![2, 2])),
                (
                    "radius",
                    FeatureData::Float(vec![8.693_147_180_559_945, 8.693_147_180_559_945]),
                ),
                ("foo", FeatureData::NullableFloat(vec![Some(1.), None])),
            ],
        )
        .unwrap();

        assert_eq!(result.len(), 1);
        assert_eq!(result[0], expected);
    }

    #[tokio::test]
    async fn text_aggregate() {
        let mut coordinates = vec![(0.0, 0.1); 2];
        coordinates.extend_from_slice(&[(50.0, 50.1); 2]);
        coordinates.extend_from_slice(&[(25.0, 25.1); 2]);

        let input = MultiPointCollection::from_slices(
            &MultiPoint::many(coordinates).unwrap(),
            &[TimeInterval::default(); 6],
            &[(
                "text",
                FeatureData::NullableText(vec![
                    Some("foo".to_string()),
                    Some("bar".to_string()),
                    Some("foo".to_string()),
                    None,
                    None,
                    None,
                ]),
            )],
        )
        .unwrap();

        let result = cluster(
            input,
            single_aggregate("text", "text", AttributeAggregateType::StringSample),
        )
        .await;

        let expected = MultiPointCollection::from_slices(
            &[(0.0, 0.1), (50.0, 50.1), (25.0, 25.1)],
            &[TimeInterval::default(); 3],
            &[
                ("count", FeatureData::Int(vec![2, 2, 2])),
                (
                    "radius",
                    FeatureData::Float(vec![
                        8.693_147_180_559_945,
                        8.693_147_180_559_945,
                        8.693_147_180_559_945,
                    ]),
                ),
                (
                    "text",
                    FeatureData::NullableText(vec![
                        Some("foo, bar".to_string()),
                        Some("foo".to_string()),
                        None,
                    ]),
                ),
            ],
        )
        .unwrap();

        assert_eq!(result.len(), 1);
        assert_eq!(result[0], expected);
    }
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc