• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

geo-engine / geoengine / 5629709896

22 Jul 2023 08:23AM UTC coverage: 88.937% (-0.2%) from 89.184%
5629709896

Pull #833

github

web-flow
Merge e15895ab3 into 8c287ecf7
Pull Request #833: Shared-cache

1288 of 1288 new or added lines in 15 files covered. (100.0%)

105797 of 118957 relevant lines covered (88.94%)

60860.98 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/operators/src/pro/cache/cache_chunks.rs
1
use super::error::CacheError;
2
use super::shared_cache::CacheElementSubType;
3
use super::shared_cache::{
4
    CacheElement, CacheElementsContainer, CacheElementsContainerInfos, LandingZoneElementsContainer,
5
};
6
use crate::util::Result;
7
use futures::Stream;
8
use geoengine_datatypes::{
9
    collections::{
10
        DataCollection, FeatureCollection, FeatureCollectionInfos, FeatureCollectionModifications,
11
        GeometryCollection, IntoGeometryIterator, MultiLineStringCollection, MultiPointCollection,
12
        MultiPolygonCollection,
13
    },
14
    primitives::{
15
        Geometry, MultiLineString, MultiPoint, MultiPolygon, NoGeometry, VectorQueryRectangle,
16
    },
17
    util::{arrow::ArrowTyped, ByteSize},
18
};
19
use pin_project::pin_project;
20
use std::{pin::Pin, sync::Arc};
21

22
#[derive(Debug)]
×
23
pub enum CachedFeatures {
24
    NoGeometry(Arc<Vec<DataCollection>>),
25
    MultiPoint(Arc<Vec<MultiPointCollection>>),
26
    MultiLineString(Arc<Vec<MultiLineStringCollection>>),
27
    MultiPolygon(Arc<Vec<MultiPolygonCollection>>),
28
}
29

30
impl CachedFeatures {
31
    pub fn len(&self) -> usize {
×
32
        match self {
×
33
            CachedFeatures::NoGeometry(v) => v.len(),
×
34
            CachedFeatures::MultiPoint(v) => v.len(),
×
35
            CachedFeatures::MultiLineString(v) => v.len(),
×
36
            CachedFeatures::MultiPolygon(v) => v.len(),
×
37
        }
38
    }
×
39

40
    pub fn is_empty(&self) -> bool {
×
41
        self.len() == 0
×
42
    }
×
43

44
    pub fn is_expired(&self) -> bool {
×
45
        match self {
×
46
            CachedFeatures::NoGeometry(v) => v.iter().any(|c| c.cache_hint.is_expired()),
×
47
            CachedFeatures::MultiPoint(v) => v.iter().any(|c| c.cache_hint.is_expired()),
×
48
            CachedFeatures::MultiLineString(v) => v.iter().any(|c| c.cache_hint.is_expired()),
×
49
            CachedFeatures::MultiPolygon(v) => v.iter().any(|c| c.cache_hint.is_expired()),
×
50
        }
51
    }
×
52

53
    pub fn chunk_stream(&self, query: &VectorQueryRectangle) -> TypedCacheChunkStream {
×
54
        match self {
×
55
            CachedFeatures::NoGeometry(v) => {
×
56
                TypedCacheChunkStream::NoGeometry(CacheChunkStream::new(Arc::clone(v), *query))
×
57
            }
58
            CachedFeatures::MultiPoint(v) => {
×
59
                TypedCacheChunkStream::MultiPoint(CacheChunkStream::new(Arc::clone(v), *query))
×
60
            }
61
            CachedFeatures::MultiLineString(v) => {
×
62
                TypedCacheChunkStream::MultiLineString(CacheChunkStream::new(Arc::clone(v), *query))
×
63
            }
64
            CachedFeatures::MultiPolygon(v) => {
×
65
                TypedCacheChunkStream::MultiPolygon(CacheChunkStream::new(Arc::clone(v), *query))
×
66
            }
67
        }
68
    }
×
69
}
70

71
impl ByteSize for CachedFeatures {
72
    fn heap_byte_size(&self) -> usize {
×
73
        // we need to use `byte_size` instead of `heap_byte_size` here, because `Vec` stores its data on the heap
×
74
        match self {
×
75
            CachedFeatures::NoGeometry(v) => v.iter().map(FeatureCollectionInfos::byte_size).sum(),
×
76
            CachedFeatures::MultiPoint(v) => v.iter().map(FeatureCollectionInfos::byte_size).sum(),
×
77
            CachedFeatures::MultiLineString(v) => {
×
78
                v.iter().map(FeatureCollectionInfos::byte_size).sum()
×
79
            }
80
            CachedFeatures::MultiPolygon(v) => {
×
81
                v.iter().map(FeatureCollectionInfos::byte_size).sum()
×
82
            }
83
        }
84
    }
×
85
}
86

87
#[derive(Debug)]
×
88
pub enum LandingZoneQueryFeatures {
89
    NoGeometry(Vec<DataCollection>),
90
    MultiPoint(Vec<MultiPointCollection>),
91
    MultiLineString(Vec<MultiLineStringCollection>),
92
    MultiPolygon(Vec<MultiPolygonCollection>),
93
}
94

95
impl LandingZoneQueryFeatures {
96
    pub fn len(&self) -> usize {
×
97
        match self {
×
98
            LandingZoneQueryFeatures::NoGeometry(v) => v.len(),
×
99
            LandingZoneQueryFeatures::MultiPoint(v) => v.len(),
×
100
            LandingZoneQueryFeatures::MultiLineString(v) => v.len(),
×
101
            LandingZoneQueryFeatures::MultiPolygon(v) => v.len(),
×
102
        }
103
    }
×
104

105
    pub fn is_empty(&self) -> bool {
×
106
        self.len() == 0
×
107
    }
×
108
}
109

110
impl ByteSize for LandingZoneQueryFeatures {
111
    fn heap_byte_size(&self) -> usize {
×
112
        // we need to use `byte_size` instead of `heap_byte_size` here, because `Vec` stores its data on the heap
×
113
        match self {
×
114
            LandingZoneQueryFeatures::NoGeometry(v) => {
×
115
                v.iter().map(FeatureCollectionInfos::byte_size).sum()
×
116
            }
117
            LandingZoneQueryFeatures::MultiPoint(v) => {
×
118
                v.iter().map(FeatureCollectionInfos::byte_size).sum()
×
119
            }
120
            LandingZoneQueryFeatures::MultiLineString(v) => {
×
121
                v.iter().map(FeatureCollectionInfos::byte_size).sum()
×
122
            }
123
            LandingZoneQueryFeatures::MultiPolygon(v) => {
×
124
                v.iter().map(FeatureCollectionInfos::byte_size).sum()
×
125
            }
126
        }
127
    }
×
128
}
129

130
impl From<LandingZoneQueryFeatures> for CachedFeatures {
131
    fn from(value: LandingZoneQueryFeatures) -> Self {
×
132
        match value {
×
133
            LandingZoneQueryFeatures::NoGeometry(v) => CachedFeatures::NoGeometry(Arc::new(v)),
×
134
            LandingZoneQueryFeatures::MultiPoint(v) => CachedFeatures::MultiPoint(Arc::new(v)),
×
135
            LandingZoneQueryFeatures::MultiLineString(v) => {
×
136
                CachedFeatures::MultiLineString(Arc::new(v))
×
137
            }
138
            LandingZoneQueryFeatures::MultiPolygon(v) => CachedFeatures::MultiPolygon(Arc::new(v)),
×
139
        }
140
    }
×
141
}
142

143
impl CacheElementsContainerInfos<VectorQueryRectangle> for CachedFeatures {
144
    fn is_expired(&self) -> bool {
×
145
        self.is_expired()
×
146
    }
×
147
}
148

149
impl<G> CacheElementsContainer<VectorQueryRectangle, FeatureCollection<G>> for CachedFeatures
150
where
151
    G: CacheElementSubType<CacheElementType = FeatureCollection<G>> + Geometry + ArrowTyped,
152
    FeatureCollection<G>: CacheElementHitCheck,
153
{
154
    type ResultStream = CacheChunkStream<G>;
155

156
    fn result_stream(&self, query: &VectorQueryRectangle) -> Option<CacheChunkStream<G>> {
×
157
        G::result_stream(self, query)
×
158
    }
×
159
}
160

161
impl<G> LandingZoneElementsContainer<FeatureCollection<G>> for LandingZoneQueryFeatures
162
where
163
    G: CacheElementSubType<CacheElementType = FeatureCollection<G>> + Geometry + ArrowTyped,
164
    FeatureCollection<G>: CacheElementHitCheck,
165
{
166
    fn insert_element(
×
167
        &mut self,
×
168
        element: FeatureCollection<G>,
×
169
    ) -> Result<(), super::error::CacheError> {
×
170
        G::insert_element_into_landing_zone(self, element)
×
171
    }
×
172

173
    fn create_empty() -> Self {
×
174
        G::create_empty_landing_zone()
×
175
    }
×
176
}
177

178
impl<G> CacheElement for FeatureCollection<G>
179
where
180
    G: Geometry + ArrowTyped + CacheElementSubType<CacheElementType = Self> + ArrowTyped + Sized,
181
    FeatureCollection<G>: CacheElementHitCheck,
182
{
183
    type Query = VectorQueryRectangle;
184
    type LandingZoneContainer = LandingZoneQueryFeatures;
185
    type CacheContainer = CachedFeatures;
186
    type ResultStream = CacheChunkStream<G>;
187
    type CacheElementSubType = G;
188

189
    fn cache_hint(&self) -> geoengine_datatypes::primitives::CacheHint {
×
190
        self.cache_hint
×
191
    }
×
192

193
    fn typed_canonical_operator_name(
×
194
        key: crate::engine::CanonicOperatorName,
×
195
    ) -> super::shared_cache::TypedCanonicOperatorName {
×
196
        super::shared_cache::TypedCanonicOperatorName::Vector(key)
×
197
    }
×
198
}
199

200
macro_rules! impl_cache_element_subtype_magic {
201
    ($g:ty, $variant:ident) => {
202
        impl CacheElementSubType for $g {
203
            type CacheElementType = FeatureCollection<$g>;
204

205
            fn insert_element_into_landing_zone(
×
206
                landing_zone: &mut LandingZoneQueryFeatures,
×
207
                element: Self::CacheElementType,
×
208
            ) -> Result<(), super::error::CacheError> {
×
209
                match landing_zone {
×
210
                    LandingZoneQueryFeatures::$variant(v) => {
×
211
                        v.push(element);
×
212
                        Ok(())
×
213
                    }
214
                    _ => Err(super::error::CacheError::InvalidTypeForInsertion),
×
215
                }
216
            }
×
217

218
            fn create_empty_landing_zone() -> LandingZoneQueryFeatures {
×
219
                LandingZoneQueryFeatures::$variant(Vec::new())
×
220
            }
×
221

222
            fn result_stream(
223
                cache_elements_container: &CachedFeatures,
224
                query: &VectorQueryRectangle,
225
            ) -> Option<CacheChunkStream<$g>> {
226
                if let TypedCacheChunkStream::$variant(v) =
×
227
                    cache_elements_container.chunk_stream(query)
×
228
                {
229
                    Some(v)
×
230
                } else {
231
                    None
×
232
                }
233
            }
×
234
        }
235
    };
236
}
237
impl_cache_element_subtype_magic!(NoGeometry, NoGeometry);
238
impl_cache_element_subtype_magic!(MultiPoint, MultiPoint);
239
impl_cache_element_subtype_magic!(MultiLineString, MultiLineString);
240
impl_cache_element_subtype_magic!(MultiPolygon, MultiPolygon);
241

242
/// Our own tile stream that "owns" the data (more precisely a reference to the data)
243
#[pin_project(project = CacheChunkStreamProjection)]
×
244
pub struct CacheChunkStream<G> {
245
    data: Arc<Vec<FeatureCollection<G>>>,
246
    query: VectorQueryRectangle,
247
    idx: usize,
248
}
249

250
impl<G> CacheChunkStream<G>
251
where
252
    G: Geometry,
253
    FeatureCollection<G>: FeatureCollectionInfos,
254
{
255
    pub fn new(data: Arc<Vec<FeatureCollection<G>>>, query: VectorQueryRectangle) -> Self {
×
256
        Self {
×
257
            data,
×
258
            query,
×
259
            idx: 0,
×
260
        }
×
261
    }
×
262

263
    pub fn element_count(&self) -> usize {
×
264
        self.data.len()
×
265
    }
×
266
}
267

268
impl<G: Geometry> Stream for CacheChunkStream<G>
269
where
270
    FeatureCollection<G>: CacheElementHitCheck,
271
    G: ArrowTyped,
272
{
273
    type Item = Result<FeatureCollection<G>>;
274

275
    fn poll_next(
×
276
        mut self: Pin<&mut Self>,
×
277
        _cx: &mut std::task::Context<'_>,
×
278
    ) -> std::task::Poll<Option<Self::Item>> {
×
279
        let CacheChunkStreamProjection { data, query, idx } = self.as_mut().project();
×
280

281
        // return the next tile that is contained in the query, skip all tiles that are not contained
282
        for i in *idx..data.len() {
×
283
            let chunk = &data[i];
×
284
            if chunk.cache_element_hit(query) {
×
285
                // TODO: we really should cache the elements bbox somewhere
286
                let Ok(chunk) = chunk.filter_cache_element_entries(query) else {
×
287
                    // This should not happen, since we already checked that the element is contained in the query
288
                    log::error!("Could not filter cache element entries");
×
289
                    continue;
×
290
                };
291

292
                // if the chunk is empty, we can skip it
293
                if chunk.is_empty() {
×
294
                    log::trace!("Skipping empty chunk after filtering for query rectangle");
×
295
                    continue;
×
296
                }
×
297

×
298
                // set the index to the next element
×
299
                *idx = i + 1;
×
300
                return std::task::Poll::Ready(Some(Ok(chunk)));
×
301
            }
×
302
        }
303

304
        std::task::Poll::Ready(None)
×
305
    }
×
306
}
307

308
pub enum TypedCacheChunkStream {
309
    NoGeometry(CacheChunkStream<NoGeometry>),
310
    MultiPoint(CacheChunkStream<MultiPoint>),
311
    MultiLineString(CacheChunkStream<MultiLineString>),
312
    MultiPolygon(CacheChunkStream<MultiPolygon>),
313
}
314

315
pub trait CacheElementHitCheck {
316
    fn cache_element_hit(&self, query_rect: &VectorQueryRectangle) -> bool;
317

318
    fn filter_cache_element_entries(
319
        &self,
320
        query_rect: &VectorQueryRectangle,
321
    ) -> Result<Self, CacheError>
322
    where
323
        Self: Sized;
324
}
325

326
impl CacheElementHitCheck for FeatureCollection<NoGeometry> {
327
    fn cache_element_hit(&self, query_rect: &VectorQueryRectangle) -> bool {
×
328
        let Some(time_bounds) = self.time_bounds() else {
×
329
            return false;
×
330
        };
331

332
        time_bounds == query_rect.time_interval || time_bounds.intersects(&query_rect.time_interval)
×
333
    }
×
334
    fn filter_cache_element_entries(
×
335
        &self,
×
336
        query_rect: &VectorQueryRectangle,
×
337
    ) -> Result<Self, CacheError> {
×
338
        let time_filter_bools = self
×
339
            .time_intervals()
×
340
            .iter()
×
341
            .map(|t| t.intersects(&query_rect.time_interval))
×
342
            .collect::<Vec<bool>>();
×
343
        self.filter(time_filter_bools)
×
344
            .map_err(|_err| CacheError::CouldNotFilterResults)
×
345
    }
×
346
}
347

348
macro_rules! impl_cache_result_check {
349
    ($t:ty) => {
350
        impl<'a> CacheElementHitCheck for FeatureCollection<$t>
351
        where
352
            FeatureCollection<$t>: GeometryCollection,
353
        {
354
            fn cache_element_hit(&self, query_rect: &VectorQueryRectangle) -> bool {
×
355
                let Some(bbox) = self.bbox() else {return false;};
×
356

357
                let Some(time_bounds) = self.time_bounds() else {return false;};
×
358

359
                (bbox == query_rect.spatial_bounds
×
360
                    || bbox.intersects_bbox(&query_rect.spatial_bounds))
×
361
                    && (time_bounds == query_rect.time_interval
×
362
                        || time_bounds.intersects(&query_rect.time_interval))
×
363
            }
×
364

365
            fn filter_cache_element_entries(
×
366
                &self,
×
367
                query_rect: &VectorQueryRectangle,
×
368
            ) -> Result<Self, CacheError> {
×
369
                let geoms_filter_bools = self.geometries().map(|g| {
×
370
                    g.bbox()
371
                        .map(|bbox| bbox.intersects_bbox(&query_rect.spatial_bounds))
372
                        .unwrap_or(false)
373
                });
×
374

×
375
                let time_filter_bools = self
×
376
                    .time_intervals()
×
377
                    .iter()
×
378
                    .map(|t| t.intersects(&query_rect.time_interval));
×
379

×
380
                let filter_bools = geoms_filter_bools
×
381
                    .zip(time_filter_bools)
×
382
                    .map(|(g, t)| g && t)
×
383
                    .collect::<Vec<bool>>();
×
384

×
385
                self.filter(filter_bools)
×
386
                    .map_err(|_err| CacheError::CouldNotFilterResults)
×
387
            }
×
388
        }
389
    };
390
}
391

392
impl_cache_result_check!(MultiPoint);
×
393
impl_cache_result_check!(MultiLineString);
×
394
impl_cache_result_check!(MultiPolygon);
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc