• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

vortex-data / vortex / 16565510329

28 Jul 2025 09:34AM UTC coverage: 81.812% (+0.02%) from 81.789%
16565510329

Pull #4019

github

web-flow
Merge 0c114cac4 into 3d13d2ec0
Pull Request #4019: Refactor read I/O

389 of 429 new or added lines in 17 files covered. (90.68%)

68 existing lines in 5 files now uncovered.

43096 of 52677 relevant lines covered (81.81%)

171076.89 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

92.96
/vortex-file/src/segments/cache.rs
1
// SPDX-License-Identifier: Apache-2.0
2
// SPDX-FileCopyrightText: Copyright the Vortex contributors
3

4
use std::sync::Arc;
5

6
use async_trait::async_trait;
7
use dashmap::DashMap;
8
use futures::FutureExt;
9
use moka::future::{Cache, CacheBuilder};
10
use moka::policy::EvictionPolicy;
11
use rustc_hash::FxBuildHasher;
12
use vortex_buffer::ByteBuffer;
13
use vortex_error::{VortexExpect, VortexResult};
14
use vortex_layout::segments::{SegmentFuture, SegmentId, SegmentSource};
15
use vortex_metrics::{Counter, VortexMetrics};
16

17
/// A cache for storing and retrieving individual segment data.
18
#[async_trait]
19
pub trait SegmentCache: Send + Sync {
20
    async fn get(&self, id: SegmentId) -> VortexResult<Option<ByteBuffer>>;
21
    async fn put(&self, id: SegmentId, buffer: ByteBuffer) -> VortexResult<()>;
22
}
23

24
/// A [`SegmentCache`] based around an in-memory Moka cache.
25
pub struct MokaSegmentCache(Cache<SegmentId, ByteBuffer, FxBuildHasher>);
26

27
impl MokaSegmentCache {
28
    pub fn new(max_capacity_bytes: u64) -> Self {
1,085✔
29
        Self(
30
            CacheBuilder::new(max_capacity_bytes)
1,085✔
31
                .name("vortex-segment-cache")
1,085✔
32
                // Weight each segment by the number of bytes in the buffer.
33
                .weigher(|_, buffer: &ByteBuffer| {
1,831✔
34
                    u32::try_from(buffer.len().min(u32::MAX as usize)).vortex_expect("must fit")
1,440✔
35
                })
1,440✔
36
                // We configure LFU (vs LRU) since the cache is mostly used when re-reading the
37
                // same file - it is _not_ used when reading the same segments during a single
38
                // scan.
39
                .eviction_policy(EvictionPolicy::tiny_lfu())
1,085✔
40
                .build_with_hasher(FxBuildHasher),
1,085✔
41
        )
42
    }
1,085✔
43
}
44

45
#[async_trait]
46
impl SegmentCache for MokaSegmentCache {
47
    async fn get(&self, id: SegmentId) -> VortexResult<Option<ByteBuffer>> {
2,880✔
48
        Ok(self.0.get(&id).await)
1,440✔
49
    }
2,880✔
50

51
    async fn put(&self, id: SegmentId, buffer: ByteBuffer) -> VortexResult<()> {
2,880✔
52
        self.0.insert(id, buffer).await;
1,440✔
53
        Ok(())
1,440✔
54
    }
2,880✔
55
}
56

57
/// Segment cache containing the initial read segments.
58
pub(crate) struct InitialReadSegmentCache {
59
    pub(crate) initial: DashMap<SegmentId, ByteBuffer>,
60
    pub(crate) fallback: Option<Arc<dyn SegmentCache>>,
61
}
62

63
#[async_trait]
64
impl SegmentCache for InitialReadSegmentCache {
65
    async fn get(&self, id: SegmentId) -> VortexResult<Option<ByteBuffer>> {
8,080✔
66
        if let Some(buffer) = self.initial.get(&id) {
4,040✔
67
            return Ok(Some(buffer.clone()));
1,098✔
68
        }
2,942✔
69
        if let Some(fb) = &self.fallback {
2,942✔
70
            fb.get(id).await
2,942✔
71
        } else {
NEW
72
            Ok(None)
×
73
        }
74
    }
8,080✔
75

76
    async fn put(&self, id: SegmentId, buffer: ByteBuffer) -> VortexResult<()> {
5,884✔
77
        if let Some(fb) = &self.fallback {
2,942✔
78
            fb.put(id, buffer).await
2,942✔
79
        } else {
NEW
80
            Ok(())
×
81
        }
82
    }
5,884✔
83
}
84

85
pub struct SegmentCacheMetrics<C> {
86
    segment_cache: C,
87

88
    hits: Arc<Counter>,
89
    misses: Arc<Counter>,
90
    stores: Arc<Counter>,
91
}
92

93
impl<C: SegmentCache> SegmentCacheMetrics<C> {
94
    pub fn new(segment_cache: C, metrics: VortexMetrics) -> Self {
547✔
95
        Self {
547✔
96
            segment_cache,
547✔
97
            hits: metrics.counter("vortex.file.segments.cache.hits"),
547✔
98
            misses: metrics.counter("vortex.file.segments.cache.misses"),
547✔
99
            stores: metrics.counter("vortex.file.segments.cache.stores"),
547✔
100
        }
547✔
101
    }
547✔
102
}
103

104
#[async_trait]
105
impl<C: SegmentCache> SegmentCache for SegmentCacheMetrics<C> {
106
    async fn get(&self, id: SegmentId) -> VortexResult<Option<ByteBuffer>> {
8,080✔
107
        let result = self.segment_cache.get(id).await?;
4,040✔
108
        if result.is_some() {
4,040✔
109
            self.hits.inc()
1,098✔
110
        } else {
111
            self.misses.inc()
2,942✔
112
        }
113
        Ok(result)
4,040✔
114
    }
8,080✔
115

116
    async fn put(&self, id: SegmentId, buffer: ByteBuffer) -> VortexResult<()> {
5,884✔
117
        self.segment_cache.put(id, buffer).await?;
2,942✔
118
        self.stores.inc();
2,942✔
119
        Ok(())
2,942✔
120
    }
5,884✔
121
}
122

123
pub struct SegmentCacheSourceAdapter {
124
    cache: Arc<dyn SegmentCache>,
125
    source: Arc<dyn SegmentSource>,
126
}
127

128
impl SegmentCacheSourceAdapter {
129
    pub fn new(cache: Arc<dyn SegmentCache>, source: Arc<dyn SegmentSource>) -> Self {
527✔
130
        Self { cache, source }
527✔
131
    }
527✔
132
}
133

134
impl SegmentSource for SegmentCacheSourceAdapter {
135
    fn request(&self, id: SegmentId, for_whom: &Arc<str>) -> SegmentFuture {
8,003✔
136
        let cache = self.cache.clone();
8,003✔
137
        let delegate = self.source.request(id, for_whom);
8,003✔
138
        let for_whom = for_whom.clone();
8,003✔
139

140
        async move {
4,040✔
141
            if let Ok(Some(segment)) = cache.get(id).await {
4,040✔
142
                log::debug!("Resolved segment {} for {} from cache", id, &for_whom);
1,098✔
143
                return Ok(segment);
1,098✔
144
            }
2,942✔
145
            let result = delegate.await?;
2,942✔
146
            if let Err(e) = cache.put(id, result.clone()).await {
2,942✔
147
                log::warn!(
×
148
                    "Failed to store segment {} for {} in cache: {}",
×
149
                    id,
150
                    &for_whom,
×
151
                    e
152
                );
153
            }
2,942✔
154
            Ok(result)
2,942✔
155
        }
4,040✔
156
        .boxed()
8,003✔
157
    }
8,003✔
158
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc