• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

geo-engine / geoengine / 5856046428

14 Aug 2023 01:37PM UTC coverage: 89.484% (-0.1%) from 89.596%
5856046428

push

github

web-flow
Merge pull request #848 from geo-engine/compressed-raster-cache

compress raster tile cache

475 of 475 new or added lines in 4 files covered. (100.0%)

104049 of 116277 relevant lines covered (89.48%)

62266.95 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

40.28
/operators/src/pro/cache/cache_tiles.rs
1
use super::error::CacheError;
2
use super::shared_cache::{
3
    CacheBackendElement, CacheBackendElementExt, CacheElement, CacheElementsContainer,
4
    CacheElementsContainerInfos, LandingZoneElementsContainer, RasterCacheQueryEntry,
5
    RasterLandingQueryEntry,
6
};
7
use crate::util::Result;
8
use futures::stream::FusedStream;
9
use futures::{Future, Stream};
10
use geoengine_datatypes::primitives::SpatialPartitioned;
11
use geoengine_datatypes::raster::{
12
    BaseTile, EmptyGrid, Grid, GridOrEmpty, GridShape2D, GridSize, GridSpaceToLinearSpace,
13
    MaskedGrid, RasterTile,
14
};
15
use geoengine_datatypes::{
16
    primitives::RasterQueryRectangle,
17
    raster::{Pixel, RasterTile2D},
18
    util::ByteSize,
19
};
20
use pin_project::pin_project;
21
use std::marker::PhantomData;
22
use std::{pin::Pin, sync::Arc};
23

24
#[derive(Debug)]
×
25
pub enum CachedTiles {
26
    U8(Arc<Vec<CompressedRasterTile2D<u8>>>),
27
    U16(Arc<Vec<CompressedRasterTile2D<u16>>>),
28
    U32(Arc<Vec<CompressedRasterTile2D<u32>>>),
29
    U64(Arc<Vec<CompressedRasterTile2D<u64>>>),
30
    I8(Arc<Vec<CompressedRasterTile2D<i8>>>),
31
    I16(Arc<Vec<CompressedRasterTile2D<i16>>>),
32
    I32(Arc<Vec<CompressedRasterTile2D<i32>>>),
33
    I64(Arc<Vec<CompressedRasterTile2D<i64>>>),
34
    F32(Arc<Vec<CompressedRasterTile2D<f32>>>),
35
    F64(Arc<Vec<CompressedRasterTile2D<f64>>>),
36
}
37

38
impl ByteSize for CachedTiles {
39
    fn heap_byte_size(&self) -> usize {
10✔
40
        // we need to use `byte_size` instead of `heap_byte_size` here, because `Arc` stores its data on the heap
10✔
41
        match self {
10✔
42
            CachedTiles::U8(tiles) => tiles.byte_size(),
10✔
43
            CachedTiles::U16(tiles) => tiles.byte_size(),
×
44
            CachedTiles::U32(tiles) => tiles.byte_size(),
×
45
            CachedTiles::U64(tiles) => tiles.byte_size(),
×
46
            CachedTiles::I8(tiles) => tiles.byte_size(),
×
47
            CachedTiles::I16(tiles) => tiles.byte_size(),
×
48
            CachedTiles::I32(tiles) => tiles.byte_size(),
×
49
            CachedTiles::I64(tiles) => tiles.byte_size(),
×
50
            CachedTiles::F32(tiles) => tiles.byte_size(),
×
51
            CachedTiles::F64(tiles) => tiles.byte_size(),
×
52
        }
53
    }
10✔
54
}
55

56
impl CachedTiles {
57
    fn is_expired(&self) -> bool {
6✔
58
        match self {
6✔
59
            CachedTiles::U8(v) => v.iter().any(|t| t.cache_hint.is_expired()),
6✔
60
            CachedTiles::U16(v) => v.iter().any(|t| t.cache_hint.is_expired()),
×
61
            CachedTiles::U32(v) => v.iter().any(|t| t.cache_hint.is_expired()),
×
62
            CachedTiles::U64(v) => v.iter().any(|t| t.cache_hint.is_expired()),
×
63
            CachedTiles::I8(v) => v.iter().any(|t| t.cache_hint.is_expired()),
×
64
            CachedTiles::I16(v) => v.iter().any(|t| t.cache_hint.is_expired()),
×
65
            CachedTiles::I32(v) => v.iter().any(|t| t.cache_hint.is_expired()),
×
66
            CachedTiles::I64(v) => v.iter().any(|t| t.cache_hint.is_expired()),
×
67
            CachedTiles::F32(v) => v.iter().any(|t| t.cache_hint.is_expired()),
×
68
            CachedTiles::F64(v) => v.iter().any(|t| t.cache_hint.is_expired()),
×
69
        }
70
    }
6✔
71
}
72

73
#[derive(Debug)]
×
74
pub enum LandingZoneQueryTiles {
75
    U8(Vec<CompressedRasterTile2D<u8>>),
76
    U16(Vec<CompressedRasterTile2D<u16>>),
77
    U32(Vec<CompressedRasterTile2D<u32>>),
78
    U64(Vec<CompressedRasterTile2D<u64>>),
79
    I8(Vec<CompressedRasterTile2D<i8>>),
80
    I16(Vec<CompressedRasterTile2D<i16>>),
81
    I32(Vec<CompressedRasterTile2D<i32>>),
82
    I64(Vec<CompressedRasterTile2D<i64>>),
83
    F32(Vec<CompressedRasterTile2D<f32>>),
84
    F64(Vec<CompressedRasterTile2D<f64>>),
85
}
86

87
impl LandingZoneQueryTiles {
88
    pub fn len(&self) -> usize {
×
89
        match self {
×
90
            LandingZoneQueryTiles::U8(v) => v.len(),
×
91
            LandingZoneQueryTiles::U16(v) => v.len(),
×
92
            LandingZoneQueryTiles::U32(v) => v.len(),
×
93
            LandingZoneQueryTiles::U64(v) => v.len(),
×
94
            LandingZoneQueryTiles::I8(v) => v.len(),
×
95
            LandingZoneQueryTiles::I16(v) => v.len(),
×
96
            LandingZoneQueryTiles::I32(v) => v.len(),
×
97
            LandingZoneQueryTiles::I64(v) => v.len(),
×
98
            LandingZoneQueryTiles::F32(v) => v.len(),
×
99
            LandingZoneQueryTiles::F64(v) => v.len(),
×
100
        }
101
    }
×
102

103
    pub fn is_empty(&self) -> bool {
×
104
        self.len() == 0
×
105
    }
×
106
}
107

108
impl ByteSize for LandingZoneQueryTiles {
109
    fn heap_byte_size(&self) -> usize {
14✔
110
        // we need to use `byte_size` instead of `heap_byte_size` here, because `Vec` stores its data on the heap
14✔
111
        match self {
14✔
112
            LandingZoneQueryTiles::U8(v) => v.byte_size(),
14✔
113
            LandingZoneQueryTiles::U16(v) => v.byte_size(),
×
114
            LandingZoneQueryTiles::U32(v) => v.byte_size(),
×
115
            LandingZoneQueryTiles::U64(v) => v.byte_size(),
×
116
            LandingZoneQueryTiles::I8(v) => v.byte_size(),
×
117
            LandingZoneQueryTiles::I16(v) => v.byte_size(),
×
118
            LandingZoneQueryTiles::I32(v) => v.byte_size(),
×
119
            LandingZoneQueryTiles::I64(v) => v.byte_size(),
×
120
            LandingZoneQueryTiles::F32(v) => v.byte_size(),
×
121
            LandingZoneQueryTiles::F64(v) => v.byte_size(),
×
122
        }
123
    }
14✔
124
}
125

126
impl From<LandingZoneQueryTiles> for CachedTiles {
127
    fn from(value: LandingZoneQueryTiles) -> Self {
6✔
128
        match value {
6✔
129
            LandingZoneQueryTiles::U8(t) => CachedTiles::U8(Arc::new(t)),
6✔
130
            LandingZoneQueryTiles::U16(t) => CachedTiles::U16(Arc::new(t)),
×
131
            LandingZoneQueryTiles::U32(t) => CachedTiles::U32(Arc::new(t)),
×
132
            LandingZoneQueryTiles::U64(t) => CachedTiles::U64(Arc::new(t)),
×
133
            LandingZoneQueryTiles::I8(t) => CachedTiles::I8(Arc::new(t)),
×
134
            LandingZoneQueryTiles::I16(t) => CachedTiles::I16(Arc::new(t)),
×
135
            LandingZoneQueryTiles::I32(t) => CachedTiles::I32(Arc::new(t)),
×
136
            LandingZoneQueryTiles::I64(t) => CachedTiles::I64(Arc::new(t)),
×
137
            LandingZoneQueryTiles::F32(t) => CachedTiles::F32(Arc::new(t)),
×
138
            LandingZoneQueryTiles::F64(t) => CachedTiles::F64(Arc::new(t)),
×
139
        }
140
    }
6✔
141
}
142

143
impl CacheElementsContainerInfos<RasterQueryRectangle> for CachedTiles {
144
    fn is_expired(&self) -> bool {
6✔
145
        self.is_expired()
6✔
146
    }
6✔
147
}
148

149
impl<T> CacheElementsContainer<RasterQueryRectangle, CompressedRasterTile2D<T>> for CachedTiles
150
where
151
    T: Pixel,
152
    CompressedRasterTile2D<T>: CacheBackendElementExt<CacheContainer = CachedTiles>,
153
{
154
    fn results_arc(&self) -> Option<Arc<Vec<CompressedRasterTile2D<T>>>> {
5✔
155
        CompressedRasterTile2D::<T>::results_arc(self)
5✔
156
    }
5✔
157
}
158

159
impl<T> LandingZoneElementsContainer<CompressedRasterTile2D<T>> for LandingZoneQueryTiles
160
where
161
    T: Pixel,
162
    CompressedRasterTile2D<T>: CacheBackendElementExt<LandingZoneContainer = LandingZoneQueryTiles>,
163
{
164
    fn insert_element(
5✔
165
        &mut self,
5✔
166
        element: CompressedRasterTile2D<T>,
5✔
167
    ) -> Result<(), super::error::CacheError> {
5✔
168
        CompressedRasterTile2D::<T>::move_element_into_landing_zone(element, self)
5✔
169
    }
5✔
170

171
    fn create_empty() -> Self {
7✔
172
        CompressedRasterTile2D::<T>::create_empty_landing_zone()
7✔
173
    }
7✔
174
}
175

176
impl<T> CacheBackendElement for CompressedRasterTile2D<T>
177
where
178
    T: Pixel,
179
{
180
    type Query = RasterQueryRectangle;
181

182
    fn cache_hint(&self) -> geoengine_datatypes::primitives::CacheHint {
6✔
183
        self.cache_hint
6✔
184
    }
6✔
185

186
    fn typed_canonical_operator_name(
5✔
187
        key: crate::engine::CanonicOperatorName,
5✔
188
    ) -> super::shared_cache::TypedCanonicOperatorName {
5✔
189
        super::shared_cache::TypedCanonicOperatorName::Raster(key)
5✔
190
    }
5✔
191

192
    fn update_stored_query(&self, query: &mut Self::Query) -> Result<(), CacheError> {
5✔
193
        query.spatial_bounds.extend(&self.spatial_partition());
5✔
194
        query.time_interval = query
5✔
195
            .time_interval
5✔
196
            .union(&self.time)
5✔
197
            .map_err(|_| CacheError::ElementAndQueryDoNotIntersect)?;
5✔
198
        Ok(())
5✔
199
    }
5✔
200
}
201

202
macro_rules! impl_cache_element_subtype {
203
    ($t:ty, $variant:ident) => {
204
        impl CacheBackendElementExt for CompressedRasterTile2D<$t> {
205
            type LandingZoneContainer = LandingZoneQueryTiles;
206
            type CacheContainer = CachedTiles;
207

208
            fn move_element_into_landing_zone(
5✔
209
                self,
5✔
210
                landing_zone: &mut LandingZoneQueryTiles,
5✔
211
            ) -> Result<(), super::error::CacheError> {
5✔
212
                match landing_zone {
5✔
213
                    LandingZoneQueryTiles::$variant(v) => {
5✔
214
                        v.push(self);
5✔
215
                        Ok(())
5✔
216
                    }
217
                    _ => Err(super::error::CacheError::InvalidTypeForInsertion),
×
218
                }
219
            }
5✔
220

221
            fn create_empty_landing_zone() -> LandingZoneQueryTiles {
7✔
222
                LandingZoneQueryTiles::$variant(Vec::new())
7✔
223
            }
7✔
224

225
            fn results_arc(cache_elements_container: &CachedTiles) -> Option<Arc<Vec<Self>>> {
226
                if let CachedTiles::$variant(v) = cache_elements_container {
5✔
227
                    Some(v.clone())
5✔
228
                } else {
229
                    None
×
230
                }
231
            }
5✔
232

233
            fn landing_zone_to_cache_entry(
5✔
234
                landing_zone_entry: RasterLandingQueryEntry,
5✔
235
            ) -> RasterCacheQueryEntry {
5✔
236
                landing_zone_entry.into()
5✔
237
            }
5✔
238
        }
239
    };
240
}
241
impl_cache_element_subtype!(i8, I8);
242
impl_cache_element_subtype!(u8, U8);
243
impl_cache_element_subtype!(i16, I16);
244
impl_cache_element_subtype!(u16, U16);
245
impl_cache_element_subtype!(i32, I32);
246
impl_cache_element_subtype!(u32, U32);
247
impl_cache_element_subtype!(i64, I64);
248
impl_cache_element_subtype!(u64, U64);
249
impl_cache_element_subtype!(f32, F32);
250
impl_cache_element_subtype!(f64, F64);
251

252
type DecompressorFutureType<T> =
253
    tokio::task::JoinHandle<std::result::Result<RasterTile2D<T>, CacheError>>;
254

255
/// Our own tile stream that "owns" the data (more precisely a reference to the data)
256

257
#[pin_project(project = CacheTileStreamProjection)]
×
258
pub struct CacheTileStream<T> {
259
    inner: CacheTileStreamInner<T>,
260
    #[pin]
261
    state: Option<DecompressorFutureType<T>>,
262
}
263

264
pub struct CacheTileStreamInner<T> {
265
    data: Arc<Vec<CompressedRasterTile2D<T>>>,
266
    query: RasterQueryRectangle,
267
    idx: usize,
268
}
269

270
impl<T> CacheTileStreamInner<T>
271
where
272
    T: Pixel,
273
{
274
    // TODO: we could use a iter + filter adapter here to return refs however this would require a lot of lifetime annotations
275
    fn next_idx(&mut self) -> Option<usize> {
×
276
        let query_st_bounds = self.query.spatial_bounds;
×
277
        let query_time_bounds = self.query.time_interval;
×
278
        for i in self.idx..self.data.len() {
×
279
            let tile_ref = &self.data[i];
×
280
            let tile_bbox = tile_ref.spatial_partition();
×
281
            if tile_bbox.intersects(&query_st_bounds)
×
282
                && tile_ref.time.intersects(&query_time_bounds)
×
283
            {
284
                self.idx = i + 1;
×
285
                return Some(i);
×
286
            }
×
287
        }
288
        None
×
289
    }
×
290

291
    fn data_arc(&self) -> Arc<Vec<CompressedRasterTile2D<T>>> {
×
292
        self.data.clone()
×
293
    }
×
294

295
    fn len(&self) -> usize {
×
296
        self.data.len()
×
297
    }
×
298

299
    fn remaining(&self) -> usize {
×
300
        self.len() - self.idx
×
301
    }
×
302

303
    fn terminated(&self) -> bool {
×
304
        self.idx >= self.len()
×
305
    }
×
306

307
    fn new(data: Arc<Vec<CompressedRasterTile2D<T>>>, query: RasterQueryRectangle) -> Self {
1✔
308
        Self {
1✔
309
            data,
1✔
310
            query,
1✔
311
            idx: 0,
1✔
312
        }
1✔
313
    }
1✔
314
}
315

316
impl<T> CacheTileStream<T>
317
where
318
    T: Pixel,
319
{
320
    pub fn new(data: Arc<Vec<CompressedRasterTile2D<T>>>, query: RasterQueryRectangle) -> Self {
1✔
321
        Self {
1✔
322
            inner: CacheTileStreamInner::new(data, query),
1✔
323
            state: None,
1✔
324
        }
1✔
325
    }
1✔
326

327
    pub fn element_count(&self) -> usize {
×
328
        self.inner.len()
×
329
    }
×
330

331
    fn terminated(&self) -> bool {
×
332
        self.state.is_none() && self.inner.terminated()
×
333
    }
×
334

335
    fn check_decompress_future_res(
336
        tile_future_res: Result<Result<RasterTile2D<T>, CacheError>, tokio::task::JoinError>,
337
    ) -> Result<RasterTile2D<T>, CacheError> {
338
        match tile_future_res {
×
339
            Ok(Ok(tile)) => Ok(tile),
×
340
            Ok(Err(err)) => Err(err),
×
341
            Err(source) => Err(CacheError::CouldNotRunDecompressionTask { source }),
×
342
        }
343
    }
×
344

345
    fn create_decompression_future(
×
346
        data: Arc<Vec<CompressedRasterTile2D<T>>>,
×
347
        idx: usize,
×
348
    ) -> DecompressorFutureType<T>
×
349
    where
×
350
        T: Pixel,
×
351
        CompressedRasterTile2D<T>: CacheBackendElementExt<Query = RasterQueryRectangle>,
×
352
    {
×
353
        crate::util::spawn_blocking(move || RasterTile2D::<T>::from_stored_element_ref(&data[idx]))
×
354
    }
×
355
}
356

357
impl<T: Pixel> Stream for CacheTileStream<T>
358
where
359
    RasterTile2D<T>: CacheElement<StoredCacheElement = CompressedRasterTile2D<T>>,
360
    CompressedRasterTile2D<T>: CacheBackendElementExt<Query = RasterQueryRectangle>,
361
{
362
    type Item = Result<RasterTile2D<T>, CacheError>;
363

364
    fn poll_next(
×
365
        mut self: Pin<&mut Self>,
×
366
        cx: &mut std::task::Context<'_>,
×
367
    ) -> std::task::Poll<Option<Self::Item>> {
×
368
        if self.terminated() {
×
369
            return std::task::Poll::Ready(None);
×
370
        }
×
371

×
372
        let CacheTileStreamProjection { inner, mut state } = self.as_mut().project();
×
373

×
374
        if state.is_none() {
×
375
            if let Some(next) = inner.next_idx() {
×
376
                let future_data = inner.data_arc();
×
377
                let future = Self::create_decompression_future(future_data, next);
×
378
                state.set(Some(future));
×
379
            }
×
380
        }
×
381

382
        if let Some(pin_state) = state.as_mut().as_pin_mut() {
×
383
            let res = futures::ready!(pin_state.poll(cx));
×
384
            state.set(None);
×
385
            let tile = Self::check_decompress_future_res(res);
×
386
            return std::task::Poll::Ready(Some(tile));
×
387
        }
×
388

×
389
        std::task::Poll::Ready(None)
×
390
    }
×
391

392
    fn size_hint(&self) -> (usize, Option<usize>) {
×
393
        if self.terminated() {
×
394
            return (0, Some(0));
×
395
        }
×
396
        // There must be a cache hit to produce this stream. So there must be at least one tile inside the query.
×
397
        (1, Some(self.inner.remaining()))
×
398
    }
×
399
}
400

401
impl<T> FusedStream for CacheTileStream<T>
402
where
403
    T: Pixel,
404
    CompressedRasterTile2D<T>: CacheBackendElementExt<Query = RasterQueryRectangle>,
405
{
406
    fn is_terminated(&self) -> bool {
×
407
        self.terminated()
×
408
    }
×
409
}
410

411
#[derive(Clone, Debug)]
1✔
412
pub struct CompressedMaskedGrid<D, T, C> {
413
    shape: D,
414
    type_marker: PhantomData<T>,
415
    data: Vec<u8>,
416
    mask: Vec<u8>,
417
    compression_marker: PhantomData<C>,
418
}
419

420
#[derive(Clone, Debug)]
1✔
421
pub enum CompressedGridOrEmpty<D, T, F> {
422
    Empty(EmptyGrid<D, T>),
423
    Compressed(CompressedMaskedGrid<D, T, F>),
424
}
425

426
pub type CompressedRasterTile<D, T> = BaseTile<CompressedGridOrEmpty<D, T, Lz4FlexCompression>>;
427
pub type CompressedRasterTile2D<T> = CompressedRasterTile<GridShape2D, T>;
428

429
impl<D, T, C: TileCompression> CompressedMaskedGrid<D, T, C> {
430
    #[cfg(test)]
431
    pub(crate) fn new(shape: D, data: Vec<u8>, mask: Vec<u8>) -> Self {
9✔
432
        Self {
9✔
433
            shape,
9✔
434
            type_marker: PhantomData,
9✔
435
            data,
9✔
436
            mask,
9✔
437
            compression_marker: PhantomData,
9✔
438
        }
9✔
439
    }
9✔
440

441
    pub fn compressed_data_len(&self) -> usize {
×
442
        self.data.len()
×
443
    }
×
444

445
    pub fn compressed_mask_len(&self) -> usize {
×
446
        self.mask.len()
×
447
    }
×
448

449
    pub fn compressed_len(&self) -> usize {
×
450
        self.compressed_data_len() + self.compressed_mask_len()
×
451
    }
×
452

453
    pub fn compressed_data_slice(&self) -> &[u8] {
×
454
        &self.data
×
455
    }
×
456

457
    pub fn compressed_mask_slice(&self) -> &[u8] {
×
458
        &self.mask
×
459
    }
×
460

461
    pub fn shape(&self) -> &D {
10✔
462
        &self.shape
10✔
463
    }
10✔
464

465
    pub fn compress_masked_grid(grid: &MaskedGrid<D, T>) -> Self
1✔
466
    where
1✔
467
        D: Clone + GridSize + PartialEq,
1✔
468
        T: Copy,
1✔
469
    {
1✔
470
        let grid_data_compressed = C::compress(&grid.inner_grid.data);
1✔
471
        let grid_mask_compressed = C::compress(&grid.validity_mask.data);
1✔
472

1✔
473
        Self {
1✔
474
            shape: grid.shape().clone(),
1✔
475
            type_marker: PhantomData,
1✔
476
            data: grid_data_compressed,
1✔
477
            mask: grid_mask_compressed,
1✔
478
            compression_marker: PhantomData,
1✔
479
        }
1✔
480
    }
1✔
481

482
    pub fn decompress_masked_grid(&self) -> Result<MaskedGrid<D, T>, CacheError>
×
483
    where
×
484
        D: Clone + GridSize + PartialEq,
×
485
        T: Copy,
×
486
    {
×
487
        let elements = self.shape().number_of_elements();
×
488

489
        let grid_data = C::decompress(&self.data, elements)?;
×
490
        let grid_mask = C::decompress(&self.mask, elements)?;
×
491

492
        let masked_grid = MaskedGrid {
×
493
            inner_grid: Grid {
×
494
                shape: self.shape.clone(),
×
495
                data: grid_data,
×
496
            },
×
497
            validity_mask: Grid {
×
498
                shape: self.shape.clone(),
×
499
                data: grid_mask,
×
500
            },
×
501
        };
×
502

×
503
        Ok(masked_grid)
×
504
    }
×
505
}
506

507
impl<D, T, C: TileCompression> CompressedGridOrEmpty<D, T, C> {
508
    pub fn compressed_data_len(&self) -> usize {
×
509
        match self {
×
510
            Self::Empty(_empty_grid) => 0,
×
511
            Self::Compressed(compressed_grid) => compressed_grid.compressed_data_len(),
×
512
        }
513
    }
×
514

515
    pub fn compressed_mask_len(&self) -> usize {
×
516
        match self {
×
517
            Self::Empty(_empty_grid) => 0,
×
518
            Self::Compressed(compressed_grid) => compressed_grid.compressed_mask_len(),
×
519
        }
520
    }
×
521

522
    pub fn compressed_len(&self) -> usize {
×
523
        match self {
×
524
            Self::Empty(_empty_grid) => 0,
×
525
            Self::Compressed(compressed_grid) => compressed_grid.compressed_len(),
×
526
        }
527
    }
×
528

529
    pub fn shape(&self) -> &D {
10✔
530
        match self {
10✔
531
            Self::Empty(empty_grid) => &empty_grid.shape,
×
532
            Self::Compressed(compressed_grid) => compressed_grid.shape(),
10✔
533
        }
534
    }
10✔
535

536
    pub fn is_empty(&self) -> bool {
×
537
        match self {
×
538
            Self::Empty(_) => true,
×
539
            Self::Compressed(_) => false,
×
540
        }
541
    }
×
542

543
    pub fn compress_grid(grid: &GridOrEmpty<D, T>) -> Self
9✔
544
    where
9✔
545
        D: Clone + GridSize + PartialEq,
9✔
546
        T: Copy,
9✔
547
    {
9✔
548
        match grid {
9✔
549
            GridOrEmpty::Empty(empty_grid) => Self::Empty(empty_grid.clone()),
8✔
550
            GridOrEmpty::Grid(grid) => {
1✔
551
                let compressed_grid = CompressedMaskedGrid::compress_masked_grid(grid);
1✔
552
                Self::Compressed(compressed_grid)
1✔
553
            }
554
        }
555
    }
9✔
556

557
    pub fn decompress_grid(&self) -> Result<GridOrEmpty<D, T>, CacheError>
×
558
    where
×
559
        D: Clone + GridSize + PartialEq,
×
560
        T: Copy,
×
561
    {
×
562
        match self {
×
563
            Self::Empty(empty_grid) => Ok(GridOrEmpty::Empty(empty_grid.clone())),
×
564
            Self::Compressed(compressed_grid) => {
×
565
                let decompressed_grid = compressed_grid.decompress_masked_grid()?;
×
566
                Ok(GridOrEmpty::Grid(decompressed_grid))
×
567
            }
568
        }
569
    }
×
570
}
571

572
impl<D, T, C> GridSize for CompressedGridOrEmpty<D, T, C>
573
where
574
    D: GridSize + GridSpaceToLinearSpace + PartialEq + Clone,
575
    T: Clone + Default,
576
    C: TileCompression,
577
{
578
    type ShapeArray = D::ShapeArray;
579

580
    const NDIM: usize = D::NDIM;
581

582
    fn axis_size(&self) -> Self::ShapeArray {
10✔
583
        self.shape().axis_size()
10✔
584
    }
10✔
585

586
    fn number_of_elements(&self) -> usize {
×
587
        self.shape().number_of_elements()
×
588
    }
×
589
}
590

591
impl<D, T, C> ByteSize for CompressedGridOrEmpty<D, T, C> {
592
    fn heap_byte_size(&self) -> usize {
23✔
593
        match self {
23✔
594
            Self::Empty(empty_grid) => empty_grid.heap_byte_size(),
×
595
            Self::Compressed(compressed_grid) => compressed_grid.heap_byte_size(),
23✔
596
        }
597
    }
23✔
598
}
599

600
impl<D, T, C> ByteSize for CompressedMaskedGrid<D, T, C> {
601
    fn heap_byte_size(&self) -> usize {
23✔
602
        self.data.heap_byte_size() + self.mask.heap_byte_size()
23✔
603
    }
23✔
604
}
605

606
pub trait CompressedRasterTileExt<D, T>
607
where
608
    Self: Sized,
609
{
610
    fn compress_tile(tile: RasterTile<D, T>) -> Self;
611

612
    fn decompress_tile(&self) -> Result<RasterTile<D, T>, CacheError>;
613

614
    fn compressed_data_len(&self) -> usize;
615
}
616

617
impl<T, C> CompressedRasterTileExt<GridShape2D, T>
618
    for BaseTile<CompressedGridOrEmpty<GridShape2D, T, C>>
619
where
620
    T: Copy,
621
    C: TileCompression,
622
{
623
    fn compress_tile(tile: RasterTile<GridShape2D, T>) -> Self {
9✔
624
        let compressed_grid = CompressedGridOrEmpty::compress_grid(&tile.grid_array);
9✔
625
        Self {
9✔
626
            grid_array: compressed_grid,
9✔
627
            time: tile.time,
9✔
628
            cache_hint: tile.cache_hint,
9✔
629
            global_geo_transform: tile.global_geo_transform,
9✔
630
            properties: tile.properties,
9✔
631
            tile_position: tile.tile_position,
9✔
632
        }
9✔
633
    }
9✔
634

635
    fn decompress_tile(&self) -> Result<RasterTile<GridShape2D, T>, CacheError> {
×
636
        let grid_array = match &self.grid_array {
×
637
            CompressedGridOrEmpty::Empty(empty_grid) => GridOrEmpty::Empty(*empty_grid),
×
638
            CompressedGridOrEmpty::Compressed(compressed_grid) => {
×
639
                let decompressed_grid = compressed_grid.decompress_masked_grid()?;
×
640
                GridOrEmpty::Grid(decompressed_grid)
×
641
            }
642
        };
643

644
        Ok(RasterTile {
×
645
            grid_array,
×
646
            time: self.time,
×
647
            cache_hint: self.cache_hint,
×
648
            global_geo_transform: self.global_geo_transform,
×
649
            properties: self.properties.clone(),
×
650
            tile_position: self.tile_position,
×
651
        })
×
652
    }
×
653

654
    fn compressed_data_len(&self) -> usize {
×
655
        self.grid_array.compressed_data_len()
×
656
    }
×
657
}
658

659
impl<T> CacheElement for RasterTile2D<T>
660
where
661
    T: Pixel,
662
    CompressedRasterTile2D<T>: CompressedRasterTileExt<GridShape2D, T>
663
        + CacheBackendElement<Query = RasterQueryRectangle>
664
        + CacheBackendElementExt,
665
{
666
    type StoredCacheElement = CompressedRasterTile2D<T>;
667
    type Query = RasterQueryRectangle;
668
    type ResultStream = CacheTileStream<T>;
669

670
    fn into_stored_element(self) -> Self::StoredCacheElement {
9✔
671
        CompressedRasterTile2D::compress_tile(self)
9✔
672
    }
9✔
673

674
    fn from_stored_element_ref(stored: &Self::StoredCacheElement) -> Result<Self, CacheError> {
×
675
        stored.decompress_tile()
×
676
    }
×
677

678
    fn result_stream(
1✔
679
        stored_data: Arc<Vec<Self::StoredCacheElement>>,
1✔
680
        query: Self::Query,
1✔
681
    ) -> Self::ResultStream {
1✔
682
        CacheTileStream::new(stored_data, query)
1✔
683
    }
1✔
684
}
685

686
pub trait TileCompression {
687
    fn compress<T>(data: &[T]) -> Vec<u8>
688
    where
689
        T: Copy;
690

691
    fn decompress<T>(data: &[u8], elements: usize) -> Result<Vec<T>, CacheError>
692
    where
693
        T: Copy;
694
}
695

696
#[derive(Debug, Copy, Clone)]
×
697
pub struct Lz4FlexCompression;
698

699
impl Lz4FlexCompression {
700
    fn cast_data_slice_to_u8_slice<T>(data: &[T]) -> &[u8]
2✔
701
    where
2✔
702
        T: Copy,
2✔
703
    {
2✔
704
        unsafe {
2✔
705
            std::slice::from_raw_parts(data.as_ptr().cast::<u8>(), std::mem::size_of_val(data))
2✔
706
        }
2✔
707
    }
2✔
708

709
    fn cast_u8_slice_to_data_slice<T>(data: &[u8]) -> &[T]
×
710
    where
×
711
        T: Copy,
×
712
    {
×
713
        unsafe {
×
714
            std::slice::from_raw_parts(
×
715
                data.as_ptr().cast::<T>(),
×
716
                data.len() / std::mem::size_of::<T>(),
×
717
            )
×
718
        }
×
719
    }
×
720
}
721

722
impl TileCompression for Lz4FlexCompression {
723
    fn compress<T>(data: &[T]) -> Vec<u8>
2✔
724
    where
2✔
725
        T: Copy,
2✔
726
    {
2✔
727
        let data_as_u8_slice = Self::cast_data_slice_to_u8_slice(data);
2✔
728
        lz4_flex::compress(data_as_u8_slice)
2✔
729
    }
2✔
730

731
    fn decompress<T>(data: &[u8], elements: usize) -> Result<Vec<T>, CacheError>
×
732
    where
×
733
        T: Copy,
×
734
    {
×
735
        let stored_bytes = elements * std::mem::size_of::<T>();
×
736
        let decompressed_data = lz4_flex::decompress(data, stored_bytes)
×
737
            .map_err(|source| CacheError::CouldNotDecompressElement { source })?;
×
738
        let decompressed_data_as_t_slice =
×
739
            Self::cast_u8_slice_to_data_slice(decompressed_data.as_slice());
×
740
        Ok(decompressed_data_as_t_slice.to_vec())
×
741
    }
×
742
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc