• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

geo-engine / geoengine / 11911118784

19 Nov 2024 10:06AM UTC coverage: 90.448% (-0.2%) from 90.687%
11911118784

push

github

web-flow
Merge pull request #994 from geo-engine/workspace-dependencies

use workspace dependencies, update toolchain, use global lock in expression

9 of 11 new or added lines in 6 files covered. (81.82%)

369 existing lines in 74 files now uncovered.

132871 of 146904 relevant lines covered (90.45%)

54798.62 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

93.39
/operators/src/processing/circle_merging_quadtree/operator.rs
1
use std::collections::{HashMap, HashSet};
2

3
use async_trait::async_trait;
4
use futures::stream::{BoxStream, FuturesUnordered};
5
use futures::StreamExt;
6
use geoengine_datatypes::collections::{
7
    BuilderProvider, GeoFeatureCollectionRowBuilder, MultiPointCollection, VectorDataType,
8
};
9
use geoengine_datatypes::primitives::{
10
    BoundingBox2D, Circle, FeatureDataType, FeatureDataValue, Measurement, MultiPoint,
11
    MultiPointAccess, VectorQueryRectangle,
12
};
13
use geoengine_datatypes::primitives::{CacheHint, ColumnSelection};
14
use serde::{Deserialize, Serialize};
15
use snafu::ensure;
16

17
use crate::adapters::FeatureCollectionStreamExt;
18
use crate::engine::{
19
    CanonicOperatorName, ExecutionContext, InitializedVectorOperator, Operator, QueryContext,
20
    QueryProcessor, SingleVectorSource, TypedVectorQueryProcessor, VectorColumnInfo,
21
    VectorOperator, VectorQueryProcessor, VectorResultDescriptor,
22
};
23
use crate::engine::{InitializedSources, OperatorName, WorkflowOperatorPath};
24
use crate::error::{self, Error};
25
use crate::processing::circle_merging_quadtree::aggregates::MeanAggregator;
26
use crate::processing::circle_merging_quadtree::circle_of_points::CircleOfPoints;
27
use crate::processing::circle_merging_quadtree::circle_radius_model::LogScaledRadius;
28
use crate::util::Result;
29

30
use super::aggregates::{AttributeAggregate, AttributeAggregateType, StringSampler};
31
use super::circle_radius_model::CircleRadiusModel;
32
use super::grid::Grid;
33
use super::quadtree::CircleMergingQuadtree;
34

35
/// Parameters of the [`VisualPointClustering`] operator.
///
/// (De-)serialized with camelCase field names.
#[derive(Debug, Serialize, Deserialize, Clone)]
×
36
#[serde(rename_all = "camelCase")]
37
pub struct VisualPointClusteringParams {
38
    /// First argument of the `LogScaledRadius` model; initialization rejects values `<= 0`.
    pub min_radius_px: f64,
39
    /// Second argument of the `LogScaledRadius` model; initialization rejects negative values.
    pub delta_px: f64,
40
    /// Name of the `Float` output column that receives the circle radius divided by the
    /// query resolution. Must be non-empty and differ from `count_column`.
    radius_column: String,
41
    /// Name of the `Int` output column that receives the number of points in a circle.
    count_column: String,
42
    /// Aggregates to compute, keyed by the name of the *source* column.
    /// Every key must exist in the source; the value describes the output column.
    column_aggregates: HashMap<String, AttributeAggregateDef>,
43
}
44

45
/// Describes how one source column is aggregated into one output column.
///
/// (De-)serialized with camelCase field names.
#[derive(Debug, Serialize, Deserialize, Clone)]
×
46
#[serde(rename_all = "camelCase")]
47
pub struct AttributeAggregateDef {
48
    /// Name of the output column that receives the aggregated value.
    pub column_name: String,
49
    /// Kind of aggregate; `MeanNumber` yields a `Float` column, `StringSample` a `Text`
    /// column, and `Null` is rejected during initialization.
    pub aggregate_type: AttributeAggregateType,
50
    /// If the measurement is unset, it will be taken from the source result descriptor.
    // if measurement is unset, it will be taken from the source result descriptor
51
    pub measurement: Option<Measurement>,
52
}
53

54
/// Operator that merges the points of a single vector source into circles, parameterized
/// by [`VisualPointClusteringParams`].
pub type VisualPointClustering = Operator<VisualPointClusteringParams, SingleVectorSource>;
55

56
/// Assigns the operator its type name.
impl OperatorName for VisualPointClustering {
57
    /// Type name of this operator.
    const TYPE_NAME: &'static str = "VisualPointClustering";
58
}
59

60
#[typetag::serde]
×
61
#[async_trait]
62
#[allow(clippy::too_many_lines)]
63
impl VectorOperator for VisualPointClustering {
64
    async fn _initialize(
65
        mut self: Box<Self>,
66
        path: WorkflowOperatorPath,
67
        context: &dyn ExecutionContext,
68
    ) -> Result<Box<dyn InitializedVectorOperator>> {
5✔
69
        ensure!(
5✔
70
            self.params.min_radius_px > 0.0,
5✔
UNCOV
71
            error::InputMustBeGreaterThanZero {
×
72
                scope: "VisualPointClustering",
×
73
                name: "min_radius_px"
×
74
            }
×
75
        );
76
        ensure!(
5✔
77
            self.params.delta_px >= 0.0,
5✔
UNCOV
78
            error::InputMustBeZeroOrPositive {
×
79
                scope: "VisualPointClustering",
×
80
                name: "delta_px"
×
81
            }
×
82
        );
83
        ensure!(!self.params.radius_column.is_empty(), error::EmptyInput);
5✔
84
        ensure!(
5✔
85
            self.params.count_column != self.params.radius_column,
5✔
UNCOV
86
            error::DuplicateOutputColumns
×
87
        );
88

89
        let name = CanonicOperatorName::from(&self);
5✔
90

91
        let radius_model = LogScaledRadius::new(self.params.min_radius_px, self.params.delta_px)?;
5✔
92

93
        let initialized_sources = self.sources.initialize_sources(path, context).await?;
5✔
94
        let vector_source = initialized_sources.vector;
5✔
95

5✔
96
        ensure!(
5✔
97
            vector_source.result_descriptor().data_type == VectorDataType::MultiPoint,
5✔
UNCOV
98
            error::InvalidType {
×
99
                expected: VectorDataType::MultiPoint.to_string(),
×
100
                found: vector_source.result_descriptor().data_type.to_string(),
×
101
            }
×
102
        );
103

104
        // check that all input columns exist
105
        for column_name in self.params.column_aggregates.keys() {
5✔
106
            let column_name_exists = vector_source
4✔
107
                .result_descriptor()
4✔
108
                .columns
4✔
109
                .contains_key(column_name);
4✔
110

4✔
111
            ensure!(
4✔
112
                column_name_exists,
4✔
UNCOV
113
                error::MissingInputColumn {
×
114
                    name: column_name.clone()
×
115
                }
×
116
            );
117
        }
118

119
        // check that there are no duplicates in the output columns
120
        let output_names: HashSet<&String> = self
5✔
121
            .params
5✔
122
            .column_aggregates
5✔
123
            .values()
5✔
124
            .map(|def| &def.column_name)
5✔
125
            .collect();
5✔
126
        ensure!(
5✔
127
            output_names.len() == self.params.column_aggregates.len(),
5✔
UNCOV
128
            error::DuplicateOutputColumns
×
129
        );
130

131
        // create schema for [`ResultDescriptor`]
132
        let mut new_columns: HashMap<String, VectorColumnInfo> =
5✔
133
            HashMap::with_capacity(self.params.column_aggregates.len());
5✔
134
        for attribute_aggregate_def in self.params.column_aggregates.values_mut() {
5✔
135
            if attribute_aggregate_def.measurement.is_none() {
4✔
136
                // take it from source measurement
4✔
137

4✔
138
                attribute_aggregate_def.measurement = vector_source
4✔
139
                    .result_descriptor()
4✔
140
                    .column_measurement(&attribute_aggregate_def.column_name)
4✔
141
                    .cloned();
4✔
142
            }
4✔
143

144
            let data_type = match attribute_aggregate_def.aggregate_type {
4✔
145
                AttributeAggregateType::MeanNumber => FeatureDataType::Float,
2✔
146
                AttributeAggregateType::StringSample => FeatureDataType::Text,
2✔
147
                AttributeAggregateType::Null => {
UNCOV
148
                    return Err(Error::InvalidType {
×
149
                        expected: "not null".to_string(),
×
150
                        found: "null".to_string(),
×
151
                    })
×
152
                }
153
            };
154

155
            new_columns.insert(
4✔
156
                attribute_aggregate_def.column_name.clone(),
4✔
157
                VectorColumnInfo {
4✔
158
                    data_type,
4✔
159
                    measurement: attribute_aggregate_def.measurement.clone().into(),
4✔
160
                },
4✔
161
            );
4✔
162
        }
163

164
        // check that output schema does not interfere with count and radius columns
165
        ensure!(
5✔
166
            !new_columns.contains_key(&self.params.radius_column)
5✔
167
                && !new_columns.contains_key(&self.params.count_column),
5✔
UNCOV
168
            error::DuplicateOutputColumns
×
169
        );
170

171
        let in_desc = vector_source.result_descriptor();
5✔
172

5✔
173
        Ok(InitializedVisualPointClustering {
5✔
174
            name,
5✔
175
            result_descriptor: VectorResultDescriptor {
5✔
176
                data_type: VectorDataType::MultiPoint,
5✔
177
                spatial_reference: in_desc.spatial_reference,
5✔
178
                columns: new_columns,
5✔
179
                time: in_desc.time,
5✔
180
                bbox: in_desc.bbox,
5✔
181
            },
5✔
182
            vector_source,
5✔
183
            radius_model,
5✔
184
            radius_column: self.params.radius_column,
5✔
185
            count_column: self.params.count_column,
5✔
186
            attribute_mapping: self.params.column_aggregates,
5✔
187
        }
5✔
188
        .boxed())
5✔
189
    }
10✔
190

191
    span_fn!(VisualPointClustering);
192
}
193

194
/// Initialized form of [`VisualPointClustering`], created by its `_initialize` method
/// after all parameters have been validated.
pub struct InitializedVisualPointClustering {
195
    /// Canonic name derived from the uninitialized operator.
    name: CanonicOperatorName,
196
    /// Descriptor of the output: `MultiPoint` data with the aggregate output columns.
    result_descriptor: VectorResultDescriptor,
197
    /// The initialized source operator that delivers the points.
    vector_source: Box<dyn InitializedVectorOperator>,
198
    /// Radius model built from `min_radius_px` and `delta_px`.
    radius_model: LogScaledRadius,
199
    /// Name of the output column holding the circle radius.
    radius_column: String,
200
    /// Name of the output column holding the number of points per circle.
    count_column: String,
201
    /// Aggregate definitions keyed by source column name.
    attribute_mapping: HashMap<String, AttributeAggregateDef>,
202
}
203

204
impl InitializedVectorOperator for InitializedVisualPointClustering {
205
    fn query_processor(&self) -> Result<TypedVectorQueryProcessor> {
5✔
206
        match self.vector_source.query_processor()? {
5✔
207
            TypedVectorQueryProcessor::MultiPoint(source) => {
5✔
208
                Ok(TypedVectorQueryProcessor::MultiPoint(
5✔
209
                    VisualPointClusteringProcessor::new(
5✔
210
                        source,
5✔
211
                        self.radius_model,
5✔
212
                        self.radius_column.clone(),
5✔
213
                        self.count_column.clone(),
5✔
214
                        self.result_descriptor.clone(),
5✔
215
                        self.attribute_mapping.clone(),
5✔
216
                    )
5✔
217
                    .boxed(),
5✔
218
                ))
5✔
219
            }
220
            TypedVectorQueryProcessor::MultiLineString(_) => Err(error::Error::InvalidVectorType {
×
221
                expected: "MultiPoint".to_owned(),
×
222
                found: "MultiLineString".to_owned(),
×
223
            }),
×
224
            TypedVectorQueryProcessor::MultiPolygon(_) => Err(error::Error::InvalidVectorType {
×
225
                expected: "MultiPoint".to_owned(),
×
226
                found: "MultiPolygon".to_owned(),
×
227
            }),
×
228
            TypedVectorQueryProcessor::Data(_) => Err(error::Error::InvalidVectorType {
×
229
                expected: "MultiPoint".to_owned(),
×
230
                found: "Data".to_owned(),
×
231
            }),
×
232
        }
233
    }
5✔
234

235
    fn result_descriptor(&self) -> &VectorResultDescriptor {
×
236
        &self.result_descriptor
×
237
    }
×
238

239
    fn canonic_name(&self) -> CanonicOperatorName {
×
240
        self.name.clone()
×
241
    }
×
242
}
243

244
/// Query processor that folds all points of the source into circles and emits them as a
/// `MultiPointCollection` with radius, count and aggregate columns.
pub struct VisualPointClusteringProcessor {
245
    /// Processor of the `MultiPoint` source.
    source: Box<dyn VectorQueryProcessor<VectorType = MultiPointCollection>>,
246
    /// Unscaled radius model; it is scaled with the query resolution on every query.
    radius_model: LogScaledRadius,
247
    /// Name of the output column holding the circle radius.
    radius_column: String,
248
    /// Name of the output column holding the number of points per circle.
    count_column: String,
249
    /// Descriptor of the output; its columns define the aggregate output schema.
    result_descriptor: VectorResultDescriptor,
250
    /// Aggregate definitions keyed by source column name.
    attribute_mapping: HashMap<String, AttributeAggregateDef>,
251
}
252

253
impl VisualPointClusteringProcessor {
254
    fn new(
5✔
255
        source: Box<dyn VectorQueryProcessor<VectorType = MultiPointCollection>>,
5✔
256
        radius_model: LogScaledRadius,
5✔
257
        radius_column: String,
5✔
258
        count_column: String,
5✔
259
        result_descriptor: VectorResultDescriptor,
5✔
260
        attribute_mapping: HashMap<String, AttributeAggregateDef>,
5✔
261
    ) -> Self {
5✔
262
        Self {
5✔
263
            source,
5✔
264
            radius_model,
5✔
265
            radius_column,
5✔
266
            count_column,
5✔
267
            result_descriptor,
5✔
268
            attribute_mapping,
5✔
269
        }
5✔
270
    }
5✔
271

272
    fn create_point_collection(
5✔
273
        circles_of_points: impl Iterator<Item = CircleOfPoints>,
5✔
274
        radius_column: &str,
5✔
275
        count_column: &str,
5✔
276
        resolution: f64,
5✔
277
        columns: &HashMap<String, FeatureDataType>,
5✔
278
        cache_hint: CacheHint,
5✔
279
    ) -> Result<MultiPointCollection> {
5✔
280
        let mut builder = MultiPointCollection::builder();
5✔
281

282
        for (column, &data_type) in columns {
9✔
283
            builder.add_column(column.clone(), data_type)?;
4✔
284
        }
285

286
        builder.add_column(radius_column.to_string(), FeatureDataType::Float)?;
5✔
287
        builder.add_column(count_column.to_string(), FeatureDataType::Int)?;
5✔
288

289
        let mut builder = builder.finish_header();
5✔
290

291
        for circle_of_points in circles_of_points {
17✔
292
            builder.push_geometry(MultiPoint::new(vec![circle_of_points.circle.center()])?);
12✔
293
            builder.push_time_interval(circle_of_points.time_aggregate);
12✔
294

12✔
295
            builder.push_data(
12✔
296
                radius_column,
12✔
297
                FeatureDataValue::Float(circle_of_points.circle.radius() / resolution),
12✔
298
            )?;
12✔
299
            builder.push_data(
12✔
300
                count_column,
12✔
301
                FeatureDataValue::Int(circle_of_points.number_of_points() as i64),
12✔
302
            )?;
12✔
303

304
            for (column, data_type) in circle_of_points.attribute_aggregates {
22✔
305
                match data_type {
10✔
306
                    AttributeAggregate::MeanNumber(mean_aggregator) => {
3✔
307
                        builder
3✔
308
                            .push_data(&column, FeatureDataValue::Float(mean_aggregator.mean))?;
3✔
309
                    }
310
                    AttributeAggregate::StringSample(string_sampler) => {
4✔
311
                        builder.push_data(
4✔
312
                            &column,
4✔
313
                            FeatureDataValue::Text(string_sampler.strings.join(", ")),
4✔
314
                        )?;
4✔
315
                    }
316
                    AttributeAggregate::Null => {
317
                        builder.push_null(&column)?;
3✔
318
                    }
319
                };
320
            }
321

322
            builder.finish_row();
12✔
323
        }
324

325
        builder.cache_hint(cache_hint);
5✔
326

5✔
327
        builder.build().map_err(Into::into)
5✔
328
    }
5✔
329

330
    fn aggregate_from_feature_data(
26✔
331
        feature_data: FeatureDataValue,
26✔
332
        aggregate_type: AttributeAggregateType,
26✔
333
    ) -> AttributeAggregate {
26✔
334
        match (feature_data, aggregate_type) {
26✔
335
            (
336
                FeatureDataValue::Category(value) | FeatureDataValue::NullableCategory(Some(value)),
×
337
                AttributeAggregateType::MeanNumber,
338
            ) => AttributeAggregate::MeanNumber(MeanAggregator::from_value(f64::from(value))),
×
339
            (
340
                FeatureDataValue::Float(value) | FeatureDataValue::NullableFloat(Some(value)),
10✔
341
                AttributeAggregateType::MeanNumber,
342
            ) => AttributeAggregate::MeanNumber(MeanAggregator::from_value(value)),
10✔
343
            (
344
                FeatureDataValue::Int(value) | FeatureDataValue::NullableInt(Some(value)),
1✔
345
                AttributeAggregateType::MeanNumber,
346
            ) => AttributeAggregate::MeanNumber(MeanAggregator::from_value(value as f64)),
1✔
347
            (
348
                FeatureDataValue::Text(value) | FeatureDataValue::NullableText(Some(value)),
6✔
349
                AttributeAggregateType::StringSample,
350
            ) => AttributeAggregate::StringSample(StringSampler::from_value(value)),
6✔
351
            _ => AttributeAggregate::Null,
9✔
352
        }
353
    }
26✔
354
}
355

356
/// Accumulator that is threaded through the fold over the source's feature collections.
struct GridFoldState {
357
    /// Grid into which every point of the source is inserted as a one-point circle.
    grid: Grid<LogScaledRadius>,
358
    /// Aggregate definitions keyed by source column name.
    column_mapping: HashMap<String, AttributeAggregateDef>,
359
    /// Starts at `CacheHint::max_duration()` and is merged with the hint of every
    /// consumed feature collection.
    cache_hint: CacheHint,
360
}
361

362
#[async_trait]
363
impl QueryProcessor for VisualPointClusteringProcessor {
364
    type Output = MultiPointCollection;
365
    type SpatialBounds = BoundingBox2D;
366
    type Selection = ColumnSelection;
367
    type ResultDescription = VectorResultDescriptor;
368

369
    async fn _query<'a>(
370
        &'a self,
371
        query: VectorQueryRectangle,
372
        ctx: &'a dyn QueryContext,
373
    ) -> Result<BoxStream<'a, Result<Self::Output>>> {
5✔
374
        // we aggregate all points into one collection
375

376
        let column_schema = self
5✔
377
            .result_descriptor
5✔
378
            .columns
5✔
379
            .iter()
5✔
380
            .map(|(name, column_info)| (name.clone(), column_info.data_type))
5✔
381
            .collect();
5✔
382

5✔
383
        let joint_resolution = f64::max(query.spatial_resolution.x, query.spatial_resolution.y);
5✔
384
        let scaled_radius_model = self.radius_model.with_scaled_radii(joint_resolution)?;
5✔
385

386
        let initial_grid_fold_state = Result::<GridFoldState>::Ok(GridFoldState {
5✔
387
            grid: Grid::new(query.spatial_bounds, scaled_radius_model),
5✔
388
            column_mapping: self.attribute_mapping.clone(),
5✔
389
            cache_hint: CacheHint::max_duration(),
5✔
390
        });
5✔
391

392
        let grid_future = self.source.query(query.clone(), ctx).await?.fold(
5✔
393
            initial_grid_fold_state,
5✔
394
            |state, feature_collection| async move {
5✔
395
                // TODO: worker thread
396

397
                let GridFoldState {
398
                    mut grid,
5✔
399
                    column_mapping,
5✔
400
                    mut cache_hint,
5✔
401
                } = state?;
5✔
402

403
                let feature_collection = feature_collection?;
5✔
404

405
                for feature in &feature_collection {
41✔
406
                    // TODO: pre-aggregate multi-points differently?
407
                    for coordinate in feature.geometry.points() {
36✔
408
                        let circle =
36✔
409
                            Circle::from_coordinate(coordinate, grid.radius_model().min_radius());
36✔
410

36✔
411
                        let mut attribute_aggregates = HashMap::with_capacity(column_mapping.len());
36✔
412

413
                        for (
414
                            src_column,
26✔
415
                            AttributeAggregateDef {
26✔
416
                                column_name: tgt_column,
26✔
417
                                aggregate_type,
26✔
418
                                measurement: _,
419
                            },
420
                        ) in &column_mapping
62✔
421
                        {
422
                            let attribute_aggregate =
26✔
423
                                if let Some(feature_data) = feature.get(src_column) {
26✔
424
                                    Self::aggregate_from_feature_data(feature_data, *aggregate_type)
26✔
425
                                } else {
UNCOV
426
                                    AttributeAggregate::Null
×
427
                                };
428

429
                            attribute_aggregates.insert(tgt_column.clone(), attribute_aggregate);
26✔
430
                        }
431

432
                        grid.insert(CircleOfPoints::new_with_one_point(
36✔
433
                            circle,
36✔
434
                            feature.time_interval,
36✔
435
                            attribute_aggregates,
36✔
436
                        ));
36✔
437
                    }
438
                }
439

440
                cache_hint.merge_with(&feature_collection.cache_hint);
5✔
441

5✔
442
                Ok(GridFoldState {
5✔
443
                    grid,
5✔
444
                    column_mapping,
5✔
445
                    cache_hint,
5✔
446
                })
5✔
447
            },
10✔
448
        );
5✔
449

5✔
450
        let stream = FuturesUnordered::new();
5✔
451
        stream.push(grid_future);
5✔
452

5✔
453
        let stream = stream.map(move |grid| {
5✔
454
            let GridFoldState {
455
                grid,
5✔
456
                column_mapping: _,
5✔
457
                cache_hint,
5✔
458
            } = grid?;
5✔
459

460
            let mut cmq = CircleMergingQuadtree::new(query.spatial_bounds, *grid.radius_model(), 1);
5✔
461

462
            // TODO: worker thread
463
            for circle_of_points in grid.drain() {
12✔
464
                cmq.insert_circle(circle_of_points);
12✔
465
            }
12✔
466

467
            Self::create_point_collection(
5✔
468
                cmq.into_iter(),
5✔
469
                &self.radius_column,
5✔
470
                &self.count_column,
5✔
471
                joint_resolution,
5✔
472
                &column_schema,
5✔
473
                cache_hint,
5✔
474
            )
5✔
475
        });
5✔
476

5✔
477
        Ok(stream.merge_chunks(ctx.chunk_byte_size().into()).boxed())
5✔
478
    }
10✔
479

480
    fn result_descriptor(&self) -> &VectorResultDescriptor {
10✔
481
        &self.result_descriptor
10✔
482
    }
10✔
483
}
484

485
#[cfg(test)]
486
mod tests {
487
    use geoengine_datatypes::collections::ChunksEqualIgnoringCacheHint;
488
    use geoengine_datatypes::primitives::CacheHint;
489
    use geoengine_datatypes::primitives::FeatureData;
490
    use geoengine_datatypes::primitives::SpatialResolution;
491
    use geoengine_datatypes::primitives::TimeInterval;
492
    use geoengine_datatypes::util::test::TestDefault;
493

494
    use crate::{
495
        engine::{MockExecutionContext, MockQueryContext},
496
        mock::MockFeatureCollectionSource,
497
    };
498

499
    use super::*;
500

501
    #[tokio::test]
502
    async fn simple_test() {
1✔
503
        let mut coordinates = vec![(0.0, 0.1); 9];
1✔
504
        coordinates.extend_from_slice(&[(50.0, 50.1)]);
1✔
505

1✔
506
        let input = MultiPointCollection::from_data(
1✔
507
            MultiPoint::many(coordinates).unwrap(),
1✔
508
            vec![TimeInterval::default(); 10],
1✔
509
            HashMap::default(),
1✔
510
            CacheHint::default(),
1✔
511
        )
1✔
512
        .unwrap();
1✔
513

1✔
514
        let operator = VisualPointClustering {
1✔
515
            params: VisualPointClusteringParams {
1✔
516
                min_radius_px: 8.,
1✔
517
                delta_px: 1.,
1✔
518
                radius_column: "radius".to_string(),
1✔
519
                count_column: "count".to_string(),
1✔
520
                column_aggregates: Default::default(),
1✔
521
            },
1✔
522
            sources: SingleVectorSource {
1✔
523
                vector: MockFeatureCollectionSource::single(input).boxed(),
1✔
524
            },
1✔
525
        };
1✔
526

1✔
527
        let execution_context = MockExecutionContext::test_default();
1✔
528

1✔
529
        let initialized_operator = operator
1✔
530
            .boxed()
1✔
531
            .initialize(WorkflowOperatorPath::initialize_root(), &execution_context)
1✔
532
            .await
1✔
533
            .unwrap();
1✔
534

1✔
535
        let query_processor = initialized_operator
1✔
536
            .query_processor()
1✔
537
            .unwrap()
1✔
538
            .multi_point()
1✔
539
            .unwrap();
1✔
540

1✔
541
        let query_context = MockQueryContext::test_default();
1✔
542

1✔
543
        let qrect = VectorQueryRectangle {
1✔
544
            spatial_bounds: BoundingBox2D::new((-180., -90.).into(), (180., 90.).into()).unwrap(),
1✔
545
            time_interval: TimeInterval::default(),
1✔
546
            spatial_resolution: SpatialResolution::new(0.1, 0.1).unwrap(),
1✔
547
            attributes: ColumnSelection::all(),
1✔
548
        };
1✔
549

1✔
550
        let query = query_processor.query(qrect, &query_context).await.unwrap();
1✔
551

1✔
552
        let result: Vec<MultiPointCollection> = query.map(Result::unwrap).collect().await;
1✔
553

1✔
554
        assert_eq!(result.len(), 1);
1✔
555
        assert!(result[0].chunks_equal_ignoring_cache_hint(
1✔
556
            &MultiPointCollection::from_slices(
1✔
557
                &[(0.0, 0.099_999_999_999_999_99), (50.0, 50.1)],
1✔
558
                &[TimeInterval::default(); 2],
1✔
559
                &[
1✔
560
                    ("count", FeatureData::Int(vec![9, 1])),
1✔
561
                    (
1✔
562
                        "radius",
1✔
563
                        FeatureData::Float(vec![10.197_224_577_336_218, 8.])
1✔
564
                    )
1✔
565
                ],
1✔
566
            )
1✔
567
            .unwrap()
1✔
568
        ));
1✔
569
    }
1✔
570

571
    #[tokio::test]
572
    async fn simple_test_with_aggregate() {
1✔
573
        let mut coordinates = vec![(0.0, 0.1); 9];
1✔
574
        coordinates.extend_from_slice(&[(50.0, 50.1)]);
1✔
575

1✔
576
        let input = MultiPointCollection::from_slices(
1✔
577
            &MultiPoint::many(coordinates).unwrap(),
1✔
578
            &[TimeInterval::default(); 10],
1✔
579
            &[(
1✔
580
                "foo",
1✔
581
                FeatureData::Float(vec![1., 2., 3., 4., 5., 6., 7., 8., 9., 10.]),
1✔
582
            )],
1✔
583
        )
1✔
584
        .unwrap();
1✔
585

1✔
586
        let operator = VisualPointClustering {
1✔
587
            params: VisualPointClusteringParams {
1✔
588
                min_radius_px: 8.,
1✔
589
                delta_px: 1.,
1✔
590
                radius_column: "radius".to_string(),
1✔
591
                count_column: "count".to_string(),
1✔
592
                column_aggregates: [(
1✔
593
                    "foo".to_string(),
1✔
594
                    AttributeAggregateDef {
1✔
595
                        column_name: "bar".to_string(),
1✔
596
                        aggregate_type: AttributeAggregateType::MeanNumber,
1✔
597
                        measurement: None,
1✔
598
                    },
1✔
599
                )]
1✔
600
                .iter()
1✔
601
                .cloned()
1✔
602
                .collect(),
1✔
603
            },
1✔
604
            sources: SingleVectorSource {
1✔
605
                vector: MockFeatureCollectionSource::single(input).boxed(),
1✔
606
            },
1✔
607
        };
1✔
608

1✔
609
        let execution_context = MockExecutionContext::test_default();
1✔
610

1✔
611
        let initialized_operator = operator
1✔
612
            .boxed()
1✔
613
            .initialize(WorkflowOperatorPath::initialize_root(), &execution_context)
1✔
614
            .await
1✔
615
            .unwrap();
1✔
616

1✔
617
        let query_processor = initialized_operator
1✔
618
            .query_processor()
1✔
619
            .unwrap()
1✔
620
            .multi_point()
1✔
621
            .unwrap();
1✔
622

1✔
623
        let query_context = MockQueryContext::test_default();
1✔
624

1✔
625
        let qrect = VectorQueryRectangle {
1✔
626
            spatial_bounds: BoundingBox2D::new((-180., -90.).into(), (180., 90.).into()).unwrap(),
1✔
627
            time_interval: TimeInterval::default(),
1✔
628
            spatial_resolution: SpatialResolution::new(0.1, 0.1).unwrap(),
1✔
629
            attributes: ColumnSelection::all(),
1✔
630
        };
1✔
631

1✔
632
        let query = query_processor.query(qrect, &query_context).await.unwrap();
1✔
633

1✔
634
        let result: Vec<MultiPointCollection> = query.map(Result::unwrap).collect().await;
1✔
635

1✔
636
        assert_eq!(result.len(), 1);
1✔
637
        assert!(result[0].chunks_equal_ignoring_cache_hint(
1✔
638
            &MultiPointCollection::from_slices(
1✔
639
                &[(0.0, 0.099_999_999_999_999_99), (50.0, 50.1)],
1✔
640
                &[TimeInterval::default(); 2],
1✔
641
                &[
1✔
642
                    ("count", FeatureData::Int(vec![9, 1])),
1✔
643
                    (
1✔
644
                        "radius",
1✔
645
                        FeatureData::Float(vec![10.197_224_577_336_218, 8.])
1✔
646
                    ),
1✔
647
                    ("bar", FeatureData::Float(vec![5., 10.]))
1✔
648
                ],
1✔
649
            )
1✔
650
            .unwrap()
1✔
651
        ));
1✔
652
    }
1✔
653

654
    #[tokio::test]
655
    async fn aggregate_of_null() {
1✔
656
        let mut coordinates = vec![(0.0, 0.1); 2];
1✔
657
        coordinates.extend_from_slice(&[(50.0, 50.1); 2]);
1✔
658

1✔
659
        let input = MultiPointCollection::from_slices(
1✔
660
            &MultiPoint::many(coordinates).unwrap(),
1✔
661
            &[TimeInterval::default(); 4],
1✔
662
            &[(
1✔
663
                "foo",
1✔
664
                FeatureData::NullableInt(vec![Some(1), None, None, None]),
1✔
665
            )],
1✔
666
        )
1✔
667
        .unwrap();
1✔
668

1✔
669
        let operator = VisualPointClustering {
1✔
670
            params: VisualPointClusteringParams {
1✔
671
                min_radius_px: 8.,
1✔
672
                delta_px: 1.,
1✔
673
                radius_column: "radius".to_string(),
1✔
674
                count_column: "count".to_string(),
1✔
675
                column_aggregates: [(
1✔
676
                    "foo".to_string(),
1✔
677
                    AttributeAggregateDef {
1✔
678
                        column_name: "foo".to_string(),
1✔
679
                        aggregate_type: AttributeAggregateType::MeanNumber,
1✔
680
                        measurement: None,
1✔
681
                    },
1✔
682
                )]
1✔
683
                .iter()
1✔
684
                .cloned()
1✔
685
                .collect(),
1✔
686
            },
1✔
687
            sources: SingleVectorSource {
1✔
688
                vector: MockFeatureCollectionSource::single(input).boxed(),
1✔
689
            },
1✔
690
        };
1✔
691

1✔
692
        let execution_context = MockExecutionContext::test_default();
1✔
693

1✔
694
        let initialized_operator = operator
1✔
695
            .boxed()
1✔
696
            .initialize(WorkflowOperatorPath::initialize_root(), &execution_context)
1✔
697
            .await
1✔
698
            .unwrap();
1✔
699

1✔
700
        let query_processor = initialized_operator
1✔
701
            .query_processor()
1✔
702
            .unwrap()
1✔
703
            .multi_point()
1✔
704
            .unwrap();
1✔
705

1✔
706
        let query_context = MockQueryContext::test_default();
1✔
707

1✔
708
        let qrect = VectorQueryRectangle {
1✔
709
            spatial_bounds: BoundingBox2D::new((-180., -90.).into(), (180., 90.).into()).unwrap(),
1✔
710
            time_interval: TimeInterval::default(),
1✔
711
            spatial_resolution: SpatialResolution::new(0.1, 0.1).unwrap(),
1✔
712
            attributes: ColumnSelection::all(),
1✔
713
        };
1✔
714

1✔
715
        let query = query_processor.query(qrect, &query_context).await.unwrap();
1✔
716

1✔
717
        let result: Vec<MultiPointCollection> = query.map(Result::unwrap).collect().await;
1✔
718

1✔
719
        assert_eq!(result.len(), 1);
1✔
720
        assert!(result[0].chunks_equal_ignoring_cache_hint(
1✔
721
            &MultiPointCollection::from_slices(
1✔
722
                &[(0.0, 0.1), (50.0, 50.1)],
1✔
723
                &[TimeInterval::default(); 2],
1✔
724
                &[
1✔
725
                    ("count", FeatureData::Int(vec![2, 2])),
1✔
726
                    (
1✔
727
                        "radius",
1✔
728
                        FeatureData::Float(vec![8.693_147_180_559_945, 8.693_147_180_559_945])
1✔
729
                    ),
1✔
730
                    ("foo", FeatureData::NullableFloat(vec![Some(1.), None]))
1✔
731
                ],
1✔
732
            )
1✔
733
            .unwrap()
1✔
734
        ));
1✔
735
    }
1✔
736

737
    #[tokio::test]
738
    async fn text_aggregate() {
1✔
739
        let mut coordinates = vec![(0.0, 0.1); 2];
1✔
740
        coordinates.extend_from_slice(&[(50.0, 50.1); 2]);
1✔
741
        coordinates.extend_from_slice(&[(25.0, 25.1); 2]);
1✔
742

1✔
743
        let input = MultiPointCollection::from_slices(
1✔
744
            &MultiPoint::many(coordinates).unwrap(),
1✔
745
            &[TimeInterval::default(); 6],
1✔
746
            &[(
1✔
747
                "text",
1✔
748
                FeatureData::NullableText(vec![
1✔
749
                    Some("foo".to_string()),
1✔
750
                    Some("bar".to_string()),
1✔
751
                    Some("foo".to_string()),
1✔
752
                    None,
1✔
753
                    None,
1✔
754
                    None,
1✔
755
                ]),
1✔
756
            )],
1✔
757
        )
1✔
758
        .unwrap();
1✔
759

1✔
760
        let operator = VisualPointClustering {
1✔
761
            params: VisualPointClusteringParams {
1✔
762
                min_radius_px: 8.,
1✔
763
                delta_px: 1.,
1✔
764
                radius_column: "radius".to_string(),
1✔
765
                count_column: "count".to_string(),
1✔
766
                column_aggregates: [(
1✔
767
                    "text".to_string(),
1✔
768
                    AttributeAggregateDef {
1✔
769
                        column_name: "text".to_string(),
1✔
770
                        aggregate_type: AttributeAggregateType::StringSample,
1✔
771
                        measurement: None,
1✔
772
                    },
1✔
773
                )]
1✔
774
                .iter()
1✔
775
                .cloned()
1✔
776
                .collect(),
1✔
777
            },
1✔
778
            sources: SingleVectorSource {
1✔
779
                vector: MockFeatureCollectionSource::single(input).boxed(),
1✔
780
            },
1✔
781
        };
1✔
782

1✔
783
        let execution_context = MockExecutionContext::test_default();
1✔
784

1✔
785
        let initialized_operator = operator
1✔
786
            .boxed()
1✔
787
            .initialize(WorkflowOperatorPath::initialize_root(), &execution_context)
1✔
788
            .await
1✔
789
            .unwrap();
1✔
790

1✔
791
        let query_processor = initialized_operator
1✔
792
            .query_processor()
1✔
793
            .unwrap()
1✔
794
            .multi_point()
1✔
795
            .unwrap();
1✔
796

1✔
797
        let query_context = MockQueryContext::test_default();
1✔
798

1✔
799
        let qrect = VectorQueryRectangle {
1✔
800
            spatial_bounds: BoundingBox2D::new((-180., -90.).into(), (180., 90.).into()).unwrap(),
1✔
801
            time_interval: TimeInterval::default(),
1✔
802
            spatial_resolution: SpatialResolution::new(0.1, 0.1).unwrap(),
1✔
803
            attributes: ColumnSelection::all(),
1✔
804
        };
1✔
805

1✔
806
        let query = query_processor.query(qrect, &query_context).await.unwrap();
1✔
807

1✔
808
        let result: Vec<MultiPointCollection> = query.map(Result::unwrap).collect().await;
1✔
809

1✔
810
        assert_eq!(result.len(), 1);
1✔
811
        assert!(result[0].chunks_equal_ignoring_cache_hint(
1✔
812
            &MultiPointCollection::from_slices(
1✔
813
                &[(0.0, 0.1), (50.0, 50.1), (25.0, 25.1)],
1✔
814
                &[TimeInterval::default(); 3],
1✔
815
                &[
1✔
816
                    ("count", FeatureData::Int(vec![2, 2, 2])),
1✔
817
                    (
1✔
818
                        "radius",
1✔
819
                        FeatureData::Float(vec![
1✔
820
                            8.693_147_180_559_945,
1✔
821
                            8.693_147_180_559_945,
1✔
822
                            8.693_147_180_559_945
1✔
823
                        ])
1✔
824
                    ),
1✔
825
                    (
1✔
826
                        "text",
1✔
827
                        FeatureData::NullableText(vec![
1✔
828
                            Some("foo, bar".to_string()),
1✔
829
                            Some("foo".to_string()),
1✔
830
                            None
1✔
831
                        ])
1✔
832
                    )
1✔
833
                ],
1✔
834
            )
1✔
835
            .unwrap()
1✔
836
        ));
1✔
837
    }
1✔
838

839
    #[tokio::test]
840
    async fn it_attaches_cache_hint() {
1✔
841
        let mut coordinates = vec![(0.0, 0.1); 2];
1✔
842
        coordinates.extend_from_slice(&[(50.0, 50.1); 2]);
1✔
843
        coordinates.extend_from_slice(&[(25.0, 25.1); 2]);
1✔
844

1✔
845
        let mut input = MultiPointCollection::from_slices(
1✔
846
            &MultiPoint::many(coordinates).unwrap(),
1✔
847
            &[TimeInterval::default(); 6],
1✔
848
            &[(
1✔
849
                "text",
1✔
850
                FeatureData::NullableText(vec![
1✔
851
                    Some("foo".to_string()),
1✔
852
                    Some("bar".to_string()),
1✔
853
                    Some("foo".to_string()),
1✔
854
                    None,
1✔
855
                    None,
1✔
856
                    None,
1✔
857
                ]),
1✔
858
            )],
1✔
859
        )
1✔
860
        .unwrap();
1✔
861

1✔
862
        let cache_hint = CacheHint::seconds(1234);
1✔
863

1✔
864
        input.cache_hint = cache_hint;
1✔
865

1✔
866
        let operator = VisualPointClustering {
1✔
867
            params: VisualPointClusteringParams {
1✔
868
                min_radius_px: 8.,
1✔
869
                delta_px: 1.,
1✔
870
                radius_column: "radius".to_string(),
1✔
871
                count_column: "count".to_string(),
1✔
872
                column_aggregates: [(
1✔
873
                    "text".to_string(),
1✔
874
                    AttributeAggregateDef {
1✔
875
                        column_name: "text".to_string(),
1✔
876
                        aggregate_type: AttributeAggregateType::StringSample,
1✔
877
                        measurement: None,
1✔
878
                    },
1✔
879
                )]
1✔
880
                .iter()
1✔
881
                .cloned()
1✔
882
                .collect(),
1✔
883
            },
1✔
884
            sources: SingleVectorSource {
1✔
885
                vector: MockFeatureCollectionSource::single(input).boxed(),
1✔
886
            },
1✔
887
        };
1✔
888

1✔
889
        let execution_context = MockExecutionContext::test_default();
1✔
890

1✔
891
        let initialized_operator = operator
1✔
892
            .boxed()
1✔
893
            .initialize(WorkflowOperatorPath::initialize_root(), &execution_context)
1✔
894
            .await
1✔
895
            .unwrap();
1✔
896

1✔
897
        let query_processor = initialized_operator
1✔
898
            .query_processor()
1✔
899
            .unwrap()
1✔
900
            .multi_point()
1✔
901
            .unwrap();
1✔
902

1✔
903
        let query_context = MockQueryContext::test_default();
1✔
904

1✔
905
        let qrect = VectorQueryRectangle {
1✔
906
            spatial_bounds: BoundingBox2D::new((-180., -90.).into(), (180., 90.).into()).unwrap(),
1✔
907
            time_interval: TimeInterval::default(),
1✔
908
            spatial_resolution: SpatialResolution::new(0.1, 0.1).unwrap(),
1✔
909
            attributes: ColumnSelection::all(),
1✔
910
        };
1✔
911

1✔
912
        let query = query_processor.query(qrect, &query_context).await.unwrap();
1✔
913

1✔
914
        let result: Vec<MultiPointCollection> = query.map(Result::unwrap).collect().await;
1✔
915

1✔
916
        assert_eq!(result.len(), 1);
1✔
917
        assert_eq!(result[0].cache_hint.expires(), cache_hint.expires());
1✔
918
    }
1✔
919
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc