• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

geo-engine / geoengine / 5660902123

25 Jul 2023 07:21PM UTC coverage: 88.918% (-0.3%) from 89.193%
5660902123

Pull #833

github

web-flow
Merge a0c13c0ab into 3d8a7e0ad
Pull Request #833: Shared-cache

1347 of 1347 new or added lines in 15 files covered. (100.0%)

105886 of 119083 relevant lines covered (88.92%)

60796.44 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/operators/src/pro/cache/cache_chunks.rs
1
use super::error::CacheError;
2
use super::shared_cache::CacheElementSubType;
3
use super::shared_cache::{
4
    CacheElement, CacheElementsContainer, CacheElementsContainerInfos, LandingZoneElementsContainer,
5
};
6
use crate::util::Result;
7
use futures::stream::FusedStream;
8
use futures::Stream;
9
use geoengine_datatypes::{
10
    collections::{
11
        DataCollection, FeatureCollection, FeatureCollectionInfos, FeatureCollectionModifications,
12
        GeometryCollection, IntoGeometryIterator, MultiLineStringCollection, MultiPointCollection,
13
        MultiPolygonCollection,
14
    },
15
    primitives::{
16
        Geometry, MultiLineString, MultiPoint, MultiPolygon, NoGeometry, VectorQueryRectangle,
17
    },
18
    util::{arrow::ArrowTyped, ByteSize},
19
};
20
use pin_project::pin_project;
21
use std::{pin::Pin, sync::Arc};
22

23
#[derive(Debug)]
×
24
pub enum CachedFeatures {
25
    NoGeometry(Arc<Vec<DataCollection>>),
26
    MultiPoint(Arc<Vec<MultiPointCollection>>),
27
    MultiLineString(Arc<Vec<MultiLineStringCollection>>),
28
    MultiPolygon(Arc<Vec<MultiPolygonCollection>>),
29
}
30

31
impl CachedFeatures {
32
    pub fn len(&self) -> usize {
×
33
        match self {
×
34
            CachedFeatures::NoGeometry(v) => v.len(),
×
35
            CachedFeatures::MultiPoint(v) => v.len(),
×
36
            CachedFeatures::MultiLineString(v) => v.len(),
×
37
            CachedFeatures::MultiPolygon(v) => v.len(),
×
38
        }
39
    }
×
40

41
    pub fn is_empty(&self) -> bool {
×
42
        self.len() == 0
×
43
    }
×
44

45
    pub fn is_expired(&self) -> bool {
×
46
        match self {
×
47
            CachedFeatures::NoGeometry(v) => v.iter().any(|c| c.cache_hint.is_expired()),
×
48
            CachedFeatures::MultiPoint(v) => v.iter().any(|c| c.cache_hint.is_expired()),
×
49
            CachedFeatures::MultiLineString(v) => v.iter().any(|c| c.cache_hint.is_expired()),
×
50
            CachedFeatures::MultiPolygon(v) => v.iter().any(|c| c.cache_hint.is_expired()),
×
51
        }
52
    }
×
53

54
    pub fn chunk_stream(&self, query: &VectorQueryRectangle) -> TypedCacheChunkStream {
×
55
        match self {
×
56
            CachedFeatures::NoGeometry(v) => {
×
57
                TypedCacheChunkStream::NoGeometry(CacheChunkStream::new(Arc::clone(v), *query))
×
58
            }
59
            CachedFeatures::MultiPoint(v) => {
×
60
                TypedCacheChunkStream::MultiPoint(CacheChunkStream::new(Arc::clone(v), *query))
×
61
            }
62
            CachedFeatures::MultiLineString(v) => {
×
63
                TypedCacheChunkStream::MultiLineString(CacheChunkStream::new(Arc::clone(v), *query))
×
64
            }
65
            CachedFeatures::MultiPolygon(v) => {
×
66
                TypedCacheChunkStream::MultiPolygon(CacheChunkStream::new(Arc::clone(v), *query))
×
67
            }
68
        }
69
    }
×
70
}
71

72
impl ByteSize for CachedFeatures {
73
    fn heap_byte_size(&self) -> usize {
×
74
        // we need to use `byte_size` instead of `heap_byte_size` here, because `Vec` stores its data on the heap
×
75
        match self {
×
76
            CachedFeatures::NoGeometry(v) => v.iter().map(FeatureCollectionInfos::byte_size).sum(),
×
77
            CachedFeatures::MultiPoint(v) => v.iter().map(FeatureCollectionInfos::byte_size).sum(),
×
78
            CachedFeatures::MultiLineString(v) => {
×
79
                v.iter().map(FeatureCollectionInfos::byte_size).sum()
×
80
            }
81
            CachedFeatures::MultiPolygon(v) => {
×
82
                v.iter().map(FeatureCollectionInfos::byte_size).sum()
×
83
            }
84
        }
85
    }
×
86
}
87

88
#[derive(Debug)]
×
89
pub enum LandingZoneQueryFeatures {
90
    NoGeometry(Vec<DataCollection>),
91
    MultiPoint(Vec<MultiPointCollection>),
92
    MultiLineString(Vec<MultiLineStringCollection>),
93
    MultiPolygon(Vec<MultiPolygonCollection>),
94
}
95

96
impl LandingZoneQueryFeatures {
97
    pub fn len(&self) -> usize {
×
98
        match self {
×
99
            LandingZoneQueryFeatures::NoGeometry(v) => v.len(),
×
100
            LandingZoneQueryFeatures::MultiPoint(v) => v.len(),
×
101
            LandingZoneQueryFeatures::MultiLineString(v) => v.len(),
×
102
            LandingZoneQueryFeatures::MultiPolygon(v) => v.len(),
×
103
        }
104
    }
×
105

106
    pub fn is_empty(&self) -> bool {
×
107
        self.len() == 0
×
108
    }
×
109
}
110

111
impl ByteSize for LandingZoneQueryFeatures {
112
    fn heap_byte_size(&self) -> usize {
×
113
        // we need to use `byte_size` instead of `heap_byte_size` here, because `Vec` stores its data on the heap
×
114
        match self {
×
115
            LandingZoneQueryFeatures::NoGeometry(v) => {
×
116
                v.iter().map(FeatureCollectionInfos::byte_size).sum()
×
117
            }
118
            LandingZoneQueryFeatures::MultiPoint(v) => {
×
119
                v.iter().map(FeatureCollectionInfos::byte_size).sum()
×
120
            }
121
            LandingZoneQueryFeatures::MultiLineString(v) => {
×
122
                v.iter().map(FeatureCollectionInfos::byte_size).sum()
×
123
            }
124
            LandingZoneQueryFeatures::MultiPolygon(v) => {
×
125
                v.iter().map(FeatureCollectionInfos::byte_size).sum()
×
126
            }
127
        }
128
    }
×
129
}
130

131
impl From<LandingZoneQueryFeatures> for CachedFeatures {
132
    fn from(value: LandingZoneQueryFeatures) -> Self {
×
133
        match value {
×
134
            LandingZoneQueryFeatures::NoGeometry(v) => CachedFeatures::NoGeometry(Arc::new(v)),
×
135
            LandingZoneQueryFeatures::MultiPoint(v) => CachedFeatures::MultiPoint(Arc::new(v)),
×
136
            LandingZoneQueryFeatures::MultiLineString(v) => {
×
137
                CachedFeatures::MultiLineString(Arc::new(v))
×
138
            }
139
            LandingZoneQueryFeatures::MultiPolygon(v) => CachedFeatures::MultiPolygon(Arc::new(v)),
×
140
        }
141
    }
×
142
}
143

144
impl CacheElementsContainerInfos<VectorQueryRectangle> for CachedFeatures {
145
    fn is_expired(&self) -> bool {
×
146
        self.is_expired()
×
147
    }
×
148
}
149

150
impl<G> CacheElementsContainer<VectorQueryRectangle, FeatureCollection<G>> for CachedFeatures
151
where
152
    G: CacheElementSubType<CacheElementType = FeatureCollection<G>> + Geometry + ArrowTyped,
153
    FeatureCollection<G>: CacheElementHitCheck,
154
{
155
    type ResultStream = CacheChunkStream<G>;
156

157
    fn result_stream(&self, query: &VectorQueryRectangle) -> Option<CacheChunkStream<G>> {
×
158
        G::result_stream(self, query)
×
159
    }
×
160
}
161

162
impl<G> LandingZoneElementsContainer<FeatureCollection<G>> for LandingZoneQueryFeatures
163
where
164
    G: CacheElementSubType<CacheElementType = FeatureCollection<G>> + Geometry + ArrowTyped,
165
    FeatureCollection<G>: CacheElementHitCheck,
166
{
167
    fn insert_element(
×
168
        &mut self,
×
169
        element: FeatureCollection<G>,
×
170
    ) -> Result<(), super::error::CacheError> {
×
171
        G::insert_element_into_landing_zone(self, element)
×
172
    }
×
173

174
    fn create_empty() -> Self {
×
175
        G::create_empty_landing_zone()
×
176
    }
×
177
}
178

179
impl<G> CacheElement for FeatureCollection<G>
180
where
181
    G: Geometry + ArrowTyped + CacheElementSubType<CacheElementType = Self> + ArrowTyped + Sized,
182
    FeatureCollection<G>: CacheElementHitCheck,
183
{
184
    type Query = VectorQueryRectangle;
185
    type LandingZoneContainer = LandingZoneQueryFeatures;
186
    type CacheContainer = CachedFeatures;
187
    type ResultStream = CacheChunkStream<G>;
188
    type CacheElementSubType = G;
189

190
    fn cache_hint(&self) -> geoengine_datatypes::primitives::CacheHint {
×
191
        self.cache_hint
×
192
    }
×
193

194
    fn typed_canonical_operator_name(
×
195
        key: crate::engine::CanonicOperatorName,
×
196
    ) -> super::shared_cache::TypedCanonicOperatorName {
×
197
        super::shared_cache::TypedCanonicOperatorName::Vector(key)
×
198
    }
×
199

200
    fn update_stored_query(&self, _query: &mut Self::Query) -> Result<(), CacheError> {
×
201
        Ok(())
×
202
    }
×
203
}
204

205
macro_rules! impl_cache_element_subtype {
206
    ($g:ty, $variant:ident) => {
207
        impl CacheElementSubType for $g {
208
            type CacheElementType = FeatureCollection<$g>;
209

210
            fn insert_element_into_landing_zone(
×
211
                landing_zone: &mut LandingZoneQueryFeatures,
×
212
                element: Self::CacheElementType,
×
213
            ) -> Result<(), super::error::CacheError> {
×
214
                match landing_zone {
×
215
                    LandingZoneQueryFeatures::$variant(v) => {
×
216
                        v.push(element);
×
217
                        Ok(())
×
218
                    }
219
                    _ => Err(super::error::CacheError::InvalidTypeForInsertion),
×
220
                }
221
            }
×
222

223
            fn create_empty_landing_zone() -> LandingZoneQueryFeatures {
×
224
                LandingZoneQueryFeatures::$variant(Vec::new())
×
225
            }
×
226

227
            fn result_stream(
228
                cache_elements_container: &CachedFeatures,
229
                query: &VectorQueryRectangle,
230
            ) -> Option<CacheChunkStream<$g>> {
231
                if let TypedCacheChunkStream::$variant(v) =
×
232
                    cache_elements_container.chunk_stream(query)
×
233
                {
234
                    Some(v)
×
235
                } else {
236
                    None
×
237
                }
238
            }
×
239
        }
240
    };
241
}
242
impl_cache_element_subtype!(NoGeometry, NoGeometry);
243
impl_cache_element_subtype!(MultiPoint, MultiPoint);
244
impl_cache_element_subtype!(MultiLineString, MultiLineString);
245
impl_cache_element_subtype!(MultiPolygon, MultiPolygon);
246

247
/// Our own tile stream that "owns" the data (more precisely a reference to the data)
248
#[pin_project(project = CacheChunkStreamProjection)]
×
249
pub struct CacheChunkStream<G> {
250
    data: Arc<Vec<FeatureCollection<G>>>,
251
    query: VectorQueryRectangle,
252
    idx: usize,
253
}
254

255
impl<G> CacheChunkStream<G>
256
where
257
    G: Geometry,
258
    FeatureCollection<G>: FeatureCollectionInfos,
259
{
260
    pub fn new(data: Arc<Vec<FeatureCollection<G>>>, query: VectorQueryRectangle) -> Self {
×
261
        Self {
×
262
            data,
×
263
            query,
×
264
            idx: 0,
×
265
        }
×
266
    }
×
267

268
    pub fn element_count(&self) -> usize {
×
269
        self.data.len()
×
270
    }
×
271
}
272

273
impl<G: Geometry> Stream for CacheChunkStream<G>
274
where
275
    FeatureCollection<G>: CacheElementHitCheck,
276
    G: ArrowTyped,
277
{
278
    type Item = Result<FeatureCollection<G>>;
279

280
    fn poll_next(
×
281
        mut self: Pin<&mut Self>,
×
282
        _cx: &mut std::task::Context<'_>,
×
283
    ) -> std::task::Poll<Option<Self::Item>> {
×
284
        let CacheChunkStreamProjection { data, query, idx } = self.as_mut().project();
×
285

286
        // return the next tile that is contained in the query, skip all tiles that are not contained
287
        for i in *idx..data.len() {
×
288
            let chunk = &data[i];
×
289
            if chunk.cache_element_hit(query) {
×
290
                // TODO: we really should cache the elements bbox somewhere
291
                let Ok(chunk) = chunk.filter_cache_element_entries(query) else {
×
292
                    // This should not happen, since we already checked that the element is contained in the query
293
                    log::error!("Could not filter cache element entries");
×
294
                    continue;
×
295
                };
296

297
                // if the chunk is empty, we can skip it
298
                if chunk.is_empty() {
×
299
                    log::trace!("Skipping empty chunk after filtering for query rectangle");
×
300
                    continue;
×
301
                }
×
302

×
303
                // set the index to the next element
×
304
                *idx = i + 1;
×
305
                return std::task::Poll::Ready(Some(Ok(chunk)));
×
306
            }
×
307
        }
308

309
        std::task::Poll::Ready(None)
×
310
    }
×
311
}
312

313
impl<G> FusedStream for CacheChunkStream<G>
314
where
315
    G: Geometry + ArrowTyped,
316
    FeatureCollection<G>: CacheElementHitCheck,
317
{
318
    fn is_terminated(&self) -> bool {
×
319
        self.idx >= self.data.len()
×
320
    }
×
321
}
322

323
pub enum TypedCacheChunkStream {
324
    NoGeometry(CacheChunkStream<NoGeometry>),
325
    MultiPoint(CacheChunkStream<MultiPoint>),
326
    MultiLineString(CacheChunkStream<MultiLineString>),
327
    MultiPolygon(CacheChunkStream<MultiPolygon>),
328
}
329

330
pub trait CacheElementHitCheck {
331
    fn cache_element_hit(&self, query_rect: &VectorQueryRectangle) -> bool;
332

333
    fn filter_cache_element_entries(
334
        &self,
335
        query_rect: &VectorQueryRectangle,
336
    ) -> Result<Self, CacheError>
337
    where
338
        Self: Sized;
339
}
340

341
impl CacheElementHitCheck for FeatureCollection<NoGeometry> {
342
    fn cache_element_hit(&self, query_rect: &VectorQueryRectangle) -> bool {
×
343
        let Some(time_bounds) = self.time_bounds() else {
×
344
            return false;
×
345
        };
346

347
        time_bounds == query_rect.time_interval || time_bounds.intersects(&query_rect.time_interval)
×
348
    }
×
349
    fn filter_cache_element_entries(
×
350
        &self,
×
351
        query_rect: &VectorQueryRectangle,
×
352
    ) -> Result<Self, CacheError> {
×
353
        let time_filter_bools = self
×
354
            .time_intervals()
×
355
            .iter()
×
356
            .map(|t| t.intersects(&query_rect.time_interval))
×
357
            .collect::<Vec<bool>>();
×
358
        self.filter(time_filter_bools)
×
359
            .map_err(|_err| CacheError::CouldNotFilterResults)
×
360
    }
×
361
}
362

363
macro_rules! impl_cache_result_check {
364
    ($t:ty) => {
365
        impl<'a> CacheElementHitCheck for FeatureCollection<$t>
366
        where
367
            FeatureCollection<$t>: GeometryCollection,
368
        {
369
            fn cache_element_hit(&self, query_rect: &VectorQueryRectangle) -> bool {
×
370
                let Some(bbox) = self.bbox() else {return false;};
×
371

372
                let Some(time_bounds) = self.time_bounds() else {return false;};
×
373

374
                (bbox == query_rect.spatial_bounds
×
375
                    || bbox.intersects_bbox(&query_rect.spatial_bounds))
×
376
                    && (time_bounds == query_rect.time_interval
×
377
                        || time_bounds.intersects(&query_rect.time_interval))
×
378
            }
×
379

380
            fn filter_cache_element_entries(
×
381
                &self,
×
382
                query_rect: &VectorQueryRectangle,
×
383
            ) -> Result<Self, CacheError> {
×
384
                let geoms_filter_bools = self.geometries().map(|g| {
×
385
                    g.bbox()
386
                        .map(|bbox| bbox.intersects_bbox(&query_rect.spatial_bounds))
387
                        .unwrap_or(false)
388
                });
×
389

×
390
                let time_filter_bools = self
×
391
                    .time_intervals()
×
392
                    .iter()
×
393
                    .map(|t| t.intersects(&query_rect.time_interval));
×
394

×
395
                let filter_bools = geoms_filter_bools
×
396
                    .zip(time_filter_bools)
×
397
                    .map(|(g, t)| g && t)
×
398
                    .collect::<Vec<bool>>();
×
399

×
400
                self.filter(filter_bools)
×
401
                    .map_err(|_err| CacheError::CouldNotFilterResults)
×
402
            }
×
403
        }
404
    };
405
}
406

407
impl_cache_result_check!(MultiPoint);
×
408
impl_cache_result_check!(MultiLineString);
×
409
impl_cache_result_check!(MultiPolygon);
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc