• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

geo-engine / geoengine / 5620643772

21 Jul 2023 09:09AM UTC coverage: 89.043% (-0.2%) from 89.194%
5620643772

Pull #833

github

web-flow
Merge 1ee0a296a into 2852314aa
Pull Request #833: Shared-cache

1128 of 1128 new or added lines in 9 files covered. (100.0%)

102925 of 115590 relevant lines covered (89.04%)

62633.59 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/operators/src/pro/cache/cache_chunks.rs
1
use super::shared_cache::CachableSubType;
2
use super::shared_cache::{
3
    CacheElement, CacheElementsContainer, CacheElementsContainerInfos, LandingZoneElementsContainer,
4
};
5
use crate::util::Result;
6
use futures::Stream;
7
use geoengine_datatypes::primitives::NoGeometry;
8
use geoengine_datatypes::{
9
    collections::{
10
        DataCollection, FeatureCollection, FeatureCollectionInfos, GeometryCollection,
11
        MultiLineStringCollection, MultiPointCollection, MultiPolygonCollection,
12
    },
13
    primitives::{Geometry, MultiLineString, MultiPoint, MultiPolygon, VectorQueryRectangle},
14
    util::{arrow::ArrowTyped, ByteSize},
15
};
16
use pin_project::pin_project;
17
use std::{pin::Pin, sync::Arc};
18

19
#[derive(Debug)]
×
20
pub enum CachedFeatures {
21
    NoGeometry(Arc<Vec<DataCollection>>),
22
    MultiPoint(Arc<Vec<MultiPointCollection>>),
23
    MultiLineString(Arc<Vec<MultiLineStringCollection>>),
24
    MultiPolygon(Arc<Vec<MultiPolygonCollection>>),
25
}
26

27
impl CachedFeatures {
28
    pub fn len(&self) -> usize {
×
29
        match self {
×
30
            CachedFeatures::NoGeometry(v) => v.len(),
×
31
            CachedFeatures::MultiPoint(v) => v.len(),
×
32
            CachedFeatures::MultiLineString(v) => v.len(),
×
33
            CachedFeatures::MultiPolygon(v) => v.len(),
×
34
        }
35
    }
×
36

37
    pub fn is_empty(&self) -> bool {
×
38
        self.len() == 0
×
39
    }
×
40

41
    pub fn is_expired(&self) -> bool {
×
42
        match self {
×
43
            CachedFeatures::NoGeometry(v) => v.iter().any(|c| c.cache_hint.is_expired()),
×
44
            CachedFeatures::MultiPoint(v) => v.iter().any(|c| c.cache_hint.is_expired()),
×
45
            CachedFeatures::MultiLineString(v) => v.iter().any(|c| c.cache_hint.is_expired()),
×
46
            CachedFeatures::MultiPolygon(v) => v.iter().any(|c| c.cache_hint.is_expired()),
×
47
        }
48
    }
×
49

50
    pub fn chunk_stream(&self, query: &VectorQueryRectangle) -> TypedCacheChunkStream {
×
51
        match self {
×
52
            CachedFeatures::NoGeometry(v) => {
×
53
                TypedCacheChunkStream::NoGeometry(CacheChunkStream::new(Arc::clone(v), *query))
×
54
            }
55
            CachedFeatures::MultiPoint(v) => {
×
56
                TypedCacheChunkStream::MultiPoint(CacheChunkStream::new(Arc::clone(v), *query))
×
57
            }
58
            CachedFeatures::MultiLineString(v) => {
×
59
                TypedCacheChunkStream::MultiLineString(CacheChunkStream::new(Arc::clone(v), *query))
×
60
            }
61
            CachedFeatures::MultiPolygon(v) => {
×
62
                TypedCacheChunkStream::MultiPolygon(CacheChunkStream::new(Arc::clone(v), *query))
×
63
            }
64
        }
65
    }
×
66
}
67

68
impl ByteSize for CachedFeatures {
69
    fn heap_byte_size(&self) -> usize {
×
70
        // we need to use `byte_size` instead of `heap_byte_size` here, because `Vec` stores its data on the heap
×
71
        match self {
×
72
            CachedFeatures::NoGeometry(v) => v.iter().map(FeatureCollectionInfos::byte_size).sum(),
×
73
            CachedFeatures::MultiPoint(v) => v.iter().map(FeatureCollectionInfos::byte_size).sum(),
×
74
            CachedFeatures::MultiLineString(v) => {
×
75
                v.iter().map(FeatureCollectionInfos::byte_size).sum()
×
76
            }
77
            CachedFeatures::MultiPolygon(v) => {
×
78
                v.iter().map(FeatureCollectionInfos::byte_size).sum()
×
79
            }
80
        }
81
    }
×
82
}
83

84
#[derive(Debug)]
×
85
pub enum LandingZoneQueryFeatures {
86
    NoGeometry(Vec<DataCollection>),
87
    MultiPoint(Vec<MultiPointCollection>),
88
    MultiLineString(Vec<MultiLineStringCollection>),
89
    MultiPolygon(Vec<MultiPolygonCollection>),
90
}
91

92
impl LandingZoneQueryFeatures {
93
    pub fn len(&self) -> usize {
×
94
        match self {
×
95
            LandingZoneQueryFeatures::NoGeometry(v) => v.len(),
×
96
            LandingZoneQueryFeatures::MultiPoint(v) => v.len(),
×
97
            LandingZoneQueryFeatures::MultiLineString(v) => v.len(),
×
98
            LandingZoneQueryFeatures::MultiPolygon(v) => v.len(),
×
99
        }
100
    }
×
101

102
    pub fn is_empty(&self) -> bool {
×
103
        self.len() == 0
×
104
    }
×
105
}
106

107
impl ByteSize for LandingZoneQueryFeatures {
108
    fn heap_byte_size(&self) -> usize {
×
109
        // we need to use `byte_size` instead of `heap_byte_size` here, because `Vec` stores its data on the heap
×
110
        match self {
×
111
            LandingZoneQueryFeatures::NoGeometry(v) => {
×
112
                v.iter().map(FeatureCollectionInfos::byte_size).sum()
×
113
            }
114
            LandingZoneQueryFeatures::MultiPoint(v) => {
×
115
                v.iter().map(FeatureCollectionInfos::byte_size).sum()
×
116
            }
117
            LandingZoneQueryFeatures::MultiLineString(v) => {
×
118
                v.iter().map(FeatureCollectionInfos::byte_size).sum()
×
119
            }
120
            LandingZoneQueryFeatures::MultiPolygon(v) => {
×
121
                v.iter().map(FeatureCollectionInfos::byte_size).sum()
×
122
            }
123
        }
124
    }
×
125
}
126

127
impl From<LandingZoneQueryFeatures> for CachedFeatures {
128
    fn from(value: LandingZoneQueryFeatures) -> Self {
×
129
        match value {
×
130
            LandingZoneQueryFeatures::NoGeometry(v) => CachedFeatures::NoGeometry(Arc::new(v)),
×
131
            LandingZoneQueryFeatures::MultiPoint(v) => CachedFeatures::MultiPoint(Arc::new(v)),
×
132
            LandingZoneQueryFeatures::MultiLineString(v) => {
×
133
                CachedFeatures::MultiLineString(Arc::new(v))
×
134
            }
135
            LandingZoneQueryFeatures::MultiPolygon(v) => CachedFeatures::MultiPolygon(Arc::new(v)),
×
136
        }
137
    }
×
138
}
139

140
impl CacheElementsContainerInfos<VectorQueryRectangle> for CachedFeatures {
141
    fn is_expired(&self) -> bool {
×
142
        self.is_expired()
×
143
    }
×
144
}
145

146
impl<G> CacheElementsContainer<VectorQueryRectangle, FeatureCollection<G>> for CachedFeatures
147
where
148
    G: CachableSubType<CacheElementType = FeatureCollection<G>> + Geometry + ArrowTyped,
149
    FeatureCollection<G>: CacheElementHitCheck,
150
{
151
    type ResultStream = CacheChunkStream<G>;
152

153
    fn result_stream(&self, query: &VectorQueryRectangle) -> Option<CacheChunkStream<G>> {
×
154
        G::result_stream(self, query)
×
155
    }
×
156
}
157

158
impl<G> LandingZoneElementsContainer<FeatureCollection<G>> for LandingZoneQueryFeatures
159
where
160
    G: CachableSubType<CacheElementType = FeatureCollection<G>> + Geometry + ArrowTyped,
161
    FeatureCollection<G>: CacheElementHitCheck,
162
{
163
    fn insert_element(
×
164
        &mut self,
×
165
        element: FeatureCollection<G>,
×
166
    ) -> Result<(), super::error::CacheError> {
×
167
        G::insert_element_into_landing_zone(self, element)
×
168
    }
×
169

170
    fn create_empty() -> Self {
×
171
        G::create_empty_landing_zone()
×
172
    }
×
173
}
174

175
impl<G> CacheElement for FeatureCollection<G>
176
where
177
    G: Geometry + ArrowTyped + CachableSubType<CacheElementType = Self> + ArrowTyped + Sized,
178
    FeatureCollection<G>: CacheElementHitCheck,
179
{
180
    type Query = VectorQueryRectangle;
181
    type LandingZoneContainer = LandingZoneQueryFeatures;
182
    type CacheContainer = CachedFeatures;
183
    type ResultStream = CacheChunkStream<G>;
184

185
    fn cache_hint(&self) -> geoengine_datatypes::primitives::CacheHint {
×
186
        self.cache_hint
×
187
    }
×
188

189
    fn typed_canonical_operator_name(
×
190
        key: crate::engine::CanonicOperatorName,
×
191
    ) -> super::shared_cache::TypedCanonicOperatorName {
×
192
        super::shared_cache::TypedCanonicOperatorName::Vector(key)
×
193
    }
×
194
}
195

196
/// Implements [`CachableSubType`] for the geometry type `$g`.
///
/// `$variant` is the name of the enum variant that stores collections of `$g` in
/// [`LandingZoneQueryFeatures`], [`CachedFeatures`] and [`TypedCacheChunkStream`].
macro_rules! impl_cache_element_subtype_magic {
    ($g:ty, $variant:ident) => {
        impl CachableSubType for $g {
            type CacheElementType = FeatureCollection<$g>;

            /// Appends `element` to the landing zone.
            ///
            /// Fails with `InvalidTypeForInsertion` if the landing zone holds another variant.
            fn insert_element_into_landing_zone(
                landing_zone: &mut LandingZoneQueryFeatures,
                element: Self::CacheElementType,
            ) -> Result<(), super::error::CacheError> {
                let LandingZoneQueryFeatures::$variant(collections) = landing_zone else {
                    return Err(super::error::CacheError::InvalidTypeForInsertion);
                };

                collections.push(element);
                Ok(())
            }

            /// Creates a landing zone of the matching variant without any collections.
            fn create_empty_landing_zone() -> LandingZoneQueryFeatures {
                LandingZoneQueryFeatures::$variant(Vec::new())
            }

            /// Returns the chunk stream for `query`, or `None` if the container holds
            /// another variant.
            fn result_stream(
                cache_elements_container: &CachedFeatures,
                query: &VectorQueryRectangle,
            ) -> Option<CacheChunkStream<$g>> {
                match cache_elements_container.chunk_stream(query) {
                    TypedCacheChunkStream::$variant(stream) => Some(stream),
                    _ => None,
                }
            }
        }
    };
}
233
impl_cache_element_subtype_magic!(MultiPoint, MultiPoint);
234
impl_cache_element_subtype_magic!(MultiLineString, MultiLineString);
235
impl_cache_element_subtype_magic!(MultiPolygon, MultiPolygon);
236

237
/// Our own tile stream that "owns" the data (more precisely a reference to the data)
238
#[pin_project(project = CacheChunkStreamProjection)]
×
239
pub struct CacheChunkStream<G> {
240
    data: Arc<Vec<FeatureCollection<G>>>,
241
    query: VectorQueryRectangle,
242
    idx: usize,
243
}
244

245
impl<G> CacheChunkStream<G>
246
where
247
    G: Geometry,
248
    FeatureCollection<G>: FeatureCollectionInfos,
249
{
250
    pub fn new(data: Arc<Vec<FeatureCollection<G>>>, query: VectorQueryRectangle) -> Self {
×
251
        Self {
×
252
            data,
×
253
            query,
×
254
            idx: 0,
×
255
        }
×
256
    }
×
257

258
    pub fn element_count(&self) -> usize {
×
259
        self.data.len()
×
260
    }
×
261
}
262

263
impl<G: Geometry> Stream for CacheChunkStream<G>
264
where
265
    FeatureCollection<G>: CacheElementHitCheck,
266
{
267
    type Item = Result<FeatureCollection<G>>;
268

269
    fn poll_next(
×
270
        mut self: Pin<&mut Self>,
×
271
        _cx: &mut std::task::Context<'_>,
×
272
    ) -> std::task::Poll<Option<Self::Item>> {
×
273
        let CacheChunkStreamProjection { data, query, idx } = self.as_mut().project();
×
274

275
        // return the next tile that is contained in the query, skip all tiles that are not contained
276
        for i in *idx..data.len() {
×
277
            let chunk = &data[i];
×
278
            if chunk.cache_element_hit(query) {
×
279
                *idx = i + 1;
×
280
                return std::task::Poll::Ready(Some(Ok(chunk.clone())));
×
281
            }
×
282
        }
283

284
        std::task::Poll::Ready(None)
×
285
    }
×
286
}
287

288
pub enum TypedCacheChunkStream {
289
    NoGeometry(CacheChunkStream<NoGeometry>),
290
    MultiPoint(CacheChunkStream<MultiPoint>),
291
    MultiLineString(CacheChunkStream<MultiLineString>),
292
    MultiPolygon(CacheChunkStream<MultiPolygon>),
293
}
294

295
trait CacheElementHitCheck {
296
    fn cache_element_hit(&self, query_rect: &VectorQueryRectangle) -> bool;
297
}
298

299
impl CacheElementHitCheck for FeatureCollection<NoGeometry> {
300
    fn cache_element_hit(&self, query_rect: &VectorQueryRectangle) -> bool {
×
301
        let Some(time_bounds) = self.time_bounds() else {
×
302
            return false;
×
303
        };
304

305
        time_bounds == query_rect.time_interval || time_bounds.intersects(&query_rect.time_interval)
×
306
    }
×
307
}
308

309
macro_rules! impl_cache_result_check {
310
    ($t:ty) => {
311
        impl CacheElementHitCheck for FeatureCollection<$t> {
312
            fn cache_element_hit(&self, query_rect: &VectorQueryRectangle) -> bool {
×
313
                let Some(bbox) = self.bbox() else {return false;};
×
314

315
                let Some(time_bounds) = self.time_bounds() else {return false;};
×
316

317
                (bbox == query_rect.spatial_bounds
×
318
                    || bbox.intersects_bbox(&query_rect.spatial_bounds))
×
319
                    && (time_bounds == query_rect.time_interval
×
320
                        || time_bounds.intersects(&query_rect.time_interval))
×
321
            }
×
322
        }
323
    };
324
}
325

326
impl_cache_result_check!(MultiPoint);
327
impl_cache_result_check!(MultiLineString);
328
impl_cache_result_check!(MultiPolygon);
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc