• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

geo-engine / geoengine / 17090117584

20 Aug 2025 06:11AM UTC coverage: 88.388%. First build
17090117584

Pull #1073

github

web-flow
Merge eca2827dd into db8685e5e
Pull Request #1073: fix(operators): resolution in visual clustering

30 of 36 new or added lines in 1 file covered. (83.33%)

113482 of 128391 relevant lines covered (88.39%)

229121.21 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

90.87
/operators/src/processing/circle_merging_quadtree/operator.rs
1
use std::collections::{HashMap, HashSet};
2

3
use async_trait::async_trait;
4
use futures::StreamExt;
5
use futures::stream::{BoxStream, FuturesUnordered};
6
use geoengine_datatypes::collections::{
7
    BuilderProvider, GeoFeatureCollectionRowBuilder, MultiPointCollection, VectorDataType,
8
};
9
use geoengine_datatypes::primitives::{
10
    BoundingBox2D, Circle, FeatureDataType, FeatureDataValue, Measurement, MultiPoint,
11
    MultiPointAccess, SpatialBounded, VectorQueryRectangle,
12
};
13
use geoengine_datatypes::primitives::{CacheHint, ColumnSelection};
14
use serde::{Deserialize, Serialize};
15
use snafu::ensure;
16

17
use crate::adapters::FeatureCollectionStreamExt;
18
use crate::engine::{
19
    CanonicOperatorName, ExecutionContext, InitializedVectorOperator, Operator, QueryContext,
20
    QueryProcessor, SingleVectorSource, TypedVectorQueryProcessor, VectorColumnInfo,
21
    VectorOperator, VectorQueryProcessor, VectorResultDescriptor,
22
};
23
use crate::engine::{InitializedSources, OperatorName, WorkflowOperatorPath};
24
use crate::error::{self, Error};
25
use crate::processing::circle_merging_quadtree::aggregates::MeanAggregator;
26
use crate::processing::circle_merging_quadtree::circle_of_points::CircleOfPoints;
27
use crate::processing::circle_merging_quadtree::circle_radius_model::LogScaledRadius;
28
use crate::util::Result;
29

30
use super::aggregates::{AttributeAggregate, AttributeAggregateType, StringSampler};
31
use super::circle_radius_model::CircleRadiusModel;
32
use super::grid::Grid;
33
use super::quadtree::CircleMergingQuadtree;
34

35
#[derive(Debug, Serialize, Deserialize, Clone)]
36
#[serde(rename_all = "camelCase")]
37
pub struct VisualPointClusteringParams {
38
    pub min_radius_px: f64,
39
    pub delta_px: f64,
40
    pub resolution: f64,
41
    radius_column: String,
42
    count_column: String,
43
    column_aggregates: HashMap<String, AttributeAggregateDef>,
44
}
45

46
#[derive(Debug, Serialize, Deserialize, Clone)]
47
#[serde(rename_all = "camelCase")]
48
pub struct AttributeAggregateDef {
49
    pub column_name: String,
50
    pub aggregate_type: AttributeAggregateType,
51
    // if measurement is unset, it will be taken from the source result descriptor
52
    pub measurement: Option<Measurement>,
53
}
54

55
pub type VisualPointClustering = Operator<VisualPointClusteringParams, SingleVectorSource>;
56

57
impl OperatorName for VisualPointClustering {
58
    const TYPE_NAME: &'static str = "VisualPointClustering";
59
}
60

61
#[typetag::serde]
×
62
#[async_trait]
63
#[allow(clippy::too_many_lines)]
64
impl VectorOperator for VisualPointClustering {
65
    async fn _initialize(
66
        mut self: Box<Self>,
67
        path: WorkflowOperatorPath,
68
        context: &dyn ExecutionContext,
69
    ) -> Result<Box<dyn InitializedVectorOperator>> {
10✔
70
        ensure!(
5✔
71
            self.params.min_radius_px > 0.0,
5✔
72
            error::InputMustBeGreaterThanZero {
×
73
                scope: "VisualPointClustering",
×
NEW
74
                name: "minRadius"
×
75
            }
×
76
        );
77
        ensure!(
5✔
78
            self.params.delta_px >= 0.0,
5✔
79
            error::InputMustBeZeroOrPositive {
×
80
                scope: "VisualPointClustering",
×
NEW
81
                name: "deltaPx"
×
NEW
82
            }
×
83
        );
84
        ensure!(
5✔
85
            self.params.resolution >= 0.0,
5✔
NEW
86
            error::InputMustBeZeroOrPositive {
×
NEW
87
                scope: "VisualPointClustering",
×
NEW
88
                name: "resolution"
×
89
            }
×
90
        );
91
        ensure!(!self.params.radius_column.is_empty(), error::EmptyInput);
5✔
92
        ensure!(
5✔
93
            self.params.count_column != self.params.radius_column,
5✔
94
            error::DuplicateOutputColumns
×
95
        );
96

97
        let name = CanonicOperatorName::from(&self);
5✔
98

99
        let radius_model = LogScaledRadius::new(self.params.min_radius_px, self.params.delta_px)?;
5✔
100

101
        let initialized_sources = self
5✔
102
            .sources
5✔
103
            .initialize_sources(path.clone(), context)
5✔
104
            .await?;
5✔
105
        let vector_source = initialized_sources.vector;
5✔
106

107
        ensure!(
5✔
108
            vector_source.result_descriptor().data_type == VectorDataType::MultiPoint,
5✔
109
            error::InvalidType {
×
110
                expected: VectorDataType::MultiPoint.to_string(),
×
111
                found: vector_source.result_descriptor().data_type.to_string(),
×
112
            }
×
113
        );
114

115
        // check that all input columns exist
116
        for column_name in self.params.column_aggregates.keys() {
5✔
117
            let column_name_exists = vector_source
4✔
118
                .result_descriptor()
4✔
119
                .columns
4✔
120
                .contains_key(column_name);
4✔
121

122
            ensure!(
4✔
123
                column_name_exists,
4✔
124
                error::MissingInputColumn {
×
125
                    name: column_name.clone()
×
126
                }
×
127
            );
128
        }
129

130
        // check that there are no duplicates in the output columns
131
        let output_names: HashSet<&String> = self
5✔
132
            .params
5✔
133
            .column_aggregates
5✔
134
            .values()
5✔
135
            .map(|def| &def.column_name)
5✔
136
            .collect();
5✔
137
        ensure!(
5✔
138
            output_names.len() == self.params.column_aggregates.len(),
5✔
139
            error::DuplicateOutputColumns
×
140
        );
141

142
        // create schema for [`ResultDescriptor`]
143
        let mut new_columns: HashMap<String, VectorColumnInfo> =
5✔
144
            HashMap::with_capacity(self.params.column_aggregates.len());
5✔
145
        for attribute_aggregate_def in self.params.column_aggregates.values_mut() {
5✔
146
            if attribute_aggregate_def.measurement.is_none() {
4✔
147
                // take it from source measurement
4✔
148

4✔
149
                attribute_aggregate_def.measurement = vector_source
4✔
150
                    .result_descriptor()
4✔
151
                    .column_measurement(&attribute_aggregate_def.column_name)
4✔
152
                    .cloned();
4✔
153
            }
4✔
154

155
            let data_type = match attribute_aggregate_def.aggregate_type {
4✔
156
                AttributeAggregateType::MeanNumber => FeatureDataType::Float,
2✔
157
                AttributeAggregateType::StringSample => FeatureDataType::Text,
2✔
158
                AttributeAggregateType::Null => {
159
                    return Err(Error::InvalidType {
×
160
                        expected: "not null".to_string(),
×
161
                        found: "null".to_string(),
×
162
                    });
×
163
                }
164
            };
165

166
            new_columns.insert(
4✔
167
                attribute_aggregate_def.column_name.clone(),
4✔
168
                VectorColumnInfo {
4✔
169
                    data_type,
4✔
170
                    measurement: attribute_aggregate_def.measurement.clone().into(),
4✔
171
                },
4✔
172
            );
173
        }
174

175
        // check that output schema does not interfere with count and radius columns
176
        ensure!(
5✔
177
            !new_columns.contains_key(&self.params.radius_column)
5✔
178
                && !new_columns.contains_key(&self.params.count_column),
5✔
179
            error::DuplicateOutputColumns
×
180
        );
181

182
        let in_desc = vector_source.result_descriptor();
5✔
183

184
        Ok(InitializedVisualPointClustering {
5✔
185
            name,
5✔
186
            path,
5✔
187
            result_descriptor: VectorResultDescriptor {
5✔
188
                data_type: VectorDataType::MultiPoint,
5✔
189
                spatial_reference: in_desc.spatial_reference,
5✔
190
                columns: new_columns,
5✔
191
                time: in_desc.time,
5✔
192
                bbox: in_desc.bbox,
5✔
193
            },
5✔
194
            vector_source,
5✔
195
            radius_model,
5✔
196
            radius_column: self.params.radius_column,
5✔
197
            count_column: self.params.count_column,
5✔
198
            attribute_mapping: self.params.column_aggregates,
5✔
199
            resolution: self.params.resolution,
5✔
200
        }
5✔
201
        .boxed())
5✔
202
    }
10✔
203

204
    span_fn!(VisualPointClustering);
205
}
206

207
pub struct InitializedVisualPointClustering {
208
    name: CanonicOperatorName,
209
    path: WorkflowOperatorPath,
210
    result_descriptor: VectorResultDescriptor,
211
    vector_source: Box<dyn InitializedVectorOperator>,
212
    radius_model: LogScaledRadius,
213
    radius_column: String,
214
    count_column: String,
215
    attribute_mapping: HashMap<String, AttributeAggregateDef>,
216
    resolution: f64,
217
}
218

219
impl InitializedVectorOperator for InitializedVisualPointClustering {
220
    fn query_processor(&self) -> Result<TypedVectorQueryProcessor> {
5✔
221
        match self.vector_source.query_processor()? {
5✔
222
            TypedVectorQueryProcessor::MultiPoint(source) => {
5✔
223
                Ok(TypedVectorQueryProcessor::MultiPoint(
5✔
224
                    VisualPointClusteringProcessor::new(
5✔
225
                        source,
5✔
226
                        self.radius_model,
5✔
227
                        self.radius_column.clone(),
5✔
228
                        self.count_column.clone(),
5✔
229
                        self.result_descriptor.clone(),
5✔
230
                        self.attribute_mapping.clone(),
5✔
231
                        self.resolution,
5✔
232
                    )
5✔
233
                    .boxed(),
5✔
234
                ))
5✔
235
            }
236
            TypedVectorQueryProcessor::MultiLineString(_) => Err(error::Error::InvalidVectorType {
×
237
                expected: "MultiPoint".to_owned(),
×
238
                found: "MultiLineString".to_owned(),
×
239
            }),
×
240
            TypedVectorQueryProcessor::MultiPolygon(_) => Err(error::Error::InvalidVectorType {
×
241
                expected: "MultiPoint".to_owned(),
×
242
                found: "MultiPolygon".to_owned(),
×
243
            }),
×
244
            TypedVectorQueryProcessor::Data(_) => Err(error::Error::InvalidVectorType {
×
245
                expected: "MultiPoint".to_owned(),
×
246
                found: "Data".to_owned(),
×
247
            }),
×
248
        }
249
    }
5✔
250

251
    fn result_descriptor(&self) -> &VectorResultDescriptor {
×
252
        &self.result_descriptor
×
253
    }
×
254

255
    fn canonic_name(&self) -> CanonicOperatorName {
×
256
        self.name.clone()
×
257
    }
×
258

259
    fn name(&self) -> &'static str {
×
260
        VisualPointClustering::TYPE_NAME
×
261
    }
×
262

263
    fn path(&self) -> WorkflowOperatorPath {
×
264
        self.path.clone()
×
265
    }
×
266
}
267

268
pub struct VisualPointClusteringProcessor {
269
    source: Box<dyn VectorQueryProcessor<VectorType = MultiPointCollection>>,
270
    radius_model: LogScaledRadius,
271
    radius_column: String,
272
    count_column: String,
273
    result_descriptor: VectorResultDescriptor,
274
    attribute_mapping: HashMap<String, AttributeAggregateDef>,
275
    resolution: f64,
276
}
277

278
impl VisualPointClusteringProcessor {
279
    fn new(
5✔
280
        source: Box<dyn VectorQueryProcessor<VectorType = MultiPointCollection>>,
5✔
281
        radius_model: LogScaledRadius,
5✔
282
        radius_column: String,
5✔
283
        count_column: String,
5✔
284
        result_descriptor: VectorResultDescriptor,
5✔
285
        attribute_mapping: HashMap<String, AttributeAggregateDef>,
5✔
286
        resolution: f64,
5✔
287
    ) -> Self {
5✔
288
        Self {
5✔
289
            source,
5✔
290
            radius_model,
5✔
291
            radius_column,
5✔
292
            count_column,
5✔
293
            result_descriptor,
5✔
294
            attribute_mapping,
5✔
295
            resolution,
5✔
296
        }
5✔
297
    }
5✔
298

299
    fn create_point_collection(
5✔
300
        circles_of_points: impl Iterator<Item = CircleOfPoints>,
5✔
301
        radius_column: &str,
5✔
302
        count_column: &str,
5✔
303
        columns: &HashMap<String, FeatureDataType>,
5✔
304
        cache_hint: CacheHint,
5✔
305
        resolution: f64,
5✔
306
    ) -> Result<MultiPointCollection> {
5✔
307
        let mut builder = MultiPointCollection::builder();
5✔
308

309
        for (column, &data_type) in columns {
9✔
310
            builder.add_column(column.clone(), data_type)?;
4✔
311
        }
312

313
        builder.add_column(radius_column.to_string(), FeatureDataType::Float)?;
5✔
314
        builder.add_column(count_column.to_string(), FeatureDataType::Int)?;
5✔
315

316
        let mut builder = builder.finish_header();
5✔
317

318
        for circle_of_points in circles_of_points {
17✔
319
            builder.push_geometry(MultiPoint::new(vec![circle_of_points.circle.center()])?);
12✔
320
            builder.push_time_interval(circle_of_points.time_aggregate);
12✔
321

322
            builder.push_data(
12✔
323
                radius_column,
12✔
324
                FeatureDataValue::Float(circle_of_points.circle.radius() / resolution),
12✔
325
            )?;
×
326
            builder.push_data(
12✔
327
                count_column,
12✔
328
                FeatureDataValue::Int(circle_of_points.number_of_points() as i64),
12✔
329
            )?;
×
330

331
            for (column, data_type) in circle_of_points.attribute_aggregates {
22✔
332
                match data_type {
10✔
333
                    AttributeAggregate::MeanNumber(mean_aggregator) => {
3✔
334
                        builder
3✔
335
                            .push_data(&column, FeatureDataValue::Float(mean_aggregator.mean))?;
3✔
336
                    }
337
                    AttributeAggregate::StringSample(string_sampler) => {
4✔
338
                        builder.push_data(
4✔
339
                            &column,
4✔
340
                            FeatureDataValue::Text(string_sampler.strings.join(", ")),
4✔
341
                        )?;
×
342
                    }
343
                    AttributeAggregate::Null => {
344
                        builder.push_null(&column)?;
3✔
345
                    }
346
                }
347
            }
348

349
            builder.finish_row();
12✔
350
        }
351

352
        builder.cache_hint(cache_hint);
5✔
353

354
        builder.build().map_err(Into::into)
5✔
355
    }
5✔
356

357
    fn aggregate_from_feature_data(
26✔
358
        feature_data: FeatureDataValue,
26✔
359
        aggregate_type: AttributeAggregateType,
26✔
360
    ) -> AttributeAggregate {
26✔
361
        match (feature_data, aggregate_type) {
26✔
362
            (
363
                FeatureDataValue::Category(value) | FeatureDataValue::NullableCategory(Some(value)),
×
364
                AttributeAggregateType::MeanNumber,
365
            ) => AttributeAggregate::MeanNumber(MeanAggregator::from_value(f64::from(value))),
×
366
            (
367
                FeatureDataValue::Float(value) | FeatureDataValue::NullableFloat(Some(value)),
10✔
368
                AttributeAggregateType::MeanNumber,
369
            ) => AttributeAggregate::MeanNumber(MeanAggregator::from_value(value)),
10✔
370
            (
371
                FeatureDataValue::Int(value) | FeatureDataValue::NullableInt(Some(value)),
1✔
372
                AttributeAggregateType::MeanNumber,
373
            ) => AttributeAggregate::MeanNumber(MeanAggregator::from_value(value as f64)),
1✔
374
            (
375
                FeatureDataValue::Text(value) | FeatureDataValue::NullableText(Some(value)),
6✔
376
                AttributeAggregateType::StringSample,
377
            ) => AttributeAggregate::StringSample(StringSampler::from_value(value)),
6✔
378
            (
379
                FeatureDataValue::DateTime(value) | FeatureDataValue::NullableDateTime(Some(value)),
×
380
                AttributeAggregateType::StringSample,
381
            ) => AttributeAggregate::StringSample(StringSampler::from_value(value.to_string())),
×
382
            _ => AttributeAggregate::Null,
9✔
383
        }
384
    }
26✔
385
}
386

387
struct GridFoldState {
388
    grid: Grid<LogScaledRadius>,
389
    column_mapping: HashMap<String, AttributeAggregateDef>,
390
    cache_hint: CacheHint,
391
}
392

393
#[async_trait]
394
impl QueryProcessor for VisualPointClusteringProcessor {
395
    type Output = MultiPointCollection;
396
    type SpatialBounds = BoundingBox2D;
397
    type Selection = ColumnSelection;
398
    type ResultDescription = VectorResultDescriptor;
399

400
    async fn _query<'a>(
401
        &'a self,
402
        query: VectorQueryRectangle,
403
        ctx: &'a dyn QueryContext,
404
    ) -> Result<BoxStream<'a, Result<Self::Output>>> {
10✔
405
        // we aggregate all points into one collection
406

407
        let column_schema = self
5✔
408
            .result_descriptor
5✔
409
            .columns
5✔
410
            .iter()
5✔
411
            .map(|(name, column_info)| (name.clone(), column_info.data_type))
5✔
412
            .collect();
5✔
413
        let scaled_radius_model = self.radius_model.with_scaled_radii(self.resolution)?;
5✔
414

415
        let initial_grid_fold_state = Result::<GridFoldState>::Ok(GridFoldState {
5✔
416
            grid: Grid::new(query.spatial_bounds().spatial_bounds(), scaled_radius_model),
5✔
417
            column_mapping: self.attribute_mapping.clone(),
5✔
418
            cache_hint: CacheHint::max_duration(),
5✔
419
        });
5✔
420

421
        let grid_future = self.source.query(query.clone(), ctx).await?.fold(
5✔
422
            initial_grid_fold_state,
5✔
423
            |state, feature_collection| async move {
5✔
424
                // TODO: worker thread
425

426
                let GridFoldState {
427
                    mut grid,
5✔
428
                    column_mapping,
5✔
429
                    mut cache_hint,
5✔
430
                } = state?;
5✔
431

432
                let feature_collection = feature_collection?;
5✔
433

434
                for feature in &feature_collection {
41✔
435
                    // TODO: pre-aggregate multi-points differently?
436
                    for coordinate in feature.geometry.points() {
36✔
437
                        let circle =
36✔
438
                            Circle::from_coordinate(coordinate, grid.radius_model().min_radius());
36✔
439

440
                        let mut attribute_aggregates = HashMap::with_capacity(column_mapping.len());
36✔
441

442
                        for (
443
                            src_column,
26✔
444
                            AttributeAggregateDef {
445
                                column_name: tgt_column,
26✔
446
                                aggregate_type,
26✔
447
                                measurement: _,
448
                            },
449
                        ) in &column_mapping
62✔
450
                        {
451
                            let attribute_aggregate =
26✔
452
                                if let Some(feature_data) = feature.get(src_column) {
26✔
453
                                    Self::aggregate_from_feature_data(feature_data, *aggregate_type)
26✔
454
                                } else {
455
                                    AttributeAggregate::Null
×
456
                                };
457

458
                            attribute_aggregates.insert(tgt_column.clone(), attribute_aggregate);
26✔
459
                        }
460

461
                        grid.insert(CircleOfPoints::new_with_one_point(
36✔
462
                            circle,
36✔
463
                            feature.time_interval,
36✔
464
                            attribute_aggregates,
36✔
465
                        ));
466
                    }
467
                }
468

469
                cache_hint.merge_with(&feature_collection.cache_hint);
5✔
470

471
                Ok(GridFoldState {
5✔
472
                    grid,
5✔
473
                    column_mapping,
5✔
474
                    cache_hint,
5✔
475
                })
5✔
476
            },
10✔
477
        );
478

479
        let stream = FuturesUnordered::new();
5✔
480
        stream.push(grid_future);
5✔
481

482
        let stream = stream.map(move |grid| {
5✔
483
            let GridFoldState {
484
                grid,
5✔
485
                column_mapping: _,
486
                cache_hint,
5✔
487
            } = grid?;
5✔
488

489
            let mut cmq = CircleMergingQuadtree::new(
5✔
490
                query.spatial_bounds().spatial_bounds(),
5✔
491
                *grid.radius_model(),
5✔
492
                1,
493
            );
494

495
            // TODO: worker thread
496
            for circle_of_points in grid.drain() {
12✔
497
                cmq.insert_circle(circle_of_points);
12✔
498
            }
12✔
499

500
            Self::create_point_collection(
5✔
501
                cmq.into_iter(),
5✔
502
                &self.radius_column,
5✔
503
                &self.count_column,
5✔
504
                &column_schema,
5✔
505
                cache_hint,
5✔
506
                self.resolution,
5✔
507
            )
508
        });
5✔
509

510
        Ok(stream.merge_chunks(ctx.chunk_byte_size().into()).boxed())
5✔
511
    }
10✔
512

513
    fn result_descriptor(&self) -> &VectorResultDescriptor {
10✔
514
        &self.result_descriptor
10✔
515
    }
10✔
516
}
517

518
#[cfg(test)]
519
mod tests {
520
    use geoengine_datatypes::collections::ChunksEqualIgnoringCacheHint;
521
    use geoengine_datatypes::primitives::BoundingBox2D;
522
    use geoengine_datatypes::primitives::CacheHint;
523
    use geoengine_datatypes::primitives::FeatureData;
524
    use geoengine_datatypes::primitives::TimeInterval;
525
    use geoengine_datatypes::util::test::TestDefault;
526

527
    use crate::{engine::MockExecutionContext, mock::MockFeatureCollectionSource};
528

529
    use super::*;
530

531
    #[tokio::test]
532
    async fn simple_test() {
1✔
533
        let mut coordinates = vec![(0.0, 0.1); 9];
1✔
534
        coordinates.extend_from_slice(&[(50.0, 50.1)]);
1✔
535

536
        let input = MultiPointCollection::from_data(
1✔
537
            MultiPoint::many(coordinates).unwrap(),
1✔
538
            vec![TimeInterval::default(); 10],
1✔
539
            HashMap::default(),
1✔
540
            CacheHint::default(),
1✔
541
        )
542
        .unwrap();
1✔
543

544
        let operator = VisualPointClustering {
1✔
545
            params: VisualPointClusteringParams {
1✔
546
                min_radius_px: 8.,
1✔
547
                delta_px: 1.,
1✔
548
                resolution: 0.1,
1✔
549
                radius_column: "radius".to_string(),
1✔
550
                count_column: "count".to_string(),
1✔
551
                column_aggregates: Default::default(),
1✔
552
            },
1✔
553
            sources: SingleVectorSource {
1✔
554
                vector: MockFeatureCollectionSource::single(input).boxed(),
1✔
555
            },
1✔
556
        };
1✔
557

558
        let execution_context = MockExecutionContext::test_default();
1✔
559

560
        let initialized_operator = operator
1✔
561
            .boxed()
1✔
562
            .initialize(WorkflowOperatorPath::initialize_root(), &execution_context)
1✔
563
            .await
1✔
564
            .unwrap();
1✔
565

566
        let query_processor = initialized_operator
1✔
567
            .query_processor()
1✔
568
            .unwrap()
1✔
569
            .multi_point()
1✔
570
            .unwrap();
1✔
571

572
        let query_context = execution_context.mock_query_context_test_default();
1✔
573

574
        let qrect = VectorQueryRectangle::new(
1✔
575
            BoundingBox2D::new((-180., -90.).into(), (180., 90.).into()).unwrap(),
1✔
576
            TimeInterval::default(),
1✔
577
            ColumnSelection::all(),
1✔
578
        );
579

580
        let query = query_processor.query(qrect, &query_context).await.unwrap();
1✔
581

582
        let result: Vec<MultiPointCollection> = query.map(Result::unwrap).collect().await;
1✔
583

584
        assert_eq!(result.len(), 1);
1✔
585
        assert!(
1✔
586
            result[0].chunks_equal_ignoring_cache_hint(
1✔
587
                &MultiPointCollection::from_slices(
1✔
588
                    &[(0.0, 0.099_999_999_999_999_99), (50.0, 50.1)],
1✔
589
                    &[TimeInterval::default(); 2],
1✔
590
                    &[
1✔
591
                        ("count", FeatureData::Int(vec![9, 1])),
1✔
592
                        (
1✔
593
                            "radius",
1✔
594
                            FeatureData::Float(vec![10.197_224_577_336_218, 8.])
1✔
595
                        )
1✔
596
                    ],
1✔
597
                )
1✔
598
                .unwrap()
1✔
599
            )
1✔
600
        );
1✔
601
    }
1✔
602

603
    #[tokio::test]
604
    async fn simple_test_with_aggregate() {
1✔
605
        let mut coordinates = vec![(0.0, 0.1); 9];
1✔
606
        coordinates.extend_from_slice(&[(50.0, 50.1)]);
1✔
607

608
        let input = MultiPointCollection::from_slices(
1✔
609
            &MultiPoint::many(coordinates).unwrap(),
1✔
610
            &[TimeInterval::default(); 10],
1✔
611
            &[(
1✔
612
                "foo",
1✔
613
                FeatureData::Float(vec![1., 2., 3., 4., 5., 6., 7., 8., 9., 10.]),
1✔
614
            )],
1✔
615
        )
616
        .unwrap();
1✔
617

618
        let operator = VisualPointClustering {
1✔
619
            params: VisualPointClusteringParams {
1✔
620
                min_radius_px: 8.,
1✔
621
                delta_px: 1.,
1✔
622
                resolution: 0.1,
1✔
623
                radius_column: "radius".to_string(),
1✔
624
                count_column: "count".to_string(),
1✔
625
                column_aggregates: [(
1✔
626
                    "foo".to_string(),
1✔
627
                    AttributeAggregateDef {
1✔
628
                        column_name: "bar".to_string(),
1✔
629
                        aggregate_type: AttributeAggregateType::MeanNumber,
1✔
630
                        measurement: None,
1✔
631
                    },
1✔
632
                )]
1✔
633
                .iter()
1✔
634
                .cloned()
1✔
635
                .collect(),
1✔
636
            },
1✔
637
            sources: SingleVectorSource {
1✔
638
                vector: MockFeatureCollectionSource::single(input).boxed(),
1✔
639
            },
1✔
640
        };
1✔
641

642
        let execution_context = MockExecutionContext::test_default();
1✔
643

644
        let initialized_operator = operator
1✔
645
            .boxed()
1✔
646
            .initialize(WorkflowOperatorPath::initialize_root(), &execution_context)
1✔
647
            .await
1✔
648
            .unwrap();
1✔
649

650
        let query_processor = initialized_operator
1✔
651
            .query_processor()
1✔
652
            .unwrap()
1✔
653
            .multi_point()
1✔
654
            .unwrap();
1✔
655

656
        let query_context = execution_context.mock_query_context_test_default();
1✔
657

658
        let qrect = VectorQueryRectangle::new(
1✔
659
            BoundingBox2D::new((-180., -90.).into(), (180., 90.).into()).unwrap(),
1✔
660
            TimeInterval::default(),
1✔
661
            ColumnSelection::all(),
1✔
662
        );
663

664
        let query = query_processor.query(qrect, &query_context).await.unwrap();
1✔
665

666
        let result: Vec<MultiPointCollection> = query.map(Result::unwrap).collect().await;
1✔
667

668
        assert_eq!(result.len(), 1);
1✔
669
        assert!(
1✔
670
            result[0].chunks_equal_ignoring_cache_hint(
1✔
671
                &MultiPointCollection::from_slices(
1✔
672
                    &[(0.0, 0.099_999_999_999_999_99), (50.0, 50.1)],
1✔
673
                    &[TimeInterval::default(); 2],
1✔
674
                    &[
1✔
675
                        ("count", FeatureData::Int(vec![9, 1])),
1✔
676
                        (
1✔
677
                            "radius",
1✔
678
                            FeatureData::Float(vec![10.197_224_577_336_218, 8.])
1✔
679
                        ),
1✔
680
                        ("bar", FeatureData::Float(vec![5., 10.]))
1✔
681
                    ],
1✔
682
                )
1✔
683
                .unwrap()
1✔
684
            )
1✔
685
        );
1✔
686
    }
1✔
687

688
    #[tokio::test]
689
    async fn aggregate_of_null() {
1✔
690
        let mut coordinates = vec![(0.0, 0.1); 2];
1✔
691
        coordinates.extend_from_slice(&[(50.0, 50.1); 2]);
1✔
692

693
        let input = MultiPointCollection::from_slices(
1✔
694
            &MultiPoint::many(coordinates).unwrap(),
1✔
695
            &[TimeInterval::default(); 4],
1✔
696
            &[(
1✔
697
                "foo",
1✔
698
                FeatureData::NullableInt(vec![Some(1), None, None, None]),
1✔
699
            )],
1✔
700
        )
701
        .unwrap();
1✔
702

703
        let operator = VisualPointClustering {
1✔
704
            params: VisualPointClusteringParams {
1✔
705
                min_radius_px: 8.,
1✔
706
                delta_px: 1.,
1✔
707
                resolution: 0.1,
1✔
708
                radius_column: "radius".to_string(),
1✔
709
                count_column: "count".to_string(),
1✔
710
                column_aggregates: [(
1✔
711
                    "foo".to_string(),
1✔
712
                    AttributeAggregateDef {
1✔
713
                        column_name: "foo".to_string(),
1✔
714
                        aggregate_type: AttributeAggregateType::MeanNumber,
1✔
715
                        measurement: None,
1✔
716
                    },
1✔
717
                )]
1✔
718
                .iter()
1✔
719
                .cloned()
1✔
720
                .collect(),
1✔
721
            },
1✔
722
            sources: SingleVectorSource {
1✔
723
                vector: MockFeatureCollectionSource::single(input).boxed(),
1✔
724
            },
1✔
725
        };
1✔
726

727
        let execution_context = MockExecutionContext::test_default();
1✔
728

729
        let initialized_operator = operator
1✔
730
            .boxed()
1✔
731
            .initialize(WorkflowOperatorPath::initialize_root(), &execution_context)
1✔
732
            .await
1✔
733
            .unwrap();
1✔
734

735
        let query_processor = initialized_operator
1✔
736
            .query_processor()
1✔
737
            .unwrap()
1✔
738
            .multi_point()
1✔
739
            .unwrap();
1✔
740

741
        let query_context = execution_context.mock_query_context_test_default();
1✔
742

743
        let qrect = VectorQueryRectangle::new(
1✔
744
            BoundingBox2D::new((-180., -90.).into(), (180., 90.).into()).unwrap(),
1✔
745
            TimeInterval::default(),
1✔
746
            ColumnSelection::all(),
1✔
747
        );
748

749
        let query = query_processor.query(qrect, &query_context).await.unwrap();
1✔
750

751
        let result: Vec<MultiPointCollection> = query.map(Result::unwrap).collect().await;
1✔
752

753
        assert_eq!(result.len(), 1);
1✔
754
        assert!(
1✔
755
            result[0].chunks_equal_ignoring_cache_hint(
1✔
756
                &MultiPointCollection::from_slices(
1✔
757
                    &[(0.0, 0.1), (50.0, 50.1)],
1✔
758
                    &[TimeInterval::default(); 2],
1✔
759
                    &[
1✔
760
                        ("count", FeatureData::Int(vec![2, 2])),
1✔
761
                        (
1✔
762
                            "radius",
1✔
763
                            FeatureData::Float(vec![8.693_147_180_559_945, 8.693_147_180_559_945])
1✔
764
                        ),
1✔
765
                        ("foo", FeatureData::NullableFloat(vec![Some(1.), None]))
1✔
766
                    ],
1✔
767
                )
1✔
768
                .unwrap()
1✔
769
            )
1✔
770
        );
1✔
771
    }
1✔
772

773
    #[tokio::test]
774
    async fn text_aggregate() {
1✔
775
        let mut coordinates = vec![(0.0, 0.1); 2];
1✔
776
        coordinates.extend_from_slice(&[(50.0, 50.1); 2]);
1✔
777
        coordinates.extend_from_slice(&[(25.0, 25.1); 2]);
1✔
778

779
        let input = MultiPointCollection::from_slices(
1✔
780
            &MultiPoint::many(coordinates).unwrap(),
1✔
781
            &[TimeInterval::default(); 6],
1✔
782
            &[(
1✔
783
                "text",
1✔
784
                FeatureData::NullableText(vec![
1✔
785
                    Some("foo".to_string()),
1✔
786
                    Some("bar".to_string()),
1✔
787
                    Some("foo".to_string()),
1✔
788
                    None,
1✔
789
                    None,
1✔
790
                    None,
1✔
791
                ]),
1✔
792
            )],
1✔
793
        )
794
        .unwrap();
1✔
795

796
        let operator = VisualPointClustering {
1✔
797
            params: VisualPointClusteringParams {
1✔
798
                min_radius_px: 8.,
1✔
799
                delta_px: 1.,
1✔
800
                resolution: 0.1,
1✔
801
                radius_column: "radius".to_string(),
1✔
802
                count_column: "count".to_string(),
1✔
803
                column_aggregates: [(
1✔
804
                    "text".to_string(),
1✔
805
                    AttributeAggregateDef {
1✔
806
                        column_name: "text".to_string(),
1✔
807
                        aggregate_type: AttributeAggregateType::StringSample,
1✔
808
                        measurement: None,
1✔
809
                    },
1✔
810
                )]
1✔
811
                .iter()
1✔
812
                .cloned()
1✔
813
                .collect(),
1✔
814
            },
1✔
815
            sources: SingleVectorSource {
1✔
816
                vector: MockFeatureCollectionSource::single(input).boxed(),
1✔
817
            },
1✔
818
        };
1✔
819

820
        let execution_context = MockExecutionContext::test_default();
1✔
821

822
        let initialized_operator = operator
1✔
823
            .boxed()
1✔
824
            .initialize(WorkflowOperatorPath::initialize_root(), &execution_context)
1✔
825
            .await
1✔
826
            .unwrap();
1✔
827

828
        let query_processor = initialized_operator
1✔
829
            .query_processor()
1✔
830
            .unwrap()
1✔
831
            .multi_point()
1✔
832
            .unwrap();
1✔
833

834
        let query_context = execution_context.mock_query_context_test_default();
1✔
835

836
        let qrect = VectorQueryRectangle::new(
1✔
837
            BoundingBox2D::new((-180., -90.).into(), (180., 90.).into()).unwrap(),
1✔
838
            TimeInterval::default(),
1✔
839
            ColumnSelection::all(),
1✔
840
        );
841

842
        let query = query_processor.query(qrect, &query_context).await.unwrap();
1✔
843

844
        let result: Vec<MultiPointCollection> = query.map(Result::unwrap).collect().await;
1✔
845

846
        assert_eq!(result.len(), 1);
1✔
847
        assert!(
1✔
848
            result[0].chunks_equal_ignoring_cache_hint(
1✔
849
                &MultiPointCollection::from_slices(
1✔
850
                    &[(0.0, 0.1), (50.0, 50.1), (25.0, 25.1)],
1✔
851
                    &[TimeInterval::default(); 3],
1✔
852
                    &[
1✔
853
                        ("count", FeatureData::Int(vec![2, 2, 2])),
1✔
854
                        (
1✔
855
                            "radius",
1✔
856
                            FeatureData::Float(vec![
1✔
857
                                8.693_147_180_559_945,
1✔
858
                                8.693_147_180_559_945,
1✔
859
                                8.693_147_180_559_945
1✔
860
                            ])
1✔
861
                        ),
1✔
862
                        (
1✔
863
                            "text",
1✔
864
                            FeatureData::NullableText(vec![
1✔
865
                                Some("foo, bar".to_string()),
1✔
866
                                Some("foo".to_string()),
1✔
867
                                None
1✔
868
                            ])
1✔
869
                        )
1✔
870
                    ],
1✔
871
                )
1✔
872
                .unwrap()
1✔
873
            )
1✔
874
        );
1✔
875
    }
1✔
876

877
    #[tokio::test]
878
    async fn it_attaches_cache_hint() {
1✔
879
        let mut coordinates = vec![(0.0, 0.1); 2];
1✔
880
        coordinates.extend_from_slice(&[(50.0, 50.1); 2]);
1✔
881
        coordinates.extend_from_slice(&[(25.0, 25.1); 2]);
1✔
882

883
        let mut input = MultiPointCollection::from_slices(
1✔
884
            &MultiPoint::many(coordinates).unwrap(),
1✔
885
            &[TimeInterval::default(); 6],
1✔
886
            &[(
1✔
887
                "text",
1✔
888
                FeatureData::NullableText(vec![
1✔
889
                    Some("foo".to_string()),
1✔
890
                    Some("bar".to_string()),
1✔
891
                    Some("foo".to_string()),
1✔
892
                    None,
1✔
893
                    None,
1✔
894
                    None,
1✔
895
                ]),
1✔
896
            )],
1✔
897
        )
898
        .unwrap();
1✔
899

900
        let cache_hint = CacheHint::seconds(1234);
1✔
901

902
        input.cache_hint = cache_hint;
1✔
903

904
        let operator = VisualPointClustering {
1✔
905
            params: VisualPointClusteringParams {
1✔
906
                min_radius_px: 8.,
1✔
907
                delta_px: 1.,
1✔
908
                resolution: 0.1,
1✔
909
                radius_column: "radius".to_string(),
1✔
910
                count_column: "count".to_string(),
1✔
911
                column_aggregates: [(
1✔
912
                    "text".to_string(),
1✔
913
                    AttributeAggregateDef {
1✔
914
                        column_name: "text".to_string(),
1✔
915
                        aggregate_type: AttributeAggregateType::StringSample,
1✔
916
                        measurement: None,
1✔
917
                    },
1✔
918
                )]
1✔
919
                .iter()
1✔
920
                .cloned()
1✔
921
                .collect(),
1✔
922
            },
1✔
923
            sources: SingleVectorSource {
1✔
924
                vector: MockFeatureCollectionSource::single(input).boxed(),
1✔
925
            },
1✔
926
        };
1✔
927

928
        let execution_context = MockExecutionContext::test_default();
1✔
929

930
        let initialized_operator = operator
1✔
931
            .boxed()
1✔
932
            .initialize(WorkflowOperatorPath::initialize_root(), &execution_context)
1✔
933
            .await
1✔
934
            .unwrap();
1✔
935

936
        let query_processor = initialized_operator
1✔
937
            .query_processor()
1✔
938
            .unwrap()
1✔
939
            .multi_point()
1✔
940
            .unwrap();
1✔
941

942
        let query_context = execution_context.mock_query_context_test_default();
1✔
943

944
        let qrect = VectorQueryRectangle::new(
1✔
945
            BoundingBox2D::new((-180., -90.).into(), (180., 90.).into()).unwrap(),
1✔
946
            TimeInterval::default(),
1✔
947
            ColumnSelection::all(),
1✔
948
        );
949

950
        let query = query_processor.query(qrect, &query_context).await.unwrap();
1✔
951

952
        let result: Vec<MultiPointCollection> = query.map(Result::unwrap).collect().await;
1✔
953

954
        assert_eq!(result.len(), 1);
1✔
955
        assert_eq!(result[0].cache_hint.expires(), cache_hint.expires());
1✔
956
    }
1✔
957
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc