• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

geo-engine / geoengine / 16167706152

09 Jul 2025 11:08AM UTC coverage: 88.738% (-1.0%) from 89.762%
16167706152

push

github

web-flow
refactor: Updates-2025-07-02 (#1062)

* rust 1.88

* clippy auto-fixes

* manual clippy fixes

* update deps

* cargo update

* update onnx

* cargo fmt

* update sqlfluff

121 of 142 new or added lines in 29 files covered. (85.21%)

300 existing lines in 88 files now uncovered.

111259 of 125379 relevant lines covered (88.74%)

77910.92 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

98.22
/operators/src/plot/scatter_plot.rs
1
use async_trait::async_trait;
2
use futures::StreamExt;
3
use serde::{Deserialize, Serialize};
4

5
use geoengine_datatypes::collections::FeatureCollectionInfos;
6
use geoengine_datatypes::plots::{Histogram2D, HistogramDimension, Plot, PlotData};
7

8
use crate::engine::{
9
    CanonicOperatorName, ExecutionContext, InitializedPlotOperator, InitializedSources,
10
    InitializedVectorOperator, Operator, OperatorName, PlotOperator, PlotQueryProcessor,
11
    PlotResultDescriptor, QueryContext, QueryProcessor, SingleVectorSource,
12
    TypedPlotQueryProcessor, TypedVectorQueryProcessor, WorkflowOperatorPath,
13
};
14
use crate::error::Error;
15
use crate::util::Result;
16
use geoengine_datatypes::primitives::{Coordinate2D, PlotQueryRectangle};
17

18
/// Name under which the scatter plot is reported as a plot type.
pub const SCATTERPLOT_OPERATOR_NAME: &str = "ScatterPlot";

/// Upper bound on the number of points that are still rendered as a real
/// scatter plot; larger results are turned into a 2D histogram.
const SCATTER_PLOT_THRESHOLD: usize = 500;

/// Number of points handed to the collector in one go. The decision to switch
/// from collecting points to a histogram is only taken between two batches.
const BATCH_SIZE: usize = 1000;

/// Number of collected points after which the collector is converted into a
/// histogram. From then on the histogram bounds are fixed (i.e., further values
/// exceeding the min/max seen so far are ignored).
const COLLECTOR_TO_HISTOGRAM_THRESHOLD: usize = 10 * BATCH_SIZE;
30

31
/// A scatter plot about two attributes of a vector dataset. If the
32
/// dataset contains more than `SCATTER_PLOT_THRESHOLD` elements, this
33
/// operator creates a 2D histogram.
34
pub type ScatterPlot = Operator<ScatterPlotParams, SingleVectorSource>;
35

36
impl OperatorName for ScatterPlot {
37
    const TYPE_NAME: &'static str = "ScatterPlot";
38
}
39

40
/// The parameter spec for `ScatterPlot`
41
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
42
#[serde(rename_all = "camelCase")]
43
pub struct ScatterPlotParams {
44
    /// Name of the (numeric) attribute for the x-axis.
45
    pub column_x: String,
46
    /// Name of the (numeric) attribute for the y-axis.
47
    pub column_y: String,
48
}
49

50
#[typetag::serde]
×
51
#[async_trait]
52
impl PlotOperator for ScatterPlot {
53
    async fn _initialize(
54
        self: Box<Self>,
55
        path: WorkflowOperatorPath,
56
        context: &dyn ExecutionContext,
57
    ) -> Result<Box<dyn InitializedPlotOperator>> {
20✔
58
        let name = CanonicOperatorName::from(&self);
10✔
59

60
        let initialized_sources = self
10✔
61
            .sources
10✔
62
            .initialize_sources(path.clone(), context)
10✔
63
            .await?;
10✔
64
        let source = initialized_sources.vector;
10✔
65
        for cn in [&self.params.column_x, &self.params.column_y] {
17✔
66
            match source.result_descriptor().column_data_type(cn.as_str()) {
17✔
67
                Some(column) if !column.is_numeric() => {
16✔
68
                    return Err(Error::InvalidOperatorSpec {
3✔
69
                        reason: format!("Column '{cn}' is not numeric."),
3✔
70
                    });
3✔
71
                }
72
                Some(_) => {
13✔
73
                    // OK
13✔
74
                }
13✔
75
                None => {
76
                    return Err(Error::ColumnDoesNotExist {
1✔
77
                        column: cn.to_string(),
1✔
78
                    });
1✔
79
                }
80
            }
81
        }
82

83
        let in_desc = source.result_descriptor().clone();
6✔
84

85
        Ok(InitializedScatterPlot::new(name, in_desc.into(), self.params, source).boxed())
6✔
86
    }
20✔
87

88
    span_fn!(ScatterPlot);
89
}
90

91
/// The initialization of `Histogram`
92
pub struct InitializedScatterPlot<Op> {
93
    name: CanonicOperatorName,
94
    result_descriptor: PlotResultDescriptor,
95
    column_x: String,
96
    column_y: String,
97
    source: Op,
98
}
99

100
impl<Op> InitializedScatterPlot<Op> {
101
    pub fn new(
6✔
102
        name: CanonicOperatorName,
6✔
103
        result_descriptor: PlotResultDescriptor,
6✔
104
        params: ScatterPlotParams,
6✔
105
        source: Op,
6✔
106
    ) -> Self {
6✔
107
        Self {
6✔
108
            name,
6✔
109
            result_descriptor,
6✔
110
            column_x: params.column_x,
6✔
111
            column_y: params.column_y,
6✔
112
            source,
6✔
113
        }
6✔
114
    }
6✔
115
}
116
impl InitializedPlotOperator for InitializedScatterPlot<Box<dyn InitializedVectorOperator>> {
117
    fn result_descriptor(&self) -> &PlotResultDescriptor {
×
118
        &self.result_descriptor
×
119
    }
×
120

121
    fn query_processor(&self) -> Result<TypedPlotQueryProcessor> {
6✔
122
        let processor = ScatterPlotQueryProcessor {
6✔
123
            input: self.source.query_processor()?,
6✔
124
            column_x: self.column_x.clone(),
6✔
125
            column_y: self.column_y.clone(),
6✔
126
        };
127

128
        Ok(TypedPlotQueryProcessor::JsonVega(processor.boxed()))
6✔
129
    }
6✔
130

131
    fn canonic_name(&self) -> CanonicOperatorName {
×
132
        self.name.clone()
×
133
    }
×
134
}
135

136
/// A query processor that calculates the scatter plot about its vector input.
137
pub struct ScatterPlotQueryProcessor {
138
    input: TypedVectorQueryProcessor,
139
    column_x: String,
140
    column_y: String,
141
}
142

143
#[async_trait]
144
impl PlotQueryProcessor for ScatterPlotQueryProcessor {
145
    type OutputFormat = PlotData;
146

147
    fn plot_type(&self) -> &'static str {
×
148
        SCATTERPLOT_OPERATOR_NAME
×
149
    }
×
150

151
    async fn plot_query<'p>(
152
        &'p self,
153
        query: PlotQueryRectangle,
154
        ctx: &'p dyn QueryContext,
155
    ) -> Result<Self::OutputFormat> {
12✔
156
        let mut collector =
6✔
157
            CollectorKind::Values(Collector::new(self.column_x.clone(), self.column_y.clone()));
6✔
158

159
        call_on_generic_vector_processor!(&self.input, processor => {
6✔
160
            let mut query = processor.query(query.into(), ctx).await?;
6✔
161
            while let Some(collection) = query.next().await {
15✔
162
                let collection = collection?;
9✔
163

164
                let data_x = collection.data(&self.column_x).expect("checked in param");
9✔
165
                let data_y = collection.data(&self.column_y).expect("checked in param");
9✔
166

167
                let valid_points = data_x.float_options_iter().zip(data_y.float_options_iter()).filter_map(|(a,b)| match (a,b) {
18,717✔
168
                    (Some(x),Some(y)) if x.is_finite() && y.is_finite() => Some(Coordinate2D::new(x,y)),
18,714✔
169
                    _ => None
6✔
170
                });
18,717✔
171

172
                for chunk in &itertools::Itertools::chunks(valid_points, BATCH_SIZE) {
23✔
173
                    collector.add_batch( chunk )?;
23✔
174
                }
175
            }
176
        });
177
        Ok(collector.into_plot()?.to_vega_embeddable(false)?)
6✔
178
    }
12✔
179
}
180

181
struct Collector {
182
    elements: Vec<Coordinate2D>,
183
    column_x: String,
184
    column_y: String,
185
    bounds_x: (f64, f64),
186
    bounds_y: (f64, f64),
187
}
188

189
impl Collector {
190
    fn new(column_x: String, column_y: String) -> Self {
12✔
191
        Collector {
12✔
192
            column_x,
12✔
193
            column_y,
12✔
194
            elements: Vec::with_capacity(SCATTER_PLOT_THRESHOLD),
12✔
195
            bounds_x: (f64::INFINITY, f64::NEG_INFINITY),
12✔
196
            bounds_y: (f64::INFINITY, f64::NEG_INFINITY),
12✔
197
        }
12✔
198
    }
12✔
199

200
    fn element_count(&self) -> usize {
35✔
201
        self.elements.len()
35✔
202
    }
35✔
203

204
    fn add_batch(&mut self, values: impl Iterator<Item = Coordinate2D>) {
21✔
205
        for v in values {
42,985✔
206
            self.add(v);
42,964✔
207
        }
42,964✔
208
    }
21✔
209

210
    fn add(&mut self, value: Coordinate2D) {
42,964✔
211
        if value.x.is_finite() && value.y.is_finite() {
42,964✔
212
            self.bounds_x.0 = std::cmp::min_by(self.bounds_x.0, value.x, |a, b| {
42,964✔
213
                a.partial_cmp(b).expect("checked")
42,964✔
214
            });
42,964✔
215
            self.bounds_x.1 = std::cmp::max_by(self.bounds_x.1, value.x, |a, b| {
42,964✔
216
                a.partial_cmp(b).expect("checked")
42,964✔
217
            });
42,964✔
218
            self.bounds_y.0 = std::cmp::min_by(self.bounds_y.0, value.y, |a, b| {
42,964✔
219
                a.partial_cmp(b).expect("checked")
42,964✔
220
            });
42,964✔
221
            self.bounds_y.1 = std::cmp::max_by(self.bounds_y.1, value.y, |a, b| {
42,964✔
222
                a.partial_cmp(b).expect("checked")
42,964✔
223
            });
42,964✔
224
            self.elements.push(value);
42,964✔
UNCOV
225
        }
×
226
    }
42,964✔
227
}
228

229
enum CollectorKind {
230
    Values(Collector),
231
    Histogram(Histogram2D),
232
}
233

234
impl CollectorKind {
235
    fn histogram_from_collector(value: &Collector) -> Result<Histogram2D> {
6✔
236
        let bucket_count = std::cmp::min(100, f64::sqrt(value.element_count() as f64) as usize);
6✔
237

238
        let dim_x = HistogramDimension::new(
6✔
239
            value.column_x.clone(),
6✔
240
            value.bounds_x.0,
6✔
241
            value.bounds_x.1,
6✔
242
            bucket_count,
6✔
UNCOV
243
        )?;
×
244
        let dim_y = HistogramDimension::new(
6✔
245
            value.column_y.clone(),
6✔
246
            value.bounds_y.0,
6✔
247
            value.bounds_y.1,
6✔
248
            bucket_count,
6✔
UNCOV
249
        )?;
×
250

251
        let mut result = Histogram2D::new(dim_x, dim_y);
6✔
252
        result.update_batch(value.elements.iter().copied());
6✔
253
        Ok(result)
6✔
254
    }
6✔
255

256
    fn add_batch(&mut self, values: impl Iterator<Item = Coordinate2D>) -> Result<()> {
30✔
257
        match self {
30✔
258
            Self::Values(c) => {
21✔
259
                c.add_batch(values);
21✔
260
                if c.element_count() > COLLECTOR_TO_HISTOGRAM_THRESHOLD {
21✔
261
                    *self = Self::Histogram(Self::histogram_from_collector(c)?);
4✔
262
                }
17✔
263
            }
264
            Self::Histogram(h) => {
9✔
265
                h.update_batch(values);
9✔
266
            }
9✔
267
        }
268
        Ok(())
30✔
269
    }
30✔
270

271
    fn into_plot(self) -> Result<Box<dyn Plot>> {
12✔
272
        match self {
8✔
273
            Self::Histogram(h) => Ok(Box::new(h)),
4✔
274
            Self::Values(v) if v.element_count() <= SCATTER_PLOT_THRESHOLD => Ok(Box::new(
8✔
275
                geoengine_datatypes::plots::ScatterPlot::new_with_data(
6✔
276
                    v.column_x, v.column_y, v.elements,
6✔
277
                ),
6✔
278
            )),
6✔
279
            Self::Values(v) => Ok(Box::new(Self::histogram_from_collector(&v)?)),
2✔
280
        }
281
    }
12✔
282
}
283

284
#[cfg(test)]
285
mod tests {
286
    use geoengine_datatypes::util::test::TestDefault;
287
    use serde_json::json;
288

289
    use geoengine_datatypes::primitives::{
290
        BoundingBox2D, FeatureData, NoGeometry, PlotSeriesSelection, SpatialResolution,
291
        TimeInterval,
292
    };
293
    use geoengine_datatypes::{collections::DataCollection, primitives::MultiPoint};
294

295
    use crate::engine::{ChunkByteSize, MockExecutionContext, MockQueryContext, VectorOperator};
296
    use crate::mock::MockFeatureCollectionSource;
297

298
    use super::*;
299

300
    #[test]
    fn serialization() {
        // Reference operator; only its parameters take part in the comparison.
        let params = ScatterPlotParams {
            column_x: "foo".to_owned(),
            column_y: "bar".to_owned(),
        };
        let scatter_plot = ScatterPlot {
            params,
            sources: MockFeatureCollectionSource::<MultiPoint>::multiple(vec![])
                .boxed()
                .into(),
        };

        let workflow_json = json!({
            "type": "ScatterPlot",
            "params": {
                "columnX": "foo",
                "columnY": "bar",
            },
            "sources": {
                "vector": {
                    "type": "MockFeatureCollectionSourceMultiPoint",
                    "params": {
                        "collections": [],
                        "spatialReference": "EPSG:4326",
                        "measurements": {},
                    }
                }
            }
        });
        let serialized = workflow_json.to_string();

        let deserialized: ScatterPlot = serde_json::from_str(&serialized).unwrap();

        assert_eq!(deserialized.params, scatter_plot.params);
    }
1✔
335

336
    #[tokio::test]
337
    async fn vector_data() {
1✔
338
        let vector_source = MockFeatureCollectionSource::multiple(vec![
1✔
339
            DataCollection::from_slices(
1✔
340
                &[] as &[NoGeometry],
1✔
341
                &[TimeInterval::default(); 4],
1✔
342
                &[
1✔
343
                    ("foo", FeatureData::Int(vec![1, 2, 3, 4])),
1✔
344
                    ("bar", FeatureData::Int(vec![1, 2, 3, 4])),
1✔
345
                ],
1✔
346
            )
347
            .unwrap(),
1✔
348
            DataCollection::from_slices(
1✔
349
                &[] as &[NoGeometry],
1✔
350
                &[TimeInterval::default(); 4],
1✔
351
                &[
1✔
352
                    ("foo", FeatureData::Int(vec![5, 6, 7, 8])),
1✔
353
                    ("bar", FeatureData::Int(vec![5, 6, 7, 8])),
1✔
354
                ],
1✔
355
            )
356
            .unwrap(),
1✔
357
        ])
358
        .boxed();
1✔
359

360
        let box_plot = ScatterPlot {
1✔
361
            params: ScatterPlotParams {
1✔
362
                column_x: "foo".to_string(),
1✔
363
                column_y: "bar".to_string(),
1✔
364
            },
1✔
365
            sources: vector_source.into(),
1✔
366
        };
1✔
367

368
        let execution_context = MockExecutionContext::test_default();
1✔
369

370
        let query_processor = box_plot
1✔
371
            .boxed()
1✔
372
            .initialize(WorkflowOperatorPath::initialize_root(), &execution_context)
1✔
373
            .await
1✔
374
            .unwrap()
1✔
375
            .query_processor()
1✔
376
            .unwrap()
1✔
377
            .json_vega()
1✔
378
            .unwrap();
1✔
379

380
        let result = query_processor
1✔
381
            .plot_query(
1✔
382
                PlotQueryRectangle {
1✔
383
                    spatial_bounds: BoundingBox2D::new((-180., -90.).into(), (180., 90.).into())
1✔
384
                        .unwrap(),
1✔
385
                    time_interval: TimeInterval::default(),
1✔
386
                    spatial_resolution: SpatialResolution::one(),
1✔
387
                    attributes: PlotSeriesSelection::all(),
1✔
388
                },
1✔
389
                &MockQueryContext::new(ChunkByteSize::MIN),
1✔
390
            )
1✔
391
            .await
1✔
392
            .unwrap();
1✔
393

394
        let mut expected =
1✔
395
            geoengine_datatypes::plots::ScatterPlot::new("foo".to_string(), "bar".to_string());
1✔
396
        for i in 1..=8 {
9✔
397
            expected.update(Coordinate2D::new(f64::from(i), f64::from(i)));
8✔
398
        }
8✔
399
        assert_eq!(expected.to_vega_embeddable(false).unwrap(), result);
1✔
400
    }
1✔
401

402
    #[tokio::test]
403
    async fn vector_data_with_nulls_and_nan() {
1✔
404
        let vector_source = MockFeatureCollectionSource::multiple(vec![
1✔
405
            DataCollection::from_slices(
1✔
406
                &[] as &[NoGeometry],
1✔
407
                &[TimeInterval::default(); 7],
1✔
408
                &[
1✔
409
                    (
1✔
410
                        "foo",
1✔
411
                        FeatureData::NullableFloat(vec![
1✔
412
                            Some(1.0),
1✔
413
                            None,
1✔
414
                            Some(3.0),
1✔
415
                            None,
1✔
416
                            Some(f64::NAN),
1✔
417
                            Some(6.0),
1✔
418
                            Some(f64::NAN),
1✔
419
                        ]),
1✔
420
                    ),
1✔
421
                    (
1✔
422
                        "bar",
1✔
423
                        FeatureData::NullableFloat(vec![
1✔
424
                            Some(1.0),
1✔
425
                            Some(2.0),
1✔
426
                            None,
1✔
427
                            None,
1✔
428
                            Some(5.0),
1✔
429
                            Some(f64::NAN),
1✔
430
                            Some(f64::NAN),
1✔
431
                        ]),
1✔
432
                    ),
1✔
433
                ],
1✔
434
            )
435
            .unwrap(),
1✔
436
        ])
437
        .boxed();
1✔
438

439
        let box_plot = ScatterPlot {
1✔
440
            params: ScatterPlotParams {
1✔
441
                column_x: "foo".to_string(),
1✔
442
                column_y: "bar".to_string(),
1✔
443
            },
1✔
444
            sources: vector_source.into(),
1✔
445
        };
1✔
446

447
        let execution_context = MockExecutionContext::test_default();
1✔
448

449
        let query_processor = box_plot
1✔
450
            .boxed()
1✔
451
            .initialize(WorkflowOperatorPath::initialize_root(), &execution_context)
1✔
452
            .await
1✔
453
            .unwrap()
1✔
454
            .query_processor()
1✔
455
            .unwrap()
1✔
456
            .json_vega()
1✔
457
            .unwrap();
1✔
458

459
        let result = query_processor
1✔
460
            .plot_query(
1✔
461
                PlotQueryRectangle {
1✔
462
                    spatial_bounds: BoundingBox2D::new((-180., -90.).into(), (180., 90.).into())
1✔
463
                        .unwrap(),
1✔
464
                    time_interval: TimeInterval::default(),
1✔
465
                    spatial_resolution: SpatialResolution::one(),
1✔
466
                    attributes: PlotSeriesSelection::all(),
1✔
467
                },
1✔
468
                &MockQueryContext::new(ChunkByteSize::MIN),
1✔
469
            )
1✔
470
            .await
1✔
471
            .unwrap();
1✔
472

473
        let mut expected =
1✔
474
            geoengine_datatypes::plots::ScatterPlot::new("foo".to_string(), "bar".to_string());
1✔
475
        expected.update(Coordinate2D::new(1.0, 1.0));
1✔
476
        assert_eq!(expected.to_vega_embeddable(false).unwrap(), result);
1✔
477
    }
1✔
478

479
    #[tokio::test]
480
    async fn vector_data_text_column_x() {
1✔
481
        let vector_source = MockFeatureCollectionSource::single(
1✔
482
            DataCollection::from_slices(
1✔
483
                &[] as &[NoGeometry],
1✔
484
                &[TimeInterval::default(); 1],
1✔
485
                &[
1✔
486
                    ("foo", FeatureData::Text(vec!["test".to_string()])),
1✔
487
                    ("bar", FeatureData::Int(vec![64])),
1✔
488
                ],
1✔
489
            )
490
            .unwrap(),
1✔
491
        )
492
        .boxed();
1✔
493

494
        let box_plot = ScatterPlot {
1✔
495
            params: ScatterPlotParams {
1✔
496
                column_x: "foo".to_string(),
1✔
497
                column_y: "bar".to_string(),
1✔
498
            },
1✔
499
            sources: vector_source.into(),
1✔
500
        };
1✔
501

502
        let execution_context = MockExecutionContext::test_default();
1✔
503

504
        let init = box_plot
1✔
505
            .boxed()
1✔
506
            .initialize(WorkflowOperatorPath::initialize_root(), &execution_context)
1✔
507
            .await;
1✔
508

509
        assert!(init.is_err());
1✔
510
    }
1✔
511

512
    #[tokio::test]
513
    async fn vector_data_text_column_y() {
1✔
514
        let vector_source = MockFeatureCollectionSource::single(
1✔
515
            DataCollection::from_slices(
1✔
516
                &[] as &[NoGeometry],
1✔
517
                &[TimeInterval::default(); 1],
1✔
518
                &[
1✔
519
                    ("foo", FeatureData::Text(vec!["test".to_string()])),
1✔
520
                    ("bar", FeatureData::Int(vec![64])),
1✔
521
                ],
1✔
522
            )
523
            .unwrap(),
1✔
524
        )
525
        .boxed();
1✔
526

527
        let box_plot = ScatterPlot {
1✔
528
            params: ScatterPlotParams {
1✔
529
                column_x: "bar".to_string(),
1✔
530
                column_y: "foo".to_string(),
1✔
531
            },
1✔
532
            sources: vector_source.into(),
1✔
533
        };
1✔
534

535
        let execution_context = MockExecutionContext::test_default();
1✔
536

537
        let init = box_plot
1✔
538
            .boxed()
1✔
539
            .initialize(WorkflowOperatorPath::initialize_root(), &execution_context)
1✔
540
            .await;
1✔
541

542
        assert!(init.is_err());
1✔
543
    }
1✔
544

545
    #[tokio::test]
546
    async fn vector_data_missing_column_x() {
1✔
547
        let vector_source = MockFeatureCollectionSource::single(
1✔
548
            DataCollection::from_slices(
1✔
549
                &[] as &[NoGeometry],
1✔
550
                &[TimeInterval::default(); 1],
1✔
551
                &[
1✔
552
                    ("foo", FeatureData::Text(vec!["test".to_string()])),
1✔
553
                    ("bar", FeatureData::Int(vec![64])),
1✔
554
                ],
1✔
555
            )
556
            .unwrap(),
1✔
557
        )
558
        .boxed();
1✔
559

560
        let box_plot = ScatterPlot {
1✔
561
            params: ScatterPlotParams {
1✔
562
                column_x: "fo".to_string(),
1✔
563
                column_y: "bar".to_string(),
1✔
564
            },
1✔
565
            sources: vector_source.into(),
1✔
566
        };
1✔
567

568
        let execution_context = MockExecutionContext::test_default();
1✔
569

570
        let init = box_plot
1✔
571
            .boxed()
1✔
572
            .initialize(WorkflowOperatorPath::initialize_root(), &execution_context)
1✔
573
            .await;
1✔
574

575
        assert!(init.is_err());
1✔
576
    }
1✔
577

578
    #[tokio::test]
579
    async fn vector_data_missing_column_y() {
1✔
580
        let vector_source = MockFeatureCollectionSource::single(
1✔
581
            DataCollection::from_slices(
1✔
582
                &[] as &[NoGeometry],
1✔
583
                &[TimeInterval::default(); 1],
1✔
584
                &[
1✔
585
                    ("foo", FeatureData::Text(vec!["test".to_string()])),
1✔
586
                    ("bar", FeatureData::Int(vec![64])),
1✔
587
                ],
1✔
588
            )
589
            .unwrap(),
1✔
590
        )
591
        .boxed();
1✔
592

593
        let box_plot = ScatterPlot {
1✔
594
            params: ScatterPlotParams {
1✔
595
                column_x: "foo".to_string(),
1✔
596
                column_y: "ba".to_string(),
1✔
597
            },
1✔
598
            sources: vector_source.into(),
1✔
599
        };
1✔
600

601
        let execution_context = MockExecutionContext::test_default();
1✔
602

603
        let init = box_plot
1✔
604
            .boxed()
1✔
605
            .initialize(WorkflowOperatorPath::initialize_root(), &execution_context)
1✔
606
            .await;
1✔
607

608
        assert!(init.is_err());
1✔
609
    }
1✔
610

611
    #[tokio::test]
612
    async fn vector_data_single_feature() {
1✔
613
        let vector_source = MockFeatureCollectionSource::multiple(vec![
1✔
614
            DataCollection::from_slices(
1✔
615
                &[] as &[NoGeometry],
1✔
616
                &[TimeInterval::default(); 1],
1✔
617
                &[
1✔
618
                    ("foo", FeatureData::Int(vec![1])),
1✔
619
                    ("bar", FeatureData::Int(vec![1])),
1✔
620
                ],
1✔
621
            )
622
            .unwrap(),
1✔
623
        ])
624
        .boxed();
1✔
625

626
        let box_plot = ScatterPlot {
1✔
627
            params: ScatterPlotParams {
1✔
628
                column_x: "foo".to_string(),
1✔
629
                column_y: "bar".to_string(),
1✔
630
            },
1✔
631
            sources: vector_source.into(),
1✔
632
        };
1✔
633

634
        let execution_context = MockExecutionContext::test_default();
1✔
635

636
        let query_processor = box_plot
1✔
637
            .boxed()
1✔
638
            .initialize(WorkflowOperatorPath::initialize_root(), &execution_context)
1✔
639
            .await
1✔
640
            .unwrap()
1✔
641
            .query_processor()
1✔
642
            .unwrap()
1✔
643
            .json_vega()
1✔
644
            .unwrap();
1✔
645

646
        let result = query_processor
1✔
647
            .plot_query(
1✔
648
                PlotQueryRectangle {
1✔
649
                    spatial_bounds: BoundingBox2D::new((-180., -90.).into(), (180., 90.).into())
1✔
650
                        .unwrap(),
1✔
651
                    time_interval: TimeInterval::default(),
1✔
652
                    spatial_resolution: SpatialResolution::one(),
1✔
653
                    attributes: PlotSeriesSelection::all(),
1✔
654
                },
1✔
655
                &MockQueryContext::new(ChunkByteSize::MIN),
1✔
656
            )
1✔
657
            .await
1✔
658
            .unwrap();
1✔
659

660
        let mut expected =
1✔
661
            geoengine_datatypes::plots::ScatterPlot::new("foo".to_string(), "bar".to_string());
1✔
662
        expected.update(Coordinate2D::new(1.0, 1.0));
1✔
663
        assert_eq!(expected.to_vega_embeddable(false).unwrap(), result);
1✔
664
    }
1✔
665

666
    #[tokio::test]
667
    async fn vector_data_empty() {
1✔
668
        let vector_source = MockFeatureCollectionSource::multiple(vec![
1✔
669
            DataCollection::from_slices(
1✔
670
                &[] as &[NoGeometry],
1✔
671
                &[] as &[TimeInterval],
1✔
672
                &[
1✔
673
                    ("foo", FeatureData::Int(vec![])),
1✔
674
                    ("bar", FeatureData::Int(vec![])),
1✔
675
                ],
1✔
676
            )
677
            .unwrap(),
1✔
678
        ])
679
        .boxed();
1✔
680

681
        let box_plot = ScatterPlot {
1✔
682
            params: ScatterPlotParams {
1✔
683
                column_x: "foo".to_string(),
1✔
684
                column_y: "bar".to_string(),
1✔
685
            },
1✔
686
            sources: vector_source.into(),
1✔
687
        };
1✔
688

689
        let execution_context = MockExecutionContext::test_default();
1✔
690

691
        let query_processor = box_plot
1✔
692
            .boxed()
1✔
693
            .initialize(WorkflowOperatorPath::initialize_root(), &execution_context)
1✔
694
            .await
1✔
695
            .unwrap()
1✔
696
            .query_processor()
1✔
697
            .unwrap()
1✔
698
            .json_vega()
1✔
699
            .unwrap();
1✔
700

701
        let result = query_processor
1✔
702
            .plot_query(
1✔
703
                PlotQueryRectangle {
1✔
704
                    spatial_bounds: BoundingBox2D::new((-180., -90.).into(), (180., 90.).into())
1✔
705
                        .unwrap(),
1✔
706
                    time_interval: TimeInterval::default(),
1✔
707
                    spatial_resolution: SpatialResolution::one(),
1✔
708
                    attributes: PlotSeriesSelection::all(),
1✔
709
                },
1✔
710
                &MockQueryContext::new(ChunkByteSize::MIN),
1✔
711
            )
1✔
712
            .await
1✔
713
            .unwrap();
1✔
714

715
        let expected =
1✔
716
            geoengine_datatypes::plots::ScatterPlot::new("foo".to_string(), "bar".to_string());
1✔
717
        assert_eq!(expected.to_vega_embeddable(false).unwrap(), result);
1✔
718
    }
1✔
719

720
    #[tokio::test]
721
    async fn to_histogram_at_end() {
1✔
722
        let mut values = vec![1; 700];
1✔
723
        values.push(2);
1✔
724

725
        let vector_source = MockFeatureCollectionSource::multiple(vec![
1✔
726
            DataCollection::from_slices(
1✔
727
                &[] as &[NoGeometry],
1✔
728
                &[TimeInterval::default(); 701],
1✔
729
                &[
1✔
730
                    ("foo", FeatureData::Int(values.clone())),
1✔
731
                    ("bar", FeatureData::Int(values.clone())),
1✔
732
                ],
1✔
733
            )
734
            .unwrap(),
1✔
735
        ])
736
        .boxed();
1✔
737

738
        let box_plot = ScatterPlot {
1✔
739
            params: ScatterPlotParams {
1✔
740
                column_x: "foo".to_string(),
1✔
741
                column_y: "bar".to_string(),
1✔
742
            },
1✔
743
            sources: vector_source.into(),
1✔
744
        };
1✔
745

746
        let execution_context = MockExecutionContext::test_default();
1✔
747

748
        let query_processor = box_plot
1✔
749
            .boxed()
1✔
750
            .initialize(WorkflowOperatorPath::initialize_root(), &execution_context)
1✔
751
            .await
1✔
752
            .unwrap()
1✔
753
            .query_processor()
1✔
754
            .unwrap()
1✔
755
            .json_vega()
1✔
756
            .unwrap();
1✔
757

758
        let result = query_processor
1✔
759
            .plot_query(
1✔
760
                PlotQueryRectangle {
1✔
761
                    spatial_bounds: BoundingBox2D::new((-180., -90.).into(), (180., 90.).into())
1✔
762
                        .unwrap(),
1✔
763
                    time_interval: TimeInterval::default(),
1✔
764
                    spatial_resolution: SpatialResolution::one(),
1✔
765
                    attributes: PlotSeriesSelection::all(),
1✔
766
                },
1✔
767
                &MockQueryContext::new(ChunkByteSize::MIN),
1✔
768
            )
1✔
769
            .await
1✔
770
            .unwrap();
1✔
771

772
        let dim_x = HistogramDimension::new("foo".to_string(), 1.0, 2.0, 26).unwrap();
1✔
773
        let dim_y = HistogramDimension::new("bar".to_string(), 1.0, 2.0, 26).unwrap();
1✔
774

775
        let mut expected = geoengine_datatypes::plots::Histogram2D::new(dim_x, dim_y);
1✔
776
        expected.update_batch(
1✔
777
            values
1✔
778
                .into_iter()
1✔
779
                .map(|x| Coordinate2D::new(x as f64, x as f64)),
701✔
780
        );
781
        assert_eq!(expected.to_vega_embeddable(false).unwrap(), result);
1✔
782
    }
1✔
783

784
    #[tokio::test]
785
    async fn to_histogram_while_iterating() {
1✔
786
        let mut values = vec![1; 5999];
1✔
787
        values.push(2);
1✔
788

789
        let vector_source = MockFeatureCollectionSource::multiple(vec![
1✔
790
            DataCollection::from_slices(
1✔
791
                &[] as &[NoGeometry],
1✔
792
                &[TimeInterval::default(); 6000],
1✔
793
                &[
1✔
794
                    ("foo", FeatureData::Int(values.clone())),
1✔
795
                    ("bar", FeatureData::Int(values.clone())),
1✔
796
                ],
1✔
797
            )
798
            .unwrap(),
1✔
799
            DataCollection::from_slices(
1✔
800
                &[] as &[NoGeometry],
1✔
801
                &[TimeInterval::default(); 6000],
1✔
802
                &[
1✔
803
                    ("foo", FeatureData::Int(values.clone())),
1✔
804
                    ("bar", FeatureData::Int(values.clone())),
1✔
805
                ],
1✔
806
            )
807
            .unwrap(),
1✔
808
            DataCollection::from_slices(
1✔
809
                &[] as &[NoGeometry],
1✔
810
                &[TimeInterval::default(); 6000],
1✔
811
                &[
1✔
812
                    ("foo", FeatureData::Int(values.clone())),
1✔
813
                    ("bar", FeatureData::Int(values.clone())),
1✔
814
                ],
1✔
815
            )
816
            .unwrap(),
1✔
817
        ])
818
        .boxed();
1✔
819

820
        let box_plot = ScatterPlot {
1✔
821
            params: ScatterPlotParams {
1✔
822
                column_x: "foo".to_string(),
1✔
823
                column_y: "bar".to_string(),
1✔
824
            },
1✔
825
            sources: vector_source.into(),
1✔
826
        };
1✔
827

828
        let execution_context = MockExecutionContext::test_default();
1✔
829

830
        let query_processor = box_plot
1✔
831
            .boxed()
1✔
832
            .initialize(WorkflowOperatorPath::initialize_root(), &execution_context)
1✔
833
            .await
1✔
834
            .unwrap()
1✔
835
            .query_processor()
1✔
836
            .unwrap()
1✔
837
            .json_vega()
1✔
838
            .unwrap();
1✔
839

840
        let result = query_processor
1✔
841
            .plot_query(
1✔
842
                PlotQueryRectangle {
1✔
843
                    spatial_bounds: BoundingBox2D::new((-180., -90.).into(), (180., 90.).into())
1✔
844
                        .unwrap(),
1✔
845
                    time_interval: TimeInterval::default(),
1✔
846
                    spatial_resolution: SpatialResolution::one(),
1✔
847
                    attributes: PlotSeriesSelection::all(),
1✔
848
                },
1✔
849
                &MockQueryContext::new(ChunkByteSize::MIN),
1✔
850
            )
1✔
851
            .await
1✔
852
            .unwrap();
1✔
853

854
        let dim_x = HistogramDimension::new("foo".to_string(), 1.0, 2.0, 100).unwrap();
1✔
855
        let dim_y = HistogramDimension::new("bar".to_string(), 1.0, 2.0, 100).unwrap();
1✔
856

857
        let mut expected = geoengine_datatypes::plots::Histogram2D::new(dim_x, dim_y);
1✔
858
        expected.update_batch(
1✔
859
            values
1✔
860
                .iter()
1✔
861
                .map(|x| Coordinate2D::new(*x as f64, *x as f64)),
6,000✔
862
        );
863
        expected.update_batch(
1✔
864
            values
1✔
865
                .iter()
1✔
866
                .map(|x| Coordinate2D::new(*x as f64, *x as f64)),
6,000✔
867
        );
868
        expected.update_batch(
1✔
869
            values
1✔
870
                .iter()
1✔
871
                .map(|x| Coordinate2D::new(*x as f64, *x as f64)),
6,000✔
872
        );
873
        assert_eq!(expected.to_vega_embeddable(false).unwrap(), result);
1✔
874
    }
1✔
875

876
    /// A value collector that never received a batch must produce the same
    /// output as a freshly created scatter plot with the same column names.
    #[test]
    fn test_collector_kind_empty() {
        let x_name = "x".to_string();
        let y_name = "y".to_string();

        // reference output: a scatter plot without any points
        let expected = geoengine_datatypes::plots::ScatterPlot::new(x_name.clone(), y_name.clone())
            .to_vega_embeddable(false)
            .unwrap();

        let collector = CollectorKind::Values(Collector::new(x_name, y_name));
        let actual = collector
            .into_plot()
            .unwrap()
            .to_vega_embeddable(false)
            .unwrap();

        assert_eq!(expected, actual);
    }
1✔
890

891
    /// A batch of `SCATTER_PLOT_THRESHOLD / 2` points must leave the collector in
    /// `Values` mode and be rendered like a scatter plot that was fed exactly
    /// these points.
    #[test]
    fn test_collector_kind_scatter_plot() {
        let cx = "x".to_string();
        let cy = "y".to_string();

        // Collecting from the range lets the vector size itself from the
        // iterator, so the allocation always matches the element count even if
        // `SCATTER_PLOT_THRESHOLD` changes.
        let values: Vec<Coordinate2D> = (0..SCATTER_PLOT_THRESHOLD / 2)
            .map(|i| Coordinate2D::new(i as f64, i as f64))
            .collect();

        let mut c = CollectorKind::Values(Collector::new(cx.clone(), cy.clone()));

        c.add_batch(values.clone().into_iter()).unwrap();

        // still collecting raw values, no conversion to a histogram happened
        assert!(matches!(c, CollectorKind::Values(_)));

        let res = c.into_plot().unwrap().to_vega_embeddable(false).unwrap();

        let mut expected = geoengine_datatypes::plots::ScatterPlot::new(cx, cy);
        expected.update_batch(values.into_iter());

        assert_eq!(expected.to_vega_embeddable(false).unwrap(), res);
    }
1✔
914

915
    /// With `SCATTER_PLOT_THRESHOLD * 2` points the collector stays in `Values`
    /// mode after the batch was added, but the final plot is expected to equal a
    /// 2D histogram spanning the value range of the input.
    #[test]
    fn test_collector_kind_histogram_end() {
        let x_name = "x".to_string();
        let y_name = "y".to_string();

        let element_count = SCATTER_PLOT_THRESHOLD * 2;
        // points on the diagonal: (0, 0), (1, 1), ...
        let points: Vec<Coordinate2D> = (0..element_count)
            .map(|i| Coordinate2D::new(i as f64, i as f64))
            .collect();

        let mut collector = CollectorKind::Values(Collector::new(x_name.clone(), y_name.clone()));
        collector.add_batch(points.clone().into_iter()).unwrap();

        assert!(matches!(collector, CollectorKind::Values(_)));

        let actual = collector
            .into_plot()
            .unwrap()
            .to_vega_embeddable(false)
            .unwrap();

        // expected: square root of the element count as bucket count, capped at 100
        let bucket_count = ((element_count as f64).sqrt() as usize).min(100);
        let upper_bound = (element_count - 1) as f64;
        let dim_x = HistogramDimension::new(x_name, 0.0, upper_bound, bucket_count).unwrap();
        let dim_y = HistogramDimension::new(y_name, 0.0, upper_bound, bucket_count).unwrap();

        let mut expected = geoengine_datatypes::plots::Histogram2D::new(dim_x, dim_y);
        expected.update_batch(points.into_iter());

        assert_eq!(expected.to_vega_embeddable(false).unwrap(), actual);
    }
1✔
947
    /// A single batch of `COLLECTOR_TO_HISTOGRAM_THRESHOLD + 1` points must turn
    /// the collector into a histogram while the batch is added, i.e., before the
    /// plot is requested.
    #[test]
    fn test_collector_kind_histogram_in_flight() {
        let x_name = "x".to_string();
        let y_name = "y".to_string();

        let element_count = COLLECTOR_TO_HISTOGRAM_THRESHOLD + 1;
        // points on the diagonal: (0, 0), (1, 1), ...
        let points: Vec<Coordinate2D> = (0..element_count)
            .map(|i| Coordinate2D::new(i as f64, i as f64))
            .collect();

        let mut collector = CollectorKind::Values(Collector::new(x_name.clone(), y_name.clone()));
        collector.add_batch(points.clone().into_iter()).unwrap();

        assert!(matches!(collector, CollectorKind::Histogram(_)));

        let actual = collector
            .into_plot()
            .unwrap()
            .to_vega_embeddable(false)
            .unwrap();

        // expected: square root of the element count as bucket count, capped at 100
        let bucket_count = ((element_count as f64).sqrt() as usize).min(100);
        let upper_bound = (element_count - 1) as f64;
        let dim_x = HistogramDimension::new(x_name, 0.0, upper_bound, bucket_count).unwrap();
        let dim_y = HistogramDimension::new(y_name, 0.0, upper_bound, bucket_count).unwrap();

        let mut expected = geoengine_datatypes::plots::Histogram2D::new(dim_x, dim_y);
        expected.update_batch(points.into_iter());

        assert_eq!(expected.to_vega_embeddable(false).unwrap(), actual);
    }
1✔
979

980
    /// After the collector became a histogram, a point beyond the largest value
    /// seen so far is added. The result must equal a histogram that only
    /// received the initial points.
    #[test]
    fn test_collector_kind_histogram_out_of_range() {
        let x_name = "x".to_string();
        let y_name = "y".to_string();

        let element_count = COLLECTOR_TO_HISTOGRAM_THRESHOLD + 1;
        // points on the diagonal: (0, 0), (1, 1), ...
        let points: Vec<Coordinate2D> = (0..element_count)
            .map(|i| Coordinate2D::new(i as f64, i as f64))
            .collect();

        let mut collector = CollectorKind::Values(Collector::new(x_name.clone(), y_name.clone()));
        collector.add_batch(points.clone().into_iter()).unwrap();

        assert!(matches!(collector, CollectorKind::Histogram(_)));

        // This value should be skipped: it lies above the largest initial point
        let out_of_range = Coordinate2D::new(element_count as f64, element_count as f64);
        collector.add_batch([out_of_range].into_iter()).unwrap();

        let actual = collector
            .into_plot()
            .unwrap()
            .to_vega_embeddable(false)
            .unwrap();

        // expected: square root of the element count as bucket count, capped at 100
        let bucket_count = ((element_count as f64).sqrt() as usize).min(100);
        let upper_bound = (element_count - 1) as f64;
        let dim_x = HistogramDimension::new(x_name, 0.0, upper_bound, bucket_count).unwrap();
        let dim_y = HistogramDimension::new(y_name, 0.0, upper_bound, bucket_count).unwrap();

        let mut expected = geoengine_datatypes::plots::Histogram2D::new(dim_x, dim_y);
        expected.update_batch(points.into_iter());

        assert_eq!(expected.to_vega_embeddable(false).unwrap(), actual);
    }
1✔
1022

1023
    /// After the collector became a histogram, a non-finite point is added. The
    /// result must equal a histogram that only received the initial points.
    ///
    /// Despite the test name, the probe coordinate used here is NaN.
    #[test]
    fn test_collector_kind_histogram_infinite() {
        let x_name = "x".to_string();
        let y_name = "y".to_string();

        let element_count = COLLECTOR_TO_HISTOGRAM_THRESHOLD + 1;
        // points on the diagonal: (0, 0), (1, 1), ...
        let points: Vec<Coordinate2D> = (0..element_count)
            .map(|i| Coordinate2D::new(i as f64, i as f64))
            .collect();

        let mut collector = CollectorKind::Values(Collector::new(x_name.clone(), y_name.clone()));
        collector.add_batch(points.clone().into_iter()).unwrap();

        assert!(matches!(collector, CollectorKind::Histogram(_)));

        // This value should be skipped
        let not_a_number = Coordinate2D::new(f64::NAN, f64::NAN);
        collector.add_batch([not_a_number].into_iter()).unwrap();

        let actual = collector
            .into_plot()
            .unwrap()
            .to_vega_embeddable(false)
            .unwrap();

        // expected: square root of the element count as bucket count, capped at 100
        let bucket_count = ((element_count as f64).sqrt() as usize).min(100);
        let upper_bound = (element_count - 1) as f64;
        let dim_x = HistogramDimension::new(x_name, 0.0, upper_bound, bucket_count).unwrap();
        let dim_y = HistogramDimension::new(y_name, 0.0, upper_bound, bucket_count).unwrap();

        let mut expected = geoengine_datatypes::plots::Histogram2D::new(dim_x, dim_y);
        expected.update_batch(points.into_iter());

        assert_eq!(expected.to_vega_embeddable(false).unwrap(), actual);
    }
1✔
1059
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc