• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

geo-engine / geoengine / 11911118784

19 Nov 2024 10:06AM UTC coverage: 90.448% (-0.2%) from 90.687%
11911118784

push

github

web-flow
Merge pull request #994 from geo-engine/workspace-dependencies

use workspace dependencies, update toolchain, use global lock in expression

9 of 11 new or added lines in 6 files covered. (81.82%)

369 existing lines in 74 files now uncovered.

132871 of 146904 relevant lines covered (90.45%)

54798.62 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

93.23
/operators/src/processing/expression/vector_operator.rs
1
use super::{
2
    canonicalize_name, error::vector as error, get_expression_dependencies, AsExpressionGeo,
3
    FromExpressionGeo, VectorExpressionError,
4
};
5
use crate::{
6
    engine::{
7
        CanonicOperatorName, ExecutionContext, InitializedSources, InitializedVectorOperator,
8
        Operator, OperatorName, QueryContext, SingleVectorSource, TypedVectorQueryProcessor,
9
        VectorColumnInfo, VectorOperator, VectorQueryProcessor, VectorResultDescriptor,
10
        WorkflowOperatorPath,
11
    },
12
    util::Result,
13
};
14
use async_trait::async_trait;
15
use futures::stream::BoxStream;
16
use futures::StreamExt;
17
use geoengine_datatypes::primitives::{
18
    FeatureData, FeatureDataRef, FeatureDataType, FloatOptionsParIter, Geometry, Measurement,
19
    MultiLineString, MultiPoint, MultiPolygon, VectorQueryRectangle,
20
};
21
use geoengine_datatypes::util::arrow::ArrowTyped;
22
use geoengine_datatypes::{
23
    collections::{
24
        FeatureCollection, FeatureCollectionInfos, FeatureCollectionModifications,
25
        GeoFeatureCollectionModifications, GeoVectorDataType, IntoGeometryOptionsIterator,
26
        VectorDataType,
27
    },
28
    primitives::NoGeometry,
29
};
30
use geoengine_expression::{
31
    is_allowed_variable_name, DataType, ExpressionParser, LinkedExpression,
32
    Parameter as ExpressionParameter,
33
};
34
use rayon::iter::{
35
    FromParallelIterator, IndexedParallelIterator, IntoParallelIterator, ParallelIterator,
36
};
37
use serde::{Deserialize, Serialize};
38
use snafu::ResultExt;
39
use std::collections::HashMap;
40
use std::marker::PhantomData;
41
use std::sync::Arc;
42

43
/// A vector expression creates or replaces a column in a `FeatureCollection` by evaluating an expression.
44
/// The expression receives the feature's columns as variables.
///
/// Whether a new column is appended or the geometry is replaced is selected via [`OutputColumn`].
45
pub type VectorExpression = Operator<VectorExpressionParams, SingleVectorSource>;
46

47
/// Associates the type name `VectorExpression` with this operator.
impl OperatorName for VectorExpression {
48
    const TYPE_NAME: &'static str = "VectorExpression";
49
}
50

51
/// Upper bound on the number of input columns; initialization fails with
/// `TooManyInputColumns` when more are requested.
const MAX_INPUT_COLUMNS: usize = 8;
52
/// Minimum length passed to rayon's `with_min_len` when filtering the computed geometries.
const PARALLEL_MIN_BATCH_SIZE: usize = 32; // TODO: find good default
53
/// Name handed to the expression parser for the parsed expression.
const EXPRESSION_MAIN_NAME: &str = "expression";
54

55
/// Parameters of the [`VectorExpression`] operator.
///
/// Field names are (de)serialized in `camelCase`.
#[derive(Debug, Clone, Deserialize, Serialize)]
6✔
56
#[serde(rename_all = "camelCase")]
57
pub struct VectorExpressionParams {
58
    /// The columns to use as variables in the expression.
59
    ///
60
    /// For usage in the expression, all special characters are replaced by underscores.
61
    /// E.g., `precipitation.cm` becomes `precipitation_cm`.
62
    ///
63
    /// If the column name starts with a number, an underscore is prepended.
64
    /// E.g., `1column` becomes `_1column`.
65
    ///
    /// At most `MAX_INPUT_COLUMNS` columns are accepted. Each column must exist in the source
    /// and be of type `Float` or `Int`.
    ///
    // NOTE(review): `check_input_column_validity` currently rejects every name that contains a
    // non-alphanumeric character, so a name like `precipitation.cm` fails initialization instead
    // of being canonicalized. Confirm which of the two behaviors is intended.
66
    pub input_columns: Vec<String>,
67

68
    /// The expression to evaluate.
69
    pub expression: String,
70

71
    /// The type and name of the new column.
72
    pub output_column: OutputColumn,
73

74
    /// The expression will always include the geometry column.
75
    /// Thus, it is necessary to specify the variable name of the geometry column.
76
    /// The default is `geom`.
77
    #[serde(default = "geometry_default_column_name")]
78
    pub geometry_column_name: String,
79

80
    /// The measurement of the new column.
81
    /// The default is [`Measurement::Unitless`].
82
    #[serde(default)]
83
    pub output_measurement: Measurement,
84
}
85

86
/// Serde default for `VectorExpressionParams::geometry_column_name`: the variable name `geom`.
fn geometry_default_column_name() -> String {
    String::from("geom")
}
×
89

90
/// Specify the output of the expression.
///
/// Serialized with the variant name (in `camelCase`) under the key `type` and the variant's
/// payload under the key `value`.
91
#[derive(Debug, Clone, Deserialize, Serialize)]
3✔
92
#[serde(tag = "type", content = "value", rename_all = "camelCase")]
93
pub enum OutputColumn {
94
    /// The expression will override the current geometry
    ///
    /// The payload is the geometry type the expression must produce.
95
    Geometry(GeoVectorDataType),
96
    /// The expression will append a new `Float` column
    ///
    /// The payload is the name of the new column.
97
    // TODO: allow more types than `Float`s
98
    Column(String),
99
}
100

101
/// The initialized form of [`VectorExpression`], created by `_initialize`.
struct InitializedVectorExpression {
102
    /// Canonic name derived from the uninitialized operator.
    name: CanonicOperatorName,
103
    /// The source's result descriptor, adjusted for the output
    /// (changed geometry type or an additional `Float` column).
    result_descriptor: VectorResultDescriptor,
104
    /// The initialized source operator that delivers the input features.
    features: Box<dyn InitializedVectorOperator>,
105
    /// The compiled expression, shared with the query processors.
    expression: Arc<LinkedExpression>,
106
    /// Names of the source columns that are passed to the expression.
    input_columns: Vec<String>,
107
    /// Where the expression's result is written to.
    output_column: OutputColumn,
108
}
109

110
#[typetag::serde]
×
111
#[async_trait]
112
impl VectorOperator for VectorExpression {
113
    async fn _initialize(
114
        self: Box<Self>,
115
        path: WorkflowOperatorPath,
116
        context: &dyn ExecutionContext,
117
    ) -> Result<Box<dyn InitializedVectorOperator>> {
5✔
118
        // TODO: This is super ugly to being forced to do this on every operator. This must be refactored.
119
        let name = CanonicOperatorName::from(&self);
5✔
120

5✔
121
        if self.params.input_columns.len() > MAX_INPUT_COLUMNS {
5✔
UNCOV
122
            Err(VectorExpressionError::TooManyInputColumns {
×
123
                max: MAX_INPUT_COLUMNS,
×
124
                found: self.params.input_columns.len(),
×
125
            })?;
×
126
        }
5✔
127

128
        let initialized_source = self.sources.initialize_sources(path, context).await?;
5✔
129

130
        // we can reuse the result descriptor, because we only add a column later on
131
        let mut result_descriptor = initialized_source.vector.result_descriptor().clone();
5✔
132

5✔
133
        check_input_column_validity(&result_descriptor.columns, &self.params.input_columns)?;
5✔
134
        check_output_column_validity(&self.params.output_column)?;
5✔
135

136
        let expression_geom_input_type = result_descriptor.data_type;
5✔
137
        let expression_output_type = match &self.params.output_column {
5✔
138
            OutputColumn::Geometry(vector_data_type) => {
1✔
139
                result_descriptor.data_type = (*vector_data_type).into();
1✔
140
                match vector_data_type {
1✔
141
                    GeoVectorDataType::MultiPoint => DataType::MultiPoint,
1✔
UNCOV
142
                    GeoVectorDataType::MultiLineString => DataType::MultiLineString,
×
UNCOV
143
                    GeoVectorDataType::MultiPolygon => DataType::MultiPolygon,
×
144
                }
145
            }
146
            OutputColumn::Column(output_column_name) => {
4✔
147
                insert_new_column(
4✔
148
                    &mut result_descriptor.columns,
4✔
149
                    output_column_name.clone(),
4✔
150
                    self.params.output_measurement,
4✔
151
                )?;
4✔
152
                DataType::Number
4✔
153
            }
154
        };
155

156
        let mut expression_input_names = Vec::with_capacity(self.params.input_columns.len());
5✔
157
        for input_column in &self.params.input_columns {
16✔
158
            let variable_name = canonicalize_name(input_column);
11✔
159

11✔
160
            if !is_allowed_variable_name(&variable_name) {
11✔
UNCOV
161
                return Err(VectorExpressionError::ColumnNameContainsSpecialCharacters {
×
162
                    name: variable_name,
×
163
                })?;
×
164
            }
11✔
165

11✔
166
            expression_input_names.push(variable_name);
11✔
167
        }
168

169
        let expression = {
5✔
170
            let expression_code = self.params.expression.clone();
5✔
171
            let geometry_column_name = self.params.geometry_column_name.clone();
5✔
172

5✔
173
            crate::util::spawn_blocking(move || {
5✔
174
                compile_expression(
5✔
175
                    &expression_code,
5✔
176
                    geometry_column_name,
5✔
177
                    expression_geom_input_type,
5✔
178
                    &expression_input_names,
5✔
179
                    expression_output_type,
5✔
180
                )
5✔
181
                .map(Arc::new)
5✔
182
            })
5✔
183
            .await
5✔
184
            .map_err(|source| VectorExpressionError::CompilationTask { source })??
5✔
185
        };
186

187
        let initialized_operator = InitializedVectorExpression {
5✔
188
            name,
5✔
189
            result_descriptor,
5✔
190
            features: initialized_source.vector,
5✔
191
            expression,
5✔
192
            input_columns: self.params.input_columns,
5✔
193
            output_column: self.params.output_column,
5✔
194
        };
5✔
195

5✔
196
        Ok(initialized_operator.boxed())
5✔
197
    }
10✔
198

199
    span_fn!(VectorExpression);
200
}
201

202
fn check_input_column_validity(
5✔
203
    columns: &HashMap<String, VectorColumnInfo>,
5✔
204
    input_columns: &[String],
5✔
205
) -> Result<(), VectorExpressionError> {
5✔
206
    for input_column in input_columns {
16✔
207
        if input_column.contains(|c: char| !c.is_alphanumeric()) {
25✔
208
            Err(VectorExpressionError::ColumnNameContainsSpecialCharacters {
×
209
                name: input_column.clone(),
×
210
            })?;
×
211
        }
11✔
212

213
        let Some(column_info) = columns.get(input_column) else {
11✔
214
            return Err(VectorExpressionError::InputColumnNotExisting {
×
215
                name: input_column.clone(),
×
216
            });
×
217
        };
218

219
        match column_info.data_type {
11✔
220
            FeatureDataType::Float | FeatureDataType::Int => {}
11✔
221
            _ => Err(VectorExpressionError::InputColumnNotNumeric {
×
222
                name: input_column.clone(),
×
223
            })?,
×
224
        }
225
    }
226

227
    Ok(())
5✔
228
}
5✔
229

230
fn check_output_column_validity(output_column: &OutputColumn) -> Result<(), VectorExpressionError> {
5✔
231
    match output_column {
5✔
232
        OutputColumn::Geometry(_) => {}
1✔
233
        OutputColumn::Column(column) => {
4✔
234
            if column.contains(|c: char| !c.is_alphanumeric()) {
13✔
235
                Err(VectorExpressionError::ColumnNameContainsSpecialCharacters {
×
236
                    name: column.clone(),
×
237
                })?;
×
238
            }
4✔
239
        }
240
    };
241

242
    Ok(())
5✔
243
}
5✔
244

245
fn insert_new_column(
4✔
246
    columns: &mut HashMap<String, VectorColumnInfo>,
4✔
247
    name: String,
4✔
248
    measurement: Measurement,
4✔
249
) -> Result<(), VectorExpressionError> {
4✔
250
    let output_column_collision = columns.insert(
4✔
251
        name.clone(),
4✔
252
        VectorColumnInfo {
4✔
253
            data_type: FeatureDataType::Float,
4✔
254
            measurement,
4✔
255
        },
4✔
256
    );
4✔
257

4✔
258
    if output_column_collision.is_some() {
4✔
259
        return Err(VectorExpressionError::OutputColumnCollision { name });
×
260
    }
4✔
261

4✔
262
    Ok(())
4✔
263
}
4✔
264

265
fn compile_expression(
5✔
266
    expression_code: &str,
5✔
267
    geom_name: String,
5✔
268
    geom_type: VectorDataType,
5✔
269
    parameters: &[String],
5✔
270
    output_type: DataType,
5✔
271
) -> Result<LinkedExpression, VectorExpressionError> {
5✔
272
    let geom_parameter = match geom_type {
5✔
273
        VectorDataType::Data | VectorDataType::MultiPoint => {
274
            ExpressionParameter::MultiPoint(geom_name.into())
3✔
275
        }
276
        VectorDataType::MultiLineString => ExpressionParameter::MultiLineString(geom_name.into()),
×
277
        VectorDataType::MultiPolygon => ExpressionParameter::MultiPolygon(geom_name.into()),
2✔
278
    };
279
    let mut expression_parameters = Vec::with_capacity(parameters.len() + 1);
5✔
280
    expression_parameters.push(geom_parameter);
5✔
281
    expression_parameters.extend(
5✔
282
        parameters
5✔
283
            .iter()
5✔
284
            .map(|p| ExpressionParameter::Number(p.into())),
11✔
285
    );
5✔
286
    let expression = ExpressionParser::new(&expression_parameters, output_type)?
5✔
287
        .parse(EXPRESSION_MAIN_NAME, expression_code)?;
5✔
288

289
    let expression_dependencies = get_expression_dependencies().context(error::Dependencies)?;
5✔
290

291
    Ok(LinkedExpression::from_ast(
5✔
292
        &expression,
5✔
293
        expression_dependencies,
5✔
294
    )?)
5✔
295
}
5✔
296

297
impl InitializedVectorExpression {
298
    #[inline]
299
    fn column_processor<G>(
4✔
300
        &self,
4✔
301
        source: Box<dyn VectorQueryProcessor<VectorType = FeatureCollection<G>>>,
4✔
302
        output_column: String,
4✔
303
    ) -> TypedVectorQueryProcessor
4✔
304
    where
4✔
305
        G: Geometry + ArrowTyped + 'static,
4✔
306
        FeatureCollection<G>: for<'g> IntoGeometryOptionsIterator<'g> + 'static,
4✔
307
        for<'g> <FeatureCollection<G> as IntoGeometryOptionsIterator<'g>>::GeometryType:
4✔
308
            AsExpressionGeo + Send,
4✔
309
        for<'g> <<FeatureCollection<G> as IntoGeometryOptionsIterator<'g>>::GeometryOptionIterator as IntoParallelIterator>::Iter:
4✔
310
        IndexedParallelIterator + Send,
4✔
311
        TypedVectorQueryProcessor: From<Box<dyn VectorQueryProcessor<VectorType = FeatureCollection<G>>>>,
4✔
312
    {
4✔
313
        VectorExpressionColumnProcessor {
4✔
314
            source,
4✔
315
            result_descriptor: self.result_descriptor.clone(),
4✔
316
            expression: self.expression.clone(),
4✔
317
            input_columns: self.input_columns.clone(),
4✔
318
            output_column,
4✔
319
        }
4✔
320
        .boxed()
4✔
321
        .into()
4✔
322
    }
4✔
323

324
    #[inline]
325
    fn geometry_processor<GIn, GOut>(
1✔
326
        &self,
1✔
327
        source: Box<dyn VectorQueryProcessor<VectorType = FeatureCollection<GIn>>>,
1✔
328
    ) -> TypedVectorQueryProcessor
1✔
329
    where
1✔
330
        GIn: Geometry + ArrowTyped + Send + Sync + 'static + Sized,
1✔
331
        GOut: Geometry
1✔
332
            + ArrowTyped
1✔
333
            + FromExpressionGeo
1✔
334
            + Send
1✔
335
            + Sync
1✔
336
            + 'static
1✔
337
            + Sized,
1✔
338
        FeatureCollection<GIn>: GeoFeatureCollectionModifications<GOut> + for<'g> IntoGeometryOptionsIterator<'g>,
1✔
339
        for<'g> <<FeatureCollection<GIn> as IntoGeometryOptionsIterator<'g>>::GeometryOptionIterator as IntoParallelIterator>::Iter:
1✔
340
            IndexedParallelIterator + Send,
1✔
341
        for<'g> <FeatureCollection<GIn> as IntoGeometryOptionsIterator<'g>>::GeometryType: AsExpressionGeo,
1✔
342
        TypedVectorQueryProcessor: From<Box<dyn VectorQueryProcessor<VectorType = FeatureCollection<GOut>>>>,
1✔
343
    {
1✔
344
        VectorExpressionGeometryProcessor {
1✔
345
            source,
1✔
346
            result_descriptor: self.result_descriptor.clone(),
1✔
347
            expression: self.expression.clone(),
1✔
348
            input_columns: self.input_columns.clone(),
1✔
349
            _out: PhantomData::<GOut>,
1✔
350
        }
1✔
351
        .boxed()
1✔
352
        .into()
1✔
353
    }
1✔
354

355
    #[inline]
356
    fn dispatch_float_column_output(
4✔
357
        &self,
4✔
358
        source_processor: TypedVectorQueryProcessor,
4✔
359
        output_column: String,
4✔
360
    ) -> TypedVectorQueryProcessor {
4✔
361
        match source_processor {
4✔
362
            TypedVectorQueryProcessor::Data(source) => {
×
363
                self.column_processor::<NoGeometry>(source, output_column)
×
364
            }
365
            TypedVectorQueryProcessor::MultiPoint(source) => {
3✔
366
                self.column_processor::<MultiPoint>(source, output_column)
3✔
367
            }
368
            TypedVectorQueryProcessor::MultiLineString(source) => {
×
369
                self.column_processor::<MultiLineString>(source, output_column)
×
370
            }
371
            TypedVectorQueryProcessor::MultiPolygon(source) => {
1✔
372
                self.column_processor::<MultiPolygon>(source, output_column)
1✔
373
            }
374
        }
375
    }
4✔
376

377
    #[inline]
378
    fn dispatch_geometry_output_for_type<GOut>(
1✔
379
        &self,
1✔
380
        source_processor: TypedVectorQueryProcessor,
1✔
381
    ) -> TypedVectorQueryProcessor
1✔
382
    where
1✔
383
        GOut: Geometry + ArrowTyped + FromExpressionGeo + Send + Sync + 'static + Sized,
1✔
384
        TypedVectorQueryProcessor:
1✔
385
            From<Box<dyn VectorQueryProcessor<VectorType = FeatureCollection<GOut>>>>,
1✔
386
    {
1✔
387
        match source_processor {
1✔
388
            TypedVectorQueryProcessor::Data(source) => {
×
389
                self.geometry_processor::<NoGeometry, GOut>(source)
×
390
            }
391
            TypedVectorQueryProcessor::MultiPoint(source) => {
×
392
                self.geometry_processor::<MultiPoint, GOut>(source)
×
393
            }
394
            TypedVectorQueryProcessor::MultiLineString(source) => {
×
395
                self.geometry_processor::<MultiLineString, GOut>(source)
×
396
            }
397
            TypedVectorQueryProcessor::MultiPolygon(source) => {
1✔
398
                self.geometry_processor::<MultiPolygon, GOut>(source)
1✔
399
            }
400
        }
401
    }
1✔
402

403
    #[inline]
404
    fn dispatch_geometry_output(
1✔
405
        &self,
1✔
406
        source_processor: TypedVectorQueryProcessor,
1✔
407
        vector_data_type: GeoVectorDataType,
1✔
408
    ) -> TypedVectorQueryProcessor {
1✔
409
        match vector_data_type {
1✔
410
            GeoVectorDataType::MultiPoint => {
411
                self.dispatch_geometry_output_for_type::<MultiPoint>(source_processor)
1✔
412
            }
413
            GeoVectorDataType::MultiLineString => {
414
                self.dispatch_geometry_output_for_type::<MultiLineString>(source_processor)
×
415
            }
416
            GeoVectorDataType::MultiPolygon => {
417
                self.dispatch_geometry_output_for_type::<MultiPolygon>(source_processor)
×
418
            }
419
        }
420
    }
1✔
421
}
422

423
impl InitializedVectorOperator for InitializedVectorExpression {
424
    fn result_descriptor(&self) -> &VectorResultDescriptor {
×
425
        &self.result_descriptor
×
426
    }
×
427

428
    fn query_processor(&self) -> Result<TypedVectorQueryProcessor> {
5✔
429
        let source_processor = self.features.query_processor()?;
5✔
430

431
        Ok(match self.output_column.clone() {
5✔
432
            OutputColumn::Geometry(vector_data_type) => {
1✔
433
                self.dispatch_geometry_output(source_processor, vector_data_type)
1✔
434
            }
435
            OutputColumn::Column(output_column) => {
4✔
436
                self.dispatch_float_column_output(source_processor, output_column)
4✔
437
            }
438
        })
439
    }
5✔
440

441
    fn canonic_name(&self) -> CanonicOperatorName {
×
442
        self.name.clone()
×
443
    }
×
444
}
445

446
/// A processor that evaluates an expression on the columns of a `FeatureCollection`.
447
/// The result is a new `FeatureCollection` with the evaluated column added.
448
pub struct VectorExpressionColumnProcessor<Q, G>
449
where
450
    G: Geometry,
451
    Q: VectorQueryProcessor<VectorType = FeatureCollection<G>>,
452
{
453
    /// The processor that delivers the input collections.
    source: Q,
454
    /// The descriptor returned by `vector_result_descriptor`.
    result_descriptor: VectorResultDescriptor,
455
    /// The compiled expression that is evaluated for each collection.
    expression: Arc<LinkedExpression>,
456
    /// Names of the columns that are passed to the expression.
    input_columns: Vec<String>,
457
    /// Name of the `NullableFloat` column that receives the results.
    output_column: String,
458
}
459

460
/// A processor that evaluates an expression on the columns of a `FeatureCollection`.
461
/// The result is a new `FeatureCollection` with a replaced geometry column.
///
/// Rows for which no output geometry could be produced are filtered out.
462
pub struct VectorExpressionGeometryProcessor<Q, GIn, GOut>
463
where
464
    GIn: Geometry,
465
    GOut: Geometry,
466
    Q: VectorQueryProcessor<VectorType = FeatureCollection<GIn>>,
467
{
468
    /// The processor that delivers the input collections with geometries of type `GIn`.
    source: Q,
469
    /// The descriptor returned by `vector_result_descriptor`.
    result_descriptor: VectorResultDescriptor,
470
    /// The compiled expression that is evaluated for each collection.
    expression: Arc<LinkedExpression>,
471
    /// Names of the columns that are passed to the expression.
    input_columns: Vec<String>,
472
    /// Marker for the output geometry type; carries no data.
    _out: PhantomData<GOut>,
473
}
474

475
/// Shorthand for the `AsExpressionGeo::ExpressionGeometryType` that belongs to the geometries
/// yielded by a `FeatureCollection<G>`'s geometry options iterator.
type ExpressionGeometryType<'g, G> = <<FeatureCollection<G> as IntoGeometryOptionsIterator<'g>>::GeometryType as AsExpressionGeo>::ExpressionGeometryType;
476

477
#[async_trait]
478
impl<Q, G> VectorQueryProcessor for VectorExpressionColumnProcessor<Q, G>
479
where
480
    Q: VectorQueryProcessor<VectorType = FeatureCollection<G>>,
481
    G: Geometry + ArrowTyped + 'static,
482
    FeatureCollection<G>: for<'g> IntoGeometryOptionsIterator<'g> + 'static,
483
    for<'g> <FeatureCollection<G> as IntoGeometryOptionsIterator<'g>>::GeometryType:
484
        AsExpressionGeo + Send,
485
    for<'g> <<FeatureCollection<G> as IntoGeometryOptionsIterator<'g>>::GeometryOptionIterator as IntoParallelIterator>::Iter:
486
        IndexedParallelIterator + Send,
487
{
488
    type VectorType = FeatureCollection<G>;
489

490

491
    async fn vector_query<'a>(
492
        &'a self,
493
        query: VectorQueryRectangle,
494
        ctx: &'a dyn QueryContext,
495
    ) -> Result<BoxStream<'a, Result<Self::VectorType>>> {
4✔
496
        let stream = self.source.vector_query(query, ctx).await?;
4✔
497

498
        let stream = stream.then(move |collection| async move {
4✔
499
            let collection = collection?;
4✔
500
            let input_columns = self.input_columns.clone();
4✔
501
            let output_column = self.output_column.clone();
4✔
502
            let expression = self.expression.clone();
4✔
503

4✔
504
            crate::util::spawn_blocking_with_thread_pool(ctx.thread_pool().clone(), move || {
4✔
505
                let result: Vec<Option<f64>> = call_expression_function(
4✔
506
                    &expression,
4✔
507
                    &collection,
4✔
508
                    &input_columns,
4✔
509
                    std::convert::identity,
4✔
510
                )?;
4✔
511

512
                Ok(collection
4✔
513
                    .add_column(&output_column, FeatureData::NullableFloat(result))
4✔
514
                    .context(error::AddColumn {
4✔
515
                        name: output_column,
4✔
516
                    })?)
4✔
517
            })
4✔
518
            .await?
4✔
519
        });
8✔
520

4✔
521
        Ok(stream.boxed())
4✔
522
    }
8✔
523

524
    fn vector_result_descriptor(&self) -> &VectorResultDescriptor {
4✔
525
        &self.result_descriptor
4✔
526
    }
4✔
527
}
528

529
#[async_trait]
530
impl<Q, GIn, GOut> VectorQueryProcessor
531
    for VectorExpressionGeometryProcessor<Q, GIn, GOut>
532
where
533
    Q: VectorQueryProcessor<VectorType = FeatureCollection<GIn>> + 'static + Sized,
534
    GIn: Geometry + ArrowTyped + Send + Sync + 'static + Sized,
535
    GOut: Geometry
536
        + ArrowTyped
537
        + FromExpressionGeo
538
        + Send
539
        + Sync
540
        + 'static
541
        + Sized,
542
    FeatureCollection<GIn>: GeoFeatureCollectionModifications<GOut> + for<'g> IntoGeometryOptionsIterator<'g>,
543
    for<'g> <<FeatureCollection<GIn> as IntoGeometryOptionsIterator<'g>>::GeometryOptionIterator as IntoParallelIterator>::Iter:
544
        IndexedParallelIterator + Send,
545
    for<'g> <FeatureCollection<GIn> as IntoGeometryOptionsIterator<'g>>::GeometryType: AsExpressionGeo,
546
    Vec<Option<GOut>>: FromParallelIterator<Option<GOut>>,
547
{
548
    type VectorType = FeatureCollection<GOut>;
549

550
    async fn vector_query<'a>(
551
        &'a self,
552
        query: VectorQueryRectangle,
553
        ctx: &'a dyn QueryContext,
554
    ) -> Result<BoxStream<'a, Result<Self::VectorType>>> {
1✔
555
        let stream = self.source.vector_query(query, ctx).await?;
1✔
556

557
        let stream = stream.then(move |collection| async move {
1✔
558
            let collection = collection?;
1✔
559
            let input_columns = self.input_columns.clone();
1✔
560
            let expression = self.expression.clone();
1✔
561

1✔
562
            crate::util::spawn_blocking_with_thread_pool(ctx.thread_pool().clone(), move || {
1✔
563
                let (geometry_options, row_filter): (Vec<Option<GOut>>, Vec<bool>) = call_expression_function(
1✔
564
                    &expression,
1✔
565
                    &collection,
1✔
566
                    &input_columns,
1✔
567
                    |geom_option| {
2✔
568
                        let geom_option = geom_option.and_then(<GOut as FromExpressionGeo>::from_expression_geo);
2✔
569

2✔
570
                        let row_filter = geom_option.is_some();
2✔
571

2✔
572
                        (geom_option, row_filter)
2✔
573
                    },
2✔
574
                )?;
1✔
575

576
                // remove all `None`s and output only the geometries
577
                let geometries = geometry_options.into_par_iter().with_min_len(PARALLEL_MIN_BATCH_SIZE).filter_map(std::convert::identity).collect::<Vec<_>>();
1✔
578

1✔
579
                Ok(collection
1✔
580
                    .filter(row_filter) // we have to filter out the rows with empty geometries
1✔
581
                    .context(error::FilterEmptyGeometries)?
1✔
582
                    .replace_geometries(geometries)
1✔
583
                    .context(error::ReplaceGeometries)?)
1✔
584
            })
1✔
585
            .await?
1✔
586
        });
2✔
587

1✔
588
        Ok(stream.boxed())
1✔
589
    }
2✔
590

591
    fn vector_result_descriptor(&self) -> &VectorResultDescriptor {
1✔
592
        &self.result_descriptor
1✔
593
    }
1✔
594
}
595

596
fn call_expression_function<GIn, ExprOut, MapOut, Out>(
5✔
597
    expression: &Arc<LinkedExpression>,
5✔
598
    collection: &FeatureCollection<GIn>,
5✔
599
    input_columns: &[String],
5✔
600
    map_fn: fn(Option<ExprOut>) -> MapOut,
5✔
601
) -> Result<Out, VectorExpressionError>
5✔
602
where
5✔
603
    GIn: Geometry + ArrowTyped + 'static,
5✔
604
    for<'i> FeatureCollection<GIn>: IntoGeometryOptionsIterator<'i>,
5✔
605
    for<'g> <<FeatureCollection<GIn> as IntoGeometryOptionsIterator<'g>>::GeometryOptionIterator as IntoParallelIterator>::Iter:
5✔
606
        IndexedParallelIterator + Send,
5✔
607
    for<'g> <FeatureCollection<GIn> as IntoGeometryOptionsIterator<'g>>::GeometryType: AsExpressionGeo,
5✔
608
    ExprOut: Send,
5✔
609
    MapOut: Send,
5✔
610
    Out: FromParallelIterator<MapOut> + Send,
5✔
611
{
5✔
612
    let data_columns: Vec<FeatureDataRef> = input_columns
5✔
613
        .iter()
5✔
614
        .map(|input_column| {
11✔
615
            collection
11✔
616
                .data(input_column)
11✔
617
                .expect("was checked durin initialization")
11✔
618
        })
11✔
619
        .collect();
5✔
620

5✔
621
    let float_inputs: Vec<FloatOptionsParIter> = data_columns
5✔
622
        .iter()
5✔
623
        .map(FeatureDataRef::float_options_par_iter)
5✔
624
        .collect::<Vec<_>>();
5✔
625

5✔
626
    let geom_input = collection
5✔
627
        .geometry_options()
5✔
628
        .into_par_iter()
5✔
629
        .map(|geometry_option| {
112✔
630
            if let Some(geometry) = geometry_option.as_ref() {
112✔
631
                geometry.as_expression_geo()
112✔
632
            } else {
633
                None
×
634
            }
635
        });
112✔
636

637
    macro_rules! impl_expression_subcall {
638
        ($n:literal, $($i:ident),*) => {
639
            {
640
                let [ $($i),* ] = <[_; $n]>::try_from(float_inputs).expect("it matches the match condition");
641
                let f = unsafe {
642
                    expression.function_nary::<fn(
643
                        Option<ExpressionGeometryType<'_, GIn>>,
644
                        $( impl_expression_subcall!(@float_option $i), )*
645
                    ) -> Option<ExprOut>>()
646
                }
647
                .map_err(VectorExpressionError::from)?;
648

649
                (geom_input, $($i),*)
650
                    .into_par_iter()
651
                    .with_min_len(PARALLEL_MIN_BATCH_SIZE)
652
                    .map(|(geom, $($i),*)| map_fn(f(geom, $($i),*)))
108✔
653
                    .collect()
654
            }
655
        };
656
        // Create one float option for each float input
657
        (@float_option $i:ident) => {
658
            Option<f64>
659
        };
660
    }
661

662
    Ok(match float_inputs.len() {
5✔
663
        0 => {
664
            let f = unsafe {
2✔
665
                expression.function_nary::<fn(
2✔
666
                    Option<ExpressionGeometryType<'_, GIn>>,
2✔
667
                ) -> Option<ExprOut>>()
2✔
668
            }
2✔
669
            .map_err(VectorExpressionError::from)?;
2✔
670

671
            geom_input
2✔
672
                .with_min_len(PARALLEL_MIN_BATCH_SIZE)
2✔
673
                .map(|geom| map_fn(f(geom)))
4✔
674
                .collect()
2✔
675
        }
676
        1 => impl_expression_subcall!(1, i1),
1✔
677
        2 => impl_expression_subcall!(2, i1, i2),
1✔
UNCOV
678
        3 => impl_expression_subcall!(3, i1, i2, i3),
×
UNCOV
679
        4 => impl_expression_subcall!(4, i1, i2, i3, i4),
×
UNCOV
680
        5 => impl_expression_subcall!(5, i1, i2, i3, i4, i5),
×
UNCOV
681
        6 => impl_expression_subcall!(6, i1, i2, i3, i4, i5, i6),
×
UNCOV
682
        7 => impl_expression_subcall!(7, i1, i2, i3, i4, i5, i6, i7),
×
683
        8 => impl_expression_subcall!(8, i1, i2, i3, i4, i5, i6, i7, i8),
1✔
684
        other => Err(VectorExpressionError::TooManyInputColumns {
×
685
            max: MAX_INPUT_COLUMNS,
×
686
            found: other,
×
687
        })?,
×
688
    })
689
}
5✔
690

691
#[cfg(test)]
692
mod tests {
693
    use super::*;
694
    use crate::{
695
        engine::{ChunkByteSize, MockExecutionContext, MockQueryContext, QueryProcessor},
696
        mock::MockFeatureCollectionSource,
697
    };
698
    use geoengine_datatypes::{
699
        collections::{
700
            ChunksEqualIgnoringCacheHint, IntoGeometryIterator, MultiPointCollection,
701
            MultiPolygonCollection,
702
        },
703
        primitives::{
704
            BoundingBox2D, ColumnSelection, MultiPoint, MultiPolygon, SpatialResolution,
705
            TimeInterval,
706
        },
707
        util::test::TestDefault,
708
    };
709

710
    /// Checks that the operator definition serializes to the expected JSON and that
    /// this JSON can be deserialized into a `VectorExpression` again.
    #[test]
    fn it_deserializes_the_operator() {
        let expected_json = serde_json::json!({
            "params": {
                "inputColumns": ["foo", "bar"],
                "expression": "foo + bar",
                "outputColumn": {
                    "type": "column",
                    "value": "baz",
                },
                "outputMeasurement": {
                    "type": "unitless",
                },
                "geometryColumnName": "geom",
            },
            "sources": {
                "vector": {
                    "type": "MockFeatureCollectionSourceMultiPoint",
                    "params": {
                        "collections": [],
                        "spatialReference": "EPSG:4326",
                        "measurements": null,
                    }
                }
            }
        });

        let params = VectorExpressionParams {
            input_columns: vec!["foo".into(), "bar".into()],
            expression: "foo + bar".into(),
            output_column: OutputColumn::Column("baz".into()),
            output_measurement: Measurement::Unitless,
            geometry_column_name: "geom".to_string(),
        };

        let operator_definition: Operator<VectorExpressionParams, SingleVectorSource> =
            VectorExpression {
                params,
                sources: MockFeatureCollectionSource::<MultiPoint>::multiple(vec![])
                    .boxed()
                    .into(),
            };

        // serialization
        let serialized = serde_json::to_value(&operator_definition).unwrap();
        assert_eq!(serialized, expected_json);

        // deserialization
        let _operator: VectorExpression = serde_json::from_value(expected_json).unwrap();
    }
1✔
753

754
    #[tokio::test]
755
    async fn it_computes_unary_float_expressions() {
1✔
756
        let points = MultiPointCollection::from_slices(
1✔
757
            MultiPoint::many(vec![(0.0, 0.1), (1.0, 1.1), (2.0, 3.1)])
1✔
758
                .unwrap()
1✔
759
                .as_ref(),
1✔
760
            &[TimeInterval::new_unchecked(0, 1); 3],
1✔
761
            &[(
1✔
762
                "foo",
1✔
763
                FeatureData::NullableFloat(vec![Some(1.0), None, Some(3.0)]),
1✔
764
            )],
1✔
765
        )
1✔
766
        .unwrap();
1✔
767

1✔
768
        let point_source = MockFeatureCollectionSource::single(points.clone()).boxed();
1✔
769

1✔
770
        let operator = VectorExpression {
1✔
771
            params: VectorExpressionParams {
1✔
772
                input_columns: vec!["foo".into()],
1✔
773
                expression: "2 * foo".into(),
1✔
774
                output_column: OutputColumn::Column("bar".into()),
1✔
775
                output_measurement: Measurement::Unitless,
1✔
776
                geometry_column_name: "geom".to_string(),
1✔
777
            },
1✔
778
            sources: point_source.into(),
1✔
779
        }
1✔
780
        .boxed()
1✔
781
        .initialize(
1✔
782
            WorkflowOperatorPath::initialize_root(),
1✔
783
            &MockExecutionContext::test_default(),
1✔
784
        )
1✔
785
        .await
1✔
786
        .unwrap();
1✔
787

1✔
788
        let query_processor = operator.query_processor().unwrap().multi_point().unwrap();
1✔
789

1✔
790
        let query_rectangle = VectorQueryRectangle {
1✔
791
            spatial_bounds: BoundingBox2D::new((0., 0.).into(), (10., 10.).into()).unwrap(),
1✔
792
            time_interval: TimeInterval::default(),
1✔
793
            spatial_resolution: SpatialResolution::zero_point_one(),
1✔
794
            attributes: ColumnSelection::all(),
1✔
795
        };
1✔
796
        let ctx = MockQueryContext::new(ChunkByteSize::MAX);
1✔
797

1✔
798
        let query = query_processor.query(query_rectangle, &ctx).await.unwrap();
1✔
799

1✔
800
        let mut result = query
1✔
801
            .map(Result::unwrap)
1✔
802
            .collect::<Vec<MultiPointCollection>>()
1✔
803
            .await;
1✔
804

1✔
805
        assert_eq!(result.len(), 1);
1✔
806
        let result = result.remove(0);
1✔
807

1✔
808
        let expected_result = MultiPointCollection::from_slices(
1✔
809
            MultiPoint::many(vec![(0.0, 0.1), (1.0, 1.1), (2.0, 3.1)])
1✔
810
                .unwrap()
1✔
811
                .as_ref(),
1✔
812
            &[TimeInterval::new_unchecked(0, 1); 3],
1✔
813
            &[
1✔
814
                (
1✔
815
                    "foo",
1✔
816
                    FeatureData::NullableFloat(vec![Some(1.0), None, Some(3.0)]),
1✔
817
                ),
1✔
818
                (
1✔
819
                    "bar",
1✔
820
                    FeatureData::NullableFloat(vec![Some(2.0), None, Some(6.0)]),
1✔
821
                ),
1✔
822
            ],
1✔
823
        )
1✔
824
        .unwrap();
1✔
825

1✔
826
        // TODO: maybe it is nicer to have something wrapping the actual data that we care about and just adds some cache info
1✔
827
        assert!(
1✔
828
            result.chunks_equal_ignoring_cache_hint(&expected_result),
1✔
829
            "{result:#?} != {expected_result:#?}",
1✔
830
        );
1✔
831
    }
1✔
832

833
    #[tokio::test]
834
    async fn it_computes_binary_float_expressions() {
1✔
835
        let points = MultiPointCollection::from_slices(
1✔
836
            MultiPoint::many(vec![
1✔
837
                (0.0, 0.1),
1✔
838
                (1.0, 1.1),
1✔
839
                (2.0, 2.1),
1✔
840
                (3.0, 3.1),
1✔
841
                (4.0, 4.1),
1✔
842
            ])
1✔
843
            .unwrap()
1✔
844
            .as_ref(),
1✔
845
            &[TimeInterval::new_unchecked(0, 1); 5],
1✔
846
            &[
1✔
847
                (
1✔
848
                    "foo",
1✔
849
                    FeatureData::NullableFloat(vec![Some(1.0), None, Some(3.0), None, Some(5.0)]),
1✔
850
                ),
1✔
851
                (
1✔
852
                    "bar",
1✔
853
                    FeatureData::NullableInt(vec![Some(10), None, None, Some(40), Some(50)]),
1✔
854
                ),
1✔
855
            ],
1✔
856
        )
1✔
857
        .unwrap();
1✔
858

1✔
859
        let point_source = MockFeatureCollectionSource::single(points.clone()).boxed();
1✔
860

1✔
861
        let operator = VectorExpression {
1✔
862
            params: VectorExpressionParams {
1✔
863
                input_columns: vec!["foo".into(), "bar".into()],
1✔
864
                expression: "foo + bar".into(),
1✔
865
                output_column: OutputColumn::Column("baz".into()),
1✔
866
                output_measurement: Measurement::Unitless,
1✔
867
                geometry_column_name: "geom".to_string(),
1✔
868
            },
1✔
869
            sources: point_source.into(),
1✔
870
        }
1✔
871
        .boxed()
1✔
872
        .initialize(
1✔
873
            WorkflowOperatorPath::initialize_root(),
1✔
874
            &MockExecutionContext::test_default(),
1✔
875
        )
1✔
876
        .await
1✔
877
        .unwrap();
1✔
878

1✔
879
        let query_processor = operator.query_processor().unwrap().multi_point().unwrap();
1✔
880

1✔
881
        let query_rectangle = VectorQueryRectangle {
1✔
882
            spatial_bounds: BoundingBox2D::new((0., 0.).into(), (10., 10.).into()).unwrap(),
1✔
883
            time_interval: TimeInterval::default(),
1✔
884
            spatial_resolution: SpatialResolution::zero_point_one(),
1✔
885
            attributes: ColumnSelection::all(),
1✔
886
        };
1✔
887
        let ctx = MockQueryContext::new(ChunkByteSize::MAX);
1✔
888

1✔
889
        let query = query_processor.query(query_rectangle, &ctx).await.unwrap();
1✔
890

1✔
891
        let mut result = query
1✔
892
            .map(Result::unwrap)
1✔
893
            .collect::<Vec<MultiPointCollection>>()
1✔
894
            .await;
1✔
895

1✔
896
        assert_eq!(result.len(), 1);
1✔
897
        let result = result.remove(0);
1✔
898

1✔
899
        let expected_result = MultiPointCollection::from_slices(
1✔
900
            MultiPoint::many(vec![
1✔
901
                (0.0, 0.1),
1✔
902
                (1.0, 1.1),
1✔
903
                (2.0, 2.1),
1✔
904
                (3.0, 3.1),
1✔
905
                (4.0, 4.1),
1✔
906
            ])
1✔
907
            .unwrap()
1✔
908
            .as_ref(),
1✔
909
            &[TimeInterval::new_unchecked(0, 1); 5],
1✔
910
            &[
1✔
911
                (
1✔
912
                    "foo",
1✔
913
                    FeatureData::NullableFloat(vec![Some(1.0), None, Some(3.0), None, Some(5.0)]),
1✔
914
                ),
1✔
915
                (
1✔
916
                    "bar",
1✔
917
                    FeatureData::NullableInt(vec![Some(10), None, None, Some(40), Some(50)]),
1✔
918
                ),
1✔
919
                (
1✔
920
                    "baz",
1✔
921
                    FeatureData::NullableFloat(vec![Some(11.0), None, None, None, Some(55.0)]),
1✔
922
                ),
1✔
923
            ],
1✔
924
        )
1✔
925
        .unwrap();
1✔
926

1✔
927
        // TODO: maybe it is nicer to have something wrapping the actual data that we care about and just adds some cache info
1✔
928
        assert!(
1✔
929
            result.chunks_equal_ignoring_cache_hint(&expected_result),
1✔
930
            "{result:#?} != {expected_result:#?}",
1✔
931
        );
1✔
932
    }
1✔
933

934
    #[tokio::test]
935
    async fn it_computes_the_area_from_a_geom() {
1✔
936
        let polygons = MockFeatureCollectionSource::single(
1✔
937
            MultiPolygonCollection::from_slices(
1✔
938
                &[
1✔
939
                    MultiPolygon::new(vec![vec![vec![
1✔
940
                        (0.5, -0.5).into(),
1✔
941
                        (4., -1.).into(),
1✔
942
                        (0.5, -2.5).into(),
1✔
943
                        (0.5, -0.5).into(),
1✔
944
                    ]]])
1✔
945
                    .unwrap(),
1✔
946
                    MultiPolygon::new(vec![vec![vec![
1✔
947
                        (1.0, -1.0).into(),
1✔
948
                        (8., -2.).into(),
1✔
949
                        (1.0, -5.0).into(),
1✔
950
                        (1.0, -1.0).into(),
1✔
951
                    ]]])
1✔
952
                    .unwrap(),
1✔
953
                ],
1✔
954
                &[TimeInterval::new_unchecked(0, 1); 2],
1✔
955
                &[("foo", FeatureData::NullableFloat(vec![Some(1.0), None]))],
1✔
956
            )
1✔
957
            .unwrap(),
1✔
958
        )
1✔
959
        .boxed();
1✔
960

1✔
961
        let result = compute_result::<MultiPolygonCollection>(
1✔
962
            VectorExpression {
1✔
963
                params: VectorExpressionParams {
1✔
964
                    input_columns: vec![],
1✔
965
                    expression: "area(geom)".into(),
1✔
966
                    output_column: OutputColumn::Column("area".into()),
1✔
967
                    output_measurement: Measurement::Unitless,
1✔
968
                    geometry_column_name: "geom".to_string(),
1✔
969
                },
1✔
970
                sources: polygons.into(),
1✔
971
            },
1✔
972
            VectorQueryRectangle {
1✔
973
                spatial_bounds: BoundingBox2D::new((0., 0.).into(), (10., 10.).into()).unwrap(),
1✔
974
                time_interval: TimeInterval::default(),
1✔
975
                spatial_resolution: SpatialResolution::zero_point_one(),
1✔
976
                attributes: ColumnSelection::all(),
1✔
977
            },
1✔
978
        )
1✔
979
        .await;
2✔
980

1✔
981
        let expected_result = MultiPolygonCollection::from_slices(
1✔
982
            &[
1✔
983
                MultiPolygon::new(vec![vec![vec![
1✔
984
                    (0.5, -0.5).into(),
1✔
985
                    (4., -1.).into(),
1✔
986
                    (0.5, -2.5).into(),
1✔
987
                    (0.5, -0.5).into(),
1✔
988
                ]]])
1✔
989
                .unwrap(),
1✔
990
                MultiPolygon::new(vec![vec![vec![
1✔
991
                    (1.0, -1.0).into(),
1✔
992
                    (8., -2.).into(),
1✔
993
                    (1.0, -5.0).into(),
1✔
994
                    (1.0, -1.0).into(),
1✔
995
                ]]])
1✔
996
                .unwrap(),
1✔
997
            ],
1✔
998
            &[TimeInterval::new_unchecked(0, 1); 2],
1✔
999
            &[
1✔
1000
                ("foo", FeatureData::NullableFloat(vec![Some(1.0), None])),
1✔
1001
                (
1✔
1002
                    "area",
1✔
1003
                    FeatureData::NullableFloat(vec![Some(3.5), Some(14.0)]),
1✔
1004
                ),
1✔
1005
            ],
1✔
1006
        )
1✔
1007
        .unwrap();
1✔
1008

1✔
1009
        // TODO: maybe it is nicer to have something wrapping the actual data that we care about and just adds some cache info
1✔
1010
        assert!(
1✔
1011
            result.chunks_equal_ignoring_cache_hint(&expected_result),
1✔
1012
            "{result:#?} != {expected_result:#?}",
1✔
1013
        );
1✔
1014
    }
1✔
1015

1016
    #[tokio::test]
1017
    async fn it_computes_the_centroid_of_a_geom() {
1✔
1018
        let polygons = MockFeatureCollectionSource::single(
1✔
1019
            MultiPolygonCollection::from_slices(
1✔
1020
                &[
1✔
1021
                    MultiPolygon::new(vec![vec![vec![
1✔
1022
                        (0., 0.).into(),
1✔
1023
                        (5., 0.).into(),
1✔
1024
                        (5., 6.).into(),
1✔
1025
                        (0., 6.).into(),
1✔
1026
                        (0., 0.).into(),
1✔
1027
                    ]]])
1✔
1028
                    .unwrap(),
1✔
1029
                    MultiPolygon::new(vec![vec![vec![
1✔
1030
                        (0.5, -0.5).into(),
1✔
1031
                        (4., -1.).into(),
1✔
1032
                        (0.5, -2.5).into(),
1✔
1033
                        (0.5, -0.5).into(),
1✔
1034
                    ]]])
1✔
1035
                    .unwrap(),
1✔
1036
                ],
1✔
1037
                &[TimeInterval::new_unchecked(0, 1); 2],
1✔
1038
                &[("foo", FeatureData::NullableFloat(vec![Some(1.0), None]))],
1✔
1039
            )
1✔
1040
            .unwrap(),
1✔
1041
        )
1✔
1042
        .boxed();
1✔
1043

1✔
1044
        let result = compute_result::<MultiPointCollection>(
1✔
1045
            VectorExpression {
1✔
1046
                params: VectorExpressionParams {
1✔
1047
                    input_columns: vec![],
1✔
1048
                    expression: "centroid(geom)".into(),
1✔
1049
                    output_column: OutputColumn::Geometry(GeoVectorDataType::MultiPoint),
1✔
1050
                    output_measurement: Measurement::Unitless,
1✔
1051
                    geometry_column_name: "geom".to_string(),
1✔
1052
                },
1✔
1053
                sources: polygons.into(),
1✔
1054
            },
1✔
1055
            VectorQueryRectangle {
1✔
1056
                spatial_bounds: BoundingBox2D::new((0., 0.).into(), (10., 10.).into()).unwrap(),
1✔
1057
                time_interval: TimeInterval::default(),
1✔
1058
                spatial_resolution: SpatialResolution::zero_point_one(),
1✔
1059
                attributes: ColumnSelection::all(),
1✔
1060
            },
1✔
1061
        )
1✔
1062
        .await;
2✔
1063

1✔
1064
        let expected_result = MultiPointCollection::from_slices(
1✔
1065
            MultiPoint::many(vec![
1✔
1066
                (2.5, 3.0),
1✔
1067
                (1.666_666_666_666_666_7, -1.333_333_333_333_333_5),
1✔
1068
            ])
1✔
1069
            .unwrap()
1✔
1070
            .as_ref(),
1✔
1071
            &[TimeInterval::new_unchecked(0, 1); 2],
1✔
1072
            &[("foo", FeatureData::NullableFloat(vec![Some(1.0), None]))],
1✔
1073
        )
1✔
1074
        .unwrap();
1✔
1075

1✔
1076
        // TODO: maybe it is nicer to have something wrapping the actual data that we care about and just adds some cache info
1✔
1077
        assert!(
1✔
1078
            result.chunks_equal_ignoring_cache_hint(&expected_result),
1✔
1079
            "{result_geometries:#?} != {expected_result_geometries:#?}",
1✔
1080
            result_geometries = result.geometries().collect::<Vec<_>>(),
×
1081
            expected_result_geometries = expected_result.geometries().collect::<Vec<_>>(),
×
1082
        );
1✔
1083
    }
1✔
1084

1085
    #[tokio::test]
1086
    async fn it_computes_eight_larger_inputs() {
1✔
1087
        const NUMBER_OF_ROWS: usize = 100;
1✔
1088
        let points = MultiPointCollection::from_slices(
1✔
1089
            MultiPoint::many((0..NUMBER_OF_ROWS).map(|i| (i as f64, i as f64)).collect())
100✔
1090
                .unwrap()
1✔
1091
                .as_ref(),
1✔
1092
            &[TimeInterval::new_unchecked(0, 1); NUMBER_OF_ROWS],
1✔
1093
            &[
1✔
1094
                (
1✔
1095
                    "f1",
1✔
1096
                    FeatureData::Float((0..NUMBER_OF_ROWS).map(|i| i as f64).collect()),
100✔
1097
                ),
1✔
1098
                (
1✔
1099
                    "f2",
1✔
1100
                    FeatureData::Float((0..NUMBER_OF_ROWS).map(|i| i as f64).collect()),
100✔
1101
                ),
1✔
1102
                (
1✔
1103
                    "f3",
1✔
1104
                    FeatureData::Float((0..NUMBER_OF_ROWS).map(|i| i as f64).collect()),
100✔
1105
                ),
1✔
1106
                (
1✔
1107
                    "f4",
1✔
1108
                    FeatureData::Float((0..NUMBER_OF_ROWS).map(|i| i as f64).collect()),
100✔
1109
                ),
1✔
1110
                (
1✔
1111
                    "f5",
1✔
1112
                    FeatureData::Float((0..NUMBER_OF_ROWS).map(|i| i as f64).collect()),
100✔
1113
                ),
1✔
1114
                (
1✔
1115
                    "f6",
1✔
1116
                    FeatureData::Float((0..NUMBER_OF_ROWS).map(|i| i as f64).collect()),
100✔
1117
                ),
1✔
1118
                (
1✔
1119
                    "f7",
1✔
1120
                    FeatureData::Float((0..NUMBER_OF_ROWS).map(|i| i as f64).collect()),
100✔
1121
                ),
1✔
1122
                (
1✔
1123
                    "f8",
1✔
1124
                    FeatureData::Float((0..NUMBER_OF_ROWS).map(|i| i as f64).collect()),
100✔
1125
                ),
1✔
1126
            ],
1✔
1127
        )
1✔
1128
        .unwrap();
1✔
1129

1✔
1130
        let result = compute_result::<MultiPointCollection>(
1✔
1131
            VectorExpression {
1✔
1132
                params: VectorExpressionParams {
1✔
1133
                    input_columns: vec![
1✔
1134
                        "f1".into(),
1✔
1135
                        "f2".into(),
1✔
1136
                        "f3".into(),
1✔
1137
                        "f4".into(),
1✔
1138
                        "f5".into(),
1✔
1139
                        "f6".into(),
1✔
1140
                        "f7".into(),
1✔
1141
                        "f8".into(),
1✔
1142
                    ],
1✔
1143
                    expression: "f1 + f2 + f3 + f4 + f5 + f6 + f7 + f8".into(),
1✔
1144
                    output_column: OutputColumn::Column("new".into()),
1✔
1145
                    output_measurement: Measurement::Unitless,
1✔
1146
                    geometry_column_name: "geom".to_string(),
1✔
1147
                },
1✔
1148
                sources: MockFeatureCollectionSource::single(points.clone())
1✔
1149
                    .boxed()
1✔
1150
                    .into(),
1✔
1151
            },
1✔
1152
            VectorQueryRectangle {
1✔
1153
                spatial_bounds: BoundingBox2D::new(
1✔
1154
                    (0., 0.).into(),
1✔
1155
                    (NUMBER_OF_ROWS as f64, NUMBER_OF_ROWS as f64).into(),
1✔
1156
                )
1✔
1157
                .unwrap(),
1✔
1158
                time_interval: TimeInterval::default(),
1✔
1159
                spatial_resolution: SpatialResolution::zero_point_one(),
1✔
1160
                attributes: ColumnSelection::all(),
1✔
1161
            },
1✔
1162
        )
1✔
1163
        .await;
2✔
1164

1✔
1165
        let expected_result = points
1✔
1166
            .add_column(
1✔
1167
                "new",
1✔
1168
                FeatureData::Float((0..NUMBER_OF_ROWS).map(|i| 8.0 * i as f64).collect()),
100✔
1169
            )
1✔
1170
            .unwrap();
1✔
1171

1✔
1172
        // TODO: maybe it is nicer to have something wrapping the actual data that we care about and just adds some cache info
1✔
1173
        assert!(
1✔
1174
            result.chunks_equal_ignoring_cache_hint(&expected_result),
1✔
1175
            "{result_geometries:#?} != {expected_result_geometries:#?}",
1✔
1176
            result_geometries = result.geometries().collect::<Vec<_>>(),
×
1177
            expected_result_geometries = expected_result.geometries().collect::<Vec<_>>(),
×
1178
        );
1✔
1179
    }
1✔
1180

1181
    async fn compute_result<C>(operator: VectorExpression, query_rectangle: VectorQueryRectangle) -> C
3✔
1182
    where
3✔
1183
        C: 'static,
3✔
1184
        Box<dyn VectorQueryProcessor<VectorType = C>>: TryFrom<TypedVectorQueryProcessor>,
3✔
1185
        <Box<dyn VectorQueryProcessor<VectorType = C>> as TryFrom<TypedVectorQueryProcessor>>::Error: std::fmt::Debug,
3✔
1186
    {
3✔
1187
        let operator = operator
3✔
1188
            .boxed()
3✔
1189
            .initialize(
3✔
1190
                WorkflowOperatorPath::initialize_root(),
3✔
1191
                &MockExecutionContext::test_default(),
3✔
1192
            )
3✔
1193
            .await
3✔
1194
            .unwrap();
3✔
1195

3✔
1196
        let query_processor: Box<dyn VectorQueryProcessor<VectorType = C>> =
3✔
1197
            operator.query_processor().unwrap().try_into().unwrap();
3✔
1198

3✔
1199
        let ctx = MockQueryContext::new(ChunkByteSize::MAX);
3✔
1200

1201
        let query = query_processor.query(query_rectangle, &ctx).await.unwrap();
3✔
1202

1203
        let mut result = query.map(Result::unwrap).collect::<Vec<C>>().await;
3✔
1204

1205
        assert_eq!(result.len(), 1);
3✔
1206
        result.remove(0)
3✔
1207
    }
3✔
1208
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc