• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

geo-engine / geoengine / 5667745553

26 Jul 2023 09:52AM UTC coverage: 88.919% (-0.3%) from 89.193%
5667745553

push

github

web-flow
Merge pull request #833 from geo-engine/shared-cache

Shared-cache

1353 of 1353 new or added lines in 15 files covered. (100.0%)

105892 of 119088 relevant lines covered (88.92%)

60794.01 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/operators/src/pro/cache/cache_chunks.rs
1
use super::error::CacheError;
2
use super::shared_cache::CacheElementSubType;
3
use super::shared_cache::{
4
    CacheElement, CacheElementsContainer, CacheElementsContainerInfos, LandingZoneElementsContainer,
5
};
6
use crate::util::Result;
7
use futures::stream::FusedStream;
8
use futures::Stream;
9
use geoengine_datatypes::{
10
    collections::{
11
        DataCollection, FeatureCollection, FeatureCollectionInfos, FeatureCollectionModifications,
12
        GeometryCollection, IntoGeometryIterator, MultiLineStringCollection, MultiPointCollection,
13
        MultiPolygonCollection,
14
    },
15
    primitives::{
16
        Geometry, MultiLineString, MultiPoint, MultiPolygon, NoGeometry, VectorQueryRectangle,
17
    },
18
    util::{arrow::ArrowTyped, ByteSize},
19
};
20
use pin_project::pin_project;
21
use std::{pin::Pin, sync::Arc};
22

23
/// Container for vector chunks held by the cache.
///
/// There is one variant per supported geometry type. Each variant stores its
/// feature collections in an `Arc<Vec<_>>`, so the vector can be shared without copying.
#[derive(Debug)]
×
24
pub enum CachedFeatures {
25
    /// Collections without geometries.
    NoGeometry(Arc<Vec<DataCollection>>),
26
    /// Collections of multi-point features.
    MultiPoint(Arc<Vec<MultiPointCollection>>),
27
    /// Collections of multi-line-string features.
    MultiLineString(Arc<Vec<MultiLineStringCollection>>),
28
    /// Collections of multi-polygon features.
    MultiPolygon(Arc<Vec<MultiPolygonCollection>>),
29
}
30

31
impl CachedFeatures {
32
    pub fn len(&self) -> usize {
×
33
        match self {
×
34
            CachedFeatures::NoGeometry(v) => v.len(),
×
35
            CachedFeatures::MultiPoint(v) => v.len(),
×
36
            CachedFeatures::MultiLineString(v) => v.len(),
×
37
            CachedFeatures::MultiPolygon(v) => v.len(),
×
38
        }
39
    }
×
40

41
    pub fn is_empty(&self) -> bool {
×
42
        self.len() == 0
×
43
    }
×
44

45
    pub fn is_expired(&self) -> bool {
×
46
        match self {
×
47
            CachedFeatures::NoGeometry(v) => v.iter().any(|c| c.cache_hint.is_expired()),
×
48
            CachedFeatures::MultiPoint(v) => v.iter().any(|c| c.cache_hint.is_expired()),
×
49
            CachedFeatures::MultiLineString(v) => v.iter().any(|c| c.cache_hint.is_expired()),
×
50
            CachedFeatures::MultiPolygon(v) => v.iter().any(|c| c.cache_hint.is_expired()),
×
51
        }
52
    }
×
53

54
    pub fn chunk_stream(&self, query: &VectorQueryRectangle) -> TypedCacheChunkStream {
×
55
        match self {
×
56
            CachedFeatures::NoGeometry(v) => {
×
57
                TypedCacheChunkStream::NoGeometry(CacheChunkStream::new(Arc::clone(v), *query))
×
58
            }
59
            CachedFeatures::MultiPoint(v) => {
×
60
                TypedCacheChunkStream::MultiPoint(CacheChunkStream::new(Arc::clone(v), *query))
×
61
            }
62
            CachedFeatures::MultiLineString(v) => {
×
63
                TypedCacheChunkStream::MultiLineString(CacheChunkStream::new(Arc::clone(v), *query))
×
64
            }
65
            CachedFeatures::MultiPolygon(v) => {
×
66
                TypedCacheChunkStream::MultiPolygon(CacheChunkStream::new(Arc::clone(v), *query))
×
67
            }
68
        }
69
    }
×
70
}
71

72
impl ByteSize for CachedFeatures {
73
    fn heap_byte_size(&self) -> usize {
×
74
        // we need to use `byte_size` instead of `heap_byte_size` here, because `Vec` stores its data on the heap
×
75
        match self {
×
76
            CachedFeatures::NoGeometry(v) => v.iter().map(FeatureCollectionInfos::byte_size).sum(),
×
77
            CachedFeatures::MultiPoint(v) => v.iter().map(FeatureCollectionInfos::byte_size).sum(),
×
78
            CachedFeatures::MultiLineString(v) => {
×
79
                v.iter().map(FeatureCollectionInfos::byte_size).sum()
×
80
            }
81
            CachedFeatures::MultiPolygon(v) => {
×
82
                v.iter().map(FeatureCollectionInfos::byte_size).sum()
×
83
            }
84
        }
85
    }
×
86
}
87

88
/// Container that collects the vector chunks of a query, one variant per geometry type.
///
/// Unlike `CachedFeatures`, the vectors are owned directly (no `Arc`), so chunks can
/// still be pushed into them. A landing zone is turned into `CachedFeatures` via `From`.
#[derive(Debug)]
×
89
pub enum LandingZoneQueryFeatures {
90
    /// Collections without geometries.
    NoGeometry(Vec<DataCollection>),
91
    /// Collections of multi-point features.
    MultiPoint(Vec<MultiPointCollection>),
92
    /// Collections of multi-line-string features.
    MultiLineString(Vec<MultiLineStringCollection>),
93
    /// Collections of multi-polygon features.
    MultiPolygon(Vec<MultiPolygonCollection>),
94
}
95

96
impl LandingZoneQueryFeatures {
97
    pub fn len(&self) -> usize {
×
98
        match self {
×
99
            LandingZoneQueryFeatures::NoGeometry(v) => v.len(),
×
100
            LandingZoneQueryFeatures::MultiPoint(v) => v.len(),
×
101
            LandingZoneQueryFeatures::MultiLineString(v) => v.len(),
×
102
            LandingZoneQueryFeatures::MultiPolygon(v) => v.len(),
×
103
        }
104
    }
×
105

106
    pub fn is_empty(&self) -> bool {
×
107
        self.len() == 0
×
108
    }
×
109
}
110

111
impl ByteSize for LandingZoneQueryFeatures {
112
    fn heap_byte_size(&self) -> usize {
×
113
        // we need to use `byte_size` instead of `heap_byte_size` here, because `Vec` stores its data on the heap
×
114
        match self {
×
115
            LandingZoneQueryFeatures::NoGeometry(v) => {
×
116
                v.iter().map(FeatureCollectionInfos::byte_size).sum()
×
117
            }
118
            LandingZoneQueryFeatures::MultiPoint(v) => {
×
119
                v.iter().map(FeatureCollectionInfos::byte_size).sum()
×
120
            }
121
            LandingZoneQueryFeatures::MultiLineString(v) => {
×
122
                v.iter().map(FeatureCollectionInfos::byte_size).sum()
×
123
            }
124
            LandingZoneQueryFeatures::MultiPolygon(v) => {
×
125
                v.iter().map(FeatureCollectionInfos::byte_size).sum()
×
126
            }
127
        }
128
    }
×
129
}
130

131
impl From<LandingZoneQueryFeatures> for CachedFeatures {
132
    fn from(value: LandingZoneQueryFeatures) -> Self {
×
133
        match value {
×
134
            LandingZoneQueryFeatures::NoGeometry(v) => CachedFeatures::NoGeometry(Arc::new(v)),
×
135
            LandingZoneQueryFeatures::MultiPoint(v) => CachedFeatures::MultiPoint(Arc::new(v)),
×
136
            LandingZoneQueryFeatures::MultiLineString(v) => {
×
137
                CachedFeatures::MultiLineString(Arc::new(v))
×
138
            }
139
            LandingZoneQueryFeatures::MultiPolygon(v) => CachedFeatures::MultiPolygon(Arc::new(v)),
×
140
        }
141
    }
×
142
}
143

144
impl CacheElementsContainerInfos<VectorQueryRectangle> for CachedFeatures {
145
    fn is_expired(&self) -> bool {
×
146
        self.is_expired()
×
147
    }
×
148
}
149

150
impl<G> CacheElementsContainer<VectorQueryRectangle, FeatureCollection<G>> for CachedFeatures
151
where
152
    G: CacheElementSubType<CacheElementType = FeatureCollection<G>> + Geometry + ArrowTyped,
153
    FeatureCollection<G>: CacheElementHitCheck,
154
{
155
    type ResultStream = CacheChunkStream<G>;
156

157
    fn result_stream(&self, query: &VectorQueryRectangle) -> Option<CacheChunkStream<G>> {
×
158
        G::result_stream(self, query)
×
159
    }
×
160
}
161

162
impl<G> LandingZoneElementsContainer<FeatureCollection<G>> for LandingZoneQueryFeatures
163
where
164
    G: CacheElementSubType<CacheElementType = FeatureCollection<G>> + Geometry + ArrowTyped,
165
    FeatureCollection<G>: CacheElementHitCheck,
166
{
167
    fn insert_element(
×
168
        &mut self,
×
169
        element: FeatureCollection<G>,
×
170
    ) -> Result<(), super::error::CacheError> {
×
171
        G::insert_element_into_landing_zone(self, element)
×
172
    }
×
173

174
    fn create_empty() -> Self {
×
175
        G::create_empty_landing_zone()
×
176
    }
×
177
}
178

179
impl<G> CacheElement for FeatureCollection<G>
180
where
181
    G: Geometry + ArrowTyped + CacheElementSubType<CacheElementType = Self> + ArrowTyped + Sized,
182
    FeatureCollection<G>: CacheElementHitCheck,
183
{
184
    type Query = VectorQueryRectangle;
185
    type LandingZoneContainer = LandingZoneQueryFeatures;
186
    type CacheContainer = CachedFeatures;
187
    type ResultStream = CacheChunkStream<G>;
188
    type CacheElementSubType = G;
189

190
    fn cache_hint(&self) -> geoengine_datatypes::primitives::CacheHint {
×
191
        self.cache_hint
×
192
    }
×
193

194
    fn typed_canonical_operator_name(
×
195
        key: crate::engine::CanonicOperatorName,
×
196
    ) -> super::shared_cache::TypedCanonicOperatorName {
×
197
        super::shared_cache::TypedCanonicOperatorName::Vector(key)
×
198
    }
×
199

200
    fn update_stored_query(&self, _query: &mut Self::Query) -> Result<(), CacheError> {
×
201
        // In this case, the elements of the cache are vector data chunks.
×
202
        // Unlike raster data, chunks have no guaranteed extent (spatial or temporal) other than the limits of the query itself.
×
203
        // If a vector element has a larger extent than the query, then the bbox computed for the collection is larger than the query bbox.
×
204
        // However, there may be a point that is outside the query bbox but inside the collection bbox. As it is not in the query bbox, it must not be returned as the result of the query.
×
205
        // So the query is not updated.
×
206
        Ok(())
×
207
    }
×
208
}
209

210
macro_rules! impl_cache_element_subtype {
211
    ($g:ty, $variant:ident) => {
212
        impl CacheElementSubType for $g {
213
            type CacheElementType = FeatureCollection<$g>;
214

215
            fn insert_element_into_landing_zone(
×
216
                landing_zone: &mut LandingZoneQueryFeatures,
×
217
                element: Self::CacheElementType,
×
218
            ) -> Result<(), super::error::CacheError> {
×
219
                match landing_zone {
×
220
                    LandingZoneQueryFeatures::$variant(v) => {
×
221
                        v.push(element);
×
222
                        Ok(())
×
223
                    }
224
                    _ => Err(super::error::CacheError::InvalidTypeForInsertion),
×
225
                }
226
            }
×
227

228
            fn create_empty_landing_zone() -> LandingZoneQueryFeatures {
×
229
                LandingZoneQueryFeatures::$variant(Vec::new())
×
230
            }
×
231

232
            fn result_stream(
233
                cache_elements_container: &CachedFeatures,
234
                query: &VectorQueryRectangle,
235
            ) -> Option<CacheChunkStream<$g>> {
236
                if let TypedCacheChunkStream::$variant(v) =
×
237
                    cache_elements_container.chunk_stream(query)
×
238
                {
239
                    Some(v)
×
240
                } else {
241
                    None
×
242
                }
243
            }
×
244
        }
245
    };
246
}
247
impl_cache_element_subtype!(NoGeometry, NoGeometry);
248
impl_cache_element_subtype!(MultiPoint, MultiPoint);
249
impl_cache_element_subtype!(MultiLineString, MultiLineString);
250
impl_cache_element_subtype!(MultiPolygon, MultiPolygon);
251

252
/// Our own chunk stream that "owns" the data (more precisely, a shared `Arc` reference to the
/// data) together with the query it serves.
#[pin_project(project = CacheChunkStreamProjection)]
×
254
pub struct CacheChunkStream<G> {
255
    /// The chunks to stream, shared via `Arc`.
    data: Arc<Vec<FeatureCollection<G>>>,
256
    /// The query rectangle the chunks are checked and filtered against.
    query: VectorQueryRectangle,
257
    /// Index into `data` of the next chunk to look at.
    idx: usize,
258
}
259

260
impl<G> CacheChunkStream<G>
261
where
262
    G: Geometry,
263
    FeatureCollection<G>: FeatureCollectionInfos,
264
{
265
    pub fn new(data: Arc<Vec<FeatureCollection<G>>>, query: VectorQueryRectangle) -> Self {
×
266
        Self {
×
267
            data,
×
268
            query,
×
269
            idx: 0,
×
270
        }
×
271
    }
×
272

273
    pub fn element_count(&self) -> usize {
×
274
        self.data.len()
×
275
    }
×
276
}
277

278
impl<G: Geometry> Stream for CacheChunkStream<G>
279
where
280
    FeatureCollection<G>: CacheElementHitCheck,
281
    G: ArrowTyped,
282
{
283
    type Item = Result<FeatureCollection<G>>;
284

285
    fn poll_next(
×
286
        mut self: Pin<&mut Self>,
×
287
        _cx: &mut std::task::Context<'_>,
×
288
    ) -> std::task::Poll<Option<Self::Item>> {
×
289
        let CacheChunkStreamProjection { data, query, idx } = self.as_mut().project();
×
290

291
        // return the next tile that is contained in the query, skip all tiles that are not contained
292
        for i in *idx..data.len() {
×
293
            let chunk = &data[i];
×
294
            if chunk.cache_element_hit(query) {
×
295
                // TODO: we really should cache the elements bbox somewhere
296
                let Ok(chunk) = chunk.filter_cache_element_entries(query) else {
×
297
                    // This should not happen, since we already checked that the element is contained in the query
298
                    log::error!("Could not filter cache element entries");
×
299
                    continue;
×
300
                };
301

302
                // if the chunk is empty, we can skip it
303
                if chunk.is_empty() {
×
304
                    log::trace!("Skipping empty chunk after filtering for query rectangle");
×
305
                    continue;
×
306
                }
×
307

×
308
                // set the index to the next element
×
309
                *idx = i + 1;
×
310
                return std::task::Poll::Ready(Some(Ok(chunk)));
×
311
            }
×
312
        }
313

314
        std::task::Poll::Ready(None)
×
315
    }
×
316
}
317

318
impl<G> FusedStream for CacheChunkStream<G>
319
where
320
    G: Geometry + ArrowTyped,
321
    FeatureCollection<G>: CacheElementHitCheck,
322
{
323
    fn is_terminated(&self) -> bool {
×
324
        self.idx >= self.data.len()
×
325
    }
×
326
}
327

328
/// A `CacheChunkStream` for one of the supported geometry types.
///
/// This is what `CachedFeatures::chunk_stream` returns; the variant mirrors the variant of the
/// container the stream was created from.
pub enum TypedCacheChunkStream {
329
    /// Stream of collections without geometries.
    NoGeometry(CacheChunkStream<NoGeometry>),
330
    /// Stream of multi-point collections.
    MultiPoint(CacheChunkStream<MultiPoint>),
331
    /// Stream of multi-line-string collections.
    MultiLineString(CacheChunkStream<MultiLineString>),
332
    /// Stream of multi-polygon collections.
    MultiPolygon(CacheChunkStream<MultiPolygon>),
333
}
334

335
/// Checks whether a cached element is relevant for a query and restricts it to that query.
pub trait CacheElementHitCheck {
336
    /// Returns `true` if the element as a whole matches `query_rect`.
    ///
    /// The implementations in this file compare the element's time bounds (and, for
    /// geometry collections, its bounding box) with the query.
    fn cache_element_hit(&self, query_rect: &VectorQueryRectangle) -> bool;
337

338
    /// Returns a new element that only contains the entries matching `query_rect`.
    ///
    /// # Errors
    ///
    /// The implementations in this file return `CacheError::CouldNotFilterResults` if
    /// filtering the element fails.
    fn filter_cache_element_entries(
339
        &self,
340
        query_rect: &VectorQueryRectangle,
341
    ) -> Result<Self, CacheError>
342
    where
343
        Self: Sized;
344
}
345

346
impl CacheElementHitCheck for FeatureCollection<NoGeometry> {
347
    fn cache_element_hit(&self, query_rect: &VectorQueryRectangle) -> bool {
×
348
        let Some(time_bounds) = self.time_bounds() else {
×
349
            return false;
×
350
        };
351

352
        time_bounds == query_rect.time_interval || time_bounds.intersects(&query_rect.time_interval)
×
353
    }
×
354
    fn filter_cache_element_entries(
×
355
        &self,
×
356
        query_rect: &VectorQueryRectangle,
×
357
    ) -> Result<Self, CacheError> {
×
358
        let time_filter_bools = self
×
359
            .time_intervals()
×
360
            .iter()
×
361
            .map(|t| t.intersects(&query_rect.time_interval))
×
362
            .collect::<Vec<bool>>();
×
363
        self.filter(time_filter_bools)
×
364
            .map_err(|_err| CacheError::CouldNotFilterResults)
×
365
    }
×
366
}
367

368
macro_rules! impl_cache_result_check {
369
    ($t:ty) => {
370
        impl<'a> CacheElementHitCheck for FeatureCollection<$t>
371
        where
372
            FeatureCollection<$t>: GeometryCollection,
373
        {
374
            fn cache_element_hit(&self, query_rect: &VectorQueryRectangle) -> bool {
×
375
                let Some(bbox) = self.bbox() else {return false;};
×
376

377
                let Some(time_bounds) = self.time_bounds() else {return false;};
×
378

379
                (bbox == query_rect.spatial_bounds
×
380
                    || bbox.intersects_bbox(&query_rect.spatial_bounds))
×
381
                    && (time_bounds == query_rect.time_interval
×
382
                        || time_bounds.intersects(&query_rect.time_interval))
×
383
            }
×
384

385
            fn filter_cache_element_entries(
×
386
                &self,
×
387
                query_rect: &VectorQueryRectangle,
×
388
            ) -> Result<Self, CacheError> {
×
389
                let geoms_filter_bools = self.geometries().map(|g| {
×
390
                    g.bbox()
391
                        .map(|bbox| bbox.intersects_bbox(&query_rect.spatial_bounds))
392
                        .unwrap_or(false)
393
                });
×
394

×
395
                let time_filter_bools = self
×
396
                    .time_intervals()
×
397
                    .iter()
×
398
                    .map(|t| t.intersects(&query_rect.time_interval));
×
399

×
400
                let filter_bools = geoms_filter_bools
×
401
                    .zip(time_filter_bools)
×
402
                    .map(|(g, t)| g && t)
×
403
                    .collect::<Vec<bool>>();
×
404

×
405
                self.filter(filter_bools)
×
406
                    .map_err(|_err| CacheError::CouldNotFilterResults)
×
407
            }
×
408
        }
409
    };
410
}
411

412
impl_cache_result_check!(MultiPoint);
×
413
impl_cache_result_check!(MultiLineString);
×
414
impl_cache_result_check!(MultiPolygon);
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc