• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

geo-engine / geoengine / 23852885222

01 Apr 2026 02:08PM UTC coverage: 87.386% (-0.003%) from 87.389%
23852885222

push

github

web-flow
ci: Refactor benchmark workflow with environment variables (#1140)

* ci: Refactor benchmark workflow with environment variables

Added environment variables for benchmark file paths and updated steps to use them.

* Add condition to download benchmark data on PRs

* Swap benchmark file path and upload name

* Add GitHub token to download artifact step

* Remove GitHub token from artifact download step

* ci: Move benchmark execution step after downloading previous benchmark data

* ci: Update benchmark data download step to use CLI command

* ci: Replace manual download step with action to fetch previous benchmark data

* ci: Update benchmark file paths to use artifacts directory

* ci: Update artifact paths to use runner.temp for benchmark data

* ci: Change artifact download behavior to fail if not found

* ci: Refactor benchmark workflow to use GitHub CLI for artifact download

* ci: Move environment variable definitions to a dedicated step in the benchmark workflow

* ci: Add GitHub token to environment for downloading benchmark data

* ci: Add safe.directory configuration to bypass ownership check for benchmark data download

* ci: Add check for availability of previous benchmark data before running benchmarks

* ci: Update benchmark workflow to use consistent artifact directory and file path

* ci: Update artifact file path for benchmark data in workflow

* ci: Fix artifact file path for benchmark data in workflow

* ci: Remove redundant environment variable definitions in benchmark workflow

113701 of 130113 relevant lines covered (87.39%)

497578.5 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

85.03
/operators/src/cache/shared_cache.rs
1
use super::{
2
    cache_chunks::{CachedFeatures, CompressedFeatureCollection, LandingZoneQueryFeatures},
3
    cache_tiles::{CachedTiles, CompressedRasterTile2D, LandingZoneQueryTiles},
4
    error::CacheError,
5
    util::CacheSize,
6
};
7
use crate::engine::CanonicOperatorName;
8
use crate::util::Result;
9
use async_trait::async_trait;
10
use futures::Stream;
11
use geoengine_datatypes::{
12
    identifier,
13
    primitives::{CacheHint, Geometry, RasterQueryRectangle, VectorQueryRectangle},
14
    raster::{GridContains, Pixel},
15
    util::{ByteSize, Identifier, arrow::ArrowTyped, test::TestDefault},
16
};
17
use lru::LruCache;
18
use std::{collections::HashMap, hash::Hash, sync::Arc};
19
use tokio::sync::RwLock;
20
use tracing::{debug, event_enabled};
21

22
/// The tile cache caches all tiles of a query and is able to answer queries that are fully contained in the cache.
23
/// New tiles are inserted into the cache on-the-fly as they are produced by query processors.
24
/// The tiles are first inserted into a landing zone, until the query is completely finished and only then moved to the cache.
25
/// Both the landing zone and the cache have a maximum size.
26
/// If the landing zone is full, the caching of the current query will be aborted.
27
/// If the cache is full, the least recently used entries will be evicted if necessary to make room for the new entry.
28
#[derive(Debug)]
29
pub struct CacheBackend {
30
    // TODO: more fine granular locking?
31
    // for each operator graph, we have a cache, that can efficiently be accessed
32
    raster_caches: HashMap<CanonicOperatorName, RasterOperatorCacheEntry>,
33
    vector_caches: HashMap<CanonicOperatorName, VectorOperatorCacheEntry>,
34

35
    // size bookkeeping: `cache_size` is used for cache entries, `landing_zone_size` for landing zone entries
    cache_size: CacheSize,
36
    landing_zone_size: CacheSize,
37

38
    // we only use the LruCache for determining the least recently used elements and evict as many entries as needed to fit the new one
39
    lru: LruCache<CacheEntryId, TypedCanonicOperatorName>,
40
}
41

42
impl CacheBackend {
43
    /// This method removes entries from the cache until it can fit the given amount of bytes.
44
    #[allow(clippy::missing_panics_doc)]
45
    pub fn evict_until_can_fit_bytes(&mut self, bytes: usize) {
5✔
46
        while !self.cache_size.can_fit_bytes(bytes) {
6✔
47
            if let Some((pop_id, pop_key)) = self.lru.pop_lru() {
1✔
48
                match pop_key {
1✔
49
                    TypedCanonicOperatorName::Raster(raster_pop_key) => {
1✔
50
                        let op_cache = self
1✔
51
                            .raster_caches
1✔
52
                            .get_mut(&raster_pop_key)
1✔
53
                            .expect("LRU entry must exist in the cache!");
1✔
54
                        let query_element = op_cache
1✔
55
                            .remove_cache_entry(&pop_id)
1✔
56
                            .expect("LRU entry must exist in the cache!");
1✔
57
                        self.cache_size.remove_element_bytes(&query_element);
1✔
58
                    }
1✔
59
                    TypedCanonicOperatorName::Vector(vector_pop_key) => {
×
60
                        let op_cache = self
×
61
                            .vector_caches
×
62
                            .get_mut(&vector_pop_key)
×
63
                            .expect("LRU entry must exist in the cache!");
×
64
                        let query_element = op_cache
×
65
                            .remove_cache_entry(&pop_id)
×
66
                            .expect("LRU entry must exist in the cache!");
×
67
                        self.cache_size.remove_element_bytes(&query_element);
×
68
                    }
×
69
                }
70
                self.cache_size.remove_element_bytes(&pop_id);
1✔
71

72
                debug!(
1✔
73
                    "Evicted query {}. Cache size: {}. Cache size used: {}, Cache used percentage: {}.",
74
                    pop_id,
75
                    self.cache_size.total_byte_size(),
×
76
                    self.cache_size.byte_size_used(),
×
77
                    self.cache_size.size_used_fraction()
×
78
                );
79
            }
×
80
        }
81
    }
5✔
82
}
83

84
/// A [`CanonicOperatorName`] tagged with the kind of operator cache (raster or vector) it refers to.
/// The LRU stores this type as its value, so that eviction knows which operator cache map to look into.
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
85
pub enum TypedCanonicOperatorName {
86
    Raster(CanonicOperatorName),
87
    Vector(CanonicOperatorName),
88
}
89

90
impl TypedCanonicOperatorName {
91
    pub fn as_raster(&self) -> Option<&CanonicOperatorName> {
×
92
        match self {
×
93
            Self::Raster(name) => Some(name),
×
94
            Self::Vector(_) => None,
×
95
        }
96
    }
×
97

98
    pub fn as_vector(&self) -> Option<&CanonicOperatorName> {
×
99
        match self {
×
100
            Self::Raster(_) => None,
×
101
            Self::Vector(name) => Some(name),
×
102
        }
103
    }
×
104
}
105

106
/// Implemented by caches that are able to free space by evicting entries.
pub trait CacheEvictUntilFit {
107
    /// Evicts entries until `bytes` additional bytes fit; the eviction strategy is up to the implementor.
    fn evict_entries_until_can_fit_bytes(&mut self, bytes: usize);
108
}
109

110
impl CacheEvictUntilFit for CacheBackend {
111
    // simply forwards to the inherent `evict_until_can_fit_bytes` method of the backend
    fn evict_entries_until_can_fit_bytes(&mut self, bytes: usize) {
5✔
112
        self.evict_until_can_fit_bytes(bytes);
5✔
113
    }
5✔
114
}
115

116
pub trait CacheView<C, L>: CacheEvictUntilFit {
117
    fn operator_caches_mut(
118
        &mut self,
119
    ) -> &mut HashMap<CanonicOperatorName, OperatorCacheEntry<C, L>>;
120

121
    fn create_operator_cache_if_needed(&mut self, key: CanonicOperatorName) {
12✔
122
        // TODO: add size of the OperatorCacheEntry to the cache size?
123
        self.operator_caches_mut()
12✔
124
            .entry(key)
12✔
125
            .or_insert_with(|| OperatorCacheEntry::new());
12✔
126
    }
12✔
127

128
    fn remove_operator_cache(
7✔
129
        &mut self,
7✔
130
        key: &CanonicOperatorName,
7✔
131
    ) -> Option<OperatorCacheEntry<C, L>> {
7✔
132
        // TODO: remove the size of the OperatorCacheEntry to the cache size?
133
        self.operator_caches_mut().remove(key)
7✔
134
    }
7✔
135
}
136

137
/// A mutable view on the cache of a single operator.
/// It bundles the operator's cache entry with the size bookkeeping and the LRU it is used with,
/// so that methods on the view can update all of them together.
#[allow(clippy::type_complexity)]
138
pub struct OperatorCacheEntryView<'a, C: CacheBackendElementExt> {
139
    // the cache and landing zone entries of one operator
    operator_cache: &'a mut OperatorCacheEntry<
140
        CacheQueryEntry<C::Query, C::CacheContainer>,
141
        CacheQueryEntry<C::Query, C::LandingZoneContainer>,
142
    >,
143
    // size bookkeeping for cache entries
    cache_size: &'a mut CacheSize,
144
    // size bookkeeping for landing zone entries
    landing_zone_size: &'a mut CacheSize,
145
    // maps cache entry ids to the (typed) operator they belong to
    lru: &'a mut LruCache<CacheEntryId, TypedCanonicOperatorName>,
146
}
147

148
impl<C> OperatorCacheEntryView<'_, C>
149
where
150
    C: CacheBackendElementExt + ByteSize,
151
    C::Query: Clone + CacheQueryMatch,
152
    CacheQueryEntry<C::Query, C::LandingZoneContainer>: ByteSize,
153
    CacheQueryEntry<C::Query, C::CacheContainer>: ByteSize,
154
{
155
    /// Returns whether the underlying operator cache reports itself as empty.
    fn is_empty(&self) -> bool {
7✔
156
        self.operator_cache.is_empty()
7✔
157
    }
7✔
158

159
    /// This method removes a query from the landing zone.
160
    ///
161
    /// If the query is not in the landing zone, this method returns None.
162
    ///
163
    fn remove_query_from_landing_zone(
12✔
164
        &mut self,
12✔
165
        query_id: &QueryId,
12✔
166
    ) -> Option<CacheQueryEntry<C::Query, C::LandingZoneContainer>> {
12✔
167
        if let Some(entry) = self.operator_cache.remove_landing_zone_entry(query_id) {
12✔
168
            self.landing_zone_size.remove_element_bytes(query_id);
12✔
169
            self.landing_zone_size.remove_element_bytes(&entry);
12✔
170

171
            // debug output
172
            tracing::debug!(
12✔
173
                "Removed query {}. Landing zone size: {}. Landing zone size used: {}, Landing zone used percentage: {}.",
174
                query_id,
175
                self.landing_zone_size.total_byte_size(),
×
176
                self.landing_zone_size.byte_size_used(),
×
177
                self.landing_zone_size.size_used_fraction()
×
178
            );
179

180
            Some(entry)
12✔
181
        } else {
182
            None
×
183
        }
184
    }
12✔
185

186
    /// This method removes a query from the cache and the LRU.
187
    /// It will remove a queries cache entry from the cache and the LRU.
188
    ///
189
    /// If the query is not in the cache, this method returns None.
190
    ///
191
    fn remove_query_from_cache_and_lru(
1✔
192
        &mut self,
1✔
193
        cache_entry_id: &CacheEntryId,
1✔
194
    ) -> Option<CacheQueryEntry<C::Query, C::CacheContainer>> {
1✔
195
        if let Some(entry) = self.operator_cache.remove_cache_entry(cache_entry_id) {
1✔
196
            let old_lru_entry = self.lru.pop_entry(cache_entry_id);
1✔
197
            debug_assert!(old_lru_entry.is_some(), "CacheEntryId not found in LRU");
1✔
198
            self.cache_size.remove_element_bytes(cache_entry_id);
1✔
199
            self.cache_size.remove_element_bytes(&entry);
1✔
200

201
            tracing::debug!(
1✔
202
                "Removed cache entry {}. Cache size: {}. Cache size used: {}, Cache used percentage: {}.",
203
                cache_entry_id,
204
                self.cache_size.total_byte_size(),
×
205
                self.cache_size.byte_size_used(),
×
206
                self.cache_size.size_used_fraction()
×
207
            );
208

209
            Some(entry)
1✔
210
        } else {
211
            None
×
212
        }
213
    }
1✔
214

215
    /// This method removes a list of queries from the cache and the LRU.
216
    fn discard_queries_from_cache_and_lru(&mut self, cache_entry_ids: &[CacheEntryId]) {
7✔
217
        for cache_entry_id in cache_entry_ids {
7✔
218
            let old_entry = self.remove_query_from_cache_and_lru(cache_entry_id);
1✔
219
            debug_assert!(
1✔
220
                old_entry.is_some(),
1✔
221
                "CacheEntryId not found in OperatorCacheEntry"
222
            );
223
        }
224
    }
7✔
225

226
    /// This method adds a query element to the landing zone.
227
    /// It will add the element to the landing zone entry of the query.
228
    ///
229
    /// # Errors
230
    ///
231
    /// This method returns an error if the query is not in the landing zone.
232
    /// This method returns an error if the element is already expired.
233
    /// This method returns an error if the landing zone is full or the new element would cause the landing zone to overflow.
234
    ///
235
    fn add_query_element_to_landing_zone(
12✔
236
        &mut self,
12✔
237
        query_id: &QueryId,
12✔
238
        landing_zone_element: C,
12✔
239
    ) -> Result<(), CacheError> {
12✔
240
        let landing_zone_entry = self
12✔
241
            .operator_cache
12✔
242
            .landing_zone_entry_mut(query_id)
12✔
243
            .ok_or(CacheError::QueryNotFoundInLandingZone)?;
12✔
244

245
        if landing_zone_element.cache_hint().is_expired() {
12✔
246
            tracing::trace!("Element is already expired");
7✔
247
            return Err(CacheError::TileExpiredBeforeInsertion);
7✔
248
        }
5✔
249

250
        let element_bytes_size = landing_zone_element.byte_size();
5✔
251

252
        if !self.landing_zone_size.can_fit_bytes(element_bytes_size) {
5✔
253
            return Err(CacheError::NotEnoughSpaceInLandingZone);
×
254
        }
5✔
255

256
        // new entries might update the query bounds stored for this entry
257
        landing_zone_element.update_stored_query(&mut landing_zone_entry.query)?;
5✔
258

259
        // actually insert the element into the landing zone
260
        landing_zone_entry.insert_element(landing_zone_element)?;
5✔
261

262
        // we add the bytes size of the element to the landing zone size after we have inserted it.
263
        self.landing_zone_size
5✔
264
            .try_add_bytes(element_bytes_size)
5✔
265
            .expect(
5✔
266
            "The Landing Zone must have enough space for the element since we checked it before",
5✔
267
        );
268

269
        tracing::trace!(
5✔
270
            "Inserted tile for query {} into landing zone. Landing zone size: {}. Landing zone size used: {}. Landing zone used percentage: {}",
271
            query_id,
272
            self.landing_zone_size.total_byte_size(),
×
273
            self.landing_zone_size.byte_size_used(),
×
274
            self.landing_zone_size.size_used_fraction()
×
275
        );
276

277
        Ok(())
5✔
278
    }
12✔
279

280
    /// This method inserts a query into the landing zone.
281
    /// It will cause the operator cache to create a new landing zone entry.
282
    /// Therefore, the size of the query and the size of the landing zone entry will be added to the landing zone size.
283
    ///
284
    /// # Errors
285
    ///
286
    /// This method returns an error if the query is already in the landing zone.
287
    /// This method returns an error if the landing zone is full or the new query would cause the landing zone to overflow.
288
    ///
289
    fn insert_query_into_landing_zone(&mut self, query: &C::Query) -> Result<QueryId, CacheError> {
12✔
290
        let landing_zone_entry = CacheQueryEntry::create_empty::<C>(query.clone());
12✔
291
        let query_id = QueryId::new();
12✔
292

293
        let query_id_bytes_size = query_id.byte_size();
12✔
294
        let landing_zone_entry_bytes_size = landing_zone_entry.byte_size();
12✔
295

296
        self.landing_zone_size.try_add_bytes(query_id_bytes_size)?;
12✔
297

298
        // if this fails, we have to remove the query id size again
299
        if let Err(e) = self
12✔
300
            .landing_zone_size
12✔
301
            .try_add_bytes(landing_zone_entry_bytes_size)
12✔
302
        {
303
            self.landing_zone_size.remove_bytes(query_id_bytes_size);
×
304
            return Err(e);
×
305
        }
12✔
306

307
        // if this fails, we have to remove the query id size and the landing zone entry size again
308
        if let Err(e) = self
12✔
309
            .operator_cache
12✔
310
            .insert_landing_zone_entry(query_id, landing_zone_entry)
12✔
311
        {
312
            self.landing_zone_size.remove_bytes(query_id_bytes_size);
×
313
            self.landing_zone_size
×
314
                .remove_bytes(landing_zone_entry_bytes_size);
×
315
            return Err(e);
×
316
        }
12✔
317

318
        // debug output
319
        tracing::trace!(
12✔
320
            "Added query {} to landing zone. Landing zone size: {}. Landing zone size used: {}, Landing zone used percentage: {}.",
321
            query_id,
322
            self.landing_zone_size.total_byte_size(),
×
323
            self.landing_zone_size.byte_size_used(),
×
324
            self.landing_zone_size.size_used_fraction()
×
325
        );
326

327
        Ok(query_id)
12✔
328
    }
12✔
329

330
    /// This method inserts a cache entry into the cache and the LRU.
331
    /// It allows the element cache to overflow the cache size.
332
    /// This is done because the total cache size is the cache size + the landing zone size.
333
    /// This method is used when moving an element from the landing zone to the cache.
334
    ///
335
    /// # Errors
336
    ///
337
    /// This method returns an error if the cache entry is already in the cache.
338
    ///
339
    fn insert_cache_entry_allow_overflow(
5✔
340
        &mut self,
5✔
341
        cache_entry: CacheQueryEntry<C::Query, C::CacheContainer>,
5✔
342
        key: &CanonicOperatorName,
5✔
343
    ) -> Result<CacheEntryId, CacheError> {
5✔
344
        let cache_entry_id = CacheEntryId::new();
5✔
345
        let bytes = cache_entry.byte_size() + cache_entry_id.byte_size();
5✔
346
        // When inserting data from the landing zone into the cache, we allow the cache to overflow.
347
        // This is done because the total cache size is the cache size + the landing zone size.
348
        self.cache_size.add_bytes_allow_overflow(bytes);
5✔
349
        self.operator_cache
5✔
350
            .insert_cache_entry(cache_entry_id, cache_entry)?;
5✔
351
        // we have to wrap the key in a TypedCanonicOperatorName to be able to insert it into the LRU
352
        self.lru.push(
5✔
353
            cache_entry_id,
5✔
354
            C::typed_canonical_operator_name(key.clone()),
5✔
355
        );
356

357
        // debug output
358
        tracing::trace!(
5✔
359
            "Added cache entry {}. Cache size: {}. Cache size used: {}, Cache used percentage: {}.",
360
            cache_entry_id,
361
            self.cache_size.total_byte_size(),
×
362
            self.cache_size.byte_size_used(),
×
363
            self.cache_size.size_used_fraction()
×
364
        );
365

366
        Ok(cache_entry_id)
5✔
367
    }
5✔
368

369
    /// This method finds a cache entry in the cache that matches the query.
370
    /// It will also collect all expired cache entries.
371
    /// The cache entry is returned together with the expired ids.
372
    fn find_matching_cache_entry_and_collect_expired_entries(
7✔
373
        &mut self,
7✔
374
        query: &C::Query,
7✔
375
    ) -> CacheQueryResult<'_, C::Query, C::CacheContainer> {
7✔
376
        let mut expired_cache_entry_ids = vec![];
7✔
377

378
        let x = self.operator_cache.iter().find(|&(id, entry)| {
7✔
379
            if entry.elements.is_expired() {
6✔
380
                expired_cache_entry_ids.push(*id);
1✔
381
                return false;
1✔
382
            }
5✔
383
            entry.query.is_match(query)
5✔
384
        });
6✔
385

386
        CacheQueryResult {
387
            cache_hit: x.map(|(id, entry)| (*id, entry)),
7✔
388
            expired_cache_entry_ids,
7✔
389
        }
390
    }
7✔
391
}
392

393
/// The outcome of looking up a query in the cache of one operator.
struct CacheQueryResult<'a, Query, CE> {
394
    // the id and entry of the matching cache entry, if there is one
    cache_hit: Option<(CacheEntryId, &'a CacheQueryEntry<Query, CE>)>,
395
    // ids of expired entries that were encountered during the lookup
    expired_cache_entry_ids: Vec<CacheEntryId>,
396
}
397

398
pub trait Cache<C: CacheBackendElementExt>:
399
    CacheView<
400
        CacheQueryEntry<C::Query, C::CacheContainer>,
401
        CacheQueryEntry<C::Query, C::LandingZoneContainer>,
402
    >
403
where
404
    C::Query: Clone + CacheQueryMatch,
405
{
406
    /// This method returns a mutable view ([`OperatorCacheEntryView`]) on the cache entry of an operator.
407
    /// If there is no cache entry for the operator, this method returns None.
408
    fn operator_cache_view_mut(
409
        &mut self,
410
        key: &CanonicOperatorName,
411
    ) -> Option<OperatorCacheEntryView<'_, C>>;
412

413
    /// This method queries the cache for a given query.
414
    /// If the query matches an entry in the cache, the cache entry is returned and it is promoted in the LRU.
415
    /// If the query does not match an entry in the cache, None is returned. Also if a cache entry is found but it is expired, None is returned.
416
    ///
417
    /// # Errors
418
    /// This method returns an error if the cache entry is not found.
419
    ///
420
    fn query_and_promote(
14✔
421
        &mut self,
14✔
422
        key: &CanonicOperatorName,
14✔
423
        query: &C::Query,
14✔
424
    ) -> Result<Option<Arc<Vec<C>>>, CacheError> {
14✔
425
        let mut cache = self
14✔
426
            .operator_cache_view_mut(key)
14✔
427
            .ok_or(CacheError::OperatorCacheEntryNotFound)?;
14✔
428

429
        let CacheQueryResult {
430
            cache_hit,
7✔
431
            expired_cache_entry_ids,
7✔
432
        } = cache.find_matching_cache_entry_and_collect_expired_entries(query);
7✔
433

434
        let res = if let Some((cache_entry_id, cache_entry)) = cache_hit {
7✔
435
            let potential_result_elements = cache_entry.elements.results_arc();
5✔
436

437
            // promote the cache entry in the LRU
438
            cache.lru.promote(&cache_entry_id);
5✔
439
            Some(potential_result_elements)
5✔
440
        } else {
441
            None
2✔
442
        };
443

444
        // discard expired cache entries
445
        cache.discard_queries_from_cache_and_lru(&expired_cache_entry_ids);
7✔
446

447
        Ok(res.flatten())
7✔
448
    }
14✔
449

450
    /// This method inserts a query into the cache.
451
    ///
452
    /// # Errors
453
    /// This method returns an error if the query is already in the cache.
454
    ///
455
    fn insert_query_into_landing_zone(
12✔
456
        &mut self,
12✔
457
        key: &CanonicOperatorName,
12✔
458
        query: &C::Query,
12✔
459
    ) -> Result<QueryId, CacheError> {
12✔
460
        self.create_operator_cache_if_needed(key.clone());
12✔
461
        self.operator_cache_view_mut(key)
12✔
462
            .expect("This method must not fail since the OperatorCache was created one line above.")
12✔
463
            .insert_query_into_landing_zone(query)
12✔
464
    }
12✔
465

466
    fn insert_query_element_into_landing_zone(
213✔
467
        &mut self,
213✔
468
        key: &CanonicOperatorName,
213✔
469
        query_id: &QueryId,
213✔
470
        landing_zone_element: C,
213✔
471
    ) -> Result<(), CacheError> {
213✔
472
        let mut cache = self
213✔
473
            .operator_cache_view_mut(key)
213✔
474
            .ok_or(CacheError::QueryNotFoundInLandingZone)?;
213✔
475
        let res = cache.add_query_element_to_landing_zone(query_id, landing_zone_element);
12✔
476

477
        #[cfg(debug_assertions)]
478
        match res.as_ref() {
12✔
479
            Err(CacheError::TileExpiredBeforeInsertion) => {
480
                tracing::trace!("Element expired before insertion.");
7✔
481
            }
482
            Err(er) => tracing::debug!("Error on insert query element: {er}"),
×
483
            _ => {}
5✔
484
        }
485

486
        // if we cant add the element to the landing zone, we remove the query from the landing zone
487
        if res.is_err() {
12✔
488
            let _old_entry = cache.remove_query_from_landing_zone(query_id);
7✔
489

490
            // if the operator cache is empty, we remove it from the cache
491
            if cache.is_empty() {
7✔
492
                self.remove_operator_cache(key);
7✔
493
            }
7✔
494
        }
5✔
495

496
        res
12✔
497
    }
213✔
498

499
    /// This method discards a query from the landing zone.
500
    /// If the query is not in the landing zone, this method does nothing.
501
    fn discard_query_from_landing_zone(&mut self, key: &CanonicOperatorName, query_id: &QueryId) {
×
502
        if let Some(mut cache) = self.operator_cache_view_mut(key) {
×
503
            cache.remove_query_from_landing_zone(query_id);
×
504
            if cache.is_empty() {
×
505
                self.remove_operator_cache(key);
×
506
            }
×
507
        }
×
508
    }
×
509

510
    /// This method discards a query from the cache and the LRU.
511
    /// If the query is not in the cache, this method does nothing.
512
    fn discard_querys_from_cache_and_lru(
×
513
        &mut self,
×
514
        key: &CanonicOperatorName,
×
515
        cache_entry_ids: &[CacheEntryId],
×
516
    ) {
×
517
        if let Some(mut cache) = self.operator_cache_view_mut(key) {
×
518
            cache.discard_queries_from_cache_and_lru(cache_entry_ids);
×
519
            if cache.is_empty() {
×
520
                self.remove_operator_cache(key);
×
521
            }
×
522
        }
×
523
    }
×
524

525
    /// This method moves a query from the landing zone to the cache.
526
    /// It will remove the query from the landing zone and insert it into the cache.
527
    /// If the cache is full, the least recently used entries will be evicted if necessary to make room for the new entry.
528
    /// This method returns the cache entry id of the inserted cache entry.
529
    ///
530
    /// # Errors
531
    /// This method returns an error if the query is not in the landing zone.
532
    /// This method returns an error if the cache entry is already in the cache.
533
    /// This method returns an error if the cache is full and the least recently used entries cannot be evicted to make room for the new entry.
534
    ///
535
    fn move_query_from_landing_zone_to_cache(
8✔
536
        &mut self,
8✔
537
        key: &CanonicOperatorName,
8✔
538
        query_id: &QueryId,
8✔
539
    ) -> Result<CacheEntryId, CacheError> {
8✔
540
        let mut operator_cache = self
8✔
541
            .operator_cache_view_mut(key)
8✔
542
            .ok_or(CacheError::OperatorCacheEntryNotFound)?;
8✔
543
        let landing_zone_entry = operator_cache
5✔
544
            .remove_query_from_landing_zone(query_id)
5✔
545
            .ok_or(CacheError::QueryNotFoundInLandingZone)?;
5✔
546
        let cache_entry: CacheQueryEntry<
5✔
547
            <C as CacheBackendElement>::Query,
5✔
548
            <C as CacheBackendElementExt>::CacheContainer,
5✔
549
        > = C::landing_zone_to_cache_entry(landing_zone_entry);
5✔
550
        // when moving an element from the landing zone to the cache, we allow the cache size to overflow.
551
        // This is done because the total cache size is the cache size + the landing zone size.
552
        let cache_entry_id = operator_cache.insert_cache_entry_allow_overflow(cache_entry, key)?;
5✔
553
        // We could also first try to evict until the cache can hold the entry.
554
        // However, then we would need to lookup the cache entry twice.
555
        // To avoid that, we just evict after we moved the entry from the landing zone to the cache.
556
        // This is also not a problem since the total cache size is the cache size + the landing zone size.
557
        self.evict_entries_until_can_fit_bytes(0);
5✔
558

559
        Ok(cache_entry_id)
5✔
560
    }
8✔
561
}
562

563
impl<T> Cache<CompressedRasterTile2D<T>> for CacheBackend
564
where
565
    T: Pixel,
566
    CompressedRasterTile2D<T>: CacheBackendElementExt<
567
            Query = RasterQueryRectangle,
568
            LandingZoneContainer = LandingZoneQueryTiles,
569
            CacheContainer = CachedTiles,
570
        >,
571
{
572
    fn operator_cache_view_mut(
247✔
573
        &mut self,
247✔
574
        key: &CanonicOperatorName,
247✔
575
    ) -> Option<OperatorCacheEntryView<'_, CompressedRasterTile2D<T>>> {
247✔
576
        self.raster_caches
247✔
577
            .get_mut(key)
247✔
578
            .map(|op| OperatorCacheEntryView {
247✔
579
                operator_cache: op,
36✔
580
                cache_size: &mut self.cache_size,
36✔
581
                landing_zone_size: &mut self.landing_zone_size,
36✔
582
                lru: &mut self.lru,
36✔
583
            })
36✔
584
    }
247✔
585
}
586

587
impl<T> Cache<CompressedFeatureCollection<T>> for CacheBackend
588
where
589
    T: Geometry + ArrowTyped,
590
    CompressedFeatureCollection<T>: CacheBackendElementExt<
591
            Query = VectorQueryRectangle,
592
            LandingZoneContainer = LandingZoneQueryFeatures,
593
            CacheContainer = CachedFeatures,
594
        >,
595
{
596
    fn operator_cache_view_mut(
×
597
        &mut self,
×
598
        key: &CanonicOperatorName,
×
599
    ) -> Option<OperatorCacheEntryView<'_, CompressedFeatureCollection<T>>> {
×
600
        self.vector_caches
×
601
            .get_mut(key)
×
602
            .map(|op| OperatorCacheEntryView {
×
603
                operator_cache: op,
×
604
                cache_size: &mut self.cache_size,
×
605
                landing_zone_size: &mut self.landing_zone_size,
×
606
                lru: &mut self.lru,
×
607
            })
×
608
    }
×
609
}
610

611
// Raster flavor of the view: exposes the map of raster operator caches.
impl CacheView<RasterCacheQueryEntry, RasterLandingQueryEntry> for CacheBackend {
612
    fn operator_caches_mut(
19✔
613
        &mut self,
19✔
614
    ) -> &mut HashMap<CanonicOperatorName, RasterOperatorCacheEntry> {
19✔
615
        &mut self.raster_caches
19✔
616
    }
19✔
617
}
618

619
// Vector flavor of the view: exposes the map of vector operator caches.
impl CacheView<VectorCacheQueryEntry, VectorLandingQueryEntry> for CacheBackend {
620
    fn operator_caches_mut(
×
621
        &mut self,
×
622
    ) -> &mut HashMap<CanonicOperatorName, VectorOperatorCacheEntry> {
×
623
        &mut self.vector_caches
×
624
    }
×
625
}
626

627
pub trait CacheBackendElement: ByteSize + Send + ByteSize + Sync
628
where
629
    Self: Sized,
630
{
631
    type Query: CacheQueryMatch + Clone + Send + Sync;
632

633
    /// Update the stored query rectangle of the cache entry.
634
    /// This allows to expand the stored query rectangle to the tile bounds produced by the query.
635
    ///
636
    /// # Errors
637
    /// This method returns an error if the stored query cannot be updated.
638
    fn update_stored_query(&self, query: &mut Self::Query) -> Result<(), CacheError>;
639

640
    /// This method returns the cache hint of the element.
641
    fn cache_hint(&self) -> CacheHint;
642

643
    /// This method returns the typed canonical operator name of the element.
644
    fn typed_canonical_operator_name(key: CanonicOperatorName) -> TypedCanonicOperatorName;
645

646
    /// This method checks if this specific element should be included in the answer of the query.
647
    fn intersects_query(&self, query: &Self::Query) -> bool;
648
}
649

650
/// Extension of [`CacheBackendElement`] that ties an element type to the containers
/// which hold it in the landing zone and in the cache.
pub trait CacheBackendElementExt: CacheBackendElement {
651
    /// Container that holds the elements of a query while it is in the landing zone.
    type LandingZoneContainer: LandingZoneElementsContainer<Self> + ByteSize;
652
    /// Container that holds the elements of a query in the cache.
    /// It can be created from the landing zone container.
    type CacheContainer: CacheElementsContainer<Self::Query, Self>
653
        + ByteSize
654
        + From<Self::LandingZoneContainer>;
655

656
    /// Moves this element into the given landing zone container.
    fn move_element_into_landing_zone(
657
        self,
658
        landing_zone: &mut Self::LandingZoneContainer,
659
    ) -> Result<(), super::error::CacheError>;
660

661
    /// Creates a landing zone container (presumably without any elements, judging by the name).
    fn create_empty_landing_zone() -> Self::LandingZoneContainer;
662

663
    /// Returns the elements of a cache container as a shared vector, if available.
    fn results_arc(cache_elements_container: &Self::CacheContainer) -> Option<Arc<Vec<Self>>>;
664

665
    /// Converts a landing zone entry into a cache entry.
    fn landing_zone_to_cache_entry(
666
        landing_zone_entry: CacheQueryEntry<Self::Query, Self::LandingZoneContainer>,
667
    ) -> CacheQueryEntry<Self::Query, Self::CacheContainer>;
668
}
669

670
/// A cache whose [`CacheBackend`] is guarded by an async read-write lock.
#[derive(Debug)]
671
pub struct SharedCache {
672
    backend: RwLock<CacheBackend>,
673
}
674

675
impl SharedCache {
676
    pub fn new(cache_size_in_mb: usize, landing_zone_ratio: f64) -> Result<Self> {
1✔
677
        if landing_zone_ratio <= 0.0 {
1✔
678
            return Err(crate::error::Error::QueryingProcessorFailed {
×
679
                source: Box::new(CacheError::LandingZoneRatioMustBeLargerThanZero),
×
680
            });
×
681
        }
1✔
682

683
        if landing_zone_ratio >= 0.5 {
1✔
684
            return Err(crate::error::Error::QueryingProcessorFailed {
×
685
                source: Box::new(CacheError::LandingZoneRatioMustBeSmallerThenHalfCacheSize),
×
686
            });
×
687
        }
1✔
688

689
        let cache_size_bytes =
1✔
690
            (cache_size_in_mb as f64 * (1.0 - landing_zone_ratio) * 1024.0 * 1024.0) as usize;
1✔
691

692
        let landing_zone_size_bytes =
1✔
693
            (cache_size_in_mb as f64 * landing_zone_ratio * 1024.0 * 1024.0) as usize;
1✔
694

695
        Ok(Self {
1✔
696
            backend: RwLock::new(CacheBackend {
1✔
697
                vector_caches: Default::default(),
1✔
698
                raster_caches: Default::default(),
1✔
699
                lru: LruCache::unbounded(), // we need no cap because we evict manually
1✔
700
                cache_size: CacheSize::new(cache_size_bytes),
1✔
701
                landing_zone_size: CacheSize::new(landing_zone_size_bytes),
1✔
702
            }),
1✔
703
        })
1✔
704
    }
1✔
705
}
706

707
impl TestDefault for SharedCache {
708
    fn test_default() -> Self {
334✔
709
        Self {
334✔
710
            backend: RwLock::new(CacheBackend {
334✔
711
                vector_caches: Default::default(),
334✔
712
                raster_caches: Default::default(),
334✔
713
                lru: LruCache::unbounded(), // we need no cap because we evict manually
334✔
714
                cache_size: CacheSize::new(usize::MAX),
334✔
715
                landing_zone_size: CacheSize::new(usize::MAX),
334✔
716
            }),
334✔
717
        }
334✔
718
    }
334✔
719
}
720

721
/// Holds all the cached results for an operator graph (workflow)
722
#[derive(Debug, Default)]
723
pub struct OperatorCacheEntry<CacheEntriesContainer, LandingZoneEntriesContainer> {
724
    // for a given operator and query we need to look through all entries to find one that matches
725
    // TODO: use a multi-dimensional index to speed up the lookup
726
    entries: HashMap<CacheEntryId, CacheEntriesContainer>,
727

728
    // running queries insert their tiles as they are produced. The entry will be created once the query is done.
729
    // The query is identified by a Uuid instead of the query rectangle to avoid confusions with other queries
730
    landing_zone: HashMap<QueryId, LandingZoneEntriesContainer>,
731
}
732

733
impl<CacheEntriesContainer, LandingZoneEntriesContainer>
734
    OperatorCacheEntry<CacheEntriesContainer, LandingZoneEntriesContainer>
735
{
736
    pub fn new() -> Self {
12✔
737
        Self {
12✔
738
            entries: Default::default(),
12✔
739
            landing_zone: Default::default(),
12✔
740
        }
12✔
741
    }
12✔
742

743
    fn insert_landing_zone_entry(
12✔
744
        &mut self,
12✔
745
        query_id: QueryId,
12✔
746
        landing_zone_entry: LandingZoneEntriesContainer,
12✔
747
    ) -> Result<(), CacheError> {
12✔
748
        let old_entry = self.landing_zone.insert(query_id, landing_zone_entry);
12✔
749

750
        if old_entry.is_some() {
12✔
751
            Err(CacheError::QueryIdAlreadyInLandingZone)
×
752
        } else {
753
            Ok(())
12✔
754
        }
755
    }
12✔
756

757
    fn remove_landing_zone_entry(
12✔
758
        &mut self,
12✔
759
        query_id: &QueryId,
12✔
760
    ) -> Option<LandingZoneEntriesContainer> {
12✔
761
        self.landing_zone.remove(query_id)
12✔
762
    }
12✔
763

764
    fn landing_zone_entry_mut(
12✔
765
        &mut self,
12✔
766
        query_id: &QueryId,
12✔
767
    ) -> Option<&mut LandingZoneEntriesContainer> {
12✔
768
        self.landing_zone.get_mut(query_id)
12✔
769
    }
12✔
770

771
    fn insert_cache_entry(
5✔
772
        &mut self,
5✔
773
        cache_entry_id: CacheEntryId,
5✔
774
        cache_entry: CacheEntriesContainer,
5✔
775
    ) -> Result<(), CacheError> {
5✔
776
        let old_entry = self.entries.insert(cache_entry_id, cache_entry);
5✔
777

778
        if old_entry.is_some() {
5✔
779
            Err(CacheError::CacheEntryIdAlreadyInCache)
×
780
        } else {
781
            Ok(())
5✔
782
        }
783
    }
5✔
784

785
    fn remove_cache_entry(
2✔
786
        &mut self,
2✔
787
        cache_entry_id: &CacheEntryId,
2✔
788
    ) -> Option<CacheEntriesContainer> {
2✔
789
        self.entries.remove(cache_entry_id)
2✔
790
    }
2✔
791

792
    fn is_empty(&self) -> bool {
7✔
793
        self.entries.is_empty() && self.landing_zone.is_empty()
7✔
794
    }
7✔
795

796
    fn iter(&self) -> impl Iterator<Item = (&CacheEntryId, &CacheEntriesContainer)> {
7✔
797
        self.entries.iter()
7✔
798
    }
7✔
799
}
800

801
identifier!(QueryId);
802

803
impl ByteSize for QueryId {}
804

805
identifier!(CacheEntryId);
806

807
impl ByteSize for CacheEntryId {}
808

809
/// Holds all the elements for a given query and is able to answer queries that are fully contained
#[derive(Debug, Hash)]
pub struct CacheQueryEntry<Query, Elements> {
    /// The query the elements belong to.
    pub query: Query,
    /// The container with the elements of the query.
    pub elements: Elements,
}
815
type RasterOperatorCacheEntry = OperatorCacheEntry<RasterCacheQueryEntry, RasterLandingQueryEntry>;
816
pub type RasterCacheQueryEntry = CacheQueryEntry<RasterQueryRectangle, CachedTiles>;
817
pub type RasterLandingQueryEntry = CacheQueryEntry<RasterQueryRectangle, LandingZoneQueryTiles>;
818

819
type VectorOperatorCacheEntry = OperatorCacheEntry<VectorCacheQueryEntry, VectorLandingQueryEntry>;
820
pub type VectorCacheQueryEntry = CacheQueryEntry<VectorQueryRectangle, CachedFeatures>;
821
pub type VectorLandingQueryEntry = CacheQueryEntry<VectorQueryRectangle, LandingZoneQueryFeatures>;
822

823
impl<Query, Elements> CacheQueryEntry<Query, Elements> {
824
    pub fn create_empty<E>(query: Query) -> Self
14✔
825
    where
14✔
826
        Elements: LandingZoneElementsContainer<E>,
14✔
827
    {
828
        Self {
14✔
829
            query,
14✔
830
            elements: Elements::create_empty(),
14✔
831
        }
14✔
832
    }
14✔
833

834
    pub fn query(&self) -> &Query {
8✔
835
        &self.query
8✔
836
    }
8✔
837

838
    pub fn elements_mut(&mut self) -> &mut Elements {
12✔
839
        &mut self.elements
12✔
840
    }
12✔
841

842
    pub fn insert_element<E>(&mut self, element: E) -> Result<(), CacheError>
5✔
843
    where
5✔
844
        Elements: LandingZoneElementsContainer<E>,
5✔
845
    {
846
        self.elements.insert_element(element)
5✔
847
    }
5✔
848
}
849

850
impl<Query, Elements> ByteSize for CacheQueryEntry<Query, Elements>
851
where
852
    Elements: ByteSize,
853
{
854
    fn heap_byte_size(&self) -> usize {
33✔
855
        self.elements.heap_byte_size()
33✔
856
    }
33✔
857
}
858

859
/// Decides whether a query (`self`) matches another query of type `RHS`.
///
/// The implementations in this file use `self` as the cached query and check
/// whether it covers the passed query.
pub trait CacheQueryMatch<RHS = Self> {
    /// Returns `true` if `self` is a match for `query`.
    fn is_match(&self, query: &RHS) -> bool;
}
862

863
impl CacheQueryMatch for RasterQueryRectangle {
    /// A cached raster query (`self`) matches `query` if its spatial bounds and its time
    /// interval contain those of `query` and its attributes contain all attributes of `query`.
    fn is_match(&self, query: &RasterQueryRectangle) -> bool {
        let cached_bounds = self.spatial_bounds();
        let requested_bounds = query.spatial_bounds();

        if !cached_bounds.contains(&requested_bounds) {
            return false;
        }

        if !self.time_interval().contains(&query.time_interval()) {
            return false;
        }

        self.attributes().contains_all(query.attributes().as_ref())
    }
}
873

874
impl CacheQueryMatch for VectorQueryRectangle {
    /// A cached vector query (`self`) matches `query` if its bounding box and its time
    /// interval contain those of `query` and both queries have equal attributes.
    fn is_match(&self, query: &VectorQueryRectangle) -> bool {
        let cached_bounds = self.spatial_bounds();
        let requested_bounds = query.spatial_bounds();

        if !cached_bounds.contains_bbox(&requested_bounds) {
            return false;
        }

        if !self.time_interval().contains(&query.time_interval()) {
            return false;
        }

        self.attributes() == query.attributes()
    }
}
884

885
pub trait LandingZoneElementsContainer<E> {
886
    fn insert_element(&mut self, element: E) -> Result<(), CacheError>;
887
    fn create_empty() -> Self;
888
}
889

890
/// Meta information about a container of cached elements.
pub trait CacheElementsContainerInfos<Query> {
    /// Returns `true` if the container is expired.
    fn is_expired(&self) -> bool;
}
893

894
pub trait CacheElementsContainer<Query, E>: CacheElementsContainerInfos<Query> {
895
    fn results_arc(&self) -> Option<Arc<Vec<E>>>;
896
}
897

898
impl From<VectorLandingQueryEntry> for VectorCacheQueryEntry {
899
    fn from(value: VectorLandingQueryEntry) -> Self {
1✔
900
        Self {
1✔
901
            query: value.query,
1✔
902
            elements: value.elements.into(),
1✔
903
        }
1✔
904
    }
1✔
905
}
906

907
pub trait CacheElement: Sized + Send + Sync {
908
    type StoredCacheElement: CacheBackendElementExt<Query = Self::Query>;
909
    type Query: CacheQueryMatch;
910
    type ResultStream: Stream<Item = Result<Self, CacheError>>;
911

912
    fn into_stored_element(self) -> Self::StoredCacheElement;
913
    fn from_stored_element_ref(stored: &Self::StoredCacheElement) -> Result<Self, CacheError>;
914

915
    fn result_stream(
916
        stored_data: Arc<Vec<Self::StoredCacheElement>>,
917
        query: Self::Query,
918
    ) -> Self::ResultStream;
919
}
920

921
#[async_trait]
922
pub trait AsyncCache<C: CacheElement> {
923
    async fn query_cache(
924
        &self,
925
        key: &CanonicOperatorName,
926
        query: &C::Query,
927
    ) -> Result<Option<C::ResultStream>, CacheError>;
928

929
    async fn insert_query(
930
        &self,
931
        key: &CanonicOperatorName,
932
        query: &C::Query,
933
    ) -> Result<QueryId, CacheError>;
934

935
    async fn insert_query_element(
936
        &self,
937
        key: &CanonicOperatorName,
938
        query_id: &QueryId,
939
        landing_zone_element: C,
940
    ) -> Result<(), CacheError>;
941

942
    async fn abort_query(&self, key: &CanonicOperatorName, query_id: &QueryId);
943

944
    async fn finish_query(
945
        &self,
946
        key: &CanonicOperatorName,
947
        query_id: &QueryId,
948
    ) -> Result<CacheEntryId, CacheError>;
949
}
950

951
#[async_trait]
952
impl<C> AsyncCache<C> for SharedCache
953
where
954
    C: CacheElement + Send + Sync + 'static + ByteSize,
955
    CacheBackend: Cache<C::StoredCacheElement>,
956
    C::Query: Clone + CacheQueryMatch + Send + Sync,
957
{
958
    /// Query the cache and on hit create a stream of cache elements
959
    async fn query_cache(
960
        &self,
961
        key: &CanonicOperatorName,
962
        query: &C::Query,
963
    ) -> Result<Option<C::ResultStream>, CacheError> {
9✔
964
        let mut backend = self.backend.write().await;
965
        let res_data = backend.query_and_promote(key, query)?;
966
        Ok(res_data.map(|res_data| C::result_stream(res_data, query.clone())))
1✔
967
    }
9✔
968

969
    /// When inserting a new query, we first register the query and then insert the elements as they are produced
970
    /// This is to avoid confusing different queries on the same operator and query rectangle
971
    async fn insert_query(
972
        &self,
973
        key: &CanonicOperatorName,
974
        query: &C::Query,
975
    ) -> Result<QueryId, CacheError> {
8✔
976
        let mut backend = self.backend.write().await;
977
        backend.insert_query_into_landing_zone(key, query)
978
    }
8✔
979

980
    /// Insert a cachable element for a given query. The query has to be inserted first.
981
    /// The element is inserted into the landing zone and only moved to the cache when the query is finished.
982
    /// If the landing zone is full or the element size would cause the landing zone size to overflow, the caching of the query is aborted.
983
    async fn insert_query_element(
984
        &self,
985
        key: &CanonicOperatorName,
986
        query_id: &QueryId,
987
        landing_zone_element: C,
988
    ) -> Result<(), CacheError> {
213✔
989
        const LOG_LEVEL_THRESHOLD: tracing::Level = tracing::Level::TRACE;
990
        let element_size = if event_enabled!(LOG_LEVEL_THRESHOLD) {
991
            landing_zone_element.byte_size()
992
        } else {
993
            0
994
        };
995

996
        let storeable_element =
997
            crate::util::spawn_blocking(|| landing_zone_element.into_stored_element())
213✔
998
                .await
999
                .map_err(|_| CacheError::BlockingElementConversion)?;
1000

1001
        if event_enabled!(LOG_LEVEL_THRESHOLD) {
1002
            let storeable_element_size = storeable_element.byte_size();
1003
            tracing::trace!(
1004
                "Inserting element into landing zone for query {:?} on operator {}. Element size: {} bytes, storable element size: {} bytes, ratio: {}",
1005
                query_id,
1006
                key,
1007
                element_size,
1008
                storeable_element_size,
1009
                storeable_element_size as f64 / element_size as f64
1010
            );
1011
        }
1012

1013
        let mut backend = self.backend.write().await;
1014
        backend.insert_query_element_into_landing_zone(key, query_id, storeable_element)
1015
    }
213✔
1016

1017
    /// Abort the query and remove already inserted elements from the caches landing zone
1018
    async fn abort_query(&self, key: &CanonicOperatorName, query_id: &QueryId) {
×
1019
        let mut backend = self.backend.write().await;
1020
        backend.discard_query_from_landing_zone(key, query_id);
1021
    }
×
1022

1023
    /// Finish the query and make the inserted elements available in the cache
1024
    async fn finish_query(
1025
        &self,
1026
        key: &CanonicOperatorName,
1027
        query_id: &QueryId,
1028
    ) -> Result<CacheEntryId, CacheError> {
4✔
1029
        let mut backend = self.backend.write().await;
1030
        backend.move_query_from_landing_zone_to_cache(key, query_id)
1031
    }
4✔
1032
}
1033

1034
#[cfg(test)]
1035
mod tests {
1036
    use geoengine_datatypes::{
1037
        primitives::{BandSelection, CacheHint, DateTime, TimeInterval},
1038
        raster::{Grid, GridBoundingBox2D, RasterProperties, RasterTile2D},
1039
    };
1040
    use serde_json::json;
1041
    use std::sync::Arc;
1042

1043
    use crate::cache::cache_tiles::{CompressedGridOrEmpty, CompressedMaskedGrid};
1044

1045
    use super::*;
1046

1047
    async fn process_query_async(tile_cache: &mut SharedCache, op_name: CanonicOperatorName) {
1✔
1048
        let query_id = <SharedCache as AsyncCache<RasterTile2D<u8>>>::insert_query(
1✔
1049
            tile_cache,
1✔
1050
            &op_name,
1✔
1051
            &query_rect(),
1✔
1052
        )
1✔
1053
        .await
1✔
1054
        .unwrap();
1✔
1055

1056
        tile_cache
1✔
1057
            .insert_query_element(&op_name, &query_id, create_tile())
1✔
1058
            .await
1✔
1059
            .unwrap();
1✔
1060

1061
        <SharedCache as AsyncCache<RasterTile2D<u8>>>::finish_query(
1✔
1062
            tile_cache, &op_name, &query_id,
1✔
1063
        )
1✔
1064
        .await
1✔
1065
        .unwrap();
1✔
1066
    }
1✔
1067

1068
    fn process_query(tile_cache: &mut CacheBackend, op_name: &CanonicOperatorName) {
4✔
1069
        let query_id =
4✔
1070
            <CacheBackend as Cache<CompressedRasterTile2D<u8>>>::insert_query_into_landing_zone(
4✔
1071
                tile_cache,
4✔
1072
                op_name,
4✔
1073
                &query_rect(),
4✔
1074
            )
1075
            .unwrap();
4✔
1076

1077
        tile_cache
4✔
1078
            .insert_query_element_into_landing_zone(op_name, &query_id, create_compressed_tile())
4✔
1079
            .unwrap();
4✔
1080

1081
        <CacheBackend as Cache<CompressedRasterTile2D<u8>>>::move_query_from_landing_zone_to_cache(
4✔
1082
            tile_cache, op_name, &query_id,
4✔
1083
        )
1084
        .unwrap();
4✔
1085
    }
4✔
1086

1087
    fn create_tile() -> RasterTile2D<u8> {
1✔
1088
        RasterTile2D::<u8> {
1✔
1089
            time: TimeInterval::new_instant(DateTime::new_utc(2014, 3, 1, 0, 0, 0)).unwrap(),
1✔
1090
            tile_position: [-1, 0].into(),
1✔
1091
            band: 0,
1✔
1092
            global_geo_transform: TestDefault::test_default(),
1✔
1093
            grid_array: Grid::new([3, 2].into(), vec![1, 2, 3, 4, 5, 6])
1✔
1094
                .unwrap()
1✔
1095
                .into(),
1✔
1096
            properties: RasterProperties::default(),
1✔
1097
            cache_hint: CacheHint::max_duration(),
1✔
1098
        }
1✔
1099
    }
1✔
1100

1101
    fn create_compressed_tile() -> CompressedRasterTile2D<u8> {
9✔
1102
        CompressedRasterTile2D::<u8> {
9✔
1103
            time: TimeInterval::new_instant(DateTime::new_utc(2014, 3, 1, 0, 0, 0)).unwrap(),
9✔
1104
            tile_position: [-1, 0].into(),
9✔
1105
            band: 0,
9✔
1106
            global_geo_transform: TestDefault::test_default(),
9✔
1107
            grid_array: CompressedGridOrEmpty::Compressed(CompressedMaskedGrid::new(
9✔
1108
                [3, 2].into(),
9✔
1109
                vec![1, 2, 3, 4, 5, 6],
9✔
1110
                vec![1; 6],
9✔
1111
            )),
9✔
1112
            properties: RasterProperties::default(),
9✔
1113
            cache_hint: CacheHint::max_duration(),
9✔
1114
        }
9✔
1115
    }
9✔
1116

1117
    fn query_rect() -> RasterQueryRectangle {
13✔
1118
        RasterQueryRectangle::new(
13✔
1119
            GridBoundingBox2D::new([-90, -180], [89, 179]).unwrap(),
13✔
1120
            TimeInterval::new_instant(DateTime::new_utc(2014, 3, 1, 0, 0, 0)).unwrap(),
13✔
1121
            BandSelection::first(),
13✔
1122
        )
1123
    }
13✔
1124

1125
    fn op(idx: usize) -> CanonicOperatorName {
12✔
1126
        CanonicOperatorName::new_unchecked(&json!({
12✔
1127
            "type": "GdalSource",
12✔
1128
            "params": {
12✔
1129
                "data": idx
12✔
1130
            }
12✔
1131
        }))
12✔
1132
    }
12✔
1133

1134
    #[tokio::test]
1135
    async fn it_evicts_lru() {
1✔
1136
        // Create cache entry and landing zone entry to geht the size of both
1137
        let landing_zone_entry = RasterLandingQueryEntry {
1✔
1138
            query: query_rect(),
1✔
1139
            elements: LandingZoneQueryTiles::U8(vec![create_compressed_tile()]),
1✔
1140
        };
1✔
1141
        let query_id = QueryId::new();
1✔
1142
        let size_of_landing_zone_entry = landing_zone_entry.byte_size() + query_id.byte_size();
1✔
1143
        let cache_entry: RasterCacheQueryEntry = landing_zone_entry.into();
1✔
1144
        let cache_entry_id = CacheEntryId::new();
1✔
1145
        let size_of_cache_entry = cache_entry.byte_size() + cache_entry_id.byte_size();
1✔
1146

1147
        // Select the max of both sizes
1148
        // This is done because the landing zone should not be smaller then the cache
1149
        let m_size = size_of_cache_entry.max(size_of_landing_zone_entry);
1✔
1150

1151
        // set limits s.t. three tiles fit
1152

1153
        let mut cache_backend = CacheBackend {
1✔
1154
            raster_caches: Default::default(),
1✔
1155
            vector_caches: Default::default(),
1✔
1156
            lru: LruCache::unbounded(),
1✔
1157
            cache_size: CacheSize::new(m_size * 3),
1✔
1158
            landing_zone_size: CacheSize::new(m_size * 3),
1✔
1159
        };
1✔
1160

1161
        // process three different queries
1162
        process_query(&mut cache_backend, &op(1));
1✔
1163
        process_query(&mut cache_backend, &op(2));
1✔
1164
        process_query(&mut cache_backend, &op(3));
1✔
1165

1166
        // query the first one s.t. it is the most recently used
1167
        <CacheBackend as Cache<CompressedRasterTile2D<u8>>>::query_and_promote(
1✔
1168
            &mut cache_backend,
1✔
1169
            &op(1),
1✔
1170
            &query_rect(),
1✔
1171
        )
1172
        .unwrap();
1✔
1173

1174
        // process a fourth query
1175
        process_query(&mut cache_backend, &op(4));
1✔
1176

1177
        // assure the seconds query is evicted because it is the least recently used
1178
        assert!(
1✔
1179
            <CacheBackend as Cache<CompressedRasterTile2D<u8>>>::query_and_promote(
1✔
1180
                &mut cache_backend,
1✔
1181
                &op(2),
1✔
1182
                &query_rect()
1✔
1183
            )
1✔
1184
            .unwrap()
1✔
1185
            .is_none()
1✔
1186
        );
1187

1188
        // assure that the other queries are still in the cache
1189
        for i in [1, 3, 4] {
3✔
1190
            assert!(
3✔
1191
                <CacheBackend as Cache<CompressedRasterTile2D<u8>>>::query_and_promote(
3✔
1192
                    &mut cache_backend,
3✔
1193
                    &op(i),
3✔
1194
                    &query_rect()
3✔
1195
                )
3✔
1196
                .unwrap()
3✔
1197
                .is_some()
3✔
1198
            );
1✔
1199
        }
1✔
1200

1✔
1201
        assert_eq!(
1✔
1202
            cache_backend.cache_size.byte_size_used(),
1✔
1203
            3 * size_of_cache_entry
1✔
1204
        );
1✔
1205
    }
1✔
1206

1207
    #[test]
    fn cache_byte_size() {
        // expected sizes in bytes
        let tile_bytes = 276;
        let enum_and_arc_bytes = 16;
        let vec_bytes = 24;

        assert_eq!(create_compressed_tile().byte_size(), tile_bytes);

        let one_tile = CachedTiles::U8(Arc::new(vec![create_compressed_tile()]));
        assert_eq!(
            one_tile.byte_size(),
            enum_and_arc_bytes + vec_bytes + tile_bytes
        );

        let two_tiles = CachedTiles::U8(Arc::new(vec![
            create_compressed_tile(),
            create_compressed_tile(),
        ]));
        assert_eq!(
            two_tiles.byte_size(),
            enum_and_arc_bytes + vec_bytes + 2 * tile_bytes
        );
    }
1✔
1223

1224
    #[tokio::test]
1225
    async fn it_checks_ttl() {
1✔
1226
        let mut tile_cache = SharedCache {
1✔
1227
            backend: RwLock::new(CacheBackend {
1✔
1228
                raster_caches: Default::default(),
1✔
1229
                vector_caches: Default::default(),
1✔
1230
                lru: LruCache::unbounded(),
1✔
1231
                cache_size: CacheSize::new(usize::MAX),
1✔
1232
                landing_zone_size: CacheSize::new(usize::MAX),
1✔
1233
            }),
1✔
1234
        };
1✔
1235

1236
        process_query_async(&mut tile_cache, op(1)).await;
1✔
1237

1238
        // access works because no ttl is set
1239
        <SharedCache as AsyncCache<RasterTile2D<u8>>>::query_cache(
1✔
1240
            &tile_cache,
1✔
1241
            &op(1),
1✔
1242
            &query_rect(),
1✔
1243
        )
1✔
1244
        .await
1✔
1245
        .unwrap()
1✔
1246
        .unwrap();
1✔
1247

1248
        // manually expire entry
1249
        {
1✔
1250
            let mut backend = tile_cache.backend.write().await;
1✔
1251
            let cache = backend.raster_caches.iter_mut().next().unwrap();
1✔
1252

1✔
1253
            let tiles = &mut cache.1.entries.iter_mut().next().unwrap().1.elements;
1✔
1254
            match tiles {
1✔
1255
                CachedTiles::U8(tiles) => {
1✔
1256
                    let mut expired_tiles = (**tiles).clone();
1✔
1257
                    expired_tiles[0].cache_hint = CacheHint::with_created_and_expires(
1✔
1258
                        DateTime::new_utc(0, 1, 1, 0, 0, 0),
1✔
1259
                        DateTime::new_utc(0, 1, 1, 0, 0, 1).into(),
1✔
1260
                    );
1✔
1261
                    *tiles = Arc::new(expired_tiles);
1✔
1262
                }
1✔
1263
                _ => panic!("wrong tile type"),
1✔
1264
            }
1✔
1265
        }
1✔
1266

1✔
1267
        // access fails because ttl is expired
1✔
1268
        assert!(
1✔
1269
            <SharedCache as AsyncCache<RasterTile2D<u8>>>::query_cache(
1✔
1270
                &tile_cache,
1✔
1271
                &op(1),
1✔
1272
                &query_rect()
1✔
1273
            )
1✔
1274
            .await
1✔
1275
            .unwrap()
1✔
1276
            .is_none()
1✔
1277
        );
1✔
1278
    }
1✔
1279

1280
    #[tokio::test]
1281
    async fn tile_cache_init_size() {
1✔
1282
        let tile_cache = SharedCache::new(100, 0.1).unwrap();
1✔
1283

1284
        let backend = tile_cache.backend.read().await;
1✔
1285

1286
        let cache_size = 90 * 1024 * 1024;
1✔
1287
        let landing_zone_size = 10 * 1024 * 1024;
1✔
1288

1289
        assert_eq!(backend.cache_size.total_byte_size(), cache_size);
1✔
1290
        assert_eq!(
1✔
1291
            backend.landing_zone_size.total_byte_size(),
1✔
1292
            landing_zone_size
1✔
1293
        );
1✔
1294
    }
1✔
1295
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc