• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

geo-engine / geoengine / 5006008836

pending completion
5006008836

push

github

GitHub
Merge #785 #787

936 of 936 new or added lines in 50 files covered. (100.0%)

96010 of 107707 relevant lines covered (89.14%)

72676.46 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

96.79
/operators/src/plot/scatter_plot.rs
1
use async_trait::async_trait;
2
use futures::StreamExt;
3
use serde::{Deserialize, Serialize};
4

5
use geoengine_datatypes::collections::FeatureCollectionInfos;
6
use geoengine_datatypes::plots::{Histogram2D, HistogramDimension, Plot, PlotData};
7

8
use crate::engine::{
9
    CanonicOperatorName, ExecutionContext, InitializedPlotOperator, InitializedSources,
10
    InitializedVectorOperator, Operator, OperatorName, PlotOperator, PlotQueryProcessor,
11
    PlotResultDescriptor, QueryContext, QueryProcessor, SingleVectorSource,
12
    TypedPlotQueryProcessor, TypedVectorQueryProcessor, WorkflowOperatorPath,
13
};
14
use crate::error::Error;
15
use crate::util::Result;
16
use geoengine_datatypes::primitives::{Coordinate2D, VectorQueryRectangle};
17

18
/// Name returned by `ScatterPlotQueryProcessor::plot_type` to identify this plot.
pub const SCATTERPLOT_OPERATOR_NAME: &str = "ScatterPlot";
19

20
/// The maximum number of elements for a scatter plot
21
const SCATTER_PLOT_THRESHOLD: usize = 500;
22

23
/// The number of elements to process at once (i.e., without switching from scatter-plot to histogram)
24
const BATCH_SIZE: usize = 1000;
25

26
/// The maximum number of elements before we turn the collector into a histogram.
27
/// At this point, the bounds of the histogram are fixed (i.e., further values exceeding
28
/// the min/max seen so far are ignored)
29
const COLLECTOR_TO_HISTOGRAM_THRESHOLD: usize = BATCH_SIZE * 10;
30

31
/// A scatter plot about two attributes of a vector dataset. If the
32
/// dataset contains more than `SCATTER_PLOT_THRESHOLD` elements, this
33
/// operator creates a 2D histogram.
34
pub type ScatterPlot = Operator<ScatterPlotParams, SingleVectorSource>;
35

36
impl OperatorName for ScatterPlot {
37
    const TYPE_NAME: &'static str = "ScatterPlot";
38
}
39

40
/// The parameter spec for `ScatterPlot`
41
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
10✔
42
#[serde(rename_all = "camelCase")]
43
pub struct ScatterPlotParams {
44
    /// Name of the (numeric) attribute for the x-axis.
45
    pub column_x: String,
46
    /// Name of the (numeric) attribute for the y-axis.
47
    pub column_y: String,
48
}
49

50
#[typetag::serde]
×
51
#[async_trait]
52
impl PlotOperator for ScatterPlot {
53
    async fn _initialize(
10✔
54
        self: Box<Self>,
10✔
55
        path: WorkflowOperatorPath,
10✔
56
        context: &dyn ExecutionContext,
10✔
57
    ) -> Result<Box<dyn InitializedPlotOperator>> {
10✔
58
        let name = CanonicOperatorName::from(&self);
10✔
59

60
        let initialized_sources = self.sources.initialize_sources(path, context).await?;
10✔
61
        let source = initialized_sources.vector;
10✔
62
        for cn in [&self.params.column_x, &self.params.column_y] {
17✔
63
            match source.result_descriptor().column_data_type(cn.as_str()) {
17✔
64
                Some(column) if !column.is_numeric() => {
16✔
65
                    return Err(Error::InvalidOperatorSpec {
3✔
66
                        reason: format!("Column '{cn}' is not numeric."),
3✔
67
                    });
3✔
68
                }
69
                Some(_) => {
13✔
70
                    // OK
13✔
71
                }
13✔
72
                None => {
73
                    return Err(Error::ColumnDoesNotExist {
1✔
74
                        column: cn.to_string(),
1✔
75
                    });
1✔
76
                }
77
            }
78
        }
79

80
        let in_desc = source.result_descriptor().clone();
6✔
81

6✔
82
        Ok(InitializedScatterPlot::new(name, in_desc.into(), self.params, source).boxed())
6✔
83
    }
20✔
84

85
    span_fn!(ScatterPlot);
×
86
}
87

88
/// The initialization of `Histogram`
89
pub struct InitializedScatterPlot<Op> {
90
    name: CanonicOperatorName,
91
    result_descriptor: PlotResultDescriptor,
92
    column_x: String,
93
    column_y: String,
94
    source: Op,
95
}
96

97
impl<Op> InitializedScatterPlot<Op> {
98
    pub fn new(
6✔
99
        name: CanonicOperatorName,
6✔
100
        result_descriptor: PlotResultDescriptor,
6✔
101
        params: ScatterPlotParams,
6✔
102
        source: Op,
6✔
103
    ) -> Self {
6✔
104
        Self {
6✔
105
            name,
6✔
106
            result_descriptor,
6✔
107
            column_x: params.column_x,
6✔
108
            column_y: params.column_y,
6✔
109
            source,
6✔
110
        }
6✔
111
    }
6✔
112
}
113
impl InitializedPlotOperator for InitializedScatterPlot<Box<dyn InitializedVectorOperator>> {
114
    fn result_descriptor(&self) -> &PlotResultDescriptor {
×
115
        &self.result_descriptor
×
116
    }
×
117

118
    fn query_processor(&self) -> Result<TypedPlotQueryProcessor> {
6✔
119
        let processor = ScatterPlotQueryProcessor {
6✔
120
            input: self.source.query_processor()?,
6✔
121
            column_x: self.column_x.clone(),
6✔
122
            column_y: self.column_y.clone(),
6✔
123
        };
6✔
124

6✔
125
        Ok(TypedPlotQueryProcessor::JsonVega(processor.boxed()))
6✔
126
    }
6✔
127

128
    fn canonic_name(&self) -> CanonicOperatorName {
×
129
        self.name.clone()
×
130
    }
×
131
}
132

133
/// A query processor that calculates the scatter plot about its vector input.
134
pub struct ScatterPlotQueryProcessor {
135
    input: TypedVectorQueryProcessor,
136
    column_x: String,
137
    column_y: String,
138
}
139

140
#[async_trait]
141
impl PlotQueryProcessor for ScatterPlotQueryProcessor {
142
    type OutputFormat = PlotData;
143

144
    fn plot_type(&self) -> &'static str {
×
145
        SCATTERPLOT_OPERATOR_NAME
×
146
    }
×
147

148
    async fn plot_query<'p>(
6✔
149
        &'p self,
6✔
150
        query: VectorQueryRectangle,
6✔
151
        ctx: &'p dyn QueryContext,
6✔
152
    ) -> Result<Self::OutputFormat> {
6✔
153
        let mut collector =
6✔
154
            CollectorKind::Values(Collector::new(self.column_x.clone(), self.column_y.clone()));
6✔
155

156
        call_on_generic_vector_processor!(&self.input, processor => {
6✔
157
            let mut query = processor.query(query, ctx).await?;
6✔
158
            while let Some(collection) = query.next().await {
15✔
159
                let collection = collection?;
9✔
160

161
                let data_x = collection.data(&self.column_x).expect("checked in param");
9✔
162
                let data_y = collection.data(&self.column_y).expect("checked in param");
9✔
163

164
                let valid_points = data_x.float_options_iter().zip(data_y.float_options_iter()).filter_map(|(a,b)| match (a,b) {
18,717✔
165
                    (Some(x),Some(y)) if x.is_finite() && y.is_finite() => Some(Coordinate2D::new(x,y)),
18,714✔
166
                    _ => None
6✔
167
                });
18,717✔
168

169
                for chunk in &itertools::Itertools::chunks(valid_points, BATCH_SIZE) {
23✔
170
                    collector.add_batch( chunk )?;
23✔
171
                }
172
            }
173
        });
174
        Ok(collector.into_plot()?.to_vega_embeddable(false)?)
6✔
175
    }
12✔
176
}
177

178
struct Collector {
179
    elements: Vec<Coordinate2D>,
180
    column_x: String,
181
    column_y: String,
182
    bounds_x: (f64, f64),
183
    bounds_y: (f64, f64),
184
}
185

186
impl Collector {
187
    fn new(column_x: String, column_y: String) -> Self {
12✔
188
        Collector {
12✔
189
            column_x,
12✔
190
            column_y,
12✔
191
            elements: Vec::with_capacity(SCATTER_PLOT_THRESHOLD),
12✔
192
            bounds_x: (f64::INFINITY, f64::NEG_INFINITY),
12✔
193
            bounds_y: (f64::INFINITY, f64::NEG_INFINITY),
12✔
194
        }
12✔
195
    }
12✔
196

197
    fn element_count(&self) -> usize {
35✔
198
        self.elements.len()
35✔
199
    }
35✔
200

201
    fn add_batch(&mut self, values: impl Iterator<Item = Coordinate2D>) {
21✔
202
        for v in values {
42,985✔
203
            self.add(v);
42,964✔
204
        }
42,964✔
205
    }
21✔
206

207
    fn add(&mut self, value: Coordinate2D) {
42,964✔
208
        if value.x.is_finite() && value.y.is_finite() {
42,964✔
209
            self.bounds_x.0 = std::cmp::min_by(self.bounds_x.0, value.x, |a, b| {
42,964✔
210
                a.partial_cmp(b).expect("checked")
42,964✔
211
            });
42,964✔
212
            self.bounds_x.1 = std::cmp::max_by(self.bounds_x.1, value.x, |a, b| {
42,964✔
213
                a.partial_cmp(b).expect("checked")
42,964✔
214
            });
42,964✔
215
            self.bounds_y.0 = std::cmp::min_by(self.bounds_y.0, value.y, |a, b| {
42,964✔
216
                a.partial_cmp(b).expect("checked")
42,964✔
217
            });
42,964✔
218
            self.bounds_y.1 = std::cmp::max_by(self.bounds_y.1, value.y, |a, b| {
42,964✔
219
                a.partial_cmp(b).expect("checked")
42,964✔
220
            });
42,964✔
221
            self.elements.push(value);
42,964✔
222
        }
42,964✔
223
    }
42,964✔
224
}
225

226
enum CollectorKind {
227
    Values(Collector),
228
    Histogram(Histogram2D),
229
}
230

231
impl CollectorKind {
232
    fn histogram_from_collector(value: &Collector) -> Result<Histogram2D> {
6✔
233
        let bucket_count = std::cmp::min(100, f64::sqrt(value.element_count() as f64) as usize);
6✔
234

235
        let dim_x = HistogramDimension::new(
6✔
236
            value.column_x.clone(),
6✔
237
            value.bounds_x.0,
6✔
238
            value.bounds_x.1,
6✔
239
            bucket_count,
6✔
240
        )?;
6✔
241
        let dim_y = HistogramDimension::new(
6✔
242
            value.column_y.clone(),
6✔
243
            value.bounds_y.0,
6✔
244
            value.bounds_y.1,
6✔
245
            bucket_count,
6✔
246
        )?;
6✔
247

248
        let mut result = Histogram2D::new(dim_x, dim_y);
6✔
249
        result.update_batch(value.elements.iter().copied());
6✔
250
        Ok(result)
6✔
251
    }
6✔
252

253
    fn add_batch(&mut self, values: impl Iterator<Item = Coordinate2D>) -> Result<()> {
30✔
254
        match self {
30✔
255
            Self::Values(ref mut c) => {
21✔
256
                c.add_batch(values);
21✔
257
                if c.element_count() > COLLECTOR_TO_HISTOGRAM_THRESHOLD {
21✔
258
                    *self = Self::Histogram(Self::histogram_from_collector(c)?);
4✔
259
                }
17✔
260
            }
261
            Self::Histogram(ref mut h) => {
9✔
262
                h.update_batch(values);
9✔
263
            }
9✔
264
        }
265
        Ok(())
30✔
266
    }
30✔
267

268
    fn into_plot(self) -> Result<Box<dyn Plot>> {
269
        match self {
8✔
270
            Self::Histogram(h) => Ok(Box::new(h)),
4✔
271
            Self::Values(v) if v.element_count() <= SCATTER_PLOT_THRESHOLD => Ok(Box::new(
8✔
272
                geoengine_datatypes::plots::ScatterPlot::new_with_data(
6✔
273
                    v.column_x, v.column_y, v.elements,
6✔
274
                ),
6✔
275
            )),
6✔
276
            Self::Values(v) => Ok(Box::new(Self::histogram_from_collector(&v)?)),
2✔
277
        }
278
    }
12✔
279
}
280

281
#[cfg(test)]
282
mod tests {
283
    use geoengine_datatypes::util::test::TestDefault;
284
    use serde_json::json;
285

286
    use geoengine_datatypes::primitives::{
287
        BoundingBox2D, FeatureData, NoGeometry, SpatialResolution, TimeInterval,
288
    };
289
    use geoengine_datatypes::{collections::DataCollection, primitives::MultiPoint};
290

291
    use crate::engine::{ChunkByteSize, MockExecutionContext, MockQueryContext, VectorOperator};
292
    use crate::mock::MockFeatureCollectionSource;
293

294
    use super::*;
295

296
    #[test]
1✔
297
    fn serialization() {
1✔
298
        let scatter_plot = ScatterPlot {
1✔
299
            params: ScatterPlotParams {
1✔
300
                column_x: "foo".to_owned(),
1✔
301
                column_y: "bar".to_owned(),
1✔
302
            },
1✔
303
            sources: MockFeatureCollectionSource::<MultiPoint>::multiple(vec![])
1✔
304
                .boxed()
1✔
305
                .into(),
1✔
306
        };
1✔
307

1✔
308
        let serialized = json!({
1✔
309
            "type": "ScatterPlot",
1✔
310
            "params": {
1✔
311
                "columnX": "foo",
1✔
312
                "columnY": "bar",
1✔
313
            },
1✔
314
            "sources": {
1✔
315
                "vector": {
1✔
316
                    "type": "MockFeatureCollectionSourceMultiPoint",
1✔
317
                    "params": {
1✔
318
                        "collections": [],
1✔
319
                        "spatialReference": "EPSG:4326",
1✔
320
                        "measurements": {},
1✔
321
                    }
1✔
322
                }
1✔
323
            }
1✔
324
        })
1✔
325
        .to_string();
1✔
326

1✔
327
        let deserialized: ScatterPlot = serde_json::from_str(&serialized).unwrap();
1✔
328

1✔
329
        assert_eq!(deserialized.params, scatter_plot.params);
1✔
330
    }
1✔
331

332
    #[tokio::test]
1✔
333
    async fn vector_data() {
1✔
334
        let vector_source = MockFeatureCollectionSource::multiple(vec![
1✔
335
            DataCollection::from_slices(
1✔
336
                &[] as &[NoGeometry],
1✔
337
                &[TimeInterval::default(); 4],
1✔
338
                &[
1✔
339
                    ("foo", FeatureData::Int(vec![1, 2, 3, 4])),
1✔
340
                    ("bar", FeatureData::Int(vec![1, 2, 3, 4])),
1✔
341
                ],
1✔
342
            )
1✔
343
            .unwrap(),
1✔
344
            DataCollection::from_slices(
1✔
345
                &[] as &[NoGeometry],
1✔
346
                &[TimeInterval::default(); 4],
1✔
347
                &[
1✔
348
                    ("foo", FeatureData::Int(vec![5, 6, 7, 8])),
1✔
349
                    ("bar", FeatureData::Int(vec![5, 6, 7, 8])),
1✔
350
                ],
1✔
351
            )
1✔
352
            .unwrap(),
1✔
353
        ])
1✔
354
        .boxed();
1✔
355

1✔
356
        let box_plot = ScatterPlot {
1✔
357
            params: ScatterPlotParams {
1✔
358
                column_x: "foo".to_string(),
1✔
359
                column_y: "bar".to_string(),
1✔
360
            },
1✔
361
            sources: vector_source.into(),
1✔
362
        };
1✔
363

1✔
364
        let execution_context = MockExecutionContext::test_default();
1✔
365

366
        let query_processor = box_plot
1✔
367
            .boxed()
1✔
368
            .initialize(WorkflowOperatorPath::initialize_root(), &execution_context)
1✔
369
            .await
×
370
            .unwrap()
1✔
371
            .query_processor()
1✔
372
            .unwrap()
1✔
373
            .json_vega()
1✔
374
            .unwrap();
1✔
375

376
        let result = query_processor
1✔
377
            .plot_query(
1✔
378
                VectorQueryRectangle {
1✔
379
                    spatial_bounds: BoundingBox2D::new((-180., -90.).into(), (180., 90.).into())
1✔
380
                        .unwrap(),
1✔
381
                    time_interval: TimeInterval::default(),
1✔
382
                    spatial_resolution: SpatialResolution::one(),
1✔
383
                },
1✔
384
                &MockQueryContext::new(ChunkByteSize::MIN),
1✔
385
            )
1✔
386
            .await
×
387
            .unwrap();
1✔
388

1✔
389
        let mut expected =
1✔
390
            geoengine_datatypes::plots::ScatterPlot::new("foo".to_string(), "bar".to_string());
1✔
391
        for i in 1..=8 {
9✔
392
            expected.update(Coordinate2D::new(f64::from(i), f64::from(i)));
8✔
393
        }
8✔
394
        assert_eq!(expected.to_vega_embeddable(false).unwrap(), result);
1✔
395
    }
396

397
    #[tokio::test]
1✔
398
    async fn vector_data_with_nulls_and_nan() {
1✔
399
        let vector_source =
1✔
400
            MockFeatureCollectionSource::multiple(vec![DataCollection::from_slices(
1✔
401
                &[] as &[NoGeometry],
1✔
402
                &[TimeInterval::default(); 7],
1✔
403
                &[
1✔
404
                    (
1✔
405
                        "foo",
1✔
406
                        FeatureData::NullableFloat(vec![
1✔
407
                            Some(1.0),
1✔
408
                            None,
1✔
409
                            Some(3.0),
1✔
410
                            None,
1✔
411
                            Some(f64::NAN),
1✔
412
                            Some(6.0),
1✔
413
                            Some(f64::NAN),
1✔
414
                        ]),
1✔
415
                    ),
1✔
416
                    (
1✔
417
                        "bar",
1✔
418
                        FeatureData::NullableFloat(vec![
1✔
419
                            Some(1.0),
1✔
420
                            Some(2.0),
1✔
421
                            None,
1✔
422
                            None,
1✔
423
                            Some(5.0),
1✔
424
                            Some(f64::NAN),
1✔
425
                            Some(f64::NAN),
1✔
426
                        ]),
1✔
427
                    ),
1✔
428
                ],
1✔
429
            )
1✔
430
            .unwrap()])
1✔
431
            .boxed();
1✔
432

1✔
433
        let box_plot = ScatterPlot {
1✔
434
            params: ScatterPlotParams {
1✔
435
                column_x: "foo".to_string(),
1✔
436
                column_y: "bar".to_string(),
1✔
437
            },
1✔
438
            sources: vector_source.into(),
1✔
439
        };
1✔
440

1✔
441
        let execution_context = MockExecutionContext::test_default();
1✔
442

443
        let query_processor = box_plot
1✔
444
            .boxed()
1✔
445
            .initialize(WorkflowOperatorPath::initialize_root(), &execution_context)
1✔
446
            .await
×
447
            .unwrap()
1✔
448
            .query_processor()
1✔
449
            .unwrap()
1✔
450
            .json_vega()
1✔
451
            .unwrap();
1✔
452

453
        let result = query_processor
1✔
454
            .plot_query(
1✔
455
                VectorQueryRectangle {
1✔
456
                    spatial_bounds: BoundingBox2D::new((-180., -90.).into(), (180., 90.).into())
1✔
457
                        .unwrap(),
1✔
458
                    time_interval: TimeInterval::default(),
1✔
459
                    spatial_resolution: SpatialResolution::one(),
1✔
460
                },
1✔
461
                &MockQueryContext::new(ChunkByteSize::MIN),
1✔
462
            )
1✔
463
            .await
×
464
            .unwrap();
1✔
465

1✔
466
        let mut expected =
1✔
467
            geoengine_datatypes::plots::ScatterPlot::new("foo".to_string(), "bar".to_string());
1✔
468
        expected.update(Coordinate2D::new(1.0, 1.0));
1✔
469
        assert_eq!(expected.to_vega_embeddable(false).unwrap(), result);
1✔
470
    }
471

472
    #[tokio::test]
1✔
473
    async fn vector_data_text_column_x() {
1✔
474
        let vector_source = MockFeatureCollectionSource::single(
1✔
475
            DataCollection::from_slices(
1✔
476
                &[] as &[NoGeometry],
1✔
477
                &[TimeInterval::default(); 1],
1✔
478
                &[
1✔
479
                    ("foo", FeatureData::Text(vec!["test".to_string()])),
1✔
480
                    ("bar", FeatureData::Int(vec![64])),
1✔
481
                ],
1✔
482
            )
1✔
483
            .unwrap(),
1✔
484
        )
1✔
485
        .boxed();
1✔
486

1✔
487
        let box_plot = ScatterPlot {
1✔
488
            params: ScatterPlotParams {
1✔
489
                column_x: "foo".to_string(),
1✔
490
                column_y: "bar".to_string(),
1✔
491
            },
1✔
492
            sources: vector_source.into(),
1✔
493
        };
1✔
494

1✔
495
        let execution_context = MockExecutionContext::test_default();
1✔
496

497
        let init = box_plot
1✔
498
            .boxed()
1✔
499
            .initialize(WorkflowOperatorPath::initialize_root(), &execution_context)
1✔
500
            .await;
×
501

502
        assert!(init.is_err());
1✔
503
    }
504

505
    #[tokio::test]
1✔
506
    async fn vector_data_text_column_y() {
1✔
507
        let vector_source = MockFeatureCollectionSource::single(
1✔
508
            DataCollection::from_slices(
1✔
509
                &[] as &[NoGeometry],
1✔
510
                &[TimeInterval::default(); 1],
1✔
511
                &[
1✔
512
                    ("foo", FeatureData::Text(vec!["test".to_string()])),
1✔
513
                    ("bar", FeatureData::Int(vec![64])),
1✔
514
                ],
1✔
515
            )
1✔
516
            .unwrap(),
1✔
517
        )
1✔
518
        .boxed();
1✔
519

1✔
520
        let box_plot = ScatterPlot {
1✔
521
            params: ScatterPlotParams {
1✔
522
                column_x: "bar".to_string(),
1✔
523
                column_y: "foo".to_string(),
1✔
524
            },
1✔
525
            sources: vector_source.into(),
1✔
526
        };
1✔
527

1✔
528
        let execution_context = MockExecutionContext::test_default();
1✔
529

530
        let init = box_plot
1✔
531
            .boxed()
1✔
532
            .initialize(WorkflowOperatorPath::initialize_root(), &execution_context)
1✔
533
            .await;
×
534

535
        assert!(init.is_err());
1✔
536
    }
537

538
    #[tokio::test]
1✔
539
    async fn vector_data_missing_column_x() {
1✔
540
        let vector_source = MockFeatureCollectionSource::single(
1✔
541
            DataCollection::from_slices(
1✔
542
                &[] as &[NoGeometry],
1✔
543
                &[TimeInterval::default(); 1],
1✔
544
                &[
1✔
545
                    ("foo", FeatureData::Text(vec!["test".to_string()])),
1✔
546
                    ("bar", FeatureData::Int(vec![64])),
1✔
547
                ],
1✔
548
            )
1✔
549
            .unwrap(),
1✔
550
        )
1✔
551
        .boxed();
1✔
552

1✔
553
        let box_plot = ScatterPlot {
1✔
554
            params: ScatterPlotParams {
1✔
555
                column_x: "fo".to_string(),
1✔
556
                column_y: "bar".to_string(),
1✔
557
            },
1✔
558
            sources: vector_source.into(),
1✔
559
        };
1✔
560

1✔
561
        let execution_context = MockExecutionContext::test_default();
1✔
562

563
        let init = box_plot
1✔
564
            .boxed()
1✔
565
            .initialize(WorkflowOperatorPath::initialize_root(), &execution_context)
1✔
566
            .await;
×
567

568
        assert!(init.is_err());
1✔
569
    }
570

571
    #[tokio::test]
1✔
572
    async fn vector_data_missing_column_y() {
1✔
573
        let vector_source = MockFeatureCollectionSource::single(
1✔
574
            DataCollection::from_slices(
1✔
575
                &[] as &[NoGeometry],
1✔
576
                &[TimeInterval::default(); 1],
1✔
577
                &[
1✔
578
                    ("foo", FeatureData::Text(vec!["test".to_string()])),
1✔
579
                    ("bar", FeatureData::Int(vec![64])),
1✔
580
                ],
1✔
581
            )
1✔
582
            .unwrap(),
1✔
583
        )
1✔
584
        .boxed();
1✔
585

1✔
586
        let box_plot = ScatterPlot {
1✔
587
            params: ScatterPlotParams {
1✔
588
                column_x: "foo".to_string(),
1✔
589
                column_y: "ba".to_string(),
1✔
590
            },
1✔
591
            sources: vector_source.into(),
1✔
592
        };
1✔
593

1✔
594
        let execution_context = MockExecutionContext::test_default();
1✔
595

596
        let init = box_plot
1✔
597
            .boxed()
1✔
598
            .initialize(WorkflowOperatorPath::initialize_root(), &execution_context)
1✔
599
            .await;
×
600

601
        assert!(init.is_err());
1✔
602
    }
603

604
    #[tokio::test]
1✔
605
    async fn vector_data_single_feature() {
1✔
606
        let vector_source =
1✔
607
            MockFeatureCollectionSource::multiple(vec![DataCollection::from_slices(
1✔
608
                &[] as &[NoGeometry],
1✔
609
                &[TimeInterval::default(); 1],
1✔
610
                &[
1✔
611
                    ("foo", FeatureData::Int(vec![1])),
1✔
612
                    ("bar", FeatureData::Int(vec![1])),
1✔
613
                ],
1✔
614
            )
1✔
615
            .unwrap()])
1✔
616
            .boxed();
1✔
617

1✔
618
        let box_plot = ScatterPlot {
1✔
619
            params: ScatterPlotParams {
1✔
620
                column_x: "foo".to_string(),
1✔
621
                column_y: "bar".to_string(),
1✔
622
            },
1✔
623
            sources: vector_source.into(),
1✔
624
        };
1✔
625

1✔
626
        let execution_context = MockExecutionContext::test_default();
1✔
627

628
        let query_processor = box_plot
1✔
629
            .boxed()
1✔
630
            .initialize(WorkflowOperatorPath::initialize_root(), &execution_context)
1✔
631
            .await
×
632
            .unwrap()
1✔
633
            .query_processor()
1✔
634
            .unwrap()
1✔
635
            .json_vega()
1✔
636
            .unwrap();
1✔
637

638
        let result = query_processor
1✔
639
            .plot_query(
1✔
640
                VectorQueryRectangle {
1✔
641
                    spatial_bounds: BoundingBox2D::new((-180., -90.).into(), (180., 90.).into())
1✔
642
                        .unwrap(),
1✔
643
                    time_interval: TimeInterval::default(),
1✔
644
                    spatial_resolution: SpatialResolution::one(),
1✔
645
                },
1✔
646
                &MockQueryContext::new(ChunkByteSize::MIN),
1✔
647
            )
1✔
648
            .await
×
649
            .unwrap();
1✔
650

1✔
651
        let mut expected =
1✔
652
            geoengine_datatypes::plots::ScatterPlot::new("foo".to_string(), "bar".to_string());
1✔
653
        expected.update(Coordinate2D::new(1.0, 1.0));
1✔
654
        assert_eq!(expected.to_vega_embeddable(false).unwrap(), result);
1✔
655
    }
656

657
    #[tokio::test]
1✔
658
    async fn vector_data_empty() {
1✔
659
        let vector_source =
1✔
660
            MockFeatureCollectionSource::multiple(vec![DataCollection::from_slices(
1✔
661
                &[] as &[NoGeometry],
1✔
662
                &[] as &[TimeInterval],
1✔
663
                &[
1✔
664
                    ("foo", FeatureData::Int(vec![])),
1✔
665
                    ("bar", FeatureData::Int(vec![])),
1✔
666
                ],
1✔
667
            )
1✔
668
            .unwrap()])
1✔
669
            .boxed();
1✔
670

1✔
671
        let box_plot = ScatterPlot {
1✔
672
            params: ScatterPlotParams {
1✔
673
                column_x: "foo".to_string(),
1✔
674
                column_y: "bar".to_string(),
1✔
675
            },
1✔
676
            sources: vector_source.into(),
1✔
677
        };
1✔
678

1✔
679
        let execution_context = MockExecutionContext::test_default();
1✔
680

681
        let query_processor = box_plot
1✔
682
            .boxed()
1✔
683
            .initialize(WorkflowOperatorPath::initialize_root(), &execution_context)
1✔
684
            .await
×
685
            .unwrap()
1✔
686
            .query_processor()
1✔
687
            .unwrap()
1✔
688
            .json_vega()
1✔
689
            .unwrap();
1✔
690

691
        let result = query_processor
1✔
692
            .plot_query(
1✔
693
                VectorQueryRectangle {
1✔
694
                    spatial_bounds: BoundingBox2D::new((-180., -90.).into(), (180., 90.).into())
1✔
695
                        .unwrap(),
1✔
696
                    time_interval: TimeInterval::default(),
1✔
697
                    spatial_resolution: SpatialResolution::one(),
1✔
698
                },
1✔
699
                &MockQueryContext::new(ChunkByteSize::MIN),
1✔
700
            )
1✔
701
            .await
×
702
            .unwrap();
1✔
703

1✔
704
        let expected =
1✔
705
            geoengine_datatypes::plots::ScatterPlot::new("foo".to_string(), "bar".to_string());
1✔
706
        assert_eq!(expected.to_vega_embeddable(false).unwrap(), result);
1✔
707
    }
708

709
    #[tokio::test]
1✔
710
    async fn to_histogram_at_end() {
1✔
711
        let mut values = vec![1; 700];
1✔
712
        values.push(2);
1✔
713

1✔
714
        let vector_source =
1✔
715
            MockFeatureCollectionSource::multiple(vec![DataCollection::from_slices(
1✔
716
                &[] as &[NoGeometry],
1✔
717
                &[TimeInterval::default(); 701],
1✔
718
                &[
1✔
719
                    ("foo", FeatureData::Int(values.clone())),
1✔
720
                    ("bar", FeatureData::Int(values.clone())),
1✔
721
                ],
1✔
722
            )
1✔
723
            .unwrap()])
1✔
724
            .boxed();
1✔
725

1✔
726
        let box_plot = ScatterPlot {
1✔
727
            params: ScatterPlotParams {
1✔
728
                column_x: "foo".to_string(),
1✔
729
                column_y: "bar".to_string(),
1✔
730
            },
1✔
731
            sources: vector_source.into(),
1✔
732
        };
1✔
733

1✔
734
        let execution_context = MockExecutionContext::test_default();
1✔
735

736
        let query_processor = box_plot
1✔
737
            .boxed()
1✔
738
            .initialize(WorkflowOperatorPath::initialize_root(), &execution_context)
1✔
739
            .await
×
740
            .unwrap()
1✔
741
            .query_processor()
1✔
742
            .unwrap()
1✔
743
            .json_vega()
1✔
744
            .unwrap();
1✔
745

746
        let result = query_processor
1✔
747
            .plot_query(
1✔
748
                VectorQueryRectangle {
1✔
749
                    spatial_bounds: BoundingBox2D::new((-180., -90.).into(), (180., 90.).into())
1✔
750
                        .unwrap(),
1✔
751
                    time_interval: TimeInterval::default(),
1✔
752
                    spatial_resolution: SpatialResolution::one(),
1✔
753
                },
1✔
754
                &MockQueryContext::new(ChunkByteSize::MIN),
1✔
755
            )
1✔
756
            .await
×
757
            .unwrap();
1✔
758

1✔
759
        let dim_x = HistogramDimension::new("foo".to_string(), 1.0, 2.0, 26).unwrap();
1✔
760
        let dim_y = HistogramDimension::new("bar".to_string(), 1.0, 2.0, 26).unwrap();
1✔
761

1✔
762
        let mut expected = geoengine_datatypes::plots::Histogram2D::new(dim_x, dim_y);
1✔
763
        expected.update_batch(
1✔
764
            values
1✔
765
                .into_iter()
1✔
766
                .map(|x| Coordinate2D::new(x as f64, x as f64)),
701✔
767
        );
1✔
768
        assert_eq!(expected.to_vega_embeddable(false).unwrap(), result);
1✔
769
    }
770

771
    #[tokio::test]
1✔
772
    async fn to_histogram_while_iterating() {
1✔
773
        let mut values = vec![1; 5999];
1✔
774
        values.push(2);
1✔
775

1✔
776
        let vector_source = MockFeatureCollectionSource::multiple(vec![
1✔
777
            DataCollection::from_slices(
1✔
778
                &[] as &[NoGeometry],
1✔
779
                &[TimeInterval::default(); 6000],
1✔
780
                &[
1✔
781
                    ("foo", FeatureData::Int(values.clone())),
1✔
782
                    ("bar", FeatureData::Int(values.clone())),
1✔
783
                ],
1✔
784
            )
1✔
785
            .unwrap(),
1✔
786
            DataCollection::from_slices(
1✔
787
                &[] as &[NoGeometry],
1✔
788
                &[TimeInterval::default(); 6000],
1✔
789
                &[
1✔
790
                    ("foo", FeatureData::Int(values.clone())),
1✔
791
                    ("bar", FeatureData::Int(values.clone())),
1✔
792
                ],
1✔
793
            )
1✔
794
            .unwrap(),
1✔
795
            DataCollection::from_slices(
1✔
796
                &[] as &[NoGeometry],
1✔
797
                &[TimeInterval::default(); 6000],
1✔
798
                &[
1✔
799
                    ("foo", FeatureData::Int(values.clone())),
1✔
800
                    ("bar", FeatureData::Int(values.clone())),
1✔
801
                ],
1✔
802
            )
1✔
803
            .unwrap(),
1✔
804
        ])
1✔
805
        .boxed();
1✔
806

1✔
807
        let box_plot = ScatterPlot {
1✔
808
            params: ScatterPlotParams {
1✔
809
                column_x: "foo".to_string(),
1✔
810
                column_y: "bar".to_string(),
1✔
811
            },
1✔
812
            sources: vector_source.into(),
1✔
813
        };
1✔
814

1✔
815
        let execution_context = MockExecutionContext::test_default();
1✔
816

817
        let query_processor = box_plot
1✔
818
            .boxed()
1✔
819
            .initialize(WorkflowOperatorPath::initialize_root(), &execution_context)
1✔
820
            .await
×
821
            .unwrap()
1✔
822
            .query_processor()
1✔
823
            .unwrap()
1✔
824
            .json_vega()
1✔
825
            .unwrap();
1✔
826

827
        let result = query_processor
1✔
828
            .plot_query(
1✔
829
                VectorQueryRectangle {
1✔
830
                    spatial_bounds: BoundingBox2D::new((-180., -90.).into(), (180., 90.).into())
1✔
831
                        .unwrap(),
1✔
832
                    time_interval: TimeInterval::default(),
1✔
833
                    spatial_resolution: SpatialResolution::one(),
1✔
834
                },
1✔
835
                &MockQueryContext::new(ChunkByteSize::MIN),
1✔
836
            )
1✔
837
            .await
×
838
            .unwrap();
1✔
839

1✔
840
        let dim_x = HistogramDimension::new("foo".to_string(), 1.0, 2.0, 100).unwrap();
1✔
841
        let dim_y = HistogramDimension::new("bar".to_string(), 1.0, 2.0, 100).unwrap();
1✔
842

1✔
843
        let mut expected = geoengine_datatypes::plots::Histogram2D::new(dim_x, dim_y);
1✔
844
        expected.update_batch(
1✔
845
            values
1✔
846
                .iter()
1✔
847
                .map(|x| Coordinate2D::new(*x as f64, *x as f64)),
6,000✔
848
        );
1✔
849
        expected.update_batch(
1✔
850
            values
1✔
851
                .iter()
1✔
852
                .map(|x| Coordinate2D::new(*x as f64, *x as f64)),
6,000✔
853
        );
1✔
854
        expected.update_batch(
1✔
855
            values
1✔
856
                .iter()
1✔
857
                .map(|x| Coordinate2D::new(*x as f64, *x as f64)),
6,000✔
858
        );
1✔
859
        assert_eq!(expected.to_vega_embeddable(false).unwrap(), result);
1✔
860
    }
861

862
    /// A `CollectorKind::Values` that never received a batch must render the
    /// same Vega output as a freshly created scatter plot with the same
    /// column names.
    #[test]
    fn test_collector_kind_empty() {
        let (cx, cy) = ("x".to_string(), "y".to_string());

        let collector = CollectorKind::Values(Collector::new(cx.clone(), cy.clone()));
        let actual = collector
            .into_plot()
            .unwrap()
            .to_vega_embeddable(false)
            .unwrap();

        let reference_plot = geoengine_datatypes::plots::ScatterPlot::new(cx, cy);
        let expected = reference_plot.to_vega_embeddable(false).unwrap();

        assert_eq!(expected, actual);
    }
1✔
876

877
    /// Feeds `SCATTER_PLOT_THRESHOLD / 2` points into a value collector and
    /// checks that it stays a `CollectorKind::Values` and renders the same
    /// Vega output as a scatter plot that received the same points.
    #[test]
    fn test_collector_kind_scatter_plot() {
        let cx = "x".to_string();
        let cy = "y".to_string();

        // Collecting from the range sizes the allocation from the actual
        // element count instead of relying on a hand-written capacity.
        let values: Vec<Coordinate2D> = (0..SCATTER_PLOT_THRESHOLD / 2)
            .map(|i| Coordinate2D::new(i as f64, i as f64))
            .collect();

        let mut c = CollectorKind::Values(Collector::new(cx.clone(), cy.clone()));

        c.add_batch(values.clone().into_iter()).unwrap();

        assert!(matches!(c, CollectorKind::Values(_)));

        let res = c.into_plot().unwrap().to_vega_embeddable(false).unwrap();

        let mut expected = geoengine_datatypes::plots::ScatterPlot::new(cx, cy);
        expected.update_batch(values.into_iter());

        assert_eq!(expected.to_vega_embeddable(false).unwrap(), res);
    }
1✔
900

901
    /// Feeds `2 * SCATTER_PLOT_THRESHOLD` points in one batch. The collector
    /// is still a `CollectorKind::Values` afterwards, yet `into_plot` is
    /// expected to render the same output as a 2D histogram over these points.
    #[test]
    fn test_collector_kind_histogram_end() {
        let (cx, cy) = ("x".to_string(), "y".to_string());
        let element_count = SCATTER_PLOT_THRESHOLD * 2;

        let values: Vec<Coordinate2D> = (0..element_count)
            .map(|i| Coordinate2D::new(i as f64, i as f64))
            .collect();

        let mut collector = CollectorKind::Values(Collector::new(cx.clone(), cy.clone()));
        collector.add_batch(values.clone().into_iter()).unwrap();

        assert!(matches!(collector, CollectorKind::Values(_)));

        let actual = collector
            .into_plot()
            .unwrap()
            .to_vega_embeddable(false)
            .unwrap();

        // Reference histogram: bounds span the generated values, bucket
        // count is the truncated square root of the element count, capped at 100.
        let upper_bound = (element_count - 1) as f64;
        let bucket_count = ((element_count as f64).sqrt() as usize).min(100);
        let dim_x = HistogramDimension::new(cx, 0.0, upper_bound, bucket_count).unwrap();
        let dim_y = HistogramDimension::new(cy, 0.0, upper_bound, bucket_count).unwrap();

        let mut expected = geoengine_datatypes::plots::Histogram2D::new(dim_x, dim_y);
        expected.update_batch(values.into_iter());

        assert_eq!(expected.to_vega_embeddable(false).unwrap(), actual);
    }
1✔
933
    /// Feeds `COLLECTOR_TO_HISTOGRAM_THRESHOLD + 1` points in one batch and
    /// checks that the collector has already become a
    /// `CollectorKind::Histogram` before `into_plot` is called.
    #[test]
    fn test_collector_kind_histogram_in_flight() {
        let (cx, cy) = ("x".to_string(), "y".to_string());
        let element_count = COLLECTOR_TO_HISTOGRAM_THRESHOLD + 1;

        let values: Vec<Coordinate2D> = (0..element_count)
            .map(|i| Coordinate2D::new(i as f64, i as f64))
            .collect();

        let mut collector = CollectorKind::Values(Collector::new(cx.clone(), cy.clone()));
        collector.add_batch(values.clone().into_iter()).unwrap();

        assert!(matches!(collector, CollectorKind::Histogram(_)));

        let actual = collector
            .into_plot()
            .unwrap()
            .to_vega_embeddable(false)
            .unwrap();

        // Reference histogram: bounds span the generated values, bucket
        // count is the truncated square root of the element count, capped at 100.
        let upper_bound = (element_count - 1) as f64;
        let bucket_count = ((element_count as f64).sqrt() as usize).min(100);
        let dim_x = HistogramDimension::new(cx, 0.0, upper_bound, bucket_count).unwrap();
        let dim_y = HistogramDimension::new(cy, 0.0, upper_bound, bucket_count).unwrap();

        let mut expected = geoengine_datatypes::plots::Histogram2D::new(dim_x, dim_y);
        expected.update_batch(values.into_iter());

        assert_eq!(expected.to_vega_embeddable(false).unwrap(), actual);
    }
1✔
965

966
    /// After the collector has become a histogram, a point above the largest
    /// value seen so far is added. The expected histogram does not contain
    /// that point, i.e. the collector must skip it.
    #[test]
    fn test_collector_kind_histogram_out_of_range() {
        let (cx, cy) = ("x".to_string(), "y".to_string());
        let element_count = COLLECTOR_TO_HISTOGRAM_THRESHOLD + 1;

        let values: Vec<Coordinate2D> = (0..element_count)
            .map(|i| Coordinate2D::new(i as f64, i as f64))
            .collect();

        let mut collector = CollectorKind::Values(Collector::new(cx.clone(), cy.clone()));
        collector.add_batch(values.clone().into_iter()).unwrap();

        assert!(matches!(collector, CollectorKind::Histogram(_)));

        // This value should be skipped
        let beyond_range = Coordinate2D::new(element_count as f64, element_count as f64);
        collector
            .add_batch(std::iter::once(beyond_range))
            .unwrap();

        let actual = collector
            .into_plot()
            .unwrap()
            .to_vega_embeddable(false)
            .unwrap();

        // Reference histogram: built from the first batch only.
        let upper_bound = (element_count - 1) as f64;
        let bucket_count = ((element_count as f64).sqrt() as usize).min(100);
        let dim_x = HistogramDimension::new(cx, 0.0, upper_bound, bucket_count).unwrap();
        let dim_y = HistogramDimension::new(cy, 0.0, upper_bound, bucket_count).unwrap();

        let mut expected = geoengine_datatypes::plots::Histogram2D::new(dim_x, dim_y);
        expected.update_batch(values.into_iter());

        assert_eq!(expected.to_vega_embeddable(false).unwrap(), actual);
    }
1✔
1008

1009
    /// After the collector has become a histogram, a point with NaN
    /// coordinates is added. The expected histogram does not contain that
    /// point, i.e. the collector must skip it.
    ///
    /// NOTE(review): despite the test name, the extra value is NaN and not
    /// an infinity.
    #[test]
    fn test_collector_kind_histogram_infinite() {
        let (cx, cy) = ("x".to_string(), "y".to_string());
        let element_count = COLLECTOR_TO_HISTOGRAM_THRESHOLD + 1;

        let values: Vec<Coordinate2D> = (0..element_count)
            .map(|i| Coordinate2D::new(i as f64, i as f64))
            .collect();

        let mut collector = CollectorKind::Values(Collector::new(cx.clone(), cy.clone()));
        collector.add_batch(values.clone().into_iter()).unwrap();

        assert!(matches!(collector, CollectorKind::Histogram(_)));

        // This value should be skipped
        let not_a_number = Coordinate2D::new(f64::NAN, f64::NAN);
        collector
            .add_batch(std::iter::once(not_a_number))
            .unwrap();

        let actual = collector
            .into_plot()
            .unwrap()
            .to_vega_embeddable(false)
            .unwrap();

        // Reference histogram: built from the first batch only.
        let upper_bound = (element_count - 1) as f64;
        let bucket_count = ((element_count as f64).sqrt() as usize).min(100);
        let dim_x = HistogramDimension::new(cx, 0.0, upper_bound, bucket_count).unwrap();
        let dim_y = HistogramDimension::new(cy, 0.0, upper_bound, bucket_count).unwrap();

        let mut expected = geoengine_datatypes::plots::Histogram2D::new(dim_x, dim_y);
        expected.update_batch(values.into_iter());

        assert_eq!(expected.to_vega_embeddable(false).unwrap(), actual);
    }
1✔
1045
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc