• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

geo-engine / geoengine / 13719457298

07 Mar 2025 11:11AM UTC coverage: 90.08% (+0.004%) from 90.076%
13719457298

Pull #1026

github

web-flow
Merge d09cf15d1 into c96026921
Pull Request #1026: Ubuntu 24 LTS

2350 of 2476 new or added lines in 108 files covered. (94.91%)

6 existing lines in 4 files now uncovered.

126337 of 140250 relevant lines covered (90.08%)

57391.91 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

92.98
/operators/src/processing/circle_merging_quadtree/operator.rs
1
use std::collections::{HashMap, HashSet};
2

3
use async_trait::async_trait;
4
use futures::StreamExt;
5
use futures::stream::{BoxStream, FuturesUnordered};
6
use geoengine_datatypes::collections::{
7
    BuilderProvider, GeoFeatureCollectionRowBuilder, MultiPointCollection, VectorDataType,
8
};
9
use geoengine_datatypes::primitives::{
10
    BoundingBox2D, Circle, FeatureDataType, FeatureDataValue, Measurement, MultiPoint,
11
    MultiPointAccess, VectorQueryRectangle,
12
};
13
use geoengine_datatypes::primitives::{CacheHint, ColumnSelection};
14
use serde::{Deserialize, Serialize};
15
use snafu::ensure;
16

17
use crate::adapters::FeatureCollectionStreamExt;
18
use crate::engine::{
19
    CanonicOperatorName, ExecutionContext, InitializedVectorOperator, Operator, QueryContext,
20
    QueryProcessor, SingleVectorSource, TypedVectorQueryProcessor, VectorColumnInfo,
21
    VectorOperator, VectorQueryProcessor, VectorResultDescriptor,
22
};
23
use crate::engine::{InitializedSources, OperatorName, WorkflowOperatorPath};
24
use crate::error::{self, Error};
25
use crate::processing::circle_merging_quadtree::aggregates::MeanAggregator;
26
use crate::processing::circle_merging_quadtree::circle_of_points::CircleOfPoints;
27
use crate::processing::circle_merging_quadtree::circle_radius_model::LogScaledRadius;
28
use crate::util::Result;
29

30
use super::aggregates::{AttributeAggregate, AttributeAggregateType, StringSampler};
31
use super::circle_radius_model::CircleRadiusModel;
32
use super::grid::Grid;
33
use super::quadtree::CircleMergingQuadtree;
34

35
/// Parameters of the `VisualPointClustering` operator.
///
/// Field names are (de)serialized in camelCase.
#[derive(Debug, Serialize, Deserialize, Clone)]
36
#[serde(rename_all = "camelCase")]
37
pub struct VisualPointClusteringParams {
38
    /// Minimum radius in pixels handed to the radius model.
    /// Initialization rejects values that are not greater than zero.
    pub min_radius_px: f64,
39
    /// Pixel delta handed to the radius model.
    /// Initialization rejects negative values.
    pub delta_px: f64,
40
    /// Name of the output column that receives the circle radius.
    /// Must be non-empty and differ from `count_column`.
    radius_column: String,
41
    /// Name of the output column that receives the number of points per circle.
    count_column: String,
42
    /// Aggregate definitions, keyed by the name of the source column they read from.
    column_aggregates: HashMap<String, AttributeAggregateDef>,
43
}
44

45
/// Describes how one source column is aggregated into one output column.
///
/// Field names are (de)serialized in camelCase.
#[derive(Debug, Serialize, Deserialize, Clone)]
46
#[serde(rename_all = "camelCase")]
47
pub struct AttributeAggregateDef {
48
    /// Name of the output column that holds the aggregated value.
    pub column_name: String,
49
    /// Kind of aggregation; it also determines the output column's data type.
    pub aggregate_type: AttributeAggregateType,
50
    // if measurement is unset, it will be taken from the source result descriptor
51
    pub measurement: Option<Measurement>,
52
}
53

54
/// The visual point clustering operator: an `Operator` with `VisualPointClusteringParams`
/// as parameters and a single vector source as input.
pub type VisualPointClustering = Operator<VisualPointClusteringParams, SingleVectorSource>;
55

56
/// Provides the type name of this operator.
impl OperatorName for VisualPointClustering {
57
    /// Also returned by `InitializedVisualPointClustering::name`.
    const TYPE_NAME: &'static str = "VisualPointClustering";
58
}
59

60
/// `VectorOperator` implementation: validates the parameters and the source, derives the
/// output schema and produces an `InitializedVisualPointClustering`.
#[typetag::serde]
×
61
#[async_trait]
62
#[allow(clippy::too_many_lines)]
63
impl VectorOperator for VisualPointClustering {
64
    /// Initializes the operator.
    ///
    /// # Errors
    ///
    /// Returns an error if
    /// * `min_radius_px` is not greater than zero or `delta_px` is negative,
    /// * `radius_column` is empty or equal to `count_column`,
    /// * creating the radius model or initializing the source fails,
    /// * the source is not of type `MultiPoint`,
    /// * a key of `column_aggregates` is not a column of the source,
    /// * two aggregates share an output column name, or an output column name equals
    ///   `radius_column` or `count_column`,
    /// * an aggregate has the type `AttributeAggregateType::Null`.
    async fn _initialize(
65
        mut self: Box<Self>,
66
        path: WorkflowOperatorPath,
67
        context: &dyn ExecutionContext,
68
    ) -> Result<Box<dyn InitializedVectorOperator>> {
5✔
69
        // validate the numeric parameters
        ensure!(
5✔
70
            self.params.min_radius_px > 0.0,
5✔
71
            error::InputMustBeGreaterThanZero {
×
72
                scope: "VisualPointClustering",
×
73
                name: "min_radius_px"
×
74
            }
×
75
        );
76
        ensure!(
5✔
77
            self.params.delta_px >= 0.0,
5✔
78
            error::InputMustBeZeroOrPositive {
×
79
                scope: "VisualPointClustering",
×
80
                name: "delta_px"
×
81
            }
×
82
        );
83
        // NOTE(review): only `radius_column` is checked for emptiness, `count_column` is not
        // — confirm whether an empty count column name is intended to be allowed.
        ensure!(!self.params.radius_column.is_empty(), error::EmptyInput);
5✔
84
        ensure!(
5✔
85
            self.params.count_column != self.params.radius_column,
5✔
86
            error::DuplicateOutputColumns
×
87
        );
88

89
        let name = CanonicOperatorName::from(&self);
5✔
90

91
        let radius_model = LogScaledRadius::new(self.params.min_radius_px, self.params.delta_px)?;
5✔
92

93
        let initialized_sources = self
5✔
94
            .sources
5✔
95
            .initialize_sources(path.clone(), context)
5✔
96
            .await?;
5✔
97
        let vector_source = initialized_sources.vector;
5✔
98

5✔
99
        // only multi point sources are accepted
        ensure!(
5✔
100
            vector_source.result_descriptor().data_type == VectorDataType::MultiPoint,
5✔
101
            error::InvalidType {
×
102
                expected: VectorDataType::MultiPoint.to_string(),
×
103
                found: vector_source.result_descriptor().data_type.to_string(),
×
104
            }
×
105
        );
106

107
        // check that all input columns exist
108
        for column_name in self.params.column_aggregates.keys() {
5✔
109
            let column_name_exists = vector_source
4✔
110
                .result_descriptor()
4✔
111
                .columns
4✔
112
                .contains_key(column_name);
4✔
113

4✔
114
            ensure!(
4✔
115
                column_name_exists,
4✔
116
                error::MissingInputColumn {
×
117
                    name: column_name.clone()
×
118
                }
×
119
            );
120
        }
121

122
        // check that there are no duplicates in the output columns
123
        // (the set of output names shrinks below the number of definitions on a duplicate)
        let output_names: HashSet<&String> = self
5✔
124
            .params
5✔
125
            .column_aggregates
5✔
126
            .values()
5✔
127
            .map(|def| &def.column_name)
5✔
128
            .collect();
5✔
129
        ensure!(
5✔
130
            output_names.len() == self.params.column_aggregates.len(),
5✔
131
            error::DuplicateOutputColumns
×
132
        );
133

134
        // create schema for [`ResultDescriptor`]
135
        let mut new_columns: HashMap<String, VectorColumnInfo> =
5✔
136
            HashMap::with_capacity(self.params.column_aggregates.len());
5✔
137
        for attribute_aggregate_def in self.params.column_aggregates.values_mut() {
5✔
138
            if attribute_aggregate_def.measurement.is_none() {
4✔
139
                // take it from source measurement
4✔
140
                // NOTE(review): the lookup uses the output column name (`column_name`), not the
                // source column key of `column_aggregates` — if both differ, the source
                // descriptor is queried for the output name; confirm this is intended.

4✔
141
                attribute_aggregate_def.measurement = vector_source
4✔
142
                    .result_descriptor()
4✔
143
                    .column_measurement(&attribute_aggregate_def.column_name)
4✔
144
                    .cloned();
4✔
145
            }
4✔
146

147
            // the aggregate type determines the data type of the output column
            let data_type = match attribute_aggregate_def.aggregate_type {
4✔
148
                AttributeAggregateType::MeanNumber => FeatureDataType::Float,
2✔
149
                AttributeAggregateType::StringSample => FeatureDataType::Text,
2✔
150
                AttributeAggregateType::Null => {
151
                    return Err(Error::InvalidType {
×
152
                        expected: "not null".to_string(),
×
153
                        found: "null".to_string(),
×
NEW
154
                    });
×
155
                }
156
            };
157

158
            new_columns.insert(
4✔
159
                attribute_aggregate_def.column_name.clone(),
4✔
160
                VectorColumnInfo {
4✔
161
                    data_type,
4✔
162
                    measurement: attribute_aggregate_def.measurement.clone().into(),
4✔
163
                },
4✔
164
            );
4✔
165
        }
166

167
        // check that output schema does not interfere with count and radius columns
168
        ensure!(
5✔
169
            !new_columns.contains_key(&self.params.radius_column)
5✔
170
                && !new_columns.contains_key(&self.params.count_column),
5✔
171
            error::DuplicateOutputColumns
×
172
        );
173

174
        let in_desc = vector_source.result_descriptor();
5✔
175

5✔
176
        // spatial reference, time and bbox are taken over from the source descriptor;
        // `columns` contains only the aggregate output columns
        Ok(InitializedVisualPointClustering {
5✔
177
            name,
5✔
178
            path,
5✔
179
            result_descriptor: VectorResultDescriptor {
5✔
180
                data_type: VectorDataType::MultiPoint,
5✔
181
                spatial_reference: in_desc.spatial_reference,
5✔
182
                columns: new_columns,
5✔
183
                time: in_desc.time,
5✔
184
                bbox: in_desc.bbox,
5✔
185
            },
5✔
186
            vector_source,
5✔
187
            radius_model,
5✔
188
            radius_column: self.params.radius_column,
5✔
189
            count_column: self.params.count_column,
5✔
190
            attribute_mapping: self.params.column_aggregates,
5✔
191
        }
5✔
192
        .boxed())
5✔
193
    }
10✔
194

195
    span_fn!(VisualPointClustering);
196
}
197

198
/// Initialized form of the `VisualPointClustering` operator, created by `_initialize`.
pub struct InitializedVisualPointClustering {
199
    /// Canonic name derived from the uninitialized operator.
    name: CanonicOperatorName,
200
    /// Path of this operator within the workflow.
    path: WorkflowOperatorPath,
201
    /// Output descriptor: multi points with the aggregate output columns.
    result_descriptor: VectorResultDescriptor,
202
    /// The initialized source operator.
    vector_source: Box<dyn InitializedVectorOperator>,
203
    /// Radius model built from `min_radius_px` and `delta_px`.
    radius_model: LogScaledRadius,
204
    /// Name of the output column for the circle radius.
    radius_column: String,
205
    /// Name of the output column for the number of points per circle.
    count_column: String,
206
    /// Aggregate definitions, keyed by source column name.
    attribute_mapping: HashMap<String, AttributeAggregateDef>,
207
}
208

209
/// `InitializedVectorOperator` implementation for the clustering operator.
impl InitializedVectorOperator for InitializedVisualPointClustering {
210
    /// Wraps the source's multi point processor in a `VisualPointClusteringProcessor`.
    ///
    /// # Errors
    ///
    /// Fails if the source's query processor cannot be created, or with
    /// `InvalidVectorType` if it is not a multi point processor.
    fn query_processor(&self) -> Result<TypedVectorQueryProcessor> {
5✔
211
        match self.vector_source.query_processor()? {
5✔
212
            TypedVectorQueryProcessor::MultiPoint(source) => {
5✔
213
                Ok(TypedVectorQueryProcessor::MultiPoint(
5✔
214
                    VisualPointClusteringProcessor::new(
5✔
215
                        source,
5✔
216
                        self.radius_model,
5✔
217
                        self.radius_column.clone(),
5✔
218
                        self.count_column.clone(),
5✔
219
                        self.result_descriptor.clone(),
5✔
220
                        self.attribute_mapping.clone(),
5✔
221
                    )
5✔
222
                    .boxed(),
5✔
223
                ))
5✔
224
            }
225
            // every other vector type is rejected with the name of the type that was found
            TypedVectorQueryProcessor::MultiLineString(_) => Err(error::Error::InvalidVectorType {
×
226
                expected: "MultiPoint".to_owned(),
×
227
                found: "MultiLineString".to_owned(),
×
228
            }),
×
229
            TypedVectorQueryProcessor::MultiPolygon(_) => Err(error::Error::InvalidVectorType {
×
230
                expected: "MultiPoint".to_owned(),
×
231
                found: "MultiPolygon".to_owned(),
×
232
            }),
×
233
            TypedVectorQueryProcessor::Data(_) => Err(error::Error::InvalidVectorType {
×
234
                expected: "MultiPoint".to_owned(),
×
235
                found: "Data".to_owned(),
×
236
            }),
×
237
        }
238
    }
5✔
239

240
    /// Returns the output descriptor computed during initialization.
    fn result_descriptor(&self) -> &VectorResultDescriptor {
×
241
        &self.result_descriptor
×
242
    }
×
243

244
    /// Returns a clone of the canonic operator name.
    fn canonic_name(&self) -> CanonicOperatorName {
×
245
        self.name.clone()
×
246
    }
×
247

248
    /// Returns the operator's type name (`VisualPointClustering::TYPE_NAME`).
    fn name(&self) -> &'static str {
×
249
        VisualPointClustering::TYPE_NAME
×
250
    }
×
251

252
    /// Returns a clone of the operator's workflow path.
    fn path(&self) -> WorkflowOperatorPath {
×
253
        self.path.clone()
×
254
    }
×
255
}
256

257
/// Query processor that merges the points of its source into circles and emits one
/// single-point feature per circle.
pub struct VisualPointClusteringProcessor {
258
    /// Processor of the multi point source.
    source: Box<dyn VectorQueryProcessor<VectorType = MultiPointCollection>>,
259
    /// Radius model in pixel units; it is rescaled with the query resolution per query.
    radius_model: LogScaledRadius,
260
    /// Name of the output column for the circle radius.
    radius_column: String,
261
    /// Name of the output column for the number of points per circle.
    count_column: String,
262
    /// Output descriptor; its columns define the aggregate columns of the result.
    result_descriptor: VectorResultDescriptor,
263
    /// Aggregate definitions, keyed by source column name.
    attribute_mapping: HashMap<String, AttributeAggregateDef>,
264
}
265

266
/// Construction and helper functions of the processor.
impl VisualPointClusteringProcessor {
267
    /// Creates a processor from its parts; no validation happens here.
    fn new(
5✔
268
        source: Box<dyn VectorQueryProcessor<VectorType = MultiPointCollection>>,
5✔
269
        radius_model: LogScaledRadius,
5✔
270
        radius_column: String,
5✔
271
        count_column: String,
5✔
272
        result_descriptor: VectorResultDescriptor,
5✔
273
        attribute_mapping: HashMap<String, AttributeAggregateDef>,
5✔
274
    ) -> Self {
5✔
275
        Self {
5✔
276
            source,
5✔
277
            radius_model,
5✔
278
            radius_column,
5✔
279
            count_column,
5✔
280
            result_descriptor,
5✔
281
            attribute_mapping,
5✔
282
        }
5✔
283
    }
5✔
284

285
    /// Builds a `MultiPointCollection` with one row per circle.
    ///
    /// Each row consists of
    /// * the circle center as a single-point geometry,
    /// * the circle's `time_aggregate` as time interval,
    /// * `radius_column`: the circle radius divided by `resolution`,
    /// * `count_column`: the number of points of the circle,
    /// * one value (or null) per attribute aggregate of the circle.
    ///
    /// `columns` lists the aggregate columns and their data types; `cache_hint` is set on
    /// the resulting collection.
    ///
    /// # Errors
    ///
    /// Propagates errors of the collection builder.
    fn create_point_collection(
5✔
286
        circles_of_points: impl Iterator<Item = CircleOfPoints>,
5✔
287
        radius_column: &str,
5✔
288
        count_column: &str,
5✔
289
        resolution: f64,
5✔
290
        columns: &HashMap<String, FeatureDataType>,
5✔
291
        cache_hint: CacheHint,
5✔
292
    ) -> Result<MultiPointCollection> {
5✔
293
        let mut builder = MultiPointCollection::builder();
5✔
294

295
        // declare the aggregate columns first, then the radius and count columns
        for (column, &data_type) in columns {
9✔
296
            builder.add_column(column.clone(), data_type)?;
4✔
297
        }
298

299
        builder.add_column(radius_column.to_string(), FeatureDataType::Float)?;
5✔
300
        builder.add_column(count_column.to_string(), FeatureDataType::Int)?;
5✔
301

302
        let mut builder = builder.finish_header();
5✔
303

304
        for circle_of_points in circles_of_points {
17✔
305
            builder.push_geometry(MultiPoint::new(vec![circle_of_points.circle.center()])?);
12✔
306
            builder.push_time_interval(circle_of_points.time_aggregate);
12✔
307

12✔
308
            // the radius is divided by the resolution before it is written
            builder.push_data(
12✔
309
                radius_column,
12✔
310
                FeatureDataValue::Float(circle_of_points.circle.radius() / resolution),
12✔
311
            )?;
12✔
312
            builder.push_data(
12✔
313
                count_column,
12✔
314
                FeatureDataValue::Int(circle_of_points.number_of_points() as i64),
12✔
315
            )?;
12✔
316

317
            for (column, data_type) in circle_of_points.attribute_aggregates {
22✔
318
                match data_type {
10✔
319
                    AttributeAggregate::MeanNumber(mean_aggregator) => {
3✔
320
                        builder
3✔
321
                            .push_data(&column, FeatureDataValue::Float(mean_aggregator.mean))?;
3✔
322
                    }
323
                    // sampled strings are joined into one comma-separated text value
                    AttributeAggregate::StringSample(string_sampler) => {
4✔
324
                        builder.push_data(
4✔
325
                            &column,
4✔
326
                            FeatureDataValue::Text(string_sampler.strings.join(", ")),
4✔
327
                        )?;
4✔
328
                    }
329
                    AttributeAggregate::Null => {
330
                        builder.push_null(&column)?;
3✔
331
                    }
332
                };
333
            }
334

335
            builder.finish_row();
12✔
336
        }
337

338
        builder.cache_hint(cache_hint);
5✔
339

5✔
340
        builder.build().map_err(Into::into)
5✔
341
    }
5✔
342

343
    /// Creates the initial aggregate for a single feature value.
    ///
    /// * Category, float and int values (plain or non-null nullable) combined with
    ///   `MeanNumber` start a `MeanAggregator` from the value converted to `f64`.
    /// * Text values (plain or non-null nullable) combined with `StringSample` start a
    ///   `StringSampler`.
    /// * Every other combination, including null values and value/aggregate type
    ///   mismatches, yields `AttributeAggregate::Null`.
    fn aggregate_from_feature_data(
26✔
344
        feature_data: FeatureDataValue,
26✔
345
        aggregate_type: AttributeAggregateType,
26✔
346
    ) -> AttributeAggregate {
26✔
347
        match (feature_data, aggregate_type) {
26✔
348
            (
349
                FeatureDataValue::Category(value) | FeatureDataValue::NullableCategory(Some(value)),
×
350
                AttributeAggregateType::MeanNumber,
351
            ) => AttributeAggregate::MeanNumber(MeanAggregator::from_value(f64::from(value))),
×
352
            (
353
                FeatureDataValue::Float(value) | FeatureDataValue::NullableFloat(Some(value)),
10✔
354
                AttributeAggregateType::MeanNumber,
355
            ) => AttributeAggregate::MeanNumber(MeanAggregator::from_value(value)),
10✔
356
            (
357
                FeatureDataValue::Int(value) | FeatureDataValue::NullableInt(Some(value)),
1✔
358
                AttributeAggregateType::MeanNumber,
359
            ) => AttributeAggregate::MeanNumber(MeanAggregator::from_value(value as f64)),
1✔
360
            (
361
                FeatureDataValue::Text(value) | FeatureDataValue::NullableText(Some(value)),
6✔
362
                AttributeAggregateType::StringSample,
363
            ) => AttributeAggregate::StringSample(StringSampler::from_value(value)),
6✔
364
            _ => AttributeAggregate::Null,
9✔
365
        }
366
    }
26✔
367
}
368

369
/// Accumulator that is threaded through the fold over the source's feature collections.
struct GridFoldState {
370
    /// Grid into which one circle per input point is inserted.
    grid: Grid<LogScaledRadius>,
371
    /// Aggregate definitions, keyed by source column name.
    column_mapping: HashMap<String, AttributeAggregateDef>,
372
    /// Cache hint merged from all feature collections folded so far.
    cache_hint: CacheHint,
373
}
374

375
/// `QueryProcessor` implementation that performs the clustering for a query.
#[async_trait]
376
impl QueryProcessor for VisualPointClusteringProcessor {
377
    type Output = MultiPointCollection;
378
    type SpatialBounds = BoundingBox2D;
379
    type Selection = ColumnSelection;
380
    type ResultDescription = VectorResultDescriptor;
381

382
    /// Queries the source, folds all returned points into a grid, merges the grid's circles
    /// in a `CircleMergingQuadtree` and emits the merged circles as point features.
    ///
    /// The radius model is scaled with the larger of the query's x and y resolution; the
    /// same value is used to divide the radii that are written to the radius column.
    ///
    /// # Errors
    ///
    /// Fails if scaling the radius model or querying the source fails. Errors of individual
    /// source collections and of building the output are emitted as stream items.
    async fn _query<'a>(
383
        &'a self,
384
        query: VectorQueryRectangle,
385
        ctx: &'a dyn QueryContext,
386
    ) -> Result<BoxStream<'a, Result<Self::Output>>> {
5✔
387
        // we aggregate all points into one collection
388

389
        // name -> data type of the aggregate output columns
        let column_schema = self
5✔
390
            .result_descriptor
5✔
391
            .columns
5✔
392
            .iter()
5✔
393
            .map(|(name, column_info)| (name.clone(), column_info.data_type))
5✔
394
            .collect();
5✔
395

5✔
396
        let joint_resolution = f64::max(query.spatial_resolution.x, query.spatial_resolution.y);
5✔
397
        let scaled_radius_model = self.radius_model.with_scaled_radii(joint_resolution)?;
5✔
398

399
        // the fold starts with an empty grid over the query bounds and the maximum cache hint
        let initial_grid_fold_state = Result::<GridFoldState>::Ok(GridFoldState {
5✔
400
            grid: Grid::new(query.spatial_bounds, scaled_radius_model),
5✔
401
            column_mapping: self.attribute_mapping.clone(),
5✔
402
            cache_hint: CacheHint::max_duration(),
5✔
403
        });
5✔
404

405
        let grid_future = self.source.query(query.clone(), ctx).await?.fold(
5✔
406
            initial_grid_fold_state,
5✔
407
            |state, feature_collection| async move {
5✔
408
                // TODO: worker thread
409

410
                // an error in the state or in the collection short-circuits this fold step
                let GridFoldState {
411
                    mut grid,
5✔
412
                    column_mapping,
5✔
413
                    mut cache_hint,
5✔
414
                } = state?;
5✔
415

416
                let feature_collection = feature_collection?;
5✔
417

418
                for feature in &feature_collection {
41✔
419
                    // TODO: pre-aggregate multi-points differently?
420
                    // every coordinate of a multi point becomes its own circle of minimum radius
                    for coordinate in feature.geometry.points() {
36✔
421
                        let circle =
36✔
422
                            Circle::from_coordinate(coordinate, grid.radius_model().min_radius());
36✔
423

36✔
424
                        let mut attribute_aggregates = HashMap::with_capacity(column_mapping.len());
36✔
425

426
                        // read each source column and store its aggregate under the target name
                        for (
427
                            src_column,
26✔
428
                            AttributeAggregateDef {
26✔
429
                                column_name: tgt_column,
26✔
430
                                aggregate_type,
26✔
431
                                measurement: _,
432
                            },
433
                        ) in &column_mapping
62✔
434
                        {
435
                            let attribute_aggregate =
26✔
436
                                if let Some(feature_data) = feature.get(src_column) {
26✔
437
                                    Self::aggregate_from_feature_data(feature_data, *aggregate_type)
26✔
438
                                } else {
439
                                    AttributeAggregate::Null
×
440
                                };
441

442
                            attribute_aggregates.insert(tgt_column.clone(), attribute_aggregate);
26✔
443
                        }
444

445
                        grid.insert(CircleOfPoints::new_with_one_point(
36✔
446
                            circle,
36✔
447
                            feature.time_interval,
36✔
448
                            attribute_aggregates,
36✔
449
                        ));
36✔
450
                    }
451
                }
452

453
                cache_hint.merge_with(&feature_collection.cache_hint);
5✔
454

5✔
455
                Ok(GridFoldState {
5✔
456
                    grid,
5✔
457
                    column_mapping,
5✔
458
                    cache_hint,
5✔
459
                })
5✔
460
            },
10✔
461
        );
5✔
462

5✔
463
        // wrap the single fold future into a stream so that it can be mapped and chunked
        let stream = FuturesUnordered::new();
5✔
464
        stream.push(grid_future);
5✔
465

5✔
466
        let stream = stream.map(move |grid| {
5✔
467
            let GridFoldState {
468
                grid,
5✔
469
                column_mapping: _,
5✔
470
                cache_hint,
5✔
471
            } = grid?;
5✔
472

473
            let mut cmq = CircleMergingQuadtree::new(query.spatial_bounds, *grid.radius_model(), 1);
5✔
474

475
            // TODO: worker thread
476
            for circle_of_points in grid.drain() {
12✔
477
                cmq.insert_circle(circle_of_points);
12✔
478
            }
12✔
479

480
            Self::create_point_collection(
5✔
481
                cmq.into_iter(),
5✔
482
                &self.radius_column,
5✔
483
                &self.count_column,
5✔
484
                joint_resolution,
5✔
485
                &column_schema,
5✔
486
                cache_hint,
5✔
487
            )
5✔
488
        });
5✔
489

5✔
490
        Ok(stream.merge_chunks(ctx.chunk_byte_size().into()).boxed())
5✔
491
    }
10✔
492

493
    /// Returns the output descriptor of this processor.
    fn result_descriptor(&self) -> &VectorResultDescriptor {
10✔
494
        &self.result_descriptor
10✔
495
    }
10✔
496
}
497

498
#[cfg(test)]
499
mod tests {
500
    use geoengine_datatypes::collections::ChunksEqualIgnoringCacheHint;
501
    use geoengine_datatypes::primitives::CacheHint;
502
    use geoengine_datatypes::primitives::FeatureData;
503
    use geoengine_datatypes::primitives::SpatialResolution;
504
    use geoengine_datatypes::primitives::TimeInterval;
505
    use geoengine_datatypes::util::test::TestDefault;
506

507
    use crate::{
508
        engine::{MockExecutionContext, MockQueryContext},
509
        mock::MockFeatureCollectionSource,
510
    };
511

512
    use super::*;
513

514
    #[tokio::test]
515
    async fn simple_test() {
1✔
516
        let mut coordinates = vec![(0.0, 0.1); 9];
1✔
517
        coordinates.extend_from_slice(&[(50.0, 50.1)]);
1✔
518

1✔
519
        let input = MultiPointCollection::from_data(
1✔
520
            MultiPoint::many(coordinates).unwrap(),
1✔
521
            vec![TimeInterval::default(); 10],
1✔
522
            HashMap::default(),
1✔
523
            CacheHint::default(),
1✔
524
        )
1✔
525
        .unwrap();
1✔
526

1✔
527
        let operator = VisualPointClustering {
1✔
528
            params: VisualPointClusteringParams {
1✔
529
                min_radius_px: 8.,
1✔
530
                delta_px: 1.,
1✔
531
                radius_column: "radius".to_string(),
1✔
532
                count_column: "count".to_string(),
1✔
533
                column_aggregates: Default::default(),
1✔
534
            },
1✔
535
            sources: SingleVectorSource {
1✔
536
                vector: MockFeatureCollectionSource::single(input).boxed(),
1✔
537
            },
1✔
538
        };
1✔
539

1✔
540
        let execution_context = MockExecutionContext::test_default();
1✔
541

1✔
542
        let initialized_operator = operator
1✔
543
            .boxed()
1✔
544
            .initialize(WorkflowOperatorPath::initialize_root(), &execution_context)
1✔
545
            .await
1✔
546
            .unwrap();
1✔
547

1✔
548
        let query_processor = initialized_operator
1✔
549
            .query_processor()
1✔
550
            .unwrap()
1✔
551
            .multi_point()
1✔
552
            .unwrap();
1✔
553

1✔
554
        let query_context = MockQueryContext::test_default();
1✔
555

1✔
556
        let qrect = VectorQueryRectangle {
1✔
557
            spatial_bounds: BoundingBox2D::new((-180., -90.).into(), (180., 90.).into()).unwrap(),
1✔
558
            time_interval: TimeInterval::default(),
1✔
559
            spatial_resolution: SpatialResolution::new(0.1, 0.1).unwrap(),
1✔
560
            attributes: ColumnSelection::all(),
1✔
561
        };
1✔
562

1✔
563
        let query = query_processor.query(qrect, &query_context).await.unwrap();
1✔
564

1✔
565
        let result: Vec<MultiPointCollection> = query.map(Result::unwrap).collect().await;
1✔
566

1✔
567
        assert_eq!(result.len(), 1);
1✔
568
        assert!(
1✔
569
            result[0].chunks_equal_ignoring_cache_hint(
1✔
570
                &MultiPointCollection::from_slices(
1✔
571
                    &[(0.0, 0.099_999_999_999_999_99), (50.0, 50.1)],
1✔
572
                    &[TimeInterval::default(); 2],
1✔
573
                    &[
1✔
574
                        ("count", FeatureData::Int(vec![9, 1])),
1✔
575
                        (
1✔
576
                            "radius",
1✔
577
                            FeatureData::Float(vec![10.197_224_577_336_218, 8.])
1✔
578
                        )
1✔
579
                    ],
1✔
580
                )
1✔
581
                .unwrap()
1✔
582
            )
1✔
583
        );
1✔
584
    }
1✔
585

586
    #[tokio::test]
587
    async fn simple_test_with_aggregate() {
1✔
588
        let mut coordinates = vec![(0.0, 0.1); 9];
1✔
589
        coordinates.extend_from_slice(&[(50.0, 50.1)]);
1✔
590

1✔
591
        let input = MultiPointCollection::from_slices(
1✔
592
            &MultiPoint::many(coordinates).unwrap(),
1✔
593
            &[TimeInterval::default(); 10],
1✔
594
            &[(
1✔
595
                "foo",
1✔
596
                FeatureData::Float(vec![1., 2., 3., 4., 5., 6., 7., 8., 9., 10.]),
1✔
597
            )],
1✔
598
        )
1✔
599
        .unwrap();
1✔
600

1✔
601
        let operator = VisualPointClustering {
1✔
602
            params: VisualPointClusteringParams {
1✔
603
                min_radius_px: 8.,
1✔
604
                delta_px: 1.,
1✔
605
                radius_column: "radius".to_string(),
1✔
606
                count_column: "count".to_string(),
1✔
607
                column_aggregates: [(
1✔
608
                    "foo".to_string(),
1✔
609
                    AttributeAggregateDef {
1✔
610
                        column_name: "bar".to_string(),
1✔
611
                        aggregate_type: AttributeAggregateType::MeanNumber,
1✔
612
                        measurement: None,
1✔
613
                    },
1✔
614
                )]
1✔
615
                .iter()
1✔
616
                .cloned()
1✔
617
                .collect(),
1✔
618
            },
1✔
619
            sources: SingleVectorSource {
1✔
620
                vector: MockFeatureCollectionSource::single(input).boxed(),
1✔
621
            },
1✔
622
        };
1✔
623

1✔
624
        let execution_context = MockExecutionContext::test_default();
1✔
625

1✔
626
        let initialized_operator = operator
1✔
627
            .boxed()
1✔
628
            .initialize(WorkflowOperatorPath::initialize_root(), &execution_context)
1✔
629
            .await
1✔
630
            .unwrap();
1✔
631

1✔
632
        let query_processor = initialized_operator
1✔
633
            .query_processor()
1✔
634
            .unwrap()
1✔
635
            .multi_point()
1✔
636
            .unwrap();
1✔
637

1✔
638
        let query_context = MockQueryContext::test_default();
1✔
639

1✔
640
        let qrect = VectorQueryRectangle {
1✔
641
            spatial_bounds: BoundingBox2D::new((-180., -90.).into(), (180., 90.).into()).unwrap(),
1✔
642
            time_interval: TimeInterval::default(),
1✔
643
            spatial_resolution: SpatialResolution::new(0.1, 0.1).unwrap(),
1✔
644
            attributes: ColumnSelection::all(),
1✔
645
        };
1✔
646

1✔
647
        let query = query_processor.query(qrect, &query_context).await.unwrap();
1✔
648

1✔
649
        let result: Vec<MultiPointCollection> = query.map(Result::unwrap).collect().await;
1✔
650

1✔
651
        assert_eq!(result.len(), 1);
1✔
652
        assert!(
1✔
653
            result[0].chunks_equal_ignoring_cache_hint(
1✔
654
                &MultiPointCollection::from_slices(
1✔
655
                    &[(0.0, 0.099_999_999_999_999_99), (50.0, 50.1)],
1✔
656
                    &[TimeInterval::default(); 2],
1✔
657
                    &[
1✔
658
                        ("count", FeatureData::Int(vec![9, 1])),
1✔
659
                        (
1✔
660
                            "radius",
1✔
661
                            FeatureData::Float(vec![10.197_224_577_336_218, 8.])
1✔
662
                        ),
1✔
663
                        ("bar", FeatureData::Float(vec![5., 10.]))
1✔
664
                    ],
1✔
665
                )
1✔
666
                .unwrap()
1✔
667
            )
1✔
668
        );
1✔
669
    }
1✔
670

671
    #[tokio::test]
672
    async fn aggregate_of_null() {
1✔
673
        let mut coordinates = vec![(0.0, 0.1); 2];
1✔
674
        coordinates.extend_from_slice(&[(50.0, 50.1); 2]);
1✔
675

1✔
676
        let input = MultiPointCollection::from_slices(
1✔
677
            &MultiPoint::many(coordinates).unwrap(),
1✔
678
            &[TimeInterval::default(); 4],
1✔
679
            &[(
1✔
680
                "foo",
1✔
681
                FeatureData::NullableInt(vec![Some(1), None, None, None]),
1✔
682
            )],
1✔
683
        )
1✔
684
        .unwrap();
1✔
685

1✔
686
        let operator = VisualPointClustering {
1✔
687
            params: VisualPointClusteringParams {
1✔
688
                min_radius_px: 8.,
1✔
689
                delta_px: 1.,
1✔
690
                radius_column: "radius".to_string(),
1✔
691
                count_column: "count".to_string(),
1✔
692
                column_aggregates: [(
1✔
693
                    "foo".to_string(),
1✔
694
                    AttributeAggregateDef {
1✔
695
                        column_name: "foo".to_string(),
1✔
696
                        aggregate_type: AttributeAggregateType::MeanNumber,
1✔
697
                        measurement: None,
1✔
698
                    },
1✔
699
                )]
1✔
700
                .iter()
1✔
701
                .cloned()
1✔
702
                .collect(),
1✔
703
            },
1✔
704
            sources: SingleVectorSource {
1✔
705
                vector: MockFeatureCollectionSource::single(input).boxed(),
1✔
706
            },
1✔
707
        };
1✔
708

1✔
709
        let execution_context = MockExecutionContext::test_default();
1✔
710

1✔
711
        let initialized_operator = operator
1✔
712
            .boxed()
1✔
713
            .initialize(WorkflowOperatorPath::initialize_root(), &execution_context)
1✔
714
            .await
1✔
715
            .unwrap();
1✔
716

1✔
717
        let query_processor = initialized_operator
1✔
718
            .query_processor()
1✔
719
            .unwrap()
1✔
720
            .multi_point()
1✔
721
            .unwrap();
1✔
722

1✔
723
        let query_context = MockQueryContext::test_default();
1✔
724

1✔
725
        let qrect = VectorQueryRectangle {
1✔
726
            spatial_bounds: BoundingBox2D::new((-180., -90.).into(), (180., 90.).into()).unwrap(),
1✔
727
            time_interval: TimeInterval::default(),
1✔
728
            spatial_resolution: SpatialResolution::new(0.1, 0.1).unwrap(),
1✔
729
            attributes: ColumnSelection::all(),
1✔
730
        };
1✔
731

1✔
732
        let query = query_processor.query(qrect, &query_context).await.unwrap();
1✔
733

1✔
734
        let result: Vec<MultiPointCollection> = query.map(Result::unwrap).collect().await;
1✔
735

1✔
736
        assert_eq!(result.len(), 1);
1✔
737
        assert!(
1✔
738
            result[0].chunks_equal_ignoring_cache_hint(
1✔
739
                &MultiPointCollection::from_slices(
1✔
740
                    &[(0.0, 0.1), (50.0, 50.1)],
1✔
741
                    &[TimeInterval::default(); 2],
1✔
742
                    &[
1✔
743
                        ("count", FeatureData::Int(vec![2, 2])),
1✔
744
                        (
1✔
745
                            "radius",
1✔
746
                            FeatureData::Float(vec![8.693_147_180_559_945, 8.693_147_180_559_945])
1✔
747
                        ),
1✔
748
                        ("foo", FeatureData::NullableFloat(vec![Some(1.), None]))
1✔
749
                    ],
1✔
750
                )
1✔
751
                .unwrap()
1✔
752
            )
1✔
753
        );
1✔
754
    }
1✔
755

756
    #[tokio::test]
757
    async fn text_aggregate() {
1✔
758
        let mut coordinates = vec![(0.0, 0.1); 2];
1✔
759
        coordinates.extend_from_slice(&[(50.0, 50.1); 2]);
1✔
760
        coordinates.extend_from_slice(&[(25.0, 25.1); 2]);
1✔
761

1✔
762
        let input = MultiPointCollection::from_slices(
1✔
763
            &MultiPoint::many(coordinates).unwrap(),
1✔
764
            &[TimeInterval::default(); 6],
1✔
765
            &[(
1✔
766
                "text",
1✔
767
                FeatureData::NullableText(vec![
1✔
768
                    Some("foo".to_string()),
1✔
769
                    Some("bar".to_string()),
1✔
770
                    Some("foo".to_string()),
1✔
771
                    None,
1✔
772
                    None,
1✔
773
                    None,
1✔
774
                ]),
1✔
775
            )],
1✔
776
        )
1✔
777
        .unwrap();
1✔
778

1✔
779
        let operator = VisualPointClustering {
1✔
780
            params: VisualPointClusteringParams {
1✔
781
                min_radius_px: 8.,
1✔
782
                delta_px: 1.,
1✔
783
                radius_column: "radius".to_string(),
1✔
784
                count_column: "count".to_string(),
1✔
785
                column_aggregates: [(
1✔
786
                    "text".to_string(),
1✔
787
                    AttributeAggregateDef {
1✔
788
                        column_name: "text".to_string(),
1✔
789
                        aggregate_type: AttributeAggregateType::StringSample,
1✔
790
                        measurement: None,
1✔
791
                    },
1✔
792
                )]
1✔
793
                .iter()
1✔
794
                .cloned()
1✔
795
                .collect(),
1✔
796
            },
1✔
797
            sources: SingleVectorSource {
1✔
798
                vector: MockFeatureCollectionSource::single(input).boxed(),
1✔
799
            },
1✔
800
        };
1✔
801

1✔
802
        let execution_context = MockExecutionContext::test_default();
1✔
803

1✔
804
        let initialized_operator = operator
1✔
805
            .boxed()
1✔
806
            .initialize(WorkflowOperatorPath::initialize_root(), &execution_context)
1✔
807
            .await
1✔
808
            .unwrap();
1✔
809

1✔
810
        let query_processor = initialized_operator
1✔
811
            .query_processor()
1✔
812
            .unwrap()
1✔
813
            .multi_point()
1✔
814
            .unwrap();
1✔
815

1✔
816
        let query_context = MockQueryContext::test_default();
1✔
817

1✔
818
        let qrect = VectorQueryRectangle {
1✔
819
            spatial_bounds: BoundingBox2D::new((-180., -90.).into(), (180., 90.).into()).unwrap(),
1✔
820
            time_interval: TimeInterval::default(),
1✔
821
            spatial_resolution: SpatialResolution::new(0.1, 0.1).unwrap(),
1✔
822
            attributes: ColumnSelection::all(),
1✔
823
        };
1✔
824

1✔
825
        let query = query_processor.query(qrect, &query_context).await.unwrap();
1✔
826

1✔
827
        let result: Vec<MultiPointCollection> = query.map(Result::unwrap).collect().await;
1✔
828

1✔
829
        assert_eq!(result.len(), 1);
1✔
830
        assert!(
1✔
831
            result[0].chunks_equal_ignoring_cache_hint(
1✔
832
                &MultiPointCollection::from_slices(
1✔
833
                    &[(0.0, 0.1), (50.0, 50.1), (25.0, 25.1)],
1✔
834
                    &[TimeInterval::default(); 3],
1✔
835
                    &[
1✔
836
                        ("count", FeatureData::Int(vec![2, 2, 2])),
1✔
837
                        (
1✔
838
                            "radius",
1✔
839
                            FeatureData::Float(vec![
1✔
840
                                8.693_147_180_559_945,
1✔
841
                                8.693_147_180_559_945,
1✔
842
                                8.693_147_180_559_945
1✔
843
                            ])
1✔
844
                        ),
1✔
845
                        (
1✔
846
                            "text",
1✔
847
                            FeatureData::NullableText(vec![
1✔
848
                                Some("foo, bar".to_string()),
1✔
849
                                Some("foo".to_string()),
1✔
850
                                None
1✔
851
                            ])
1✔
852
                        )
1✔
853
                    ],
1✔
854
                )
1✔
855
                .unwrap()
1✔
856
            )
1✔
857
        );
1✔
858
    }
1✔
859

860
    #[tokio::test]
861
    async fn it_attaches_cache_hint() {
1✔
862
        let mut coordinates = vec![(0.0, 0.1); 2];
1✔
863
        coordinates.extend_from_slice(&[(50.0, 50.1); 2]);
1✔
864
        coordinates.extend_from_slice(&[(25.0, 25.1); 2]);
1✔
865

1✔
866
        let mut input = MultiPointCollection::from_slices(
1✔
867
            &MultiPoint::many(coordinates).unwrap(),
1✔
868
            &[TimeInterval::default(); 6],
1✔
869
            &[(
1✔
870
                "text",
1✔
871
                FeatureData::NullableText(vec![
1✔
872
                    Some("foo".to_string()),
1✔
873
                    Some("bar".to_string()),
1✔
874
                    Some("foo".to_string()),
1✔
875
                    None,
1✔
876
                    None,
1✔
877
                    None,
1✔
878
                ]),
1✔
879
            )],
1✔
880
        )
1✔
881
        .unwrap();
1✔
882

1✔
883
        let cache_hint = CacheHint::seconds(1234);
1✔
884

1✔
885
        input.cache_hint = cache_hint;
1✔
886

1✔
887
        let operator = VisualPointClustering {
1✔
888
            params: VisualPointClusteringParams {
1✔
889
                min_radius_px: 8.,
1✔
890
                delta_px: 1.,
1✔
891
                radius_column: "radius".to_string(),
1✔
892
                count_column: "count".to_string(),
1✔
893
                column_aggregates: [(
1✔
894
                    "text".to_string(),
1✔
895
                    AttributeAggregateDef {
1✔
896
                        column_name: "text".to_string(),
1✔
897
                        aggregate_type: AttributeAggregateType::StringSample,
1✔
898
                        measurement: None,
1✔
899
                    },
1✔
900
                )]
1✔
901
                .iter()
1✔
902
                .cloned()
1✔
903
                .collect(),
1✔
904
            },
1✔
905
            sources: SingleVectorSource {
1✔
906
                vector: MockFeatureCollectionSource::single(input).boxed(),
1✔
907
            },
1✔
908
        };
1✔
909

1✔
910
        let execution_context = MockExecutionContext::test_default();
1✔
911

1✔
912
        let initialized_operator = operator
1✔
913
            .boxed()
1✔
914
            .initialize(WorkflowOperatorPath::initialize_root(), &execution_context)
1✔
915
            .await
1✔
916
            .unwrap();
1✔
917

1✔
918
        let query_processor = initialized_operator
1✔
919
            .query_processor()
1✔
920
            .unwrap()
1✔
921
            .multi_point()
1✔
922
            .unwrap();
1✔
923

1✔
924
        let query_context = MockQueryContext::test_default();
1✔
925

1✔
926
        let qrect = VectorQueryRectangle {
1✔
927
            spatial_bounds: BoundingBox2D::new((-180., -90.).into(), (180., 90.).into()).unwrap(),
1✔
928
            time_interval: TimeInterval::default(),
1✔
929
            spatial_resolution: SpatialResolution::new(0.1, 0.1).unwrap(),
1✔
930
            attributes: ColumnSelection::all(),
1✔
931
        };
1✔
932

1✔
933
        let query = query_processor.query(qrect, &query_context).await.unwrap();
1✔
934

1✔
935
        let result: Vec<MultiPointCollection> = query.map(Result::unwrap).collect().await;
1✔
936

1✔
937
        assert_eq!(result.len(), 1);
1✔
938
        assert_eq!(result[0].cache_hint.expires(), cache_hint.expires());
1✔
939
    }
1✔
940
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc