• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

geo-engine / geoengine / 5641674846

24 Jul 2023 06:42AM UTC coverage: 88.938% (-0.2%) from 89.184%
5641674846

Pull #833

github

web-flow
Merge 054f347f4 into 8c287ecf7
Pull Request #833: Shared-cache

1301 of 1301 new or added lines in 15 files covered. (100.0%)

105810 of 118970 relevant lines covered (88.94%)

60854.04 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/operators/src/pro/cache/cache_chunks.rs
1
use super::error::CacheError;
2
use super::shared_cache::CacheElementSubType;
3
use super::shared_cache::{
4
    CacheElement, CacheElementsContainer, CacheElementsContainerInfos, LandingZoneElementsContainer,
5
};
6
use crate::util::Result;
7
use futures::Stream;
8
use geoengine_datatypes::{
9
    collections::{
10
        DataCollection, FeatureCollection, FeatureCollectionInfos, FeatureCollectionModifications,
11
        GeometryCollection, IntoGeometryIterator, MultiLineStringCollection, MultiPointCollection,
12
        MultiPolygonCollection,
13
    },
14
    primitives::{
15
        Geometry, MultiLineString, MultiPoint, MultiPolygon, NoGeometry, VectorQueryRectangle,
16
    },
17
    util::{arrow::ArrowTyped, ByteSize},
18
};
19
use pin_project::pin_project;
20
use std::{pin::Pin, sync::Arc};
21

22
#[derive(Debug)]
×
23
pub enum CachedFeatures {
24
    NoGeometry(Arc<Vec<DataCollection>>),
25
    MultiPoint(Arc<Vec<MultiPointCollection>>),
26
    MultiLineString(Arc<Vec<MultiLineStringCollection>>),
27
    MultiPolygon(Arc<Vec<MultiPolygonCollection>>),
28
}
29

30
impl CachedFeatures {
31
    pub fn len(&self) -> usize {
×
32
        match self {
×
33
            CachedFeatures::NoGeometry(v) => v.len(),
×
34
            CachedFeatures::MultiPoint(v) => v.len(),
×
35
            CachedFeatures::MultiLineString(v) => v.len(),
×
36
            CachedFeatures::MultiPolygon(v) => v.len(),
×
37
        }
38
    }
×
39

40
    pub fn is_empty(&self) -> bool {
×
41
        self.len() == 0
×
42
    }
×
43

44
    pub fn is_expired(&self) -> bool {
×
45
        match self {
×
46
            CachedFeatures::NoGeometry(v) => v.iter().any(|c| c.cache_hint.is_expired()),
×
47
            CachedFeatures::MultiPoint(v) => v.iter().any(|c| c.cache_hint.is_expired()),
×
48
            CachedFeatures::MultiLineString(v) => v.iter().any(|c| c.cache_hint.is_expired()),
×
49
            CachedFeatures::MultiPolygon(v) => v.iter().any(|c| c.cache_hint.is_expired()),
×
50
        }
51
    }
×
52

53
    pub fn chunk_stream(&self, query: &VectorQueryRectangle) -> TypedCacheChunkStream {
×
54
        match self {
×
55
            CachedFeatures::NoGeometry(v) => {
×
56
                TypedCacheChunkStream::NoGeometry(CacheChunkStream::new(Arc::clone(v), *query))
×
57
            }
58
            CachedFeatures::MultiPoint(v) => {
×
59
                TypedCacheChunkStream::MultiPoint(CacheChunkStream::new(Arc::clone(v), *query))
×
60
            }
61
            CachedFeatures::MultiLineString(v) => {
×
62
                TypedCacheChunkStream::MultiLineString(CacheChunkStream::new(Arc::clone(v), *query))
×
63
            }
64
            CachedFeatures::MultiPolygon(v) => {
×
65
                TypedCacheChunkStream::MultiPolygon(CacheChunkStream::new(Arc::clone(v), *query))
×
66
            }
67
        }
68
    }
×
69
}
70

71
impl ByteSize for CachedFeatures {
72
    fn heap_byte_size(&self) -> usize {
×
73
        // we need to use `byte_size` instead of `heap_byte_size` here, because `Vec` stores its data on the heap
×
74
        match self {
×
75
            CachedFeatures::NoGeometry(v) => v.iter().map(FeatureCollectionInfos::byte_size).sum(),
×
76
            CachedFeatures::MultiPoint(v) => v.iter().map(FeatureCollectionInfos::byte_size).sum(),
×
77
            CachedFeatures::MultiLineString(v) => {
×
78
                v.iter().map(FeatureCollectionInfos::byte_size).sum()
×
79
            }
80
            CachedFeatures::MultiPolygon(v) => {
×
81
                v.iter().map(FeatureCollectionInfos::byte_size).sum()
×
82
            }
83
        }
84
    }
×
85
}
86

87
#[derive(Debug)]
×
88
pub enum LandingZoneQueryFeatures {
89
    NoGeometry(Vec<DataCollection>),
90
    MultiPoint(Vec<MultiPointCollection>),
91
    MultiLineString(Vec<MultiLineStringCollection>),
92
    MultiPolygon(Vec<MultiPolygonCollection>),
93
}
94

95
impl LandingZoneQueryFeatures {
96
    pub fn len(&self) -> usize {
×
97
        match self {
×
98
            LandingZoneQueryFeatures::NoGeometry(v) => v.len(),
×
99
            LandingZoneQueryFeatures::MultiPoint(v) => v.len(),
×
100
            LandingZoneQueryFeatures::MultiLineString(v) => v.len(),
×
101
            LandingZoneQueryFeatures::MultiPolygon(v) => v.len(),
×
102
        }
103
    }
×
104

105
    pub fn is_empty(&self) -> bool {
×
106
        self.len() == 0
×
107
    }
×
108
}
109

110
impl ByteSize for LandingZoneQueryFeatures {
111
    fn heap_byte_size(&self) -> usize {
×
112
        // we need to use `byte_size` instead of `heap_byte_size` here, because `Vec` stores its data on the heap
×
113
        match self {
×
114
            LandingZoneQueryFeatures::NoGeometry(v) => {
×
115
                v.iter().map(FeatureCollectionInfos::byte_size).sum()
×
116
            }
117
            LandingZoneQueryFeatures::MultiPoint(v) => {
×
118
                v.iter().map(FeatureCollectionInfos::byte_size).sum()
×
119
            }
120
            LandingZoneQueryFeatures::MultiLineString(v) => {
×
121
                v.iter().map(FeatureCollectionInfos::byte_size).sum()
×
122
            }
123
            LandingZoneQueryFeatures::MultiPolygon(v) => {
×
124
                v.iter().map(FeatureCollectionInfos::byte_size).sum()
×
125
            }
126
        }
127
    }
×
128
}
129

130
impl From<LandingZoneQueryFeatures> for CachedFeatures {
131
    fn from(value: LandingZoneQueryFeatures) -> Self {
×
132
        match value {
×
133
            LandingZoneQueryFeatures::NoGeometry(v) => CachedFeatures::NoGeometry(Arc::new(v)),
×
134
            LandingZoneQueryFeatures::MultiPoint(v) => CachedFeatures::MultiPoint(Arc::new(v)),
×
135
            LandingZoneQueryFeatures::MultiLineString(v) => {
×
136
                CachedFeatures::MultiLineString(Arc::new(v))
×
137
            }
138
            LandingZoneQueryFeatures::MultiPolygon(v) => CachedFeatures::MultiPolygon(Arc::new(v)),
×
139
        }
140
    }
×
141
}
142

143
impl CacheElementsContainerInfos<VectorQueryRectangle> for CachedFeatures {
144
    fn is_expired(&self) -> bool {
×
145
        self.is_expired()
×
146
    }
×
147
}
148

149
impl<G> CacheElementsContainer<VectorQueryRectangle, FeatureCollection<G>> for CachedFeatures
150
where
151
    G: CacheElementSubType<CacheElementType = FeatureCollection<G>> + Geometry + ArrowTyped,
152
    FeatureCollection<G>: CacheElementHitCheck,
153
{
154
    type ResultStream = CacheChunkStream<G>;
155

156
    fn result_stream(&self, query: &VectorQueryRectangle) -> Option<CacheChunkStream<G>> {
×
157
        G::result_stream(self, query)
×
158
    }
×
159
}
160

161
impl<G> LandingZoneElementsContainer<FeatureCollection<G>> for LandingZoneQueryFeatures
162
where
163
    G: CacheElementSubType<CacheElementType = FeatureCollection<G>> + Geometry + ArrowTyped,
164
    FeatureCollection<G>: CacheElementHitCheck,
165
{
166
    fn insert_element(
×
167
        &mut self,
×
168
        element: FeatureCollection<G>,
×
169
    ) -> Result<(), super::error::CacheError> {
×
170
        G::insert_element_into_landing_zone(self, element)
×
171
    }
×
172

173
    fn create_empty() -> Self {
×
174
        G::create_empty_landing_zone()
×
175
    }
×
176
}
177

178
impl<G> CacheElement for FeatureCollection<G>
179
where
180
    G: Geometry + ArrowTyped + CacheElementSubType<CacheElementType = Self> + ArrowTyped + Sized,
181
    FeatureCollection<G>: CacheElementHitCheck,
182
{
183
    type Query = VectorQueryRectangle;
184
    type LandingZoneContainer = LandingZoneQueryFeatures;
185
    type CacheContainer = CachedFeatures;
186
    type ResultStream = CacheChunkStream<G>;
187
    type CacheElementSubType = G;
188

189
    fn cache_hint(&self) -> geoengine_datatypes::primitives::CacheHint {
×
190
        self.cache_hint
×
191
    }
×
192

193
    fn typed_canonical_operator_name(
×
194
        key: crate::engine::CanonicOperatorName,
×
195
    ) -> super::shared_cache::TypedCanonicOperatorName {
×
196
        super::shared_cache::TypedCanonicOperatorName::Vector(key)
×
197
    }
×
198

199
    fn update_stored_query(&self, _query: &mut Self::Query) -> Result<(), CacheError> {
×
200
        Ok(())
×
201
    }
×
202
}
203

204
macro_rules! impl_cache_element_subtype_magic {
205
    ($g:ty, $variant:ident) => {
206
        impl CacheElementSubType for $g {
207
            type CacheElementType = FeatureCollection<$g>;
208

209
            fn insert_element_into_landing_zone(
×
210
                landing_zone: &mut LandingZoneQueryFeatures,
×
211
                element: Self::CacheElementType,
×
212
            ) -> Result<(), super::error::CacheError> {
×
213
                match landing_zone {
×
214
                    LandingZoneQueryFeatures::$variant(v) => {
×
215
                        v.push(element);
×
216
                        Ok(())
×
217
                    }
218
                    _ => Err(super::error::CacheError::InvalidTypeForInsertion),
×
219
                }
220
            }
×
221

222
            fn create_empty_landing_zone() -> LandingZoneQueryFeatures {
×
223
                LandingZoneQueryFeatures::$variant(Vec::new())
×
224
            }
×
225

226
            fn result_stream(
227
                cache_elements_container: &CachedFeatures,
228
                query: &VectorQueryRectangle,
229
            ) -> Option<CacheChunkStream<$g>> {
230
                if let TypedCacheChunkStream::$variant(v) =
×
231
                    cache_elements_container.chunk_stream(query)
×
232
                {
233
                    Some(v)
×
234
                } else {
235
                    None
×
236
                }
237
            }
×
238
        }
239
    };
240
}
241
impl_cache_element_subtype_magic!(NoGeometry, NoGeometry);
242
impl_cache_element_subtype_magic!(MultiPoint, MultiPoint);
243
impl_cache_element_subtype_magic!(MultiLineString, MultiLineString);
244
impl_cache_element_subtype_magic!(MultiPolygon, MultiPolygon);
245

246
/// Our own tile stream that "owns" the data (more precisely a reference to the data)
247
#[pin_project(project = CacheChunkStreamProjection)]
×
248
pub struct CacheChunkStream<G> {
249
    data: Arc<Vec<FeatureCollection<G>>>,
250
    query: VectorQueryRectangle,
251
    idx: usize,
252
}
253

254
impl<G> CacheChunkStream<G>
255
where
256
    G: Geometry,
257
    FeatureCollection<G>: FeatureCollectionInfos,
258
{
259
    pub fn new(data: Arc<Vec<FeatureCollection<G>>>, query: VectorQueryRectangle) -> Self {
×
260
        Self {
×
261
            data,
×
262
            query,
×
263
            idx: 0,
×
264
        }
×
265
    }
×
266

267
    pub fn element_count(&self) -> usize {
×
268
        self.data.len()
×
269
    }
×
270
}
271

272
impl<G: Geometry> Stream for CacheChunkStream<G>
273
where
274
    FeatureCollection<G>: CacheElementHitCheck,
275
    G: ArrowTyped,
276
{
277
    type Item = Result<FeatureCollection<G>>;
278

279
    fn poll_next(
×
280
        mut self: Pin<&mut Self>,
×
281
        _cx: &mut std::task::Context<'_>,
×
282
    ) -> std::task::Poll<Option<Self::Item>> {
×
283
        let CacheChunkStreamProjection { data, query, idx } = self.as_mut().project();
×
284

285
        // return the next tile that is contained in the query, skip all tiles that are not contained
286
        for i in *idx..data.len() {
×
287
            let chunk = &data[i];
×
288
            if chunk.cache_element_hit(query) {
×
289
                // TODO: we really should cache the elements bbox somewhere
290
                let Ok(chunk) = chunk.filter_cache_element_entries(query) else {
×
291
                    // This should not happen, since we already checked that the element is contained in the query
292
                    log::error!("Could not filter cache element entries");
×
293
                    continue;
×
294
                };
295

296
                // if the chunk is empty, we can skip it
297
                if chunk.is_empty() {
×
298
                    log::trace!("Skipping empty chunk after filtering for query rectangle");
×
299
                    continue;
×
300
                }
×
301

×
302
                // set the index to the next element
×
303
                *idx = i + 1;
×
304
                return std::task::Poll::Ready(Some(Ok(chunk)));
×
305
            }
×
306
        }
307

308
        std::task::Poll::Ready(None)
×
309
    }
×
310
}
311

312
pub enum TypedCacheChunkStream {
313
    NoGeometry(CacheChunkStream<NoGeometry>),
314
    MultiPoint(CacheChunkStream<MultiPoint>),
315
    MultiLineString(CacheChunkStream<MultiLineString>),
316
    MultiPolygon(CacheChunkStream<MultiPolygon>),
317
}
318

319
pub trait CacheElementHitCheck {
320
    fn cache_element_hit(&self, query_rect: &VectorQueryRectangle) -> bool;
321

322
    fn filter_cache_element_entries(
323
        &self,
324
        query_rect: &VectorQueryRectangle,
325
    ) -> Result<Self, CacheError>
326
    where
327
        Self: Sized;
328
}
329

330
impl CacheElementHitCheck for FeatureCollection<NoGeometry> {
331
    fn cache_element_hit(&self, query_rect: &VectorQueryRectangle) -> bool {
×
332
        let Some(time_bounds) = self.time_bounds() else {
×
333
            return false;
×
334
        };
335

336
        time_bounds == query_rect.time_interval || time_bounds.intersects(&query_rect.time_interval)
×
337
    }
×
338
    fn filter_cache_element_entries(
×
339
        &self,
×
340
        query_rect: &VectorQueryRectangle,
×
341
    ) -> Result<Self, CacheError> {
×
342
        let time_filter_bools = self
×
343
            .time_intervals()
×
344
            .iter()
×
345
            .map(|t| t.intersects(&query_rect.time_interval))
×
346
            .collect::<Vec<bool>>();
×
347
        self.filter(time_filter_bools)
×
348
            .map_err(|_err| CacheError::CouldNotFilterResults)
×
349
    }
×
350
}
351

352
macro_rules! impl_cache_result_check {
353
    ($t:ty) => {
354
        impl<'a> CacheElementHitCheck for FeatureCollection<$t>
355
        where
356
            FeatureCollection<$t>: GeometryCollection,
357
        {
358
            fn cache_element_hit(&self, query_rect: &VectorQueryRectangle) -> bool {
×
359
                let Some(bbox) = self.bbox() else {return false;};
×
360

361
                let Some(time_bounds) = self.time_bounds() else {return false;};
×
362

363
                (bbox == query_rect.spatial_bounds
×
364
                    || bbox.intersects_bbox(&query_rect.spatial_bounds))
×
365
                    && (time_bounds == query_rect.time_interval
×
366
                        || time_bounds.intersects(&query_rect.time_interval))
×
367
            }
×
368

369
            fn filter_cache_element_entries(
×
370
                &self,
×
371
                query_rect: &VectorQueryRectangle,
×
372
            ) -> Result<Self, CacheError> {
×
373
                let geoms_filter_bools = self.geometries().map(|g| {
×
374
                    g.bbox()
375
                        .map(|bbox| bbox.intersects_bbox(&query_rect.spatial_bounds))
376
                        .unwrap_or(false)
377
                });
×
378

×
379
                let time_filter_bools = self
×
380
                    .time_intervals()
×
381
                    .iter()
×
382
                    .map(|t| t.intersects(&query_rect.time_interval));
×
383

×
384
                let filter_bools = geoms_filter_bools
×
385
                    .zip(time_filter_bools)
×
386
                    .map(|(g, t)| g && t)
×
387
                    .collect::<Vec<bool>>();
×
388

×
389
                self.filter(filter_bools)
×
390
                    .map_err(|_err| CacheError::CouldNotFilterResults)
×
391
            }
×
392
        }
393
    };
394
}
395

396
impl_cache_result_check!(MultiPoint);
×
397
impl_cache_result_check!(MultiLineString);
×
398
impl_cache_result_check!(MultiPolygon);
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc