• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

geo-engine / geoengine / 16167706152

09 Jul 2025 11:08AM UTC coverage: 88.738% (-1.0%) from 89.762%
16167706152

push

github

web-flow
refactor: Updates-2025-07-02 (#1062)

* rust 1.88

* clippy auto-fixes

* manual clippy fixes

* update deps

* cargo update

* update onnx

* cargo fmt

* update sqlfluff

121 of 142 new or added lines in 29 files covered. (85.21%)

300 existing lines in 88 files now uncovered.

111259 of 125379 relevant lines covered (88.74%)

77910.92 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

91.41
/operators/src/processing/circle_merging_quadtree/operator.rs
1
use std::collections::{HashMap, HashSet};
2

3
use async_trait::async_trait;
4
use futures::StreamExt;
5
use futures::stream::{BoxStream, FuturesUnordered};
6
use geoengine_datatypes::collections::{
7
    BuilderProvider, GeoFeatureCollectionRowBuilder, MultiPointCollection, VectorDataType,
8
};
9
use geoengine_datatypes::primitives::{
10
    BoundingBox2D, Circle, FeatureDataType, FeatureDataValue, Measurement, MultiPoint,
11
    MultiPointAccess, VectorQueryRectangle,
12
};
13
use geoengine_datatypes::primitives::{CacheHint, ColumnSelection};
14
use serde::{Deserialize, Serialize};
15
use snafu::ensure;
16

17
use crate::adapters::FeatureCollectionStreamExt;
18
use crate::engine::{
19
    CanonicOperatorName, ExecutionContext, InitializedVectorOperator, Operator, QueryContext,
20
    QueryProcessor, SingleVectorSource, TypedVectorQueryProcessor, VectorColumnInfo,
21
    VectorOperator, VectorQueryProcessor, VectorResultDescriptor,
22
};
23
use crate::engine::{InitializedSources, OperatorName, WorkflowOperatorPath};
24
use crate::error::{self, Error};
25
use crate::processing::circle_merging_quadtree::aggregates::MeanAggregator;
26
use crate::processing::circle_merging_quadtree::circle_of_points::CircleOfPoints;
27
use crate::processing::circle_merging_quadtree::circle_radius_model::LogScaledRadius;
28
use crate::util::Result;
29

30
use super::aggregates::{AttributeAggregate, AttributeAggregateType, StringSampler};
31
use super::circle_radius_model::CircleRadiusModel;
32
use super::grid::Grid;
33
use super::quadtree::CircleMergingQuadtree;
34

35
#[derive(Debug, Serialize, Deserialize, Clone)]
36
#[serde(rename_all = "camelCase")]
37
pub struct VisualPointClusteringParams {
38
    pub min_radius_px: f64,
39
    pub delta_px: f64,
40
    radius_column: String,
41
    count_column: String,
42
    column_aggregates: HashMap<String, AttributeAggregateDef>,
43
}
44

45
#[derive(Debug, Serialize, Deserialize, Clone)]
46
#[serde(rename_all = "camelCase")]
47
pub struct AttributeAggregateDef {
48
    pub column_name: String,
49
    pub aggregate_type: AttributeAggregateType,
50
    // if measurement is unset, it will be taken from the source result descriptor
51
    pub measurement: Option<Measurement>,
52
}
53

54
pub type VisualPointClustering = Operator<VisualPointClusteringParams, SingleVectorSource>;
55

56
impl OperatorName for VisualPointClustering {
57
    const TYPE_NAME: &'static str = "VisualPointClustering";
58
}
59

60
#[typetag::serde]
×
61
#[async_trait]
62
#[allow(clippy::too_many_lines)]
63
impl VectorOperator for VisualPointClustering {
64
    async fn _initialize(
65
        mut self: Box<Self>,
66
        path: WorkflowOperatorPath,
67
        context: &dyn ExecutionContext,
68
    ) -> Result<Box<dyn InitializedVectorOperator>> {
10✔
69
        ensure!(
5✔
70
            self.params.min_radius_px > 0.0,
5✔
71
            error::InputMustBeGreaterThanZero {
×
72
                scope: "VisualPointClustering",
×
73
                name: "min_radius_px"
×
74
            }
×
75
        );
76
        ensure!(
5✔
77
            self.params.delta_px >= 0.0,
5✔
78
            error::InputMustBeZeroOrPositive {
×
79
                scope: "VisualPointClustering",
×
80
                name: "delta_px"
×
81
            }
×
82
        );
83
        ensure!(!self.params.radius_column.is_empty(), error::EmptyInput);
5✔
84
        ensure!(
5✔
85
            self.params.count_column != self.params.radius_column,
5✔
86
            error::DuplicateOutputColumns
×
87
        );
88

89
        let name = CanonicOperatorName::from(&self);
5✔
90

91
        let radius_model = LogScaledRadius::new(self.params.min_radius_px, self.params.delta_px)?;
5✔
92

93
        let initialized_sources = self
5✔
94
            .sources
5✔
95
            .initialize_sources(path.clone(), context)
5✔
96
            .await?;
5✔
97
        let vector_source = initialized_sources.vector;
5✔
98

99
        ensure!(
5✔
100
            vector_source.result_descriptor().data_type == VectorDataType::MultiPoint,
5✔
101
            error::InvalidType {
×
102
                expected: VectorDataType::MultiPoint.to_string(),
×
103
                found: vector_source.result_descriptor().data_type.to_string(),
×
104
            }
×
105
        );
106

107
        // check that all input columns exist
108
        for column_name in self.params.column_aggregates.keys() {
5✔
109
            let column_name_exists = vector_source
4✔
110
                .result_descriptor()
4✔
111
                .columns
4✔
112
                .contains_key(column_name);
4✔
113

114
            ensure!(
4✔
115
                column_name_exists,
4✔
116
                error::MissingInputColumn {
×
117
                    name: column_name.clone()
×
118
                }
×
119
            );
120
        }
121

122
        // check that there are no duplicates in the output columns
123
        let output_names: HashSet<&String> = self
5✔
124
            .params
5✔
125
            .column_aggregates
5✔
126
            .values()
5✔
127
            .map(|def| &def.column_name)
5✔
128
            .collect();
5✔
129
        ensure!(
5✔
130
            output_names.len() == self.params.column_aggregates.len(),
5✔
131
            error::DuplicateOutputColumns
×
132
        );
133

134
        // create schema for [`ResultDescriptor`]
135
        let mut new_columns: HashMap<String, VectorColumnInfo> =
5✔
136
            HashMap::with_capacity(self.params.column_aggregates.len());
5✔
137
        for attribute_aggregate_def in self.params.column_aggregates.values_mut() {
5✔
138
            if attribute_aggregate_def.measurement.is_none() {
4✔
139
                // take it from source measurement
4✔
140

4✔
141
                attribute_aggregate_def.measurement = vector_source
4✔
142
                    .result_descriptor()
4✔
143
                    .column_measurement(&attribute_aggregate_def.column_name)
4✔
144
                    .cloned();
4✔
145
            }
4✔
146

147
            let data_type = match attribute_aggregate_def.aggregate_type {
4✔
148
                AttributeAggregateType::MeanNumber => FeatureDataType::Float,
2✔
149
                AttributeAggregateType::StringSample => FeatureDataType::Text,
2✔
150
                AttributeAggregateType::Null => {
151
                    return Err(Error::InvalidType {
×
152
                        expected: "not null".to_string(),
×
153
                        found: "null".to_string(),
×
154
                    });
×
155
                }
156
            };
157

158
            new_columns.insert(
4✔
159
                attribute_aggregate_def.column_name.clone(),
4✔
160
                VectorColumnInfo {
4✔
161
                    data_type,
4✔
162
                    measurement: attribute_aggregate_def.measurement.clone().into(),
4✔
163
                },
4✔
164
            );
165
        }
166

167
        // check that output schema does not interfere with count and radius columns
168
        ensure!(
5✔
169
            !new_columns.contains_key(&self.params.radius_column)
5✔
170
                && !new_columns.contains_key(&self.params.count_column),
5✔
171
            error::DuplicateOutputColumns
×
172
        );
173

174
        let in_desc = vector_source.result_descriptor();
5✔
175

176
        Ok(InitializedVisualPointClustering {
5✔
177
            name,
5✔
178
            path,
5✔
179
            result_descriptor: VectorResultDescriptor {
5✔
180
                data_type: VectorDataType::MultiPoint,
5✔
181
                spatial_reference: in_desc.spatial_reference,
5✔
182
                columns: new_columns,
5✔
183
                time: in_desc.time,
5✔
184
                bbox: in_desc.bbox,
5✔
185
            },
5✔
186
            vector_source,
5✔
187
            radius_model,
5✔
188
            radius_column: self.params.radius_column,
5✔
189
            count_column: self.params.count_column,
5✔
190
            attribute_mapping: self.params.column_aggregates,
5✔
191
        }
5✔
192
        .boxed())
5✔
193
    }
10✔
194

195
    span_fn!(VisualPointClustering);
196
}
197

198
pub struct InitializedVisualPointClustering {
199
    name: CanonicOperatorName,
200
    path: WorkflowOperatorPath,
201
    result_descriptor: VectorResultDescriptor,
202
    vector_source: Box<dyn InitializedVectorOperator>,
203
    radius_model: LogScaledRadius,
204
    radius_column: String,
205
    count_column: String,
206
    attribute_mapping: HashMap<String, AttributeAggregateDef>,
207
}
208

209
impl InitializedVectorOperator for InitializedVisualPointClustering {
210
    fn query_processor(&self) -> Result<TypedVectorQueryProcessor> {
5✔
211
        match self.vector_source.query_processor()? {
5✔
212
            TypedVectorQueryProcessor::MultiPoint(source) => {
5✔
213
                Ok(TypedVectorQueryProcessor::MultiPoint(
5✔
214
                    VisualPointClusteringProcessor::new(
5✔
215
                        source,
5✔
216
                        self.radius_model,
5✔
217
                        self.radius_column.clone(),
5✔
218
                        self.count_column.clone(),
5✔
219
                        self.result_descriptor.clone(),
5✔
220
                        self.attribute_mapping.clone(),
5✔
221
                    )
5✔
222
                    .boxed(),
5✔
223
                ))
5✔
224
            }
225
            TypedVectorQueryProcessor::MultiLineString(_) => Err(error::Error::InvalidVectorType {
×
226
                expected: "MultiPoint".to_owned(),
×
227
                found: "MultiLineString".to_owned(),
×
228
            }),
×
229
            TypedVectorQueryProcessor::MultiPolygon(_) => Err(error::Error::InvalidVectorType {
×
230
                expected: "MultiPoint".to_owned(),
×
231
                found: "MultiPolygon".to_owned(),
×
232
            }),
×
233
            TypedVectorQueryProcessor::Data(_) => Err(error::Error::InvalidVectorType {
×
234
                expected: "MultiPoint".to_owned(),
×
235
                found: "Data".to_owned(),
×
236
            }),
×
237
        }
238
    }
5✔
239

240
    fn result_descriptor(&self) -> &VectorResultDescriptor {
×
241
        &self.result_descriptor
×
242
    }
×
243

244
    fn canonic_name(&self) -> CanonicOperatorName {
×
245
        self.name.clone()
×
246
    }
×
247

248
    fn name(&self) -> &'static str {
×
249
        VisualPointClustering::TYPE_NAME
×
250
    }
×
251

252
    fn path(&self) -> WorkflowOperatorPath {
×
253
        self.path.clone()
×
254
    }
×
255
}
256

257
pub struct VisualPointClusteringProcessor {
258
    source: Box<dyn VectorQueryProcessor<VectorType = MultiPointCollection>>,
259
    radius_model: LogScaledRadius,
260
    radius_column: String,
261
    count_column: String,
262
    result_descriptor: VectorResultDescriptor,
263
    attribute_mapping: HashMap<String, AttributeAggregateDef>,
264
}
265

266
impl VisualPointClusteringProcessor {
267
    fn new(
5✔
268
        source: Box<dyn VectorQueryProcessor<VectorType = MultiPointCollection>>,
5✔
269
        radius_model: LogScaledRadius,
5✔
270
        radius_column: String,
5✔
271
        count_column: String,
5✔
272
        result_descriptor: VectorResultDescriptor,
5✔
273
        attribute_mapping: HashMap<String, AttributeAggregateDef>,
5✔
274
    ) -> Self {
5✔
275
        Self {
5✔
276
            source,
5✔
277
            radius_model,
5✔
278
            radius_column,
5✔
279
            count_column,
5✔
280
            result_descriptor,
5✔
281
            attribute_mapping,
5✔
282
        }
5✔
283
    }
5✔
284

285
    fn create_point_collection(
5✔
286
        circles_of_points: impl Iterator<Item = CircleOfPoints>,
5✔
287
        radius_column: &str,
5✔
288
        count_column: &str,
5✔
289
        resolution: f64,
5✔
290
        columns: &HashMap<String, FeatureDataType>,
5✔
291
        cache_hint: CacheHint,
5✔
292
    ) -> Result<MultiPointCollection> {
5✔
293
        let mut builder = MultiPointCollection::builder();
5✔
294

295
        for (column, &data_type) in columns {
9✔
296
            builder.add_column(column.clone(), data_type)?;
4✔
297
        }
298

299
        builder.add_column(radius_column.to_string(), FeatureDataType::Float)?;
5✔
300
        builder.add_column(count_column.to_string(), FeatureDataType::Int)?;
5✔
301

302
        let mut builder = builder.finish_header();
5✔
303

304
        for circle_of_points in circles_of_points {
17✔
305
            builder.push_geometry(MultiPoint::new(vec![circle_of_points.circle.center()])?);
12✔
306
            builder.push_time_interval(circle_of_points.time_aggregate);
12✔
307

308
            builder.push_data(
12✔
309
                radius_column,
12✔
310
                FeatureDataValue::Float(circle_of_points.circle.radius() / resolution),
12✔
UNCOV
311
            )?;
×
312
            builder.push_data(
12✔
313
                count_column,
12✔
314
                FeatureDataValue::Int(circle_of_points.number_of_points() as i64),
12✔
UNCOV
315
            )?;
×
316

317
            for (column, data_type) in circle_of_points.attribute_aggregates {
22✔
318
                match data_type {
10✔
319
                    AttributeAggregate::MeanNumber(mean_aggregator) => {
3✔
320
                        builder
3✔
321
                            .push_data(&column, FeatureDataValue::Float(mean_aggregator.mean))?;
3✔
322
                    }
323
                    AttributeAggregate::StringSample(string_sampler) => {
4✔
324
                        builder.push_data(
4✔
325
                            &column,
4✔
326
                            FeatureDataValue::Text(string_sampler.strings.join(", ")),
4✔
UNCOV
327
                        )?;
×
328
                    }
329
                    AttributeAggregate::Null => {
330
                        builder.push_null(&column)?;
3✔
331
                    }
332
                }
333
            }
334

335
            builder.finish_row();
12✔
336
        }
337

338
        builder.cache_hint(cache_hint);
5✔
339

340
        builder.build().map_err(Into::into)
5✔
341
    }
5✔
342

343
    fn aggregate_from_feature_data(
26✔
344
        feature_data: FeatureDataValue,
26✔
345
        aggregate_type: AttributeAggregateType,
26✔
346
    ) -> AttributeAggregate {
26✔
347
        match (feature_data, aggregate_type) {
26✔
348
            (
349
                FeatureDataValue::Category(value) | FeatureDataValue::NullableCategory(Some(value)),
×
350
                AttributeAggregateType::MeanNumber,
351
            ) => AttributeAggregate::MeanNumber(MeanAggregator::from_value(f64::from(value))),
×
352
            (
353
                FeatureDataValue::Float(value) | FeatureDataValue::NullableFloat(Some(value)),
10✔
354
                AttributeAggregateType::MeanNumber,
355
            ) => AttributeAggregate::MeanNumber(MeanAggregator::from_value(value)),
10✔
356
            (
357
                FeatureDataValue::Int(value) | FeatureDataValue::NullableInt(Some(value)),
1✔
358
                AttributeAggregateType::MeanNumber,
359
            ) => AttributeAggregate::MeanNumber(MeanAggregator::from_value(value as f64)),
1✔
360
            (
361
                FeatureDataValue::Text(value) | FeatureDataValue::NullableText(Some(value)),
6✔
362
                AttributeAggregateType::StringSample,
363
            ) => AttributeAggregate::StringSample(StringSampler::from_value(value)),
6✔
364
            (
365
                FeatureDataValue::DateTime(value) | FeatureDataValue::NullableDateTime(Some(value)),
×
366
                AttributeAggregateType::StringSample,
367
            ) => AttributeAggregate::StringSample(StringSampler::from_value(value.to_string())),
×
368
            _ => AttributeAggregate::Null,
9✔
369
        }
370
    }
26✔
371
}
372

373
struct GridFoldState {
374
    grid: Grid<LogScaledRadius>,
375
    column_mapping: HashMap<String, AttributeAggregateDef>,
376
    cache_hint: CacheHint,
377
}
378

379
#[async_trait]
380
impl QueryProcessor for VisualPointClusteringProcessor {
381
    type Output = MultiPointCollection;
382
    type SpatialBounds = BoundingBox2D;
383
    type Selection = ColumnSelection;
384
    type ResultDescription = VectorResultDescriptor;
385

386
    async fn _query<'a>(
387
        &'a self,
388
        query: VectorQueryRectangle,
389
        ctx: &'a dyn QueryContext,
390
    ) -> Result<BoxStream<'a, Result<Self::Output>>> {
10✔
391
        // we aggregate all points into one collection
392

393
        let column_schema = self
5✔
394
            .result_descriptor
5✔
395
            .columns
5✔
396
            .iter()
5✔
397
            .map(|(name, column_info)| (name.clone(), column_info.data_type))
5✔
398
            .collect();
5✔
399

400
        let joint_resolution = f64::max(query.spatial_resolution.x, query.spatial_resolution.y);
5✔
401
        let scaled_radius_model = self.radius_model.with_scaled_radii(joint_resolution)?;
5✔
402

403
        let initial_grid_fold_state = Result::<GridFoldState>::Ok(GridFoldState {
5✔
404
            grid: Grid::new(query.spatial_bounds, scaled_radius_model),
5✔
405
            column_mapping: self.attribute_mapping.clone(),
5✔
406
            cache_hint: CacheHint::max_duration(),
5✔
407
        });
5✔
408

409
        let grid_future = self.source.query(query.clone(), ctx).await?.fold(
5✔
410
            initial_grid_fold_state,
5✔
411
            |state, feature_collection| async move {
5✔
412
                // TODO: worker thread
413

414
                let GridFoldState {
415
                    mut grid,
5✔
416
                    column_mapping,
5✔
417
                    mut cache_hint,
5✔
418
                } = state?;
5✔
419

420
                let feature_collection = feature_collection?;
5✔
421

422
                for feature in &feature_collection {
41✔
423
                    // TODO: pre-aggregate multi-points differently?
424
                    for coordinate in feature.geometry.points() {
36✔
425
                        let circle =
36✔
426
                            Circle::from_coordinate(coordinate, grid.radius_model().min_radius());
36✔
427

428
                        let mut attribute_aggregates = HashMap::with_capacity(column_mapping.len());
36✔
429

430
                        for (
431
                            src_column,
26✔
432
                            AttributeAggregateDef {
433
                                column_name: tgt_column,
26✔
434
                                aggregate_type,
26✔
435
                                measurement: _,
436
                            },
437
                        ) in &column_mapping
62✔
438
                        {
439
                            let attribute_aggregate =
26✔
440
                                if let Some(feature_data) = feature.get(src_column) {
26✔
441
                                    Self::aggregate_from_feature_data(feature_data, *aggregate_type)
26✔
442
                                } else {
443
                                    AttributeAggregate::Null
×
444
                                };
445

446
                            attribute_aggregates.insert(tgt_column.clone(), attribute_aggregate);
26✔
447
                        }
448

449
                        grid.insert(CircleOfPoints::new_with_one_point(
36✔
450
                            circle,
36✔
451
                            feature.time_interval,
36✔
452
                            attribute_aggregates,
36✔
453
                        ));
454
                    }
455
                }
456

457
                cache_hint.merge_with(&feature_collection.cache_hint);
5✔
458

459
                Ok(GridFoldState {
5✔
460
                    grid,
5✔
461
                    column_mapping,
5✔
462
                    cache_hint,
5✔
463
                })
5✔
464
            },
10✔
465
        );
466

467
        let stream = FuturesUnordered::new();
5✔
468
        stream.push(grid_future);
5✔
469

470
        let stream = stream.map(move |grid| {
5✔
471
            let GridFoldState {
472
                grid,
5✔
473
                column_mapping: _,
474
                cache_hint,
5✔
475
            } = grid?;
5✔
476

477
            let mut cmq = CircleMergingQuadtree::new(query.spatial_bounds, *grid.radius_model(), 1);
5✔
478

479
            // TODO: worker thread
480
            for circle_of_points in grid.drain() {
12✔
481
                cmq.insert_circle(circle_of_points);
12✔
482
            }
12✔
483

484
            Self::create_point_collection(
5✔
485
                cmq.into_iter(),
5✔
486
                &self.radius_column,
5✔
487
                &self.count_column,
5✔
488
                joint_resolution,
5✔
489
                &column_schema,
5✔
490
                cache_hint,
5✔
491
            )
492
        });
5✔
493

494
        Ok(stream.merge_chunks(ctx.chunk_byte_size().into()).boxed())
5✔
495
    }
10✔
496

497
    fn result_descriptor(&self) -> &VectorResultDescriptor {
10✔
498
        &self.result_descriptor
10✔
499
    }
10✔
500
}
501

502
#[cfg(test)]
503
mod tests {
504
    use geoengine_datatypes::collections::ChunksEqualIgnoringCacheHint;
505
    use geoengine_datatypes::primitives::CacheHint;
506
    use geoengine_datatypes::primitives::FeatureData;
507
    use geoengine_datatypes::primitives::SpatialResolution;
508
    use geoengine_datatypes::primitives::TimeInterval;
509
    use geoengine_datatypes::util::test::TestDefault;
510

511
    use crate::{
512
        engine::{MockExecutionContext, MockQueryContext},
513
        mock::MockFeatureCollectionSource,
514
    };
515

516
    use super::*;
517

518
    #[tokio::test]
519
    async fn simple_test() {
1✔
520
        let mut coordinates = vec![(0.0, 0.1); 9];
1✔
521
        coordinates.extend_from_slice(&[(50.0, 50.1)]);
1✔
522

523
        let input = MultiPointCollection::from_data(
1✔
524
            MultiPoint::many(coordinates).unwrap(),
1✔
525
            vec![TimeInterval::default(); 10],
1✔
526
            HashMap::default(),
1✔
527
            CacheHint::default(),
1✔
528
        )
529
        .unwrap();
1✔
530

531
        let operator = VisualPointClustering {
1✔
532
            params: VisualPointClusteringParams {
1✔
533
                min_radius_px: 8.,
1✔
534
                delta_px: 1.,
1✔
535
                radius_column: "radius".to_string(),
1✔
536
                count_column: "count".to_string(),
1✔
537
                column_aggregates: Default::default(),
1✔
538
            },
1✔
539
            sources: SingleVectorSource {
1✔
540
                vector: MockFeatureCollectionSource::single(input).boxed(),
1✔
541
            },
1✔
542
        };
1✔
543

544
        let execution_context = MockExecutionContext::test_default();
1✔
545

546
        let initialized_operator = operator
1✔
547
            .boxed()
1✔
548
            .initialize(WorkflowOperatorPath::initialize_root(), &execution_context)
1✔
549
            .await
1✔
550
            .unwrap();
1✔
551

552
        let query_processor = initialized_operator
1✔
553
            .query_processor()
1✔
554
            .unwrap()
1✔
555
            .multi_point()
1✔
556
            .unwrap();
1✔
557

558
        let query_context = MockQueryContext::test_default();
1✔
559

560
        let qrect = VectorQueryRectangle {
1✔
561
            spatial_bounds: BoundingBox2D::new((-180., -90.).into(), (180., 90.).into()).unwrap(),
1✔
562
            time_interval: TimeInterval::default(),
1✔
563
            spatial_resolution: SpatialResolution::new(0.1, 0.1).unwrap(),
1✔
564
            attributes: ColumnSelection::all(),
1✔
565
        };
1✔
566

567
        let query = query_processor.query(qrect, &query_context).await.unwrap();
1✔
568

569
        let result: Vec<MultiPointCollection> = query.map(Result::unwrap).collect().await;
1✔
570

571
        assert_eq!(result.len(), 1);
1✔
572
        assert!(
1✔
573
            result[0].chunks_equal_ignoring_cache_hint(
1✔
574
                &MultiPointCollection::from_slices(
1✔
575
                    &[(0.0, 0.099_999_999_999_999_99), (50.0, 50.1)],
1✔
576
                    &[TimeInterval::default(); 2],
1✔
577
                    &[
1✔
578
                        ("count", FeatureData::Int(vec![9, 1])),
1✔
579
                        (
1✔
580
                            "radius",
1✔
581
                            FeatureData::Float(vec![10.197_224_577_336_218, 8.])
1✔
582
                        )
1✔
583
                    ],
1✔
584
                )
1✔
585
                .unwrap()
1✔
586
            )
1✔
587
        );
1✔
588
    }
1✔
589

590
    #[tokio::test]
591
    async fn simple_test_with_aggregate() {
1✔
592
        let mut coordinates = vec![(0.0, 0.1); 9];
1✔
593
        coordinates.extend_from_slice(&[(50.0, 50.1)]);
1✔
594

595
        let input = MultiPointCollection::from_slices(
1✔
596
            &MultiPoint::many(coordinates).unwrap(),
1✔
597
            &[TimeInterval::default(); 10],
1✔
598
            &[(
1✔
599
                "foo",
1✔
600
                FeatureData::Float(vec![1., 2., 3., 4., 5., 6., 7., 8., 9., 10.]),
1✔
601
            )],
1✔
602
        )
603
        .unwrap();
1✔
604

605
        let operator = VisualPointClustering {
1✔
606
            params: VisualPointClusteringParams {
1✔
607
                min_radius_px: 8.,
1✔
608
                delta_px: 1.,
1✔
609
                radius_column: "radius".to_string(),
1✔
610
                count_column: "count".to_string(),
1✔
611
                column_aggregates: [(
1✔
612
                    "foo".to_string(),
1✔
613
                    AttributeAggregateDef {
1✔
614
                        column_name: "bar".to_string(),
1✔
615
                        aggregate_type: AttributeAggregateType::MeanNumber,
1✔
616
                        measurement: None,
1✔
617
                    },
1✔
618
                )]
1✔
619
                .iter()
1✔
620
                .cloned()
1✔
621
                .collect(),
1✔
622
            },
1✔
623
            sources: SingleVectorSource {
1✔
624
                vector: MockFeatureCollectionSource::single(input).boxed(),
1✔
625
            },
1✔
626
        };
1✔
627

628
        let execution_context = MockExecutionContext::test_default();
1✔
629

630
        let initialized_operator = operator
1✔
631
            .boxed()
1✔
632
            .initialize(WorkflowOperatorPath::initialize_root(), &execution_context)
1✔
633
            .await
1✔
634
            .unwrap();
1✔
635

636
        let query_processor = initialized_operator
1✔
637
            .query_processor()
1✔
638
            .unwrap()
1✔
639
            .multi_point()
1✔
640
            .unwrap();
1✔
641

642
        let query_context = MockQueryContext::test_default();
1✔
643

644
        let qrect = VectorQueryRectangle {
1✔
645
            spatial_bounds: BoundingBox2D::new((-180., -90.).into(), (180., 90.).into()).unwrap(),
1✔
646
            time_interval: TimeInterval::default(),
1✔
647
            spatial_resolution: SpatialResolution::new(0.1, 0.1).unwrap(),
1✔
648
            attributes: ColumnSelection::all(),
1✔
649
        };
1✔
650

651
        let query = query_processor.query(qrect, &query_context).await.unwrap();
1✔
652

653
        let result: Vec<MultiPointCollection> = query.map(Result::unwrap).collect().await;
1✔
654

655
        assert_eq!(result.len(), 1);
1✔
656
        assert!(
1✔
657
            result[0].chunks_equal_ignoring_cache_hint(
1✔
658
                &MultiPointCollection::from_slices(
1✔
659
                    &[(0.0, 0.099_999_999_999_999_99), (50.0, 50.1)],
1✔
660
                    &[TimeInterval::default(); 2],
1✔
661
                    &[
1✔
662
                        ("count", FeatureData::Int(vec![9, 1])),
1✔
663
                        (
1✔
664
                            "radius",
1✔
665
                            FeatureData::Float(vec![10.197_224_577_336_218, 8.])
1✔
666
                        ),
1✔
667
                        ("bar", FeatureData::Float(vec![5., 10.]))
1✔
668
                    ],
1✔
669
                )
1✔
670
                .unwrap()
1✔
671
            )
1✔
672
        );
1✔
673
    }
1✔
674

675
    #[tokio::test]
676
    async fn aggregate_of_null() {
1✔
677
        let mut coordinates = vec![(0.0, 0.1); 2];
1✔
678
        coordinates.extend_from_slice(&[(50.0, 50.1); 2]);
1✔
679

680
        let input = MultiPointCollection::from_slices(
1✔
681
            &MultiPoint::many(coordinates).unwrap(),
1✔
682
            &[TimeInterval::default(); 4],
1✔
683
            &[(
1✔
684
                "foo",
1✔
685
                FeatureData::NullableInt(vec![Some(1), None, None, None]),
1✔
686
            )],
1✔
687
        )
688
        .unwrap();
1✔
689

690
        let operator = VisualPointClustering {
1✔
691
            params: VisualPointClusteringParams {
1✔
692
                min_radius_px: 8.,
1✔
693
                delta_px: 1.,
1✔
694
                radius_column: "radius".to_string(),
1✔
695
                count_column: "count".to_string(),
1✔
696
                column_aggregates: [(
1✔
697
                    "foo".to_string(),
1✔
698
                    AttributeAggregateDef {
1✔
699
                        column_name: "foo".to_string(),
1✔
700
                        aggregate_type: AttributeAggregateType::MeanNumber,
1✔
701
                        measurement: None,
1✔
702
                    },
1✔
703
                )]
1✔
704
                .iter()
1✔
705
                .cloned()
1✔
706
                .collect(),
1✔
707
            },
1✔
708
            sources: SingleVectorSource {
1✔
709
                vector: MockFeatureCollectionSource::single(input).boxed(),
1✔
710
            },
1✔
711
        };
1✔
712

713
        let execution_context = MockExecutionContext::test_default();
1✔
714

715
        let initialized_operator = operator
1✔
716
            .boxed()
1✔
717
            .initialize(WorkflowOperatorPath::initialize_root(), &execution_context)
1✔
718
            .await
1✔
719
            .unwrap();
1✔
720

721
        let query_processor = initialized_operator
1✔
722
            .query_processor()
1✔
723
            .unwrap()
1✔
724
            .multi_point()
1✔
725
            .unwrap();
1✔
726

727
        let query_context = MockQueryContext::test_default();
1✔
728

729
        let qrect = VectorQueryRectangle {
1✔
730
            spatial_bounds: BoundingBox2D::new((-180., -90.).into(), (180., 90.).into()).unwrap(),
1✔
731
            time_interval: TimeInterval::default(),
1✔
732
            spatial_resolution: SpatialResolution::new(0.1, 0.1).unwrap(),
1✔
733
            attributes: ColumnSelection::all(),
1✔
734
        };
1✔
735

736
        let query = query_processor.query(qrect, &query_context).await.unwrap();
1✔
737

738
        let result: Vec<MultiPointCollection> = query.map(Result::unwrap).collect().await;
1✔
739

740
        assert_eq!(result.len(), 1);
1✔
741
        assert!(
1✔
742
            result[0].chunks_equal_ignoring_cache_hint(
1✔
743
                &MultiPointCollection::from_slices(
1✔
744
                    &[(0.0, 0.1), (50.0, 50.1)],
1✔
745
                    &[TimeInterval::default(); 2],
1✔
746
                    &[
1✔
747
                        ("count", FeatureData::Int(vec![2, 2])),
1✔
748
                        (
1✔
749
                            "radius",
1✔
750
                            FeatureData::Float(vec![8.693_147_180_559_945, 8.693_147_180_559_945])
1✔
751
                        ),
1✔
752
                        ("foo", FeatureData::NullableFloat(vec![Some(1.), None]))
1✔
753
                    ],
1✔
754
                )
1✔
755
                .unwrap()
1✔
756
            )
1✔
757
        );
1✔
758
    }
1✔
759

760
    #[tokio::test]
761
    async fn text_aggregate() {
1✔
762
        let mut coordinates = vec![(0.0, 0.1); 2];
1✔
763
        coordinates.extend_from_slice(&[(50.0, 50.1); 2]);
1✔
764
        coordinates.extend_from_slice(&[(25.0, 25.1); 2]);
1✔
765

766
        let input = MultiPointCollection::from_slices(
1✔
767
            &MultiPoint::many(coordinates).unwrap(),
1✔
768
            &[TimeInterval::default(); 6],
1✔
769
            &[(
1✔
770
                "text",
1✔
771
                FeatureData::NullableText(vec![
1✔
772
                    Some("foo".to_string()),
1✔
773
                    Some("bar".to_string()),
1✔
774
                    Some("foo".to_string()),
1✔
775
                    None,
1✔
776
                    None,
1✔
777
                    None,
1✔
778
                ]),
1✔
779
            )],
1✔
780
        )
781
        .unwrap();
1✔
782

783
        let operator = VisualPointClustering {
1✔
784
            params: VisualPointClusteringParams {
1✔
785
                min_radius_px: 8.,
1✔
786
                delta_px: 1.,
1✔
787
                radius_column: "radius".to_string(),
1✔
788
                count_column: "count".to_string(),
1✔
789
                column_aggregates: [(
1✔
790
                    "text".to_string(),
1✔
791
                    AttributeAggregateDef {
1✔
792
                        column_name: "text".to_string(),
1✔
793
                        aggregate_type: AttributeAggregateType::StringSample,
1✔
794
                        measurement: None,
1✔
795
                    },
1✔
796
                )]
1✔
797
                .iter()
1✔
798
                .cloned()
1✔
799
                .collect(),
1✔
800
            },
1✔
801
            sources: SingleVectorSource {
1✔
802
                vector: MockFeatureCollectionSource::single(input).boxed(),
1✔
803
            },
1✔
804
        };
1✔
805

806
        let execution_context = MockExecutionContext::test_default();
1✔
807

808
        let initialized_operator = operator
1✔
809
            .boxed()
1✔
810
            .initialize(WorkflowOperatorPath::initialize_root(), &execution_context)
1✔
811
            .await
1✔
812
            .unwrap();
1✔
813

814
        let query_processor = initialized_operator
1✔
815
            .query_processor()
1✔
816
            .unwrap()
1✔
817
            .multi_point()
1✔
818
            .unwrap();
1✔
819

820
        let query_context = MockQueryContext::test_default();
1✔
821

822
        let qrect = VectorQueryRectangle {
1✔
823
            spatial_bounds: BoundingBox2D::new((-180., -90.).into(), (180., 90.).into()).unwrap(),
1✔
824
            time_interval: TimeInterval::default(),
1✔
825
            spatial_resolution: SpatialResolution::new(0.1, 0.1).unwrap(),
1✔
826
            attributes: ColumnSelection::all(),
1✔
827
        };
1✔
828

829
        let query = query_processor.query(qrect, &query_context).await.unwrap();
1✔
830

831
        let result: Vec<MultiPointCollection> = query.map(Result::unwrap).collect().await;
1✔
832

833
        assert_eq!(result.len(), 1);
1✔
834
        assert!(
1✔
835
            result[0].chunks_equal_ignoring_cache_hint(
1✔
836
                &MultiPointCollection::from_slices(
1✔
837
                    &[(0.0, 0.1), (50.0, 50.1), (25.0, 25.1)],
1✔
838
                    &[TimeInterval::default(); 3],
1✔
839
                    &[
1✔
840
                        ("count", FeatureData::Int(vec![2, 2, 2])),
1✔
841
                        (
1✔
842
                            "radius",
1✔
843
                            FeatureData::Float(vec![
1✔
844
                                8.693_147_180_559_945,
1✔
845
                                8.693_147_180_559_945,
1✔
846
                                8.693_147_180_559_945
1✔
847
                            ])
1✔
848
                        ),
1✔
849
                        (
1✔
850
                            "text",
1✔
851
                            FeatureData::NullableText(vec![
1✔
852
                                Some("foo, bar".to_string()),
1✔
853
                                Some("foo".to_string()),
1✔
854
                                None
1✔
855
                            ])
1✔
856
                        )
1✔
857
                    ],
1✔
858
                )
1✔
859
                .unwrap()
1✔
860
            )
1✔
861
        );
1✔
862
    }
1✔
863

864
    #[tokio::test]
865
    async fn it_attaches_cache_hint() {
1✔
866
        let mut coordinates = vec![(0.0, 0.1); 2];
1✔
867
        coordinates.extend_from_slice(&[(50.0, 50.1); 2]);
1✔
868
        coordinates.extend_from_slice(&[(25.0, 25.1); 2]);
1✔
869

870
        let mut input = MultiPointCollection::from_slices(
1✔
871
            &MultiPoint::many(coordinates).unwrap(),
1✔
872
            &[TimeInterval::default(); 6],
1✔
873
            &[(
1✔
874
                "text",
1✔
875
                FeatureData::NullableText(vec![
1✔
876
                    Some("foo".to_string()),
1✔
877
                    Some("bar".to_string()),
1✔
878
                    Some("foo".to_string()),
1✔
879
                    None,
1✔
880
                    None,
1✔
881
                    None,
1✔
882
                ]),
1✔
883
            )],
1✔
884
        )
885
        .unwrap();
1✔
886

887
        let cache_hint = CacheHint::seconds(1234);
1✔
888

889
        input.cache_hint = cache_hint;
1✔
890

891
        let operator = VisualPointClustering {
1✔
892
            params: VisualPointClusteringParams {
1✔
893
                min_radius_px: 8.,
1✔
894
                delta_px: 1.,
1✔
895
                radius_column: "radius".to_string(),
1✔
896
                count_column: "count".to_string(),
1✔
897
                column_aggregates: [(
1✔
898
                    "text".to_string(),
1✔
899
                    AttributeAggregateDef {
1✔
900
                        column_name: "text".to_string(),
1✔
901
                        aggregate_type: AttributeAggregateType::StringSample,
1✔
902
                        measurement: None,
1✔
903
                    },
1✔
904
                )]
1✔
905
                .iter()
1✔
906
                .cloned()
1✔
907
                .collect(),
1✔
908
            },
1✔
909
            sources: SingleVectorSource {
1✔
910
                vector: MockFeatureCollectionSource::single(input).boxed(),
1✔
911
            },
1✔
912
        };
1✔
913

914
        let execution_context = MockExecutionContext::test_default();
1✔
915

916
        let initialized_operator = operator
1✔
917
            .boxed()
1✔
918
            .initialize(WorkflowOperatorPath::initialize_root(), &execution_context)
1✔
919
            .await
1✔
920
            .unwrap();
1✔
921

922
        let query_processor = initialized_operator
1✔
923
            .query_processor()
1✔
924
            .unwrap()
1✔
925
            .multi_point()
1✔
926
            .unwrap();
1✔
927

928
        let query_context = MockQueryContext::test_default();
1✔
929

930
        let qrect = VectorQueryRectangle {
1✔
931
            spatial_bounds: BoundingBox2D::new((-180., -90.).into(), (180., 90.).into()).unwrap(),
1✔
932
            time_interval: TimeInterval::default(),
1✔
933
            spatial_resolution: SpatialResolution::new(0.1, 0.1).unwrap(),
1✔
934
            attributes: ColumnSelection::all(),
1✔
935
        };
1✔
936

937
        let query = query_processor.query(qrect, &query_context).await.unwrap();
1✔
938

939
        let result: Vec<MultiPointCollection> = query.map(Result::unwrap).collect().await;
1✔
940

941
        assert_eq!(result.len(), 1);
1✔
942
        assert_eq!(result[0].cache_hint.expires(), cache_hint.expires());
1✔
943
    }
1✔
944
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc