• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

geo-engine / geoengine / 6022770913

30 Aug 2023 08:59AM UTC coverage: 89.934% (+0.09%) from 89.84%
6022770913

push

github

web-flow
Merge pull request #864 from geo-engine/compressed-vector-cache

Compressed-vector-cache

569 of 569 new or added lines in 6 files covered. (100.0%)

106288 of 118185 relevant lines covered (89.93%)

61263.93 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

84.54
/operators/src/pro/cache/shared_cache.rs
1
use super::{
2
    cache_chunks::{CachedFeatures, CompressedFeatureCollection, LandingZoneQueryFeatures},
3
    cache_tiles::{CachedTiles, CompressedRasterTile2D, LandingZoneQueryTiles},
4
    error::CacheError,
5
    util::CacheSize,
6
};
7
use crate::engine::CanonicOperatorName;
8
use crate::util::Result;
9
use async_trait::async_trait;
10
use futures::Stream;
11
use geoengine_datatypes::{
12
    identifier,
13
    primitives::{CacheHint, Geometry, RasterQueryRectangle, VectorQueryRectangle},
14
    raster::Pixel,
15
    util::{arrow::ArrowTyped, test::TestDefault, ByteSize, Identifier},
16
};
17
use log::{debug, log_enabled};
18
use lru::LruCache;
19
use std::{collections::HashMap, hash::Hash, sync::Arc};
20
use tokio::sync::RwLock;
21

22
/// The tile cache caches all tiles of a query and is able to answer queries that are fully contained in the cache.
23
/// New tiles are inserted into the cache on-the-fly as they are produced by query processors.
24
/// The tiles are first inserted into a landing zone, until the query in completely finished and only then moved to the cache.
25
/// Both the landing zone and the cache have a maximum size.
26
/// If the landing zone is full, the caching of the current query will be aborted.
27
/// If the cache is full, the least recently used entries will be evicted if necessary to make room for the new entry.
28
#[derive(Debug)]
×
29
pub struct CacheBackend {
30
    // TODO: more fine granular locking?
31
    // for each operator graph, we have a cache, that can efficiently be accessed
32
    raster_caches: HashMap<CanonicOperatorName, RasterOperatorCacheEntry>,
33
    vector_caches: HashMap<CanonicOperatorName, VectorOperatorCacheEntry>,
34

35
    cache_size: CacheSize,
36
    landing_zone_size: CacheSize,
37

38
    // we only use the LruCache for determining the least recently used elements and evict as many entries as needed to fit the new one
39
    lru: LruCache<CacheEntryId, TypedCanonicOperatorName>,
40
}
41

42
impl CacheBackend {
43
    /// This method removes entries from the cache until it can fit the given amount of bytes.
44
    pub fn evict_until_can_fit_bytes(&mut self, bytes: usize) {
5✔
45
        while !self.cache_size.can_fit_bytes(bytes) {
6✔
46
            if let Some((pop_id, pop_key)) = self.lru.pop_lru() {
1✔
47
                match pop_key {
1✔
48
                    TypedCanonicOperatorName::Raster(raster_pop_key) => {
1✔
49
                        let op_cache = self
1✔
50
                            .raster_caches
1✔
51
                            .get_mut(&raster_pop_key)
1✔
52
                            .expect("LRU entry must exist in the cache!");
1✔
53
                        let query_element = op_cache
1✔
54
                            .remove_cache_entry(&pop_id)
1✔
55
                            .expect("LRU entry must exist in the cache!");
1✔
56
                        self.cache_size.remove_element_bytes(&query_element);
1✔
57
                    }
1✔
58
                    TypedCanonicOperatorName::Vector(vector_pop_key) => {
×
59
                        let op_cache = self
×
60
                            .vector_caches
×
61
                            .get_mut(&vector_pop_key)
×
62
                            .expect("LRU entry must exist in the cache!");
×
63
                        let query_element = op_cache
×
64
                            .remove_cache_entry(&pop_id)
×
65
                            .expect("LRU entry must exist in the cache!");
×
66
                        self.cache_size.remove_element_bytes(&query_element);
×
67
                    }
×
68
                };
69
                self.cache_size.remove_element_bytes(&pop_id);
1✔
70

1✔
71
                debug!(
1✔
72
                    "Evicted query {}. Cache size: {}. Cache size used: {}, Cache used percentage: {}.",
×
73
                    pop_id,
×
74
                    self.cache_size.total_byte_size(),
×
75
                    self.cache_size.byte_size_used(),
×
76
                    self.cache_size.size_used_fraction()
×
77
                );
78
            }
×
79
        }
80
    }
5✔
81
}
82

83
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
×
84
pub enum TypedCanonicOperatorName {
85
    Raster(CanonicOperatorName),
86
    Vector(CanonicOperatorName),
87
}
88

89
impl TypedCanonicOperatorName {
90
    pub fn as_raster(&self) -> Option<&CanonicOperatorName> {
×
91
        match self {
×
92
            Self::Raster(name) => Some(name),
×
93
            Self::Vector(_) => None,
×
94
        }
95
    }
×
96

97
    pub fn as_vector(&self) -> Option<&CanonicOperatorName> {
×
98
        match self {
×
99
            Self::Raster(_) => None,
×
100
            Self::Vector(name) => Some(name),
×
101
        }
102
    }
×
103
}
104

105
pub trait CacheEvictUntilFit {
106
    fn evict_entries_until_can_fit_bytes(&mut self, bytes: usize);
107
}
108

109
impl CacheEvictUntilFit for CacheBackend {
110
    fn evict_entries_until_can_fit_bytes(&mut self, bytes: usize) {
5✔
111
        self.evict_until_can_fit_bytes(bytes);
5✔
112
    }
5✔
113
}
114

115
pub trait CacheView<C, L>: CacheEvictUntilFit {
116
    fn operator_caches_mut(
117
        &mut self,
118
    ) -> &mut HashMap<CanonicOperatorName, OperatorCacheEntry<C, L>>;
119

120
    fn create_operator_cache_if_needed(&mut self, key: CanonicOperatorName) {
7✔
121
        // TODO: add size of the OperatorCacheEntry to the cache size?
7✔
122
        self.operator_caches_mut()
7✔
123
            .entry(key)
7✔
124
            .or_insert_with(|| OperatorCacheEntry::new());
7✔
125
    }
7✔
126

127
    fn remove_operator_cache(
1✔
128
        &mut self,
1✔
129
        key: &CanonicOperatorName,
1✔
130
    ) -> Option<OperatorCacheEntry<C, L>> {
1✔
131
        // TODO: remove the size of the OperatorCacheEntry to the cache size?
1✔
132
        self.operator_caches_mut().remove(key)
1✔
133
    }
1✔
134
}
135

136
#[allow(clippy::type_complexity)]
137
pub struct OperatorCacheEntryView<'a, C: CacheBackendElementExt> {
138
    operator_cache: &'a mut OperatorCacheEntry<
139
        CacheQueryEntry<C::Query, C::CacheContainer>,
140
        CacheQueryEntry<C::Query, C::LandingZoneContainer>,
141
    >,
142
    cache_size: &'a mut CacheSize,
143
    landing_zone_size: &'a mut CacheSize,
144
    lru: &'a mut LruCache<CacheEntryId, TypedCanonicOperatorName>,
145
}
146

147
impl<'a, C> OperatorCacheEntryView<'a, C>
148
where
149
    C: CacheBackendElementExt + ByteSize,
150
    C::Query: Clone + CacheQueryMatch,
151
    CacheQueryEntry<C::Query, C::LandingZoneContainer>: ByteSize,
152
    CacheQueryEntry<C::Query, C::CacheContainer>: ByteSize,
153
{
154
    fn is_empty(&self) -> bool {
1✔
155
        self.operator_cache.is_empty()
1✔
156
    }
1✔
157

158
    /// This method removes a query from the landing zone.
159
    ///
160
    /// If the query is not in the landing zone, this method returns None.
161
    ///
162
    fn remove_query_from_landing_zone(
163
        &mut self,
164
        query_id: &QueryId,
165
    ) -> Option<CacheQueryEntry<C::Query, C::LandingZoneContainer>> {
166
        if let Some(entry) = self.operator_cache.remove_landing_zone_entry(query_id) {
6✔
167
            self.landing_zone_size.remove_element_bytes(query_id);
6✔
168
            self.landing_zone_size.remove_element_bytes(&entry);
6✔
169

6✔
170
            // debug output
6✔
171
            log::debug!(
6✔
172
                "Removed query {}. Landing zone size: {}. Landing zone size used: {}, Landing zone used percentage: {}.",
×
173
                query_id, self.landing_zone_size.total_byte_size(), self.landing_zone_size.byte_size_used(), self.landing_zone_size.size_used_fraction()
×
174
            );
175

176
            Some(entry)
6✔
177
        } else {
178
            None
×
179
        }
180
    }
6✔
181

182
    /// This method removes a query from the cache and the LRU.
183
    /// It will remove a queries cache entry from the cache and the LRU.
184
    ///
185
    /// If the query is not in the cache, this method returns None.
186
    ///
187
    fn remove_query_from_cache_and_lru(
1✔
188
        &mut self,
1✔
189
        cache_entry_id: &CacheEntryId,
1✔
190
    ) -> Option<CacheQueryEntry<C::Query, C::CacheContainer>> {
1✔
191
        if let Some(entry) = self.operator_cache.remove_cache_entry(cache_entry_id) {
1✔
192
            let old_lru_entry = self.lru.pop_entry(cache_entry_id);
1✔
193
            debug_assert!(old_lru_entry.is_some(), "CacheEntryId not found in LRU");
1✔
194
            self.cache_size.remove_element_bytes(cache_entry_id);
1✔
195
            self.cache_size.remove_element_bytes(&entry);
1✔
196

1✔
197
            log::debug!(
1✔
198
                "Removed cache entry {}. Cache size: {}. Cache size used: {}, Cache used percentage: {}.",
×
199
                cache_entry_id, self.cache_size.total_byte_size(), self.cache_size.byte_size_used(), self.cache_size.size_used_fraction()
×
200
            );
201

202
            Some(entry)
1✔
203
        } else {
204
            None
×
205
        }
206
    }
1✔
207

208
    /// This method removes a list of queries from the cache and the LRU.
209
    fn discard_queries_from_cache_and_lru(&mut self, cache_entry_ids: &[CacheEntryId]) {
7✔
210
        for cache_entry_id in cache_entry_ids {
8✔
211
            let old_entry = self.remove_query_from_cache_and_lru(cache_entry_id);
1✔
212
            debug_assert!(
213
                old_entry.is_some(),
1✔
214
                "CacheEntryId not found in OperatorCacheEntry"
×
215
            );
216
        }
217
    }
7✔
218

219
    /// This method adds a query element to the landing zone.
220
    /// It will add the element to the landing zone entry of the query.
221
    ///
222
    /// # Errors
223
    ///
224
    /// This method returns an error if the query is not in the landing zone.
225
    /// This method returns an error if the element is already expired.
226
    /// This method returns an error if the landing zone is full or the new element would cause the landing zone to overflow.
227
    ///
228
    fn add_query_element_to_landing_zone(
6✔
229
        &mut self,
6✔
230
        query_id: &QueryId,
6✔
231
        landing_zone_element: C,
6✔
232
    ) -> Result<(), CacheError> {
6✔
233
        let landing_zone_entry = self
6✔
234
            .operator_cache
6✔
235
            .landing_zone_entry_mut(query_id)
6✔
236
            .ok_or(CacheError::QueryNotFoundInLandingZone)?;
6✔
237

238
        if landing_zone_element.cache_hint().is_expired() {
6✔
239
            log::trace!("Element is already expired");
1✔
240
            return Err(CacheError::TileExpiredBeforeInsertion);
1✔
241
        };
5✔
242

5✔
243
        let element_bytes_size = landing_zone_element.byte_size();
5✔
244

5✔
245
        if !self.landing_zone_size.can_fit_bytes(element_bytes_size) {
5✔
246
            return Err(CacheError::NotEnoughSpaceInLandingZone);
×
247
        }
5✔
248

5✔
249
        // new entries might update the query bounds stored for this entry
5✔
250
        landing_zone_element.update_stored_query(&mut landing_zone_entry.query)?;
5✔
251

252
        // actually insert the element into the landing zone
253
        landing_zone_entry.insert_element(landing_zone_element)?;
5✔
254

255
        // we add the bytes size of the element to the landing zone size after we have inserted it.
256
        self.landing_zone_size
5✔
257
            .try_add_bytes(element_bytes_size)
5✔
258
            .expect(
5✔
259
            "The Landing Zone must have enough space for the element since we checked it before",
5✔
260
        );
5✔
261

5✔
262
        log::trace!(
5✔
263
            "Inserted tile for query {} into landing zone. Landing zone size: {}. Landing zone size used: {}. Landing zone used percentage: {}",
×
264
            query_id, self.landing_zone_size.total_byte_size(), self.landing_zone_size.byte_size_used(), self.landing_zone_size.size_used_fraction()
×
265
        );
266

267
        Ok(())
5✔
268
    }
6✔
269

270
    /// This method inserts a query into the landing zone.
271
    /// It will cause the operator cache to create a new landing zone entry.
272
    /// Therefore, the size of the query and the size of the landing zone entry will be added to the landing zone size.
273
    ///
274
    /// # Errors
275
    ///
276
    /// This method returns an error if the query is already in the landing zone.
277
    /// This method returns an error if the landing zone is full or the new query would cause the landing zone to overflow.
278
    ///
279
    fn insert_query_into_landing_zone(&mut self, query: &C::Query) -> Result<QueryId, CacheError> {
7✔
280
        let landing_zone_entry = CacheQueryEntry::create_empty::<C>(query.clone());
7✔
281
        let query_id = QueryId::new();
7✔
282

7✔
283
        let query_id_bytes_size = query_id.byte_size();
7✔
284
        let landing_zone_entry_bytes_size = landing_zone_entry.byte_size();
7✔
285

7✔
286
        self.landing_zone_size.try_add_bytes(query_id_bytes_size)?;
7✔
287

288
        // if this fails, we have to remove the query id size again
289
        if let Err(e) = self
7✔
290
            .landing_zone_size
7✔
291
            .try_add_bytes(landing_zone_entry_bytes_size)
7✔
292
        {
293
            self.landing_zone_size.remove_bytes(query_id_bytes_size);
×
294
            return Err(e);
×
295
        }
7✔
296

297
        // if this fails, we have to remove the query id size and the landing zone entry size again
298
        if let Err(e) = self
7✔
299
            .operator_cache
7✔
300
            .insert_landing_zone_entry(query_id, landing_zone_entry)
7✔
301
        {
302
            self.landing_zone_size.remove_bytes(query_id_bytes_size);
×
303
            self.landing_zone_size
×
304
                .remove_bytes(landing_zone_entry_bytes_size);
×
305
            return Err(e);
×
306
        }
7✔
307

7✔
308
        // debug output
7✔
309
        log::trace!(
7✔
310
            "Added query {} to landing zone. Landing zone size: {}. Landing zone size used: {}, Landing zone used percentage: {}.",
×
311
            query_id, self.landing_zone_size.total_byte_size(), self.landing_zone_size.byte_size_used(), self.landing_zone_size.size_used_fraction()
×
312
        );
313

314
        Ok(query_id)
7✔
315
    }
7✔
316

317
    /// This method inserts a cache entry into the cache and the LRU.
318
    /// It allows the element cache to overflow the cache size.
319
    /// This is done because the total cache size is the cache size + the landing zone size.
320
    /// This method is used when moving an element from the landing zone to the cache.
321
    ///
322
    /// # Errors
323
    ///
324
    /// This method returns an error if the cache entry is already in the cache.
325
    ///
326
    fn insert_cache_entry_allow_overflow(
5✔
327
        &mut self,
5✔
328
        cache_entry: CacheQueryEntry<C::Query, C::CacheContainer>,
5✔
329
        key: &CanonicOperatorName,
5✔
330
    ) -> Result<CacheEntryId, CacheError> {
5✔
331
        let cache_entry_id = CacheEntryId::new();
5✔
332
        let bytes = cache_entry.byte_size() + cache_entry_id.byte_size();
5✔
333
        // When inserting data from the landing zone into the cache, we allow the cache to overflow.
5✔
334
        // This is done because the total cache size is the cache size + the landing zone size.
5✔
335
        self.cache_size.add_bytes_allow_overflow(bytes);
5✔
336
        self.operator_cache
5✔
337
            .insert_cache_entry(cache_entry_id, cache_entry)?;
5✔
338
        // we have to wrap the key in a TypedCanonicOperatorName to be able to insert it into the LRU
339
        self.lru.push(
5✔
340
            cache_entry_id,
5✔
341
            C::typed_canonical_operator_name(key.clone()),
5✔
342
        );
5✔
343

5✔
344
        // debug output
5✔
345
        log::trace!(
5✔
346
            "Added cache entry {}. Cache size: {}. Cache size used: {}, Cache used percentage: {}.",
×
347
            cache_entry_id,
×
348
            self.cache_size.total_byte_size(),
×
349
            self.cache_size.byte_size_used(),
×
350
            self.cache_size.size_used_fraction()
×
351
        );
352

353
        Ok(cache_entry_id)
5✔
354
    }
5✔
355

356
    /// This method finds a cache entry in the cache that matches the query.
357
    /// It will also collect all expired cache entries.
358
    /// The cache entry is returned together with the expired ids.
359
    fn find_matching_cache_entry_and_collect_expired_entries(
7✔
360
        &mut self,
7✔
361
        query: &C::Query,
7✔
362
    ) -> CacheQueryResult<C::Query, C::CacheContainer> {
7✔
363
        let mut expired_cache_entry_ids = vec![];
7✔
364

7✔
365
        let x = self.operator_cache.iter().find(|&(id, entry)| {
7✔
366
            if entry.elements.is_expired() {
6✔
367
                expired_cache_entry_ids.push(*id);
1✔
368
                return false;
1✔
369
            }
5✔
370
            entry.query.is_match(query)
5✔
371
        });
7✔
372

7✔
373
        CacheQueryResult {
7✔
374
            cache_hit: x.map(|(id, entry)| (*id, entry)),
7✔
375
            expired_cache_entry_ids,
7✔
376
        }
7✔
377
    }
7✔
378
}
379

380
struct CacheQueryResult<'a, Query, CE> {
381
    cache_hit: Option<(CacheEntryId, &'a CacheQueryEntry<Query, CE>)>,
382
    expired_cache_entry_ids: Vec<CacheEntryId>,
383
}
384

385
pub trait Cache<C: CacheBackendElementExt>:
386
    CacheView<
387
    CacheQueryEntry<C::Query, C::CacheContainer>,
388
    CacheQueryEntry<C::Query, C::LandingZoneContainer>,
389
>
390
where
391
    C::Query: Clone + CacheQueryMatch,
392
{
393
    /// This method returns a mutable reference to the cache entry of an operator.
394
    /// If there is no cache entry for the operator, this method returns None.
395
    fn operator_cache_view_mut(
396
        &mut self,
397
        key: &CanonicOperatorName,
398
    ) -> Option<OperatorCacheEntryView<C>>;
399

400
    /// This method queries the cache for a given query.
401
    /// If the query matches an entry in the cache, the cache entry is returned and it is promoted in the LRU.
402
    /// If the query does not match an entry in the cache, None is returned. Also if a cache entry is found but it is expired, None is returned.
403
    ///
404
    /// # Errors
405
    /// This method returns an error if the cache entry is not found.
406
    ///
407
    fn query_and_promote(
9✔
408
        &mut self,
9✔
409
        key: &CanonicOperatorName,
9✔
410
        query: &C::Query,
9✔
411
    ) -> Result<Option<Arc<Vec<C>>>, CacheError> {
9✔
412
        let mut cache = self
9✔
413
            .operator_cache_view_mut(key)
9✔
414
            .ok_or(CacheError::OperatorCacheEntryNotFound)?;
9✔
415

416
        let CacheQueryResult {
417
            cache_hit,
7✔
418
            expired_cache_entry_ids,
7✔
419
        } = cache.find_matching_cache_entry_and_collect_expired_entries(query);
7✔
420

421
        let res = if let Some((cache_entry_id, cache_entry)) = cache_hit {
7✔
422
            let potential_result_elements = cache_entry.elements.results_arc();
5✔
423

5✔
424
            // promote the cache entry in the LRU
5✔
425
            cache.lru.promote(&cache_entry_id);
5✔
426
            Some(potential_result_elements)
5✔
427
        } else {
428
            None
2✔
429
        };
430

431
        // discard expired cache entries
432
        cache.discard_queries_from_cache_and_lru(&expired_cache_entry_ids);
7✔
433

7✔
434
        Ok(res.flatten())
7✔
435
    }
9✔
436

437
    /// This method inserts a query into the cache.
438
    ///
439
    /// # Errors
440
    /// This method returns an error if the query is already in the cache.
441
    ///
442
    fn insert_query_into_landing_zone(
7✔
443
        &mut self,
7✔
444
        key: &CanonicOperatorName,
7✔
445
        query: &C::Query,
7✔
446
    ) -> Result<QueryId, CacheError> {
7✔
447
        self.create_operator_cache_if_needed(key.clone());
7✔
448
        self.operator_cache_view_mut(key)
7✔
449
            .expect("This method must not fail since the OperatorCache was created one line above.")
7✔
450
            .insert_query_into_landing_zone(query)
7✔
451
    }
7✔
452

453
    fn insert_query_element_into_landing_zone(
13✔
454
        &mut self,
13✔
455
        key: &CanonicOperatorName,
13✔
456
        query_id: &QueryId,
13✔
457
        landing_zone_element: C,
13✔
458
    ) -> Result<(), CacheError> {
13✔
459
        let mut cache = self
13✔
460
            .operator_cache_view_mut(key)
13✔
461
            .ok_or(CacheError::QueryNotFoundInLandingZone)?;
13✔
462
        let res = cache.add_query_element_to_landing_zone(query_id, landing_zone_element);
6✔
463

6✔
464
        // if we cant add the element to the landing zone, we remove the query from the landing zone
6✔
465
        if res.is_err() {
6✔
466
            let _old_entry = cache.remove_query_from_landing_zone(query_id);
1✔
467

1✔
468
            // if the operator cache is empty, we remove it from the cache
1✔
469
            if cache.is_empty() {
1✔
470
                self.remove_operator_cache(key);
1✔
471
            }
1✔
472
        }
5✔
473

474
        res
6✔
475
    }
13✔
476

477
    /// This method discards a query from the landing zone.
478
    /// If the query is not in the landing zone, this method does nothing.
479
    fn discard_query_from_landing_zone(&mut self, key: &CanonicOperatorName, query_id: &QueryId) {
480
        if let Some(mut cache) = self.operator_cache_view_mut(key) {
×
481
            cache.remove_query_from_landing_zone(query_id);
×
482
            if cache.is_empty() {
×
483
                self.remove_operator_cache(key);
×
484
            }
×
485
        }
×
486
    }
×
487

488
    /// This method discards a query from the cache and the LRU.
489
    /// If the query is not in the cache, this method does nothing.
490
    fn discard_querys_from_cache_and_lru(
491
        &mut self,
492
        key: &CanonicOperatorName,
493
        cache_entry_ids: &[CacheEntryId],
494
    ) {
495
        if let Some(mut cache) = self.operator_cache_view_mut(key) {
×
496
            cache.discard_queries_from_cache_and_lru(cache_entry_ids);
×
497
            if cache.is_empty() {
×
498
                self.remove_operator_cache(key);
×
499
            }
×
500
        }
×
501
    }
×
502

503
    /// This method moves a query from the landing zone to the cache.
504
    /// It will remove the query from the landing zone and insert it into the cache.
505
    /// If the cache is full, the least recently used entries will be evicted if necessary to make room for the new entry.
506
    /// This method returns the cache entry id of the inserted cache entry.
507
    ///
508
    /// # Errors
509
    /// This method returns an error if the query is not in the landing zone.
510
    /// This method returns an error if the cache entry is already in the cache.
511
    /// This method returns an error if the cache is full and the least recently used entries cannot be evicted to make room for the new entry.
512
    ///
513
    fn move_query_from_landing_zone_to_cache(
6✔
514
        &mut self,
6✔
515
        key: &CanonicOperatorName,
6✔
516
        query_id: &QueryId,
6✔
517
    ) -> Result<CacheEntryId, CacheError> {
6✔
518
        let mut operator_cache = self
6✔
519
            .operator_cache_view_mut(key)
6✔
520
            .ok_or(CacheError::OperatorCacheEntryNotFound)?;
6✔
521
        let landing_zone_entry = operator_cache
5✔
522
            .remove_query_from_landing_zone(query_id)
5✔
523
            .ok_or(CacheError::QueryNotFoundInLandingZone)?;
5✔
524
        let cache_entry: CacheQueryEntry<
5✔
525
            <C as CacheBackendElement>::Query,
5✔
526
            <C as CacheBackendElementExt>::CacheContainer,
5✔
527
        > = C::landing_zone_to_cache_entry(landing_zone_entry);
5✔
528
        // when moving an element from the landing zone to the cache, we allow the cache size to overflow.
529
        // This is done because the total cache size is the cache size + the landing zone size.
530
        let cache_entry_id = operator_cache.insert_cache_entry_allow_overflow(cache_entry, key)?;
5✔
531
        // We could also first try to evict until the cache can hold the entry.
532
        // However, then we would need to lookup the cache entry twice.
533
        // To avoid that, we just evict after we moved the entry from the landing zone to the cache.
534
        // This is also not a problem since the total cache size is the cache size + the landing zone size.
535
        self.evict_entries_until_can_fit_bytes(0);
5✔
536

5✔
537
        Ok(cache_entry_id)
5✔
538
    }
6✔
539
}
540

541
impl<T> Cache<CompressedRasterTile2D<T>> for CacheBackend
542
where
543
    T: Pixel,
544
    CompressedRasterTile2D<T>: CacheBackendElementExt<
545
        Query = RasterQueryRectangle,
546
        LandingZoneContainer = LandingZoneQueryTiles,
547
        CacheContainer = CachedTiles,
548
    >,
549
{
550
    fn operator_cache_view_mut(
35✔
551
        &mut self,
35✔
552
        key: &CanonicOperatorName,
35✔
553
    ) -> Option<OperatorCacheEntryView<CompressedRasterTile2D<T>>> {
35✔
554
        self.raster_caches
35✔
555
            .get_mut(key)
35✔
556
            .map(|op| OperatorCacheEntryView {
35✔
557
                operator_cache: op,
25✔
558
                cache_size: &mut self.cache_size,
25✔
559
                landing_zone_size: &mut self.landing_zone_size,
25✔
560
                lru: &mut self.lru,
25✔
561
            })
35✔
562
    }
35✔
563
}
564

565
impl<T> Cache<CompressedFeatureCollection<T>> for CacheBackend
566
where
567
    T: Geometry + ArrowTyped,
568
    CompressedFeatureCollection<T>: CacheBackendElementExt<
569
        Query = VectorQueryRectangle,
570
        LandingZoneContainer = LandingZoneQueryFeatures,
571
        CacheContainer = CachedFeatures,
572
    >,
573
{
574
    fn operator_cache_view_mut(
×
575
        &mut self,
×
576
        key: &CanonicOperatorName,
×
577
    ) -> Option<OperatorCacheEntryView<CompressedFeatureCollection<T>>> {
×
578
        self.vector_caches
×
579
            .get_mut(key)
×
580
            .map(|op| OperatorCacheEntryView {
×
581
                operator_cache: op,
×
582
                cache_size: &mut self.cache_size,
×
583
                landing_zone_size: &mut self.landing_zone_size,
×
584
                lru: &mut self.lru,
×
585
            })
×
586
    }
×
587
}
588

589
impl CacheView<RasterCacheQueryEntry, RasterLandingQueryEntry> for CacheBackend {
590
    fn operator_caches_mut(
8✔
591
        &mut self,
8✔
592
    ) -> &mut HashMap<CanonicOperatorName, RasterOperatorCacheEntry> {
8✔
593
        &mut self.raster_caches
8✔
594
    }
8✔
595
}
596

597
impl CacheView<VectorCacheQueryEntry, VectorLandingQueryEntry> for CacheBackend {
598
    fn operator_caches_mut(
×
599
        &mut self,
×
600
    ) -> &mut HashMap<CanonicOperatorName, VectorOperatorCacheEntry> {
×
601
        &mut self.vector_caches
×
602
    }
×
603
}
604

605
pub trait CacheBackendElement: ByteSize + Send + ByteSize + Sync
606
where
607
    Self: Sized,
608
{
609
    type Query: CacheQueryMatch + Clone + Send + Sync;
610

611
    /// Update the stored query rectangle of the cache entry.
612
    /// This allows to expand the stored query rectangle to the tile bounds produced by the query.
613
    ///
614
    /// # Errors
615
    /// This method returns an error if the stored query cannot be updated.
616
    fn update_stored_query(&self, query: &mut Self::Query) -> Result<(), CacheError>;
617

618
    /// This method returns the cache hint of the element.
619
    fn cache_hint(&self) -> CacheHint;
620

621
    /// This method returns the typed canonical operator name of the element.
622
    fn typed_canonical_operator_name(key: CanonicOperatorName) -> TypedCanonicOperatorName;
623

624
    /// This method checks if this specific element should be included in the answer of the query.
625
    fn intersects_query(&self, query: &Self::Query) -> bool;
626
}
627

628
pub trait CacheBackendElementExt: CacheBackendElement {
629
    type LandingZoneContainer: LandingZoneElementsContainer<Self> + ByteSize;
630
    type CacheContainer: CacheElementsContainer<Self::Query, Self>
631
        + ByteSize
632
        + From<Self::LandingZoneContainer>;
633

634
    fn move_element_into_landing_zone(
635
        self,
636
        landing_zone: &mut Self::LandingZoneContainer,
637
    ) -> Result<(), super::error::CacheError>;
638

639
    fn create_empty_landing_zone() -> Self::LandingZoneContainer;
640

641
    fn results_arc(cache_elements_container: &Self::CacheContainer) -> Option<Arc<Vec<Self>>>;
642

643
    fn landing_zone_to_cache_entry(
644
        landing_zone_entry: CacheQueryEntry<Self::Query, Self::LandingZoneContainer>,
645
    ) -> CacheQueryEntry<Self::Query, Self::CacheContainer>;
646
}
647

648
#[derive(Debug)]
×
649
pub struct SharedCache {
650
    backend: RwLock<CacheBackend>,
651
}
652

653
impl SharedCache {
654
    pub fn new(cache_size_in_mb: usize, landing_zone_ratio: f64) -> Result<Self> {
6✔
655
        if landing_zone_ratio <= 0.0 {
6✔
656
            return Err(crate::error::Error::QueryingProcessorFailed {
×
657
                source: Box::new(CacheError::LandingZoneRatioMustBeLargerThanZero),
×
658
            });
×
659
        }
6✔
660

6✔
661
        if landing_zone_ratio >= 0.5 {
6✔
662
            return Err(crate::error::Error::QueryingProcessorFailed {
×
663
                source: Box::new(CacheError::LandingZoneRatioMustBeSmallerThenHalfCacheSize),
×
664
            });
×
665
        }
6✔
666

6✔
667
        let cache_size_bytes =
6✔
668
            (cache_size_in_mb as f64 * (1.0 - landing_zone_ratio) * 1024.0 * 1024.0) as usize;
6✔
669

6✔
670
        let landing_zone_size_bytes =
6✔
671
            (cache_size_in_mb as f64 * landing_zone_ratio * 1024.0 * 1024.0) as usize;
6✔
672

6✔
673
        Ok(Self {
6✔
674
            backend: RwLock::new(CacheBackend {
6✔
675
                vector_caches: Default::default(),
6✔
676
                raster_caches: Default::default(),
6✔
677
                lru: LruCache::unbounded(), // we need no cap because we evict manually
6✔
678
                cache_size: CacheSize::new(cache_size_bytes),
6✔
679
                landing_zone_size: CacheSize::new(landing_zone_size_bytes),
6✔
680
            }),
6✔
681
        })
6✔
682
    }
6✔
683
}
684

685
impl TestDefault for SharedCache {
686
    fn test_default() -> Self {
88✔
687
        Self {
88✔
688
            backend: RwLock::new(CacheBackend {
88✔
689
                vector_caches: Default::default(),
88✔
690
                raster_caches: Default::default(),
88✔
691
                lru: LruCache::unbounded(), // we need no cap because we evict manually
88✔
692
                cache_size: CacheSize::new(usize::MAX),
88✔
693
                landing_zone_size: CacheSize::new(usize::MAX),
88✔
694
            }),
88✔
695
        }
88✔
696
    }
88✔
697
}
698

699
/// Holds all the cached results for an operator graph (workflow)
700
#[derive(Debug, Default)]
×
701
pub struct OperatorCacheEntry<CacheEntriesContainer, LandingZoneEntriesContainer> {
702
    // for a given operator and query we need to look through all entries to find one that matches
703
    // TODO: use a multi-dimensional index to speed up the lookup
704
    entries: HashMap<CacheEntryId, CacheEntriesContainer>,
705

706
    // running queries insert their tiles as they are produced. The entry will be created once the query is done.
707
    // The query is identified by a Uuid instead of the query rectangle to avoid confusions with other queries
708
    landing_zone: HashMap<QueryId, LandingZoneEntriesContainer>,
709
}
710

711
impl<CacheEntriesContainer, LandingZoneEntriesContainer>
712
    OperatorCacheEntry<CacheEntriesContainer, LandingZoneEntriesContainer>
713
{
714
    pub fn new() -> Self {
7✔
715
        Self {
7✔
716
            entries: Default::default(),
7✔
717
            landing_zone: Default::default(),
7✔
718
        }
7✔
719
    }
7✔
720

721
    fn insert_landing_zone_entry(
7✔
722
        &mut self,
7✔
723
        query_id: QueryId,
7✔
724
        landing_zone_entry: LandingZoneEntriesContainer,
7✔
725
    ) -> Result<(), CacheError> {
7✔
726
        let old_entry = self.landing_zone.insert(query_id, landing_zone_entry);
7✔
727

7✔
728
        if old_entry.is_some() {
7✔
729
            Err(CacheError::QueryIdAlreadyInLandingZone)
×
730
        } else {
731
            Ok(())
7✔
732
        }
733
    }
7✔
734

735
    fn remove_landing_zone_entry(
6✔
736
        &mut self,
6✔
737
        query_id: &QueryId,
6✔
738
    ) -> Option<LandingZoneEntriesContainer> {
6✔
739
        self.landing_zone.remove(query_id)
6✔
740
    }
6✔
741

742
    fn landing_zone_entry_mut(
6✔
743
        &mut self,
6✔
744
        query_id: &QueryId,
6✔
745
    ) -> Option<&mut LandingZoneEntriesContainer> {
6✔
746
        self.landing_zone.get_mut(query_id)
6✔
747
    }
6✔
748

749
    fn insert_cache_entry(
5✔
750
        &mut self,
5✔
751
        cache_entry_id: CacheEntryId,
5✔
752
        cache_entry: CacheEntriesContainer,
5✔
753
    ) -> Result<(), CacheError> {
5✔
754
        let old_entry = self.entries.insert(cache_entry_id, cache_entry);
5✔
755

5✔
756
        if old_entry.is_some() {
5✔
757
            Err(CacheError::CacheEntryIdAlreadyInCache)
×
758
        } else {
759
            Ok(())
5✔
760
        }
761
    }
5✔
762

763
    fn remove_cache_entry(
2✔
764
        &mut self,
2✔
765
        cache_entry_id: &CacheEntryId,
2✔
766
    ) -> Option<CacheEntriesContainer> {
2✔
767
        self.entries.remove(cache_entry_id)
2✔
768
    }
2✔
769

770
    fn is_empty(&self) -> bool {
1✔
771
        self.entries.is_empty() && self.landing_zone.is_empty()
1✔
772
    }
1✔
773

774
    fn iter(&self) -> impl Iterator<Item = (&CacheEntryId, &CacheEntriesContainer)> {
7✔
775
        self.entries.iter()
7✔
776
    }
7✔
777
}
778

779
identifier!(QueryId);
×
780

781
impl ByteSize for QueryId {}
782

783
identifier!(CacheEntryId);
×
784

785
impl ByteSize for CacheEntryId {}
786

787
/// Holds all the elements for a given query and is able to answer queries that are fully contained
788
#[derive(Debug, Hash)]
×
789
pub struct CacheQueryEntry<Query, Elements> {
790
    pub query: Query,
791
    pub elements: Elements,
792
}
793
type RasterOperatorCacheEntry = OperatorCacheEntry<RasterCacheQueryEntry, RasterLandingQueryEntry>;
794
pub type RasterCacheQueryEntry = CacheQueryEntry<RasterQueryRectangle, CachedTiles>;
795
pub type RasterLandingQueryEntry = CacheQueryEntry<RasterQueryRectangle, LandingZoneQueryTiles>;
796

797
type VectorOperatorCacheEntry = OperatorCacheEntry<VectorCacheQueryEntry, VectorLandingQueryEntry>;
798
pub type VectorCacheQueryEntry = CacheQueryEntry<VectorQueryRectangle, CachedFeatures>;
799
pub type VectorLandingQueryEntry = CacheQueryEntry<VectorQueryRectangle, LandingZoneQueryFeatures>;
800

801
impl<Query, Elements> CacheQueryEntry<Query, Elements> {
802
    pub fn create_empty<E>(query: Query) -> Self
9✔
803
    where
9✔
804
        Elements: LandingZoneElementsContainer<E>,
9✔
805
    {
9✔
806
        Self {
9✔
807
            query,
9✔
808
            elements: Elements::create_empty(),
9✔
809
        }
9✔
810
    }
9✔
811

812
    pub fn query(&self) -> &Query {
8✔
813
        &self.query
8✔
814
    }
8✔
815

816
    pub fn elements_mut(&mut self) -> &mut Elements {
12✔
817
        &mut self.elements
12✔
818
    }
12✔
819

820
    pub fn insert_element<E>(&mut self, element: E) -> Result<(), CacheError>
5✔
821
    where
5✔
822
        Elements: LandingZoneElementsContainer<E>,
5✔
823
    {
5✔
824
        self.elements.insert_element(element)
5✔
825
    }
5✔
826
}
827

828
impl<Query, Elements> ByteSize for CacheQueryEntry<Query, Elements>
829
where
830
    Elements: ByteSize,
831
{
832
    fn heap_byte_size(&self) -> usize {
22✔
833
        self.elements.heap_byte_size()
22✔
834
    }
22✔
835
}
836

837
pub trait CacheQueryMatch<RHS = Self> {
838
    fn is_match(&self, query: &RHS) -> bool;
839
}
840

841
impl CacheQueryMatch for RasterQueryRectangle {
842
    fn is_match(&self, query: &RasterQueryRectangle) -> bool {
8✔
843
        self.spatial_bounds.contains(&query.spatial_bounds)
8✔
844
            && self.time_interval.contains(&query.time_interval)
7✔
845
            && self.spatial_resolution == query.spatial_resolution
7✔
846
    }
8✔
847
}
848

849
impl CacheQueryMatch for VectorQueryRectangle {
850
    // TODO: check if that is what we need
851
    fn is_match(&self, query: &VectorQueryRectangle) -> bool {
3✔
852
        self.spatial_bounds.contains_bbox(&query.spatial_bounds)
3✔
853
            && self.time_interval.contains(&query.time_interval)
2✔
854
            && self.spatial_resolution == query.spatial_resolution
2✔
855
    }
3✔
856
}
857

858
pub trait LandingZoneElementsContainer<E> {
859
    fn insert_element(&mut self, element: E) -> Result<(), CacheError>;
860
    fn create_empty() -> Self;
861
}
862

863
pub trait CacheElementsContainerInfos<Query> {
864
    fn is_expired(&self) -> bool;
865
}
866

867
pub trait CacheElementsContainer<Query, E>: CacheElementsContainerInfos<Query> {
868
    fn results_arc(&self) -> Option<Arc<Vec<E>>>;
869
}
870

871
impl From<VectorLandingQueryEntry> for VectorCacheQueryEntry {
872
    fn from(value: VectorLandingQueryEntry) -> Self {
1✔
873
        Self {
1✔
874
            query: value.query,
1✔
875
            elements: value.elements.into(),
1✔
876
        }
1✔
877
    }
1✔
878
}
879

880
pub trait CacheElement: Sized + Send + Sync {
881
    type StoredCacheElement: CacheBackendElementExt<Query = Self::Query>;
882
    type Query: CacheQueryMatch;
883
    type ResultStream: Stream<Item = Result<Self, CacheError>>;
884

885
    fn into_stored_element(self) -> Self::StoredCacheElement;
886
    fn from_stored_element_ref(stored: &Self::StoredCacheElement) -> Result<Self, CacheError>;
887

888
    fn result_stream(
889
        stored_data: Arc<Vec<Self::StoredCacheElement>>,
890
        query: Self::Query,
891
    ) -> Self::ResultStream;
892
}
893

894
#[async_trait]
895
pub trait AsyncCache<C: CacheElement> {
896
    async fn query_cache(
897
        &self,
898
        key: &CanonicOperatorName,
899
        query: &C::Query,
900
    ) -> Result<Option<C::ResultStream>, CacheError>;
901

902
    async fn insert_query(
903
        &self,
904
        key: &CanonicOperatorName,
905
        query: &C::Query,
906
    ) -> Result<QueryId, CacheError>;
907

908
    async fn insert_query_element(
909
        &self,
910
        key: &CanonicOperatorName,
911
        query_id: &QueryId,
912
        landing_zone_element: C,
913
    ) -> Result<(), CacheError>;
914

915
    async fn abort_query(&self, key: &CanonicOperatorName, query_id: &QueryId);
916

917
    async fn finish_query(
918
        &self,
919
        key: &CanonicOperatorName,
920
        query_id: &QueryId,
921
    ) -> Result<CacheEntryId, CacheError>;
922
}
923

924
#[async_trait]
925
impl<C> AsyncCache<C> for SharedCache
926
where
927
    C: CacheElement + Send + Sync + 'static + ByteSize,
928
    CacheBackend: Cache<C::StoredCacheElement>,
929
    C::Query: Clone + CacheQueryMatch + Send + Sync,
930
{
931
    /// Query the cache and on hit create a stream of cache elements
932
    async fn query_cache(
4✔
933
        &self,
4✔
934
        key: &CanonicOperatorName,
4✔
935
        query: &C::Query,
4✔
936
    ) -> Result<Option<C::ResultStream>, CacheError> {
4✔
937
        let mut backend = self.backend.write().await;
4✔
938
        let res_data = backend.query_and_promote(key, query)?;
4✔
939
        Ok(res_data.map(|res_data| C::result_stream(res_data, query.clone())))
2✔
940
    }
8✔
941

942
    /// When inserting a new query, we first register the query and then insert the elements as they are produced
943
    /// This is to avoid confusing different queries on the same operator and query rectangle
944
    async fn insert_query(
3✔
945
        &self,
3✔
946
        key: &CanonicOperatorName,
3✔
947
        query: &C::Query,
3✔
948
    ) -> Result<QueryId, CacheError> {
3✔
949
        let mut backend = self.backend.write().await;
3✔
950
        backend.insert_query_into_landing_zone(key, query)
3✔
951
    }
6✔
952

953
    /// Insert a cachable element for a given query. The query has to be inserted first.
954
    /// The element is inserted into the landing zone and only moved to the cache when the query is finished.
955
    /// If the landing zone is full or the element size would cause the landing zone size to overflow, the caching of the query is aborted.
956
    async fn insert_query_element(
9✔
957
        &self,
9✔
958
        key: &CanonicOperatorName,
9✔
959
        query_id: &QueryId,
9✔
960
        landing_zone_element: C,
9✔
961
    ) -> Result<(), CacheError> {
9✔
962
        const LOG_LEVEL_THRESHOLD: log::Level = log::Level::Trace;
963
        let element_size = if log_enabled!(LOG_LEVEL_THRESHOLD) {
9✔
964
            landing_zone_element.byte_size()
×
965
        } else {
966
            0
9✔
967
        };
968

969
        let storeable_element =
9✔
970
            crate::util::spawn_blocking(|| landing_zone_element.into_stored_element())
9✔
971
                .await
9✔
972
                .map_err(|_| CacheError::BlockingElementConversion)?;
9✔
973

974
        if log_enabled!(LOG_LEVEL_THRESHOLD) {
9✔
975
            let storeable_element_size = storeable_element.byte_size();
×
976
            tracing::trace!(
×
977
                "Inserting element into landing zone for query {:?} on operator {}. Element size: {} bytes, storable element size: {} bytes, ratio: {}",
×
978
                query_id,
×
979
                key,
×
980
                element_size,
×
981
                storeable_element_size,
×
982
                storeable_element_size as f64 / element_size as f64
×
983
            );
×
984
        }
9✔
985

986
        let mut backend = self.backend.write().await;
9✔
987
        backend.insert_query_element_into_landing_zone(key, query_id, storeable_element)
9✔
988
    }
18✔
989

990
    /// Abort the query and remove already inserted elements from the caches landing zone
991
    async fn abort_query(&self, key: &CanonicOperatorName, query_id: &QueryId) {
×
992
        let mut backend = self.backend.write().await;
×
993
        backend.discard_query_from_landing_zone(key, query_id);
×
994
    }
×
995

996
    /// Finish the query and make the inserted elements available in the cache
997
    async fn finish_query(
2✔
998
        &self,
2✔
999
        key: &CanonicOperatorName,
2✔
1000
        query_id: &QueryId,
2✔
1001
    ) -> Result<CacheEntryId, CacheError> {
2✔
1002
        let mut backend = self.backend.write().await;
2✔
1003
        backend.move_query_from_landing_zone_to_cache(key, query_id)
2✔
1004
    }
4✔
1005
}
1006

1007
#[cfg(test)]
1008
mod tests {
1009
    use geoengine_datatypes::{
1010
        primitives::{CacheHint, DateTime, SpatialPartition2D, SpatialResolution, TimeInterval},
1011
        raster::{Grid, RasterProperties, RasterTile2D},
1012
    };
1013
    use serde_json::json;
1014
    use std::sync::Arc;
1015

1016
    use crate::pro::cache::cache_tiles::{CompressedGridOrEmpty, CompressedMaskedGrid};
1017

1018
    use super::*;
1019

1020
    async fn process_query_async(tile_cache: &mut SharedCache, op_name: CanonicOperatorName) {
1✔
1021
        let query_id = <SharedCache as AsyncCache<RasterTile2D<u8>>>::insert_query(
1✔
1022
            tile_cache,
1✔
1023
            &op_name,
1✔
1024
            &query_rect(),
1✔
1025
        )
1✔
1026
        .await
×
1027
        .unwrap();
1✔
1028

1✔
1029
        tile_cache
1✔
1030
            .insert_query_element(&op_name, &query_id, create_tile())
1✔
1031
            .await
1✔
1032
            .unwrap();
1✔
1033

1✔
1034
        <SharedCache as AsyncCache<RasterTile2D<u8>>>::finish_query(
1✔
1035
            tile_cache, &op_name, &query_id,
1✔
1036
        )
1✔
1037
        .await
×
1038
        .unwrap();
1✔
1039
    }
1✔
1040

1041
    fn process_query(tile_cache: &mut CacheBackend, op_name: &CanonicOperatorName) {
4✔
1042
        let query_id =
4✔
1043
            <CacheBackend as Cache<CompressedRasterTile2D<u8>>>::insert_query_into_landing_zone(
4✔
1044
                tile_cache,
4✔
1045
                op_name,
4✔
1046
                &query_rect(),
4✔
1047
            )
4✔
1048
            .unwrap();
4✔
1049

4✔
1050
        tile_cache
4✔
1051
            .insert_query_element_into_landing_zone(op_name, &query_id, create_compressed_tile())
4✔
1052
            .unwrap();
4✔
1053

4✔
1054
        <CacheBackend as Cache<CompressedRasterTile2D<u8>>>::move_query_from_landing_zone_to_cache(
4✔
1055
            tile_cache, op_name, &query_id,
4✔
1056
        )
4✔
1057
        .unwrap();
4✔
1058
    }
4✔
1059

1060
    fn create_tile() -> RasterTile2D<u8> {
1✔
1061
        RasterTile2D::<u8> {
1✔
1062
            time: TimeInterval::new_instant(DateTime::new_utc(2014, 3, 1, 0, 0, 0)).unwrap(),
1✔
1063
            tile_position: [-1, 0].into(),
1✔
1064
            global_geo_transform: TestDefault::test_default(),
1✔
1065
            grid_array: Grid::new([3, 2].into(), vec![1, 2, 3, 4, 5, 6])
1✔
1066
                .unwrap()
1✔
1067
                .into(),
1✔
1068
            properties: RasterProperties::default(),
1✔
1069
            cache_hint: CacheHint::max_duration(),
1✔
1070
        }
1✔
1071
    }
1✔
1072

1073
    fn create_compressed_tile() -> CompressedRasterTile2D<u8> {
9✔
1074
        CompressedRasterTile2D::<u8> {
9✔
1075
            time: TimeInterval::new_instant(DateTime::new_utc(2014, 3, 1, 0, 0, 0)).unwrap(),
9✔
1076
            tile_position: [-1, 0].into(),
9✔
1077
            global_geo_transform: TestDefault::test_default(),
9✔
1078
            grid_array: CompressedGridOrEmpty::Compressed(CompressedMaskedGrid::new(
9✔
1079
                [3, 2].into(),
9✔
1080
                vec![1, 2, 3, 4, 5, 6],
9✔
1081
                vec![1; 6],
9✔
1082
            )),
9✔
1083
            properties: RasterProperties::default(),
9✔
1084
            cache_hint: CacheHint::max_duration(),
9✔
1085
        }
9✔
1086
    }
9✔
1087

1088
    fn query_rect() -> RasterQueryRectangle {
13✔
1089
        RasterQueryRectangle {
13✔
1090
            spatial_bounds: SpatialPartition2D::new_unchecked(
13✔
1091
                (-180., 90.).into(),
13✔
1092
                (180., -90.).into(),
13✔
1093
            ),
13✔
1094
            time_interval: TimeInterval::new_instant(DateTime::new_utc(2014, 3, 1, 0, 0, 0))
13✔
1095
                .unwrap(),
13✔
1096
            spatial_resolution: SpatialResolution::one(),
13✔
1097
        }
13✔
1098
    }
13✔
1099

1100
    fn op(idx: usize) -> CanonicOperatorName {
12✔
1101
        CanonicOperatorName::new_unchecked(&json!({
12✔
1102
            "type": "GdalSource",
12✔
1103
            "params": {
12✔
1104
                "data": idx
12✔
1105
            }
12✔
1106
        }))
12✔
1107
    }
12✔
1108

1109
    #[tokio::test]
1✔
1110
    async fn it_evicts_lru() {
1✔
1111
        // Create cache entry and landing zone entry to geht the size of both
1✔
1112
        let landing_zone_entry = RasterLandingQueryEntry {
1✔
1113
            query: query_rect(),
1✔
1114
            elements: LandingZoneQueryTiles::U8(vec![create_compressed_tile()]),
1✔
1115
        };
1✔
1116
        let query_id = QueryId::new();
1✔
1117
        let size_of_landing_zone_entry = landing_zone_entry.byte_size() + query_id.byte_size();
1✔
1118
        let cache_entry: RasterCacheQueryEntry = landing_zone_entry.into();
1✔
1119
        let cache_entry_id = CacheEntryId::new();
1✔
1120
        let size_of_cache_entry = cache_entry.byte_size() + cache_entry_id.byte_size();
1✔
1121

1✔
1122
        // Select the max of both sizes
1✔
1123
        // This is done because the landing zone should not be smaller then the cache
1✔
1124
        let m_size = size_of_cache_entry.max(size_of_landing_zone_entry);
1✔
1125

1✔
1126
        // set limits s.t. three tiles fit
1✔
1127

1✔
1128
        let mut cache_backend = CacheBackend {
1✔
1129
            raster_caches: Default::default(),
1✔
1130
            vector_caches: Default::default(),
1✔
1131
            lru: LruCache::unbounded(),
1✔
1132
            cache_size: CacheSize::new(m_size * 3),
1✔
1133
            landing_zone_size: CacheSize::new(m_size * 3),
1✔
1134
        };
1✔
1135

1✔
1136
        // process three different queries
1✔
1137
        process_query(&mut cache_backend, &op(1));
1✔
1138
        process_query(&mut cache_backend, &op(2));
1✔
1139
        process_query(&mut cache_backend, &op(3));
1✔
1140

1✔
1141
        // query the first one s.t. it is the most recently used
1✔
1142
        <CacheBackend as Cache<CompressedRasterTile2D<u8>>>::query_and_promote(
1✔
1143
            &mut cache_backend,
1✔
1144
            &op(1),
1✔
1145
            &query_rect(),
1✔
1146
        )
1✔
1147
        .unwrap();
1✔
1148

1✔
1149
        // process a fourth query
1✔
1150
        process_query(&mut cache_backend, &op(4));
1✔
1151

1✔
1152
        // assure the seconds query is evicted because it is the least recently used
1✔
1153
        assert!(
1✔
1154
            <CacheBackend as Cache<CompressedRasterTile2D<u8>>>::query_and_promote(
1✔
1155
                &mut cache_backend,
1✔
1156
                &op(2),
1✔
1157
                &query_rect()
1✔
1158
            )
1✔
1159
            .unwrap()
1✔
1160
            .is_none()
1✔
1161
        );
1✔
1162

1163
        // assure that the other queries are still in the cache
1164
        for i in [1, 3, 4] {
4✔
1165
            assert!(
3✔
1166
                <CacheBackend as Cache<CompressedRasterTile2D<u8>>>::query_and_promote(
3✔
1167
                    &mut cache_backend,
3✔
1168
                    &op(i),
3✔
1169
                    &query_rect()
3✔
1170
                )
3✔
1171
                .unwrap()
3✔
1172
                .is_some()
3✔
1173
            );
3✔
1174
        }
1175

1176
        assert_eq!(
1✔
1177
            cache_backend.cache_size.byte_size_used(),
1✔
1178
            3 * size_of_cache_entry
1✔
1179
        );
1✔
1180
    }
1181

1182
    #[test]
1✔
1183
    fn cache_byte_size() {
1✔
1184
        assert_eq!(create_compressed_tile().byte_size(), 268);
1✔
1185
        assert_eq!(
1✔
1186
            CachedTiles::U8(Arc::new(vec![create_compressed_tile()])).byte_size(),
1✔
1187
            /* enum + arc */ 16 + /* vec */ 24  + /* tile */ 268
1✔
1188
        );
1✔
1189
        assert_eq!(
1✔
1190
            CachedTiles::U8(Arc::new(vec![
1✔
1191
                create_compressed_tile(),
1✔
1192
                create_compressed_tile()
1✔
1193
            ]))
1✔
1194
            .byte_size(),
1✔
1195
            /* enum + arc */ 16 + /* vec */ 24  + /* tile */ 2 * 268
1✔
1196
        );
1✔
1197
    }
1✔
1198

1199
    #[tokio::test]
1✔
1200
    async fn it_checks_ttl() {
1✔
1201
        let mut tile_cache = SharedCache {
1✔
1202
            backend: RwLock::new(CacheBackend {
1✔
1203
                raster_caches: Default::default(),
1✔
1204
                vector_caches: Default::default(),
1✔
1205
                lru: LruCache::unbounded(),
1✔
1206
                cache_size: CacheSize::new(usize::MAX),
1✔
1207
                landing_zone_size: CacheSize::new(usize::MAX),
1✔
1208
            }),
1✔
1209
        };
1✔
1210

1✔
1211
        process_query_async(&mut tile_cache, op(1)).await;
1✔
1212

1213
        // access works because no ttl is set
1214
        <SharedCache as AsyncCache<RasterTile2D<u8>>>::query_cache(
1✔
1215
            &tile_cache,
1✔
1216
            &op(1),
1✔
1217
            &query_rect(),
1✔
1218
        )
1✔
1219
        .await
×
1220
        .unwrap()
1✔
1221
        .unwrap();
1✔
1222

1223
        // manually expire entry
1224
        {
1225
            let mut backend = tile_cache.backend.write().await;
1✔
1226
            let cache = backend.raster_caches.iter_mut().next().unwrap();
1✔
1227

1✔
1228
            let tiles = &mut cache.1.entries.iter_mut().next().unwrap().1.elements;
1✔
1229
            match tiles {
1✔
1230
                CachedTiles::U8(tiles) => {
1✔
1231
                    let mut expired_tiles = (**tiles).clone();
1✔
1232
                    expired_tiles[0].cache_hint = CacheHint::with_created_and_expires(
1✔
1233
                        DateTime::new_utc(0, 1, 1, 0, 0, 0),
1✔
1234
                        DateTime::new_utc(0, 1, 1, 0, 0, 1).into(),
1✔
1235
                    );
1✔
1236
                    *tiles = Arc::new(expired_tiles);
1✔
1237
                }
1✔
1238
                _ => panic!("wrong tile type"),
×
1239
            }
1240
        }
1241

1242
        // access fails because ttl is expired
1243
        assert!(<SharedCache as AsyncCache<RasterTile2D<u8>>>::query_cache(
1✔
1244
            &tile_cache,
1✔
1245
            &op(1),
1✔
1246
            &query_rect()
1✔
1247
        )
1✔
1248
        .await
×
1249
        .unwrap()
1✔
1250
        .is_none());
1✔
1251
    }
1252

1253
    #[tokio::test]
1✔
1254
    async fn tile_cache_init_size() {
1✔
1255
        let tile_cache = SharedCache::new(100, 0.1).unwrap();
1✔
1256

1257
        let backend = tile_cache.backend.read().await;
1✔
1258

1259
        let cache_size = 90 * 1024 * 1024;
1✔
1260
        let landing_zone_size = 10 * 1024 * 1024;
1✔
1261

1✔
1262
        assert_eq!(backend.cache_size.total_byte_size(), cache_size);
1✔
1263
        assert_eq!(
1✔
1264
            backend.landing_zone_size.total_byte_size(),
1✔
1265
            landing_zone_size
1✔
1266
        );
1✔
1267
    }
1268
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc