• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

geo-engine / geoengine / 6034971426

31 Aug 2023 08:32AM UTC coverage: 90.041% (+0.1%) from 89.934%
6034971426

push

github

web-flow
Merge pull request #868 from geo-engine/update-2023-08-29

Update 2023 08 29

171 of 171 new or added lines in 36 files covered. (100.0%)

106394 of 118162 relevant lines covered (90.04%)

61276.98 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

82.16
/operators/src/processing/column_range_filter.rs
1
use crate::engine::{
2
    CanonicOperatorName, ExecutionContext, InitializedSources, InitializedVectorOperator, Operator,
3
    OperatorName, QueryContext, QueryProcessor, TypedVectorQueryProcessor, VectorOperator,
4
    VectorQueryProcessor, VectorResultDescriptor, WorkflowOperatorPath,
5
};
6
use crate::error;
7
use crate::util::input::StringOrNumberRange;
8
use crate::util::Result;
9
use crate::{adapters::FeatureCollectionChunkMerger, engine::SingleVectorSource};
10
use async_trait::async_trait;
11
use futures::stream::BoxStream;
12
use futures::StreamExt;
13
use geoengine_datatypes::collections::{
14
    FeatureCollection, FeatureCollectionInfos, FeatureCollectionModifications,
15
};
16
use geoengine_datatypes::primitives::{
17
    BoundingBox2D, FeatureDataType, FeatureDataValue, Geometry, VectorQueryRectangle,
18
};
19
use geoengine_datatypes::util::arrow::ArrowTyped;
20
use serde::{Deserialize, Serialize};
21
use std::marker::PhantomData;
22
use std::ops::RangeInclusive;
23

24
#[derive(Debug, Serialize, Deserialize, Clone)]
7✔
25
#[serde(rename_all = "camelCase")]
26
pub struct ColumnRangeFilterParams {
27
    pub column: String,
28
    pub ranges: Vec<StringOrNumberRange>,
29
    pub keep_nulls: bool,
30
}
31

32
pub type ColumnRangeFilter = Operator<ColumnRangeFilterParams, SingleVectorSource>;
33

34
impl OperatorName for ColumnRangeFilter {
35
    const TYPE_NAME: &'static str = "ColumnRangeFilter";
36
}
37

38
#[typetag::serde]
1✔
39
#[async_trait]
40
impl VectorOperator for ColumnRangeFilter {
41
    async fn _initialize(
1✔
42
        self: Box<Self>,
1✔
43
        path: WorkflowOperatorPath,
1✔
44
        context: &dyn ExecutionContext,
1✔
45
    ) -> Result<Box<dyn InitializedVectorOperator>> {
1✔
46
        let name = CanonicOperatorName::from(&self);
1✔
47

48
        let initialized_sources = self.sources.initialize_sources(path, context).await?;
1✔
49

50
        let initialized_operator = InitializedColumnRangeFilter {
1✔
51
            name,
1✔
52
            result_descriptor: initialized_sources.vector.result_descriptor().clone(),
1✔
53
            vector_source: initialized_sources.vector,
1✔
54
            state: self.params,
1✔
55
        };
1✔
56

1✔
57
        Ok(initialized_operator.boxed())
1✔
58
    }
2✔
59

60
    span_fn!(ColumnRangeFilter);
×
61
}
62

63
pub struct InitializedColumnRangeFilter {
64
    name: CanonicOperatorName,
65
    result_descriptor: VectorResultDescriptor,
66
    vector_source: Box<dyn InitializedVectorOperator>,
67
    state: ColumnRangeFilterParams,
68
}
69

70
impl InitializedVectorOperator for InitializedColumnRangeFilter {
71
    fn query_processor(&self) -> Result<TypedVectorQueryProcessor> {
1✔
72
        Ok(map_typed_query_processor!(
1✔
73
            self.vector_source.query_processor()?,
1✔
74
            source => ColumnRangeFilterProcessor::new(source, self.state.clone()).boxed()
1✔
75
        ))
76
    }
1✔
77

78
    fn result_descriptor(&self) -> &VectorResultDescriptor {
×
79
        &self.result_descriptor
×
80
    }
×
81

82
    fn canonic_name(&self) -> CanonicOperatorName {
×
83
        self.name.clone()
×
84
    }
×
85
}
86

87
pub struct ColumnRangeFilterProcessor<G> {
88
    vector_type: PhantomData<FeatureCollection<G>>,
89
    source: Box<dyn VectorQueryProcessor<VectorType = FeatureCollection<G>>>,
90
    column: String,
91
    keep_nulls: bool,
92
    ranges: Vec<StringOrNumberRange>,
93
}
94

95
impl<G> ColumnRangeFilterProcessor<G>
96
where
97
    G: Geometry + ArrowTyped + Sync + Send,
98
{
99
    pub fn new(
1✔
100
        source: Box<dyn VectorQueryProcessor<VectorType = FeatureCollection<G>>>,
1✔
101
        params: ColumnRangeFilterParams,
1✔
102
    ) -> Self {
1✔
103
        Self {
1✔
104
            vector_type: Default::default(),
1✔
105
            source,
1✔
106
            column: params.column,
1✔
107
            keep_nulls: params.keep_nulls,
1✔
108
            ranges: params.ranges,
1✔
109
        }
1✔
110
    }
1✔
111
}
112

113
#[async_trait]
114
impl<G> QueryProcessor for ColumnRangeFilterProcessor<G>
115
where
116
    G: Geometry + ArrowTyped + Sync + Send + 'static,
117
{
118
    type Output = FeatureCollection<G>;
119
    type SpatialBounds = BoundingBox2D;
120

121
    async fn _query<'a>(
1✔
122
        &'a self,
1✔
123
        query: VectorQueryRectangle,
1✔
124
        ctx: &'a dyn QueryContext,
1✔
125
    ) -> Result<BoxStream<'a, Result<Self::Output>>> {
1✔
126
        let column_name = self.column.clone();
1✔
127
        let ranges = self.ranges.clone();
1✔
128
        let keep_nulls = self.keep_nulls;
1✔
129

130
        let filter_stream = self.source.query(query, ctx).await?.map(move |collection| {
1✔
131
            let collection = collection?;
1✔
132

133
            // TODO: do transformation work only once
134
            let ranges: Result<Vec<RangeInclusive<FeatureDataValue>>> =
1✔
135
                match collection.column_type(&column_name)? {
1✔
136
                    FeatureDataType::Text => ranges
×
137
                        .iter()
×
138
                        .cloned()
×
139
                        .map(|range| range.into_string_range().map(Into::into))
×
140
                        .collect(),
×
141
                    FeatureDataType::Float => ranges
1✔
142
                        .iter()
1✔
143
                        .cloned()
1✔
144
                        .map(|range| range.into_float_range().map(Into::into))
1✔
145
                        .collect(),
1✔
146
                    FeatureDataType::Int => ranges
×
147
                        .iter()
×
148
                        .cloned()
×
149
                        .map(|range| range.into_int_range().map(Into::into))
×
150
                        .collect(),
×
151
                    FeatureDataType::Bool => ranges
×
152
                        .iter()
×
153
                        .cloned()
×
154
                        .map(|range| range.into_int_range().map(Into::into))
×
155
                        .collect(),
×
156
                    FeatureDataType::DateTime => ranges
×
157
                        .iter()
×
158
                        .cloned()
×
159
                        .map(|range| range.into_int_range().map(Into::into))
×
160
                        .collect(),
×
161
                    FeatureDataType::Category => Err(error::Error::InvalidType {
×
162
                        expected: "text, float, int, bool or datetime".to_string(),
×
163
                        found: "category".to_string(),
×
164
                    }),
×
165
                };
166

167
            collection
168
                .column_range_filter(&column_name, &ranges?, keep_nulls)
1✔
169
                .map_err(Into::into)
1✔
170
        });
1✔
171

1✔
172
        let merged_chunks_stream =
1✔
173
            FeatureCollectionChunkMerger::new(filter_stream.fuse(), ctx.chunk_byte_size().into());
1✔
174

1✔
175
        Ok(merged_chunks_stream.boxed())
1✔
176
    }
2✔
177
}
178

179
#[cfg(test)]
mod tests {
    use super::*;
    use crate::engine::{MockExecutionContext, MockQueryContext};
    use crate::mock::MockFeatureCollectionSource;
    use geoengine_datatypes::collections::{
        ChunksEqualIgnoringCacheHint, FeatureCollectionModifications, MultiPointCollection,
    };
    use geoengine_datatypes::primitives::CacheHint;
    use geoengine_datatypes::primitives::{
        BoundingBox2D, Coordinate2D, FeatureData, MultiPoint, SpatialResolution, TimeInterval,
    };
    use geoengine_datatypes::util::test::TestDefault;

    /// Parameters filtering `column` to the inclusive range `1..=2`, dropping nulls.
    fn params_keeping_one_to_two(column: &str) -> ColumnRangeFilterParams {
        ColumnRangeFilterParams {
            column: column.to_string(),
            ranges: vec![(1..=2).into()],
            keep_nulls: false,
        }
    }

    #[test]
    fn serde() {
        let filter = ColumnRangeFilter {
            params: params_keeping_one_to_two("foobar"),
            sources: MockFeatureCollectionSource::<MultiPoint>::multiple(vec![])
                .boxed()
                .into(),
        }
        .boxed();

        let serialized = serde_json::to_value(&filter).unwrap();

        let expected = serde_json::json!({
            "type": "ColumnRangeFilter",
            "params": {
                "column": "foobar",
                "ranges": [
                    [1, 2]
                ],
                "keepNulls": false
            },
            "sources": {
                "vector": {
                    "type": "MockFeatureCollectionSourceMultiPoint",
                    "params": {
                        "collections": [],
                        "spatialReference": "EPSG:4326",
                        "measurements": null
                    }
                }
            }
        });
        assert_eq!(serialized, expected);

        // The serialized form must also deserialize back into an operator.
        let _operator: Box<dyn VectorOperator> = serde_json::from_value(serialized).unwrap();
    }

    #[tokio::test]
    async fn execute() {
        let column_name = "foo";

        let collection = MultiPointCollection::from_data(
            MultiPoint::many(vec![(0.0, 0.1), (1.0, 1.1), (2.0, 2.1), (3.0, 3.1)]).unwrap(),
            vec![TimeInterval::new(0, 1).unwrap(); 4],
            std::iter::once((
                column_name.to_string(),
                FeatureData::Float(vec![0., 1., 2., 3.]),
            ))
            .collect(),
            CacheHint::default(),
        )
        .unwrap();

        let source = MockFeatureCollectionSource::single(collection.clone()).boxed();

        let operator = ColumnRangeFilter {
            params: params_keeping_one_to_two(column_name),
            sources: source.into(),
        }
        .boxed();

        let initialized = operator
            .initialize(
                WorkflowOperatorPath::initialize_root(),
                &MockExecutionContext::test_default(),
            )
            .await
            .unwrap();

        let Ok(TypedVectorQueryProcessor::MultiPoint(processor)) = initialized.query_processor()
        else {
            panic!();
        };

        let query = VectorQueryRectangle {
            spatial_bounds: BoundingBox2D::new((0., 0.).into(), (4., 4.).into()).unwrap(),
            time_interval: TimeInterval::default(),
            spatial_resolution: SpatialResolution::zero_point_one(),
        };

        // Chunk byte size equal to the size of two coordinates.
        let ctx = MockQueryContext::new((2 * std::mem::size_of::<Coordinate2D>()).into());

        let collections: Vec<MultiPointCollection> = processor
            .query(query, &ctx)
            .await
            .unwrap()
            .map(Result::unwrap)
            .collect()
            .await;

        assert_eq!(collections.len(), 1);

        // Only the values 1.0 and 2.0 lie within 1..=2.
        let expected = collection.filter(vec![false, true, true, false]).unwrap();
        assert!(collections[0].chunks_equal_ignoring_cache_hint(&expected));
    }
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc