• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

geo-engine / geoengine / 6022770913

30 Aug 2023 08:59AM UTC coverage: 89.934% (+0.09%) from 89.84%
6022770913

push

github

web-flow
Merge pull request #864 from geo-engine/compressed-vector-cache

Compressed-vector-cache

569 of 569 new or added lines in 6 files covered. (100.0%)

106288 of 118185 relevant lines covered (89.93%)

61263.93 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

61.11
/operators/src/pro/cache/cache_stream.rs
1
use super::error::CacheError;
2
use super::shared_cache::{CacheBackendElementExt, CacheElement};
3
use crate::util::Result;
4
use futures::stream::FusedStream;
5
use futures::{Future, Stream};
6
use pin_project::pin_project;
7
use std::{pin::Pin, sync::Arc};
8

9
type DecompressorFutureType<X> = tokio::task::JoinHandle<std::result::Result<X, CacheError>>;
10

11
/// Our own cache element stream that "owns" the data (more precisely a reference to the data)
12

13
#[pin_project(project = CacheStreamProjection)]
×
14
pub struct CacheStream<I, O, Q> {
15
    inner: CacheStreamInner<I, Q>,
16
    #[pin]
17
    state: Option<DecompressorFutureType<O>>,
18
}
19

20
pub struct CacheStreamInner<I, Q> {
21
    data: Arc<Vec<I>>,
22
    query: Q,
23
    idx: usize,
24
}
25

26
impl<I, Q> CacheStreamInner<I, Q>
27
where
28
    I: CacheBackendElementExt<Query = Q>,
29
{
30
    // TODO: we could use a iter + filter adapter here to return refs however this would require a lot of lifetime annotations
31
    fn next_idx(&mut self) -> Option<usize> {
14✔
32
        for i in self.idx..self.data.len() {
19✔
33
            let element_ref = &self.data[i];
19✔
34
            if element_ref.intersects_query(&self.query) {
19✔
35
                self.idx = i + 1;
12✔
36
                return Some(i);
12✔
37
            }
7✔
38
        }
39
        None
2✔
40
    }
14✔
41

42
    fn data_arc(&self) -> Arc<Vec<I>> {
×
43
        self.data.clone()
×
44
    }
×
45

46
    fn len(&self) -> usize {
×
47
        self.data.len()
×
48
    }
×
49

50
    fn remaining(&self) -> usize {
×
51
        self.len() - self.idx
×
52
    }
×
53

54
    fn terminated(&self) -> bool {
×
55
        self.idx >= self.len()
×
56
    }
×
57

58
    fn new(data: Arc<Vec<I>>, query: Q) -> Self {
3✔
59
        Self {
3✔
60
            data,
3✔
61
            query,
3✔
62
            idx: 0,
3✔
63
        }
3✔
64
    }
3✔
65
}
66

67
impl<I, O, Q> CacheStream<I, O, Q>
68
where
69
    O: CacheElement<Query = Q, StoredCacheElement = I> + 'static,
70
    I: CacheBackendElementExt<Query = Q> + 'static,
71
{
72
    pub fn new(data: Arc<Vec<I>>, query: Q) -> Self {
1✔
73
        Self {
1✔
74
            inner: CacheStreamInner::new(data, query),
1✔
75
            state: None,
1✔
76
        }
1✔
77
    }
1✔
78

79
    pub fn element_count(&self) -> usize {
×
80
        self.inner.len()
×
81
    }
×
82

83
    fn terminated(&self) -> bool {
×
84
        self.state.is_none() && self.inner.terminated()
×
85
    }
×
86

87
    fn check_decompress_future_res(
88
        future_res: Result<Result<O, CacheError>, tokio::task::JoinError>,
89
    ) -> Result<O, CacheError> {
90
        match future_res {
×
91
            Ok(Ok(res)) => Ok(res),
×
92
            Ok(Err(err)) => Err(err),
×
93
            Err(source) => Err(CacheError::CouldNotRunDecompressionTask { source }),
×
94
        }
95
    }
×
96

97
    fn create_decompression_future(data: Arc<Vec<I>>, idx: usize) -> DecompressorFutureType<O> {
×
98
        crate::util::spawn_blocking(move || O::from_stored_element_ref(&data[idx]))
×
99
    }
×
100
}
101

102
impl<I, O, Q> Stream for CacheStream<I, O, Q>
103
where
104
    O: CacheElement<StoredCacheElement = I, Query = Q> + 'static,
105
    I: CacheBackendElementExt<Query = Q> + 'static,
106
{
107
    type Item = Result<O, CacheError>;
108

109
    fn poll_next(
×
110
        mut self: Pin<&mut Self>,
×
111
        cx: &mut std::task::Context<'_>,
×
112
    ) -> std::task::Poll<Option<Self::Item>> {
×
113
        if self.terminated() {
×
114
            return std::task::Poll::Ready(None);
×
115
        }
×
116

×
117
        let CacheStreamProjection { inner, mut state } = self.as_mut().project();
×
118

×
119
        if state.is_none() {
×
120
            if let Some(next) = inner.next_idx() {
×
121
                let future_data = inner.data_arc();
×
122
                let future = Self::create_decompression_future(future_data, next);
×
123
                state.set(Some(future));
×
124
            }
×
125
        }
×
126

127
        if let Some(pin_state) = state.as_mut().as_pin_mut() {
×
128
            let res = futures::ready!(pin_state.poll(cx));
×
129
            state.set(None);
×
130
            let element = Self::check_decompress_future_res(res);
×
131
            return std::task::Poll::Ready(Some(element));
×
132
        }
×
133

×
134
        std::task::Poll::Ready(None)
×
135
    }
×
136

137
    fn size_hint(&self) -> (usize, Option<usize>) {
×
138
        if self.terminated() {
×
139
            return (0, Some(0));
×
140
        }
×
141
        // There must be a cache hit to produce this stream. So there must be at least one element inside the query.
×
142
        (1, Some(self.inner.remaining()))
×
143
    }
×
144
}
145

146
impl<I, O, Q> FusedStream for CacheStream<I, O, Q>
147
where
148
    O: CacheElement<Query = Q, StoredCacheElement = I> + 'static,
149
    I: CacheBackendElementExt<Query = Q> + 'static,
150
{
151
    fn is_terminated(&self) -> bool {
×
152
        self.terminated()
×
153
    }
×
154
}
155

156
#[cfg(test)]
157
mod tests {
158
    use std::{collections::HashMap, sync::Arc};
159

160
    use geoengine_datatypes::{
161
        collections::MultiPointCollection,
162
        primitives::{
163
            BoundingBox2D, CacheHint, FeatureData, MultiPoint, RasterQueryRectangle,
164
            SpatialPartition2D, SpatialResolution, TimeInterval, VectorQueryRectangle,
165
        },
166
        raster::{GeoTransform, Grid2D, GridIdx2D, RasterTile2D},
167
    };
168

169
    use crate::pro::cache::{
170
        cache_chunks::CompressedFeatureCollection,
171
        cache_stream::CacheStreamInner,
172
        cache_tiles::{CompressedRasterTile2D, CompressedRasterTileExt},
173
    };
174

175
    fn create_test_raster_data() -> Vec<CompressedRasterTile2D<u8>> {
1✔
176
        let mut data = Vec::new();
1✔
177
        for i in 0..10 {
11✔
178
            let tile = RasterTile2D::<u8>::new(
10✔
179
                TimeInterval::new_unchecked(0, 10),
10✔
180
                GridIdx2D::new([i, i]),
10✔
181
                GeoTransform::new([0., 0.].into(), 0.5, -0.5),
10✔
182
                geoengine_datatypes::raster::GridOrEmpty::from(
10✔
183
                    Grid2D::new([2, 2].into(), vec![i as u8; 4]).unwrap(),
10✔
184
                ),
10✔
185
                CacheHint::default(),
10✔
186
            );
10✔
187
            let compressed_tile = CompressedRasterTile2D::compress_tile(tile);
10✔
188
            data.push(compressed_tile);
10✔
189
        }
10✔
190
        data
1✔
191
    }
1✔
192

193
    fn create_test_vecor_data() -> Vec<CompressedFeatureCollection<MultiPoint>> {
1✔
194
        let mut data = Vec::new();
1✔
195

196
        for x in 0..9 {
10✔
197
            let mut points = Vec::new();
9✔
198
            let mut strngs = Vec::new();
9✔
199
            for i in x..x + 2 {
18✔
200
                let p = MultiPoint::new(vec![(f64::from(i), f64::from(i)).into()]).unwrap();
18✔
201
                points.push(p);
18✔
202
                strngs.push(format!("test {i}"));
18✔
203
            }
18✔
204

205
            let collection = MultiPointCollection::from_data(
9✔
206
                points,
9✔
207
                vec![TimeInterval::default(); 2],
9✔
208
                HashMap::<String, FeatureData>::from([(
9✔
209
                    "strings".to_owned(),
9✔
210
                    FeatureData::Text(strngs),
9✔
211
                )]),
9✔
212
                CacheHint::default(),
9✔
213
            )
9✔
214
            .unwrap();
9✔
215

9✔
216
            let compressed_collection =
9✔
217
                CompressedFeatureCollection::from_collection(collection).unwrap();
9✔
218
            data.push(compressed_collection);
9✔
219
        }
220
        data
1✔
221
    }
1✔
222

223
    #[test]
1✔
224
    fn test_cache_stream_inner_raster() {
1✔
225
        let data = Arc::new(create_test_raster_data());
1✔
226
        let query = RasterQueryRectangle {
1✔
227
            spatial_bounds: SpatialPartition2D::new_unchecked((2., -2.).into(), (8., -8.).into()),
1✔
228
            time_interval: TimeInterval::new_unchecked(0, 10),
1✔
229
            spatial_resolution: SpatialResolution::zero_point_five(),
1✔
230
        };
1✔
231

1✔
232
        let mut res = Vec::new();
1✔
233
        let mut inner = CacheStreamInner::new(data, query);
1✔
234

235
        // next_idx should return the idx of tiles that hit the query
236
        while let Some(n) = inner.next_idx() {
7✔
237
            res.push(n);
6✔
238
        }
6✔
239
        assert_eq!(res.len(), 6);
1✔
240
        assert!(res.iter().all(|&n| (2..=8).contains(&n)));
6✔
241
    }
1✔
242

243
    #[test]
1✔
244
    fn test_cache_stream_inner_vector() {
1✔
245
        let data = Arc::new(create_test_vecor_data());
1✔
246
        let query = VectorQueryRectangle {
1✔
247
            spatial_bounds: BoundingBox2D::new_unchecked((2.1, 2.1).into(), (7.9, 7.9).into()),
1✔
248
            time_interval: TimeInterval::new_unchecked(0, 10),
1✔
249
            spatial_resolution: SpatialResolution::zero_point_five(),
1✔
250
        };
1✔
251

1✔
252
        let mut res = Vec::new();
1✔
253
        let mut inner = CacheStreamInner::new(data, query);
1✔
254

255
        // next_idx should return the idx of tiles that hit the query
256
        while let Some(n) = inner.next_idx() {
7✔
257
            res.push(n);
6✔
258
        }
6✔
259
        assert_eq!(res.len(), 6);
1✔
260
        assert!(res.iter().all(|&n| (2..=8).contains(&n)));
6✔
261
    }
1✔
262
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc