• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

geo-engine / geoengine / 3929938005

pending completion
3929938005

push

github

GitHub
Merge #713

84930 of 96741 relevant lines covered (87.79%)

79640.1 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

83.33
/operators/src/processing/column_range_filter.rs
1
use crate::engine::{
2
    CreateSpan, ExecutionContext, InitializedVectorOperator, Operator, OperatorName, QueryContext,
3
    QueryProcessor, TypedVectorQueryProcessor, VectorOperator, VectorQueryProcessor,
4
    VectorResultDescriptor,
5
};
6
use crate::error;
7
use crate::util::input::StringOrNumberRange;
8
use crate::util::Result;
9
use crate::{adapters::FeatureCollectionChunkMerger, engine::SingleVectorSource};
10
use async_trait::async_trait;
11
use futures::stream::BoxStream;
12
use futures::StreamExt;
13
use geoengine_datatypes::collections::{
14
    FeatureCollection, FeatureCollectionInfos, FeatureCollectionModifications,
15
};
16
use geoengine_datatypes::primitives::{
17
    BoundingBox2D, FeatureDataType, FeatureDataValue, Geometry, VectorQueryRectangle,
18
};
19
use geoengine_datatypes::util::arrow::ArrowTyped;
20
use serde::{Deserialize, Serialize};
21
use std::marker::PhantomData;
22
use std::ops::RangeInclusive;
23
use tracing::{span, Level};
24

25
#[derive(Debug, Serialize, Deserialize, Clone)]
7✔
26
#[serde(rename_all = "camelCase")]
27
pub struct ColumnRangeFilterParams {
28
    pub column: String,
29
    pub ranges: Vec<StringOrNumberRange>,
30
    pub keep_nulls: bool,
31
}
32

33
pub type ColumnRangeFilter = Operator<ColumnRangeFilterParams, SingleVectorSource>;
34

35
impl OperatorName for ColumnRangeFilter {
36
    const TYPE_NAME: &'static str = "ColumnRangeFilter";
37
}
38

39
#[typetag::serde]
1✔
40
#[async_trait]
41
impl VectorOperator for ColumnRangeFilter {
42
    async fn _initialize(
1✔
43
        self: Box<Self>,
1✔
44
        context: &dyn ExecutionContext,
1✔
45
    ) -> Result<Box<dyn InitializedVectorOperator>> {
1✔
46
        let vector_source = self.sources.vector.initialize(context).await?;
1✔
47

48
        let initialized_operator = InitializedColumnRangeFilter {
1✔
49
            result_descriptor: vector_source.result_descriptor().clone(),
1✔
50
            vector_source,
1✔
51
            state: self.params,
1✔
52
        };
1✔
53

1✔
54
        Ok(initialized_operator.boxed())
1✔
55
    }
2✔
56

57
    span_fn!(ColumnRangeFilter);
×
58
}
59

60
pub struct InitializedColumnRangeFilter {
61
    result_descriptor: VectorResultDescriptor,
62
    vector_source: Box<dyn InitializedVectorOperator>,
63
    state: ColumnRangeFilterParams,
64
}
65

66
impl InitializedVectorOperator for InitializedColumnRangeFilter {
67
    fn query_processor(&self) -> Result<TypedVectorQueryProcessor> {
1✔
68
        Ok(map_typed_query_processor!(
1✔
69
            self.vector_source.query_processor()?,
1✔
70
            source => ColumnRangeFilterProcessor::new(source, self.state.clone()).boxed()
1✔
71
        ))
72
    }
1✔
73

74
    fn result_descriptor(&self) -> &VectorResultDescriptor {
×
75
        &self.result_descriptor
×
76
    }
×
77
}
78

79
pub struct ColumnRangeFilterProcessor<G> {
80
    vector_type: PhantomData<FeatureCollection<G>>,
81
    source: Box<dyn VectorQueryProcessor<VectorType = FeatureCollection<G>>>,
82
    column: String,
83
    keep_nulls: bool,
84
    ranges: Vec<StringOrNumberRange>,
85
}
86

87
impl<G> ColumnRangeFilterProcessor<G>
88
where
89
    G: Geometry + ArrowTyped + Sync + Send,
90
{
91
    pub fn new(
1✔
92
        source: Box<dyn VectorQueryProcessor<VectorType = FeatureCollection<G>>>,
1✔
93
        params: ColumnRangeFilterParams,
1✔
94
    ) -> Self {
1✔
95
        Self {
1✔
96
            vector_type: Default::default(),
1✔
97
            source,
1✔
98
            column: params.column,
1✔
99
            keep_nulls: params.keep_nulls,
1✔
100
            ranges: params.ranges,
1✔
101
        }
1✔
102
    }
1✔
103
}
104

105
#[async_trait]
106
impl<G> QueryProcessor for ColumnRangeFilterProcessor<G>
107
where
108
    G: Geometry + ArrowTyped + Sync + Send + 'static,
109
{
110
    type Output = FeatureCollection<G>;
111
    type SpatialBounds = BoundingBox2D;
112

113
    async fn _query<'a>(
1✔
114
        &'a self,
1✔
115
        query: VectorQueryRectangle,
1✔
116
        ctx: &'a dyn QueryContext,
1✔
117
    ) -> Result<BoxStream<'a, Result<Self::Output>>> {
1✔
118
        let column_name = self.column.clone();
1✔
119
        let ranges = self.ranges.clone();
1✔
120
        let keep_nulls = self.keep_nulls;
1✔
121

122
        let filter_stream = self.source.query(query, ctx).await?.map(move |collection| {
1✔
123
            let collection = collection?;
1✔
124

125
            // TODO: do transformation work only once
126
            let ranges: Result<Vec<RangeInclusive<FeatureDataValue>>> =
1✔
127
                match collection.column_type(&column_name)? {
1✔
128
                    FeatureDataType::Text => ranges
×
129
                        .iter()
×
130
                        .cloned()
×
131
                        .map(|range| range.into_string_range().map(Into::into))
×
132
                        .collect(),
×
133
                    FeatureDataType::Float => ranges
1✔
134
                        .iter()
1✔
135
                        .cloned()
1✔
136
                        .map(|range| range.into_float_range().map(Into::into))
1✔
137
                        .collect(),
1✔
138
                    FeatureDataType::Int => ranges
×
139
                        .iter()
×
140
                        .cloned()
×
141
                        .map(|range| range.into_int_range().map(Into::into))
×
142
                        .collect(),
×
143
                    FeatureDataType::Bool => ranges
×
144
                        .iter()
×
145
                        .cloned()
×
146
                        .map(|range| range.into_int_range().map(Into::into))
×
147
                        .collect(),
×
148
                    FeatureDataType::DateTime => ranges
×
149
                        .iter()
×
150
                        .cloned()
×
151
                        .map(|range| range.into_int_range().map(Into::into))
×
152
                        .collect(),
×
153
                    FeatureDataType::Category => Err(error::Error::InvalidType {
×
154
                        expected: "text, float, int, bool or datetime".to_string(),
×
155
                        found: "category".to_string(),
×
156
                    }),
×
157
                };
158

159
            collection
160
                .column_range_filter(&column_name, &ranges?, keep_nulls)
1✔
161
                .map_err(Into::into)
1✔
162
        });
1✔
163

1✔
164
        let merged_chunks_stream =
1✔
165
            FeatureCollectionChunkMerger::new(filter_stream.fuse(), ctx.chunk_byte_size().into());
1✔
166

1✔
167
        Ok(merged_chunks_stream.boxed())
1✔
168
    }
2✔
169
}
170

171
#[cfg(test)]
mod tests {
    use super::*;
    use crate::engine::{MockExecutionContext, MockQueryContext};
    use crate::mock::MockFeatureCollectionSource;
    use geoengine_datatypes::collections::{FeatureCollectionModifications, MultiPointCollection};
    use geoengine_datatypes::primitives::{
        BoundingBox2D, Coordinate2D, FeatureData, MultiPoint, SpatialResolution, TimeInterval,
    };
    use geoengine_datatypes::util::test::TestDefault;

    /// The operator serializes to the expected JSON and deserializes back into an operator.
    #[test]
    fn serde() {
        let params = ColumnRangeFilterParams {
            column: "foobar".to_string(),
            ranges: vec![(1..=2).into()],
            keep_nulls: false,
        };
        let empty_source = MockFeatureCollectionSource::<MultiPoint>::multiple(vec![]).boxed();

        let operator = ColumnRangeFilter {
            params,
            sources: empty_source.into(),
        }
        .boxed();

        let expected = serde_json::json!({
            "type": "ColumnRangeFilter",
            "params": {
                "column": "foobar",
                "ranges": [
                    [1, 2]
                ],
                "keepNulls": false
            },
            "sources": {
                "vector": {
                    "type": "MockFeatureCollectionSourceMultiPoint",
                    "params": {
                        "collections": [],
                        "spatialReference": "EPSG:4326",
                        "measurements": null,
                    }
                }
            },
        });

        let json = serde_json::to_value(&operator).unwrap();
        assert_eq!(json, expected);

        let _deserialized: Box<dyn VectorOperator> = serde_json::from_value(json).unwrap();
    }

    /// Filtering a float column with the range `1..=2` keeps exactly the middle two features.
    #[tokio::test]
    async fn execute() {
        let column_name = "foo";

        let input = MultiPointCollection::from_data(
            MultiPoint::many(vec![(0.0, 0.1), (1.0, 1.1), (2.0, 2.1), (3.0, 3.1)]).unwrap(),
            vec![TimeInterval::new(0, 1).unwrap(); 4],
            [(
                column_name.to_string(),
                FeatureData::Float(vec![0., 1., 2., 3.]),
            )]
            .iter()
            .cloned()
            .collect(),
        )
        .unwrap();

        let operator = ColumnRangeFilter {
            params: ColumnRangeFilterParams {
                column: column_name.to_string(),
                ranges: vec![(1..=2).into()],
                keep_nulls: false,
            },
            sources: MockFeatureCollectionSource::single(input.clone())
                .boxed()
                .into(),
        }
        .boxed();

        let initialized = operator
            .initialize(&MockExecutionContext::test_default())
            .await
            .unwrap();

        let processor = match initialized.query_processor() {
            Ok(TypedVectorQueryProcessor::MultiPoint(processor)) => processor,
            _ => panic!(),
        };

        let rectangle = VectorQueryRectangle {
            spatial_bounds: BoundingBox2D::new((0., 0.).into(), (4., 4.).into()).unwrap(),
            time_interval: TimeInterval::default(),
            spatial_resolution: SpatialResolution::zero_point_one(),
        };

        let chunk_byte_size = 2 * std::mem::size_of::<Coordinate2D>();
        let ctx = MockQueryContext::new(chunk_byte_size.into());

        let results: Vec<MultiPointCollection> = processor
            .query(rectangle, &ctx)
            .await
            .unwrap()
            .map(Result::unwrap)
            .collect()
            .await;

        assert_eq!(results.len(), 1);

        let expected = input.filter(vec![false, true, true, false]).unwrap();
        assert_eq!(results[0], expected);
    }
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc