• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

geo-engine / geoengine / 12469296660

23 Dec 2024 03:15PM UTC coverage: 90.56% (-0.1%) from 90.695%
12469296660

push

github

web-flow
Merge pull request #998 from geo-engine/quota_log_wip

Quota and Data usage Logging

859 of 1214 new or added lines in 66 files covered. (70.76%)

3 existing lines in 2 files now uncovered.

133923 of 147883 relevant lines covered (90.56%)

54439.32 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

92.61
/operators/src/processing/expression/vector_operator.rs
1
use super::{
2
    canonicalize_name, error::vector as error, get_expression_dependencies, AsExpressionGeo,
3
    FromExpressionGeo, VectorExpressionError,
4
};
5
use crate::{
6
    engine::{
7
        CanonicOperatorName, ExecutionContext, InitializedSources, InitializedVectorOperator,
8
        Operator, OperatorName, QueryContext, SingleVectorSource, TypedVectorQueryProcessor,
9
        VectorColumnInfo, VectorOperator, VectorQueryProcessor, VectorResultDescriptor,
10
        WorkflowOperatorPath,
11
    },
12
    util::Result,
13
};
14
use async_trait::async_trait;
15
use futures::stream::BoxStream;
16
use futures::StreamExt;
17
use geoengine_datatypes::primitives::{
18
    FeatureData, FeatureDataRef, FeatureDataType, FloatOptionsParIter, Geometry, Measurement,
19
    MultiLineString, MultiPoint, MultiPolygon, VectorQueryRectangle,
20
};
21
use geoengine_datatypes::util::arrow::ArrowTyped;
22
use geoengine_datatypes::{
23
    collections::{
24
        FeatureCollection, FeatureCollectionInfos, FeatureCollectionModifications,
25
        GeoFeatureCollectionModifications, GeoVectorDataType, IntoGeometryOptionsIterator,
26
        VectorDataType,
27
    },
28
    primitives::NoGeometry,
29
};
30
use geoengine_expression::{
31
    is_allowed_variable_name, DataType, ExpressionParser, LinkedExpression,
32
    Parameter as ExpressionParameter,
33
};
34
use rayon::iter::{
35
    FromParallelIterator, IndexedParallelIterator, IntoParallelIterator, ParallelIterator,
36
};
37
use serde::{Deserialize, Serialize};
38
use snafu::ResultExt;
39
use std::collections::HashMap;
40
use std::marker::PhantomData;
41
use std::sync::Arc;
42

43
/// A vector expression creates or replaces a column in a `FeatureCollection` by evaluating an expression.
44
/// The expression receives the feature's columns as variables.
45
pub type VectorExpression = Operator<VectorExpressionParams, SingleVectorSource>;
46

47
impl OperatorName for VectorExpression {
48
    const TYPE_NAME: &'static str = "VectorExpression";
49
}
50

51
const MAX_INPUT_COLUMNS: usize = 8;
52
const PARALLEL_MIN_BATCH_SIZE: usize = 32; // TODO: find good default
53
const EXPRESSION_MAIN_NAME: &str = "expression";
54

55
#[derive(Debug, Clone, Deserialize, Serialize)]
56
#[serde(rename_all = "camelCase")]
57
pub struct VectorExpressionParams {
58
    /// The columns to use as variables in the expression.
59
    ///
60
    /// For usage in the expression, all special characters are replaced by underscores.
61
    /// E.g., `precipitation.cm` becomes `precipitation_cm`.
62
    ///
63
    /// If the column name starts with a number, an underscore is prepended.
64
    /// E.g., `1column` becomes `_1column`.
65
    ///
66
    pub input_columns: Vec<String>,
67

68
    /// The expression to evaluate.
69
    pub expression: String,
70

71
    /// The type and name of the new column.
72
    pub output_column: OutputColumn,
73

74
    /// The expression will always include the geometry column.
75
    /// Thus, it is necessary to specify the variable name of the geometry column.
76
    /// The default is `geom`.
77
    #[serde(default = "geometry_default_column_name")]
78
    pub geometry_column_name: String,
79

80
    /// The measurement of the new column.
81
    /// The default is [`Measurement::Unitless`].
82
    #[serde(default)]
83
    pub output_measurement: Measurement,
84
}
85

86
fn geometry_default_column_name() -> String {
×
87
    "geom".into()
×
88
}
×
89

90
/// Specify the output of the expression.
91
#[derive(Debug, Clone, Deserialize, Serialize)]
92
#[serde(tag = "type", content = "value", rename_all = "camelCase")]
93
pub enum OutputColumn {
94
    /// The expression will override the current geometry
95
    Geometry(GeoVectorDataType),
96
    /// The expression will append a new `Float` column
97
    // TODO: allow more types than `Float`s
98
    Column(String),
99
}
100

101
struct InitializedVectorExpression {
102
    name: CanonicOperatorName,
103
    path: WorkflowOperatorPath,
104
    result_descriptor: VectorResultDescriptor,
105
    features: Box<dyn InitializedVectorOperator>,
106
    expression: Arc<LinkedExpression>,
107
    input_columns: Vec<String>,
108
    output_column: OutputColumn,
109
}
110

111
#[typetag::serde]
×
112
#[async_trait]
113
impl VectorOperator for VectorExpression {
114
    async fn _initialize(
115
        self: Box<Self>,
116
        path: WorkflowOperatorPath,
117
        context: &dyn ExecutionContext,
118
    ) -> Result<Box<dyn InitializedVectorOperator>> {
5✔
119
        // TODO: This is super ugly to being forced to do this on every operator. This must be refactored.
120
        let name = CanonicOperatorName::from(&self);
5✔
121

5✔
122
        if self.params.input_columns.len() > MAX_INPUT_COLUMNS {
5✔
123
            Err(VectorExpressionError::TooManyInputColumns {
×
124
                max: MAX_INPUT_COLUMNS,
×
125
                found: self.params.input_columns.len(),
×
126
            })?;
×
127
        }
5✔
128

129
        let initialized_source = self
5✔
130
            .sources
5✔
131
            .initialize_sources(path.clone(), context)
5✔
132
            .await?;
5✔
133

134
        // we can reuse the result descriptor, because we only add a column later on
135
        let mut result_descriptor = initialized_source.vector.result_descriptor().clone();
5✔
136

5✔
137
        check_input_column_validity(&result_descriptor.columns, &self.params.input_columns)?;
5✔
138
        check_output_column_validity(&self.params.output_column)?;
5✔
139

140
        let expression_geom_input_type = result_descriptor.data_type;
5✔
141
        let expression_output_type = match &self.params.output_column {
5✔
142
            OutputColumn::Geometry(vector_data_type) => {
1✔
143
                result_descriptor.data_type = (*vector_data_type).into();
1✔
144
                match vector_data_type {
1✔
145
                    GeoVectorDataType::MultiPoint => DataType::MultiPoint,
1✔
146
                    GeoVectorDataType::MultiLineString => DataType::MultiLineString,
×
147
                    GeoVectorDataType::MultiPolygon => DataType::MultiPolygon,
×
148
                }
149
            }
150
            OutputColumn::Column(output_column_name) => {
4✔
151
                insert_new_column(
4✔
152
                    &mut result_descriptor.columns,
4✔
153
                    output_column_name.clone(),
4✔
154
                    self.params.output_measurement,
4✔
155
                )?;
4✔
156
                DataType::Number
4✔
157
            }
158
        };
159

160
        let mut expression_input_names = Vec::with_capacity(self.params.input_columns.len());
5✔
161
        for input_column in &self.params.input_columns {
16✔
162
            let variable_name = canonicalize_name(input_column);
11✔
163

11✔
164
            if !is_allowed_variable_name(&variable_name) {
11✔
165
                return Err(VectorExpressionError::ColumnNameContainsSpecialCharacters {
×
166
                    name: variable_name,
×
167
                })?;
×
168
            }
11✔
169

11✔
170
            expression_input_names.push(variable_name);
11✔
171
        }
172

173
        let expression = {
5✔
174
            let expression_code = self.params.expression.clone();
5✔
175
            let geometry_column_name = self.params.geometry_column_name.clone();
5✔
176

5✔
177
            crate::util::spawn_blocking(move || {
5✔
178
                compile_expression(
5✔
179
                    &expression_code,
5✔
180
                    geometry_column_name,
5✔
181
                    expression_geom_input_type,
5✔
182
                    &expression_input_names,
5✔
183
                    expression_output_type,
5✔
184
                )
5✔
185
                .map(Arc::new)
5✔
186
            })
5✔
187
            .await
5✔
188
            .map_err(|source| VectorExpressionError::CompilationTask { source })??
5✔
189
        };
190

191
        let initialized_operator = InitializedVectorExpression {
5✔
192
            name,
5✔
193
            path,
5✔
194
            result_descriptor,
5✔
195
            features: initialized_source.vector,
5✔
196
            expression,
5✔
197
            input_columns: self.params.input_columns,
5✔
198
            output_column: self.params.output_column,
5✔
199
        };
5✔
200

5✔
201
        Ok(initialized_operator.boxed())
5✔
202
    }
10✔
203

204
    span_fn!(VectorExpression);
205
}
206

207
fn check_input_column_validity(
5✔
208
    columns: &HashMap<String, VectorColumnInfo>,
5✔
209
    input_columns: &[String],
5✔
210
) -> Result<(), VectorExpressionError> {
5✔
211
    for input_column in input_columns {
16✔
212
        if input_column.contains(|c: char| !c.is_alphanumeric()) {
25✔
213
            Err(VectorExpressionError::ColumnNameContainsSpecialCharacters {
×
214
                name: input_column.clone(),
×
215
            })?;
×
216
        }
11✔
217

218
        let Some(column_info) = columns.get(input_column) else {
11✔
219
            return Err(VectorExpressionError::InputColumnNotExisting {
×
220
                name: input_column.clone(),
×
221
            });
×
222
        };
223

224
        match column_info.data_type {
11✔
225
            FeatureDataType::Float | FeatureDataType::Int => {}
11✔
226
            _ => Err(VectorExpressionError::InputColumnNotNumeric {
×
227
                name: input_column.clone(),
×
228
            })?,
×
229
        }
230
    }
231

232
    Ok(())
5✔
233
}
5✔
234

235
fn check_output_column_validity(output_column: &OutputColumn) -> Result<(), VectorExpressionError> {
5✔
236
    match output_column {
5✔
237
        OutputColumn::Geometry(_) => {}
1✔
238
        OutputColumn::Column(column) => {
4✔
239
            if column.contains(|c: char| !c.is_alphanumeric()) {
13✔
240
                Err(VectorExpressionError::ColumnNameContainsSpecialCharacters {
×
241
                    name: column.clone(),
×
242
                })?;
×
243
            }
4✔
244
        }
245
    };
246

247
    Ok(())
5✔
248
}
5✔
249

250
fn insert_new_column(
4✔
251
    columns: &mut HashMap<String, VectorColumnInfo>,
4✔
252
    name: String,
4✔
253
    measurement: Measurement,
4✔
254
) -> Result<(), VectorExpressionError> {
4✔
255
    let output_column_collision = columns.insert(
4✔
256
        name.clone(),
4✔
257
        VectorColumnInfo {
4✔
258
            data_type: FeatureDataType::Float,
4✔
259
            measurement,
4✔
260
        },
4✔
261
    );
4✔
262

4✔
263
    if output_column_collision.is_some() {
4✔
264
        return Err(VectorExpressionError::OutputColumnCollision { name });
×
265
    }
4✔
266

4✔
267
    Ok(())
4✔
268
}
4✔
269

270
fn compile_expression(
5✔
271
    expression_code: &str,
5✔
272
    geom_name: String,
5✔
273
    geom_type: VectorDataType,
5✔
274
    parameters: &[String],
5✔
275
    output_type: DataType,
5✔
276
) -> Result<LinkedExpression, VectorExpressionError> {
5✔
277
    let geom_parameter = match geom_type {
5✔
278
        VectorDataType::Data | VectorDataType::MultiPoint => {
279
            ExpressionParameter::MultiPoint(geom_name.into())
3✔
280
        }
281
        VectorDataType::MultiLineString => ExpressionParameter::MultiLineString(geom_name.into()),
×
282
        VectorDataType::MultiPolygon => ExpressionParameter::MultiPolygon(geom_name.into()),
2✔
283
    };
284
    let mut expression_parameters = Vec::with_capacity(parameters.len() + 1);
5✔
285
    expression_parameters.push(geom_parameter);
5✔
286
    expression_parameters.extend(
5✔
287
        parameters
5✔
288
            .iter()
5✔
289
            .map(|p| ExpressionParameter::Number(p.into())),
11✔
290
    );
5✔
291
    let expression = ExpressionParser::new(&expression_parameters, output_type)?
5✔
292
        .parse(EXPRESSION_MAIN_NAME, expression_code)?;
5✔
293

294
    let expression_dependencies = get_expression_dependencies().context(error::Dependencies)?;
5✔
295

296
    Ok(LinkedExpression::from_ast(
5✔
297
        &expression,
5✔
298
        expression_dependencies,
5✔
299
    )?)
5✔
300
}
5✔
301

302
impl InitializedVectorExpression {
303
    #[inline]
304
    fn column_processor<G>(
4✔
305
        &self,
4✔
306
        source: Box<dyn VectorQueryProcessor<VectorType = FeatureCollection<G>>>,
4✔
307
        output_column: String,
4✔
308
    ) -> TypedVectorQueryProcessor
4✔
309
    where
4✔
310
        G: Geometry + ArrowTyped + 'static,
4✔
311
        FeatureCollection<G>: for<'g> IntoGeometryOptionsIterator<'g> + 'static,
4✔
312
        for<'g> <FeatureCollection<G> as IntoGeometryOptionsIterator<'g>>::GeometryType:
4✔
313
            AsExpressionGeo + Send,
4✔
314
        for<'g> <<FeatureCollection<G> as IntoGeometryOptionsIterator<'g>>::GeometryOptionIterator as IntoParallelIterator>::Iter:
4✔
315
        IndexedParallelIterator + Send,
4✔
316
        TypedVectorQueryProcessor: From<Box<dyn VectorQueryProcessor<VectorType = FeatureCollection<G>>>>,
4✔
317
    {
4✔
318
        VectorExpressionColumnProcessor {
4✔
319
            source,
4✔
320
            result_descriptor: self.result_descriptor.clone(),
4✔
321
            expression: self.expression.clone(),
4✔
322
            input_columns: self.input_columns.clone(),
4✔
323
            output_column,
4✔
324
        }
4✔
325
        .boxed()
4✔
326
        .into()
4✔
327
    }
4✔
328

329
    #[inline]
330
    fn geometry_processor<GIn, GOut>(
1✔
331
        &self,
1✔
332
        source: Box<dyn VectorQueryProcessor<VectorType = FeatureCollection<GIn>>>,
1✔
333
    ) -> TypedVectorQueryProcessor
1✔
334
    where
1✔
335
        GIn: Geometry + ArrowTyped + Send + Sync + 'static + Sized,
1✔
336
        GOut: Geometry
1✔
337
            + ArrowTyped
1✔
338
            + FromExpressionGeo
1✔
339
            + Send
1✔
340
            + Sync
1✔
341
            + 'static
1✔
342
            + Sized,
1✔
343
        FeatureCollection<GIn>: GeoFeatureCollectionModifications<GOut> + for<'g> IntoGeometryOptionsIterator<'g>,
1✔
344
        for<'g> <<FeatureCollection<GIn> as IntoGeometryOptionsIterator<'g>>::GeometryOptionIterator as IntoParallelIterator>::Iter:
1✔
345
            IndexedParallelIterator + Send,
1✔
346
        for<'g> <FeatureCollection<GIn> as IntoGeometryOptionsIterator<'g>>::GeometryType: AsExpressionGeo,
1✔
347
        TypedVectorQueryProcessor: From<Box<dyn VectorQueryProcessor<VectorType = FeatureCollection<GOut>>>>,
1✔
348
    {
1✔
349
        VectorExpressionGeometryProcessor {
1✔
350
            source,
1✔
351
            result_descriptor: self.result_descriptor.clone(),
1✔
352
            expression: self.expression.clone(),
1✔
353
            input_columns: self.input_columns.clone(),
1✔
354
            _out: PhantomData::<GOut>,
1✔
355
        }
1✔
356
        .boxed()
1✔
357
        .into()
1✔
358
    }
1✔
359

360
    #[inline]
361
    fn dispatch_float_column_output(
4✔
362
        &self,
4✔
363
        source_processor: TypedVectorQueryProcessor,
4✔
364
        output_column: String,
4✔
365
    ) -> TypedVectorQueryProcessor {
4✔
366
        match source_processor {
4✔
367
            TypedVectorQueryProcessor::Data(source) => {
×
368
                self.column_processor::<NoGeometry>(source, output_column)
×
369
            }
370
            TypedVectorQueryProcessor::MultiPoint(source) => {
3✔
371
                self.column_processor::<MultiPoint>(source, output_column)
3✔
372
            }
373
            TypedVectorQueryProcessor::MultiLineString(source) => {
×
374
                self.column_processor::<MultiLineString>(source, output_column)
×
375
            }
376
            TypedVectorQueryProcessor::MultiPolygon(source) => {
1✔
377
                self.column_processor::<MultiPolygon>(source, output_column)
1✔
378
            }
379
        }
380
    }
4✔
381

382
    #[inline]
383
    fn dispatch_geometry_output_for_type<GOut>(
1✔
384
        &self,
1✔
385
        source_processor: TypedVectorQueryProcessor,
1✔
386
    ) -> TypedVectorQueryProcessor
1✔
387
    where
1✔
388
        GOut: Geometry + ArrowTyped + FromExpressionGeo + Send + Sync + 'static + Sized,
1✔
389
        TypedVectorQueryProcessor:
1✔
390
            From<Box<dyn VectorQueryProcessor<VectorType = FeatureCollection<GOut>>>>,
1✔
391
    {
1✔
392
        match source_processor {
1✔
393
            TypedVectorQueryProcessor::Data(source) => {
×
394
                self.geometry_processor::<NoGeometry, GOut>(source)
×
395
            }
396
            TypedVectorQueryProcessor::MultiPoint(source) => {
×
397
                self.geometry_processor::<MultiPoint, GOut>(source)
×
398
            }
399
            TypedVectorQueryProcessor::MultiLineString(source) => {
×
400
                self.geometry_processor::<MultiLineString, GOut>(source)
×
401
            }
402
            TypedVectorQueryProcessor::MultiPolygon(source) => {
1✔
403
                self.geometry_processor::<MultiPolygon, GOut>(source)
1✔
404
            }
405
        }
406
    }
1✔
407

408
    #[inline]
409
    fn dispatch_geometry_output(
1✔
410
        &self,
1✔
411
        source_processor: TypedVectorQueryProcessor,
1✔
412
        vector_data_type: GeoVectorDataType,
1✔
413
    ) -> TypedVectorQueryProcessor {
1✔
414
        match vector_data_type {
1✔
415
            GeoVectorDataType::MultiPoint => {
416
                self.dispatch_geometry_output_for_type::<MultiPoint>(source_processor)
1✔
417
            }
418
            GeoVectorDataType::MultiLineString => {
419
                self.dispatch_geometry_output_for_type::<MultiLineString>(source_processor)
×
420
            }
421
            GeoVectorDataType::MultiPolygon => {
422
                self.dispatch_geometry_output_for_type::<MultiPolygon>(source_processor)
×
423
            }
424
        }
425
    }
1✔
426
}
427

428
impl InitializedVectorOperator for InitializedVectorExpression {
429
    fn result_descriptor(&self) -> &VectorResultDescriptor {
×
430
        &self.result_descriptor
×
431
    }
×
432

433
    fn query_processor(&self) -> Result<TypedVectorQueryProcessor> {
5✔
434
        let source_processor = self.features.query_processor()?;
5✔
435

436
        Ok(match self.output_column.clone() {
5✔
437
            OutputColumn::Geometry(vector_data_type) => {
1✔
438
                self.dispatch_geometry_output(source_processor, vector_data_type)
1✔
439
            }
440
            OutputColumn::Column(output_column) => {
4✔
441
                self.dispatch_float_column_output(source_processor, output_column)
4✔
442
            }
443
        })
444
    }
5✔
445

446
    fn canonic_name(&self) -> CanonicOperatorName {
×
447
        self.name.clone()
×
448
    }
×
449

NEW
450
    fn name(&self) -> &'static str {
×
NEW
451
        VectorExpression::TYPE_NAME
×
NEW
452
    }
×
453

NEW
454
    fn path(&self) -> WorkflowOperatorPath {
×
NEW
455
        self.path.clone()
×
NEW
456
    }
×
457
}
458

459
/// A processor that evaluates an expression on the columns of a `FeatureCollection`.
460
/// The result is a new `FeatureCollection` with the evaluated column added.
461
pub struct VectorExpressionColumnProcessor<Q, G>
462
where
463
    G: Geometry,
464
    Q: VectorQueryProcessor<VectorType = FeatureCollection<G>>,
465
{
466
    source: Q,
467
    result_descriptor: VectorResultDescriptor,
468
    expression: Arc<LinkedExpression>,
469
    input_columns: Vec<String>,
470
    output_column: String,
471
}
472

473
/// A processor that evaluates an expression on the columns of a `FeatureCollection`.
474
/// The result is a new `FeatureCollection` with a replaced geometry column.
475
pub struct VectorExpressionGeometryProcessor<Q, GIn, GOut>
476
where
477
    GIn: Geometry,
478
    GOut: Geometry,
479
    Q: VectorQueryProcessor<VectorType = FeatureCollection<GIn>>,
480
{
481
    source: Q,
482
    result_descriptor: VectorResultDescriptor,
483
    expression: Arc<LinkedExpression>,
484
    input_columns: Vec<String>,
485
    _out: PhantomData<GOut>,
486
}
487

488
type ExpressionGeometryType<'g, G> = <<FeatureCollection<G> as IntoGeometryOptionsIterator<'g>>::GeometryType as AsExpressionGeo>::ExpressionGeometryType;
489

490
#[async_trait]
491
impl<Q, G> VectorQueryProcessor for VectorExpressionColumnProcessor<Q, G>
492
where
493
    Q: VectorQueryProcessor<VectorType = FeatureCollection<G>>,
494
    G: Geometry + ArrowTyped + 'static,
495
    FeatureCollection<G>: for<'g> IntoGeometryOptionsIterator<'g> + 'static,
496
    for<'g> <FeatureCollection<G> as IntoGeometryOptionsIterator<'g>>::GeometryType:
497
        AsExpressionGeo + Send,
498
    for<'g> <<FeatureCollection<G> as IntoGeometryOptionsIterator<'g>>::GeometryOptionIterator as IntoParallelIterator>::Iter:
499
        IndexedParallelIterator + Send,
500
{
501
    type VectorType = FeatureCollection<G>;
502

503

504
    async fn vector_query<'a>(
505
        &'a self,
506
        query: VectorQueryRectangle,
507
        ctx: &'a dyn QueryContext,
508
    ) -> Result<BoxStream<'a, Result<Self::VectorType>>> {
4✔
509
        let stream = self.source.vector_query(query, ctx).await?;
4✔
510

511
        let stream = stream.then(move |collection| async move {
4✔
512
            let collection = collection?;
4✔
513
            let input_columns = self.input_columns.clone();
4✔
514
            let output_column = self.output_column.clone();
4✔
515
            let expression = self.expression.clone();
4✔
516

4✔
517
            crate::util::spawn_blocking_with_thread_pool(ctx.thread_pool().clone(), move || {
4✔
518
                let result: Vec<Option<f64>> = call_expression_function(
4✔
519
                    &expression,
4✔
520
                    &collection,
4✔
521
                    &input_columns,
4✔
522
                    std::convert::identity,
4✔
523
                )?;
4✔
524

525
                Ok(collection
4✔
526
                    .add_column(&output_column, FeatureData::NullableFloat(result))
4✔
527
                    .context(error::AddColumn {
4✔
528
                        name: output_column,
4✔
529
                    })?)
4✔
530
            })
4✔
531
            .await?
4✔
532
        });
8✔
533

4✔
534
        Ok(stream.boxed())
4✔
535
    }
8✔
536

537
    fn vector_result_descriptor(&self) -> &VectorResultDescriptor {
4✔
538
        &self.result_descriptor
4✔
539
    }
4✔
540
}
541

542
#[async_trait]
543
impl<Q, GIn, GOut> VectorQueryProcessor
544
    for VectorExpressionGeometryProcessor<Q, GIn, GOut>
545
where
546
    Q: VectorQueryProcessor<VectorType = FeatureCollection<GIn>> + 'static + Sized,
547
    GIn: Geometry + ArrowTyped + Send + Sync + 'static + Sized,
548
    GOut: Geometry
549
        + ArrowTyped
550
        + FromExpressionGeo
551
        + Send
552
        + Sync
553
        + 'static
554
        + Sized,
555
    FeatureCollection<GIn>: GeoFeatureCollectionModifications<GOut> + for<'g> IntoGeometryOptionsIterator<'g>,
556
    for<'g> <<FeatureCollection<GIn> as IntoGeometryOptionsIterator<'g>>::GeometryOptionIterator as IntoParallelIterator>::Iter:
557
        IndexedParallelIterator + Send,
558
    for<'g> <FeatureCollection<GIn> as IntoGeometryOptionsIterator<'g>>::GeometryType: AsExpressionGeo,
559
    Vec<Option<GOut>>: FromParallelIterator<Option<GOut>>,
560
{
561
    type VectorType = FeatureCollection<GOut>;
562

563
    async fn vector_query<'a>(
564
        &'a self,
565
        query: VectorQueryRectangle,
566
        ctx: &'a dyn QueryContext,
567
    ) -> Result<BoxStream<'a, Result<Self::VectorType>>> {
1✔
568
        let stream = self.source.vector_query(query, ctx).await?;
1✔
569

570
        let stream = stream.then(move |collection| async move {
1✔
571
            let collection = collection?;
1✔
572
            let input_columns = self.input_columns.clone();
1✔
573
            let expression = self.expression.clone();
1✔
574

1✔
575
            crate::util::spawn_blocking_with_thread_pool(ctx.thread_pool().clone(), move || {
1✔
576
                let (geometry_options, row_filter): (Vec<Option<GOut>>, Vec<bool>) = call_expression_function(
1✔
577
                    &expression,
1✔
578
                    &collection,
1✔
579
                    &input_columns,
1✔
580
                    |geom_option| {
2✔
581
                        let geom_option = geom_option.and_then(<GOut as FromExpressionGeo>::from_expression_geo);
2✔
582

2✔
583
                        let row_filter = geom_option.is_some();
2✔
584

2✔
585
                        (geom_option, row_filter)
2✔
586
                    },
2✔
587
                )?;
1✔
588

589
                // remove all `None`s and output only the geometries
590
                let geometries = geometry_options.into_par_iter().with_min_len(PARALLEL_MIN_BATCH_SIZE).filter_map(std::convert::identity).collect::<Vec<_>>();
1✔
591

1✔
592
                Ok(collection
1✔
593
                    .filter(row_filter) // we have to filter out the rows with empty geometries
1✔
594
                    .context(error::FilterEmptyGeometries)?
1✔
595
                    .replace_geometries(geometries)
1✔
596
                    .context(error::ReplaceGeometries)?)
1✔
597
            })
1✔
598
            .await?
1✔
599
        });
2✔
600

1✔
601
        Ok(stream.boxed())
1✔
602
    }
2✔
603

604
    fn vector_result_descriptor(&self) -> &VectorResultDescriptor {
1✔
605
        &self.result_descriptor
1✔
606
    }
1✔
607
}
608

609
fn call_expression_function<GIn, ExprOut, MapOut, Out>(
5✔
610
    expression: &Arc<LinkedExpression>,
5✔
611
    collection: &FeatureCollection<GIn>,
5✔
612
    input_columns: &[String],
5✔
613
    map_fn: fn(Option<ExprOut>) -> MapOut,
5✔
614
) -> Result<Out, VectorExpressionError>
5✔
615
where
5✔
616
    GIn: Geometry + ArrowTyped + 'static,
5✔
617
    for<'i> FeatureCollection<GIn>: IntoGeometryOptionsIterator<'i>,
5✔
618
    for<'g> <<FeatureCollection<GIn> as IntoGeometryOptionsIterator<'g>>::GeometryOptionIterator as IntoParallelIterator>::Iter:
5✔
619
        IndexedParallelIterator + Send,
5✔
620
    for<'g> <FeatureCollection<GIn> as IntoGeometryOptionsIterator<'g>>::GeometryType: AsExpressionGeo,
5✔
621
    ExprOut: Send,
5✔
622
    MapOut: Send,
5✔
623
    Out: FromParallelIterator<MapOut> + Send,
5✔
624
{
5✔
625
    let data_columns: Vec<FeatureDataRef> = input_columns
5✔
626
        .iter()
5✔
627
        .map(|input_column| {
11✔
628
            collection
11✔
629
                .data(input_column)
11✔
630
                .expect("was checked durin initialization")
11✔
631
        })
11✔
632
        .collect();
5✔
633

5✔
634
    let float_inputs: Vec<FloatOptionsParIter> = data_columns
5✔
635
        .iter()
5✔
636
        .map(FeatureDataRef::float_options_par_iter)
5✔
637
        .collect::<Vec<_>>();
5✔
638

5✔
639
    let geom_input = collection
5✔
640
        .geometry_options()
5✔
641
        .into_par_iter()
5✔
642
        .map(|geometry_option| {
112✔
643
            if let Some(geometry) = geometry_option.as_ref() {
112✔
644
                geometry.as_expression_geo()
112✔
645
            } else {
646
                None
×
647
            }
648
        });
112✔
649

650
    macro_rules! impl_expression_subcall {
651
        ($n:literal, $($i:ident),*) => {
652
            {
653
                let [ $($i),* ] = <[_; $n]>::try_from(float_inputs).expect("it matches the match condition");
654
                let f = unsafe {
655
                    expression.function_nary::<fn(
656
                        Option<ExpressionGeometryType<'_, GIn>>,
657
                        $( impl_expression_subcall!(@float_option $i), )*
658
                    ) -> Option<ExprOut>>()
659
                }
660
                .map_err(VectorExpressionError::from)?;
661

662
                (geom_input, $($i),*)
663
                    .into_par_iter()
664
                    .with_min_len(PARALLEL_MIN_BATCH_SIZE)
665
                    .map(|(geom, $($i),*)| map_fn(f(geom, $($i),*)))
108✔
666
                    .collect()
667
            }
668
        };
669
        // Create one float option for each float input
670
        (@float_option $i:ident) => {
671
            Option<f64>
672
        };
673
    }
674

675
    Ok(match float_inputs.len() {
5✔
676
        0 => {
677
            let f = unsafe {
2✔
678
                expression.function_nary::<fn(
2✔
679
                    Option<ExpressionGeometryType<'_, GIn>>,
2✔
680
                ) -> Option<ExprOut>>()
2✔
681
            }
2✔
682
            .map_err(VectorExpressionError::from)?;
2✔
683

684
            geom_input
2✔
685
                .with_min_len(PARALLEL_MIN_BATCH_SIZE)
2✔
686
                .map(|geom| map_fn(f(geom)))
4✔
687
                .collect()
2✔
688
        }
689
        1 => impl_expression_subcall!(1, i1),
1✔
690
        2 => impl_expression_subcall!(2, i1, i2),
1✔
691
        3 => impl_expression_subcall!(3, i1, i2, i3),
×
692
        4 => impl_expression_subcall!(4, i1, i2, i3, i4),
×
693
        5 => impl_expression_subcall!(5, i1, i2, i3, i4, i5),
×
694
        6 => impl_expression_subcall!(6, i1, i2, i3, i4, i5, i6),
×
695
        7 => impl_expression_subcall!(7, i1, i2, i3, i4, i5, i6, i7),
×
696
        8 => impl_expression_subcall!(8, i1, i2, i3, i4, i5, i6, i7, i8),
1✔
697
        other => Err(VectorExpressionError::TooManyInputColumns {
×
698
            max: MAX_INPUT_COLUMNS,
×
699
            found: other,
×
700
        })?,
×
701
    })
702
}
5✔
703

704
#[cfg(test)]
mod tests {
    use super::*;
    use crate::{
        engine::{ChunkByteSize, MockExecutionContext, MockQueryContext, QueryProcessor},
        mock::MockFeatureCollectionSource,
    };
    use geoengine_datatypes::{
        collections::{
            ChunksEqualIgnoringCacheHint, IntoGeometryIterator, MultiPointCollection,
            MultiPolygonCollection,
        },
        primitives::{
            BoundingBox2D, ColumnSelection, MultiPoint, MultiPolygon, SpatialResolution,
            TimeInterval,
        },
        util::test::TestDefault,
    };

    /// Checks that the operator serializes to the expected JSON and that this JSON
    /// deserializes back into a `VectorExpression`.
    #[test]
    fn it_deserializes_the_operator() {
        let def: Operator<VectorExpressionParams, SingleVectorSource> = VectorExpression {
            params: VectorExpressionParams {
                input_columns: vec!["foo".into(), "bar".into()],
                expression: "foo + bar".into(),
                output_column: OutputColumn::Column("baz".into()),
                output_measurement: Measurement::Unitless,
                geometry_column_name: "geom".to_string(),
            },
            sources: MockFeatureCollectionSource::<MultiPoint>::multiple(vec![])
                .boxed()
                .into(),
        };

        let json = serde_json::json!({
            "params": {
                "inputColumns": ["foo", "bar"],
                "expression": "foo + bar",
                "outputColumn": {
                    "type": "column",
                    "value": "baz",
                },
                "outputMeasurement": {
                    "type": "unitless",
                },
                "geometryColumnName": "geom",
            },
            "sources": {
                "vector": {
                    "type": "MockFeatureCollectionSourceMultiPoint",
                    "params": {
                        "collections": [],
                        "spatialReference": "EPSG:4326",
                        "measurements": null,
                    }
                }
            }
        });

        assert_eq!(serde_json::to_value(&def).unwrap(), json.clone());
        let _operator: VectorExpression = serde_json::from_value(json).unwrap();
    }

    #[tokio::test]
    async fn it_computes_unary_float_expressions() {
        let points = MultiPointCollection::from_slices(
            MultiPoint::many(vec![(0.0, 0.1), (1.0, 1.1), (2.0, 3.1)])
                .unwrap()
                .as_ref(),
            &[TimeInterval::new_unchecked(0, 1); 3],
            &[(
                "foo",
                FeatureData::NullableFloat(vec![Some(1.0), None, Some(3.0)]),
            )],
        )
        .unwrap();

        let result = compute_result::<MultiPointCollection>(
            VectorExpression {
                params: VectorExpressionParams {
                    input_columns: vec!["foo".into()],
                    expression: "2 * foo".into(),
                    output_column: OutputColumn::Column("bar".into()),
                    output_measurement: Measurement::Unitless,
                    geometry_column_name: "geom".to_string(),
                },
                sources: MockFeatureCollectionSource::single(points).boxed().into(),
            },
            query_rectangle(10.),
        )
        .await;

        let expected_result = MultiPointCollection::from_slices(
            MultiPoint::many(vec![(0.0, 0.1), (1.0, 1.1), (2.0, 3.1)])
                .unwrap()
                .as_ref(),
            &[TimeInterval::new_unchecked(0, 1); 3],
            &[
                (
                    "foo",
                    FeatureData::NullableFloat(vec![Some(1.0), None, Some(3.0)]),
                ),
                (
                    "bar",
                    FeatureData::NullableFloat(vec![Some(2.0), None, Some(6.0)]),
                ),
            ],
        )
        .unwrap();

        // TODO: maybe it is nicer to have something wrapping the actual data that we care about and just adds some cache info
        assert!(
            result.chunks_equal_ignoring_cache_hint(&expected_result),
            "{result:#?} != {expected_result:#?}",
        );
    }

    #[tokio::test]
    async fn it_computes_binary_float_expressions() {
        let points = MultiPointCollection::from_slices(
            MultiPoint::many(vec![
                (0.0, 0.1),
                (1.0, 1.1),
                (2.0, 2.1),
                (3.0, 3.1),
                (4.0, 4.1),
            ])
            .unwrap()
            .as_ref(),
            &[TimeInterval::new_unchecked(0, 1); 5],
            &[
                (
                    "foo",
                    FeatureData::NullableFloat(vec![Some(1.0), None, Some(3.0), None, Some(5.0)]),
                ),
                (
                    "bar",
                    FeatureData::NullableInt(vec![Some(10), None, None, Some(40), Some(50)]),
                ),
            ],
        )
        .unwrap();

        let result = compute_result::<MultiPointCollection>(
            VectorExpression {
                params: VectorExpressionParams {
                    input_columns: vec!["foo".into(), "bar".into()],
                    expression: "foo + bar".into(),
                    output_column: OutputColumn::Column("baz".into()),
                    output_measurement: Measurement::Unitless,
                    geometry_column_name: "geom".to_string(),
                },
                sources: MockFeatureCollectionSource::single(points).boxed().into(),
            },
            query_rectangle(10.),
        )
        .await;

        let expected_result = MultiPointCollection::from_slices(
            MultiPoint::many(vec![
                (0.0, 0.1),
                (1.0, 1.1),
                (2.0, 2.1),
                (3.0, 3.1),
                (4.0, 4.1),
            ])
            .unwrap()
            .as_ref(),
            &[TimeInterval::new_unchecked(0, 1); 5],
            &[
                (
                    "foo",
                    FeatureData::NullableFloat(vec![Some(1.0), None, Some(3.0), None, Some(5.0)]),
                ),
                (
                    "bar",
                    FeatureData::NullableInt(vec![Some(10), None, None, Some(40), Some(50)]),
                ),
                (
                    "baz",
                    FeatureData::NullableFloat(vec![Some(11.0), None, None, None, Some(55.0)]),
                ),
            ],
        )
        .unwrap();

        // TODO: maybe it is nicer to have something wrapping the actual data that we care about and just adds some cache info
        assert!(
            result.chunks_equal_ignoring_cache_hint(&expected_result),
            "{result:#?} != {expected_result:#?}",
        );
    }

    #[tokio::test]
    async fn it_computes_the_area_from_a_geom() {
        let polygons = MockFeatureCollectionSource::single(
            MultiPolygonCollection::from_slices(
                &[
                    MultiPolygon::new(vec![vec![vec![
                        (0.5, -0.5).into(),
                        (4., -1.).into(),
                        (0.5, -2.5).into(),
                        (0.5, -0.5).into(),
                    ]]])
                    .unwrap(),
                    MultiPolygon::new(vec![vec![vec![
                        (1.0, -1.0).into(),
                        (8., -2.).into(),
                        (1.0, -5.0).into(),
                        (1.0, -1.0).into(),
                    ]]])
                    .unwrap(),
                ],
                &[TimeInterval::new_unchecked(0, 1); 2],
                &[("foo", FeatureData::NullableFloat(vec![Some(1.0), None]))],
            )
            .unwrap(),
        )
        .boxed();

        let result = compute_result::<MultiPolygonCollection>(
            VectorExpression {
                params: VectorExpressionParams {
                    input_columns: vec![],
                    expression: "area(geom)".into(),
                    output_column: OutputColumn::Column("area".into()),
                    output_measurement: Measurement::Unitless,
                    geometry_column_name: "geom".to_string(),
                },
                sources: polygons.into(),
            },
            query_rectangle(10.),
        )
        .await;

        let expected_result = MultiPolygonCollection::from_slices(
            &[
                MultiPolygon::new(vec![vec![vec![
                    (0.5, -0.5).into(),
                    (4., -1.).into(),
                    (0.5, -2.5).into(),
                    (0.5, -0.5).into(),
                ]]])
                .unwrap(),
                MultiPolygon::new(vec![vec![vec![
                    (1.0, -1.0).into(),
                    (8., -2.).into(),
                    (1.0, -5.0).into(),
                    (1.0, -1.0).into(),
                ]]])
                .unwrap(),
            ],
            &[TimeInterval::new_unchecked(0, 1); 2],
            &[
                ("foo", FeatureData::NullableFloat(vec![Some(1.0), None])),
                (
                    "area",
                    FeatureData::NullableFloat(vec![Some(3.5), Some(14.0)]),
                ),
            ],
        )
        .unwrap();

        // TODO: maybe it is nicer to have something wrapping the actual data that we care about and just adds some cache info
        assert!(
            result.chunks_equal_ignoring_cache_hint(&expected_result),
            "{result:#?} != {expected_result:#?}",
        );
    }

    #[tokio::test]
    async fn it_computes_the_centroid_of_a_geom() {
        let polygons = MockFeatureCollectionSource::single(
            MultiPolygonCollection::from_slices(
                &[
                    MultiPolygon::new(vec![vec![vec![
                        (0., 0.).into(),
                        (5., 0.).into(),
                        (5., 6.).into(),
                        (0., 6.).into(),
                        (0., 0.).into(),
                    ]]])
                    .unwrap(),
                    MultiPolygon::new(vec![vec![vec![
                        (0.5, -0.5).into(),
                        (4., -1.).into(),
                        (0.5, -2.5).into(),
                        (0.5, -0.5).into(),
                    ]]])
                    .unwrap(),
                ],
                &[TimeInterval::new_unchecked(0, 1); 2],
                &[("foo", FeatureData::NullableFloat(vec![Some(1.0), None]))],
            )
            .unwrap(),
        )
        .boxed();

        let result = compute_result::<MultiPointCollection>(
            VectorExpression {
                params: VectorExpressionParams {
                    input_columns: vec![],
                    expression: "centroid(geom)".into(),
                    output_column: OutputColumn::Geometry(GeoVectorDataType::MultiPoint),
                    output_measurement: Measurement::Unitless,
                    geometry_column_name: "geom".to_string(),
                },
                sources: polygons.into(),
            },
            query_rectangle(10.),
        )
        .await;

        let expected_result = MultiPointCollection::from_slices(
            MultiPoint::many(vec![
                (2.5, 3.0),
                (1.666_666_666_666_666_7, -1.333_333_333_333_333_5),
            ])
            .unwrap()
            .as_ref(),
            &[TimeInterval::new_unchecked(0, 1); 2],
            &[("foo", FeatureData::NullableFloat(vec![Some(1.0), None]))],
        )
        .unwrap();

        // TODO: maybe it is nicer to have something wrapping the actual data that we care about and just adds some cache info
        assert!(
            result.chunks_equal_ignoring_cache_hint(&expected_result),
            "{result_geometries:#?} != {expected_result_geometries:#?}",
            result_geometries = result.geometries().collect::<Vec<_>>(),
            expected_result_geometries = expected_result.geometries().collect::<Vec<_>>(),
        );
    }

    #[tokio::test]
    async fn it_computes_eight_larger_inputs() {
        const NUMBER_OF_ROWS: usize = 100;
        let points = MultiPointCollection::from_slices(
            MultiPoint::many((0..NUMBER_OF_ROWS).map(|i| (i as f64, i as f64)).collect())
                .unwrap()
                .as_ref(),
            &[TimeInterval::new_unchecked(0, 1); NUMBER_OF_ROWS],
            &[
                (
                    "f1",
                    FeatureData::Float((0..NUMBER_OF_ROWS).map(|i| i as f64).collect()),
                ),
                (
                    "f2",
                    FeatureData::Float((0..NUMBER_OF_ROWS).map(|i| i as f64).collect()),
                ),
                (
                    "f3",
                    FeatureData::Float((0..NUMBER_OF_ROWS).map(|i| i as f64).collect()),
                ),
                (
                    "f4",
                    FeatureData::Float((0..NUMBER_OF_ROWS).map(|i| i as f64).collect()),
                ),
                (
                    "f5",
                    FeatureData::Float((0..NUMBER_OF_ROWS).map(|i| i as f64).collect()),
                ),
                (
                    "f6",
                    FeatureData::Float((0..NUMBER_OF_ROWS).map(|i| i as f64).collect()),
                ),
                (
                    "f7",
                    FeatureData::Float((0..NUMBER_OF_ROWS).map(|i| i as f64).collect()),
                ),
                (
                    "f8",
                    FeatureData::Float((0..NUMBER_OF_ROWS).map(|i| i as f64).collect()),
                ),
            ],
        )
        .unwrap();

        let result = compute_result::<MultiPointCollection>(
            VectorExpression {
                params: VectorExpressionParams {
                    input_columns: vec![
                        "f1".into(),
                        "f2".into(),
                        "f3".into(),
                        "f4".into(),
                        "f5".into(),
                        "f6".into(),
                        "f7".into(),
                        "f8".into(),
                    ],
                    expression: "f1 + f2 + f3 + f4 + f5 + f6 + f7 + f8".into(),
                    output_column: OutputColumn::Column("new".into()),
                    output_measurement: Measurement::Unitless,
                    geometry_column_name: "geom".to_string(),
                },
                // `points` is still needed below to build the expected result.
                sources: MockFeatureCollectionSource::single(points.clone())
                    .boxed()
                    .into(),
            },
            query_rectangle(NUMBER_OF_ROWS as f64),
        )
        .await;

        let expected_result = points
            .add_column(
                "new",
                FeatureData::Float((0..NUMBER_OF_ROWS).map(|i| 8.0 * i as f64).collect()),
            )
            .unwrap();

        // TODO: maybe it is nicer to have something wrapping the actual data that we care about and just adds some cache info
        assert!(
            result.chunks_equal_ignoring_cache_hint(&expected_result),
            "{result_geometries:#?} != {expected_result_geometries:#?}",
            result_geometries = result.geometries().collect::<Vec<_>>(),
            expected_result_geometries = expected_result.geometries().collect::<Vec<_>>(),
        );
    }

    /// Builds the query used by all tests: the box `(0, 0)`–`(max, max)`, the default
    /// time interval, a spatial resolution of 0.1 and all attributes selected.
    fn query_rectangle(max: f64) -> VectorQueryRectangle {
        VectorQueryRectangle {
            spatial_bounds: BoundingBox2D::new((0., 0.).into(), (max, max).into()).unwrap(),
            time_interval: TimeInterval::default(),
            spatial_resolution: SpatialResolution::zero_point_one(),
            attributes: ColumnSelection::all(),
        }
    }

    /// Initializes `operator` with a mock execution context, runs `query_rectangle`
    /// against it and returns the single result chunk.
    ///
    /// Panics if initialization or querying fails, if the query processor cannot be
    /// converted to the requested collection type `C`, or if the query does not yield
    /// exactly one chunk.
    async fn compute_result<C>(operator: VectorExpression, query_rectangle: VectorQueryRectangle) -> C
    where
        C: 'static,
        Box<dyn VectorQueryProcessor<VectorType = C>>: TryFrom<TypedVectorQueryProcessor>,
        <Box<dyn VectorQueryProcessor<VectorType = C>> as TryFrom<TypedVectorQueryProcessor>>::Error: std::fmt::Debug,
    {
        let operator = operator
            .boxed()
            .initialize(
                WorkflowOperatorPath::initialize_root(),
                &MockExecutionContext::test_default(),
            )
            .await
            .unwrap();

        let query_processor: Box<dyn VectorQueryProcessor<VectorType = C>> =
            operator.query_processor().unwrap().try_into().unwrap();

        let ctx = MockQueryContext::new(ChunkByteSize::MAX);

        let query = query_processor.query(query_rectangle, &ctx).await.unwrap();

        let mut result = query.map(Result::unwrap).collect::<Vec<C>>().await;

        assert_eq!(result.len(), 1);
        result.remove(0)
    }
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc