• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

vortex-data / vortex / 16331938722

16 Jul 2025 10:49PM UTC coverage: 80.702% (-0.9%) from 81.557%
16331938722

push

github

web-flow
feat: build with stable rust (#3881)

120 of 173 new or added lines in 28 files covered. (69.36%)

174 existing lines in 102 files now uncovered.

41861 of 51871 relevant lines covered (80.7%)

157487.71 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

93.81
/vortex-datafusion/src/persistent/cache.rs
1
// SPDX-License-Identifier: Apache-2.0
2
// SPDX-FileCopyrightText: Copyright the Vortex contributors
3

4
use std::sync::Arc;
5

6
use async_trait::async_trait;
7
use chrono::{DateTime, Utc};
8
use datafusion::common::ScalarValue;
9
use moka::future::Cache;
10
use object_store::path::Path;
11
use object_store::{ObjectMeta, ObjectStore};
12
use vortex::buffer::ByteBuffer;
13
use vortex::dtype::DType;
14
use vortex::error::{VortexError, VortexResult, vortex_err};
15
use vortex::file::segments::SegmentCache;
16
use vortex::file::{Footer, SegmentSpec, VortexFile, VortexOpenOptions};
17
use vortex::layout::segments::SegmentId;
18
use vortex::session::VortexSession;
19
use vortex::stats::{Precision, Stat};
20
use vortex::utils::aliases::DefaultHashBuilder;
21

22
#[derive(Clone)]
23
pub(crate) struct VortexFileCache {
24
    file_cache: Cache<FileKey, VortexFile, DefaultHashBuilder>,
25
    segment_cache: Cache<SegmentKey, ByteBuffer, DefaultHashBuilder>,
26
    session: Arc<VortexSession>,
27
}
28

29
/// Cache key for a [`VortexFile`].
30
#[derive(Hash, Eq, PartialEq, Debug, Clone)]
31
struct FileKey {
32
    location: Arc<Path>,
33
    m_time: DateTime<Utc>,
34
}
35

36
impl From<&ObjectMeta> for FileKey {
37
    fn from(value: &ObjectMeta) -> Self {
514✔
38
        Self {
514✔
39
            location: Arc::new(value.location.clone()),
514✔
40
            m_time: value.last_modified,
514✔
41
        }
514✔
42
    }
514✔
43
}
44

45
/// Global cache key for a segment.
46
#[derive(Hash, Eq, PartialEq, Debug)]
47
struct SegmentKey {
48
    file: FileKey,
49
    segment_id: SegmentId,
50
}
51

52
impl VortexFileCache {
53
    pub fn new(size_mb: usize, segment_size_mb: usize, session: Arc<VortexSession>) -> Self {
20✔
54
        let file_cache = Cache::builder()
20✔
55
            .max_capacity(size_mb as u64 * (1 << 20))
20✔
56
            .eviction_listener(|k: Arc<FileKey>, _v: VortexFile, cause| {
20✔
57
                log::trace!("Removed {k:?} due to {cause:?}");
×
UNCOV
58
            })
×
59
            .weigher(|_k, vxf| {
20✔
60
                u32::try_from(estimate_layout_size(vxf.footer())).unwrap_or(u32::MAX)
20✔
61
            })
20✔
62
            .build_with_hasher(DefaultHashBuilder::default());
20✔
63

64
        let segment_cache = Cache::builder()
20✔
65
            .max_capacity(segment_size_mb as u64 * (1 << 20))
20✔
66
            .eviction_listener(|k: Arc<SegmentKey>, _v: ByteBuffer, cause| {
20✔
67
                log::trace!("Removed {k:?} due to {cause:?}");
×
UNCOV
68
            })
×
69
            .weigher(|_k, v| u32::try_from(v.len()).unwrap_or(u32::MAX))
20✔
70
            .build_with_hasher(DefaultHashBuilder::default());
20✔
71

72
        Self {
20✔
73
            file_cache,
20✔
74
            segment_cache,
20✔
75
            session,
20✔
76
        }
20✔
77
    }
20✔
78

79
    pub async fn try_get(
514✔
80
        &self,
514✔
81
        object: &ObjectMeta,
514✔
82
        object_store: Arc<dyn ObjectStore>,
514✔
83
    ) -> VortexResult<VortexFile> {
514✔
84
        let file_key = FileKey::from(object);
514✔
85
        self.file_cache
514✔
86
            .try_get_with(
514✔
87
                file_key.clone(),
514✔
88
                VortexOpenOptions::file()
514✔
89
                    // FIXME(ngates): we don't really want to clone on every open...
514✔
90
                    .with_array_registry(Arc::new(self.session.arrays().clone()))
514✔
91
                    .with_layout_registry(Arc::new(self.session.layouts().clone()))
514✔
92
                    .with_metrics(
514✔
93
                        self.session
514✔
94
                            .metrics()
514✔
95
                            .child_with_tags([("filename", object.location.to_string())]),
514✔
96
                    )
514✔
97
                    .with_file_size(object.size)
514✔
98
                    .with_segment_cache(Arc::new(VortexFileSegmentCache {
514✔
99
                        file_key,
514✔
100
                        segment_cache: self.segment_cache.clone(),
514✔
101
                    }))
514✔
102
                    .open_object_store(&object_store, object.location.as_ref()),
514✔
103
            )
514✔
104
            .await
514✔
105
            .map_err(|e: Arc<VortexError>| {
514✔
106
                Arc::try_unwrap(e).unwrap_or_else(|e| vortex_err!("{}", e.to_string()))
×
UNCOV
107
            })
×
108
    }
514✔
109
}
110

111
/// A [`SegmentCache`] implementation that uses the shared global segment cache.
112
struct VortexFileSegmentCache {
113
    file_key: FileKey,
114
    segment_cache: Cache<SegmentKey, ByteBuffer, DefaultHashBuilder>,
115
}
116

117
#[async_trait]
118
impl SegmentCache for VortexFileSegmentCache {
119
    async fn get(&self, segment_id: SegmentId) -> VortexResult<Option<ByteBuffer>> {
2,916✔
120
        Ok(self
1,458✔
121
            .segment_cache
1,458✔
122
            .get(&SegmentKey {
1,458✔
123
                file: self.file_key.clone(),
1,458✔
124
                segment_id,
1,458✔
125
            })
1,458✔
126
            .await)
1,458✔
127
    }
2,916✔
128

129
    async fn put(&self, segment_id: SegmentId, buffer: ByteBuffer) -> VortexResult<()> {
2,916✔
130
        self.segment_cache
1,458✔
131
            .insert(
1,458✔
132
                SegmentKey {
1,458✔
133
                    file: self.file_key.clone(),
1,458✔
134
                    segment_id,
1,458✔
135
                },
1,458✔
136
                buffer,
1,458✔
137
            )
1,458✔
138
            .await;
1,458✔
139
        Ok(())
1,458✔
140
    }
2,916✔
141
}
142

143
/// Approximate the in-memory size of a layout
144
fn estimate_layout_size(footer: &Footer) -> usize {
20✔
145
    let segments_size = footer.segment_map().len() * size_of::<SegmentSpec>();
20✔
146
    let stats_size = footer
20✔
147
        .statistics()
20✔
148
        .iter()
20✔
149
        .map(|v| {
20✔
150
            v.iter()
20✔
151
                .map(|_| size_of::<Stat>() + size_of::<Precision<ScalarValue>>())
130✔
152
                .sum::<usize>()
20✔
153
        })
20✔
154
        .sum::<usize>();
20✔
155

156
    let root_layout = footer.layout();
20✔
157
    let layout_size = size_of::<DType>()
20✔
158
        + root_layout.metadata().len()
20✔
159
        + root_layout.segment_ids().len() * size_of::<SegmentId>();
20✔
160

161
    segments_size + stats_size + layout_size
20✔
162
}
20✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc