• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

geo-engine / geoengine / 12469296660

23 Dec 2024 03:15PM UTC coverage: 90.56% (-0.1%) from 90.695%
12469296660

push

github

web-flow
Merge pull request #998 from geo-engine/quota_log_wip

Quota and Data usage Logging

859 of 1214 new or added lines in 66 files covered. (70.76%)

3 existing lines in 2 files now uncovered.

133923 of 147883 relevant lines covered (90.56%)

54439.32 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

80.81
/operators/src/processing/column_range_filter.rs
1
use crate::engine::{
2
    CanonicOperatorName, ExecutionContext, InitializedSources, InitializedVectorOperator, Operator,
3
    OperatorName, QueryContext, QueryProcessor, TypedVectorQueryProcessor, VectorOperator,
4
    VectorQueryProcessor, VectorResultDescriptor, WorkflowOperatorPath,
5
};
6
use crate::error;
7
use crate::util::input::StringOrNumberRange;
8
use crate::util::Result;
9
use crate::{adapters::FeatureCollectionChunkMerger, engine::SingleVectorSource};
10
use async_trait::async_trait;
11
use futures::stream::BoxStream;
12
use futures::StreamExt;
13
use geoengine_datatypes::collections::{
14
    FeatureCollection, FeatureCollectionInfos, FeatureCollectionModifications,
15
};
16
use geoengine_datatypes::primitives::{
17
    BoundingBox2D, ColumnSelection, FeatureDataType, FeatureDataValue, Geometry,
18
    VectorQueryRectangle,
19
};
20
use geoengine_datatypes::util::arrow::ArrowTyped;
21
use serde::{Deserialize, Serialize};
22
use std::marker::PhantomData;
23
use std::ops::RangeInclusive;
24

25
#[derive(Debug, Serialize, Deserialize, Clone)]
26
#[serde(rename_all = "camelCase")]
27
pub struct ColumnRangeFilterParams {
28
    pub column: String,
29
    pub ranges: Vec<StringOrNumberRange>,
30
    pub keep_nulls: bool,
31
}
32

33
pub type ColumnRangeFilter = Operator<ColumnRangeFilterParams, SingleVectorSource>;
34

35
impl OperatorName for ColumnRangeFilter {
36
    const TYPE_NAME: &'static str = "ColumnRangeFilter";
37
}
38

39
#[typetag::serde]
1✔
40
#[async_trait]
41
impl VectorOperator for ColumnRangeFilter {
42
    async fn _initialize(
43
        self: Box<Self>,
44
        path: WorkflowOperatorPath,
45
        context: &dyn ExecutionContext,
46
    ) -> Result<Box<dyn InitializedVectorOperator>> {
1✔
47
        let name = CanonicOperatorName::from(&self);
1✔
48

49
        let initialized_sources = self
1✔
50
            .sources
1✔
51
            .initialize_sources(path.clone(), context)
1✔
52
            .await?;
1✔
53

54
        let initialized_operator = InitializedColumnRangeFilter {
1✔
55
            name,
1✔
56
            path,
1✔
57
            result_descriptor: initialized_sources.vector.result_descriptor().clone(),
1✔
58
            vector_source: initialized_sources.vector,
1✔
59
            state: self.params,
1✔
60
        };
1✔
61

1✔
62
        Ok(initialized_operator.boxed())
1✔
63
    }
2✔
64

65
    span_fn!(ColumnRangeFilter);
66
}
67

68
pub struct InitializedColumnRangeFilter {
69
    name: CanonicOperatorName,
70
    path: WorkflowOperatorPath,
71
    result_descriptor: VectorResultDescriptor,
72
    vector_source: Box<dyn InitializedVectorOperator>,
73
    state: ColumnRangeFilterParams,
74
}
75

76
impl InitializedVectorOperator for InitializedColumnRangeFilter {
77
    fn query_processor(&self) -> Result<TypedVectorQueryProcessor> {
1✔
78
        Ok(map_typed_query_processor!(
×
79
            self.vector_source.query_processor()?,
1✔
80
            source => ColumnRangeFilterProcessor::new(source, self.state.clone()).boxed()
×
81
        ))
82
    }
1✔
83

84
    fn result_descriptor(&self) -> &VectorResultDescriptor {
×
85
        &self.result_descriptor
×
86
    }
×
87

88
    fn canonic_name(&self) -> CanonicOperatorName {
×
89
        self.name.clone()
×
90
    }
×
91

NEW
92
    fn name(&self) -> &'static str {
×
NEW
93
        ColumnRangeFilter::TYPE_NAME
×
NEW
94
    }
×
95

NEW
96
    fn path(&self) -> WorkflowOperatorPath {
×
NEW
97
        self.path.clone()
×
NEW
98
    }
×
99
}
100

101
pub struct ColumnRangeFilterProcessor<G> {
102
    vector_type: PhantomData<FeatureCollection<G>>,
103
    source: Box<dyn VectorQueryProcessor<VectorType = FeatureCollection<G>>>,
104
    column: String,
105
    keep_nulls: bool,
106
    ranges: Vec<StringOrNumberRange>,
107
}
108

109
impl<G> ColumnRangeFilterProcessor<G>
110
where
111
    G: Geometry + ArrowTyped + Sync + Send,
112
{
113
    pub fn new(
1✔
114
        source: Box<dyn VectorQueryProcessor<VectorType = FeatureCollection<G>>>,
1✔
115
        params: ColumnRangeFilterParams,
1✔
116
    ) -> Self {
1✔
117
        Self {
1✔
118
            vector_type: Default::default(),
1✔
119
            source,
1✔
120
            column: params.column,
1✔
121
            keep_nulls: params.keep_nulls,
1✔
122
            ranges: params.ranges,
1✔
123
        }
1✔
124
    }
1✔
125
}
126

127
#[async_trait]
128
impl<G> QueryProcessor for ColumnRangeFilterProcessor<G>
129
where
130
    G: Geometry + ArrowTyped + Sync + Send + 'static,
131
{
132
    type Output = FeatureCollection<G>;
133
    type SpatialBounds = BoundingBox2D;
134
    type Selection = ColumnSelection;
135
    type ResultDescription = VectorResultDescriptor;
136

137
    async fn _query<'a>(
138
        &'a self,
139
        query: VectorQueryRectangle,
140
        ctx: &'a dyn QueryContext,
141
    ) -> Result<BoxStream<'a, Result<Self::Output>>> {
1✔
142
        let column_name = self.column.clone();
1✔
143
        let ranges = self.ranges.clone();
1✔
144
        let keep_nulls = self.keep_nulls;
1✔
145

146
        let filter_stream = self.source.query(query, ctx).await?.map(move |collection| {
1✔
147
            let collection = collection?;
1✔
148

149
            // TODO: do transformation work only once
150
            let ranges: Result<Vec<RangeInclusive<FeatureDataValue>>> =
1✔
151
                match collection.column_type(&column_name)? {
1✔
152
                    FeatureDataType::Text => ranges
×
153
                        .iter()
×
154
                        .cloned()
×
155
                        .map(|range| range.into_string_range().map(Into::into))
×
156
                        .collect(),
×
157
                    FeatureDataType::Float => ranges
1✔
158
                        .iter()
1✔
159
                        .cloned()
1✔
160
                        .map(|range| range.into_float_range().map(Into::into))
1✔
161
                        .collect(),
1✔
162
                    FeatureDataType::Int => ranges
×
163
                        .iter()
×
164
                        .cloned()
×
165
                        .map(|range| range.into_int_range().map(Into::into))
×
166
                        .collect(),
×
167
                    FeatureDataType::Bool => ranges
×
168
                        .iter()
×
169
                        .cloned()
×
170
                        .map(|range| range.into_int_range().map(Into::into))
×
171
                        .collect(),
×
172
                    FeatureDataType::DateTime => ranges
×
173
                        .iter()
×
174
                        .cloned()
×
175
                        .map(|range| range.into_int_range().map(Into::into))
×
176
                        .collect(),
×
177
                    FeatureDataType::Category => Err(error::Error::InvalidType {
×
178
                        expected: "text, float, int, bool or datetime".to_string(),
×
179
                        found: "category".to_string(),
×
180
                    }),
×
181
                };
182

183
            collection
1✔
184
                .column_range_filter(&column_name, &ranges?, keep_nulls)
1✔
185
                .map_err(Into::into)
1✔
186
        });
1✔
187

1✔
188
        let merged_chunks_stream =
1✔
189
            FeatureCollectionChunkMerger::new(filter_stream.fuse(), ctx.chunk_byte_size().into());
1✔
190

1✔
191
        Ok(merged_chunks_stream.boxed())
1✔
192
    }
2✔
193

194
    fn result_descriptor(&self) -> &VectorResultDescriptor {
2✔
195
        self.source.result_descriptor()
2✔
196
    }
2✔
197
}
198

199
#[cfg(test)]
mod tests {
    use super::*;
    use crate::engine::{MockExecutionContext, MockQueryContext};
    use crate::mock::MockFeatureCollectionSource;
    use geoengine_datatypes::collections::{
        ChunksEqualIgnoringCacheHint, FeatureCollectionModifications, MultiPointCollection,
    };
    use geoengine_datatypes::primitives::CacheHint;
    use geoengine_datatypes::primitives::{
        BoundingBox2D, Coordinate2D, FeatureData, MultiPoint, SpatialResolution, TimeInterval,
    };
    use geoengine_datatypes::util::test::TestDefault;

    /// The operator serializes to the expected camelCase JSON and can be deserialized back
    /// into a `Box<dyn VectorOperator>`.
    #[test]
    fn serde() {
        let params = ColumnRangeFilterParams {
            column: "foobar".to_string(),
            ranges: vec![(1..=2).into()],
            keep_nulls: false,
        };
        let source = MockFeatureCollectionSource::<MultiPoint>::multiple(vec![]).boxed();

        let operator = ColumnRangeFilter {
            params,
            sources: source.into(),
        }
        .boxed();

        let json = serde_json::to_value(&operator).unwrap();

        let expected = serde_json::json!({
            "type": "ColumnRangeFilter",
            "params": {
                "column": "foobar",
                "ranges": [
                    [1, 2]
                ],
                "keepNulls": false
            },
            "sources": {
                "vector": {
                    "type": "MockFeatureCollectionSourceMultiPoint",
                    "params": {
                        "collections": [],
                        "spatialReference": "EPSG:4326",
                        "measurements": null,
                    }
                }
            },
        });
        assert_eq!(json, expected);

        let _deserialized: Box<dyn VectorOperator> = serde_json::from_value(json).unwrap();
    }

    /// Filtering a float column with the inclusive range `1..=2` keeps exactly the two
    /// middle points.
    #[tokio::test]
    async fn execute() {
        let column_name = "foo";

        let collection = MultiPointCollection::from_data(
            MultiPoint::many(vec![(0.0, 0.1), (1.0, 1.1), (2.0, 2.1), (3.0, 3.1)]).unwrap(),
            vec![TimeInterval::new(0, 1).unwrap(); 4],
            [(
                column_name.to_string(),
                FeatureData::Float(vec![0., 1., 2., 3.]),
            )]
            .iter()
            .cloned()
            .collect(),
            CacheHint::default(),
        )
        .unwrap();

        let filter = ColumnRangeFilter {
            params: ColumnRangeFilterParams {
                column: column_name.to_string(),
                ranges: vec![(1..=2).into()],
                keep_nulls: false,
            },
            sources: MockFeatureCollectionSource::single(collection.clone())
                .boxed()
                .into(),
        }
        .boxed();

        let exe_ctx = MockExecutionContext::test_default();
        let initialized = filter
            .initialize(WorkflowOperatorPath::initialize_root(), &exe_ctx)
            .await
            .unwrap();

        let Ok(TypedVectorQueryProcessor::MultiPoint(processor)) = initialized.query_processor()
        else {
            panic!();
        };

        let query = VectorQueryRectangle {
            spatial_bounds: BoundingBox2D::new((0., 0.).into(), (4., 4.).into()).unwrap(),
            time_interval: TimeInterval::default(),
            spatial_resolution: SpatialResolution::zero_point_one(),
            attributes: ColumnSelection::all(),
        };

        // Deliberately small chunk byte size: two coordinates' worth.
        let query_ctx = MockQueryContext::new((2 * std::mem::size_of::<Coordinate2D>()).into());

        let results: Vec<MultiPointCollection> = processor
            .query(query, &query_ctx)
            .await
            .unwrap()
            .map(Result::unwrap)
            .collect()
            .await;

        assert_eq!(results.len(), 1);

        let expected = collection.filter(vec![false, true, true, false]).unwrap();
        assert!(results[0].chunks_equal_ignoring_cache_hint(&expected));
    }
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc