• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

geo-engine / geoengine / 5666839735

26 Jul 2023 09:06AM UTC coverage: 88.916% (-0.3%) from 89.193%
5666839735

Pull #833

github

web-flow
Merge 865f64877 into 3d8a7e0ad
Pull Request #833: Shared-cache

1353 of 1353 new or added lines in 15 files covered. (100.0%)

105888 of 119088 relevant lines covered (88.92%)

60793.87 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

43.98
/operators/src/pro/cache/cache_tiles.rs
1
use super::error::CacheError;
2
use super::shared_cache::{
3
    CacheElement, CacheElementSubType, CacheElementsContainer, CacheElementsContainerInfos,
4
    LandingZoneElementsContainer,
5
};
6
use crate::util::Result;
7
use futures::Stream;
8
use geoengine_datatypes::primitives::SpatialPartitioned;
9
use geoengine_datatypes::{
10
    primitives::RasterQueryRectangle,
11
    raster::{Pixel, RasterTile2D},
12
    util::ByteSize,
13
};
14
use pin_project::pin_project;
15
use std::{pin::Pin, sync::Arc};
16

17
#[derive(Debug)]
×
18
pub enum CachedTiles {
19
    U8(Arc<Vec<RasterTile2D<u8>>>),
20
    U16(Arc<Vec<RasterTile2D<u16>>>),
21
    U32(Arc<Vec<RasterTile2D<u32>>>),
22
    U64(Arc<Vec<RasterTile2D<u64>>>),
23
    I8(Arc<Vec<RasterTile2D<i8>>>),
24
    I16(Arc<Vec<RasterTile2D<i16>>>),
25
    I32(Arc<Vec<RasterTile2D<i32>>>),
26
    I64(Arc<Vec<RasterTile2D<i64>>>),
27
    F32(Arc<Vec<RasterTile2D<f32>>>),
28
    F64(Arc<Vec<RasterTile2D<f64>>>),
29
}
30

31
impl ByteSize for CachedTiles {
    /// Heap footprint of the wrapped tile collection.
    ///
    /// The `Arc` places its whole payload on the heap, so the payload's full
    /// `byte_size` (not only its `heap_byte_size`) has to be reported here.
    fn heap_byte_size(&self) -> usize {
        match self {
            Self::U8(shared) => shared.byte_size(),
            Self::U16(shared) => shared.byte_size(),
            Self::U32(shared) => shared.byte_size(),
            Self::U64(shared) => shared.byte_size(),
            Self::I8(shared) => shared.byte_size(),
            Self::I16(shared) => shared.byte_size(),
            Self::I32(shared) => shared.byte_size(),
            Self::I64(shared) => shared.byte_size(),
            Self::F32(shared) => shared.byte_size(),
            Self::F64(shared) => shared.byte_size(),
        }
    }
}
48

49
impl CachedTiles {
50
    fn is_expired(&self) -> bool {
6✔
51
        match self {
6✔
52
            CachedTiles::U8(v) => v.iter().any(|t| t.cache_hint.is_expired()),
6✔
53
            CachedTiles::U16(v) => v.iter().any(|t| t.cache_hint.is_expired()),
×
54
            CachedTiles::U32(v) => v.iter().any(|t| t.cache_hint.is_expired()),
×
55
            CachedTiles::U64(v) => v.iter().any(|t| t.cache_hint.is_expired()),
×
56
            CachedTiles::I8(v) => v.iter().any(|t| t.cache_hint.is_expired()),
×
57
            CachedTiles::I16(v) => v.iter().any(|t| t.cache_hint.is_expired()),
×
58
            CachedTiles::I32(v) => v.iter().any(|t| t.cache_hint.is_expired()),
×
59
            CachedTiles::I64(v) => v.iter().any(|t| t.cache_hint.is_expired()),
×
60
            CachedTiles::F32(v) => v.iter().any(|t| t.cache_hint.is_expired()),
×
61
            CachedTiles::F64(v) => v.iter().any(|t| t.cache_hint.is_expired()),
×
62
        }
63
    }
6✔
64
}
65

66
#[derive(Debug)]
×
67
pub enum LandingZoneQueryTiles {
68
    U8(Vec<RasterTile2D<u8>>),
69
    U16(Vec<RasterTile2D<u16>>),
70
    U32(Vec<RasterTile2D<u32>>),
71
    U64(Vec<RasterTile2D<u64>>),
72
    I8(Vec<RasterTile2D<i8>>),
73
    I16(Vec<RasterTile2D<i16>>),
74
    I32(Vec<RasterTile2D<i32>>),
75
    I64(Vec<RasterTile2D<i64>>),
76
    F32(Vec<RasterTile2D<f32>>),
77
    F64(Vec<RasterTile2D<f64>>),
78
}
79

80
impl LandingZoneQueryTiles {
81
    pub fn len(&self) -> usize {
×
82
        match self {
×
83
            LandingZoneQueryTiles::U8(v) => v.len(),
×
84
            LandingZoneQueryTiles::U16(v) => v.len(),
×
85
            LandingZoneQueryTiles::U32(v) => v.len(),
×
86
            LandingZoneQueryTiles::U64(v) => v.len(),
×
87
            LandingZoneQueryTiles::I8(v) => v.len(),
×
88
            LandingZoneQueryTiles::I16(v) => v.len(),
×
89
            LandingZoneQueryTiles::I32(v) => v.len(),
×
90
            LandingZoneQueryTiles::I64(v) => v.len(),
×
91
            LandingZoneQueryTiles::F32(v) => v.len(),
×
92
            LandingZoneQueryTiles::F64(v) => v.len(),
×
93
        }
94
    }
×
95

96
    pub fn is_empty(&self) -> bool {
×
97
        self.len() == 0
×
98
    }
×
99
}
100

101
impl ByteSize for LandingZoneQueryTiles {
    /// Heap footprint of the collected tiles.
    ///
    /// The `Vec` is stored inline in the enum but keeps its elements on the
    /// heap, so the vector's full `byte_size` (not only its `heap_byte_size`)
    /// is reported here.
    fn heap_byte_size(&self) -> usize {
        match self {
            Self::U8(tiles) => tiles.byte_size(),
            Self::U16(tiles) => tiles.byte_size(),
            Self::U32(tiles) => tiles.byte_size(),
            Self::U64(tiles) => tiles.byte_size(),
            Self::I8(tiles) => tiles.byte_size(),
            Self::I16(tiles) => tiles.byte_size(),
            Self::I32(tiles) => tiles.byte_size(),
            Self::I64(tiles) => tiles.byte_size(),
            Self::F32(tiles) => tiles.byte_size(),
            Self::F64(tiles) => tiles.byte_size(),
        }
    }
}
118

119
impl From<LandingZoneQueryTiles> for CachedTiles {
120
    fn from(value: LandingZoneQueryTiles) -> Self {
6✔
121
        match value {
6✔
122
            LandingZoneQueryTiles::U8(t) => CachedTiles::U8(Arc::new(t)),
6✔
123
            LandingZoneQueryTiles::U16(t) => CachedTiles::U16(Arc::new(t)),
×
124
            LandingZoneQueryTiles::U32(t) => CachedTiles::U32(Arc::new(t)),
×
125
            LandingZoneQueryTiles::U64(t) => CachedTiles::U64(Arc::new(t)),
×
126
            LandingZoneQueryTiles::I8(t) => CachedTiles::I8(Arc::new(t)),
×
127
            LandingZoneQueryTiles::I16(t) => CachedTiles::I16(Arc::new(t)),
×
128
            LandingZoneQueryTiles::I32(t) => CachedTiles::I32(Arc::new(t)),
×
129
            LandingZoneQueryTiles::I64(t) => CachedTiles::I64(Arc::new(t)),
×
130
            LandingZoneQueryTiles::F32(t) => CachedTiles::F32(Arc::new(t)),
×
131
            LandingZoneQueryTiles::F64(t) => CachedTiles::F64(Arc::new(t)),
×
132
        }
133
    }
6✔
134
}
135

136
impl CachedTiles {
137
    pub fn tile_stream(&self, query: &RasterQueryRectangle) -> TypedCacheTileStream {
5✔
138
        match self {
5✔
139
            CachedTiles::U8(v) => TypedCacheTileStream::U8(CacheTileStream::new(v.clone(), *query)),
5✔
140
            CachedTiles::U16(v) => {
×
141
                TypedCacheTileStream::U16(CacheTileStream::new(v.clone(), *query))
×
142
            }
143
            CachedTiles::U32(v) => {
×
144
                TypedCacheTileStream::U32(CacheTileStream::new(v.clone(), *query))
×
145
            }
146
            CachedTiles::U64(v) => {
×
147
                TypedCacheTileStream::U64(CacheTileStream::new(v.clone(), *query))
×
148
            }
149
            CachedTiles::I8(v) => TypedCacheTileStream::I8(CacheTileStream::new(v.clone(), *query)),
×
150
            CachedTiles::I16(v) => {
×
151
                TypedCacheTileStream::I16(CacheTileStream::new(v.clone(), *query))
×
152
            }
153
            CachedTiles::I32(v) => {
×
154
                TypedCacheTileStream::I32(CacheTileStream::new(v.clone(), *query))
×
155
            }
156
            CachedTiles::I64(v) => {
×
157
                TypedCacheTileStream::I64(CacheTileStream::new(v.clone(), *query))
×
158
            }
159
            CachedTiles::F32(v) => {
×
160
                TypedCacheTileStream::F32(CacheTileStream::new(v.clone(), *query))
×
161
            }
162
            CachedTiles::F64(v) => {
×
163
                TypedCacheTileStream::F64(CacheTileStream::new(v.clone(), *query))
×
164
            }
165
        }
166
    }
5✔
167
}
168

169
impl CacheElementsContainerInfos<RasterQueryRectangle> for CachedTiles {
170
    fn is_expired(&self) -> bool {
6✔
171
        self.is_expired()
6✔
172
    }
6✔
173
}
174

175
impl<T> CacheElementsContainer<RasterQueryRectangle, RasterTile2D<T>> for CachedTiles
176
where
177
    T: CacheElementSubType<CacheElementType = RasterTile2D<T>> + Pixel,
178
{
179
    type ResultStream = CacheTileStream<T>;
180

181
    fn result_stream(&self, query: &RasterQueryRectangle) -> Option<CacheTileStream<T>> {
5✔
182
        T::result_stream(self, query)
5✔
183
    }
5✔
184
}
185

186
impl<T> LandingZoneElementsContainer<RasterTile2D<T>> for LandingZoneQueryTiles
187
where
188
    T: CacheElementSubType<CacheElementType = RasterTile2D<T>> + Pixel,
189
{
190
    fn insert_element(&mut self, element: RasterTile2D<T>) -> Result<(), super::error::CacheError> {
5✔
191
        T::insert_element_into_landing_zone(self, element)
5✔
192
    }
5✔
193

194
    fn create_empty() -> Self {
7✔
195
        T::create_empty_landing_zone()
7✔
196
    }
7✔
197
}
198

199
impl<T> CacheElement for RasterTile2D<T>
200
where
201
    T: Pixel + CacheElementSubType<CacheElementType = Self>,
202
{
203
    type Query = RasterQueryRectangle;
204
    type LandingZoneContainer = LandingZoneQueryTiles;
205
    type CacheContainer = CachedTiles;
206
    type ResultStream = CacheTileStream<T>;
207
    type CacheElementSubType = T;
208

209
    fn cache_hint(&self) -> geoengine_datatypes::primitives::CacheHint {
6✔
210
        self.cache_hint
6✔
211
    }
6✔
212

213
    fn typed_canonical_operator_name(
5✔
214
        key: crate::engine::CanonicOperatorName,
5✔
215
    ) -> super::shared_cache::TypedCanonicOperatorName {
5✔
216
        super::shared_cache::TypedCanonicOperatorName::Raster(key)
5✔
217
    }
5✔
218

219
    fn update_stored_query(&self, query: &mut Self::Query) -> Result<(), CacheError> {
5✔
220
        query.spatial_bounds.extend(&self.spatial_partition());
5✔
221
        query.time_interval = query
5✔
222
            .time_interval
5✔
223
            .union(&self.time)
5✔
224
            .map_err(|_| CacheError::ElementAndQueryDoNotIntersect)?;
5✔
225
        Ok(())
5✔
226
    }
5✔
227
}
228

229
macro_rules! impl_cache_element_subtype {
230
    ($t:ty, $variant:ident) => {
231
        impl CacheElementSubType for $t {
232
            type CacheElementType = RasterTile2D<$t>;
233

234
            fn insert_element_into_landing_zone(
5✔
235
                landing_zone: &mut LandingZoneQueryTiles,
5✔
236
                element: Self::CacheElementType,
5✔
237
            ) -> Result<(), super::error::CacheError> {
5✔
238
                match landing_zone {
5✔
239
                    LandingZoneQueryTiles::$variant(v) => {
5✔
240
                        v.push(element);
5✔
241
                        Ok(())
5✔
242
                    }
243
                    _ => Err(super::error::CacheError::InvalidTypeForInsertion),
×
244
                }
245
            }
5✔
246

247
            fn create_empty_landing_zone() -> LandingZoneQueryTiles {
7✔
248
                LandingZoneQueryTiles::$variant(Vec::new())
7✔
249
            }
7✔
250

251
            fn result_stream(
252
                cache_elements_container: &CachedTiles,
253
                query: &RasterQueryRectangle,
254
            ) -> Option<CacheTileStream<$t>> {
255
                if let TypedCacheTileStream::$variant(v) =
5✔
256
                    cache_elements_container.tile_stream(query)
5✔
257
                {
258
                    Some(v)
5✔
259
                } else {
260
                    None
×
261
                }
262
            }
5✔
263
        }
264
    };
265
}
266
impl_cache_element_subtype!(i8, I8);
267
impl_cache_element_subtype!(u8, U8);
268
impl_cache_element_subtype!(i16, I16);
269
impl_cache_element_subtype!(u16, U16);
270
impl_cache_element_subtype!(i32, I32);
271
impl_cache_element_subtype!(u32, U32);
272
impl_cache_element_subtype!(i64, I64);
273
impl_cache_element_subtype!(u64, U64);
274
impl_cache_element_subtype!(f32, F32);
275
impl_cache_element_subtype!(f64, F64);
276

277
/// Our own tile stream that "owns" the data (more precisely a reference to the data)
278
#[pin_project(project = CacheTileStreamProjection)]
×
279
pub struct CacheTileStream<T> {
280
    data: Arc<Vec<RasterTile2D<T>>>,
281
    query: RasterQueryRectangle,
282
    idx: usize,
283
}
284

285
impl<T> CacheTileStream<T> {
286
    pub fn new(data: Arc<Vec<RasterTile2D<T>>>, query: RasterQueryRectangle) -> Self {
5✔
287
        Self {
5✔
288
            data,
5✔
289
            query,
5✔
290
            idx: 0,
5✔
291
        }
5✔
292
    }
5✔
293

294
    pub fn element_count(&self) -> usize {
×
295
        self.data.len()
×
296
    }
×
297
}
298

299
impl<T: Pixel> Stream for CacheTileStream<T> {
300
    type Item = Result<RasterTile2D<T>>;
301

302
    fn poll_next(
×
303
        mut self: Pin<&mut Self>,
×
304
        _cx: &mut std::task::Context<'_>,
×
305
    ) -> std::task::Poll<Option<Self::Item>> {
×
306
        let CacheTileStreamProjection { data, query, idx } = self.as_mut().project();
×
307

308
        // return the next tile that is contained in the query, skip all tiles that are not contained
309
        for i in *idx..data.len() {
×
310
            let tile = &data[i];
×
311
            let tile_bbox = tile.spatial_partition();
×
312

×
313
            if tile_bbox.intersects(&query.spatial_bounds)
×
314
                && tile.time.intersects(&query.time_interval)
×
315
            {
316
                *idx = i + 1;
×
317
                return std::task::Poll::Ready(Some(Ok(tile.clone())));
×
318
            }
×
319
        }
320

321
        std::task::Poll::Ready(None)
×
322
    }
×
323
}
324

325
pub enum TypedCacheTileStream {
326
    U8(CacheTileStream<u8>),
327
    U16(CacheTileStream<u16>),
328
    U32(CacheTileStream<u32>),
329
    U64(CacheTileStream<u64>),
330
    I8(CacheTileStream<i8>),
331
    I16(CacheTileStream<i16>),
332
    I32(CacheTileStream<i32>),
333
    I64(CacheTileStream<i64>),
334
    F32(CacheTileStream<f32>),
335
    F64(CacheTileStream<f64>),
336
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc