• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

geo-engine / geoengine / 11911118784

19 Nov 2024 10:06AM UTC coverage: 90.448% (-0.2%) from 90.687%
11911118784

push

github

web-flow
Merge pull request #994 from geo-engine/workspace-dependencies

use workspace dependencies, update toolchain, use global lock in expression

9 of 11 new or added lines in 6 files covered. (81.82%)

369 existing lines in 74 files now uncovered.

132871 of 146904 relevant lines covered (90.45%)

54798.62 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

83.07
/operators/src/processing/column_range_filter.rs
1
use crate::engine::{
2
    CanonicOperatorName, ExecutionContext, InitializedSources, InitializedVectorOperator, Operator,
3
    OperatorName, QueryContext, QueryProcessor, TypedVectorQueryProcessor, VectorOperator,
4
    VectorQueryProcessor, VectorResultDescriptor, WorkflowOperatorPath,
5
};
6
use crate::error;
7
use crate::util::input::StringOrNumberRange;
8
use crate::util::Result;
9
use crate::{adapters::FeatureCollectionChunkMerger, engine::SingleVectorSource};
10
use async_trait::async_trait;
11
use futures::stream::BoxStream;
12
use futures::StreamExt;
13
use geoengine_datatypes::collections::{
14
    FeatureCollection, FeatureCollectionInfos, FeatureCollectionModifications,
15
};
16
use geoengine_datatypes::primitives::{
17
    BoundingBox2D, ColumnSelection, FeatureDataType, FeatureDataValue, Geometry,
18
    VectorQueryRectangle,
19
};
20
use geoengine_datatypes::util::arrow::ArrowTyped;
21
use serde::{Deserialize, Serialize};
22
use std::marker::PhantomData;
23
use std::ops::RangeInclusive;
24

25
/// Parameters of the `ColumnRangeFilter` operator.
///
/// Field names are (de)serialized in camelCase, e.g. `keep_nulls` becomes `keepNulls`.
#[derive(Debug, Serialize, Deserialize, Clone)]
4✔
26
#[serde(rename_all = "camelCase")]
27
pub struct ColumnRangeFilterParams {
28
    /// Name of the column whose values are tested against `ranges`.
    pub column: String,
29
    /// Ranges handed to `column_range_filter`; they are converted to the
    /// column's data type at query time.
    pub ranges: Vec<StringOrNumberRange>,
30
    /// Forwarded unchanged to `column_range_filter`.
    /// NOTE(review): presumably decides whether features with a null value in
    /// `column` are kept — confirm against the collection API.
    pub keep_nulls: bool,
31
}
32

33
/// Operator that pairs `ColumnRangeFilterParams` with a single vector source.
pub type ColumnRangeFilter = Operator<ColumnRangeFilterParams, SingleVectorSource>;
34

35
/// Gives the operator its type name.
impl OperatorName for ColumnRangeFilter {
36
    /// The serde test in this file expects this value as the `type` entry of
    /// the serialized operator.
    const TYPE_NAME: &'static str = "ColumnRangeFilter";
37
}
38

39
/// Turns the operator description into an initialized operator.
#[typetag::serde]
1✔
40
#[async_trait]
41
impl VectorOperator for ColumnRangeFilter {
42
    /// Initializes the vector source and wraps it in an
    /// `InitializedColumnRangeFilter`.
    ///
    /// The source's result descriptor is cloned and reused unchanged, and the
    /// parameters are moved into the initialized operator.
    ///
    /// # Errors
    ///
    /// Returns the error of `initialize_sources` if the source cannot be
    /// initialized.
    async fn _initialize(
43
        self: Box<Self>,
44
        path: WorkflowOperatorPath,
45
        context: &dyn ExecutionContext,
46
    ) -> Result<Box<dyn InitializedVectorOperator>> {
1✔
47
        // The canonic name is derived before `self` is taken apart below.
        let name = CanonicOperatorName::from(&self);
1✔
48

49
        let initialized_sources = self.sources.initialize_sources(path, context).await?;
1✔
50

51
        let initialized_operator = InitializedColumnRangeFilter {
1✔
52
            name,
1✔
53
            // The descriptor must be cloned before `initialized_sources.vector` is moved.
            result_descriptor: initialized_sources.vector.result_descriptor().clone(),
1✔
54
            vector_source: initialized_sources.vector,
1✔
55
            state: self.params,
1✔
56
        };
1✔
57

1✔
58
        Ok(initialized_operator.boxed())
1✔
59
    }
2✔
60

61
    // NOTE(review): `span_fn!` is defined outside this file; presumably it
    // generates the tracing-span method for this operator — confirm.
    span_fn!(ColumnRangeFilter);
62
}
63

64
/// Initialized form of `ColumnRangeFilter`, created by `_initialize`.
pub struct InitializedColumnRangeFilter {
65
    /// Canonic name derived from the uninitialized operator.
    name: CanonicOperatorName,
66
    /// Clone of the source's result descriptor.
    result_descriptor: VectorResultDescriptor,
67
    /// The initialized vector source that delivers the data to filter.
    vector_source: Box<dyn InitializedVectorOperator>,
68
    /// The operator parameters; cloned into every query processor.
    state: ColumnRangeFilterParams,
69
}
70

71
/// Query-processor creation and metadata access for the initialized filter.
impl InitializedVectorOperator for InitializedColumnRangeFilter {
72
    /// Builds the source's query processor and wraps it in a
    /// `ColumnRangeFilterProcessor` that receives a clone of the parameters.
    ///
    /// NOTE(review): `map_typed_query_processor!` is defined elsewhere;
    /// presumably it applies the mapping to whichever typed variant the source
    /// returns — confirm.
    ///
    /// # Errors
    ///
    /// Returns the source's error if its query processor cannot be created.
    fn query_processor(&self) -> Result<TypedVectorQueryProcessor> {
1✔
73
        Ok(map_typed_query_processor!(
×
74
            self.vector_source.query_processor()?,
1✔
75
            source => ColumnRangeFilterProcessor::new(source, self.state.clone()).boxed()
×
76
        ))
77
    }
1✔
78

79
    /// Returns the result descriptor stored at initialization.
    fn result_descriptor(&self) -> &VectorResultDescriptor {
×
80
        &self.result_descriptor
×
81
    }
×
82

83
    /// Returns a clone of the canonic operator name.
    fn canonic_name(&self) -> CanonicOperatorName {
×
84
        self.name.clone()
×
85
    }
×
86
}
87

88
/// Query processor that filters the feature collections of `source` by the
/// value ranges configured for one column.
pub struct ColumnRangeFilterProcessor<G> {
89
    /// Marker for the collection type; holds no data.
    vector_type: PhantomData<FeatureCollection<G>>,
90
    /// Upstream processor whose collections are filtered.
    source: Box<dyn VectorQueryProcessor<VectorType = FeatureCollection<G>>>,
91
    /// Name of the column to filter on.
    column: String,
92
    /// Passed through to `column_range_filter` on every chunk.
    keep_nulls: bool,
93
    /// Untyped ranges; converted to the column's data type per chunk.
    ranges: Vec<StringOrNumberRange>,
94
}
95

96
/// Constructor for the processor.
impl<G> ColumnRangeFilterProcessor<G>
97
where
98
    G: Geometry + ArrowTyped + Sync + Send,
99
{
100
    /// Creates a processor that filters the output of `source`.
    ///
    /// The fields of `params` are moved into the processor one by one.
    pub fn new(
1✔
101
        source: Box<dyn VectorQueryProcessor<VectorType = FeatureCollection<G>>>,
1✔
102
        params: ColumnRangeFilterParams,
1✔
103
    ) -> Self {
1✔
104
        Self {
1✔
105
            // The field is a `PhantomData`, so its default is the marker value.
            vector_type: Default::default(),
1✔
106
            source,
1✔
107
            column: params.column,
1✔
108
            keep_nulls: params.keep_nulls,
1✔
109
            ranges: params.ranges,
1✔
110
        }
1✔
111
    }
1✔
112
}
113

114
/// Streams the source's collections, filtered by the configured column ranges.
#[async_trait]
115
impl<G> QueryProcessor for ColumnRangeFilterProcessor<G>
116
where
117
    G: Geometry + ArrowTyped + Sync + Send + 'static,
118
{
119
    type Output = FeatureCollection<G>;
120
    type SpatialBounds = BoundingBox2D;
121
    type Selection = ColumnSelection;
122
    type ResultDescription = VectorResultDescriptor;
123

124
    /// Queries the source and applies `column_range_filter` to every
    /// collection it yields.
    ///
    /// For each collection the configured ranges are converted to
    /// `FeatureDataValue` ranges according to the type of the filter column.
    /// The filtered collections are then passed through a
    /// `FeatureCollectionChunkMerger` that is given the context's chunk byte size.
    ///
    /// # Errors
    ///
    /// The call itself fails if querying the source fails. Inside the stream,
    /// an item is an error if the source item is an error, the column type
    /// lookup fails, a range conversion fails, the column has type `Category`
    /// (`InvalidType`), or `column_range_filter` fails.
    async fn _query<'a>(
125
        &'a self,
126
        query: VectorQueryRectangle,
127
        ctx: &'a dyn QueryContext,
128
    ) -> Result<BoxStream<'a, Result<Self::Output>>> {
1✔
129
        // Owned copies, because the closure below is a `move` closure.
        let column_name = self.column.clone();
1✔
130
        let ranges = self.ranges.clone();
1✔
131
        let keep_nulls = self.keep_nulls;
1✔
132

133
        let filter_stream = self.source.query(query, ctx).await?.map(move |collection| {
1✔
134
            // An error from the source becomes the result for this item.
            let collection = collection?;
1✔
135

136
            // TODO: do transformation work only once
137
            // The conversion depends on the column type of the current
            // collection and is repeated for every chunk.
            let ranges: Result<Vec<RangeInclusive<FeatureDataValue>>> =
1✔
138
                match collection.column_type(&column_name)? {
1✔
UNCOV
139
                    FeatureDataType::Text => ranges
×
140
                        .iter()
×
141
                        .cloned()
×
142
                        .map(|range| range.into_string_range().map(Into::into))
×
143
                        .collect(),
×
144
                    FeatureDataType::Float => ranges
1✔
145
                        .iter()
1✔
146
                        .cloned()
1✔
147
                        .map(|range| range.into_float_range().map(Into::into))
1✔
148
                        .collect(),
1✔
UNCOV
149
                    FeatureDataType::Int => ranges
×
150
                        .iter()
×
151
                        .cloned()
×
152
                        .map(|range| range.into_int_range().map(Into::into))
×
153
                        .collect(),
×
UNCOV
154
                    // NOTE(review): Bool columns reuse the int-range conversion;
                    // confirm that int-valued ranges compare as intended
                    // against boolean column values.
                    FeatureDataType::Bool => ranges
×
155
                        .iter()
×
156
                        .cloned()
×
157
                        .map(|range| range.into_int_range().map(Into::into))
×
158
                        .collect(),
×
UNCOV
159
                    // NOTE(review): DateTime columns also reuse the int-range
                    // conversion; confirm the expected representation of the bounds.
                    FeatureDataType::DateTime => ranges
×
160
                        .iter()
×
161
                        .cloned()
×
162
                        .map(|range| range.into_int_range().map(Into::into))
×
163
                        .collect(),
×
UNCOV
164
                    // Category columns cannot be range-filtered.
                    FeatureDataType::Category => Err(error::Error::InvalidType {
×
165
                        expected: "text, float, int, bool or datetime".to_string(),
×
166
                        found: "category".to_string(),
×
167
                    }),
×
168
                };
169

170
            // `ranges?` surfaces a conversion error as this item's result.
            collection
1✔
171
                .column_range_filter(&column_name, &ranges?, keep_nulls)
1✔
172
                .map_err(Into::into)
1✔
173
        });
1✔
174

1✔
175
        // NOTE(review): presumably the merger combines small filtered chunks
        // up to the configured chunk byte size — confirm in the adapter.
        let merged_chunks_stream =
1✔
176
            FeatureCollectionChunkMerger::new(filter_stream.fuse(), ctx.chunk_byte_size().into());
1✔
177

1✔
178
        Ok(merged_chunks_stream.boxed())
1✔
179
    }
2✔
180

181
    /// Returns the source's result descriptor unchanged.
    fn result_descriptor(&self) -> &VectorResultDescriptor {
2✔
182
        self.source.result_descriptor()
2✔
183
    }
2✔
184
}
185

186
// Unit tests: (de)serialization of the operator and one end-to-end filter run.
#[cfg(test)]
187
mod tests {
188
    use super::*;
189
    use crate::engine::{MockExecutionContext, MockQueryContext};
190
    use crate::mock::MockFeatureCollectionSource;
191
    use geoengine_datatypes::collections::{
192
        ChunksEqualIgnoringCacheHint, FeatureCollectionModifications, MultiPointCollection,
193
    };
194
    use geoengine_datatypes::primitives::CacheHint;
195
    use geoengine_datatypes::primitives::{
196
        BoundingBox2D, Coordinate2D, FeatureData, MultiPoint, SpatialResolution, TimeInterval,
197
    };
198
    use geoengine_datatypes::util::test::TestDefault;
199

200
    // Checks the JSON form of the operator (camelCase params, `type` tag) and
    // that this JSON deserializes back into a boxed `VectorOperator`.
    #[test]
201
    fn serde() {
1✔
202
        let filter = ColumnRangeFilter {
1✔
203
            params: ColumnRangeFilterParams {
1✔
204
                column: "foobar".to_string(),
1✔
205
                ranges: vec![(1..=2).into()],
1✔
206
                keep_nulls: false,
1✔
207
            },
1✔
208
            sources: MockFeatureCollectionSource::<MultiPoint>::multiple(vec![])
1✔
209
                .boxed()
1✔
210
                .into(),
1✔
211
        }
1✔
212
        .boxed();
1✔
213

1✔
214
        let serialized = serde_json::to_value(&filter).unwrap();
1✔
215

1✔
216
        assert_eq!(
1✔
217
            serialized,
1✔
218
            serde_json::json!({
1✔
219
                "type": "ColumnRangeFilter",
1✔
220
                "params": {
1✔
221
                    "column": "foobar",
1✔
222
                    "ranges": [
1✔
223
                        [1, 2]
1✔
224
                    ],
1✔
225
                    "keepNulls": false
1✔
226
                },
1✔
227
                "sources": {
1✔
228
                    "vector": {
1✔
229
                        "type": "MockFeatureCollectionSourceMultiPoint",
1✔
230
                        "params": {
1✔
231
                            "collections": [],
1✔
232
                            "spatialReference": "EPSG:4326",
1✔
233
                            "measurements": null,
1✔
234
                        }
1✔
235
                    }
1✔
236
                },
1✔
237
            })
1✔
238
        );
1✔
239

240
        // Round trip: the JSON must be accepted as a trait object again.
        let _operator: Box<dyn VectorOperator> = serde_json::from_value(serialized).unwrap();
1✔
241
    }
1✔
242

243
    // Filters a float column holding 0, 1, 2, 3 with the range 1..=2 and
    // expects a single collection containing the second and third feature.
    #[tokio::test]
244
    async fn execute() {
1✔
245
        let column_name = "foo";
1✔
246

1✔
247
        // Four points, each with the same time interval and one float value.
        let collection = MultiPointCollection::from_data(
1✔
248
            MultiPoint::many(vec![(0.0, 0.1), (1.0, 1.1), (2.0, 2.1), (3.0, 3.1)]).unwrap(),
1✔
249
            vec![TimeInterval::new(0, 1).unwrap(); 4],
1✔
250
            [(
1✔
251
                column_name.to_string(),
1✔
252
                FeatureData::Float(vec![0., 1., 2., 3.]),
1✔
253
            )]
1✔
254
            .iter()
1✔
255
            .cloned()
1✔
256
            .collect(),
1✔
257
            CacheHint::default(),
1✔
258
        )
1✔
259
        .unwrap();
1✔
260

1✔
261
        let source = MockFeatureCollectionSource::single(collection.clone()).boxed();
1✔
262

1✔
263
        let filter = ColumnRangeFilter {
1✔
264
            params: ColumnRangeFilterParams {
1✔
265
                column: column_name.to_string(),
1✔
266
                ranges: vec![(1..=2).into()],
1✔
267
                keep_nulls: false,
1✔
268
            },
1✔
269
            sources: source.into(),
1✔
270
        }
1✔
271
        .boxed();
1✔
272

1✔
273
        let initialized = filter
1✔
274
            .initialize(
1✔
275
                WorkflowOperatorPath::initialize_root(),
1✔
276
                &MockExecutionContext::test_default(),
1✔
277
            )
1✔
278
            .await
1✔
279
            .unwrap();
1✔
280

1✔
281
        // The source is a multi-point source, so any other variant is a failure.
        let Ok(TypedVectorQueryProcessor::MultiPoint(point_processor)) =
1✔
282
            initialized.query_processor()
1✔
283
        else {
1✔
284
            panic!();
1✔
285
        };
1✔
286

1✔
287
        // The bounding box (0,0)-(4,4) contains all four input points.
        let query_rectangle = VectorQueryRectangle {
1✔
288
            spatial_bounds: BoundingBox2D::new((0., 0.).into(), (4., 4.).into()).unwrap(),
1✔
289
            time_interval: TimeInterval::default(),
1✔
290
            spatial_resolution: SpatialResolution::zero_point_one(),
1✔
291
            attributes: ColumnSelection::all(),
1✔
292
        };
1✔
293

1✔
294
        // Chunk byte size is set to the size of two coordinates.
        let ctx = MockQueryContext::new((2 * std::mem::size_of::<Coordinate2D>()).into());
1✔
295

1✔
296
        let stream = point_processor.query(query_rectangle, &ctx).await.unwrap();
1✔
297

1✔
298
        let collections: Vec<MultiPointCollection> = stream.map(Result::unwrap).collect().await;
1✔
299

1✔
300
        assert_eq!(collections.len(), 1);
1✔
301

1✔
302
        // Only the features with values 1 and 2 may remain.
        assert!(collections[0].chunks_equal_ignoring_cache_hint(
1✔
303
            &collection.filter(vec![false, true, true, false]).unwrap()
1✔
304
        ));
1✔
305
    }
1✔
306
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc