
vortex-data / vortex / 16372702366

18 Jul 2025 02:11PM UTC. Coverage: 81.499%, down 0.03% from 81.529%.

Pull Request #3899: Remove async API for scanning
Merge b2016e2e9 into e971e6c7f

188 of 207 new or added lines in 9 files covered. (90.82%)

10 existing lines in 2 files now uncovered.

42034 of 51576 relevant lines covered (81.5%)

171519.79 hits per line
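
As a quick check, the headline figures are consistent with the line counts: 42034 / 51576 ≈ 0.81499, i.e. the 81.499% overall coverage, and 188 / 207 ≈ 0.9082 for the new or added lines (90.82%).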

Source File

/vortex-datafusion/src/persistent/opener.rs (89.73% covered)
// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright the Vortex contributors

use std::ops::Range;
use std::sync::{Arc, Weak};

use dashmap::{DashMap, Entry};
use datafusion::arrow::datatypes::SchemaRef;
use datafusion::common::{DataFusionError, Result as DFResult};
use datafusion::datasource::physical_plan::{FileMeta, FileOpenFuture, FileOpener};
use futures::{FutureExt as _, StreamExt, TryFutureExt, TryStreamExt, stream};
use object_store::ObjectStore;
use object_store::path::Path;
use tokio::runtime::Handle;
use vortex::ArrayRef;
use vortex::error::vortex_err;
use vortex::expr::{ExprRef, VortexExpr};
use vortex::layout::LayoutReader;
use vortex::metrics::VortexMetrics;
use vortex::scan::ScanBuilder;

use super::cache::VortexFileCache;

#[derive(Clone)]
pub(crate) struct VortexFileOpener {
    pub object_store: Arc<dyn ObjectStore>,
    pub projection: ExprRef,
    pub filter: Option<ExprRef>,
    pub(crate) file_cache: VortexFileCache,
    pub projected_arrow_schema: SchemaRef,
    pub batch_size: usize,
    pub limit: Option<usize>,
    metrics: VortexMetrics,
    layout_readers: Arc<DashMap<Path, Weak<dyn LayoutReader>>>,
}

impl VortexFileOpener {
    #[allow(clippy::too_many_arguments)]
    pub fn new(
        object_store: Arc<dyn ObjectStore>,
        projection: Arc<dyn VortexExpr>,
        filter: Option<Arc<dyn VortexExpr>>,
        file_cache: VortexFileCache,
        projected_arrow_schema: SchemaRef,
        batch_size: usize,
        limit: Option<usize>,
        metrics: VortexMetrics,
        layout_readers: Arc<DashMap<Path, Weak<dyn LayoutReader>>>,
    ) -> Self {
        Self {
            object_store,
            projection,
            filter,
            file_cache,
            projected_arrow_schema,
            batch_size,
            limit,
            metrics,
            layout_readers,
        }
    }
}

impl FileOpener for VortexFileOpener {
    fn open(&self, file_meta: FileMeta) -> DFResult<FileOpenFuture> {
        let filter = self.filter.clone();
        let projection = self.projection.clone();
        let file_cache = self.file_cache.clone();
        let object_store = self.object_store.clone();
        let projected_arrow_schema = self.projected_arrow_schema.clone();
        let metrics = self.metrics.clone();
        let batch_size = self.batch_size;
        let limit = self.limit;
        let layout_reader = self.layout_readers.clone();

        Ok(async move {
            let vxf = file_cache
                .try_get(&file_meta.object_meta, object_store)
                .await
                .map_err(|e| {
                    DataFusionError::Execution(format!("Failed to open Vortex file {e}"))
                })?;

            // We share our layout readers with other partitions in the scan, so we only
            // need to read each layout in each file once.
            let layout_reader = match layout_reader.entry(file_meta.object_meta.location.clone()) {
                Entry::Occupied(mut occupied_entry) => {
                    if let Some(reader) = occupied_entry.get().upgrade() {
                        log::trace!("reusing layout reader for {}", occupied_entry.key());
                        reader
                    } else {
                        log::trace!("creating layout reader for {}", occupied_entry.key());
                        let reader = vxf.layout_reader().map_err(|e| {
                            DataFusionError::Execution(format!(
                                "Failed to create layout reader: {e}"
                            ))
                        })?;
                        occupied_entry.insert(Arc::downgrade(&reader));
                        reader
                    }
                }
                Entry::Vacant(vacant_entry) => {
                    log::trace!("creating layout reader for {}", vacant_entry.key());
                    let reader = vxf.layout_reader().map_err(|e| {
                        DataFusionError::Execution(format!("Failed to create layout reader: {e}"))
                    })?;
                    vacant_entry.insert(Arc::downgrade(&reader));

                    reader
                }
            };

            let scan_builder = ScanBuilder::new(layout_reader);
            let mut scan_builder = apply_byte_range(file_meta, vxf.row_count(), scan_builder);

            if let Some(limit) = limit {
                if filter.is_none() {
                    scan_builder = scan_builder.with_limit(limit);
                }
            }

            let tasks = scan_builder
                .with_metrics(metrics)
                .with_projection(projection)
                .with_some_filter(filter)
                .map_to_record_batch(projected_arrow_schema.clone())
                .build()
                .map_err(|e| {
                    DataFusionError::Execution(format!("Failed to build Vortex scan: {e}"))
                })?;

            let stream = stream::iter(tasks)
                .map(|task| {
                    let fut = Handle::current()
                        .spawn(task)
                        .map_err(|e| vortex_err!("Failed to spawn task: {e}"));
                    async move { fut.await? }
                })
                // Run up to 16 scan tasks concurrently, yielding batches as they complete.
                .buffer_unordered(16)
                // A task may complete without producing a batch; `transpose` drops Ok(None).
                .filter_map(|r| async move { r.transpose() })
                .map_err(|e| {
                    DataFusionError::Execution(format!("Failed to create Vortex stream: {e}"))
                })
                .map_ok(move |rb| {
                    // We try to slice the stream so that it respects DataFusion's
                    // configured batch size.
                    stream::iter(
                        (0..rb.num_rows().div_ceil(batch_size * 2))
                            .flat_map(move |block_idx| {
                                let offset = block_idx * batch_size * 2;

                                // If we have less than two batches' worth of rows left, we
                                // keep them together as a single batch.
                                if rb.num_rows() - offset < 2 * batch_size {
                                    let length = rb.num_rows() - offset;
                                    [Some(rb.slice(offset, length)), None].into_iter()
                                } else {
                                    let first = rb.slice(offset, batch_size);
                                    let second = rb.slice(offset + batch_size, batch_size);
                                    [Some(first), Some(second)].into_iter()
                                }
                            })
                            .flatten()
                            .map(Ok),
                    )
                })
                .try_flatten()
                .boxed();

            Ok(stream)
        }
        .boxed())
    }
}

/// If the file has a [`FileRange`](datafusion::datasource::listing::FileRange), we translate it
/// into a row range in the file for the scan.
fn apply_byte_range(
    file_meta: FileMeta,
    row_count: u64,
    scan_builder: ScanBuilder<ArrayRef>,
) -> ScanBuilder<ArrayRef> {
    if let Some(byte_range) = file_meta.range {
        let row_range = byte_range_to_row_range(
            byte_range.start as u64..byte_range.end as u64,
            row_count,
            file_meta.object_meta.size,
        );

        scan_builder.with_row_range(row_range)
    } else {
        scan_builder
    }
}

fn byte_range_to_row_range(byte_range: Range<u64>, row_count: u64, total_size: u64) -> Range<u64> {
    let average_row = total_size / row_count;
    assert!(average_row > 0, "A row must always have at least one byte");

    let start_row = byte_range.start / average_row;
    let end_row = byte_range.end / average_row;

    // We take the min here as `end_row` might overshoot
    start_row..u64::min(row_count, end_row)
}

#[cfg(test)]
mod tests {
    use itertools::Itertools;
    use rstest::rstest;

    use super::*;

    #[rstest]
    #[case(0..100, 100, 100, 0..100)]
    #[case(0..105, 100, 105, 0..100)]
    #[case(0..50, 100, 105, 0..50)]
    #[case(50..105, 100, 105, 50..100)]
    #[case(0..1, 4, 8, 0..0)]
    #[case(1..8, 4, 8, 0..4)]
    fn test_range_translation(
        #[case] byte_range: Range<u64>,
        #[case] row_count: u64,
        #[case] total_size: u64,
        #[case] expected: Range<u64>,
    ) {
        assert_eq!(
            byte_range_to_row_range(byte_range, row_count, total_size),
            expected
        );
    }

    #[test]
    fn test_consecutive_ranges() {
        let row_count = 100;
        let total_size = 429;
        let bytes_a = 0..143;
        let bytes_b = 143..286;
        let bytes_c = 286..429;

        let rows_a = byte_range_to_row_range(bytes_a, row_count, total_size);
        let rows_b = byte_range_to_row_range(bytes_b, row_count, total_size);
        let rows_c = byte_range_to_row_range(bytes_c, row_count, total_size);

        assert_eq!(rows_a.end - rows_a.start, 35);
        assert_eq!(rows_b.end - rows_b.start, 36);
        assert_eq!(rows_c.end - rows_c.start, 29);

        assert_eq!(rows_a.start, 0);
        assert_eq!(rows_c.end, 100);
        for (left, right) in [rows_a, rows_b, rows_c].iter().tuple_windows() {
            assert_eq!(left.end, right.start);
        }
    }
}