
vortex-data / vortex / build 16618319975

30 Jul 2025 09:08AM UTC coverage: 82.694% (+0.007%) from 82.687%

Pull Request #4053: chore: add `into_cpu_stream` (merge dad0cc5c9 into 5f86536fe)

49 of 50 new or added lines in 3 files covered. (98.0%)

1 existing line in 1 file now uncovered.

45241 of 54709 relevant lines covered (82.69%)

184655.04 hits per line

Source File (90.58% covered)
/vortex-datafusion/src/persistent/opener.rs
// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright the Vortex contributors

use std::ops::Range;
use std::sync::{Arc, Weak};

use dashmap::{DashMap, Entry};
use datafusion::arrow::datatypes::SchemaRef;
use datafusion::arrow::error::ArrowError;
use datafusion::common::{DataFusionError, Result as DFResult};
use datafusion::datasource::listing::PartitionedFile;
use datafusion::datasource::physical_plan::{FileMeta, FileOpenFuture, FileOpener};
use futures::{FutureExt as _, StreamExt, TryStreamExt, stream};
use object_store::ObjectStore;
use object_store::path::Path;
use vortex::error::VortexError;
use vortex::expr::{ExprRef, VortexExpr};
use vortex::layout::LayoutReader;
use vortex::metrics::VortexMetrics;
use vortex::scan::ScanBuilder;
use vortex::{ArrayRef, ToCanonical};

use super::cache::VortexFileCache;

#[derive(Clone)]
pub(crate) struct VortexFileOpener {
    pub object_store: Arc<dyn ObjectStore>,
    pub projection: ExprRef,
    pub filter: Option<ExprRef>,
    pub(crate) file_cache: VortexFileCache,
    pub projected_arrow_schema: SchemaRef,
    pub batch_size: usize,
    pub limit: Option<usize>,
    metrics: VortexMetrics,
    layout_readers: Arc<DashMap<Path, Weak<dyn LayoutReader>>>,
}

impl VortexFileOpener {
    #[allow(clippy::too_many_arguments)]
    pub fn new(
        object_store: Arc<dyn ObjectStore>,
        projection: Arc<dyn VortexExpr>,
        filter: Option<Arc<dyn VortexExpr>>,
        file_cache: VortexFileCache,
        projected_arrow_schema: SchemaRef,
        batch_size: usize,
        limit: Option<usize>,
        metrics: VortexMetrics,
        layout_readers: Arc<DashMap<Path, Weak<dyn LayoutReader>>>,
    ) -> Self {
        Self {
            object_store,
            projection,
            filter,
            file_cache,
            projected_arrow_schema,
            batch_size,
            limit,
            metrics,
            layout_readers,
        }
    }
}

impl FileOpener for VortexFileOpener {
    fn open(&self, file_meta: FileMeta, _file: PartitionedFile) -> DFResult<FileOpenFuture> {
        let filter = self.filter.clone();
        let projection = self.projection.clone();
        let file_cache = self.file_cache.clone();
        let object_store = self.object_store.clone();
        let projected_arrow_schema = self.projected_arrow_schema.clone();
        let metrics = self.metrics.clone();
        let batch_size = self.batch_size;
        let limit = self.limit;
        let layout_reader = self.layout_readers.clone();

        Ok(async move {
            let vxf = file_cache
                .try_get(&file_meta.object_meta, object_store)
                .await
                .map_err(|e| {
                    DataFusionError::Execution(format!("Failed to open Vortex file {e}"))
                })?;

            // We share our layout readers with other partitions in the scan, so we only need to read each layout in each file once.
            let layout_reader = match layout_reader.entry(file_meta.object_meta.location.clone()) {
                Entry::Occupied(mut occupied_entry) => {
                    if let Some(reader) = occupied_entry.get().upgrade() {
                        log::trace!("reusing layout reader for {}", occupied_entry.key());
                        reader
                    } else {
                        log::trace!("creating layout reader for {}", occupied_entry.key());
                        let reader = vxf.layout_reader().map_err(|e| {
                            DataFusionError::Execution(format!(
                                "Failed to create layout reader: {e}"
                            ))
                        })?;
                        occupied_entry.insert(Arc::downgrade(&reader));
                        reader
                    }
                }
                Entry::Vacant(vacant_entry) => {
                    log::trace!("creating layout reader for {}", vacant_entry.key());
                    let reader = vxf.layout_reader().map_err(|e| {
                        DataFusionError::Execution(format!("Failed to create layout reader: {e}"))
                    })?;
                    vacant_entry.insert(Arc::downgrade(&reader));

                    reader
                }
            };

            let scan_builder = ScanBuilder::new(layout_reader);
            let mut scan_builder = apply_byte_range(file_meta, vxf.row_count(), scan_builder);

            if let Some(limit) = limit {
                if filter.is_none() {
                    scan_builder = scan_builder.with_limit(limit);
                }
            }

            let stream = scan_builder
                .with_metrics(metrics)
                .with_projection(projection)
                .with_some_filter(filter)
                .map(move |chunk| {
                    let st = chunk.to_struct()?;
                    st.into_record_batch_with_schema(projected_arrow_schema.as_ref())
                })
                .into_tokio_stream()
                .map_err(|e| {
                    DataFusionError::Execution(format!("Failed to create Vortex stream: {e}"))
                })?
                .map_ok(move |rb| {
                    // We try to slice the stream so that it respects DataFusion's configured batch size
                    // (see the worked example after this impl block).
                    stream::iter(
                        (0..rb.num_rows().div_ceil(batch_size * 2))
                            .flat_map(move |block_idx| {
                                let offset = block_idx * batch_size * 2;

                                // If we have fewer than two batches' worth of rows left, we keep them together as a single batch.
                                if rb.num_rows() - offset < 2 * batch_size {
                                    let length = rb.num_rows() - offset;
                                    [Some(rb.slice(offset, length)), None].into_iter()
                                } else {
                                    let first = rb.slice(offset, batch_size);
                                    let second = rb.slice(offset + batch_size, batch_size);
                                    [Some(first), Some(second)].into_iter()
                                }
                            })
                            .flatten()
                            .map(Ok),
                    )
                })
                .map_err(|e: VortexError| ArrowError::ExternalError(Box::new(e)))
                .try_flatten()
                .boxed();

            Ok(stream)
        }
        .boxed())
    }
}
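
// Worked example of the batch-size slicing above (illustrative numbers only, not
// taken from the original file): with `batch_size = 8192` and an incoming record
// batch of 20_000 rows, `div_ceil(20_000, 16_384)` yields two blocks. Block 0
// (offset 0) has 20_000 rows remaining, which is at least `2 * batch_size`, so it
// emits two slices of 8_192 rows each. Block 1 (offset 16_384) has only 3_616 rows
// remaining, so the tail is kept together as a single 3_616-row batch. The stream
// therefore yields three record batches of 8_192, 8_192, and 3_616 rows.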

/// If the file has a [`FileRange`](datafusion::datasource::listing::FileRange), we translate it into a row range in the file for the scan.
fn apply_byte_range(
    file_meta: FileMeta,
    row_count: u64,
    scan_builder: ScanBuilder<ArrayRef>,
) -> ScanBuilder<ArrayRef> {
    if let Some(byte_range) = file_meta.range {
        let row_range = byte_range_to_row_range(
            byte_range.start as u64..byte_range.end as u64,
            row_count,
            file_meta.object_meta.size,
        );

        scan_builder.with_row_range(row_range)
    } else {
        scan_builder
    }
}

fn byte_range_to_row_range(byte_range: Range<u64>, row_count: u64, total_size: u64) -> Range<u64> {
    let average_row = total_size / row_count;
    assert!(average_row > 0, "A row must always have at least one byte");

    let start_row = byte_range.start / average_row;
    let end_row = byte_range.end / average_row;

    // We take the min here as `end_row` might overshoot
    start_row..u64::min(row_count, end_row)
}
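
// Worked example of the translation above, using the same numbers as
// `test_consecutive_ranges` below: with `total_size = 429` bytes and
// `row_count = 100` rows, `average_row = 429 / 100 = 4` (integer division).
// Byte range 0..143 then maps to rows 0..35, 143..286 to rows 35..71, and
// 286..429 to rows 71..100 (the raw end row of 429 / 4 = 107 is clamped to
// `row_count` by the `min` above).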

#[cfg(test)]
mod tests {
    use itertools::Itertools;
    use rstest::rstest;

    use super::*;

    #[rstest]
    #[case(0..100, 100, 100, 0..100)]
    #[case(0..105, 100, 105, 0..100)]
    #[case(0..50, 100, 105, 0..50)]
    #[case(50..105, 100, 105, 50..100)]
    #[case(0..1, 4, 8, 0..0)]
    #[case(1..8, 4, 8, 0..4)]
    fn test_range_translation(
        #[case] byte_range: Range<u64>,
        #[case] row_count: u64,
        #[case] total_size: u64,
        #[case] expected: Range<u64>,
    ) {
        assert_eq!(
            byte_range_to_row_range(byte_range, row_count, total_size),
            expected
        );
    }

    #[test]
    fn test_consecutive_ranges() {
        let row_count = 100;
        let total_size = 429;
        let bytes_a = 0..143;
        let bytes_b = 143..286;
        let bytes_c = 286..429;

        let rows_a = byte_range_to_row_range(bytes_a, row_count, total_size);
        let rows_b = byte_range_to_row_range(bytes_b, row_count, total_size);
        let rows_c = byte_range_to_row_range(bytes_c, row_count, total_size);

        assert_eq!(rows_a.end - rows_a.start, 35);
        assert_eq!(rows_b.end - rows_b.start, 36);
        assert_eq!(rows_c.end - rows_c.start, 29);

        assert_eq!(rows_a.start, 0);
        assert_eq!(rows_c.end, 100);
        for (left, right) in [rows_a, rows_b, rows_c].iter().tuple_windows() {
            assert_eq!(left.end, right.start);
        }
    }
}