• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

geo-engine / geoengine / 12417203919

19 Dec 2024 04:55PM UTC coverage: 90.351% (-0.2%) from 90.512%
12417203919

Pull #998

github

web-flow
Merge c7e5c8ae4 into 34e12969f
Pull Request #998: quota logging wip

834 of 1211 new or added lines in 66 files covered. (68.87%)

220 existing lines in 21 files now uncovered.

133830 of 148123 relevant lines covered (90.35%)

54352.72 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

98.72
/operators/src/plot/scatter_plot.rs
1
use async_trait::async_trait;
2
use futures::StreamExt;
3
use serde::{Deserialize, Serialize};
4

5
use geoengine_datatypes::collections::FeatureCollectionInfos;
6
use geoengine_datatypes::plots::{Histogram2D, HistogramDimension, Plot, PlotData};
7

8
use crate::engine::{
9
    CanonicOperatorName, ExecutionContext, InitializedPlotOperator, InitializedSources,
10
    InitializedVectorOperator, Operator, OperatorName, PlotOperator, PlotQueryProcessor,
11
    PlotResultDescriptor, QueryContext, QueryProcessor, SingleVectorSource,
12
    TypedPlotQueryProcessor, TypedVectorQueryProcessor, WorkflowOperatorPath,
13
};
14
use crate::error::Error;
15
use crate::util::Result;
16
use geoengine_datatypes::primitives::{Coordinate2D, PlotQueryRectangle};
17

18
pub const SCATTERPLOT_OPERATOR_NAME: &str = "ScatterPlot";
19

20
/// The maximum number of elements for a scatter plot
21
const SCATTER_PLOT_THRESHOLD: usize = 500;
22

23
/// The number of elements to process at once (i.e., without switching from scatter-plot to histogram)
24
const BATCH_SIZE: usize = 1000;
25

26
/// The maximum number of elements before we turn the collector into a histogram.
27
/// At this point, the bounds of the histogram are fixed (i.e., further values exceeding
28
/// the min/max seen so far are ignored)
29
const COLLECTOR_TO_HISTOGRAM_THRESHOLD: usize = BATCH_SIZE * 10;
30

31
/// A scatter plot about two attributes of a vector dataset. If the
32
/// dataset contains more than `SCATTER_PLOT_THRESHOLD` elements, this
33
/// operator creates a 2D histogram.
34
pub type ScatterPlot = Operator<ScatterPlotParams, SingleVectorSource>;
35

36
impl OperatorName for ScatterPlot {
37
    const TYPE_NAME: &'static str = "ScatterPlot";
38
}
39

40
/// The parameter spec for `ScatterPlot`
41
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
3✔
42
#[serde(rename_all = "camelCase")]
43
pub struct ScatterPlotParams {
44
    /// Name of the (numeric) attribute for the x-axis.
45
    pub column_x: String,
46
    /// Name of the (numeric) attribute for the y-axis.
47
    pub column_y: String,
48
}
49

50
#[typetag::serde]
×
51
#[async_trait]
52
impl PlotOperator for ScatterPlot {
53
    async fn _initialize(
54
        self: Box<Self>,
55
        path: WorkflowOperatorPath,
56
        context: &dyn ExecutionContext,
57
    ) -> Result<Box<dyn InitializedPlotOperator>> {
10✔
58
        let name = CanonicOperatorName::from(&self);
10✔
59

60
        let initialized_sources = self
10✔
61
            .sources
10✔
62
            .initialize_sources(path.clone(), context)
10✔
NEW
63
            .await?;
×
64
        let source = initialized_sources.vector;
10✔
65
        for cn in [&self.params.column_x, &self.params.column_y] {
17✔
66
            match source.result_descriptor().column_data_type(cn.as_str()) {
17✔
67
                Some(column) if !column.is_numeric() => {
16✔
68
                    return Err(Error::InvalidOperatorSpec {
3✔
69
                        reason: format!("Column '{cn}' is not numeric."),
3✔
70
                    });
3✔
71
                }
72
                Some(_) => {
13✔
73
                    // OK
13✔
74
                }
13✔
75
                None => {
76
                    return Err(Error::ColumnDoesNotExist {
1✔
77
                        column: cn.to_string(),
1✔
78
                    });
1✔
79
                }
80
            }
81
        }
82

83
        let in_desc = source.result_descriptor().clone();
6✔
84

6✔
85
        Ok(InitializedScatterPlot::new(name, in_desc.into(), self.params, source).boxed())
6✔
86
    }
20✔
87

88
    span_fn!(ScatterPlot);
89
}
90

91
/// The initialization of `Histogram`
92
pub struct InitializedScatterPlot<Op> {
93
    name: CanonicOperatorName,
94
    result_descriptor: PlotResultDescriptor,
95
    column_x: String,
96
    column_y: String,
97
    source: Op,
98
}
99

100
impl<Op> InitializedScatterPlot<Op> {
101
    pub fn new(
6✔
102
        name: CanonicOperatorName,
6✔
103
        result_descriptor: PlotResultDescriptor,
6✔
104
        params: ScatterPlotParams,
6✔
105
        source: Op,
6✔
106
    ) -> Self {
6✔
107
        Self {
6✔
108
            name,
6✔
109
            result_descriptor,
6✔
110
            column_x: params.column_x,
6✔
111
            column_y: params.column_y,
6✔
112
            source,
6✔
113
        }
6✔
114
    }
6✔
115
}
116
impl InitializedPlotOperator for InitializedScatterPlot<Box<dyn InitializedVectorOperator>> {
117
    fn result_descriptor(&self) -> &PlotResultDescriptor {
×
118
        &self.result_descriptor
×
119
    }
×
120

121
    fn query_processor(&self) -> Result<TypedPlotQueryProcessor> {
6✔
122
        let processor = ScatterPlotQueryProcessor {
6✔
123
            input: self.source.query_processor()?,
6✔
124
            column_x: self.column_x.clone(),
6✔
125
            column_y: self.column_y.clone(),
6✔
126
        };
6✔
127

6✔
128
        Ok(TypedPlotQueryProcessor::JsonVega(processor.boxed()))
6✔
129
    }
6✔
130

131
    fn canonic_name(&self) -> CanonicOperatorName {
×
132
        self.name.clone()
×
133
    }
×
134
}
135

136
/// A query processor that calculates the scatter plot about its vector input.
137
pub struct ScatterPlotQueryProcessor {
138
    input: TypedVectorQueryProcessor,
139
    column_x: String,
140
    column_y: String,
141
}
142

143
#[async_trait]
144
impl PlotQueryProcessor for ScatterPlotQueryProcessor {
145
    type OutputFormat = PlotData;
146

147
    fn plot_type(&self) -> &'static str {
×
148
        SCATTERPLOT_OPERATOR_NAME
×
149
    }
×
150

151
    async fn plot_query<'p>(
152
        &'p self,
153
        query: PlotQueryRectangle,
154
        ctx: &'p dyn QueryContext,
155
    ) -> Result<Self::OutputFormat> {
6✔
156
        let mut collector =
6✔
157
            CollectorKind::Values(Collector::new(self.column_x.clone(), self.column_y.clone()));
6✔
158

159
        call_on_generic_vector_processor!(&self.input, processor => {
6✔
160
            let mut query = processor.query(query.into(), ctx).await?;
6✔
161
            while let Some(collection) = query.next().await {
15✔
162
                let collection = collection?;
9✔
163

164
                let data_x = collection.data(&self.column_x).expect("checked in param");
9✔
165
                let data_y = collection.data(&self.column_y).expect("checked in param");
9✔
166

9✔
167
                let valid_points = data_x.float_options_iter().zip(data_y.float_options_iter()).filter_map(|(a,b)| match (a,b) {
18,717✔
168
                    (Some(x),Some(y)) if x.is_finite() && y.is_finite() => Some(Coordinate2D::new(x,y)),
18,714✔
169
                    _ => None
6✔
170
                });
18,717✔
171

172
                for chunk in &itertools::Itertools::chunks(valid_points, BATCH_SIZE) {
23✔
173
                    collector.add_batch( chunk )?;
23✔
174
                }
175
            }
176
        });
177
        Ok(collector.into_plot()?.to_vega_embeddable(false)?)
6✔
178
    }
12✔
179
}
180

181
struct Collector {
182
    elements: Vec<Coordinate2D>,
183
    column_x: String,
184
    column_y: String,
185
    bounds_x: (f64, f64),
186
    bounds_y: (f64, f64),
187
}
188

189
impl Collector {
190
    fn new(column_x: String, column_y: String) -> Self {
12✔
191
        Collector {
12✔
192
            column_x,
12✔
193
            column_y,
12✔
194
            elements: Vec::with_capacity(SCATTER_PLOT_THRESHOLD),
12✔
195
            bounds_x: (f64::INFINITY, f64::NEG_INFINITY),
12✔
196
            bounds_y: (f64::INFINITY, f64::NEG_INFINITY),
12✔
197
        }
12✔
198
    }
12✔
199

200
    fn element_count(&self) -> usize {
35✔
201
        self.elements.len()
35✔
202
    }
35✔
203

204
    fn add_batch(&mut self, values: impl Iterator<Item = Coordinate2D>) {
21✔
205
        for v in values {
42,985✔
206
            self.add(v);
42,964✔
207
        }
42,964✔
208
    }
21✔
209

210
    fn add(&mut self, value: Coordinate2D) {
42,964✔
211
        if value.x.is_finite() && value.y.is_finite() {
42,964✔
212
            self.bounds_x.0 = std::cmp::min_by(self.bounds_x.0, value.x, |a, b| {
42,964✔
213
                a.partial_cmp(b).expect("checked")
42,964✔
214
            });
42,964✔
215
            self.bounds_x.1 = std::cmp::max_by(self.bounds_x.1, value.x, |a, b| {
42,964✔
216
                a.partial_cmp(b).expect("checked")
42,964✔
217
            });
42,964✔
218
            self.bounds_y.0 = std::cmp::min_by(self.bounds_y.0, value.y, |a, b| {
42,964✔
219
                a.partial_cmp(b).expect("checked")
42,964✔
220
            });
42,964✔
221
            self.bounds_y.1 = std::cmp::max_by(self.bounds_y.1, value.y, |a, b| {
42,964✔
222
                a.partial_cmp(b).expect("checked")
42,964✔
223
            });
42,964✔
224
            self.elements.push(value);
42,964✔
225
        }
42,964✔
226
    }
42,964✔
227
}
228

229
enum CollectorKind {
230
    Values(Collector),
231
    Histogram(Histogram2D),
232
}
233

234
impl CollectorKind {
235
    fn histogram_from_collector(value: &Collector) -> Result<Histogram2D> {
6✔
236
        let bucket_count = std::cmp::min(100, f64::sqrt(value.element_count() as f64) as usize);
6✔
237

238
        let dim_x = HistogramDimension::new(
6✔
239
            value.column_x.clone(),
6✔
240
            value.bounds_x.0,
6✔
241
            value.bounds_x.1,
6✔
242
            bucket_count,
6✔
243
        )?;
6✔
244
        let dim_y = HistogramDimension::new(
6✔
245
            value.column_y.clone(),
6✔
246
            value.bounds_y.0,
6✔
247
            value.bounds_y.1,
6✔
248
            bucket_count,
6✔
249
        )?;
6✔
250

251
        let mut result = Histogram2D::new(dim_x, dim_y);
6✔
252
        result.update_batch(value.elements.iter().copied());
6✔
253
        Ok(result)
6✔
254
    }
6✔
255

256
    fn add_batch(&mut self, values: impl Iterator<Item = Coordinate2D>) -> Result<()> {
30✔
257
        match self {
30✔
258
            Self::Values(ref mut c) => {
21✔
259
                c.add_batch(values);
21✔
260
                if c.element_count() > COLLECTOR_TO_HISTOGRAM_THRESHOLD {
21✔
261
                    *self = Self::Histogram(Self::histogram_from_collector(c)?);
4✔
262
                }
17✔
263
            }
264
            Self::Histogram(ref mut h) => {
9✔
265
                h.update_batch(values);
9✔
266
            }
9✔
267
        }
268
        Ok(())
30✔
269
    }
30✔
270

271
    fn into_plot(self) -> Result<Box<dyn Plot>> {
12✔
272
        match self {
8✔
273
            Self::Histogram(h) => Ok(Box::new(h)),
4✔
274
            Self::Values(v) if v.element_count() <= SCATTER_PLOT_THRESHOLD => Ok(Box::new(
8✔
275
                geoengine_datatypes::plots::ScatterPlot::new_with_data(
6✔
276
                    v.column_x, v.column_y, v.elements,
6✔
277
                ),
6✔
278
            )),
6✔
279
            Self::Values(v) => Ok(Box::new(Self::histogram_from_collector(&v)?)),
2✔
280
        }
281
    }
12✔
282
}
283

284
#[cfg(test)]
285
mod tests {
286
    use geoengine_datatypes::util::test::TestDefault;
287
    use serde_json::json;
288

289
    use geoengine_datatypes::primitives::{
290
        BoundingBox2D, FeatureData, NoGeometry, PlotSeriesSelection, SpatialResolution,
291
        TimeInterval,
292
    };
293
    use geoengine_datatypes::{collections::DataCollection, primitives::MultiPoint};
294

295
    use crate::engine::{ChunkByteSize, MockExecutionContext, MockQueryContext, VectorOperator};
296
    use crate::mock::MockFeatureCollectionSource;
297

298
    use super::*;
299

300
    #[test]
301
    fn serialization() {
1✔
302
        let scatter_plot = ScatterPlot {
1✔
303
            params: ScatterPlotParams {
1✔
304
                column_x: "foo".to_owned(),
1✔
305
                column_y: "bar".to_owned(),
1✔
306
            },
1✔
307
            sources: MockFeatureCollectionSource::<MultiPoint>::multiple(vec![])
1✔
308
                .boxed()
1✔
309
                .into(),
1✔
310
        };
1✔
311

1✔
312
        let serialized = json!({
1✔
313
            "type": "ScatterPlot",
1✔
314
            "params": {
1✔
315
                "columnX": "foo",
1✔
316
                "columnY": "bar",
1✔
317
            },
1✔
318
            "sources": {
1✔
319
                "vector": {
1✔
320
                    "type": "MockFeatureCollectionSourceMultiPoint",
1✔
321
                    "params": {
1✔
322
                        "collections": [],
1✔
323
                        "spatialReference": "EPSG:4326",
1✔
324
                        "measurements": {},
1✔
325
                    }
1✔
326
                }
1✔
327
            }
1✔
328
        })
1✔
329
        .to_string();
1✔
330

1✔
331
        let deserialized: ScatterPlot = serde_json::from_str(&serialized).unwrap();
1✔
332

1✔
333
        assert_eq!(deserialized.params, scatter_plot.params);
1✔
334
    }
1✔
335

336
    #[tokio::test]
337
    async fn vector_data() {
1✔
338
        let vector_source = MockFeatureCollectionSource::multiple(vec![
1✔
339
            DataCollection::from_slices(
1✔
340
                &[] as &[NoGeometry],
1✔
341
                &[TimeInterval::default(); 4],
1✔
342
                &[
1✔
343
                    ("foo", FeatureData::Int(vec![1, 2, 3, 4])),
1✔
344
                    ("bar", FeatureData::Int(vec![1, 2, 3, 4])),
1✔
345
                ],
1✔
346
            )
1✔
347
            .unwrap(),
1✔
348
            DataCollection::from_slices(
1✔
349
                &[] as &[NoGeometry],
1✔
350
                &[TimeInterval::default(); 4],
1✔
351
                &[
1✔
352
                    ("foo", FeatureData::Int(vec![5, 6, 7, 8])),
1✔
353
                    ("bar", FeatureData::Int(vec![5, 6, 7, 8])),
1✔
354
                ],
1✔
355
            )
1✔
356
            .unwrap(),
1✔
357
        ])
1✔
358
        .boxed();
1✔
359

1✔
360
        let box_plot = ScatterPlot {
1✔
361
            params: ScatterPlotParams {
1✔
362
                column_x: "foo".to_string(),
1✔
363
                column_y: "bar".to_string(),
1✔
364
            },
1✔
365
            sources: vector_source.into(),
1✔
366
        };
1✔
367

1✔
368
        let execution_context = MockExecutionContext::test_default();
1✔
369

1✔
370
        let query_processor = box_plot
1✔
371
            .boxed()
1✔
372
            .initialize(WorkflowOperatorPath::initialize_root(), &execution_context)
1✔
373
            .await
1✔
374
            .unwrap()
1✔
375
            .query_processor()
1✔
376
            .unwrap()
1✔
377
            .json_vega()
1✔
378
            .unwrap();
1✔
379

1✔
380
        let result = query_processor
1✔
381
            .plot_query(
1✔
382
                PlotQueryRectangle {
1✔
383
                    spatial_bounds: BoundingBox2D::new((-180., -90.).into(), (180., 90.).into())
1✔
384
                        .unwrap(),
1✔
385
                    time_interval: TimeInterval::default(),
1✔
386
                    spatial_resolution: SpatialResolution::one(),
1✔
387
                    attributes: PlotSeriesSelection::all(),
1✔
388
                },
1✔
389
                &MockQueryContext::new(ChunkByteSize::MIN),
1✔
390
            )
1✔
391
            .await
1✔
392
            .unwrap();
1✔
393

1✔
394
        let mut expected =
1✔
395
            geoengine_datatypes::plots::ScatterPlot::new("foo".to_string(), "bar".to_string());
1✔
396
        for i in 1..=8 {
9✔
397
            expected.update(Coordinate2D::new(f64::from(i), f64::from(i)));
8✔
398
        }
8✔
399
        assert_eq!(expected.to_vega_embeddable(false).unwrap(), result);
1✔
400
    }
1✔
401

402
    #[tokio::test]
403
    async fn vector_data_with_nulls_and_nan() {
1✔
404
        let vector_source =
1✔
405
            MockFeatureCollectionSource::multiple(vec![DataCollection::from_slices(
1✔
406
                &[] as &[NoGeometry],
1✔
407
                &[TimeInterval::default(); 7],
1✔
408
                &[
1✔
409
                    (
1✔
410
                        "foo",
1✔
411
                        FeatureData::NullableFloat(vec![
1✔
412
                            Some(1.0),
1✔
413
                            None,
1✔
414
                            Some(3.0),
1✔
415
                            None,
1✔
416
                            Some(f64::NAN),
1✔
417
                            Some(6.0),
1✔
418
                            Some(f64::NAN),
1✔
419
                        ]),
1✔
420
                    ),
1✔
421
                    (
1✔
422
                        "bar",
1✔
423
                        FeatureData::NullableFloat(vec![
1✔
424
                            Some(1.0),
1✔
425
                            Some(2.0),
1✔
426
                            None,
1✔
427
                            None,
1✔
428
                            Some(5.0),
1✔
429
                            Some(f64::NAN),
1✔
430
                            Some(f64::NAN),
1✔
431
                        ]),
1✔
432
                    ),
1✔
433
                ],
1✔
434
            )
1✔
435
            .unwrap()])
1✔
436
            .boxed();
1✔
437

1✔
438
        let box_plot = ScatterPlot {
1✔
439
            params: ScatterPlotParams {
1✔
440
                column_x: "foo".to_string(),
1✔
441
                column_y: "bar".to_string(),
1✔
442
            },
1✔
443
            sources: vector_source.into(),
1✔
444
        };
1✔
445

1✔
446
        let execution_context = MockExecutionContext::test_default();
1✔
447

1✔
448
        let query_processor = box_plot
1✔
449
            .boxed()
1✔
450
            .initialize(WorkflowOperatorPath::initialize_root(), &execution_context)
1✔
451
            .await
1✔
452
            .unwrap()
1✔
453
            .query_processor()
1✔
454
            .unwrap()
1✔
455
            .json_vega()
1✔
456
            .unwrap();
1✔
457

1✔
458
        let result = query_processor
1✔
459
            .plot_query(
1✔
460
                PlotQueryRectangle {
1✔
461
                    spatial_bounds: BoundingBox2D::new((-180., -90.).into(), (180., 90.).into())
1✔
462
                        .unwrap(),
1✔
463
                    time_interval: TimeInterval::default(),
1✔
464
                    spatial_resolution: SpatialResolution::one(),
1✔
465
                    attributes: PlotSeriesSelection::all(),
1✔
466
                },
1✔
467
                &MockQueryContext::new(ChunkByteSize::MIN),
1✔
468
            )
1✔
469
            .await
1✔
470
            .unwrap();
1✔
471

1✔
472
        let mut expected =
1✔
473
            geoengine_datatypes::plots::ScatterPlot::new("foo".to_string(), "bar".to_string());
1✔
474
        expected.update(Coordinate2D::new(1.0, 1.0));
1✔
475
        assert_eq!(expected.to_vega_embeddable(false).unwrap(), result);
1✔
476
    }
1✔
477

478
    #[tokio::test]
479
    async fn vector_data_text_column_x() {
1✔
480
        let vector_source = MockFeatureCollectionSource::single(
1✔
481
            DataCollection::from_slices(
1✔
482
                &[] as &[NoGeometry],
1✔
483
                &[TimeInterval::default(); 1],
1✔
484
                &[
1✔
485
                    ("foo", FeatureData::Text(vec!["test".to_string()])),
1✔
486
                    ("bar", FeatureData::Int(vec![64])),
1✔
487
                ],
1✔
488
            )
1✔
489
            .unwrap(),
1✔
490
        )
1✔
491
        .boxed();
1✔
492

1✔
493
        let box_plot = ScatterPlot {
1✔
494
            params: ScatterPlotParams {
1✔
495
                column_x: "foo".to_string(),
1✔
496
                column_y: "bar".to_string(),
1✔
497
            },
1✔
498
            sources: vector_source.into(),
1✔
499
        };
1✔
500

1✔
501
        let execution_context = MockExecutionContext::test_default();
1✔
502

1✔
503
        let init = box_plot
1✔
504
            .boxed()
1✔
505
            .initialize(WorkflowOperatorPath::initialize_root(), &execution_context)
1✔
506
            .await;
1✔
507

1✔
508
        assert!(init.is_err());
1✔
509
    }
1✔
510

511
    #[tokio::test]
512
    async fn vector_data_text_column_y() {
1✔
513
        let vector_source = MockFeatureCollectionSource::single(
1✔
514
            DataCollection::from_slices(
1✔
515
                &[] as &[NoGeometry],
1✔
516
                &[TimeInterval::default(); 1],
1✔
517
                &[
1✔
518
                    ("foo", FeatureData::Text(vec!["test".to_string()])),
1✔
519
                    ("bar", FeatureData::Int(vec![64])),
1✔
520
                ],
1✔
521
            )
1✔
522
            .unwrap(),
1✔
523
        )
1✔
524
        .boxed();
1✔
525

1✔
526
        let box_plot = ScatterPlot {
1✔
527
            params: ScatterPlotParams {
1✔
528
                column_x: "bar".to_string(),
1✔
529
                column_y: "foo".to_string(),
1✔
530
            },
1✔
531
            sources: vector_source.into(),
1✔
532
        };
1✔
533

1✔
534
        let execution_context = MockExecutionContext::test_default();
1✔
535

1✔
536
        let init = box_plot
1✔
537
            .boxed()
1✔
538
            .initialize(WorkflowOperatorPath::initialize_root(), &execution_context)
1✔
539
            .await;
1✔
540

1✔
541
        assert!(init.is_err());
1✔
542
    }
1✔
543

544
    #[tokio::test]
545
    async fn vector_data_missing_column_x() {
1✔
546
        let vector_source = MockFeatureCollectionSource::single(
1✔
547
            DataCollection::from_slices(
1✔
548
                &[] as &[NoGeometry],
1✔
549
                &[TimeInterval::default(); 1],
1✔
550
                &[
1✔
551
                    ("foo", FeatureData::Text(vec!["test".to_string()])),
1✔
552
                    ("bar", FeatureData::Int(vec![64])),
1✔
553
                ],
1✔
554
            )
1✔
555
            .unwrap(),
1✔
556
        )
1✔
557
        .boxed();
1✔
558

1✔
559
        let box_plot = ScatterPlot {
1✔
560
            params: ScatterPlotParams {
1✔
561
                column_x: "fo".to_string(),
1✔
562
                column_y: "bar".to_string(),
1✔
563
            },
1✔
564
            sources: vector_source.into(),
1✔
565
        };
1✔
566

1✔
567
        let execution_context = MockExecutionContext::test_default();
1✔
568

1✔
569
        let init = box_plot
1✔
570
            .boxed()
1✔
571
            .initialize(WorkflowOperatorPath::initialize_root(), &execution_context)
1✔
572
            .await;
1✔
573

1✔
574
        assert!(init.is_err());
1✔
575
    }
1✔
576

577
    #[tokio::test]
578
    async fn vector_data_missing_column_y() {
1✔
579
        let vector_source = MockFeatureCollectionSource::single(
1✔
580
            DataCollection::from_slices(
1✔
581
                &[] as &[NoGeometry],
1✔
582
                &[TimeInterval::default(); 1],
1✔
583
                &[
1✔
584
                    ("foo", FeatureData::Text(vec!["test".to_string()])),
1✔
585
                    ("bar", FeatureData::Int(vec![64])),
1✔
586
                ],
1✔
587
            )
1✔
588
            .unwrap(),
1✔
589
        )
1✔
590
        .boxed();
1✔
591

1✔
592
        let box_plot = ScatterPlot {
1✔
593
            params: ScatterPlotParams {
1✔
594
                column_x: "foo".to_string(),
1✔
595
                column_y: "ba".to_string(),
1✔
596
            },
1✔
597
            sources: vector_source.into(),
1✔
598
        };
1✔
599

1✔
600
        let execution_context = MockExecutionContext::test_default();
1✔
601

1✔
602
        let init = box_plot
1✔
603
            .boxed()
1✔
604
            .initialize(WorkflowOperatorPath::initialize_root(), &execution_context)
1✔
605
            .await;
1✔
606

1✔
607
        assert!(init.is_err());
1✔
608
    }
1✔
609

610
    #[tokio::test]
611
    async fn vector_data_single_feature() {
1✔
612
        let vector_source =
1✔
613
            MockFeatureCollectionSource::multiple(vec![DataCollection::from_slices(
1✔
614
                &[] as &[NoGeometry],
1✔
615
                &[TimeInterval::default(); 1],
1✔
616
                &[
1✔
617
                    ("foo", FeatureData::Int(vec![1])),
1✔
618
                    ("bar", FeatureData::Int(vec![1])),
1✔
619
                ],
1✔
620
            )
1✔
621
            .unwrap()])
1✔
622
            .boxed();
1✔
623

1✔
624
        let box_plot = ScatterPlot {
1✔
625
            params: ScatterPlotParams {
1✔
626
                column_x: "foo".to_string(),
1✔
627
                column_y: "bar".to_string(),
1✔
628
            },
1✔
629
            sources: vector_source.into(),
1✔
630
        };
1✔
631

1✔
632
        let execution_context = MockExecutionContext::test_default();
1✔
633

1✔
634
        let query_processor = box_plot
1✔
635
            .boxed()
1✔
636
            .initialize(WorkflowOperatorPath::initialize_root(), &execution_context)
1✔
637
            .await
1✔
638
            .unwrap()
1✔
639
            .query_processor()
1✔
640
            .unwrap()
1✔
641
            .json_vega()
1✔
642
            .unwrap();
1✔
643

1✔
644
        let result = query_processor
1✔
645
            .plot_query(
1✔
646
                PlotQueryRectangle {
1✔
647
                    spatial_bounds: BoundingBox2D::new((-180., -90.).into(), (180., 90.).into())
1✔
648
                        .unwrap(),
1✔
649
                    time_interval: TimeInterval::default(),
1✔
650
                    spatial_resolution: SpatialResolution::one(),
1✔
651
                    attributes: PlotSeriesSelection::all(),
1✔
652
                },
1✔
653
                &MockQueryContext::new(ChunkByteSize::MIN),
1✔
654
            )
1✔
655
            .await
1✔
656
            .unwrap();
1✔
657

1✔
658
        let mut expected =
1✔
659
            geoengine_datatypes::plots::ScatterPlot::new("foo".to_string(), "bar".to_string());
1✔
660
        expected.update(Coordinate2D::new(1.0, 1.0));
1✔
661
        assert_eq!(expected.to_vega_embeddable(false).unwrap(), result);
1✔
662
    }
1✔
663

664
    #[tokio::test]
665
    async fn vector_data_empty() {
1✔
666
        let vector_source =
1✔
667
            MockFeatureCollectionSource::multiple(vec![DataCollection::from_slices(
1✔
668
                &[] as &[NoGeometry],
1✔
669
                &[] as &[TimeInterval],
1✔
670
                &[
1✔
671
                    ("foo", FeatureData::Int(vec![])),
1✔
672
                    ("bar", FeatureData::Int(vec![])),
1✔
673
                ],
1✔
674
            )
1✔
675
            .unwrap()])
1✔
676
            .boxed();
1✔
677

1✔
678
        let box_plot = ScatterPlot {
1✔
679
            params: ScatterPlotParams {
1✔
680
                column_x: "foo".to_string(),
1✔
681
                column_y: "bar".to_string(),
1✔
682
            },
1✔
683
            sources: vector_source.into(),
1✔
684
        };
1✔
685

1✔
686
        let execution_context = MockExecutionContext::test_default();
1✔
687

1✔
688
        let query_processor = box_plot
1✔
689
            .boxed()
1✔
690
            .initialize(WorkflowOperatorPath::initialize_root(), &execution_context)
1✔
691
            .await
1✔
692
            .unwrap()
1✔
693
            .query_processor()
1✔
694
            .unwrap()
1✔
695
            .json_vega()
1✔
696
            .unwrap();
1✔
697

1✔
698
        let result = query_processor
1✔
699
            .plot_query(
1✔
700
                PlotQueryRectangle {
1✔
701
                    spatial_bounds: BoundingBox2D::new((-180., -90.).into(), (180., 90.).into())
1✔
702
                        .unwrap(),
1✔
703
                    time_interval: TimeInterval::default(),
1✔
704
                    spatial_resolution: SpatialResolution::one(),
1✔
705
                    attributes: PlotSeriesSelection::all(),
1✔
706
                },
1✔
707
                &MockQueryContext::new(ChunkByteSize::MIN),
1✔
708
            )
1✔
709
            .await
1✔
710
            .unwrap();
1✔
711

1✔
712
        let expected =
1✔
713
            geoengine_datatypes::plots::ScatterPlot::new("foo".to_string(), "bar".to_string());
1✔
714
        assert_eq!(expected.to_vega_embeddable(false).unwrap(), result);
1✔
715
    }
1✔
716

717
    #[tokio::test]
718
    async fn to_histogram_at_end() {
1✔
719
        let mut values = vec![1; 700];
1✔
720
        values.push(2);
1✔
721

1✔
722
        let vector_source =
1✔
723
            MockFeatureCollectionSource::multiple(vec![DataCollection::from_slices(
1✔
724
                &[] as &[NoGeometry],
1✔
725
                &[TimeInterval::default(); 701],
1✔
726
                &[
1✔
727
                    ("foo", FeatureData::Int(values.clone())),
1✔
728
                    ("bar", FeatureData::Int(values.clone())),
1✔
729
                ],
1✔
730
            )
1✔
731
            .unwrap()])
1✔
732
            .boxed();
1✔
733

1✔
734
        let box_plot = ScatterPlot {
1✔
735
            params: ScatterPlotParams {
1✔
736
                column_x: "foo".to_string(),
1✔
737
                column_y: "bar".to_string(),
1✔
738
            },
1✔
739
            sources: vector_source.into(),
1✔
740
        };
1✔
741

1✔
742
        let execution_context = MockExecutionContext::test_default();
1✔
743

1✔
744
        let query_processor = box_plot
1✔
745
            .boxed()
1✔
746
            .initialize(WorkflowOperatorPath::initialize_root(), &execution_context)
1✔
747
            .await
1✔
748
            .unwrap()
1✔
749
            .query_processor()
1✔
750
            .unwrap()
1✔
751
            .json_vega()
1✔
752
            .unwrap();
1✔
753

1✔
754
        let result = query_processor
1✔
755
            .plot_query(
1✔
756
                PlotQueryRectangle {
1✔
757
                    spatial_bounds: BoundingBox2D::new((-180., -90.).into(), (180., 90.).into())
1✔
758
                        .unwrap(),
1✔
759
                    time_interval: TimeInterval::default(),
1✔
760
                    spatial_resolution: SpatialResolution::one(),
1✔
761
                    attributes: PlotSeriesSelection::all(),
1✔
762
                },
1✔
763
                &MockQueryContext::new(ChunkByteSize::MIN),
1✔
764
            )
1✔
765
            .await
1✔
766
            .unwrap();
1✔
767

1✔
768
        let dim_x = HistogramDimension::new("foo".to_string(), 1.0, 2.0, 26).unwrap();
1✔
769
        let dim_y = HistogramDimension::new("bar".to_string(), 1.0, 2.0, 26).unwrap();
1✔
770

1✔
771
        let mut expected = geoengine_datatypes::plots::Histogram2D::new(dim_x, dim_y);
1✔
772
        expected.update_batch(
1✔
773
            values
1✔
774
                .into_iter()
1✔
775
                .map(|x| Coordinate2D::new(x as f64, x as f64)),
701✔
776
        );
1✔
777
        assert_eq!(expected.to_vega_embeddable(false).unwrap(), result);
1✔
778
    }
1✔
779

780
    #[tokio::test]
781
    async fn to_histogram_while_iterating() {
1✔
782
        let mut values = vec![1; 5999];
1✔
783
        values.push(2);
1✔
784

1✔
785
        let vector_source = MockFeatureCollectionSource::multiple(vec![
1✔
786
            DataCollection::from_slices(
1✔
787
                &[] as &[NoGeometry],
1✔
788
                &[TimeInterval::default(); 6000],
1✔
789
                &[
1✔
790
                    ("foo", FeatureData::Int(values.clone())),
1✔
791
                    ("bar", FeatureData::Int(values.clone())),
1✔
792
                ],
1✔
793
            )
1✔
794
            .unwrap(),
1✔
795
            DataCollection::from_slices(
1✔
796
                &[] as &[NoGeometry],
1✔
797
                &[TimeInterval::default(); 6000],
1✔
798
                &[
1✔
799
                    ("foo", FeatureData::Int(values.clone())),
1✔
800
                    ("bar", FeatureData::Int(values.clone())),
1✔
801
                ],
1✔
802
            )
1✔
803
            .unwrap(),
1✔
804
            DataCollection::from_slices(
1✔
805
                &[] as &[NoGeometry],
1✔
806
                &[TimeInterval::default(); 6000],
1✔
807
                &[
1✔
808
                    ("foo", FeatureData::Int(values.clone())),
1✔
809
                    ("bar", FeatureData::Int(values.clone())),
1✔
810
                ],
1✔
811
            )
1✔
812
            .unwrap(),
1✔
813
        ])
1✔
814
        .boxed();
1✔
815

1✔
816
        let box_plot = ScatterPlot {
1✔
817
            params: ScatterPlotParams {
1✔
818
                column_x: "foo".to_string(),
1✔
819
                column_y: "bar".to_string(),
1✔
820
            },
1✔
821
            sources: vector_source.into(),
1✔
822
        };
1✔
823

1✔
824
        let execution_context = MockExecutionContext::test_default();
1✔
825

1✔
826
        let query_processor = box_plot
1✔
827
            .boxed()
1✔
828
            .initialize(WorkflowOperatorPath::initialize_root(), &execution_context)
1✔
829
            .await
1✔
830
            .unwrap()
1✔
831
            .query_processor()
1✔
832
            .unwrap()
1✔
833
            .json_vega()
1✔
834
            .unwrap();
1✔
835

1✔
836
        let result = query_processor
1✔
837
            .plot_query(
1✔
838
                PlotQueryRectangle {
1✔
839
                    spatial_bounds: BoundingBox2D::new((-180., -90.).into(), (180., 90.).into())
1✔
840
                        .unwrap(),
1✔
841
                    time_interval: TimeInterval::default(),
1✔
842
                    spatial_resolution: SpatialResolution::one(),
1✔
843
                    attributes: PlotSeriesSelection::all(),
1✔
844
                },
1✔
845
                &MockQueryContext::new(ChunkByteSize::MIN),
1✔
846
            )
1✔
847
            .await
1✔
848
            .unwrap();
1✔
849

1✔
850
        let dim_x = HistogramDimension::new("foo".to_string(), 1.0, 2.0, 100).unwrap();
1✔
851
        let dim_y = HistogramDimension::new("bar".to_string(), 1.0, 2.0, 100).unwrap();
1✔
852

1✔
853
        let mut expected = geoengine_datatypes::plots::Histogram2D::new(dim_x, dim_y);
1✔
854
        expected.update_batch(
1✔
855
            values
1✔
856
                .iter()
1✔
857
                .map(|x| Coordinate2D::new(*x as f64, *x as f64)),
6,000✔
858
        );
1✔
859
        expected.update_batch(
1✔
860
            values
1✔
861
                .iter()
1✔
862
                .map(|x| Coordinate2D::new(*x as f64, *x as f64)),
6,000✔
863
        );
1✔
864
        expected.update_batch(
1✔
865
            values
1✔
866
                .iter()
1✔
867
                .map(|x| Coordinate2D::new(*x as f64, *x as f64)),
6,000✔
868
        );
1✔
869
        assert_eq!(expected.to_vega_embeddable(false).unwrap(), result);
1✔
870
    }
1✔
871

872
    /// A value collector that never received a batch must produce the same output as a
    /// freshly created scatter plot with the same column names.
    #[test]
    fn test_collector_kind_empty() {
        let (col_x, col_y) = ("x".to_string(), "y".to_string());

        let collector = CollectorKind::Values(Collector::new(col_x.clone(), col_y.clone()));
        let res = collector
            .into_plot()
            .unwrap()
            .to_vega_embeddable(false)
            .unwrap();

        let reference_plot = geoengine_datatypes::plots::ScatterPlot::new(col_x, col_y);
        let expected = reference_plot.to_vega_embeddable(false).unwrap();

        assert_eq!(expected, res);
    }
1✔
886

887
    /// A batch of `SCATTER_PLOT_THRESHOLD / 2` points must leave the collector in the
    /// `Values` state and must be rendered like a scatter plot that was fed with exactly
    /// those points.
    #[test]
    fn test_collector_kind_scatter_plot() {
        let cx = "x".to_string();
        let cy = "y".to_string();

        // Derive the element count from the threshold instead of hard-coding a capacity;
        // collecting from the range allocates exactly the required size.
        let element_count = SCATTER_PLOT_THRESHOLD / 2;
        let values: Vec<Coordinate2D> = (0..element_count)
            .map(|i| Coordinate2D::new(i as f64, i as f64))
            .collect();

        let mut c = CollectorKind::Values(Collector::new(cx.clone(), cy.clone()));

        c.add_batch(values.clone().into_iter()).unwrap();

        // staying below the threshold must not trigger a conversion
        assert!(matches!(c, CollectorKind::Values(_)));

        let res = c.into_plot().unwrap().to_vega_embeddable(false).unwrap();

        let mut expected = geoengine_datatypes::plots::ScatterPlot::new(cx, cy);
        expected.update_batch(values.into_iter());

        assert_eq!(expected.to_vega_embeddable(false).unwrap(), res);
    }
1✔
910

911
    /// A collector that holds `SCATTER_PLOT_THRESHOLD * 2` points is still in the `Values`
    /// state after the batch was added, but building the plot must yield a 2D histogram
    /// spanning the observed value range.
    #[test]
    fn test_collector_kind_histogram_end() {
        let col_x = "x".to_string();
        let col_y = "y".to_string();

        let element_count = SCATTER_PLOT_THRESHOLD * 2;
        let points: Vec<Coordinate2D> = (0..element_count)
            .map(|i| Coordinate2D::new(i as f64, i as f64))
            .collect();

        let mut collector = CollectorKind::Values(Collector::new(col_x.clone(), col_y.clone()));
        collector.add_batch(points.clone().into_iter()).unwrap();

        assert!(matches!(collector, CollectorKind::Values(_)));

        let res = collector
            .into_plot()
            .unwrap()
            .to_vega_embeddable(false)
            .unwrap();

        // reference histogram: sqrt(n) buckets (capped at 100) over [0, n - 1]
        let bucket_count = ((element_count as f64).sqrt() as usize).min(100);
        let upper_bound = (element_count - 1) as f64;
        let dim_x = HistogramDimension::new(col_x, 0.0, upper_bound, bucket_count).unwrap();
        let dim_y = HistogramDimension::new(col_y, 0.0, upper_bound, bucket_count).unwrap();

        let mut expected = geoengine_datatypes::plots::Histogram2D::new(dim_x, dim_y);
        expected.update_batch(points.into_iter());

        assert_eq!(expected.to_vega_embeddable(false).unwrap(), res);
    }
1✔
943
    /// Adding `COLLECTOR_TO_HISTOGRAM_THRESHOLD + 1` points in one batch must switch the
    /// collector to the `Histogram` state, and the resulting plot must match a histogram
    /// built directly from the same points.
    #[test]
    fn test_collector_kind_histogram_in_flight() {
        let col_x = "x".to_string();
        let col_y = "y".to_string();

        let element_count = COLLECTOR_TO_HISTOGRAM_THRESHOLD + 1;
        let points: Vec<Coordinate2D> = (0..element_count)
            .map(|i| Coordinate2D::new(i as f64, i as f64))
            .collect();

        let mut collector = CollectorKind::Values(Collector::new(col_x.clone(), col_y.clone()));
        collector.add_batch(points.clone().into_iter()).unwrap();

        assert!(matches!(collector, CollectorKind::Histogram(_)));

        let res = collector
            .into_plot()
            .unwrap()
            .to_vega_embeddable(false)
            .unwrap();

        // reference histogram: sqrt(n) buckets (capped at 100) over [0, n - 1]
        let bucket_count = ((element_count as f64).sqrt() as usize).min(100);
        let upper_bound = (element_count - 1) as f64;
        let dim_x = HistogramDimension::new(col_x, 0.0, upper_bound, bucket_count).unwrap();
        let dim_y = HistogramDimension::new(col_y, 0.0, upper_bound, bucket_count).unwrap();

        let mut expected = geoengine_datatypes::plots::Histogram2D::new(dim_x, dim_y);
        expected.update_batch(points.into_iter());

        assert_eq!(expected.to_vega_embeddable(false).unwrap(), res);
    }
1✔
975

976
    /// Once the collector has switched to the `Histogram` state, a point that lies beyond
    /// the largest value of the first batch must not change the resulting plot.
    #[test]
    fn test_collector_kind_histogram_out_of_range() {
        let col_x = "x".to_string();
        let col_y = "y".to_string();

        let element_count = COLLECTOR_TO_HISTOGRAM_THRESHOLD + 1;
        let points: Vec<Coordinate2D> = (0..element_count)
            .map(|i| Coordinate2D::new(i as f64, i as f64))
            .collect();

        let mut collector = CollectorKind::Values(Collector::new(col_x.clone(), col_y.clone()));
        collector.add_batch(points.clone().into_iter()).unwrap();

        assert!(matches!(collector, CollectorKind::Histogram(_)));

        // This value should be skipped
        let outlier = Coordinate2D::new(element_count as f64, element_count as f64);
        collector.add_batch(std::iter::once(outlier)).unwrap();

        let res = collector
            .into_plot()
            .unwrap()
            .to_vega_embeddable(false)
            .unwrap();

        // reference histogram: built from the first batch only
        let bucket_count = ((element_count as f64).sqrt() as usize).min(100);
        let upper_bound = (element_count - 1) as f64;
        let dim_x = HistogramDimension::new(col_x, 0.0, upper_bound, bucket_count).unwrap();
        let dim_y = HistogramDimension::new(col_y, 0.0, upper_bound, bucket_count).unwrap();

        let mut expected = geoengine_datatypes::plots::Histogram2D::new(dim_x, dim_y);
        expected.update_batch(points.into_iter());

        assert_eq!(expected.to_vega_embeddable(false).unwrap(), res);
    }
1✔
1018

1019
    /// Once the collector has switched to the `Histogram` state, a non-finite point must
    /// not change the resulting plot. Note that the probe used here is a NaN coordinate.
    #[test]
    fn test_collector_kind_histogram_infinite() {
        let col_x = "x".to_string();
        let col_y = "y".to_string();

        let element_count = COLLECTOR_TO_HISTOGRAM_THRESHOLD + 1;
        let points: Vec<Coordinate2D> = (0..element_count)
            .map(|i| Coordinate2D::new(i as f64, i as f64))
            .collect();

        let mut collector = CollectorKind::Values(Collector::new(col_x.clone(), col_y.clone()));
        collector.add_batch(points.clone().into_iter()).unwrap();

        assert!(matches!(collector, CollectorKind::Histogram(_)));

        // This value should be skipped
        let not_a_number = Coordinate2D::new(f64::NAN, f64::NAN);
        collector.add_batch(std::iter::once(not_a_number)).unwrap();

        let res = collector
            .into_plot()
            .unwrap()
            .to_vega_embeddable(false)
            .unwrap();

        // reference histogram: built from the first batch only
        let bucket_count = ((element_count as f64).sqrt() as usize).min(100);
        let upper_bound = (element_count - 1) as f64;
        let dim_x = HistogramDimension::new(col_x, 0.0, upper_bound, bucket_count).unwrap();
        let dim_y = HistogramDimension::new(col_y, 0.0, upper_bound, bucket_count).unwrap();

        let mut expected = geoengine_datatypes::plots::Histogram2D::new(dim_x, dim_y);
        expected.update_batch(points.into_iter());

        assert_eq!(expected.to_vega_embeddable(false).unwrap(), res);
    }
1✔
1055
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc