• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

geo-engine / geoengine / 5620643772

21 Jul 2023 09:09AM UTC coverage: 89.043% (-0.2%) from 89.194%
5620643772

Pull #833

github

web-flow
Merge 1ee0a296a into 2852314aa
Pull Request #833: Shared-cache

1128 of 1128 new or added lines in 9 files covered. (100.0%)

102925 of 115590 relevant lines covered (89.04%)

62633.59 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

80.74
/operators/src/pro/cache/shared_cache.rs
1
use super::{
2
    cache_chunks::{CachedFeatures, LandingZoneQueryFeatures},
3
    cache_tiles::{CachedTiles, LandingZoneQueryTiles, TypedCacheTileStream},
4
    error::CacheError,
5
    util::CacheSize,
6
};
7
use crate::engine::CanonicOperatorName;
8
use crate::util::Result;
9
use async_trait::async_trait;
10
use futures::Stream;
11
use geoengine_datatypes::{
12
    identifier,
13
    primitives::{CacheHint, RasterQueryRectangle, VectorQueryRectangle},
14
    raster::{Pixel, RasterTile2D},
15
    util::{test::TestDefault, ByteSize, Identifier},
16
};
17
use log::debug;
18
use lru::LruCache;
19
use std::{collections::HashMap, hash::Hash};
20
use tokio::sync::RwLock;
21

22
/// The tile cache caches all tiles of a query and is able to answer queries that are fully contained in the cache.
23
/// New tiles are inserted into the cache on-the-fly as they are produced by query processors.
24
/// The tiles are first inserted into a landing zone, until the query in completely finished and only then moved to the cache.
25
/// Both the landing zone and the cache have a maximum size.
26
/// If the landing zone is full, the caching of the current query will be aborted.
27
/// If the cache is full, the least recently used entries will be evicted if necessary to make room for the new entry.
28
#[derive(Debug)]
×
29
pub struct CacheBackend {
30
    // TODO: more fine granular locking?
31
    // for each operator graph, we have a cache, that can efficiently be accessed
32
    raster_caches: HashMap<CanonicOperatorName, RasterOperatorCacheEntry>,
33
    vector_caches: HashMap<CanonicOperatorName, VectorOperatorCacheEntry>,
34

35
    cache_size: CacheSize,
36
    landing_zone_size: CacheSize,
37

38
    // we only use the LruCache for determining the least recently used elements and evict as many entries as needed to fit the new one
39
    lru: LruCache<CacheEntryId, TypedCanonicOperatorName>,
40
}
41

42
impl CacheBackend {
43
    /// This method removes entries from the cache until it can fit the given amount of bytes.
44
    pub fn evict_until_can_fit_bytes(&mut self, bytes: usize) {
5✔
45
        while !self.cache_size.can_fit_bytes(bytes) {
6✔
46
            if let Some((pop_id, pop_key)) = self.lru.pop_lru() {
1✔
47
                match pop_key {
1✔
48
                    TypedCanonicOperatorName::Raster(raster_pop_key) => {
1✔
49
                        let op_cache = self
1✔
50
                            .raster_caches
1✔
51
                            .get_mut(&raster_pop_key)
1✔
52
                            .expect("LRU entry must exist in the cache!");
1✔
53
                        let query_element = op_cache
1✔
54
                            .remove_cache_entry(&pop_id)
1✔
55
                            .expect("LRU entry must exist in the cache!");
1✔
56
                        self.cache_size.remove_element_bytes(&query_element);
1✔
57
                    }
1✔
58
                    TypedCanonicOperatorName::Vector(vector_pop_key) => {
×
59
                        let op_cache = self
×
60
                            .vector_caches
×
61
                            .get_mut(&vector_pop_key)
×
62
                            .expect("LRU entry must exist in the cache!");
×
63
                        let query_element = op_cache
×
64
                            .remove_cache_entry(&pop_id)
×
65
                            .expect("LRU entry must exist in the cache!");
×
66
                        self.cache_size.remove_element_bytes(&query_element);
×
67
                    }
×
68
                };
69
                self.cache_size.remove_element_bytes(&pop_id);
1✔
70

1✔
71
                debug!(
1✔
72
                    "Evicted query {}. Cache size: {}. Cache size used: {}, Cache used percentage: {}.",
×
73
                    pop_id,
×
74
                    self.cache_size.total_byte_size(),
×
75
                    self.cache_size.byte_size_used(),
×
76
                    self.cache_size.size_used_fraction()
×
77
                );
78
            }
×
79
        }
80
    }
5✔
81
}
82

83
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
×
84
pub enum TypedCanonicOperatorName {
85
    Raster(CanonicOperatorName),
86
    Vector(CanonicOperatorName),
87
}
88

89
impl TypedCanonicOperatorName {
90
    pub fn as_raster(&self) -> Option<&CanonicOperatorName> {
×
91
        match self {
×
92
            Self::Raster(name) => Some(name),
×
93
            Self::Vector(_) => None,
×
94
        }
95
    }
×
96

97
    pub fn as_vector(&self) -> Option<&CanonicOperatorName> {
×
98
        match self {
×
99
            Self::Raster(_) => None,
×
100
            Self::Vector(name) => Some(name),
×
101
        }
102
    }
×
103
}
104

105
/// Implemented by caches that can free space by evicting entries.
pub trait CacheEvictUntilFit {
    /// Evicts entries until `bytes` additional bytes fit into the cache.
    fn evict_entries_until_can_fit_bytes(&mut self, bytes: usize);
}
108

109
impl CacheEvictUntilFit for CacheBackend {
110
    fn evict_entries_until_can_fit_bytes(&mut self, bytes: usize) {
5✔
111
        self.evict_until_can_fit_bytes(bytes);
5✔
112
    }
5✔
113
}
114

115
pub trait CacheView<C, L>: CacheEvictUntilFit {
116
    fn operator_caches_mut(
117
        &mut self,
118
    ) -> &mut HashMap<CanonicOperatorName, OperatorCacheEntry<C, L>>;
119

120
    fn create_operator_cache_if_needed(&mut self, key: CanonicOperatorName) {
7✔
121
        self.operator_caches_mut()
7✔
122
            .entry(key)
7✔
123
            .or_insert_with(|| OperatorCacheEntry::new());
7✔
124
    }
7✔
125

126
    fn remove_operator_cache(
1✔
127
        &mut self,
1✔
128
        key: &CanonicOperatorName,
1✔
129
    ) -> Option<OperatorCacheEntry<C, L>> {
1✔
130
        // TODO: maybe remove the size of the OperatorCacheEntry to the cache size?
1✔
131
        self.operator_caches_mut().remove(key)
1✔
132
    }
1✔
133
}
134

135
#[allow(clippy::type_complexity)]
136
pub struct OperatorCacheEntryView<'a, C: CacheElement> {
137
    operator_cache: &'a mut OperatorCacheEntry<
138
        CacheQueryEntry<C::Query, C::CacheContainer>,
139
        CacheQueryEntry<C::Query, C::LandingZoneContainer>,
140
    >,
141
    cache_size: &'a mut CacheSize,
142
    landing_zone_size: &'a mut CacheSize,
143
    lru: &'a mut LruCache<CacheEntryId, TypedCanonicOperatorName>,
144
}
145

146
impl<'a, C> OperatorCacheEntryView<'a, C>
147
where
148
    C: CacheElement + ByteSize,
149
    C::Query: Clone + CacheQueryMatch,
150
    CacheQueryEntry<C::Query, C::LandingZoneContainer>: ByteSize,
151
    CacheQueryEntry<C::Query, C::CacheContainer>: ByteSize,
152
{
153
    fn is_empty(&self) -> bool {
1✔
154
        self.operator_cache.is_empty()
1✔
155
    }
1✔
156

157
    /// This method removes a query from the landing zone.
158
    ///
159
    /// If the query is not in the landing zone, this method returns None.
160
    ///
161
    fn remove_query_from_landing_zone(
162
        &mut self,
163
        query_id: &QueryId,
164
    ) -> Option<CacheQueryEntry<C::Query, C::LandingZoneContainer>> {
165
        if let Some(entry) = self.operator_cache.remove_landing_zone_entry(query_id) {
6✔
166
            self.landing_zone_size.remove_element_bytes(query_id);
6✔
167
            self.landing_zone_size.remove_element_bytes(&entry);
6✔
168

6✔
169
            // debug output
6✔
170
            log::debug!(
6✔
171
                "Removed query {}. Landing zone size: {}. Landing zone size used: {}, Landing zone used percentage: {}.",
×
172
                query_id, self.landing_zone_size.total_byte_size(), self.landing_zone_size.byte_size_used(), self.landing_zone_size.size_used_fraction()
×
173
            );
174

175
            Some(entry)
6✔
176
        } else {
177
            None
×
178
        }
179
    }
6✔
180

181
    /// This method removes a query from the cache and the LRU.
182
    /// It will remove a queries cache entry from the cache and the LRU.
183
    ///
184
    /// If the query is not in the cache, this method returns None.
185
    ///
186
    fn remove_query_from_cache_and_lru(
1✔
187
        &mut self,
1✔
188
        cache_entry_id: &CacheEntryId,
1✔
189
    ) -> Option<CacheQueryEntry<C::Query, C::CacheContainer>> {
1✔
190
        if let Some(entry) = self.operator_cache.remove_cache_entry(cache_entry_id) {
1✔
191
            let old_lru_entry = self.lru.pop_entry(cache_entry_id);
1✔
192
            debug_assert!(old_lru_entry.is_some(), "CacheEntryId not found in LRU");
1✔
193
            self.cache_size.remove_element_bytes(cache_entry_id);
1✔
194
            self.cache_size.remove_element_bytes(&entry);
1✔
195

1✔
196
            log::debug!(
1✔
197
                "Removed cache entry {}. Cache size: {}. Cache size used: {}, Cache used percentage: {}.",
×
198
                cache_entry_id, self.cache_size.total_byte_size(), self.cache_size.byte_size_used(), self.cache_size.size_used_fraction()
×
199
            );
200

201
            Some(entry)
1✔
202
        } else {
203
            None
×
204
        }
205
    }
1✔
206

207
    /// This method removes a list of queries from the cache and the LRU.
208
    fn discard_querys_from_cache_and_lru(&mut self, cache_entry_ids: &[CacheEntryId]) {
7✔
209
        for cache_entry_id in cache_entry_ids {
8✔
210
            let old_entry = self.remove_query_from_cache_and_lru(cache_entry_id);
1✔
211
            debug_assert!(
212
                old_entry.is_some(),
1✔
213
                "CacheEntryId not found in OperatorCacheEntry"
×
214
            );
215
        }
216
    }
7✔
217

218
    /// This method adds a query element to the landing zone.
219
    /// It will add the element to the landing zone entry of the query.
220
    ///
221
    /// # Errors
222
    ///
223
    /// This method returns an error if the query is not in the landing zone.
224
    /// This method returns an error if the element is already expired.
225
    /// This method returns an error if the landing zone is full or the new element would cause the landing zone to overflow.
226
    ///
227
    fn add_query_element_to_landing_zone(
6✔
228
        &mut self,
6✔
229
        query_id: &QueryId,
6✔
230
        landing_zone_element: C,
6✔
231
    ) -> Result<(), CacheError> {
6✔
232
        let landing_zone_entry = self
6✔
233
            .operator_cache
6✔
234
            .landing_zone_entry_mut(query_id)
6✔
235
            .ok_or(CacheError::QueryNotFoundInLandingZone)?;
6✔
236

237
        if landing_zone_element.cache_hint().is_expired() {
6✔
238
            return Err(CacheError::TileExpiredBeforeInsertion);
1✔
239
        };
5✔
240

5✔
241
        let element_bytes_size = landing_zone_element.byte_size();
5✔
242

5✔
243
        if !self.landing_zone_size.can_fit_bytes(element_bytes_size) {
5✔
244
            return Err(CacheError::NotEnoughSpaceInLandingZone);
×
245
        }
5✔
246

5✔
247
        landing_zone_entry.insert_element(landing_zone_element)?;
5✔
248

249
        // we add the bytes size of the element to the landing zone size after we have inserted it.
250
        self.landing_zone_size
5✔
251
            .try_add_bytes(element_bytes_size)
5✔
252
            .expect(
5✔
253
            "The Landing Zone must have enough space for the element since we checked it before",
5✔
254
        );
5✔
255

5✔
256
        log::trace!(
5✔
257
            "Inserted tile for query {} into landing zone. Landing zone size: {}. Landing zone size used: {}. Landing zone used percentage: {}",
×
258
            query_id, self.landing_zone_size.total_byte_size(), self.landing_zone_size.byte_size_used(), self.landing_zone_size.size_used_fraction()
×
259
        );
260

261
        Ok(())
5✔
262
    }
6✔
263

264
    /// This method inserts a query into the landing zone.
265
    /// It will cause the operator cache to create a new landing zone entry.
266
    /// Therefore, the size of the query and the size of the landing zone entry will be added to the landing zone size.
267
    ///
268
    /// # Errors
269
    ///
270
    /// This method returns an error if the query is already in the landing zone.
271
    /// This method returns an error if the landing zone is full or the new query would cause the landing zone to overflow.
272
    ///
273
    fn insert_query_into_landing_zone(&mut self, query: &C::Query) -> Result<QueryId, CacheError> {
7✔
274
        let landing_zone_entry = CacheQueryEntry::create_empty::<C>(query.clone());
7✔
275
        let query_id = QueryId::new();
7✔
276

7✔
277
        self.landing_zone_size.try_add_element_bytes(&query_id)?;
7✔
278
        self.landing_zone_size
7✔
279
            .try_add_element_bytes(&landing_zone_entry)?;
7✔
280

281
        self.operator_cache
7✔
282
            .insert_landing_zone_entry(query_id, landing_zone_entry)?;
7✔
283

284
        // debug output
285
        log::trace!(
7✔
286
            "Added query {} to landing zone. Landing zone size: {}. Landing zone size used: {}, Landing zone used percentage: {}.",
×
287
            query_id, self.landing_zone_size.total_byte_size(), self.landing_zone_size.byte_size_used(), self.landing_zone_size.size_used_fraction()
×
288
        );
289

290
        Ok(query_id)
7✔
291
    }
7✔
292

293
    /// This method inserts a cache entry into the cache and the LRU.
294
    /// It allows the element cache to overflow the cache size.
295
    /// This is done because the total cache size is the cache size + the landing zone size.
296
    /// This method is used when moving an element from the landing zone to the cache.
297
    ///
298
    /// # Errors
299
    ///
300
    /// This method returns an error if the cache entry is already in the cache.
301
    ///
302
    fn insert_cache_entry_allow_overflow(
5✔
303
        &mut self,
5✔
304
        cache_entry: CacheQueryEntry<C::Query, C::CacheContainer>,
5✔
305
        key: &CanonicOperatorName,
5✔
306
    ) -> Result<CacheEntryId, CacheError> {
5✔
307
        let cache_entry_id = CacheEntryId::new();
5✔
308
        let bytes = cache_entry.byte_size() + cache_entry_id.byte_size();
5✔
309
        // When inserting data from the landing zone into the cache, we allow the cache to overflow.
5✔
310
        // This is done because the total cache size is the cache size + the landing zone size.
5✔
311
        self.cache_size.add_bytes_allow_overflow(bytes);
5✔
312
        self.operator_cache
5✔
313
            .insert_cache_entry(cache_entry_id, cache_entry)?;
5✔
314
        // we have to wrap the key in a TypedCanonicOperatorName to be able to insert it into the LRU
315
        self.lru.push(
5✔
316
            cache_entry_id,
5✔
317
            C::typed_canonical_operator_name(key.clone()),
5✔
318
        );
5✔
319

5✔
320
        // debug output
5✔
321
        log::trace!(
5✔
322
            "Added cache entry {}. Cache size: {}. Cache size used: {}, Cache used percentage: {}.",
×
323
            cache_entry_id,
×
324
            self.cache_size.total_byte_size(),
×
325
            self.cache_size.byte_size_used(),
×
326
            self.cache_size.size_used_fraction()
×
327
        );
328

329
        Ok(cache_entry_id)
5✔
330
    }
5✔
331

332
    /*
333
    fn insert_cache_entry(
334
        &mut self,
335
        cache_entry: CacheQueryEntry<C::Query, C::CacheContainer>,
336
        key: CanonicOperatorName,
337
    ) -> Result<CacheEntryId, CacheError> {
338
        let cache_entry_id = CacheEntryId::new();
339
        self.cache_size.try_add_element_bytes(&cache_entry)?;
340
        self.cache_size.try_add_element_bytes(&cache_entry_id)?;
341
        self.operator_cache
342
            .insert_cache_entry(cache_entry_id, cache_entry)?;
343
        self.lru
344
            .push(cache_entry_id, C::typed_canonical_operator_name(key));
345

346
        Ok(cache_entry_id)
347
    }
348
    */
349

350
    /// This method finds a cache entry in the cache that matches the query.
351
    /// It will also collect all expired cache entries.
352
    /// The cache entry is returned together with the expired ids.
353
    fn find_matching_cache_entry_and_collect_expired_entries(
7✔
354
        &mut self,
7✔
355
        query: &C::Query,
7✔
356
    ) -> CacheQueryResult<C::Query, C::CacheContainer> {
7✔
357
        let mut expired_cache_entry_ids = vec![];
7✔
358

7✔
359
        let x = self.operator_cache.iter().find(|&(id, entry)| {
7✔
360
            if entry.elements.is_expired() {
6✔
361
                expired_cache_entry_ids.push(*id);
1✔
362
                return false;
1✔
363
            }
5✔
364
            entry.query.is_match(query)
5✔
365
        });
7✔
366

7✔
367
        CacheQueryResult {
7✔
368
            cache_hit: x.map(|(id, entry)| (*id, entry)),
7✔
369
            expired_cache_entry_ids,
7✔
370
        }
7✔
371
    }
7✔
372
}
373

374
struct CacheQueryResult<'a, Query, CE> {
375
    cache_hit: Option<(CacheEntryId, &'a CacheQueryEntry<Query, CE>)>,
376
    expired_cache_entry_ids: Vec<CacheEntryId>,
377
}
378

379
pub trait Cache<C: CacheElement>:
380
    CacheView<
381
    CacheQueryEntry<C::Query, C::CacheContainer>,
382
    CacheQueryEntry<C::Query, C::LandingZoneContainer>,
383
>
384
where
385
    C::Query: Clone + CacheQueryMatch,
386
    CacheQueryEntry<C::Query, C::LandingZoneContainer>: ByteSize,
387
    CacheQueryEntry<C::Query, C::CacheContainer>: ByteSize,
388
    CacheQueryEntry<C::Query, C::CacheContainer>:
389
        From<CacheQueryEntry<C::Query, C::LandingZoneContainer>>,
390
{
391
    /// This method returns a mutable reference to the cache entry of an operator.
392
    /// If there is no cache entry for the operator, this method returns None.
393
    fn operator_cache_view_mut(
394
        &mut self,
395
        key: &CanonicOperatorName,
396
    ) -> Option<OperatorCacheEntryView<C>>;
397

398
    /// This method queries the cache for a given query.
399
    /// If the query matches an entry in the cache, the cache entry is returned and it is promoted in the LRU.
400
    /// If the query does not match an entry in the cache, None is returned. Also if a cache entry is found but it is expired, None is returned.
401
    ///
402
    /// # Errors
403
    /// This method returns an error if the cache entry is not found.
404
    ///
405
    fn query_and_promote(
9✔
406
        &mut self,
9✔
407
        key: &CanonicOperatorName,
9✔
408
        query: &C::Query,
9✔
409
    ) -> Result<Option<C::ResultStream>, CacheError> {
9✔
410
        let mut cache = self
9✔
411
            .operator_cache_view_mut(key)
9✔
412
            .ok_or(CacheError::OperatorCacheEntryNotFound)?;
9✔
413

414
        let CacheQueryResult {
415
            cache_hit,
7✔
416
            expired_cache_entry_ids,
7✔
417
        } = cache.find_matching_cache_entry_and_collect_expired_entries(query);
7✔
418

419
        let res = if let Some((cache_entry_id, cache_entry)) = cache_hit {
7✔
420
            let stream = cache_entry.elements.result_stream(query);
5✔
421

5✔
422
            // promote the cache entry in the LRU
5✔
423
            cache.lru.promote(&cache_entry_id);
5✔
424
            Some(stream)
5✔
425
        } else {
426
            None
2✔
427
        };
428

429
        // discard expired cache entries
430
        cache.discard_querys_from_cache_and_lru(&expired_cache_entry_ids);
7✔
431

7✔
432
        Ok(res.flatten())
7✔
433
    }
9✔
434

435
    /// This method inserts a query into the cache.
436
    ///
437
    /// # Errors
438
    /// This method returns an error if the query is already in the cache.
439
    ///
440
    fn insert_query_into_landing_zone(
7✔
441
        &mut self,
7✔
442
        key: &CanonicOperatorName,
7✔
443
        query: &C::Query,
7✔
444
    ) -> Result<QueryId, CacheError> {
7✔
445
        self.create_operator_cache_if_needed(key.clone());
7✔
446
        self.operator_cache_view_mut(key)
7✔
447
            .expect("OperatorCache was Just created ")
7✔
448
            .insert_query_into_landing_zone(query)
7✔
449
    }
7✔
450

451
    fn insert_query_element_into_landing_zone(
13✔
452
        &mut self,
13✔
453
        key: &CanonicOperatorName,
13✔
454
        query_id: &QueryId,
13✔
455
        landing_zone_element: C,
13✔
456
    ) -> Result<(), CacheError> {
13✔
457
        let mut cache = self
13✔
458
            .operator_cache_view_mut(key)
13✔
459
            .ok_or(CacheError::QueryNotFoundInLandingZone)?;
13✔
460
        let res = cache.add_query_element_to_landing_zone(query_id, landing_zone_element);
6✔
461

6✔
462
        // if we cant add the element to the landing zone, we remove the query from the landing zone
6✔
463
        if res.is_err() {
6✔
464
            let _old_entry = cache.remove_query_from_landing_zone(query_id);
1✔
465

1✔
466
            // if the operator cache is empty, we remove it from the cache
1✔
467
            if cache.is_empty() {
1✔
468
                self.remove_operator_cache(key);
1✔
469
            }
1✔
470
        }
5✔
471

472
        res
6✔
473
    }
13✔
474

475
    /// This method discards a query from the landing zone.
476
    /// If the query is not in the landing zone, this method does nothing.
477
    fn discard_query_from_landing_zone(&mut self, key: &CanonicOperatorName, query_id: &QueryId) {
478
        if let Some(mut cache) = self.operator_cache_view_mut(key) {
×
479
            cache.remove_query_from_landing_zone(query_id);
×
480
            if cache.is_empty() {
×
481
                self.remove_operator_cache(key);
×
482
            }
×
483
        }
×
484
    }
×
485

486
    /// This method discards a query from the cache and the LRU.
487
    /// If the query is not in the cache, this method does nothing.
488
    fn discard_querys_from_cache_and_lru(
489
        &mut self,
490
        key: &CanonicOperatorName,
491
        cache_entry_ids: &[CacheEntryId],
492
    ) {
493
        if let Some(mut cache) = self.operator_cache_view_mut(key) {
×
494
            cache.discard_querys_from_cache_and_lru(cache_entry_ids);
×
495
            if cache.is_empty() {
×
496
                self.remove_operator_cache(key);
×
497
            }
×
498
        }
×
499
    }
×
500

501
    /// This method moves a query from the landing zone to the cache.
502
    /// It will remove the query from the landing zone and insert it into the cache.
503
    /// If the cache is full, the least recently used entries will be evicted if necessary to make room for the new entry.
504
    /// This method returns the cache entry id of the inserted cache entry.
505
    ///
506
    /// # Errors
507
    /// This method returns an error if the query is not in the landing zone.
508
    /// This method returns an error if the cache entry is already in the cache.
509
    /// This method returns an error if the cache is full and the least recently used entries cannot be evicted to make room for the new entry.
510
    ///
511
    fn move_query_from_landing_to_cache(
6✔
512
        &mut self,
6✔
513
        key: &CanonicOperatorName,
6✔
514
        query_id: &QueryId,
6✔
515
    ) -> Result<CacheEntryId, CacheError> {
6✔
516
        let mut operator_cache = self
6✔
517
            .operator_cache_view_mut(key)
6✔
518
            .ok_or(CacheError::OperatorCacheEntryNotFound)?;
6✔
519
        let landing_zone_entry = operator_cache
5✔
520
            .remove_query_from_landing_zone(query_id)
5✔
521
            .ok_or(CacheError::QueryNotFoundInLandingZone)?;
5✔
522
        let cache_entry: CacheQueryEntry<
5✔
523
            <C as CacheElement>::Query,
5✔
524
            <C as CacheElement>::CacheContainer,
5✔
525
        > = landing_zone_entry.into();
5✔
526
        // when moving an element from the landing zone to the cache, we allow the cache size to overflow.
527
        // This is done because the total cache size is the cache size + the landing zone size.
528
        let cache_entry_id = operator_cache.insert_cache_entry_allow_overflow(cache_entry, key)?;
5✔
529
        // We could also first try to evict until the cache can hold the entry.
530
        // However, then we would need to lookup the cache entry twice.
531
        // To avoid that, we just evict after we moved the entry from the landing zone to the cache.
532
        // This is also not a problem since the total cache size is the cache size + the landing zone size.
533
        self.evict_entries_until_can_fit_bytes(0);
5✔
534

5✔
535
        Ok(cache_entry_id)
5✔
536
    }
6✔
537
}
538

539
impl<T> Cache<RasterTile2D<T>> for CacheBackend
540
where
541
    T: Pixel + CachableSubType<CacheElementType = RasterTile2D<T>>,
542
{
543
    fn operator_cache_view_mut(
35✔
544
        &mut self,
35✔
545
        key: &CanonicOperatorName,
35✔
546
    ) -> Option<OperatorCacheEntryView<RasterTile2D<T>>> {
35✔
547
        self.raster_caches
35✔
548
            .get_mut(key)
35✔
549
            .map(|op| OperatorCacheEntryView {
35✔
550
                operator_cache: op,
25✔
551
                cache_size: &mut self.cache_size,
25✔
552
                landing_zone_size: &mut self.landing_zone_size,
25✔
553
                lru: &mut self.lru,
25✔
554
            })
35✔
555
    }
35✔
556
}
557

558
impl CacheView<RasterCacheQueryEntry, RasterLandingQueryEntry> for CacheBackend {
559
    fn operator_caches_mut(
8✔
560
        &mut self,
8✔
561
    ) -> &mut HashMap<CanonicOperatorName, RasterOperatorCacheEntry> {
8✔
562
        &mut self.raster_caches
8✔
563
    }
8✔
564
}
565

566
impl CacheView<VectorCacheQueryEntry, VectorLandingQueryEntry> for CacheBackend {
567
    fn operator_caches_mut(
×
568
        &mut self,
×
569
    ) -> &mut HashMap<CanonicOperatorName, VectorOperatorCacheEntry> {
×
570
        &mut self.vector_caches
×
571
    }
×
572
}
573

574
pub trait CacheElement: ByteSize + Send + ByteSize
575
where
576
    Self: Sized,
577
{
578
    type Query: CacheQueryMatch + Clone + Send + Sync;
579
    type LandingZoneContainer: LandingZoneElementsContainer<Self>;
580
    type CacheContainer: CacheElementsContainer<Self::Query, Self, ResultStream = Self::ResultStream>
581
        + From<Self::LandingZoneContainer>;
582
    type ResultStream;
583

584
    fn move_into_landing_zone(
×
585
        self,
×
586
        landing_zone: &mut Self::LandingZoneContainer,
×
587
    ) -> Result<(), CacheError> {
×
588
        landing_zone.insert_element(self)
×
589
    }
×
590

591
    fn cache_hint(&self) -> CacheHint;
592

593
    fn typed_canonical_operator_name(key: CanonicOperatorName) -> TypedCanonicOperatorName;
594
}
595

596
pub trait CachableSubType {
597
    type CacheElementType: CacheElement;
598

599
    fn insert_element_into_landing_zone(
600
        landing_zone: &mut <Self::CacheElementType as CacheElement>::LandingZoneContainer,
601
        element: Self::CacheElementType,
602
    ) -> Result<(), super::error::CacheError>;
603

604
    fn create_empty_landing_zone() -> <Self::CacheElementType as CacheElement>::LandingZoneContainer;
605

606
    fn result_stream(
607
        cache_elements_container: &<Self::CacheElementType as CacheElement>::CacheContainer,
608
        query: &<Self::CacheElementType as CacheElement>::Query,
609
    ) -> Option<<Self::CacheElementType as CacheElement>::ResultStream>;
610
}
611

612
#[derive(Debug)]
×
613
pub struct SharedCache {
614
    backend: RwLock<CacheBackend>,
615
}
616

617
impl SharedCache {
618
    pub fn new(cache_size_in_mb: usize, landing_zone_ratio: f64) -> Result<Self> {
7✔
619
        if landing_zone_ratio <= 0.0 {
7✔
620
            return Err(crate::error::Error::QueryingProcessorFailed {
×
621
                source: Box::new(CacheError::LandingZoneRatioMustBeLargerThanZero),
×
622
            });
×
623
        }
7✔
624

7✔
625
        if landing_zone_ratio >= 1.0 {
7✔
626
            return Err(crate::error::Error::QueryingProcessorFailed {
×
627
                source: Box::new(CacheError::LandingZoneRatioMustBeSmallerThanOne),
×
628
            });
×
629
        }
7✔
630

7✔
631
        // TODO: landing zone < 50% of cache size
7✔
632

7✔
633
        let cache_size_bytes =
7✔
634
            (cache_size_in_mb as f64 * (1.0 - landing_zone_ratio) * 1024.0 * 1024.0) as usize;
7✔
635

7✔
636
        let landing_zone_size_bytes =
7✔
637
            (cache_size_in_mb as f64 * landing_zone_ratio * 1024.0 * 1024.0) as usize;
7✔
638

7✔
639
        Ok(Self {
7✔
640
            backend: RwLock::new(CacheBackend {
7✔
641
                vector_caches: Default::default(),
7✔
642
                raster_caches: Default::default(),
7✔
643
                lru: LruCache::unbounded(), // we need no cap because we evict manually
7✔
644
                cache_size: CacheSize::new(cache_size_bytes),
7✔
645
                landing_zone_size: CacheSize::new(landing_zone_size_bytes),
7✔
646
            }),
7✔
647
        })
7✔
648
    }
7✔
649
}
650

651
impl TestDefault for SharedCache {
652
    fn test_default() -> Self {
122✔
653
        Self {
122✔
654
            backend: RwLock::new(CacheBackend {
122✔
655
                vector_caches: Default::default(),
122✔
656
                raster_caches: Default::default(),
122✔
657
                lru: LruCache::unbounded(), // we need no cap because we evict manually
122✔
658
                cache_size: CacheSize::new(usize::MAX),
122✔
659
                landing_zone_size: CacheSize::new(usize::MAX),
122✔
660
            }),
122✔
661
        }
122✔
662
    }
122✔
663
}
664

665
/// Holds all the cached results for an operator graph (workflow)
666
#[derive(Debug, Default)]
×
667
pub struct OperatorCacheEntry<C, L> {
668
    // for a given operator and query we need to look through all entries to find one that matches
669
    // TODO: use a multi-dimensional index to speed up the lookup
670
    entries: HashMap<CacheEntryId, C>,
671

672
    // running queries insert their tiles as they are produced. The entry will be created once the query is done.
673
    // The query is identified by a Uuid instead of the query rectangle to avoid confusions with other queries
674
    landing_zone: HashMap<QueryId, L>,
675
}
676

677
impl<C, L> OperatorCacheEntry<C, L> {
678
    pub fn new() -> Self {
7✔
679
        Self {
7✔
680
            entries: Default::default(),
7✔
681
            landing_zone: Default::default(),
7✔
682
        }
7✔
683
    }
7✔
684

685
    fn insert_landing_zone_entry(
7✔
686
        &mut self,
7✔
687
        query_id: QueryId,
7✔
688
        landing_zone_entry: L,
7✔
689
    ) -> Result<(), CacheError> {
7✔
690
        let old_entry = self.landing_zone.insert(query_id, landing_zone_entry);
7✔
691

7✔
692
        if old_entry.is_some() {
7✔
693
            Err(CacheError::QueryIdAlreadyInLandingZone)
×
694
        } else {
695
            Ok(())
7✔
696
        }
697
    }
7✔
698

699
    fn remove_landing_zone_entry(&mut self, query_id: &QueryId) -> Option<L> {
6✔
700
        self.landing_zone.remove(query_id)
6✔
701
    }
6✔
702

703
    fn landing_zone_entry_mut(&mut self, query_id: &QueryId) -> Option<&mut L> {
6✔
704
        self.landing_zone.get_mut(query_id)
6✔
705
    }
6✔
706

707
    fn insert_cache_entry(
5✔
708
        &mut self,
5✔
709
        cache_entry_id: CacheEntryId,
5✔
710
        cache_entry: C,
5✔
711
    ) -> Result<(), CacheError> {
5✔
712
        let old_entry = self.entries.insert(cache_entry_id, cache_entry);
5✔
713

5✔
714
        if old_entry.is_some() {
5✔
715
            Err(CacheError::CacheEntryIdAlreadyInCache)
×
716
        } else {
717
            Ok(())
5✔
718
        }
719
    }
5✔
720

721
    fn remove_cache_entry(&mut self, cache_entry_id: &CacheEntryId) -> Option<C> {
2✔
722
        self.entries.remove(cache_entry_id)
2✔
723
    }
2✔
724

725
    fn is_empty(&self) -> bool {
1✔
726
        self.entries.is_empty() && self.landing_zone.is_empty()
1✔
727
    }
1✔
728

729
    fn iter(&self) -> impl Iterator<Item = (&CacheEntryId, &C)> {
7✔
730
        self.entries.iter()
7✔
731
    }
7✔
732
}
733

734
// Id type generated by the `identifier!` macro. It identifies a running query whose
// elements are collected in the landing zone (it is the key of
// `OperatorCacheEntry::landing_zone`).
identifier!(QueryId);
×
735

736
// Empty impl body: `QueryId` uses the default method implementations of `ByteSize`.
impl ByteSize for QueryId {}
737

738
// Id type generated by the `identifier!` macro. It identifies a finished entry in the
// cache (see `OperatorCacheEntry::insert_cache_entry`).
identifier!(CacheEntryId);
×
739

740
// Empty impl body: `CacheEntryId` uses the default method implementations of `ByteSize`.
impl ByteSize for CacheEntryId {}
741

742
/// Holds all the elements (e.g. tiles) for a given query and is able to answer queries that
/// are fully contained in that query.
///
/// The struct is generic over the query type and the element container, so it is used both
/// for landing zone entries and for cache entries (see the `Raster*` and `Vector*` type
/// aliases below).
743
#[derive(Debug, Hash)]
×
744
pub struct CacheQueryEntry<Query, Elements> {
745
    /// The query the elements belong to.
    query: Query,
746
    /// The container with the elements stored for `query`.
    elements: Elements,
747
}
748
// Raster flavour: per-operator cache whose cache entries hold `CachedTiles` and whose
// landing zone entries hold `LandingZoneQueryTiles`, both keyed by a `RasterQueryRectangle`.
type RasterOperatorCacheEntry = OperatorCacheEntry<RasterCacheQueryEntry, RasterLandingQueryEntry>;
749
type RasterCacheQueryEntry = CacheQueryEntry<RasterQueryRectangle, CachedTiles>;
750
type RasterLandingQueryEntry = CacheQueryEntry<RasterQueryRectangle, LandingZoneQueryTiles>;
751

752
// Vector flavour: per-operator cache whose cache entries hold `CachedFeatures` and whose
// landing zone entries hold `LandingZoneQueryFeatures`, both keyed by a `VectorQueryRectangle`.
type VectorOperatorCacheEntry = OperatorCacheEntry<VectorCacheQueryEntry, VectorLandingQueryEntry>;
753
type VectorCacheQueryEntry = CacheQueryEntry<VectorQueryRectangle, CachedFeatures>;
754
type VectorLandingQueryEntry = CacheQueryEntry<VectorQueryRectangle, LandingZoneQueryFeatures>;
755

756
impl<Query, Elements> CacheQueryEntry<Query, Elements> {
757
    pub fn create_empty<E>(query: Query) -> Self
7✔
758
    where
7✔
759
        Elements: LandingZoneElementsContainer<E>,
7✔
760
    {
7✔
761
        Self {
7✔
762
            query,
7✔
763
            elements: Elements::create_empty(),
7✔
764
        }
7✔
765
    }
7✔
766

767
    pub fn query(&self) -> &Query {
×
768
        &self.query
×
769
    }
×
770

771
    pub fn elements_mut(&mut self) -> &mut Elements {
×
772
        &mut self.elements
×
773
    }
×
774

775
    pub fn insert_element<E>(&mut self, element: E) -> Result<(), CacheError>
5✔
776
    where
5✔
777
        Elements: LandingZoneElementsContainer<E>,
5✔
778
    {
5✔
779
        self.elements.insert_element(element)
5✔
780
    }
5✔
781
}
782

783
impl<Query, Elements> ByteSize for CacheQueryEntry<Query, Elements>
784
where
785
    Elements: ByteSize,
786
{
787
    fn heap_byte_size(&self) -> usize {
22✔
788
        self.elements.heap_byte_size()
22✔
789
    }
22✔
790
}
791

792
/// Decides whether a stored query matches another query.
///
/// The implementations in this file treat `self` as the query of a cache entry and return
/// `true` if it contains the bounds and time interval of `query` at the same resolution.
pub trait CacheQueryMatch<RHS = Self> {
793
    /// Returns `true` if `query` is matched by `self`.
    fn is_match(&self, query: &RHS) -> bool;
794
}
795

796
impl CacheQueryMatch for RasterQueryRectangle {
    /// A raster query matches if its spatial bounds and time interval are contained in
    /// those of `self` and both queries use the same spatial resolution.
    fn is_match(&self, query: &RasterQueryRectangle) -> bool {
        if !self.spatial_bounds.contains(&query.spatial_bounds) {
            return false;
        }

        if !self.time_interval.contains(&query.time_interval) {
            return false;
        }

        self.spatial_resolution == query.spatial_resolution
    }
}
803

804
impl CacheQueryMatch for VectorQueryRectangle {
    // TODO: check if that is what we need
    /// A vector query matches if its bounding box and time interval are contained in those
    /// of `self` and both queries use the same spatial resolution.
    fn is_match(&self, query: &VectorQueryRectangle) -> bool {
        if !self.spatial_bounds.contains_bbox(&query.spatial_bounds) {
            return false;
        }

        if !self.time_interval.contains(&query.time_interval) {
            return false;
        }

        self.spatial_resolution == query.spatial_resolution
    }
}
812

813
/// A container that collects elements of type `E`; it is the bound required of the element
/// container by `CacheQueryEntry::create_empty` and `CacheQueryEntry::insert_element`.
pub trait LandingZoneElementsContainer<E> {
814
    /// Adds `element` to the container or reports a `CacheError`.
    // NOTE(review): the concrete error conditions depend on the implementors — confirm there.
    fn insert_element(&mut self, element: E) -> Result<(), CacheError>;
815
    /// Creates a container that holds no elements.
    fn create_empty() -> Self;
816
}
817

818
/// Element-type independent information about a container of cached elements.
pub trait CacheElementsContainerInfos<Query> {
819
    /// Returns `true` if the cached elements are expired.
    // NOTE(review): presumably derived from the elements' cache hints — confirm against the
    // implementations.
    fn is_expired(&self) -> bool;
820
}
821

822
/// A container of cached elements of type `E` that can produce a result stream for a query.
pub trait CacheElementsContainer<Query, E>: CacheElementsContainerInfos<Query> {
823
    /// The stream type that yields the cached elements as `Result<E>` items.
    type ResultStream: Stream<Item = Result<E>>;
824

825
    /// Creates a stream of the cached elements for `query`, or `None` if no stream can be
    /// provided.
    // NOTE(review): the exact conditions for `None` are defined by the implementors — confirm.
    fn result_stream(&self, query: &Query) -> Option<Self::ResultStream>;
826
}
827

828
impl CacheQueryEntry<RasterQueryRectangle, CachedTiles> {
829
    /// Return true if the query can be answered in full by this cache entry
830
    /// For this, the bbox and time has to be fully contained, and the spatial resolution has to match
831
    pub fn matches(&self, query: &RasterQueryRectangle) -> bool {
×
832
        self.query.spatial_bounds.contains(&query.spatial_bounds)
×
833
            && self.query.time_interval.contains(&query.time_interval)
×
834
            && self.query.spatial_resolution == query.spatial_resolution
×
835
    }
×
836

837
    /// Produces a tile stream from the cache
838
    pub fn tile_stream(&self, query: &RasterQueryRectangle) -> TypedCacheTileStream {
×
839
        self.elements.tile_stream(query)
×
840
    }
×
841
}
842

843
impl From<RasterLandingQueryEntry> for RasterCacheQueryEntry {
844
    fn from(value: RasterLandingQueryEntry) -> Self {
6✔
845
        Self {
6✔
846
            query: value.query,
6✔
847
            elements: value.elements.into(),
6✔
848
        }
6✔
849
    }
6✔
850
}
851

852
impl From<VectorLandingQueryEntry> for VectorCacheQueryEntry {
853
    fn from(value: VectorLandingQueryEntry) -> Self {
×
854
        Self {
×
855
            query: value.query,
×
856
            elements: value.elements.into(),
×
857
        }
×
858
    }
×
859
}
860

861
/// Asynchronous interface of the cache for elements of type `C`.
///
/// A query is cached in several steps: it is registered with `insert_query`, its elements
/// are added with `insert_query_element`, and it is finally either made available with
/// `finish_query` or discarded with `abort_query`. `query_cache` looks up cached results.
/// The type parameter `S` of each method selects the cachable sub type whose element type
/// is `C`.
#[async_trait]
862
pub trait AsyncCache<C: CacheElement> {
863
    /// Queries the cache of the operator `key` for `query`; `Ok(Some(stream))` is a hit,
    /// `Ok(None)` means that there is no usable entry.
    async fn query_cache<S: CachableSubType<CacheElementType = C>>(
864
        &self,
865
        key: &CanonicOperatorName,
866
        query: &C::Query,
867
    ) -> Result<Option<C::ResultStream>, CacheError>;
868

869
    /// Registers `query` for the operator `key` and returns the id under which its elements
    /// have to be inserted.
    async fn insert_query<S: CachableSubType<CacheElementType = C>>(
870
        &self,
871
        key: &CanonicOperatorName,
872
        query: &C::Query,
873
    ) -> Result<QueryId, CacheError>;
874

875
    /// Adds `landing_zone_element` to the query `query_id` of the operator `key`; the query
    /// must have been registered with `insert_query` before.
    async fn insert_query_element<S: CachableSubType<CacheElementType = C>>(
876
        &self,
877
        key: &CanonicOperatorName,
878
        query_id: &QueryId,
879
        landing_zone_element: S::CacheElementType,
880
    ) -> Result<(), CacheError>;
881

882
    /// Aborts the query `query_id` of the operator `key` and discards the elements inserted
    /// so far.
    async fn abort_query<S: CachableSubType<CacheElementType = C>>(
883
        &self,
884
        key: &CanonicOperatorName,
885
        query_id: &QueryId,
886
    );
887

888
    /// Finishes the query `query_id` of the operator `key` and returns the id of the
    /// resulting cache entry.
    async fn finish_query<S: CachableSubType<CacheElementType = C>>(
889
        &self,
890
        key: &CanonicOperatorName,
891
        query_id: &QueryId,
892
    ) -> Result<CacheEntryId, CacheError>;
893
}
894

895
#[async_trait]
896
impl<C> AsyncCache<C> for SharedCache
897
where
898
    C: CacheElement + ByteSize + Send + 'static,
899
    C::Query: Send + Sync,
900
    CacheBackend: Cache<C>,
901
    CacheQueryEntry<C::Query, C::LandingZoneContainer>: ByteSize,
902
    CacheQueryEntry<C::Query, C::CacheContainer>: ByteSize,
903
    CacheQueryEntry<C::Query, C::CacheContainer>:
904
        From<CacheQueryEntry<C::Query, C::LandingZoneContainer>>,
905
{
906
    /// Query the cache and on hit create a stream of cache elements
907
    async fn query_cache<S: CachableSubType<CacheElementType = C>>(
9✔
908
        &self,
9✔
909
        key: &CanonicOperatorName,
9✔
910
        query: &C::Query,
9✔
911
    ) -> Result<Option<C::ResultStream>, CacheError> {
9✔
912
        let mut backend = self.backend.write().await;
9✔
913
        backend.query_and_promote(key, query)
9✔
914
    }
18✔
915

916
    /// When inserting a new query, we first register the query and then insert the elements as they are produced
917
    /// This is to avoid confusing different queries on the same operator and query rectangle
918
    async fn insert_query<S: CachableSubType<CacheElementType = C>>(
7✔
919
        &self,
7✔
920
        key: &CanonicOperatorName,
7✔
921
        query: &C::Query,
7✔
922
    ) -> Result<QueryId, CacheError> {
7✔
923
        let mut backend = self.backend.write().await;
7✔
924
        backend.insert_query_into_landing_zone(key, query)
7✔
925
    }
14✔
926

927
    /// Insert a cachable element for a given query. The query has to be inserted first.
928
    /// The element is inserted into the landing zone and only moved to the cache when the query is finished.
929
    /// If the landing zone is full or the element size would cause the landing zone size to overflow, the caching of the query is aborted.
930
    async fn insert_query_element<S: CachableSubType<CacheElementType = C>>(
13✔
931
        &self,
13✔
932
        key: &CanonicOperatorName,
13✔
933
        query_id: &QueryId,
13✔
934
        landing_zone_element: C,
13✔
935
    ) -> Result<(), CacheError> {
13✔
936
        let mut backend = self.backend.write().await;
13✔
937
        backend.insert_query_element_into_landing_zone(key, query_id, landing_zone_element)
13✔
938
    }
26✔
939

940
    /// Abort the query and remove already inserted elements from the caches landing zone
941
    async fn abort_query<S: CachableSubType<CacheElementType = C>>(
×
942
        &self,
×
943
        key: &CanonicOperatorName,
×
944
        query_id: &QueryId,
×
945
    ) {
×
946
        let mut backend = self.backend.write().await;
×
947
        backend.discard_query_from_landing_zone(key, query_id);
×
948
    }
×
949

950
    /// Finish the query and make the inserted elements available in the cache
951
    async fn finish_query<S: CachableSubType<CacheElementType = C>>(
6✔
952
        &self,
6✔
953
        key: &CanonicOperatorName,
6✔
954
        query_id: &QueryId,
6✔
955
    ) -> Result<CacheEntryId, CacheError> {
6✔
956
        let mut backend = self.backend.write().await;
6✔
957
        backend.move_query_from_landing_to_cache(key, query_id)
6✔
958
    }
12✔
959
}
960

961
#[cfg(test)]
962
mod tests {
963
    use std::sync::Arc;
964

965
    use geoengine_datatypes::{
966
        primitives::{CacheHint, DateTime, SpatialPartition2D, SpatialResolution, TimeInterval},
967
        raster::{Grid, RasterProperties},
968
    };
969
    use serde_json::json;
970

971
    use super::*;
972

973
    async fn process_query(tile_cache: &mut SharedCache, op_name: CanonicOperatorName) {
5✔
974
        let query_id = tile_cache
5✔
975
            .insert_query::<u8>(&op_name, &query_rect())
5✔
976
            .await
×
977
            .unwrap();
5✔
978

5✔
979
        tile_cache
5✔
980
            .insert_query_element::<u8>(&op_name, &query_id, create_tile())
5✔
981
            .await
×
982
            .unwrap();
5✔
983

5✔
984
        tile_cache
5✔
985
            .finish_query::<u8>(&op_name, &query_id)
5✔
986
            .await
×
987
            .unwrap();
5✔
988
    }
5✔
989

990
    fn create_tile() -> RasterTile2D<u8> {
10✔
991
        RasterTile2D::<u8> {
10✔
992
            time: TimeInterval::new_instant(DateTime::new_utc(2014, 3, 1, 0, 0, 0)).unwrap(),
10✔
993
            tile_position: [-1, 0].into(),
10✔
994
            global_geo_transform: TestDefault::test_default(),
10✔
995
            grid_array: Grid::new([3, 2].into(), vec![1, 2, 3, 4, 5, 6])
10✔
996
                .unwrap()
10✔
997
                .into(),
10✔
998
            properties: RasterProperties::default(),
10✔
999
            cache_hint: CacheHint::max_duration(),
10✔
1000
        }
10✔
1001
    }
10✔
1002

1003
    fn query_rect() -> RasterQueryRectangle {
13✔
1004
        RasterQueryRectangle {
13✔
1005
            spatial_bounds: SpatialPartition2D::new_unchecked(
13✔
1006
                (-180., 90.).into(),
13✔
1007
                (180., -90.).into(),
13✔
1008
            ),
13✔
1009
            time_interval: TimeInterval::new_instant(DateTime::new_utc(2014, 3, 1, 0, 0, 0))
13✔
1010
                .unwrap(),
13✔
1011
            spatial_resolution: SpatialResolution::one(),
13✔
1012
        }
13✔
1013
    }
13✔
1014

1015
    fn op(idx: usize) -> CanonicOperatorName {
12✔
1016
        CanonicOperatorName::new_unchecked(&json!({
12✔
1017
            "type": "GdalSource",
12✔
1018
            "params": {
12✔
1019
                "data": idx
12✔
1020
            }
12✔
1021
        }))
12✔
1022
    }
12✔
1023

1024
    #[tokio::test]
1✔
1025
    async fn it_evicts_lru() {
1✔
1026
        // Create cache entry and landing zone entry to geht the size of both
1✔
1027
        let landing_zone_entry = RasterLandingQueryEntry {
1✔
1028
            query: query_rect(),
1✔
1029
            elements: LandingZoneQueryTiles::U8(vec![create_tile()]),
1✔
1030
        };
1✔
1031
        let query_id = QueryId::new();
1✔
1032
        let size_of_landing_zone_entry = landing_zone_entry.byte_size() + query_id.byte_size();
1✔
1033
        let cache_entry: RasterCacheQueryEntry = landing_zone_entry.into();
1✔
1034
        let cache_entry_id = CacheEntryId::new();
1✔
1035
        let size_of_cache_entry = cache_entry.byte_size() + cache_entry_id.byte_size();
1✔
1036

1✔
1037
        // Select the max of both sizes
1✔
1038
        // This is done because the landing zone should not be smaller then the cache
1✔
1039
        let m_size = size_of_cache_entry.max(size_of_landing_zone_entry);
1✔
1040

1✔
1041
        // set limits s.t. three tiles fit
1✔
1042
        let mut tile_cache = SharedCache {
1✔
1043
            backend: RwLock::new(CacheBackend {
1✔
1044
                raster_caches: Default::default(),
1✔
1045
                vector_caches: Default::default(),
1✔
1046
                lru: LruCache::unbounded(),
1✔
1047
                cache_size: CacheSize::new(m_size * 3),
1✔
1048
                landing_zone_size: CacheSize::new(m_size * 3),
1✔
1049
            }),
1✔
1050
        };
1✔
1051

1✔
1052
        // process three different queries
1✔
1053
        process_query(&mut tile_cache, op(1)).await;
1✔
1054
        process_query(&mut tile_cache, op(2)).await;
1✔
1055
        process_query(&mut tile_cache, op(3)).await;
1✔
1056

1057
        // query the first one s.t. it is the most recently used
1058
        tile_cache
1✔
1059
            .query_cache::<u8>(&op(1), &query_rect())
1✔
1060
            .await
×
1061
            .unwrap();
1✔
1062
        // process a fourth query
1✔
1063
        process_query(&mut tile_cache, op(4)).await;
1✔
1064

1065
        // assure the seconds query is evicted because it is the least recently used
1066
        assert!(tile_cache
1✔
1067
            .query_cache::<u8>(&op(2), &query_rect())
1✔
1068
            .await
×
1069
            .unwrap()
1✔
1070
            .is_none());
1✔
1071

1072
        // assure that the other queries are still in the cache
1073
        for i in [1, 3, 4] {
4✔
1074
            assert!(tile_cache
3✔
1075
                .query_cache::<u8>(&op(i), &query_rect())
3✔
1076
                .await
×
1077
                .unwrap()
3✔
1078
                .is_some());
3✔
1079
        }
1080

1081
        assert_eq!(
1✔
1082
            tile_cache.backend.read().await.cache_size.byte_size_used(),
1✔
1083
            3 * size_of_cache_entry
1✔
1084
        );
1085
    }
1086

1087
    #[test]
    fn cache_byte_size() {
        // expected size of a single test tile
        let tile_bytes = 284;
        // fixed overhead of the cached tiles container
        let container_bytes = /* enum + arc */ 16 + /* vec */ 24;

        assert_eq!(create_tile().byte_size(), tile_bytes);

        let one_tile = CachedTiles::U8(Arc::new(vec![create_tile()]));
        assert_eq!(one_tile.byte_size(), container_bytes + tile_bytes);

        let two_tiles = CachedTiles::U8(Arc::new(vec![create_tile(), create_tile()]));
        assert_eq!(two_tiles.byte_size(), container_bytes + 2 * tile_bytes);
    }
1✔
1099

1100
    #[tokio::test]
1✔
1101
    async fn it_checks_ttl() {
1✔
1102
        let mut tile_cache = SharedCache {
1✔
1103
            backend: RwLock::new(CacheBackend {
1✔
1104
                raster_caches: Default::default(),
1✔
1105
                vector_caches: Default::default(),
1✔
1106
                lru: LruCache::unbounded(),
1✔
1107
                cache_size: CacheSize::new(usize::MAX),
1✔
1108
                landing_zone_size: CacheSize::new(usize::MAX),
1✔
1109
            }),
1✔
1110
        };
1✔
1111

1✔
1112
        process_query(&mut tile_cache, op(1)).await;
1✔
1113

1114
        // access works because no ttl is set
1115
        tile_cache
1✔
1116
            .query_cache::<u8>(&op(1), &query_rect())
1✔
1117
            .await
×
1118
            .unwrap()
1✔
1119
            .unwrap();
1✔
1120

1121
        // manually expire entry
1122
        {
1123
            let mut backend = tile_cache.backend.write().await;
1✔
1124
            let cache = backend.raster_caches.iter_mut().next().unwrap();
1✔
1125

1✔
1126
            let tiles = &mut cache.1.entries.iter_mut().next().unwrap().1.elements;
1✔
1127
            match tiles {
1✔
1128
                CachedTiles::U8(tiles) => {
1✔
1129
                    let mut expired_tiles = (**tiles).clone();
1✔
1130
                    expired_tiles[0].cache_hint = CacheHint::with_created_and_expires(
1✔
1131
                        DateTime::new_utc(0, 1, 1, 0, 0, 0),
1✔
1132
                        DateTime::new_utc(0, 1, 1, 0, 0, 1).into(),
1✔
1133
                    );
1✔
1134
                    *tiles = Arc::new(expired_tiles);
1✔
1135
                }
1✔
1136
                _ => panic!("wrong tile type"),
×
1137
            }
1138
        }
1139

1140
        // access fails because ttl is expired
1141
        assert!(tile_cache
1✔
1142
            .query_cache::<u8>(&op(1), &query_rect())
1✔
1143
            .await
×
1144
            .unwrap()
1✔
1145
            .is_none());
1✔
1146
    }
1147

1148
    #[tokio::test]
1✔
1149
    async fn tile_cache_init_size() {
1✔
1150
        let tile_cache = SharedCache::new(100, 0.1).unwrap();
1✔
1151

1152
        let backend = tile_cache.backend.read().await;
1✔
1153

1154
        let cache_size = 90 * 1024 * 1024;
1✔
1155
        let landing_zone_size = 10 * 1024 * 1024;
1✔
1156

1✔
1157
        assert_eq!(backend.cache_size.total_byte_size(), cache_size);
1✔
1158
        assert_eq!(
1✔
1159
            backend.landing_zone_size.total_byte_size(),
1✔
1160
            landing_zone_size
1✔
1161
        );
1✔
1162
    }
1163
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc