• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

geo-engine / geoengine / 5856046428

14 Aug 2023 01:37PM UTC coverage: 89.484% (-0.1%) from 89.596%
5856046428

push

github

web-flow
Merge pull request #848 from geo-engine/compressed-raster-cache

compress raster tile cache

475 of 475 new or added lines in 4 files covered. (100.0%)

104049 of 116277 relevant lines covered (89.48%)

62266.95 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/operators/src/pro/cache/cache_chunks.rs
1
use super::error::CacheError;
2
use super::shared_cache::{
3
    CacheBackendElement, CacheBackendElementExt, CacheElement, CacheElementsContainer,
4
    CacheElementsContainerInfos, LandingZoneElementsContainer, VectorCacheQueryEntry,
5
    VectorLandingQueryEntry,
6
};
7
use crate::util::Result;
8
use futures::stream::FusedStream;
9
use futures::Stream;
10

11
use geoengine_datatypes::{
12
    collections::{
13
        DataCollection, FeatureCollection, FeatureCollectionInfos, FeatureCollectionModifications,
14
        GeometryCollection, IntoGeometryIterator, MultiLineStringCollection, MultiPointCollection,
15
        MultiPolygonCollection,
16
    },
17
    primitives::{
18
        Geometry, MultiLineString, MultiPoint, MultiPolygon, NoGeometry, VectorQueryRectangle,
19
    },
20
    util::{arrow::ArrowTyped, ByteSize},
21
};
22
use pin_project::pin_project;
23

24
use std::{pin::Pin, sync::Arc};
25

26
#[derive(Debug)]
×
27
pub enum CachedFeatures {
28
    NoGeometry(Arc<Vec<DataCollection>>),
29
    MultiPoint(Arc<Vec<MultiPointCollection>>),
30
    MultiLineString(Arc<Vec<MultiLineStringCollection>>),
31
    MultiPolygon(Arc<Vec<MultiPolygonCollection>>),
32
}
33

34
impl CachedFeatures {
35
    pub fn len(&self) -> usize {
×
36
        match self {
×
37
            CachedFeatures::NoGeometry(v) => v.len(),
×
38
            CachedFeatures::MultiPoint(v) => v.len(),
×
39
            CachedFeatures::MultiLineString(v) => v.len(),
×
40
            CachedFeatures::MultiPolygon(v) => v.len(),
×
41
        }
42
    }
×
43

44
    pub fn is_empty(&self) -> bool {
×
45
        self.len() == 0
×
46
    }
×
47

48
    pub fn is_expired(&self) -> bool {
×
49
        match self {
×
50
            CachedFeatures::NoGeometry(v) => v.iter().any(|c| c.cache_hint.is_expired()),
×
51
            CachedFeatures::MultiPoint(v) => v.iter().any(|c| c.cache_hint.is_expired()),
×
52
            CachedFeatures::MultiLineString(v) => v.iter().any(|c| c.cache_hint.is_expired()),
×
53
            CachedFeatures::MultiPolygon(v) => v.iter().any(|c| c.cache_hint.is_expired()),
×
54
        }
55
    }
×
56
}
57

58
impl ByteSize for CachedFeatures {
59
    fn heap_byte_size(&self) -> usize {
×
60
        // we need to use `byte_size` instead of `heap_byte_size` here, because `Vec` stores its data on the heap
×
61
        match self {
×
62
            CachedFeatures::NoGeometry(v) => v.iter().map(FeatureCollectionInfos::byte_size).sum(),
×
63
            CachedFeatures::MultiPoint(v) => v.iter().map(FeatureCollectionInfos::byte_size).sum(),
×
64
            CachedFeatures::MultiLineString(v) => {
×
65
                v.iter().map(FeatureCollectionInfos::byte_size).sum()
×
66
            }
67
            CachedFeatures::MultiPolygon(v) => {
×
68
                v.iter().map(FeatureCollectionInfos::byte_size).sum()
×
69
            }
70
        }
71
    }
×
72
}
73

74
#[derive(Debug)]
×
75
pub enum LandingZoneQueryFeatures {
76
    NoGeometry(Vec<DataCollection>),
77
    MultiPoint(Vec<MultiPointCollection>),
78
    MultiLineString(Vec<MultiLineStringCollection>),
79
    MultiPolygon(Vec<MultiPolygonCollection>),
80
}
81

82
impl LandingZoneQueryFeatures {
83
    pub fn len(&self) -> usize {
×
84
        match self {
×
85
            LandingZoneQueryFeatures::NoGeometry(v) => v.len(),
×
86
            LandingZoneQueryFeatures::MultiPoint(v) => v.len(),
×
87
            LandingZoneQueryFeatures::MultiLineString(v) => v.len(),
×
88
            LandingZoneQueryFeatures::MultiPolygon(v) => v.len(),
×
89
        }
90
    }
×
91

92
    pub fn is_empty(&self) -> bool {
×
93
        self.len() == 0
×
94
    }
×
95
}
96

97
impl ByteSize for LandingZoneQueryFeatures {
98
    fn heap_byte_size(&self) -> usize {
×
99
        // we need to use `byte_size` instead of `heap_byte_size` here, because `Vec` stores its data on the heap
×
100
        match self {
×
101
            LandingZoneQueryFeatures::NoGeometry(v) => {
×
102
                v.iter().map(FeatureCollectionInfos::byte_size).sum()
×
103
            }
104
            LandingZoneQueryFeatures::MultiPoint(v) => {
×
105
                v.iter().map(FeatureCollectionInfos::byte_size).sum()
×
106
            }
107
            LandingZoneQueryFeatures::MultiLineString(v) => {
×
108
                v.iter().map(FeatureCollectionInfos::byte_size).sum()
×
109
            }
110
            LandingZoneQueryFeatures::MultiPolygon(v) => {
×
111
                v.iter().map(FeatureCollectionInfos::byte_size).sum()
×
112
            }
113
        }
114
    }
×
115
}
116

117
impl From<LandingZoneQueryFeatures> for CachedFeatures {
118
    fn from(value: LandingZoneQueryFeatures) -> Self {
×
119
        match value {
×
120
            LandingZoneQueryFeatures::NoGeometry(v) => CachedFeatures::NoGeometry(Arc::new(v)),
×
121
            LandingZoneQueryFeatures::MultiPoint(v) => CachedFeatures::MultiPoint(Arc::new(v)),
×
122
            LandingZoneQueryFeatures::MultiLineString(v) => {
×
123
                CachedFeatures::MultiLineString(Arc::new(v))
×
124
            }
125
            LandingZoneQueryFeatures::MultiPolygon(v) => CachedFeatures::MultiPolygon(Arc::new(v)),
×
126
        }
127
    }
×
128
}
129

130
impl CacheElementsContainerInfos<VectorQueryRectangle> for CachedFeatures {
131
    fn is_expired(&self) -> bool {
×
132
        self.is_expired()
×
133
    }
×
134
}
135

136
impl<G> CacheElementsContainer<VectorQueryRectangle, FeatureCollection<G>> for CachedFeatures
137
where
138
    G: Geometry + ArrowTyped,
139
    FeatureCollection<G>: CacheBackendElementExt<CacheContainer = Self> + CacheElementHitCheck,
140
{
141
    fn results_arc(&self) -> Option<Arc<Vec<FeatureCollection<G>>>> {
×
142
        FeatureCollection::<G>::results_arc(self)
×
143
    }
×
144
}
145

146
impl<G> LandingZoneElementsContainer<FeatureCollection<G>> for LandingZoneQueryFeatures
147
where
148
    G: Geometry + ArrowTyped,
149
    FeatureCollection<G>:
150
        CacheBackendElementExt<LandingZoneContainer = Self> + CacheElementHitCheck,
151
{
152
    fn insert_element(
×
153
        &mut self,
×
154
        element: FeatureCollection<G>,
×
155
    ) -> Result<(), super::error::CacheError> {
×
156
        FeatureCollection::<G>::move_element_into_landing_zone(element, self)
×
157
    }
×
158

159
    fn create_empty() -> Self {
×
160
        FeatureCollection::<G>::create_empty_landing_zone()
×
161
    }
×
162
}
163

164
impl<G> CacheBackendElement for FeatureCollection<G>
165
where
166
    G: Geometry + ArrowTyped + ArrowTyped + Sized,
167
    FeatureCollection<G>: CacheElementHitCheck,
168
{
169
    type Query = VectorQueryRectangle;
170

171
    fn cache_hint(&self) -> geoengine_datatypes::primitives::CacheHint {
×
172
        self.cache_hint
×
173
    }
×
174

175
    fn typed_canonical_operator_name(
×
176
        key: crate::engine::CanonicOperatorName,
×
177
    ) -> super::shared_cache::TypedCanonicOperatorName {
×
178
        super::shared_cache::TypedCanonicOperatorName::Vector(key)
×
179
    }
×
180

181
    fn update_stored_query(&self, _query: &mut Self::Query) -> Result<(), CacheError> {
×
182
        // In this case, the elements of the cache are vector data chunks.
×
183
        // Unlike raster data, chunks have no guaranteed extent (spatial or temporal) other than the limits of the query itself.
×
184
        // If a vector element has a larger extent than the query, then the bbox computed for the collection is larger than the query bbox.
×
185
        // However, there may be a point that is outside the query bbox but inside the collection bbox. As it is not in the query bbox, it must not be returned as the result of the query.
×
186
        // So the query is not updated.
×
187
        Ok(())
×
188
    }
×
189
}
190

191
macro_rules! impl_cache_element_subtype {
192
    ($g:ty, $variant:ident) => {
193
        impl CacheBackendElementExt for FeatureCollection<$g> {
194
            type LandingZoneContainer = LandingZoneQueryFeatures;
195
            type CacheContainer = CachedFeatures;
196

197
            fn move_element_into_landing_zone(
×
198
                self,
×
199
                landing_zone: &mut LandingZoneQueryFeatures,
×
200
            ) -> Result<(), super::error::CacheError> {
×
201
                match landing_zone {
×
202
                    LandingZoneQueryFeatures::$variant(v) => {
×
203
                        v.push(self);
×
204
                        Ok(())
×
205
                    }
206
                    _ => Err(super::error::CacheError::InvalidTypeForInsertion),
×
207
                }
208
            }
×
209

210
            fn create_empty_landing_zone() -> LandingZoneQueryFeatures {
×
211
                LandingZoneQueryFeatures::$variant(Vec::new())
×
212
            }
×
213

214
            fn results_arc(cache_elements_container: &CachedFeatures) -> Option<Arc<Vec<Self>>> {
215
                if let CachedFeatures::$variant(v) = cache_elements_container {
×
216
                    Some(v.clone())
×
217
                } else {
218
                    None
×
219
                }
220
            }
×
221

222
            fn landing_zone_to_cache_entry(
×
223
                landing_zone_entry: VectorLandingQueryEntry,
×
224
            ) -> VectorCacheQueryEntry {
×
225
                landing_zone_entry.into()
×
226
            }
×
227
        }
228
    };
229
}
230
impl_cache_element_subtype!(NoGeometry, NoGeometry);
231
impl_cache_element_subtype!(MultiPoint, MultiPoint);
232
impl_cache_element_subtype!(MultiLineString, MultiLineString);
233
impl_cache_element_subtype!(MultiPolygon, MultiPolygon);
234

235
/// Our own tile stream that "owns" the data (more precisely a reference to the data)
236
#[pin_project(project = CacheChunkStreamProjection)]
×
237
pub struct CacheChunkStream<G> {
238
    data: Arc<Vec<FeatureCollection<G>>>,
239
    query: VectorQueryRectangle,
240
    idx: usize,
241
}
242

243
impl<G> CacheChunkStream<G>
244
where
245
    G: Geometry,
246
    FeatureCollection<G>: FeatureCollectionInfos,
247
{
248
    pub fn new(data: Arc<Vec<FeatureCollection<G>>>, query: VectorQueryRectangle) -> Self {
×
249
        Self {
×
250
            data,
×
251
            query,
×
252
            idx: 0,
×
253
        }
×
254
    }
×
255

256
    pub fn element_count(&self) -> usize {
×
257
        self.data.len()
×
258
    }
×
259
}
260

261
impl<G: Geometry> Stream for CacheChunkStream<G>
262
where
263
    FeatureCollection<G>: CacheElementHitCheck,
264
    G: ArrowTyped,
265
{
266
    type Item = Result<FeatureCollection<G>, CacheError>;
267

268
    fn poll_next(
×
269
        mut self: Pin<&mut Self>,
×
270
        _cx: &mut std::task::Context<'_>,
×
271
    ) -> std::task::Poll<Option<Self::Item>> {
×
272
        let CacheChunkStreamProjection { data, query, idx } = self.as_mut().project();
×
273

274
        // return the next tile that is contained in the query, skip all tiles that are not contained
275
        for i in *idx..data.len() {
×
276
            let chunk = &data[i];
×
277
            if chunk.cache_element_hit(query) {
×
278
                // TODO: we really should cache the elements bbox somewhere
279
                let Ok(chunk) = chunk.filter_cache_element_entries(query) else {
×
280
                    // This should not happen, since we already checked that the element is contained in the query
281
                    log::error!("Could not filter cache element entries");
×
282
                    continue;
×
283
                };
284

285
                // if the chunk is empty, we can skip it
286
                if chunk.is_empty() {
×
287
                    log::trace!("Skipping empty chunk after filtering for query rectangle");
×
288
                    continue;
×
289
                }
×
290

×
291
                // set the index to the next element
×
292
                *idx = i + 1;
×
293
                return std::task::Poll::Ready(Some(Ok(chunk)));
×
294
            }
×
295
        }
296

297
        std::task::Poll::Ready(None)
×
298
    }
×
299
}
300

301
impl<G> FusedStream for CacheChunkStream<G>
302
where
303
    G: Geometry + ArrowTyped,
304
    FeatureCollection<G>: CacheElementHitCheck,
305
{
306
    fn is_terminated(&self) -> bool {
×
307
        self.idx >= self.data.len()
×
308
    }
×
309
}
310

311
pub trait CacheElementHitCheck {
312
    fn cache_element_hit(&self, query_rect: &VectorQueryRectangle) -> bool;
313

314
    fn filter_cache_element_entries(
315
        &self,
316
        query_rect: &VectorQueryRectangle,
317
    ) -> Result<Self, CacheError>
318
    where
319
        Self: Sized;
320
}
321

322
impl CacheElementHitCheck for FeatureCollection<NoGeometry> {
323
    fn cache_element_hit(&self, query_rect: &VectorQueryRectangle) -> bool {
×
324
        let Some(time_bounds) = self.time_bounds() else {
×
325
            return false;
×
326
        };
327

328
        time_bounds == query_rect.time_interval || time_bounds.intersects(&query_rect.time_interval)
×
329
    }
×
330
    fn filter_cache_element_entries(
×
331
        &self,
×
332
        query_rect: &VectorQueryRectangle,
×
333
    ) -> Result<Self, CacheError> {
×
334
        let time_filter_bools = self
×
335
            .time_intervals()
×
336
            .iter()
×
337
            .map(|t| t.intersects(&query_rect.time_interval))
×
338
            .collect::<Vec<bool>>();
×
339
        self.filter(time_filter_bools)
×
340
            .map_err(|_err| CacheError::CouldNotFilterResults)
×
341
    }
×
342
}
343

344
macro_rules! impl_cache_result_check {
345
    ($t:ty) => {
346
        impl<'a> CacheElementHitCheck for FeatureCollection<$t>
347
        where
348
            FeatureCollection<$t>: GeometryCollection,
349
        {
350
            fn cache_element_hit(&self, query_rect: &VectorQueryRectangle) -> bool {
×
351
                let Some(bbox) = self.bbox() else {return false;};
×
352

353
                let Some(time_bounds) = self.time_bounds() else {return false;};
×
354

355
                (bbox == query_rect.spatial_bounds
×
356
                    || bbox.intersects_bbox(&query_rect.spatial_bounds))
×
357
                    && (time_bounds == query_rect.time_interval
×
358
                        || time_bounds.intersects(&query_rect.time_interval))
×
359
            }
×
360

361
            fn filter_cache_element_entries(
×
362
                &self,
×
363
                query_rect: &VectorQueryRectangle,
×
364
            ) -> Result<Self, CacheError> {
×
365
                let geoms_filter_bools = self.geometries().map(|g| {
×
366
                    g.bbox()
367
                        .map(|bbox| bbox.intersects_bbox(&query_rect.spatial_bounds))
368
                        .unwrap_or(false)
369
                });
×
370

×
371
                let time_filter_bools = self
×
372
                    .time_intervals()
×
373
                    .iter()
×
374
                    .map(|t| t.intersects(&query_rect.time_interval));
×
375

×
376
                let filter_bools = geoms_filter_bools
×
377
                    .zip(time_filter_bools)
×
378
                    .map(|(g, t)| g && t)
×
379
                    .collect::<Vec<bool>>();
×
380

×
381
                self.filter(filter_bools)
×
382
                    .map_err(|_err| CacheError::CouldNotFilterResults)
×
383
            }
×
384
        }
385
    };
386
}
387

388
impl_cache_result_check!(MultiPoint);
×
389
impl_cache_result_check!(MultiLineString);
×
390
impl_cache_result_check!(MultiPolygon);
×
391

392
impl<G> CacheElement for FeatureCollection<G>
393
where
394
    G: Geometry + ArrowTyped,
395
    FeatureCollection<G>: ByteSize
396
        + CacheElementHitCheck
397
        + CacheBackendElementExt<
398
            Query = VectorQueryRectangle,
399
            LandingZoneContainer = LandingZoneQueryFeatures,
400
            CacheContainer = CachedFeatures,
401
        >,
402
{
403
    type StoredCacheElement = FeatureCollection<G>;
404
    type Query = VectorQueryRectangle;
405
    type ResultStream = CacheChunkStream<G>;
406

407
    fn into_stored_element(self) -> Self::StoredCacheElement {
×
408
        self
×
409
    }
×
410

411
    fn from_stored_element_ref(stored: &Self::StoredCacheElement) -> Result<Self, CacheError> {
×
412
        Ok(stored.clone())
×
413
    }
×
414

415
    fn result_stream(
×
416
        stored_data: Arc<Vec<Self::StoredCacheElement>>,
×
417
        query: Self::Query,
×
418
    ) -> Self::ResultStream {
×
419
        CacheChunkStream::new(stored_data, query)
×
420
    }
×
421
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc