• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

geo-engine / geoengine / 3929938005

pending completion
3929938005

push

github

GitHub
Merge #713

84930 of 96741 relevant lines covered (87.79%)

79640.1 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

91.94
/operators/src/processing/circle_merging_quadtree/operator.rs
1
use std::collections::{HashMap, HashSet};
2

3
use async_trait::async_trait;
4
use futures::stream::{BoxStream, FuturesUnordered};
5
use futures::StreamExt;
6
use geoengine_datatypes::collections::{
7
    BuilderProvider, GeoFeatureCollectionRowBuilder, MultiPointCollection, VectorDataType,
8
};
9
use geoengine_datatypes::primitives::{
10
    BoundingBox2D, Circle, FeatureDataType, FeatureDataValue, Measurement, MultiPoint,
11
    MultiPointAccess, VectorQueryRectangle,
12
};
13
use serde::{Deserialize, Serialize};
14
use snafu::ensure;
15

16
use crate::adapters::FeatureCollectionStreamExt;
17
use crate::engine::{CreateSpan, OperatorName};
18
use crate::engine::{
19
    ExecutionContext, InitializedVectorOperator, Operator, QueryContext, QueryProcessor,
20
    SingleVectorSource, TypedVectorQueryProcessor, VectorColumnInfo, VectorOperator,
21
    VectorQueryProcessor, VectorResultDescriptor,
22
};
23
use crate::error::{self, Error};
24
use crate::processing::circle_merging_quadtree::aggregates::MeanAggregator;
25
use crate::processing::circle_merging_quadtree::circle_of_points::CircleOfPoints;
26
use crate::processing::circle_merging_quadtree::circle_radius_model::LogScaledRadius;
27
use crate::util::Result;
28

29
use super::aggregates::{AttributeAggregate, AttributeAggregateType, StringSampler};
30
use super::circle_radius_model::CircleRadiusModel;
31
use super::grid::Grid;
32
use super::quadtree::CircleMergingQuadtree;
33
use tracing::{span, Level};
34

35
/// Parameters of the [`VisualPointClustering`] operator.
///
/// (De-)serialized with camelCase field names (e.g. `minRadiusPx`).
#[derive(Debug, Serialize, Deserialize, Clone)]
×
36
#[serde(rename_all = "camelCase")]
37
pub struct VisualPointClusteringParams {
38
    /// Minimum circle radius; `_initialize` rejects values that are not `> 0.0`.
    /// The `_px` suffix suggests pixel units — TODO confirm.
    pub min_radius_px: f64,
39
    /// Second argument of `LogScaledRadius::new`; `_initialize` rejects negative values.
    pub delta_px: f64,
40
    /// Name of the output column that receives the circle radius (written as `Float`).
    /// Must be non-empty and differ from `count_column`.
    radius_column: String,
41
    /// Name of the output column that receives the number of points per circle (written as `Int`).
    count_column: String,
42
    /// Maps a source column name (must exist in the source's result descriptor)
    /// to the definition of the aggregate that is computed from it.
    column_aggregates: HashMap<String, AttributeAggregateDef>,
43
}
44

45
/// Definition of one aggregated output column of the [`VisualPointClustering`] operator.
///
/// (De-)serialized with camelCase field names.
#[derive(Debug, Serialize, Deserialize, Clone)]
9✔
46
#[serde(rename_all = "camelCase")]
47
pub struct AttributeAggregateDef {
48
    /// Name of the output column; must be unique among all aggregates and must not
    /// collide with the radius or count column (checked in `_initialize`).
    pub column_name: String,
49
    /// Kind of aggregate to compute; `Null` is rejected in `_initialize`.
    pub aggregate_type: AttributeAggregateType,
50
    // if measurement is unset, it will be taken from the source result descriptor
51
    pub measurement: Option<Measurement>,
52
}
53

54
/// The visual point clustering operator: an [`Operator`] configured with
/// [`VisualPointClusteringParams`] that consumes a single vector source.
pub type VisualPointClustering = Operator<VisualPointClusteringParams, SingleVectorSource>;
55

56
// Registers the operator's name.
impl OperatorName for VisualPointClustering {
57
    /// Type name of this operator.
    /// Presumably used as the operator's type tag in serialized workflows — confirm.
    const TYPE_NAME: &'static str = "VisualPointClustering";
58
}
59

60
// Initialization of the operator: validates the parameters against the source
// and derives the output result descriptor.
#[typetag::serde]
×
61
#[async_trait]
62
impl VectorOperator for VisualPointClustering {
63
    /// Validates the parameters, initializes the vector source and builds an
    /// [`InitializedVisualPointClustering`].
    ///
    /// # Errors
    ///
    /// Fails if
    /// * `min_radius_px` is not greater than zero or `delta_px` is negative,
    /// * `radius_column` is empty or equal to `count_column`,
    /// * the radius model cannot be created or the source fails to initialize,
    /// * the source does not produce `MultiPoint` data,
    /// * a key of `column_aggregates` is not a column of the source,
    /// * output column names are not unique or collide with the radius/count column,
    /// * an aggregate has type `AttributeAggregateType::Null`.
    async fn _initialize(
4✔
64
        mut self: Box<Self>,
4✔
65
        context: &dyn ExecutionContext,
4✔
66
    ) -> Result<Box<dyn InitializedVectorOperator>> {
4✔
67
        ensure!(
4✔
68
            self.params.min_radius_px > 0.0,
4✔
69
            error::InputMustBeGreaterThanZero {
×
70
                scope: "VisualPointClustering",
×
71
                name: "min_radius_px"
×
72
            }
×
73
        );
74
        ensure!(
4✔
75
            self.params.delta_px >= 0.0,
4✔
76
            error::InputMustBeZeroOrPositive {
×
77
                scope: "VisualPointClustering",
×
78
                name: "delta_px"
×
79
            }
×
80
        );
81
        // NOTE(review): only `radius_column` is checked for emptiness; an empty
        // `count_column` passes validation — confirm whether that is intended.
        ensure!(!self.params.radius_column.is_empty(), error::EmptyInput);
4✔
82
        ensure!(
4✔
83
            self.params.count_column != self.params.radius_column,
4✔
84
            error::DuplicateOutputColumns
×
85
        );
86

87
        let radius_model = LogScaledRadius::new(self.params.min_radius_px, self.params.delta_px)?;
4✔
88

89
        let vector_source = self.sources.vector.initialize(context).await?;
4✔
90

91
        // only point data can be clustered
        ensure!(
4✔
92
            vector_source.result_descriptor().data_type == VectorDataType::MultiPoint,
4✔
93
            error::InvalidType {
×
94
                expected: VectorDataType::MultiPoint.to_string(),
×
95
                found: vector_source.result_descriptor().data_type.to_string(),
×
96
            }
×
97
        );
98

99
        // check that all input columns exist
100
        for column_name in self.params.column_aggregates.keys() {
4✔
101
            let column_name_exists = vector_source
3✔
102
                .result_descriptor()
3✔
103
                .columns
3✔
104
                .contains_key(column_name);
3✔
105

3✔
106
            ensure!(
3✔
107
                column_name_exists,
3✔
108
                error::MissingInputColumn {
×
109
                    name: column_name.clone()
×
110
                }
×
111
            );
112
        }
113

114
        // check that there are no duplicates in the output columns
115
        let output_names: HashSet<&String> = self
4✔
116
            .params
4✔
117
            .column_aggregates
4✔
118
            .values()
4✔
119
            .map(|def| &def.column_name)
4✔
120
            .collect();
4✔
121
        // the set drops duplicates, so a smaller set means at least two aggregates
        // share an output column name
        ensure!(
4✔
122
            output_names.len() == self.params.column_aggregates.len(),
4✔
123
            error::DuplicateOutputColumns
×
124
        );
125

126
        // create schema for [`ResultDescriptor`]
127
        let mut new_columns: HashMap<String, VectorColumnInfo> =
4✔
128
            HashMap::with_capacity(self.params.column_aggregates.len());
4✔
129
        for attribute_aggregate_def in self.params.column_aggregates.values_mut() {
4✔
130
            if attribute_aggregate_def.measurement.is_none() {
3✔
131
                // take it from source measurement
3✔
132

3✔
133
                // NOTE(review): the lookup uses the *output* column name
                // (`column_name`), not the source column name (the map key).
                // If both differ, the source measurement is presumably not
                // found — confirm whether the key should be used instead.
                attribute_aggregate_def.measurement = vector_source
3✔
134
                    .result_descriptor()
3✔
135
                    .column_measurement(&attribute_aggregate_def.column_name)
3✔
136
                    .cloned();
3✔
137
            }
3✔
138

139
            // the aggregate type determines the data type of the output column
            let data_type = match attribute_aggregate_def.aggregate_type {
3✔
140
                AttributeAggregateType::MeanNumber => FeatureDataType::Float,
2✔
141
                AttributeAggregateType::StringSample => FeatureDataType::Text,
1✔
142
                AttributeAggregateType::Null => {
143
                    return Err(Error::InvalidType {
×
144
                        expected: "not null".to_string(),
×
145
                        found: "null".to_string(),
×
146
                    })
×
147
                }
148
            };
149

150
            new_columns.insert(
3✔
151
                attribute_aggregate_def.column_name.clone(),
3✔
152
                VectorColumnInfo {
3✔
153
                    data_type,
3✔
154
                    measurement: attribute_aggregate_def.measurement.clone().into(),
3✔
155
                },
3✔
156
            );
3✔
157
        }
158

159
        // check that output schema does not interfere with count and radius columns
160
        ensure!(
4✔
161
            !new_columns.contains_key(&self.params.radius_column)
4✔
162
                && !new_columns.contains_key(&self.params.count_column),
4✔
163
            error::DuplicateOutputColumns
×
164
        );
165

166
        let in_desc = vector_source.result_descriptor();
4✔
167

4✔
168
        // NOTE(review): `columns` only contains the aggregate columns; the radius
        // and count columns that `create_point_collection` adds to the output are
        // not declared here — confirm whether they should be part of the descriptor.
        Ok(InitializedVisualPointClustering {
4✔
169
            result_descriptor: VectorResultDescriptor {
4✔
170
                data_type: VectorDataType::MultiPoint,
4✔
171
                spatial_reference: in_desc.spatial_reference,
4✔
172
                columns: new_columns,
4✔
173
                time: in_desc.time,
4✔
174
                bbox: in_desc.bbox,
4✔
175
            },
4✔
176
            vector_source,
4✔
177
            radius_model,
4✔
178
            radius_column: self.params.radius_column,
4✔
179
            count_column: self.params.count_column,
4✔
180
            attribute_mapping: self.params.column_aggregates,
4✔
181
        }
4✔
182
        .boxed())
4✔
183
    }
8✔
184

185
    // macro defined elsewhere; presumably generates the span-creation method of
    // the trait (cf. the `CreateSpan` import) — confirm
    span_fn!(VisualPointClustering);
×
186
}
187

188
/// The initialized form of [`VisualPointClustering`], created by `_initialize`
/// after all parameters have been validated.
pub struct InitializedVisualPointClustering {
189
    /// Descriptor of the operator's output (always `MultiPoint`).
    result_descriptor: VectorResultDescriptor,
190
    /// The initialized source that delivers the points to cluster.
    vector_source: Box<dyn InitializedVectorOperator>,
191
    /// Radius model built from `min_radius_px` and `delta_px`.
    radius_model: LogScaledRadius,
192
    /// Name of the output column for the circle radius.
    radius_column: String,
193
    /// Name of the output column for the point count.
    count_column: String,
194
    /// Source column name -> aggregate definition (the operator's `column_aggregates`).
    attribute_mapping: HashMap<String, AttributeAggregateDef>,
195
}
196

197
// Creates the query processor and exposes the result descriptor.
impl InitializedVectorOperator for InitializedVisualPointClustering {
198
    /// Wraps the source's `MultiPoint` processor in a [`VisualPointClusteringProcessor`].
    ///
    /// # Errors
    ///
    /// Fails if the source's query processor cannot be created or if it is not
    /// a `MultiPoint` processor (`InvalidVectorType`).
    fn query_processor(&self) -> Result<TypedVectorQueryProcessor> {
199
        match self.vector_source.query_processor()? {
4✔
200
            TypedVectorQueryProcessor::MultiPoint(source) => {
4✔
201
                Ok(TypedVectorQueryProcessor::MultiPoint(
4✔
202
                    VisualPointClusteringProcessor::new(
4✔
203
                        source,
4✔
204
                        self.radius_model,
4✔
205
                        self.radius_column.clone(),
4✔
206
                        self.count_column.clone(),
4✔
207
                        self.result_descriptor.clone(),
4✔
208
                        self.attribute_mapping.clone(),
4✔
209
                    )
4✔
210
                    .boxed(),
4✔
211
                ))
4✔
212
            }
213
            // every other vector type is rejected with a matching error message
            TypedVectorQueryProcessor::MultiLineString(_) => Err(error::Error::InvalidVectorType {
×
214
                expected: "MultiPoint".to_owned(),
×
215
                found: "MultiLineString".to_owned(),
×
216
            }),
×
217
            TypedVectorQueryProcessor::MultiPolygon(_) => Err(error::Error::InvalidVectorType {
×
218
                expected: "MultiPoint".to_owned(),
×
219
                found: "MultiPolygon".to_owned(),
×
220
            }),
×
221
            TypedVectorQueryProcessor::Data(_) => Err(error::Error::InvalidVectorType {
×
222
                expected: "MultiPoint".to_owned(),
×
223
                found: "Data".to_owned(),
×
224
            }),
×
225
        }
226
    }
4✔
227

228
    /// Returns the descriptor of the operator's output.
    fn result_descriptor(&self) -> &VectorResultDescriptor {
×
229
        &self.result_descriptor
×
230
    }
×
231
}
232

233
/// Query processor that clusters the points of its source into circles and
/// emits one point per circle, together with radius, count and aggregate columns.
pub struct VisualPointClusteringProcessor {
234
    /// Processor of the point source.
    source: Box<dyn VectorQueryProcessor<VectorType = MultiPointCollection>>,
235
    /// Unscaled radius model; it is scaled with the query resolution in `_query`.
    radius_model: LogScaledRadius,
236
    /// Name of the output column for the circle radius.
    radius_column: String,
237
    /// Name of the output column for the point count.
    count_column: String,
238
    /// Output descriptor; its columns define the aggregate columns of the output.
    result_descriptor: VectorResultDescriptor,
239
    /// Source column name -> aggregate definition.
    attribute_mapping: HashMap<String, AttributeAggregateDef>,
240
}
241

242
// Constructor and helpers of the processor.
impl VisualPointClusteringProcessor {
243
    /// Creates a processor from its parts; performs no validation.
    fn new(
4✔
244
        source: Box<dyn VectorQueryProcessor<VectorType = MultiPointCollection>>,
4✔
245
        radius_model: LogScaledRadius,
4✔
246
        radius_column: String,
4✔
247
        count_column: String,
4✔
248
        result_descriptor: VectorResultDescriptor,
4✔
249
        attribute_mapping: HashMap<String, AttributeAggregateDef>,
4✔
250
    ) -> Self {
4✔
251
        Self {
4✔
252
            source,
4✔
253
            radius_model,
4✔
254
            radius_column,
4✔
255
            count_column,
4✔
256
            result_descriptor,
4✔
257
            attribute_mapping,
4✔
258
        }
4✔
259
    }
4✔
260

261
    /// Builds the output collection from the merged circles.
    ///
    /// Each circle becomes one feature: a single point at the circle's center,
    /// the circle's time aggregate, its radius divided by `resolution`
    /// (in `radius_column`), its number of points (in `count_column`) and one
    /// value per attribute aggregate.
    ///
    /// `columns` are the aggregate columns (name -> data type) to declare in
    /// addition to the radius (`Float`) and count (`Int`) columns.
    ///
    /// # Errors
    ///
    /// Fails if a column cannot be added, a value cannot be pushed or the
    /// collection cannot be built.
    fn create_point_collection(
4✔
262
        circles_of_points: impl Iterator<Item = CircleOfPoints>,
4✔
263
        radius_column: &str,
4✔
264
        count_column: &str,
4✔
265
        resolution: f64,
4✔
266
        columns: &HashMap<String, FeatureDataType>,
4✔
267
    ) -> Result<MultiPointCollection> {
4✔
268
        let mut builder = MultiPointCollection::builder();
4✔
269

270
        for (column, &data_type) in columns {
7✔
271
            builder.add_column(column.clone(), data_type)?;
3✔
272
        }
273

274
        builder.add_column(radius_column.to_string(), FeatureDataType::Float)?;
4✔
275
        builder.add_column(count_column.to_string(), FeatureDataType::Int)?;
4✔
276

277
        // header is complete; switch to the row builder
        let mut builder = builder.finish_header();
4✔
278

279
        for circle_of_points in circles_of_points {
13✔
280
            builder.push_geometry(MultiPoint::new(vec![circle_of_points.circle.center()])?);
9✔
281
            builder.push_time_interval(circle_of_points.time_aggregate);
9✔
282

9✔
283
            // the radius is divided by the resolution; presumably this converts
            // it back to pixel units — confirm
            builder.push_data(
9✔
284
                radius_column,
9✔
285
                FeatureDataValue::Float(circle_of_points.circle.radius() / resolution),
9✔
286
            )?;
9✔
287
            // NOTE(review): unchecked `as i64` cast of the point count
            builder.push_data(
9✔
288
                count_column,
9✔
289
                FeatureDataValue::Int(circle_of_points.number_of_points() as i64),
9✔
290
            )?;
9✔
291

292
            for (column, data_type) in circle_of_points.attribute_aggregates {
16✔
293
                match data_type {
7✔
294
                    AttributeAggregate::MeanNumber(mean_aggregator) => {
3✔
295
                        builder
3✔
296
                            .push_data(&column, FeatureDataValue::Float(mean_aggregator.mean))?;
3✔
297
                    }
298
                    AttributeAggregate::StringSample(string_sampler) => {
2✔
299
                        // all sampled strings are joined into one text value
                        builder.push_data(
2✔
300
                            &column,
2✔
301
                            FeatureDataValue::Text(string_sampler.strings.join(", ")),
2✔
302
                        )?;
2✔
303
                    }
304
                    AttributeAggregate::Null => {
305
                        builder.push_null(&column)?;
2✔
306
                    }
307
                };
308
            }
309

310
            builder.finish_row();
9✔
311
        }
312

313
        builder.build().map_err(Into::into)
4✔
314
    }
4✔
315

316
    /// Creates the initial aggregate for a single feature value.
    ///
    /// Category, float and int values (plain or non-null nullable) start a
    /// `MeanNumber` aggregate when `aggregate_type` is `MeanNumber`; text values
    /// start a `StringSample` aggregate when it is `StringSample`.
    /// Every other combination — including null values and type mismatches —
    /// yields `AttributeAggregate::Null`.
    fn aggregate_from_feature_data(
317
        feature_data: FeatureDataValue,
318
        aggregate_type: AttributeAggregateType,
319
    ) -> AttributeAggregate {
320
        match (feature_data, aggregate_type) {
20✔
321
            (
322
                FeatureDataValue::Category(value) | FeatureDataValue::NullableCategory(Some(value)),
×
323
                AttributeAggregateType::MeanNumber,
324
            ) => AttributeAggregate::MeanNumber(MeanAggregator::from_value(f64::from(value))),
×
325
            (
326
                FeatureDataValue::Float(value) | FeatureDataValue::NullableFloat(Some(value)),
10✔
327
                AttributeAggregateType::MeanNumber,
328
            ) => AttributeAggregate::MeanNumber(MeanAggregator::from_value(value)),
10✔
329
            (
330
                FeatureDataValue::Int(value) | FeatureDataValue::NullableInt(Some(value)),
1✔
331
                AttributeAggregateType::MeanNumber,
332
            ) => AttributeAggregate::MeanNumber(MeanAggregator::from_value(value as f64)),
1✔
333
            (
334
                FeatureDataValue::Text(value) | FeatureDataValue::NullableText(Some(value)),
3✔
335
                AttributeAggregateType::StringSample,
336
            ) => AttributeAggregate::StringSample(StringSampler::from_value(value)),
3✔
337
            // null values and value/aggregate type mismatches
            _ => AttributeAggregate::Null,
6✔
338
        }
339
    }
20✔
340
}
341

342
/// Accumulator that is threaded through the `fold` over the source stream in `_query`.
struct GridFoldState {
343
    /// Grid that collects the circles of all points seen so far.
    grid: Grid<LogScaledRadius>,
344
    /// Source column name -> aggregate definition; carried along so the fold
    /// closure owns everything it needs.
    column_mapping: HashMap<String, AttributeAggregateDef>,
345
}
346

347
// Query execution: collects all source points in a grid, merges them with a
// circle-merging quadtree and emits the resulting circles as point features.
#[async_trait]
348
impl QueryProcessor for VisualPointClusteringProcessor {
349
    type Output = MultiPointCollection;
350
    type SpatialBounds = BoundingBox2D;
351

352
    /// Clusters all points of the source that the query returns.
    ///
    /// The whole source stream is folded into a single grid first, so the
    /// clustering itself produces exactly one collection, which is then passed
    /// through `merge_chunks` with the context's chunk byte size.
    ///
    /// # Errors
    ///
    /// Fails if the radius model cannot be scaled or the source query fails.
    /// Errors of individual source chunks and of the collection building are
    /// reported as items of the returned stream.
    async fn _query<'a>(
4✔
353
        &'a self,
4✔
354
        query: VectorQueryRectangle,
4✔
355
        ctx: &'a dyn QueryContext,
4✔
356
    ) -> Result<BoxStream<'a, Result<Self::Output>>> {
4✔
357
        // we aggregate all points into one collection
358

359
        // aggregate columns (name -> data type) of the output
        let column_schema = self
4✔
360
            .result_descriptor
4✔
361
            .columns
4✔
362
            .iter()
4✔
363
            .map(|(name, column_info)| (name.clone(), column_info.data_type))
4✔
364
            .collect();
4✔
365

4✔
366
        // use the larger of the two axis resolutions to scale the radius model
        let joint_resolution = f64::max(query.spatial_resolution.x, query.spatial_resolution.y);
4✔
367
        let scaled_radius_model = self.radius_model.with_scaled_radii(joint_resolution)?;
4✔
368

369
        let initial_grid_fold_state = Result::<GridFoldState>::Ok(GridFoldState {
4✔
370
            grid: Grid::new(query.spatial_bounds, scaled_radius_model),
4✔
371
            column_mapping: self.attribute_mapping.clone(),
4✔
372
        });
4✔
373

374
        // fold every source chunk into the grid; the state is a `Result`, so the
        // first error is carried through the remaining fold steps
        let grid_future = self.source.query(query, ctx).await?.fold(
4✔
375
            initial_grid_fold_state,
4✔
376
            |state, feature_collection| async move {
4✔
377
                // TODO: worker thread
378

379
                let GridFoldState {
380
                    mut grid,
4✔
381
                    column_mapping,
4✔
382
                } = state?;
4✔
383

384
                let feature_collection = feature_collection?;
4✔
385

386
                for feature in &feature_collection {
34✔
387
                    // TODO: pre-aggregate multi-points differently?
388
                    // every coordinate of a multi-point becomes its own circle
                    for coordinate in feature.geometry.points() {
30✔
389
                        let circle =
30✔
390
                            Circle::from_coordinate(coordinate, grid.radius_model().min_radius());
30✔
391

30✔
392
                        let mut attribute_aggregates = HashMap::with_capacity(column_mapping.len());
30✔
393

394
                        // one initial aggregate per configured column, keyed by
                        // the target (output) column name
                        for (
395
                            src_column,
20✔
396
                            AttributeAggregateDef {
20✔
397
                                column_name: tgt_column,
20✔
398
                                aggregate_type,
20✔
399
                                measurement: _,
400
                            },
401
                        ) in &column_mapping
50✔
402
                        {
20✔
403
                            let attribute_aggregate =
20✔
404
                                if let Some(feature_data) = feature.get(src_column) {
20✔
405
                                    Self::aggregate_from_feature_data(feature_data, *aggregate_type)
20✔
406
                                } else {
407
                                    // the feature has no value for the source column
                                    AttributeAggregate::Null
×
408
                                };
409

410
                            attribute_aggregates.insert(tgt_column.clone(), attribute_aggregate);
20✔
411
                        }
412

413
                        grid.insert(CircleOfPoints::new_with_one_point(
30✔
414
                            circle,
30✔
415
                            feature.time_interval,
30✔
416
                            attribute_aggregates,
30✔
417
                        ));
30✔
418
                    }
419
                }
420

421
                Ok(GridFoldState {
4✔
422
                    grid,
4✔
423
                    column_mapping,
4✔
424
                })
4✔
425
            },
4✔
426
        );
4✔
427

4✔
428
        // wrap the single fold future into a stream so that the result can be
        // mapped and returned as a stream
        let stream = FuturesUnordered::new();
4✔
429
        stream.push(grid_future);
4✔
430

4✔
431
        let stream = stream.map(move |grid| {
4✔
432
            let GridFoldState {
433
                grid,
4✔
434
                column_mapping: _,
435
            } = grid?;
4✔
436

437
            // NOTE(review): the meaning of the third argument (`1`) is not visible
            // in this file — see `CircleMergingQuadtree::new`
            let mut cmq = CircleMergingQuadtree::new(query.spatial_bounds, *grid.radius_model(), 1);
4✔
438

439
            // TODO: worker thread
440
            for circle_of_points in grid.drain() {
9✔
441
                cmq.insert_circle(circle_of_points);
9✔
442
            }
9✔
443

444
            Self::create_point_collection(
4✔
445
                cmq.into_iter(),
4✔
446
                &self.radius_column,
4✔
447
                &self.count_column,
4✔
448
                joint_resolution,
4✔
449
                &column_schema,
4✔
450
            )
4✔
451
        });
4✔
452

4✔
453
        Ok(stream.merge_chunks(ctx.chunk_byte_size().into()).boxed())
4✔
454
    }
8✔
455
}
456

457
// End-to-end tests: each test feeds a mock point source into the operator,
// queries the whole world at a resolution of 0.1 and compares the single
// resulting collection with the expected clusters.
#[cfg(test)]
458
mod tests {
459
    use geoengine_datatypes::primitives::FeatureData;
460
    use geoengine_datatypes::primitives::SpatialResolution;
461
    use geoengine_datatypes::primitives::TimeInterval;
462
    use geoengine_datatypes::util::test::TestDefault;
463

464
    use crate::{
465
        engine::{MockExecutionContext, MockQueryContext},
466
        mock::MockFeatureCollectionSource,
467
    };
468

469
    use super::*;
470

471
    // Nine coinciding points plus one distant point, no attribute columns:
    // expects two clusters with counts 9 and 1.
    #[tokio::test]
1✔
472
    async fn simple_test() {
1✔
473
        let mut coordinates = vec![(0.0, 0.1); 9];
1✔
474
        coordinates.extend_from_slice(&[(50.0, 50.1)]);
1✔
475

1✔
476
        let input = MultiPointCollection::from_data(
1✔
477
            MultiPoint::many(coordinates).unwrap(),
1✔
478
            vec![TimeInterval::default(); 10],
1✔
479
            HashMap::default(),
1✔
480
        )
1✔
481
        .unwrap();
1✔
482

1✔
483
        let operator = VisualPointClustering {
1✔
484
            params: VisualPointClusteringParams {
1✔
485
                min_radius_px: 8.,
1✔
486
                delta_px: 1.,
1✔
487
                radius_column: "radius".to_string(),
1✔
488
                count_column: "count".to_string(),
1✔
489
                column_aggregates: Default::default(),
1✔
490
            },
1✔
491
            sources: SingleVectorSource {
1✔
492
                vector: MockFeatureCollectionSource::single(input).boxed(),
1✔
493
            },
1✔
494
        };
1✔
495

1✔
496
        let execution_context = MockExecutionContext::test_default();
1✔
497

498
        let initialized_operator = operator
1✔
499
            .boxed()
1✔
500
            .initialize(&execution_context)
1✔
501
            .await
×
502
            .unwrap();
1✔
503

1✔
504
        let query_processor = initialized_operator
1✔
505
            .query_processor()
1✔
506
            .unwrap()
1✔
507
            .multi_point()
1✔
508
            .unwrap();
1✔
509

1✔
510
        let query_context = MockQueryContext::test_default();
1✔
511

1✔
512
        let qrect = VectorQueryRectangle {
1✔
513
            spatial_bounds: BoundingBox2D::new((-180., -90.).into(), (180., 90.).into()).unwrap(),
1✔
514
            time_interval: TimeInterval::default(),
1✔
515
            spatial_resolution: SpatialResolution::new(0.1, 0.1).unwrap(),
1✔
516
        };
1✔
517

518
        let query = query_processor.query(qrect, &query_context).await.unwrap();
1✔
519

520
        let result: Vec<MultiPointCollection> = query.map(Result::unwrap).collect().await;
1✔
521

522
        assert_eq!(result.len(), 1);
1✔
523
        assert_eq!(
1✔
524
            result[0],
1✔
525
            MultiPointCollection::from_slices(
1✔
526
                &[(0.0, 0.099_999_999_999_999_99), (50.0, 50.1)],
1✔
527
                &[TimeInterval::default(); 2],
1✔
528
                &[
1✔
529
                    ("count", FeatureData::Int(vec![9, 1])),
1✔
530
                    (
1✔
531
                        "radius",
1✔
532
                        FeatureData::Float(vec![10.197_224_577_336_218, 8.])
1✔
533
                    )
1✔
534
                ],
1✔
535
            )
1✔
536
            .unwrap()
1✔
537
        );
1✔
538
    }
539

540
    // Same points as `simple_test`, plus a float column `foo` that is aggregated
    // as mean into the output column `bar`.
    #[tokio::test]
1✔
541
    async fn simple_test_with_aggregate() {
1✔
542
        let mut coordinates = vec![(0.0, 0.1); 9];
1✔
543
        coordinates.extend_from_slice(&[(50.0, 50.1)]);
1✔
544

1✔
545
        let input = MultiPointCollection::from_slices(
1✔
546
            &MultiPoint::many(coordinates).unwrap(),
1✔
547
            &[TimeInterval::default(); 10],
1✔
548
            &[(
1✔
549
                "foo",
1✔
550
                FeatureData::Float(vec![1., 2., 3., 4., 5., 6., 7., 8., 9., 10.]),
1✔
551
            )],
1✔
552
        )
1✔
553
        .unwrap();
1✔
554

1✔
555
        let operator = VisualPointClustering {
1✔
556
            params: VisualPointClusteringParams {
1✔
557
                min_radius_px: 8.,
1✔
558
                delta_px: 1.,
1✔
559
                radius_column: "radius".to_string(),
1✔
560
                count_column: "count".to_string(),
1✔
561
                column_aggregates: [(
1✔
562
                    "foo".to_string(),
1✔
563
                    AttributeAggregateDef {
1✔
564
                        column_name: "bar".to_string(),
1✔
565
                        aggregate_type: AttributeAggregateType::MeanNumber,
1✔
566
                        measurement: None,
1✔
567
                    },
1✔
568
                )]
1✔
569
                .iter()
1✔
570
                .cloned()
1✔
571
                .collect(),
1✔
572
            },
1✔
573
            sources: SingleVectorSource {
1✔
574
                vector: MockFeatureCollectionSource::single(input).boxed(),
1✔
575
            },
1✔
576
        };
1✔
577

1✔
578
        let execution_context = MockExecutionContext::test_default();
1✔
579

580
        let initialized_operator = operator
1✔
581
            .boxed()
1✔
582
            .initialize(&execution_context)
1✔
583
            .await
×
584
            .unwrap();
1✔
585

1✔
586
        let query_processor = initialized_operator
1✔
587
            .query_processor()
1✔
588
            .unwrap()
1✔
589
            .multi_point()
1✔
590
            .unwrap();
1✔
591

1✔
592
        let query_context = MockQueryContext::test_default();
1✔
593

1✔
594
        let qrect = VectorQueryRectangle {
1✔
595
            spatial_bounds: BoundingBox2D::new((-180., -90.).into(), (180., 90.).into()).unwrap(),
1✔
596
            time_interval: TimeInterval::default(),
1✔
597
            spatial_resolution: SpatialResolution::new(0.1, 0.1).unwrap(),
1✔
598
        };
1✔
599

600
        let query = query_processor.query(qrect, &query_context).await.unwrap();
1✔
601

602
        let result: Vec<MultiPointCollection> = query.map(Result::unwrap).collect().await;
1✔
603

604
        assert_eq!(result.len(), 1);
1✔
605
        assert_eq!(
1✔
606
            result[0],
1✔
607
            MultiPointCollection::from_slices(
1✔
608
                &[(0.0, 0.099_999_999_999_999_99), (50.0, 50.1)],
1✔
609
                &[TimeInterval::default(); 2],
1✔
610
                &[
1✔
611
                    ("count", FeatureData::Int(vec![9, 1])),
1✔
612
                    (
1✔
613
                        "radius",
1✔
614
                        FeatureData::Float(vec![10.197_224_577_336_218, 8.])
1✔
615
                    ),
1✔
616
                    ("bar", FeatureData::Float(vec![5., 10.]))
1✔
617
                ],
1✔
618
            )
1✔
619
            .unwrap()
1✔
620
        );
1✔
621
    }
622

623
    // Mean aggregate over a nullable int column: a cluster with one value and
    // one null yields that value, a cluster with only nulls yields null.
    #[tokio::test]
1✔
624
    async fn aggregate_of_null() {
1✔
625
        let mut coordinates = vec![(0.0, 0.1); 2];
1✔
626
        coordinates.extend_from_slice(&[(50.0, 50.1); 2]);
1✔
627

1✔
628
        let input = MultiPointCollection::from_slices(
1✔
629
            &MultiPoint::many(coordinates).unwrap(),
1✔
630
            &[TimeInterval::default(); 4],
1✔
631
            &[(
1✔
632
                "foo",
1✔
633
                FeatureData::NullableInt(vec![Some(1), None, None, None]),
1✔
634
            )],
1✔
635
        )
1✔
636
        .unwrap();
1✔
637

1✔
638
        let operator = VisualPointClustering {
1✔
639
            params: VisualPointClusteringParams {
1✔
640
                min_radius_px: 8.,
1✔
641
                delta_px: 1.,
1✔
642
                radius_column: "radius".to_string(),
1✔
643
                count_column: "count".to_string(),
1✔
644
                column_aggregates: [(
1✔
645
                    "foo".to_string(),
1✔
646
                    AttributeAggregateDef {
1✔
647
                        column_name: "foo".to_string(),
1✔
648
                        aggregate_type: AttributeAggregateType::MeanNumber,
1✔
649
                        measurement: None,
1✔
650
                    },
1✔
651
                )]
1✔
652
                .iter()
1✔
653
                .cloned()
1✔
654
                .collect(),
1✔
655
            },
1✔
656
            sources: SingleVectorSource {
1✔
657
                vector: MockFeatureCollectionSource::single(input).boxed(),
1✔
658
            },
1✔
659
        };
1✔
660

1✔
661
        let execution_context = MockExecutionContext::test_default();
1✔
662

663
        let initialized_operator = operator
1✔
664
            .boxed()
1✔
665
            .initialize(&execution_context)
1✔
666
            .await
×
667
            .unwrap();
1✔
668

1✔
669
        let query_processor = initialized_operator
1✔
670
            .query_processor()
1✔
671
            .unwrap()
1✔
672
            .multi_point()
1✔
673
            .unwrap();
1✔
674

1✔
675
        let query_context = MockQueryContext::test_default();
1✔
676

1✔
677
        let qrect = VectorQueryRectangle {
1✔
678
            spatial_bounds: BoundingBox2D::new((-180., -90.).into(), (180., 90.).into()).unwrap(),
1✔
679
            time_interval: TimeInterval::default(),
1✔
680
            spatial_resolution: SpatialResolution::new(0.1, 0.1).unwrap(),
1✔
681
        };
1✔
682

683
        let query = query_processor.query(qrect, &query_context).await.unwrap();
1✔
684

685
        let result: Vec<MultiPointCollection> = query.map(Result::unwrap).collect().await;
1✔
686

687
        assert_eq!(result.len(), 1);
1✔
688
        assert_eq!(
1✔
689
            result[0],
1✔
690
            MultiPointCollection::from_slices(
1✔
691
                &[(0.0, 0.1), (50.0, 50.1)],
1✔
692
                &[TimeInterval::default(); 2],
1✔
693
                &[
1✔
694
                    ("count", FeatureData::Int(vec![2, 2])),
1✔
695
                    (
1✔
696
                        "radius",
1✔
697
                        FeatureData::Float(vec![8.693_147_180_559_945, 8.693_147_180_559_945])
1✔
698
                    ),
1✔
699
                    ("foo", FeatureData::NullableFloat(vec![Some(1.), None]))
1✔
700
                ],
1✔
701
            )
1✔
702
            .unwrap()
1✔
703
        );
1✔
704
    }
705

706
    // String-sample aggregate over a nullable text column: sampled strings are
    // joined with ", "; nulls are skipped and an all-null cluster yields null.
    #[tokio::test]
1✔
707
    async fn text_aggregate() {
1✔
708
        let mut coordinates = vec![(0.0, 0.1); 2];
1✔
709
        coordinates.extend_from_slice(&[(50.0, 50.1); 2]);
1✔
710
        coordinates.extend_from_slice(&[(25.0, 25.1); 2]);
1✔
711

1✔
712
        let input = MultiPointCollection::from_slices(
1✔
713
            &MultiPoint::many(coordinates).unwrap(),
1✔
714
            &[TimeInterval::default(); 6],
1✔
715
            &[(
1✔
716
                "text",
1✔
717
                FeatureData::NullableText(vec![
1✔
718
                    Some("foo".to_string()),
1✔
719
                    Some("bar".to_string()),
1✔
720
                    Some("foo".to_string()),
1✔
721
                    None,
1✔
722
                    None,
1✔
723
                    None,
1✔
724
                ]),
1✔
725
            )],
1✔
726
        )
1✔
727
        .unwrap();
1✔
728

1✔
729
        let operator = VisualPointClustering {
1✔
730
            params: VisualPointClusteringParams {
1✔
731
                min_radius_px: 8.,
1✔
732
                delta_px: 1.,
1✔
733
                radius_column: "radius".to_string(),
1✔
734
                count_column: "count".to_string(),
1✔
735
                column_aggregates: [(
1✔
736
                    "text".to_string(),
1✔
737
                    AttributeAggregateDef {
1✔
738
                        column_name: "text".to_string(),
1✔
739
                        aggregate_type: AttributeAggregateType::StringSample,
1✔
740
                        measurement: None,
1✔
741
                    },
1✔
742
                )]
1✔
743
                .iter()
1✔
744
                .cloned()
1✔
745
                .collect(),
1✔
746
            },
1✔
747
            sources: SingleVectorSource {
1✔
748
                vector: MockFeatureCollectionSource::single(input).boxed(),
1✔
749
            },
1✔
750
        };
1✔
751

1✔
752
        let execution_context = MockExecutionContext::test_default();
1✔
753

754
        let initialized_operator = operator
1✔
755
            .boxed()
1✔
756
            .initialize(&execution_context)
1✔
757
            .await
×
758
            .unwrap();
1✔
759

1✔
760
        let query_processor = initialized_operator
1✔
761
            .query_processor()
1✔
762
            .unwrap()
1✔
763
            .multi_point()
1✔
764
            .unwrap();
1✔
765

1✔
766
        let query_context = MockQueryContext::test_default();
1✔
767

1✔
768
        let qrect = VectorQueryRectangle {
1✔
769
            spatial_bounds: BoundingBox2D::new((-180., -90.).into(), (180., 90.).into()).unwrap(),
1✔
770
            time_interval: TimeInterval::default(),
1✔
771
            spatial_resolution: SpatialResolution::new(0.1, 0.1).unwrap(),
1✔
772
        };
1✔
773

774
        let query = query_processor.query(qrect, &query_context).await.unwrap();
1✔
775

776
        let result: Vec<MultiPointCollection> = query.map(Result::unwrap).collect().await;
1✔
777

778
        assert_eq!(result.len(), 1);
1✔
779
        assert_eq!(
1✔
780
            result[0],
1✔
781
            MultiPointCollection::from_slices(
1✔
782
                &[(0.0, 0.1), (50.0, 50.1), (25.0, 25.1)],
1✔
783
                &[TimeInterval::default(); 3],
1✔
784
                &[
1✔
785
                    ("count", FeatureData::Int(vec![2, 2, 2])),
1✔
786
                    (
1✔
787
                        "radius",
1✔
788
                        FeatureData::Float(vec![
1✔
789
                            8.693_147_180_559_945,
1✔
790
                            8.693_147_180_559_945,
1✔
791
                            8.693_147_180_559_945
1✔
792
                        ])
1✔
793
                    ),
1✔
794
                    (
1✔
795
                        "text",
1✔
796
                        FeatureData::NullableText(vec![
1✔
797
                            Some("foo, bar".to_string()),
1✔
798
                            Some("foo".to_string()),
1✔
799
                            None
1✔
800
                        ])
1✔
801
                    )
1✔
802
                ],
1✔
803
            )
1✔
804
            .unwrap()
1✔
805
        );
1✔
806
    }
807
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc