• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

geo-engine / geoengine / 17457762072

04 Sep 2025 08:18AM UTC coverage: 88.257%. First build
17457762072

Pull #1079

github

web-flow
Merge d0272c3f0 into 85068105d
Pull Request #1079: refactor(operators): Reorder QueryProcessor and RasterQueryProcessor traits

70 of 78 new or added lines in 18 files covered. (89.74%)

113324 of 128402 relevant lines covered (88.26%)

229107.11 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/operators/src/processing/map_query.rs
1
use crate::adapters::{
2
    FillerTileCacheExpirationStrategy, FillerTimeBounds, SparseTilesFillAdapter,
3
};
4
use crate::engine::{
5
    QueryContext, QueryProcessor, RasterQueryProcessor, RasterResultDescriptor,
6
    VectorQueryProcessor, VectorResultDescriptor,
7
};
8
use crate::util::Result;
9
use async_trait::async_trait;
10
use futures::StreamExt;
11
use futures::stream::BoxStream;
12
use geoengine_datatypes::primitives::{BandSelection, RasterQueryRectangle, VectorQueryRectangle};
13
use geoengine_datatypes::raster::{GridBoundingBox2D, RasterTile2D, TilingSpecification};
14

15
/// A query processor that rewrites incoming queries before handing them to its source.
///
/// The data itself is never modified: whatever the source returns for the rewritten query
/// is forwarded unchanged.
pub(crate) struct MapQueryProcessor<S, Q, A, R> {
    /// The wrapped processor that receives the rewritten query.
    source: S,
    /// Result descriptor handed in at construction time.
    result_descriptor: R,
    /// Maps the incoming query to the query sent to `source`.
    /// `Ok(None)` means "nothing to query"; an `Err` aborts the query.
    query_fn: Q,
    /// Extra, implementation-specific data: the raster implementation stores a
    /// `TilingSpecification` here, the vector implementation uses `()`.
    additional_data: A,
}
22

23
impl<S, Q, A, R> MapQueryProcessor<S, Q, A, R> {
24
    pub fn new(source: S, result_descriptor: R, query_fn: Q, additional_data: A) -> Self {
×
25
        Self {
×
26
            source,
×
27
            result_descriptor,
×
28
            query_fn,
×
29
            additional_data,
×
30
        }
×
31
    }
×
32
}
33

34
#[async_trait]
35
impl<S, Q> QueryProcessor for MapQueryProcessor<S, Q, TilingSpecification, RasterResultDescriptor>
36
where
37
    S: RasterQueryProcessor,
38
    Q: Fn(RasterQueryRectangle) -> Result<Option<RasterQueryRectangle>> + Sync + Send,
39
{
40
    type Output = S::Output;
41
    type SpatialBounds = GridBoundingBox2D;
42
    type ResultDescription = RasterResultDescriptor;
43
    type Selection = BandSelection;
44

45
    async fn _query<'a>(
46
        &'a self,
47
        query: RasterQueryRectangle,
48
        ctx: &'a dyn QueryContext,
49
    ) -> Result<BoxStream<'a, Result<RasterTile2D<S::RasterType>>>> {
×
50
        let rewritten_query = (self.query_fn)(query.clone())?;
×
51

52
        if let Some(rewritten_query) = rewritten_query {
×
53
            self.source.raster_query(rewritten_query, ctx).await
×
54
        } else {
55
            tracing::debug!("Query was rewritten to empty query. Returning empty / filled stream.");
×
56
            let s = futures::stream::empty();
×
57

58
            let res_desc = self.raster_result_descriptor();
×
59
            let tiling_grid_def = res_desc.tiling_grid_definition(self.additional_data);
×
60

61
            let strat = tiling_grid_def.generate_data_tiling_strategy();
×
62

63
            // TODO: The input of the `SparseTilesFillAdapter` is empty here, so we can't derive the expiration, as there are no tiles to derive them from.
64
            //       As this is the result of the query not being rewritten, we should check if the expiration could also be `max`, because this error
65
            //       will be persistent and we might as well cache the empty stream.
66
            Ok(SparseTilesFillAdapter::new_like_subquery(
×
67
                s,
×
68
                &query,
×
69
                strat,
×
70
                FillerTileCacheExpirationStrategy::NoCache,
×
71
                FillerTimeBounds::from(query.time_interval()), // TODO: derive this from the query once the child query can provide this.
×
72
            )
×
73
            .boxed())
×
74
        }
75
    }
×
76

NEW
77
    fn result_descriptor(&self) -> &RasterResultDescriptor {
×
78
        &self.result_descriptor
×
79
    }
×
80
}
81

82
impl<S, Q> RasterQueryProcessor
83
    for MapQueryProcessor<S, Q, TilingSpecification, RasterResultDescriptor>
84
where
85
    S: RasterQueryProcessor,
86
    Q: Fn(RasterQueryRectangle) -> Result<Option<RasterQueryRectangle>> + Sync + Send,
87
{
88
    type RasterType = S::RasterType;
89
}
90

91
#[async_trait]
92
impl<S, Q> VectorQueryProcessor for MapQueryProcessor<S, Q, (), VectorResultDescriptor>
93
where
94
    S: VectorQueryProcessor,
95
    Q: Fn(VectorQueryRectangle) -> Result<Option<VectorQueryRectangle>> + Sync + Send,
96
    S::VectorType: Send,
97
{
98
    type VectorType = S::VectorType;
99
    async fn vector_query<'a>(
100
        &'a self,
101
        query: VectorQueryRectangle,
102
        ctx: &'a dyn QueryContext,
103
    ) -> Result<BoxStream<'a, Result<Self::VectorType>>> {
×
104
        let rewritten_query = (self.query_fn)(query)?;
×
105
        if let Some(rewritten_query) = rewritten_query {
×
106
            self.source.vector_query(rewritten_query, ctx).await
×
107
        } else {
108
            tracing::debug!("Query was rewritten to empty query. Returning empty stream.");
×
109
            Ok(Box::pin(futures::stream::empty())) // TODO: should be empty collection?
×
110
        }
111
    }
×
112

113
    fn vector_result_descriptor(&self) -> &VectorResultDescriptor {
×
114
        self.source.vector_result_descriptor()
×
115
    }
×
116
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc