• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

geo-engine / geoengine / 5006008836

pending completion
5006008836

push

github

GitHub
Merge #785 #787

936 of 936 new or added lines in 50 files covered. (100.0%)

96010 of 107707 relevant lines covered (89.14%)

72676.46 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

74.85
/operators/src/pro/cache/tile_cache.rs
1
use std::{collections::HashMap, pin::Pin, sync::Arc};
2

3
use futures::Stream;
4
use geoengine_datatypes::{
5
    primitives::{RasterQueryRectangle, SpatialPartitioned},
6
    raster::{Pixel, RasterTile2D},
7
};
8
use pin_project::pin_project;
9
use tokio::sync::RwLock;
10
use uuid::Uuid;
11

12
use crate::engine::CanonicOperatorName;
13
use crate::util::Result;
14

15
/// The tile cache caches all tiles of a query and is able to answer queries that are fully contained in the cache.
16
#[derive(Debug, Default)]
130✔
17
pub struct TileCache {
18
    // TODO: more fine granular locking?
19
    // for each operator graph, we have a cache, that can efficiently be accessed
20
    operator_caches: RwLock<HashMap<CanonicOperatorName, OperatorTileCache>>,
21
}
22

23
/// Holds all the cached results for an operator graph (workflow)
24
#[derive(Debug, Default)]
1✔
25
pub struct OperatorTileCache {
26
    // for a given operator and query we need to look through all entries to find one that matches
27
    // TODO: use a multi-dimensional index to speed up the lookup
28
    entries: Vec<CachedQueryResult>,
29

30
    // running queries insert their tiles as they are produced. The entry will be created once the query is done.
31
    // The query is identified by a Uuid instead of the query rectangle to avoid confusions with other queries
32
    active_queries: HashMap<QueryId, ActiveQueryResult>,
33
}
34

35
pub type QueryId = Uuid;
36

37
impl OperatorTileCache {
38
    pub fn find_match(&self, query: &RasterQueryRectangle) -> Option<&CachedQueryResult> {
1✔
39
        self.entries.iter().find(|r| r.matches(query))
1✔
40
    }
1✔
41
}
42

43
/// Holds all the tiles for a given query and is able to answer queries that are fully contained
44
#[derive(Debug)]
×
45
pub struct CachedQueryResult {
46
    query: RasterQueryRectangle,
47
    tiles: CachedTiles,
48
}
49

50
impl CachedQueryResult {
51
    /// Return true if the query can be answered in full by this cache entry
52
    /// For this, the bbox and time has to be fully contained, and the spatial resolution has to match
53
    pub fn matches(&self, query: &RasterQueryRectangle) -> bool {
1✔
54
        (self.query.spatial_bounds == query.spatial_bounds
1✔
55
            || self.query.spatial_bounds.contains(&query.spatial_bounds))
×
56
            && self.query.time_interval.contains(&query.time_interval)
1✔
57
            && self.query.spatial_resolution == query.spatial_resolution
1✔
58
    }
1✔
59

60
    /// Produces a tile stream from the cache
61
    pub fn tile_stream(&self, query: &RasterQueryRectangle) -> TypedCacheTileStream {
1✔
62
        self.tiles.tile_stream(query)
1✔
63
    }
1✔
64
}
65

66
#[derive(Debug)]
×
67
pub enum CachedTiles {
68
    U8(Arc<Vec<RasterTile2D<u8>>>),
69
    U16(Arc<Vec<RasterTile2D<u16>>>),
70
    U32(Arc<Vec<RasterTile2D<u32>>>),
71
    U64(Arc<Vec<RasterTile2D<u64>>>),
72
    I8(Arc<Vec<RasterTile2D<i8>>>),
73
    I16(Arc<Vec<RasterTile2D<i16>>>),
74
    I32(Arc<Vec<RasterTile2D<i32>>>),
75
    I64(Arc<Vec<RasterTile2D<i64>>>),
76
    F32(Arc<Vec<RasterTile2D<f32>>>),
77
    F64(Arc<Vec<RasterTile2D<f64>>>),
78
}
79

80
#[derive(Debug)]
×
81
struct ActiveQueryResult {
82
    query: RasterQueryRectangle,
83
    tiles: ActiveQueryTiles,
84
}
85

86
#[derive(Debug)]
×
87
pub enum ActiveQueryTiles {
88
    U8(Vec<RasterTile2D<u8>>),
89
    U16(Vec<RasterTile2D<u16>>),
90
    U32(Vec<RasterTile2D<u32>>),
91
    U64(Vec<RasterTile2D<u64>>),
92
    I8(Vec<RasterTile2D<i8>>),
93
    I16(Vec<RasterTile2D<i16>>),
94
    I32(Vec<RasterTile2D<i32>>),
95
    I64(Vec<RasterTile2D<i64>>),
96
    F32(Vec<RasterTile2D<f32>>),
97
    F64(Vec<RasterTile2D<f64>>),
98
}
99

100
impl From<ActiveQueryTiles> for CachedTiles {
101
    fn from(value: ActiveQueryTiles) -> Self {
1✔
102
        match value {
1✔
103
            ActiveQueryTiles::U8(t) => CachedTiles::U8(Arc::new(t)),
1✔
104
            ActiveQueryTiles::U16(t) => CachedTiles::U16(Arc::new(t)),
×
105
            ActiveQueryTiles::U32(t) => CachedTiles::U32(Arc::new(t)),
×
106
            ActiveQueryTiles::U64(t) => CachedTiles::U64(Arc::new(t)),
×
107
            ActiveQueryTiles::I8(t) => CachedTiles::I8(Arc::new(t)),
×
108
            ActiveQueryTiles::I16(t) => CachedTiles::I16(Arc::new(t)),
×
109
            ActiveQueryTiles::I32(t) => CachedTiles::I32(Arc::new(t)),
×
110
            ActiveQueryTiles::I64(t) => CachedTiles::I64(Arc::new(t)),
×
111
            ActiveQueryTiles::F32(t) => CachedTiles::F32(Arc::new(t)),
×
112
            ActiveQueryTiles::F64(t) => CachedTiles::F64(Arc::new(t)),
×
113
        }
114
    }
1✔
115
}
116

117
impl From<ActiveQueryResult> for CachedQueryResult {
118
    fn from(value: ActiveQueryResult) -> Self {
1✔
119
        Self {
1✔
120
            query: value.query,
1✔
121
            tiles: value.tiles.into(),
1✔
122
        }
1✔
123
    }
1✔
124
}
125

126
impl CachedTiles {
127
    pub fn tile_stream(&self, query: &RasterQueryRectangle) -> TypedCacheTileStream {
1✔
128
        match self {
1✔
129
            CachedTiles::U8(v) => TypedCacheTileStream::U8(CacheTileStream::new(v.clone(), *query)),
1✔
130
            CachedTiles::U16(v) => {
×
131
                TypedCacheTileStream::U16(CacheTileStream::new(v.clone(), *query))
×
132
            }
133
            CachedTiles::U32(v) => {
×
134
                TypedCacheTileStream::U32(CacheTileStream::new(v.clone(), *query))
×
135
            }
136
            CachedTiles::U64(v) => {
×
137
                TypedCacheTileStream::U64(CacheTileStream::new(v.clone(), *query))
×
138
            }
139
            CachedTiles::I8(v) => TypedCacheTileStream::I8(CacheTileStream::new(v.clone(), *query)),
×
140
            CachedTiles::I16(v) => {
×
141
                TypedCacheTileStream::I16(CacheTileStream::new(v.clone(), *query))
×
142
            }
143
            CachedTiles::I32(v) => {
×
144
                TypedCacheTileStream::I32(CacheTileStream::new(v.clone(), *query))
×
145
            }
146
            CachedTiles::I64(v) => {
×
147
                TypedCacheTileStream::I64(CacheTileStream::new(v.clone(), *query))
×
148
            }
149
            CachedTiles::F32(v) => {
×
150
                TypedCacheTileStream::F32(CacheTileStream::new(v.clone(), *query))
×
151
            }
152
            CachedTiles::F64(v) => {
×
153
                TypedCacheTileStream::F64(CacheTileStream::new(v.clone(), *query))
×
154
            }
155
        }
156
    }
1✔
157
}
158

159
/// Our own tile stream that "owns" the data (more precisely a reference to the data)
160
#[pin_project(project = CacheTileStreamProjection)]
9✔
161
pub struct CacheTileStream<T> {
162
    data: Arc<Vec<RasterTile2D<T>>>,
163
    query: RasterQueryRectangle,
164
    idx: usize,
165
}
166

167
impl<T> CacheTileStream<T> {
168
    pub fn new(data: Arc<Vec<RasterTile2D<T>>>, query: RasterQueryRectangle) -> Self {
1✔
169
        Self {
1✔
170
            data,
1✔
171
            query,
1✔
172
            idx: 0,
1✔
173
        }
1✔
174
    }
1✔
175
}
176

177
impl<T: Pixel> Stream for CacheTileStream<T> {
178
    type Item = Result<RasterTile2D<T>>;
179

180
    fn poll_next(
9✔
181
        mut self: Pin<&mut Self>,
9✔
182
        _cx: &mut std::task::Context<'_>,
9✔
183
    ) -> std::task::Poll<Option<Self::Item>> {
9✔
184
        let CacheTileStreamProjection { data, query, idx } = self.as_mut().project();
9✔
185

186
        // return the next tile that is contained in the query, skip all tiles that are not contained
187
        for i in *idx..data.len() {
9✔
188
            let tile = &data[i];
8✔
189
            let tile_bbox = tile.tile_information().spatial_partition();
8✔
190

8✔
191
            if tile_bbox.intersects(&query.spatial_bounds)
8✔
192
                && tile.time.intersects(&query.time_interval)
8✔
193
            {
194
                *idx = i + 1;
8✔
195
                return std::task::Poll::Ready(Some(Ok(tile.clone())));
8✔
196
            }
×
197
        }
198

199
        std::task::Poll::Ready(None)
1✔
200
    }
9✔
201
}
202

203
pub enum TypedCacheTileStream {
204
    U8(CacheTileStream<u8>),
205
    U16(CacheTileStream<u16>),
206
    U32(CacheTileStream<u32>),
207
    U64(CacheTileStream<u64>),
208
    I8(CacheTileStream<i8>),
209
    I16(CacheTileStream<i16>),
210
    I32(CacheTileStream<i32>),
211
    I64(CacheTileStream<i64>),
212
    F32(CacheTileStream<f32>),
213
    F64(CacheTileStream<f64>),
214
}
215

216
/// A helper trait that allows converting between enums variants and generic structs
217
pub trait Cachable: Sized {
218
    fn stream(b: TypedCacheTileStream) -> Option<CacheTileStream<Self>>;
219

220
    fn insert_tile(tiles: &mut ActiveQueryTiles, tile: RasterTile2D<Self>) -> Result<()>;
221

222
    fn create_active_query_tiles() -> ActiveQueryTiles;
223
}
224

225
macro_rules! impl_tile_streamer {
226
    ($t:ty, $variant:ident) => {
227
        impl Cachable for $t {
228
            fn stream(t: TypedCacheTileStream) -> Option<CacheTileStream<$t>> {
229
                if let TypedCacheTileStream::$variant(s) = t {
1✔
230
                    return Some(s);
1✔
231
                }
×
232
                None
×
233
            }
1✔
234

235
            fn insert_tile(tiles: &mut ActiveQueryTiles, tile: RasterTile2D<Self>) -> Result<()> {
236
                if let ActiveQueryTiles::$variant(ref mut tiles) = tiles {
8✔
237
                    tiles.push(tile);
8✔
238
                    return Ok(());
8✔
239
                }
×
240
                Err(crate::error::Error::QueryProcessor)
×
241
            }
8✔
242

243
            fn create_active_query_tiles() -> ActiveQueryTiles {
1✔
244
                ActiveQueryTiles::$variant(Vec::new())
1✔
245
            }
1✔
246
        }
247
    };
248
}
249
impl_tile_streamer!(i8, I8);
250
impl_tile_streamer!(u8, U8);
251
impl_tile_streamer!(i16, I16);
252
impl_tile_streamer!(u16, U16);
253
impl_tile_streamer!(i32, I32);
254
impl_tile_streamer!(u32, U32);
255
impl_tile_streamer!(i64, I64);
256
impl_tile_streamer!(u64, U64);
257
impl_tile_streamer!(f32, F32);
258
impl_tile_streamer!(f64, F64);
259

260
impl TileCache {
261
    /// Query the cache and on hit create a stream of tiles
262
    pub async fn query_cache<T>(
2✔
263
        &self,
2✔
264
        key: CanonicOperatorName,
2✔
265
        query: &RasterQueryRectangle,
2✔
266
    ) -> Option<CacheTileStream<T>>
2✔
267
    where
2✔
268
        T: Pixel + Cachable,
2✔
269
    {
2✔
270
        let caches = self.operator_caches.read().await;
2✔
271
        let cache = caches.get(&key)?;
2✔
272

273
        let entry = cache.find_match(query)?;
1✔
274
        let typed_stream = entry.tile_stream(query);
1✔
275
        T::stream(typed_stream)
1✔
276
    }
2✔
277

278
    /// When inserting a new query, we first register the query and then insert the tiles as they are produced
279
    /// This is to avoid confusing different queries on the same operator and query rectangle
280
    pub async fn insert_query<T: Pixel + Cachable>(
1✔
281
        &self,
1✔
282
        key: CanonicOperatorName,
1✔
283
        query: &RasterQueryRectangle,
1✔
284
    ) -> QueryId {
1✔
285
        let mut caches = self.operator_caches.write().await;
1✔
286
        let cache = caches.entry(key).or_default();
1✔
287

1✔
288
        let query_id = Uuid::new_v4();
1✔
289
        cache.active_queries.insert(
1✔
290
            query_id,
1✔
291
            ActiveQueryResult {
1✔
292
                query: *query,
1✔
293
                tiles: T::create_active_query_tiles(),
1✔
294
            },
1✔
295
        );
1✔
296

1✔
297
        query_id
1✔
298
    }
1✔
299

300
    /// Insert a tile for a given query. The query has to be inserted first.
301
    pub async fn insert_tile<T>(
8✔
302
        &self,
8✔
303
        key: CanonicOperatorName,
8✔
304
        query_id: QueryId,
8✔
305
        tile: RasterTile2D<T>,
8✔
306
    ) -> Result<()>
8✔
307
    where
8✔
308
        T: Pixel + Cachable,
8✔
309
    {
8✔
310
        let mut caches = self.operator_caches.write().await;
8✔
311
        let cache = caches.entry(key).or_default();
8✔
312

313
        let entry = cache
8✔
314
            .active_queries
8✔
315
            .get_mut(&query_id)
8✔
316
            .ok_or(crate::error::Error::QueryProcessor)?; // TODO: better error
8✔
317

318
        T::insert_tile(&mut entry.tiles, tile)?;
8✔
319

320
        Ok(())
8✔
321
    }
8✔
322

323
    /// Abort the query and remove the tiles from the cache
324
    pub async fn abort_query(&self, key: CanonicOperatorName, query_id: QueryId) {
×
325
        let mut caches = self.operator_caches.write().await;
×
326
        let cache = caches.entry(key).or_default();
×
327
        cache.active_queries.remove(&query_id);
×
328
    }
×
329

330
    /// Finish the query and make the tiles available in the cache
331
    pub async fn finish_query(&self, key: CanonicOperatorName, query_id: QueryId) -> Result<()> {
1✔
332
        let mut caches = self.operator_caches.write().await;
1✔
333
        let cache = caches.entry(key).or_default();
1✔
334
        let active_query = cache
1✔
335
            .active_queries
1✔
336
            .remove(&query_id)
1✔
337
            .ok_or(crate::error::Error::QueryProcessor)?; // TODO: better error
1✔
338

339
        // TODO: maybe check if this cache result is already in the cache or could displace another one
340
        cache.entries.push(active_query.into());
1✔
341

1✔
342
        Ok(())
1✔
343
    }
1✔
344
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc