• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

geo-engine / geoengine / 5666839735

26 Jul 2023 09:06AM UTC coverage: 88.916% (-0.3%) from 89.193%
5666839735

Pull #833

github

web-flow
Merge 865f64877 into 3d8a7e0ad
Pull Request #833: Shared-cache

1353 of 1353 new or added lines in 15 files covered. (100.0%)

105888 of 119088 relevant lines covered (88.92%)

60793.87 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

79.07
/operators/src/pro/cache/shared_cache.rs
1
use super::{
2
    cache_chunks::{CacheElementHitCheck, CachedFeatures, LandingZoneQueryFeatures},
3
    cache_tiles::{CachedTiles, LandingZoneQueryTiles, TypedCacheTileStream},
4
    error::CacheError,
5
    util::CacheSize,
6
};
7
use crate::engine::CanonicOperatorName;
8
use crate::util::Result;
9
use async_trait::async_trait;
10
use futures::Stream;
11
use geoengine_datatypes::{
12
    collections::FeatureCollection,
13
    identifier,
14
    primitives::{CacheHint, Geometry, RasterQueryRectangle, VectorQueryRectangle},
15
    raster::{Pixel, RasterTile2D},
16
    util::{arrow::ArrowTyped, test::TestDefault, ByteSize, Identifier},
17
};
18
use log::debug;
19
use lru::LruCache;
20
use std::{collections::HashMap, hash::Hash};
21
use tokio::sync::RwLock;
22

23
/// The tile cache caches all tiles of a query and is able to answer queries that are fully contained in the cache.
24
/// New tiles are inserted into the cache on-the-fly as they are produced by query processors.
25
/// The tiles are first inserted into a landing zone, until the query in completely finished and only then moved to the cache.
26
/// Both the landing zone and the cache have a maximum size.
27
/// If the landing zone is full, the caching of the current query will be aborted.
28
/// If the cache is full, the least recently used entries will be evicted if necessary to make room for the new entry.
29
#[derive(Debug)]
×
30
pub struct CacheBackend {
31
    // TODO: more fine granular locking?
32
    // for each operator graph, we have a cache, that can efficiently be accessed
33
    raster_caches: HashMap<CanonicOperatorName, RasterOperatorCacheEntry>,
34
    vector_caches: HashMap<CanonicOperatorName, VectorOperatorCacheEntry>,
35

36
    cache_size: CacheSize,
37
    landing_zone_size: CacheSize,
38

39
    // we only use the LruCache for determining the least recently used elements and evict as many entries as needed to fit the new one
40
    lru: LruCache<CacheEntryId, TypedCanonicOperatorName>,
41
}
42

43
impl CacheBackend {
44
    /// This method removes entries from the cache until it can fit the given amount of bytes.
45
    pub fn evict_until_can_fit_bytes(&mut self, bytes: usize) {
5✔
46
        while !self.cache_size.can_fit_bytes(bytes) {
6✔
47
            if let Some((pop_id, pop_key)) = self.lru.pop_lru() {
1✔
48
                match pop_key {
1✔
49
                    TypedCanonicOperatorName::Raster(raster_pop_key) => {
1✔
50
                        let op_cache = self
1✔
51
                            .raster_caches
1✔
52
                            .get_mut(&raster_pop_key)
1✔
53
                            .expect("LRU entry must exist in the cache!");
1✔
54
                        let query_element = op_cache
1✔
55
                            .remove_cache_entry(&pop_id)
1✔
56
                            .expect("LRU entry must exist in the cache!");
1✔
57
                        self.cache_size.remove_element_bytes(&query_element);
1✔
58
                    }
1✔
59
                    TypedCanonicOperatorName::Vector(vector_pop_key) => {
×
60
                        let op_cache = self
×
61
                            .vector_caches
×
62
                            .get_mut(&vector_pop_key)
×
63
                            .expect("LRU entry must exist in the cache!");
×
64
                        let query_element = op_cache
×
65
                            .remove_cache_entry(&pop_id)
×
66
                            .expect("LRU entry must exist in the cache!");
×
67
                        self.cache_size.remove_element_bytes(&query_element);
×
68
                    }
×
69
                };
70
                self.cache_size.remove_element_bytes(&pop_id);
1✔
71

1✔
72
                debug!(
1✔
73
                    "Evicted query {}. Cache size: {}. Cache size used: {}, Cache used percentage: {}.",
×
74
                    pop_id,
×
75
                    self.cache_size.total_byte_size(),
×
76
                    self.cache_size.byte_size_used(),
×
77
                    self.cache_size.size_used_fraction()
×
78
                );
79
            }
×
80
        }
81
    }
5✔
82
}
83

84
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
×
85
pub enum TypedCanonicOperatorName {
86
    Raster(CanonicOperatorName),
87
    Vector(CanonicOperatorName),
88
}
89

90
impl TypedCanonicOperatorName {
91
    pub fn as_raster(&self) -> Option<&CanonicOperatorName> {
×
92
        match self {
×
93
            Self::Raster(name) => Some(name),
×
94
            Self::Vector(_) => None,
×
95
        }
96
    }
×
97

98
    pub fn as_vector(&self) -> Option<&CanonicOperatorName> {
×
99
        match self {
×
100
            Self::Raster(_) => None,
×
101
            Self::Vector(name) => Some(name),
×
102
        }
103
    }
×
104
}
105

106
/// Implemented by caches that can evict entries to make room for new data.
pub trait CacheEvictUntilFit {
    /// Evicts entries until `bytes` additional bytes can be stored.
    fn evict_entries_until_can_fit_bytes(&mut self, bytes: usize);
}
109

110
impl CacheEvictUntilFit for CacheBackend {
111
    fn evict_entries_until_can_fit_bytes(&mut self, bytes: usize) {
5✔
112
        self.evict_until_can_fit_bytes(bytes);
5✔
113
    }
5✔
114
}
115

116
pub trait CacheView<C, L>: CacheEvictUntilFit {
117
    fn operator_caches_mut(
118
        &mut self,
119
    ) -> &mut HashMap<CanonicOperatorName, OperatorCacheEntry<C, L>>;
120

121
    fn create_operator_cache_if_needed(&mut self, key: CanonicOperatorName) {
7✔
122
        // TODO: add size of the OperatorCacheEntry to the cache size?
7✔
123
        self.operator_caches_mut()
7✔
124
            .entry(key)
7✔
125
            .or_insert_with(|| OperatorCacheEntry::new());
7✔
126
    }
7✔
127

128
    fn remove_operator_cache(
1✔
129
        &mut self,
1✔
130
        key: &CanonicOperatorName,
1✔
131
    ) -> Option<OperatorCacheEntry<C, L>> {
1✔
132
        // TODO: remove the size of the OperatorCacheEntry to the cache size?
1✔
133
        self.operator_caches_mut().remove(key)
1✔
134
    }
1✔
135
}
136

137
#[allow(clippy::type_complexity)]
138
pub struct OperatorCacheEntryView<'a, C: CacheElement> {
139
    operator_cache: &'a mut OperatorCacheEntry<
140
        CacheQueryEntry<C::Query, C::CacheContainer>,
141
        CacheQueryEntry<C::Query, C::LandingZoneContainer>,
142
    >,
143
    cache_size: &'a mut CacheSize,
144
    landing_zone_size: &'a mut CacheSize,
145
    lru: &'a mut LruCache<CacheEntryId, TypedCanonicOperatorName>,
146
}
147

148
impl<'a, C> OperatorCacheEntryView<'a, C>
149
where
150
    C: CacheElement + ByteSize,
151
    C::Query: Clone + CacheQueryMatch,
152
    CacheQueryEntry<C::Query, C::LandingZoneContainer>: ByteSize,
153
    CacheQueryEntry<C::Query, C::CacheContainer>: ByteSize,
154
{
155
    fn is_empty(&self) -> bool {
1✔
156
        self.operator_cache.is_empty()
1✔
157
    }
1✔
158

159
    /// This method removes a query from the landing zone.
160
    ///
161
    /// If the query is not in the landing zone, this method returns None.
162
    ///
163
    fn remove_query_from_landing_zone(
164
        &mut self,
165
        query_id: &QueryId,
166
    ) -> Option<CacheQueryEntry<C::Query, C::LandingZoneContainer>> {
167
        if let Some(entry) = self.operator_cache.remove_landing_zone_entry(query_id) {
6✔
168
            self.landing_zone_size.remove_element_bytes(query_id);
6✔
169
            self.landing_zone_size.remove_element_bytes(&entry);
6✔
170

6✔
171
            // debug output
6✔
172
            log::debug!(
6✔
173
                "Removed query {}. Landing zone size: {}. Landing zone size used: {}, Landing zone used percentage: {}.",
×
174
                query_id, self.landing_zone_size.total_byte_size(), self.landing_zone_size.byte_size_used(), self.landing_zone_size.size_used_fraction()
×
175
            );
176

177
            Some(entry)
6✔
178
        } else {
179
            None
×
180
        }
181
    }
6✔
182

183
    /// This method removes a query from the cache and the LRU.
184
    /// It will remove a queries cache entry from the cache and the LRU.
185
    ///
186
    /// If the query is not in the cache, this method returns None.
187
    ///
188
    fn remove_query_from_cache_and_lru(
1✔
189
        &mut self,
1✔
190
        cache_entry_id: &CacheEntryId,
1✔
191
    ) -> Option<CacheQueryEntry<C::Query, C::CacheContainer>> {
1✔
192
        if let Some(entry) = self.operator_cache.remove_cache_entry(cache_entry_id) {
1✔
193
            let old_lru_entry = self.lru.pop_entry(cache_entry_id);
1✔
194
            debug_assert!(old_lru_entry.is_some(), "CacheEntryId not found in LRU");
1✔
195
            self.cache_size.remove_element_bytes(cache_entry_id);
1✔
196
            self.cache_size.remove_element_bytes(&entry);
1✔
197

1✔
198
            log::debug!(
1✔
199
                "Removed cache entry {}. Cache size: {}. Cache size used: {}, Cache used percentage: {}.",
×
200
                cache_entry_id, self.cache_size.total_byte_size(), self.cache_size.byte_size_used(), self.cache_size.size_used_fraction()
×
201
            );
202

203
            Some(entry)
1✔
204
        } else {
205
            None
×
206
        }
207
    }
1✔
208

209
    /// This method removes a list of queries from the cache and the LRU.
210
    fn discard_queries_from_cache_and_lru(&mut self, cache_entry_ids: &[CacheEntryId]) {
7✔
211
        for cache_entry_id in cache_entry_ids {
8✔
212
            let old_entry = self.remove_query_from_cache_and_lru(cache_entry_id);
1✔
213
            debug_assert!(
214
                old_entry.is_some(),
1✔
215
                "CacheEntryId not found in OperatorCacheEntry"
×
216
            );
217
        }
218
    }
7✔
219

220
    /// This method adds a query element to the landing zone.
221
    /// It will add the element to the landing zone entry of the query.
222
    ///
223
    /// # Errors
224
    ///
225
    /// This method returns an error if the query is not in the landing zone.
226
    /// This method returns an error if the element is already expired.
227
    /// This method returns an error if the landing zone is full or the new element would cause the landing zone to overflow.
228
    ///
229
    fn add_query_element_to_landing_zone(
6✔
230
        &mut self,
6✔
231
        query_id: &QueryId,
6✔
232
        landing_zone_element: C,
6✔
233
    ) -> Result<(), CacheError> {
6✔
234
        let landing_zone_entry = self
6✔
235
            .operator_cache
6✔
236
            .landing_zone_entry_mut(query_id)
6✔
237
            .ok_or(CacheError::QueryNotFoundInLandingZone)?;
6✔
238

239
        if landing_zone_element.cache_hint().is_expired() {
6✔
240
            log::trace!("Element is already expired");
1✔
241
            return Err(CacheError::TileExpiredBeforeInsertion);
1✔
242
        };
5✔
243

5✔
244
        let element_bytes_size = landing_zone_element.byte_size();
5✔
245

5✔
246
        if !self.landing_zone_size.can_fit_bytes(element_bytes_size) {
5✔
247
            return Err(CacheError::NotEnoughSpaceInLandingZone);
×
248
        }
5✔
249

5✔
250
        // new entries might update the query bounds stored for this entry
5✔
251
        landing_zone_element.update_stored_query(&mut landing_zone_entry.query)?;
5✔
252

253
        // actually insert the element into the landing zone
254
        landing_zone_entry.insert_element(landing_zone_element)?;
5✔
255

256
        // we add the bytes size of the element to the landing zone size after we have inserted it.
257
        self.landing_zone_size
5✔
258
            .try_add_bytes(element_bytes_size)
5✔
259
            .expect(
5✔
260
            "The Landing Zone must have enough space for the element since we checked it before",
5✔
261
        );
5✔
262

5✔
263
        log::trace!(
5✔
264
            "Inserted tile for query {} into landing zone. Landing zone size: {}. Landing zone size used: {}. Landing zone used percentage: {}",
×
265
            query_id, self.landing_zone_size.total_byte_size(), self.landing_zone_size.byte_size_used(), self.landing_zone_size.size_used_fraction()
×
266
        );
267

268
        Ok(())
5✔
269
    }
6✔
270

271
    /// This method inserts a query into the landing zone.
272
    /// It will cause the operator cache to create a new landing zone entry.
273
    /// Therefore, the size of the query and the size of the landing zone entry will be added to the landing zone size.
274
    ///
275
    /// # Errors
276
    ///
277
    /// This method returns an error if the query is already in the landing zone.
278
    /// This method returns an error if the landing zone is full or the new query would cause the landing zone to overflow.
279
    ///
280
    fn insert_query_into_landing_zone(&mut self, query: &C::Query) -> Result<QueryId, CacheError> {
7✔
281
        let landing_zone_entry = CacheQueryEntry::create_empty::<C>(query.clone());
7✔
282
        let query_id = QueryId::new();
7✔
283

7✔
284
        let query_id_bytes_size = query_id.byte_size();
7✔
285
        let landing_zone_entry_bytes_size = landing_zone_entry.byte_size();
7✔
286

7✔
287
        self.landing_zone_size.try_add_bytes(query_id_bytes_size)?;
7✔
288

289
        // if this fails, we have to remove the query id size again
290
        if let Err(e) = self
7✔
291
            .landing_zone_size
7✔
292
            .try_add_bytes(landing_zone_entry_bytes_size)
7✔
293
        {
294
            self.landing_zone_size.remove_bytes(query_id_bytes_size);
×
295
            return Err(e);
×
296
        }
7✔
297

298
        // if this fails, we have to remove the query id size and the landing zone entry size again
299
        if let Err(e) = self
7✔
300
            .operator_cache
7✔
301
            .insert_landing_zone_entry(query_id, landing_zone_entry)
7✔
302
        {
303
            self.landing_zone_size.remove_bytes(query_id_bytes_size);
×
304
            self.landing_zone_size
×
305
                .remove_bytes(landing_zone_entry_bytes_size);
×
306
            return Err(e);
×
307
        }
7✔
308

7✔
309
        // debug output
7✔
310
        log::trace!(
7✔
311
            "Added query {} to landing zone. Landing zone size: {}. Landing zone size used: {}, Landing zone used percentage: {}.",
×
312
            query_id, self.landing_zone_size.total_byte_size(), self.landing_zone_size.byte_size_used(), self.landing_zone_size.size_used_fraction()
×
313
        );
314

315
        Ok(query_id)
7✔
316
    }
7✔
317

318
    /// This method inserts a cache entry into the cache and the LRU.
319
    /// It allows the element cache to overflow the cache size.
320
    /// This is done because the total cache size is the cache size + the landing zone size.
321
    /// This method is used when moving an element from the landing zone to the cache.
322
    ///
323
    /// # Errors
324
    ///
325
    /// This method returns an error if the cache entry is already in the cache.
326
    ///
327
    fn insert_cache_entry_allow_overflow(
5✔
328
        &mut self,
5✔
329
        cache_entry: CacheQueryEntry<C::Query, C::CacheContainer>,
5✔
330
        key: &CanonicOperatorName,
5✔
331
    ) -> Result<CacheEntryId, CacheError> {
5✔
332
        let cache_entry_id = CacheEntryId::new();
5✔
333
        let bytes = cache_entry.byte_size() + cache_entry_id.byte_size();
5✔
334
        // When inserting data from the landing zone into the cache, we allow the cache to overflow.
5✔
335
        // This is done because the total cache size is the cache size + the landing zone size.
5✔
336
        self.cache_size.add_bytes_allow_overflow(bytes);
5✔
337
        self.operator_cache
5✔
338
            .insert_cache_entry(cache_entry_id, cache_entry)?;
5✔
339
        // we have to wrap the key in a TypedCanonicOperatorName to be able to insert it into the LRU
340
        self.lru.push(
5✔
341
            cache_entry_id,
5✔
342
            C::typed_canonical_operator_name(key.clone()),
5✔
343
        );
5✔
344

5✔
345
        // debug output
5✔
346
        log::trace!(
5✔
347
            "Added cache entry {}. Cache size: {}. Cache size used: {}, Cache used percentage: {}.",
×
348
            cache_entry_id,
×
349
            self.cache_size.total_byte_size(),
×
350
            self.cache_size.byte_size_used(),
×
351
            self.cache_size.size_used_fraction()
×
352
        );
353

354
        Ok(cache_entry_id)
5✔
355
    }
5✔
356

357
    /// This method finds a cache entry in the cache that matches the query.
358
    /// It will also collect all expired cache entries.
359
    /// The cache entry is returned together with the expired ids.
360
    fn find_matching_cache_entry_and_collect_expired_entries(
7✔
361
        &mut self,
7✔
362
        query: &C::Query,
7✔
363
    ) -> CacheQueryResult<C::Query, C::CacheContainer> {
7✔
364
        let mut expired_cache_entry_ids = vec![];
7✔
365

7✔
366
        let x = self.operator_cache.iter().find(|&(id, entry)| {
7✔
367
            if entry.elements.is_expired() {
6✔
368
                expired_cache_entry_ids.push(*id);
1✔
369
                return false;
1✔
370
            }
5✔
371
            entry.query.is_match(query)
5✔
372
        });
7✔
373

7✔
374
        CacheQueryResult {
7✔
375
            cache_hit: x.map(|(id, entry)| (*id, entry)),
7✔
376
            expired_cache_entry_ids,
7✔
377
        }
7✔
378
    }
7✔
379
}
380

381
struct CacheQueryResult<'a, Query, CE> {
382
    cache_hit: Option<(CacheEntryId, &'a CacheQueryEntry<Query, CE>)>,
383
    expired_cache_entry_ids: Vec<CacheEntryId>,
384
}
385

386
pub trait Cache<C: CacheElement>:
387
    CacheView<
388
    CacheQueryEntry<C::Query, C::CacheContainer>,
389
    CacheQueryEntry<C::Query, C::LandingZoneContainer>,
390
>
391
where
392
    C::Query: Clone + CacheQueryMatch,
393
    CacheQueryEntry<C::Query, C::LandingZoneContainer>: ByteSize,
394
    CacheQueryEntry<C::Query, C::CacheContainer>: ByteSize,
395
    CacheQueryEntry<C::Query, C::CacheContainer>:
396
        From<CacheQueryEntry<C::Query, C::LandingZoneContainer>>,
397
{
398
    /// This method returns a mutable reference to the cache entry of an operator.
399
    /// If there is no cache entry for the operator, this method returns None.
400
    fn operator_cache_view_mut(
401
        &mut self,
402
        key: &CanonicOperatorName,
403
    ) -> Option<OperatorCacheEntryView<C>>;
404

405
    /// This method queries the cache for a given query.
406
    /// If the query matches an entry in the cache, the cache entry is returned and it is promoted in the LRU.
407
    /// If the query does not match an entry in the cache, None is returned. Also if a cache entry is found but it is expired, None is returned.
408
    ///
409
    /// # Errors
410
    /// This method returns an error if the cache entry is not found.
411
    ///
412
    fn query_and_promote(
9✔
413
        &mut self,
9✔
414
        key: &CanonicOperatorName,
9✔
415
        query: &C::Query,
9✔
416
    ) -> Result<Option<C::ResultStream>, CacheError> {
9✔
417
        let mut cache = self
9✔
418
            .operator_cache_view_mut(key)
9✔
419
            .ok_or(CacheError::OperatorCacheEntryNotFound)?;
9✔
420

421
        let CacheQueryResult {
422
            cache_hit,
7✔
423
            expired_cache_entry_ids,
7✔
424
        } = cache.find_matching_cache_entry_and_collect_expired_entries(query);
7✔
425

426
        let res = if let Some((cache_entry_id, cache_entry)) = cache_hit {
7✔
427
            let stream = cache_entry.elements.result_stream(query);
5✔
428

5✔
429
            // promote the cache entry in the LRU
5✔
430
            cache.lru.promote(&cache_entry_id);
5✔
431
            Some(stream)
5✔
432
        } else {
433
            None
2✔
434
        };
435

436
        // discard expired cache entries
437
        cache.discard_queries_from_cache_and_lru(&expired_cache_entry_ids);
7✔
438

7✔
439
        Ok(res.flatten())
7✔
440
    }
9✔
441

442
    /// This method inserts a query into the cache.
443
    ///
444
    /// # Errors
445
    /// This method returns an error if the query is already in the cache.
446
    ///
447
    fn insert_query_into_landing_zone(
7✔
448
        &mut self,
7✔
449
        key: &CanonicOperatorName,
7✔
450
        query: &C::Query,
7✔
451
    ) -> Result<QueryId, CacheError> {
7✔
452
        self.create_operator_cache_if_needed(key.clone());
7✔
453
        self.operator_cache_view_mut(key)
7✔
454
            .expect("This method must not fail since the OperatorCache was created one line above.")
7✔
455
            .insert_query_into_landing_zone(query)
7✔
456
    }
7✔
457

458
    fn insert_query_element_into_landing_zone(
13✔
459
        &mut self,
13✔
460
        key: &CanonicOperatorName,
13✔
461
        query_id: &QueryId,
13✔
462
        landing_zone_element: C,
13✔
463
    ) -> Result<(), CacheError> {
13✔
464
        let mut cache = self
13✔
465
            .operator_cache_view_mut(key)
13✔
466
            .ok_or(CacheError::QueryNotFoundInLandingZone)?;
13✔
467
        let res = cache.add_query_element_to_landing_zone(query_id, landing_zone_element);
6✔
468

6✔
469
        // if we cant add the element to the landing zone, we remove the query from the landing zone
6✔
470
        if res.is_err() {
6✔
471
            let _old_entry = cache.remove_query_from_landing_zone(query_id);
1✔
472

1✔
473
            // if the operator cache is empty, we remove it from the cache
1✔
474
            if cache.is_empty() {
1✔
475
                self.remove_operator_cache(key);
1✔
476
            }
1✔
477
        }
5✔
478

479
        res
6✔
480
    }
13✔
481

482
    /// This method discards a query from the landing zone.
483
    /// If the query is not in the landing zone, this method does nothing.
484
    fn discard_query_from_landing_zone(&mut self, key: &CanonicOperatorName, query_id: &QueryId) {
485
        if let Some(mut cache) = self.operator_cache_view_mut(key) {
×
486
            cache.remove_query_from_landing_zone(query_id);
×
487
            if cache.is_empty() {
×
488
                self.remove_operator_cache(key);
×
489
            }
×
490
        }
×
491
    }
×
492

493
    /// This method discards a query from the cache and the LRU.
494
    /// If the query is not in the cache, this method does nothing.
495
    fn discard_querys_from_cache_and_lru(
496
        &mut self,
497
        key: &CanonicOperatorName,
498
        cache_entry_ids: &[CacheEntryId],
499
    ) {
500
        if let Some(mut cache) = self.operator_cache_view_mut(key) {
×
501
            cache.discard_queries_from_cache_and_lru(cache_entry_ids);
×
502
            if cache.is_empty() {
×
503
                self.remove_operator_cache(key);
×
504
            }
×
505
        }
×
506
    }
×
507

508
    /// This method moves a query from the landing zone to the cache.
509
    /// It will remove the query from the landing zone and insert it into the cache.
510
    /// If the cache is full, the least recently used entries will be evicted if necessary to make room for the new entry.
511
    /// This method returns the cache entry id of the inserted cache entry.
512
    ///
513
    /// # Errors
514
    /// This method returns an error if the query is not in the landing zone.
515
    /// This method returns an error if the cache entry is already in the cache.
516
    /// This method returns an error if the cache is full and the least recently used entries cannot be evicted to make room for the new entry.
517
    ///
518
    fn move_query_from_landing_zone_to_cache(
6✔
519
        &mut self,
6✔
520
        key: &CanonicOperatorName,
6✔
521
        query_id: &QueryId,
6✔
522
    ) -> Result<CacheEntryId, CacheError> {
6✔
523
        let mut operator_cache = self
6✔
524
            .operator_cache_view_mut(key)
6✔
525
            .ok_or(CacheError::OperatorCacheEntryNotFound)?;
6✔
526
        let landing_zone_entry = operator_cache
5✔
527
            .remove_query_from_landing_zone(query_id)
5✔
528
            .ok_or(CacheError::QueryNotFoundInLandingZone)?;
5✔
529
        let cache_entry: CacheQueryEntry<
5✔
530
            <C as CacheElement>::Query,
5✔
531
            <C as CacheElement>::CacheContainer,
5✔
532
        > = landing_zone_entry.into();
5✔
533
        // when moving an element from the landing zone to the cache, we allow the cache size to overflow.
534
        // This is done because the total cache size is the cache size + the landing zone size.
535
        let cache_entry_id = operator_cache.insert_cache_entry_allow_overflow(cache_entry, key)?;
5✔
536
        // We could also first try to evict until the cache can hold the entry.
537
        // However, then we would need to lookup the cache entry twice.
538
        // To avoid that, we just evict after we moved the entry from the landing zone to the cache.
539
        // This is also not a problem since the total cache size is the cache size + the landing zone size.
540
        self.evict_entries_until_can_fit_bytes(0);
5✔
541

5✔
542
        Ok(cache_entry_id)
5✔
543
    }
6✔
544
}
545

546
impl<T> Cache<RasterTile2D<T>> for CacheBackend
547
where
548
    T: Pixel + CacheElementSubType<CacheElementType = RasterTile2D<T>>,
549
{
550
    fn operator_cache_view_mut(
35✔
551
        &mut self,
35✔
552
        key: &CanonicOperatorName,
35✔
553
    ) -> Option<OperatorCacheEntryView<RasterTile2D<T>>> {
35✔
554
        self.raster_caches
35✔
555
            .get_mut(key)
35✔
556
            .map(|op| OperatorCacheEntryView {
35✔
557
                operator_cache: op,
25✔
558
                cache_size: &mut self.cache_size,
25✔
559
                landing_zone_size: &mut self.landing_zone_size,
25✔
560
                lru: &mut self.lru,
25✔
561
            })
35✔
562
    }
35✔
563
}
564

565
impl<T> Cache<FeatureCollection<T>> for CacheBackend
566
where
567
    T: Geometry + CacheElementSubType<CacheElementType = FeatureCollection<T>> + ArrowTyped,
568
    FeatureCollection<T>: CacheElementHitCheck,
569
{
570
    fn operator_cache_view_mut(
×
571
        &mut self,
×
572
        key: &CanonicOperatorName,
×
573
    ) -> Option<OperatorCacheEntryView<FeatureCollection<T>>> {
×
574
        self.vector_caches
×
575
            .get_mut(key)
×
576
            .map(|op| OperatorCacheEntryView {
×
577
                operator_cache: op,
×
578
                cache_size: &mut self.cache_size,
×
579
                landing_zone_size: &mut self.landing_zone_size,
×
580
                lru: &mut self.lru,
×
581
            })
×
582
    }
×
583
}
584

585
impl CacheView<RasterCacheQueryEntry, RasterLandingQueryEntry> for CacheBackend {
586
    fn operator_caches_mut(
8✔
587
        &mut self,
8✔
588
    ) -> &mut HashMap<CanonicOperatorName, RasterOperatorCacheEntry> {
8✔
589
        &mut self.raster_caches
8✔
590
    }
8✔
591
}
592

593
impl CacheView<VectorCacheQueryEntry, VectorLandingQueryEntry> for CacheBackend {
594
    fn operator_caches_mut(
×
595
        &mut self,
×
596
    ) -> &mut HashMap<CanonicOperatorName, VectorOperatorCacheEntry> {
×
597
        &mut self.vector_caches
×
598
    }
×
599
}
600

601
pub trait CacheElement: ByteSize + Send + ByteSize
602
where
603
    Self: Sized,
604
{
605
    type Query: CacheQueryMatch + Clone + Send + Sync;
606
    type LandingZoneContainer: LandingZoneElementsContainer<Self>;
607
    type CacheContainer: CacheElementsContainer<Self::Query, Self, ResultStream = Self::ResultStream>
608
        + From<Self::LandingZoneContainer>;
609
    type ResultStream;
610
    type CacheElementSubType: CacheElementSubType<CacheElementType = Self>;
611

612
    fn move_into_landing_zone(
×
613
        self,
×
614
        landing_zone: &mut Self::LandingZoneContainer,
×
615
    ) -> Result<(), CacheError> {
×
616
        landing_zone.insert_element(self)
×
617
    }
×
618

619
    fn update_stored_query(&self, query: &mut Self::Query) -> Result<(), CacheError>;
620

621
    fn cache_hint(&self) -> CacheHint;
622

623
    fn typed_canonical_operator_name(key: CanonicOperatorName) -> TypedCanonicOperatorName;
624
}
625

626
pub trait CacheElementSubType {
627
    type CacheElementType: CacheElement;
628

629
    fn insert_element_into_landing_zone(
630
        landing_zone: &mut <Self::CacheElementType as CacheElement>::LandingZoneContainer,
631
        element: Self::CacheElementType,
632
    ) -> Result<(), super::error::CacheError>;
633

634
    fn create_empty_landing_zone() -> <Self::CacheElementType as CacheElement>::LandingZoneContainer;
635

636
    fn result_stream(
637
        cache_elements_container: &<Self::CacheElementType as CacheElement>::CacheContainer,
638
        query: &<Self::CacheElementType as CacheElement>::Query,
639
    ) -> Option<<Self::CacheElementType as CacheElement>::ResultStream>;
640
}
641

642
#[derive(Debug)]
×
643
pub struct SharedCache {
644
    backend: RwLock<CacheBackend>,
645
}
646

647
impl SharedCache {
648
    pub fn new(cache_size_in_mb: usize, landing_zone_ratio: f64) -> Result<Self> {
7✔
649
        if landing_zone_ratio <= 0.0 {
7✔
650
            return Err(crate::error::Error::QueryingProcessorFailed {
×
651
                source: Box::new(CacheError::LandingZoneRatioMustBeLargerThanZero),
×
652
            });
×
653
        }
7✔
654

7✔
655
        if landing_zone_ratio >= 0.5 {
7✔
656
            return Err(crate::error::Error::QueryingProcessorFailed {
×
657
                source: Box::new(CacheError::LandingZoneRatioMustBeSmallerThenHalfCacheSize),
×
658
            });
×
659
        }
7✔
660

7✔
661
        let cache_size_bytes =
7✔
662
            (cache_size_in_mb as f64 * (1.0 - landing_zone_ratio) * 1024.0 * 1024.0) as usize;
7✔
663

7✔
664
        let landing_zone_size_bytes =
7✔
665
            (cache_size_in_mb as f64 * landing_zone_ratio * 1024.0 * 1024.0) as usize;
7✔
666

7✔
667
        Ok(Self {
7✔
668
            backend: RwLock::new(CacheBackend {
7✔
669
                vector_caches: Default::default(),
7✔
670
                raster_caches: Default::default(),
7✔
671
                lru: LruCache::unbounded(), // we need no cap because we evict manually
7✔
672
                cache_size: CacheSize::new(cache_size_bytes),
7✔
673
                landing_zone_size: CacheSize::new(landing_zone_size_bytes),
7✔
674
            }),
7✔
675
        })
7✔
676
    }
7✔
677
}
678

679
impl TestDefault for SharedCache {
680
    fn test_default() -> Self {
111✔
681
        Self {
111✔
682
            backend: RwLock::new(CacheBackend {
111✔
683
                vector_caches: Default::default(),
111✔
684
                raster_caches: Default::default(),
111✔
685
                lru: LruCache::unbounded(), // we need no cap because we evict manually
111✔
686
                cache_size: CacheSize::new(usize::MAX),
111✔
687
                landing_zone_size: CacheSize::new(usize::MAX),
111✔
688
            }),
111✔
689
        }
111✔
690
    }
111✔
691
}
692

693
/// Holds all the cached results for an operator graph (workflow)
694
#[derive(Debug, Default)]
×
695
pub struct OperatorCacheEntry<CacheEntriesContainer, LandingZoneEntriesContainer> {
696
    // for a given operator and query we need to look through all entries to find one that matches
697
    // TODO: use a multi-dimensional index to speed up the lookup
698
    entries: HashMap<CacheEntryId, CacheEntriesContainer>,
699

700
    // running queries insert their tiles as they are produced. The entry will be created once the query is done.
701
    // The query is identified by a Uuid instead of the query rectangle to avoid confusions with other queries
702
    landing_zone: HashMap<QueryId, LandingZoneEntriesContainer>,
703
}
704

705
impl<CacheEntriesContainer, LandingZoneEntriesContainer>
706
    OperatorCacheEntry<CacheEntriesContainer, LandingZoneEntriesContainer>
707
{
708
    pub fn new() -> Self {
7✔
709
        Self {
7✔
710
            entries: Default::default(),
7✔
711
            landing_zone: Default::default(),
7✔
712
        }
7✔
713
    }
7✔
714

715
    fn insert_landing_zone_entry(
7✔
716
        &mut self,
7✔
717
        query_id: QueryId,
7✔
718
        landing_zone_entry: LandingZoneEntriesContainer,
7✔
719
    ) -> Result<(), CacheError> {
7✔
720
        let old_entry = self.landing_zone.insert(query_id, landing_zone_entry);
7✔
721

7✔
722
        if old_entry.is_some() {
7✔
723
            Err(CacheError::QueryIdAlreadyInLandingZone)
×
724
        } else {
725
            Ok(())
7✔
726
        }
727
    }
7✔
728

729
    fn remove_landing_zone_entry(
6✔
730
        &mut self,
6✔
731
        query_id: &QueryId,
6✔
732
    ) -> Option<LandingZoneEntriesContainer> {
6✔
733
        self.landing_zone.remove(query_id)
6✔
734
    }
6✔
735

736
    fn landing_zone_entry_mut(
6✔
737
        &mut self,
6✔
738
        query_id: &QueryId,
6✔
739
    ) -> Option<&mut LandingZoneEntriesContainer> {
6✔
740
        self.landing_zone.get_mut(query_id)
6✔
741
    }
6✔
742

743
    fn insert_cache_entry(
5✔
744
        &mut self,
5✔
745
        cache_entry_id: CacheEntryId,
5✔
746
        cache_entry: CacheEntriesContainer,
5✔
747
    ) -> Result<(), CacheError> {
5✔
748
        let old_entry = self.entries.insert(cache_entry_id, cache_entry);
5✔
749

5✔
750
        if old_entry.is_some() {
5✔
751
            Err(CacheError::CacheEntryIdAlreadyInCache)
×
752
        } else {
753
            Ok(())
5✔
754
        }
755
    }
5✔
756

757
    fn remove_cache_entry(
2✔
758
        &mut self,
2✔
759
        cache_entry_id: &CacheEntryId,
2✔
760
    ) -> Option<CacheEntriesContainer> {
2✔
761
        self.entries.remove(cache_entry_id)
2✔
762
    }
2✔
763

764
    fn is_empty(&self) -> bool {
1✔
765
        self.entries.is_empty() && self.landing_zone.is_empty()
1✔
766
    }
1✔
767

768
    fn iter(&self) -> impl Iterator<Item = (&CacheEntryId, &CacheEntriesContainer)> {
7✔
769
        self.entries.iter()
7✔
770
    }
7✔
771
}
772

773
// Id of a running query; used as the key of the landing zone entries in `OperatorCacheEntry`.
identifier!(QueryId);
×
774

775
// No method is overridden here, so the default implementations of `ByteSize` apply.
impl ByteSize for QueryId {}
776

777
// Id of a finished cache entry; used as the key of the cache entries in `OperatorCacheEntry`.
identifier!(CacheEntryId);
×
778

779
// No method is overridden here, so the default implementations of `ByteSize` apply.
impl ByteSize for CacheEntryId {}
780

781
/// Holds all the elements that belong to a given query.
///
/// The stored query is kept alongside the elements so that other queries can be compared
/// against it, e.g. to check whether they are fully contained in it.
#[derive(Debug, Hash)]
pub struct CacheQueryEntry<Query, Elements> {
    // the query the elements belong to
    query: Query,
    // the container that holds the elements of the query
    elements: Elements,
}
787
type RasterOperatorCacheEntry = OperatorCacheEntry<RasterCacheQueryEntry, RasterLandingQueryEntry>;
788
type RasterCacheQueryEntry = CacheQueryEntry<RasterQueryRectangle, CachedTiles>;
789
type RasterLandingQueryEntry = CacheQueryEntry<RasterQueryRectangle, LandingZoneQueryTiles>;
790

791
type VectorOperatorCacheEntry = OperatorCacheEntry<VectorCacheQueryEntry, VectorLandingQueryEntry>;
792
type VectorCacheQueryEntry = CacheQueryEntry<VectorQueryRectangle, CachedFeatures>;
793
type VectorLandingQueryEntry = CacheQueryEntry<VectorQueryRectangle, LandingZoneQueryFeatures>;
794

795
impl<Query, Elements> CacheQueryEntry<Query, Elements> {
796
    pub fn create_empty<E>(query: Query) -> Self
7✔
797
    where
7✔
798
        Elements: LandingZoneElementsContainer<E>,
7✔
799
    {
7✔
800
        Self {
7✔
801
            query,
7✔
802
            elements: Elements::create_empty(),
7✔
803
        }
7✔
804
    }
7✔
805

806
    pub fn query(&self) -> &Query {
×
807
        &self.query
×
808
    }
×
809

810
    pub fn elements_mut(&mut self) -> &mut Elements {
×
811
        &mut self.elements
×
812
    }
×
813

814
    pub fn insert_element<E>(&mut self, element: E) -> Result<(), CacheError>
5✔
815
    where
5✔
816
        Elements: LandingZoneElementsContainer<E>,
5✔
817
    {
5✔
818
        self.elements.insert_element(element)
5✔
819
    }
5✔
820
}
821

822
impl<Query, Elements> ByteSize for CacheQueryEntry<Query, Elements>
823
where
824
    Elements: ByteSize,
825
{
826
    fn heap_byte_size(&self) -> usize {
22✔
827
        self.elements.heap_byte_size()
22✔
828
    }
22✔
829
}
830

831
/// Decides whether a query matches another one.
///
/// The implementations in this file treat `self` as the stored query and return `true`
/// if the passed `query` is covered by it.
pub trait CacheQueryMatch<RHS = Self> {
    /// Returns `true` if `query` matches `self`.
    fn is_match(&self, query: &RHS) -> bool;
}
834

835
impl CacheQueryMatch for RasterQueryRectangle {
    /// A raster query matches if its spatial bounds and its time interval are contained in
    /// those of `self` and both queries have the same spatial resolution.
    fn is_match(&self, query: &RasterQueryRectangle) -> bool {
        if !self.spatial_bounds.contains(&query.spatial_bounds) {
            return false;
        }

        if !self.time_interval.contains(&query.time_interval) {
            return false;
        }

        self.spatial_resolution == query.spatial_resolution
    }
}
842

843
impl CacheQueryMatch for VectorQueryRectangle {
    /// A vector query matches if its bounding box and its time interval are contained in
    /// those of `self` and both queries have the same spatial resolution.
    // TODO: check if that is what we need
    fn is_match(&self, query: &VectorQueryRectangle) -> bool {
        if !self.spatial_bounds.contains_bbox(&query.spatial_bounds) {
            return false;
        }

        if !self.time_interval.contains(&query.time_interval) {
            return false;
        }

        self.spatial_resolution == query.spatial_resolution
    }
}
851

852
pub trait LandingZoneElementsContainer<E> {
853
    fn insert_element(&mut self, element: E) -> Result<(), CacheError>;
854
    fn create_empty() -> Self;
855
}
856

857
/// Information about the content of a cache elements container.
pub trait CacheElementsContainerInfos<Query> {
    /// Returns `true` if the content of the container is expired.
    // NOTE(review): presumably derived from the cache hints of the elements - confirm against the implementations
    fn is_expired(&self) -> bool;
}
860

861
pub trait CacheElementsContainer<Query, E>: CacheElementsContainerInfos<Query> {
862
    type ResultStream: Stream<Item = Result<E>>;
863

864
    fn result_stream(&self, query: &Query) -> Option<Self::ResultStream>;
865
}
866

867
impl CacheQueryEntry<RasterQueryRectangle, CachedTiles> {
868
    /// Return true if the query can be answered in full by this cache entry
869
    /// For this, the bbox and time has to be fully contained, and the spatial resolution has to match
870
    pub fn matches(&self, query: &RasterQueryRectangle) -> bool {
×
871
        self.query.spatial_bounds.contains(&query.spatial_bounds)
×
872
            && self.query.time_interval.contains(&query.time_interval)
×
873
            && self.query.spatial_resolution == query.spatial_resolution
×
874
    }
×
875

876
    /// Produces a tile stream from the cache
877
    pub fn tile_stream(&self, query: &RasterQueryRectangle) -> TypedCacheTileStream {
×
878
        self.elements.tile_stream(query)
×
879
    }
×
880
}
881

882
impl From<RasterLandingQueryEntry> for RasterCacheQueryEntry {
883
    fn from(value: RasterLandingQueryEntry) -> Self {
6✔
884
        Self {
6✔
885
            query: value.query,
6✔
886
            elements: value.elements.into(),
6✔
887
        }
6✔
888
    }
6✔
889
}
890

891
impl From<VectorLandingQueryEntry> for VectorCacheQueryEntry {
892
    fn from(value: VectorLandingQueryEntry) -> Self {
×
893
        Self {
×
894
            query: value.query,
×
895
            elements: value.elements.into(),
×
896
        }
×
897
    }
×
898
}
899

900
#[async_trait]
901
pub trait AsyncCache<C: CacheElement> {
902
    async fn query_cache<S: CacheElementSubType<CacheElementType = C>>(
903
        &self,
904
        key: &CanonicOperatorName,
905
        query: &C::Query,
906
    ) -> Result<Option<C::ResultStream>, CacheError>;
907

908
    async fn insert_query<S: CacheElementSubType<CacheElementType = C>>(
909
        &self,
910
        key: &CanonicOperatorName,
911
        query: &C::Query,
912
    ) -> Result<QueryId, CacheError>;
913

914
    async fn insert_query_element<S: CacheElementSubType<CacheElementType = C>>(
915
        &self,
916
        key: &CanonicOperatorName,
917
        query_id: &QueryId,
918
        landing_zone_element: S::CacheElementType,
919
    ) -> Result<(), CacheError>;
920

921
    async fn abort_query<S: CacheElementSubType<CacheElementType = C>>(
922
        &self,
923
        key: &CanonicOperatorName,
924
        query_id: &QueryId,
925
    );
926

927
    async fn finish_query<S: CacheElementSubType<CacheElementType = C>>(
928
        &self,
929
        key: &CanonicOperatorName,
930
        query_id: &QueryId,
931
    ) -> Result<CacheEntryId, CacheError>;
932
}
933

934
#[async_trait]
935
impl<C> AsyncCache<C> for SharedCache
936
where
937
    C: CacheElement + ByteSize + Send + 'static,
938
    C::Query: Send + Sync,
939
    CacheBackend: Cache<C>,
940
    CacheQueryEntry<C::Query, C::LandingZoneContainer>: ByteSize,
941
    CacheQueryEntry<C::Query, C::CacheContainer>: ByteSize,
942
    CacheQueryEntry<C::Query, C::CacheContainer>:
943
        From<CacheQueryEntry<C::Query, C::LandingZoneContainer>>,
944
{
945
    /// Query the cache and on hit create a stream of cache elements
946
    async fn query_cache<S: CacheElementSubType<CacheElementType = C>>(
9✔
947
        &self,
9✔
948
        key: &CanonicOperatorName,
9✔
949
        query: &C::Query,
9✔
950
    ) -> Result<Option<C::ResultStream>, CacheError> {
9✔
951
        let mut backend = self.backend.write().await;
9✔
952
        backend.query_and_promote(key, query)
9✔
953
    }
18✔
954

955
    /// When inserting a new query, we first register the query and then insert the elements as they are produced
956
    /// This is to avoid confusing different queries on the same operator and query rectangle
957
    async fn insert_query<S: CacheElementSubType<CacheElementType = C>>(
7✔
958
        &self,
7✔
959
        key: &CanonicOperatorName,
7✔
960
        query: &C::Query,
7✔
961
    ) -> Result<QueryId, CacheError> {
7✔
962
        let mut backend = self.backend.write().await;
7✔
963
        backend.insert_query_into_landing_zone(key, query)
7✔
964
    }
14✔
965

966
    /// Insert a cachable element for a given query. The query has to be inserted first.
967
    /// The element is inserted into the landing zone and only moved to the cache when the query is finished.
968
    /// If the landing zone is full or the element size would cause the landing zone size to overflow, the caching of the query is aborted.
969
    async fn insert_query_element<S: CacheElementSubType<CacheElementType = C>>(
13✔
970
        &self,
13✔
971
        key: &CanonicOperatorName,
13✔
972
        query_id: &QueryId,
13✔
973
        landing_zone_element: C,
13✔
974
    ) -> Result<(), CacheError> {
13✔
975
        let mut backend = self.backend.write().await;
13✔
976
        backend.insert_query_element_into_landing_zone(key, query_id, landing_zone_element)
13✔
977
    }
26✔
978

979
    /// Abort the query and remove already inserted elements from the caches landing zone
980
    async fn abort_query<S: CacheElementSubType<CacheElementType = C>>(
×
981
        &self,
×
982
        key: &CanonicOperatorName,
×
983
        query_id: &QueryId,
×
984
    ) {
×
985
        let mut backend = self.backend.write().await;
×
986
        backend.discard_query_from_landing_zone(key, query_id);
×
987
    }
×
988

989
    /// Finish the query and make the inserted elements available in the cache
990
    async fn finish_query<S: CacheElementSubType<CacheElementType = C>>(
6✔
991
        &self,
6✔
992
        key: &CanonicOperatorName,
6✔
993
        query_id: &QueryId,
6✔
994
    ) -> Result<CacheEntryId, CacheError> {
6✔
995
        let mut backend = self.backend.write().await;
6✔
996
        backend.move_query_from_landing_zone_to_cache(key, query_id)
6✔
997
    }
12✔
998
}
999

1000
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use geoengine_datatypes::{
        primitives::{CacheHint, DateTime, SpatialPartition2D, SpatialResolution, TimeInterval},
        raster::{Grid, RasterProperties},
    };
    use serde_json::json;

    use super::*;

    /// Runs a full caching cycle for `op_name`:
    /// registers the query, inserts a single tile and finishes the query.
    async fn process_query(tile_cache: &SharedCache, op_name: CanonicOperatorName) {
        let query_id = tile_cache
            .insert_query::<u8>(&op_name, &query_rect())
            .await
            .unwrap();

        tile_cache
            .insert_query_element::<u8>(&op_name, &query_id, create_tile())
            .await
            .unwrap();

        tile_cache
            .finish_query::<u8>(&op_name, &query_id)
            .await
            .unwrap();
    }

    /// A small `u8` tile (3 x 2 pixels) that lies inside of `query_rect()` in time.
    fn create_tile() -> RasterTile2D<u8> {
        RasterTile2D::<u8> {
            time: TimeInterval::new_instant(DateTime::new_utc(2014, 3, 1, 0, 0, 0)).unwrap(),
            tile_position: [-1, 0].into(),
            global_geo_transform: TestDefault::test_default(),
            grid_array: Grid::new([3, 2].into(), vec![1, 2, 3, 4, 5, 6])
                .unwrap()
                .into(),
            properties: RasterProperties::default(),
            cache_hint: CacheHint::max_duration(),
        }
    }

    /// The query rectangle that is used by all tests.
    fn query_rect() -> RasterQueryRectangle {
        RasterQueryRectangle {
            spatial_bounds: SpatialPartition2D::new_unchecked(
                (-180., 90.).into(),
                (180., -90.).into(),
            ),
            time_interval: TimeInterval::new_instant(DateTime::new_utc(2014, 3, 1, 0, 0, 0))
                .unwrap(),
            spatial_resolution: SpatialResolution::one(),
        }
    }

    /// Creates a distinct operator name for each `idx`.
    fn op(idx: usize) -> CanonicOperatorName {
        CanonicOperatorName::new_unchecked(&json!({
            "type": "GdalSource",
            "params": {
                "data": idx
            }
        }))
    }

    #[tokio::test]
    async fn it_evicts_lru() {
        // Create a cache entry and a landing zone entry to get the size of both
        let landing_zone_entry = RasterLandingQueryEntry {
            query: query_rect(),
            elements: LandingZoneQueryTiles::U8(vec![create_tile()]),
        };
        let query_id = QueryId::new();
        let size_of_landing_zone_entry = landing_zone_entry.byte_size() + query_id.byte_size();
        let cache_entry: RasterCacheQueryEntry = landing_zone_entry.into();
        let cache_entry_id = CacheEntryId::new();
        let size_of_cache_entry = cache_entry.byte_size() + cache_entry_id.byte_size();

        // Select the max of both sizes
        // This is done because the landing zone should not be smaller than the cache
        let m_size = size_of_cache_entry.max(size_of_landing_zone_entry);

        // set limits s.t. three tiles fit
        let tile_cache = SharedCache {
            backend: RwLock::new(CacheBackend {
                raster_caches: Default::default(),
                vector_caches: Default::default(),
                lru: LruCache::unbounded(),
                cache_size: CacheSize::new(m_size * 3),
                landing_zone_size: CacheSize::new(m_size * 3),
            }),
        };

        // process three different queries
        process_query(&tile_cache, op(1)).await;
        process_query(&tile_cache, op(2)).await;
        process_query(&tile_cache, op(3)).await;

        // query the first one s.t. it is the most recently used
        tile_cache
            .query_cache::<u8>(&op(1), &query_rect())
            .await
            .unwrap();
        // process a fourth query
        process_query(&tile_cache, op(4)).await;

        // assure the second query is evicted because it is the least recently used
        assert!(tile_cache
            .query_cache::<u8>(&op(2), &query_rect())
            .await
            .unwrap()
            .is_none());

        // assure that the other queries are still in the cache
        for i in [1, 3, 4] {
            assert!(tile_cache
                .query_cache::<u8>(&op(i), &query_rect())
                .await
                .unwrap()
                .is_some());
        }

        assert_eq!(
            tile_cache.backend.read().await.cache_size.byte_size_used(),
            3 * size_of_cache_entry
        );
    }

    #[test]
    fn cache_byte_size() {
        assert_eq!(create_tile().byte_size(), 284);
        assert_eq!(
            CachedTiles::U8(Arc::new(vec![create_tile()])).byte_size(),
            /* enum + arc */ 16 + /* vec */ 24  + /* tile */ 284
        );
        assert_eq!(
            CachedTiles::U8(Arc::new(vec![create_tile(), create_tile()])).byte_size(),
            /* enum + arc */ 16 + /* vec */ 24  + /* tile */ 2 * 284
        );
    }

    #[tokio::test]
    async fn it_checks_ttl() {
        // the test default has the same (unlimited) sizes that were spelled out here before
        let tile_cache = SharedCache::test_default();

        process_query(&tile_cache, op(1)).await;

        // access works because no ttl is set
        tile_cache
            .query_cache::<u8>(&op(1), &query_rect())
            .await
            .unwrap()
            .unwrap();

        // manually expire entry
        {
            let mut backend = tile_cache.backend.write().await;
            let cache = backend.raster_caches.iter_mut().next().unwrap();

            let tiles = &mut cache.1.entries.iter_mut().next().unwrap().1.elements;
            match tiles {
                CachedTiles::U8(tiles) => {
                    let mut expired_tiles = (**tiles).clone();
                    expired_tiles[0].cache_hint = CacheHint::with_created_and_expires(
                        DateTime::new_utc(0, 1, 1, 0, 0, 0),
                        DateTime::new_utc(0, 1, 1, 0, 0, 1).into(),
                    );
                    *tiles = Arc::new(expired_tiles);
                }
                _ => panic!("wrong tile type"),
            }
        }

        // access fails because ttl is expired
        assert!(tile_cache
            .query_cache::<u8>(&op(1), &query_rect())
            .await
            .unwrap()
            .is_none());
    }

    #[tokio::test]
    async fn tile_cache_init_size() {
        let tile_cache = SharedCache::new(100, 0.1).unwrap();

        let backend = tile_cache.backend.read().await;

        let cache_size = 90 * 1024 * 1024;
        let landing_zone_size = 10 * 1024 * 1024;

        assert_eq!(backend.cache_size.total_byte_size(), cache_size);
        assert_eq!(
            backend.landing_zone_size.total_byte_size(),
            landing_zone_size
        );
    }

    #[test]
    fn it_rejects_invalid_landing_zone_ratios() {
        // the ratio has to be larger than zero ...
        assert!(SharedCache::new(100, 0.0).is_err());
        assert!(SharedCache::new(100, -0.1).is_err());

        // ... and smaller than half of the cache size
        assert!(SharedCache::new(100, 0.5).is_err());
        assert!(SharedCache::new(100, 0.9).is_err());
    }
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc