• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

geo-engine / geoengine / 18554766227

16 Oct 2025 08:12AM UTC coverage: 88.843% (+0.3%) from 88.543%
18554766227

push

github

web-flow
build: update dependencies (#1081)

* update sqlfluff

* clippy autofix

* manual clippy fixes

* removal of unused code

* update deps

* upgrade packages

* enable cargo lints

* make sqlfluff happy

* fix chrono parsing error

* clippy

* byte_size

* fix image cmp with tiffs

* remove debug

177 of 205 new or added lines in 38 files covered. (86.34%)

41 existing lines in 20 files now uncovered.

106415 of 119779 relevant lines covered (88.84%)

84190.21 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

61.15
/operators/src/cache/cache_stream.rs
1
use super::error::CacheError;
2
use super::shared_cache::{CacheBackendElementExt, CacheElement};
3
use crate::util::Result;
4
use futures::stream::FusedStream;
5
use futures::{Future, Stream};
6
use pin_project::pin_project;
7
use std::{pin::Pin, sync::Arc};
8

9
type DecompressorFutureType<X> = tokio::task::JoinHandle<std::result::Result<X, CacheError>>;
10

11
/// Our own cache element stream that "owns" the data (more precisely a reference to the data)
///
/// Type parameters:
/// * `I` - the stored cache element type held in the shared data vector,
/// * `O` - the element type yielded by the stream; it is created from an `I` via
///   `O::from_stored_element_ref`,
/// * `Q` - the query type that stored elements are tested against.
12
#[pin_project(project = CacheStreamProjection)]
13
pub struct CacheStream<I, O, Q> {
14
    /// Shared data, the query, and the cursor that selects the next element to emit.
    inner: CacheStreamInner<I, Q>,
15
    /// Handle of the decompression task that is currently in flight.
    /// `None` while no task is running, i.e. before the first poll and after a result was emitted.
    #[pin]
16
    state: Option<DecompressorFutureType<O>>,
17
}
18

19
/// Cursor over shared cache data that selects the elements intersecting a query.
pub struct CacheStreamInner<I, Q> {
20
    /// The stored elements; shared via `Arc` so that clones can be handed to decompression tasks.
    data: Arc<Vec<I>>,
21
    /// The query every element is tested against with `intersects_query`.
    query: Q,
22
    /// Index of the first element that has not been inspected yet.
    idx: usize,
23
}
24

25
impl<I, Q> CacheStreamInner<I, Q>
26
where
27
    I: CacheBackendElementExt<Query = Q>,
28
{
29
    // TODO: we could use a iter + filter adapter here to return refs however this would require a lot of lifetime annotations
30
    fn next_idx(&mut self) -> Option<usize> {
14✔
31
        for i in self.idx..self.data.len() {
19✔
32
            let element_ref = &self.data[i];
19✔
33
            if element_ref.intersects_query(&self.query) {
19✔
34
                self.idx = i + 1;
12✔
35
                return Some(i);
12✔
36
            }
7✔
37
        }
38
        None
2✔
39
    }
14✔
40

41
    fn data_arc(&self) -> Arc<Vec<I>> {
×
42
        self.data.clone()
×
43
    }
×
44

45
    fn len(&self) -> usize {
×
46
        self.data.len()
×
47
    }
×
48

49
    fn remaining(&self) -> usize {
×
50
        self.len() - self.idx
×
51
    }
×
52

53
    fn terminated(&self) -> bool {
×
54
        self.idx >= self.len()
×
55
    }
×
56

57
    fn new(data: Arc<Vec<I>>, query: Q) -> Self {
3✔
58
        Self {
3✔
59
            data,
3✔
60
            query,
3✔
61
            idx: 0,
3✔
62
        }
3✔
63
    }
3✔
64
}
65

66
impl<I, O, Q> CacheStream<I, O, Q>
67
where
68
    O: CacheElement<Query = Q, StoredCacheElement = I> + 'static,
69
    I: CacheBackendElementExt<Query = Q> + 'static,
70
{
71
    pub fn new(data: Arc<Vec<I>>, query: Q) -> Self {
1✔
72
        Self {
1✔
73
            inner: CacheStreamInner::new(data, query),
1✔
74
            state: None,
1✔
75
        }
1✔
76
    }
1✔
77

78
    pub fn element_count(&self) -> usize {
×
79
        self.inner.len()
×
80
    }
×
81

82
    fn terminated(&self) -> bool {
×
83
        self.state.is_none() && self.inner.terminated()
×
84
    }
×
85

86
    fn check_decompress_future_res(
×
87
        future_res: Result<Result<O, CacheError>, tokio::task::JoinError>,
×
88
    ) -> Result<O, CacheError> {
×
89
        match future_res {
×
90
            Ok(Ok(res)) => Ok(res),
×
91
            Ok(Err(err)) => Err(err),
×
92
            Err(source) => Err(CacheError::CouldNotRunDecompressionTask { source }),
×
93
        }
94
    }
×
95

96
    fn create_decompression_future(data: Arc<Vec<I>>, idx: usize) -> DecompressorFutureType<O> {
×
97
        crate::util::spawn_blocking(move || O::from_stored_element_ref(&data[idx]))
×
98
    }
×
99
}
100

101
impl<I, O, Q> Stream for CacheStream<I, O, Q>
102
where
103
    O: CacheElement<StoredCacheElement = I, Query = Q> + 'static,
104
    I: CacheBackendElementExt<Query = Q> + 'static,
105
{
106
    type Item = Result<O, CacheError>;
107

108
    fn poll_next(
×
109
        mut self: Pin<&mut Self>,
×
110
        cx: &mut std::task::Context<'_>,
×
111
    ) -> std::task::Poll<Option<Self::Item>> {
×
112
        if self.terminated() {
×
113
            return std::task::Poll::Ready(None);
×
114
        }
×
115

116
        let CacheStreamProjection { inner, mut state } = self.as_mut().project();
×
117

NEW
118
        if state.is_none()
×
NEW
119
            && let Some(next) = inner.next_idx()
×
NEW
120
        {
×
NEW
121
            let future_data = inner.data_arc();
×
NEW
122
            let future = Self::create_decompression_future(future_data, next);
×
NEW
123
            state.set(Some(future));
×
UNCOV
124
        }
×
125

126
        if let Some(pin_state) = state.as_mut().as_pin_mut() {
×
127
            let res = futures::ready!(pin_state.poll(cx));
×
128
            state.set(None);
×
129
            let element = Self::check_decompress_future_res(res);
×
130
            // Note: this is where we would have to filter feature collections by query attributes
131
            //       raster tiles on the other hand are already filtered entirely using `intersects_query`
132
            return std::task::Poll::Ready(Some(element));
×
133
        }
×
134

135
        std::task::Poll::Ready(None)
×
136
    }
×
137

138
    fn size_hint(&self) -> (usize, Option<usize>) {
×
139
        if self.terminated() {
×
140
            return (0, Some(0));
×
141
        }
×
142
        // There must be a cache hit to produce this stream. So there must be at least one element inside the query.
143
        (1, Some(self.inner.remaining()))
×
144
    }
×
145
}
146

147
impl<I, O, Q> FusedStream for CacheStream<I, O, Q>
148
where
149
    O: CacheElement<Query = Q, StoredCacheElement = I> + 'static,
150
    I: CacheBackendElementExt<Query = Q> + 'static,
151
{
152
    fn is_terminated(&self) -> bool {
×
153
        self.terminated()
×
154
    }
×
155
}
156

157
#[cfg(test)]
mod tests {
    use std::{collections::HashMap, sync::Arc};

    use geoengine_datatypes::{
        collections::MultiPointCollection,
        primitives::{
            BandSelection, BoundingBox2D, CacheHint, ColumnSelection, FeatureData, MultiPoint,
            RasterQueryRectangle, SpatialPartition2D, SpatialResolution, TimeInterval,
            VectorQueryRectangle,
        },
        raster::{GeoTransform, Grid2D, GridIdx2D, RasterTile2D},
    };

    use crate::cache::{
        cache_chunks::CompressedFeatureCollection,
        cache_stream::CacheStreamInner,
        cache_tiles::{CompressedRasterTile2D, CompressedRasterTileExt},
    };

    /// Ten compressed 2x2 `u8` tiles; tile `i` sits at grid position `[i, i]` and is filled with `i`.
    fn create_test_raster_data() -> Vec<CompressedRasterTile2D<u8>> {
        (0..10)
            .map(|i| {
                let tile = RasterTile2D::<u8>::new(
                    TimeInterval::new_unchecked(0, 10),
                    GridIdx2D::new([i, i]),
                    0,
                    GeoTransform::new([0., 0.].into(), 0.5, -0.5),
                    geoengine_datatypes::raster::GridOrEmpty::from(
                        Grid2D::new([2, 2].into(), vec![i as u8; 4]).unwrap(),
                    ),
                    CacheHint::default(),
                );
                CompressedRasterTile2D::compress_tile(tile)
            })
            .collect()
    }

    /// Nine compressed point collections; collection `x` holds the points `(x, x)` and
    /// `(x + 1, x + 1)` together with a text column named "strings".
    fn create_test_vector_data() -> Vec<CompressedFeatureCollection<MultiPoint>> {
        (0..9)
            .map(|x| {
                let (points, labels): (Vec<_>, Vec<_>) = (x..x + 2)
                    .map(|i| {
                        let point =
                            MultiPoint::new(vec![(f64::from(i), f64::from(i)).into()]).unwrap();
                        (point, format!("test {i}"))
                    })
                    .unzip();

                let collection = MultiPointCollection::from_data(
                    points,
                    vec![TimeInterval::default(); 2],
                    HashMap::<String, FeatureData>::from([(
                        "strings".to_owned(),
                        FeatureData::Text(labels),
                    )]),
                    CacheHint::default(),
                )
                .unwrap();

                CompressedFeatureCollection::from_collection(collection).unwrap()
            })
            .collect()
    }

    #[test]
    fn test_cache_stream_inner_raster() {
        let data = Arc::new(create_test_raster_data());
        let query = RasterQueryRectangle {
            spatial_bounds: SpatialPartition2D::new_unchecked((2., -2.).into(), (8., -8.).into()),
            time_interval: TimeInterval::new_unchecked(0, 10),
            spatial_resolution: SpatialResolution::zero_point_five(),
            attributes: BandSelection::first(),
        };

        let mut inner = CacheStreamInner::new(data, query);

        // `next_idx` must hand out exactly the indices of the tiles that hit the query
        let res: Vec<usize> = std::iter::from_fn(|| inner.next_idx()).collect();

        assert_eq!(res.len(), 6);
        assert!(res.iter().all(|n| (2..=8).contains(n)));
    }

    #[test]
    fn test_cache_stream_inner_vector() {
        let data = Arc::new(create_test_vector_data());
        let query = VectorQueryRectangle {
            spatial_bounds: BoundingBox2D::new_unchecked((2.1, 2.1).into(), (7.9, 7.9).into()),
            time_interval: TimeInterval::new_unchecked(0, 10),
            spatial_resolution: SpatialResolution::zero_point_five(),
            attributes: ColumnSelection::all(),
        };

        let mut inner = CacheStreamInner::new(data, query);

        // `next_idx` must hand out exactly the indices of the collections that hit the query
        let res: Vec<usize> = std::iter::from_fn(|| inner.next_idx()).collect();

        assert_eq!(res.len(), 6);
        assert!(res.iter().all(|n| (2..=8).contains(n)));
    }
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc