• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

vortex-data / vortex / 16942495535

13 Aug 2025 03:53PM UTC coverage: 87.506% (-0.2%) from 87.665%
16942495535

Pull #4197

github

web-flow
Merge 387988f08 into 1c3c7421a
Pull Request #4197: [wip] Remove async from layout readers

860 of 1034 new or added lines in 32 files covered. (83.17%)

20 existing lines in 4 files now uncovered.

56953 of 65085 relevant lines covered (87.51%)

766203.74 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

77.5
/vortex-layout/src/reader.rs
1
// SPDX-License-Identifier: Apache-2.0
2
// SPDX-FileCopyrightText: Copyright the Vortex contributors
3

4
use std::collections::BTreeSet;
5
use std::ops::Range;
6
use std::sync::Arc;
7

8
use futures::FutureExt;
9
use futures::future::{BoxFuture, Shared};
10
use once_cell::sync::OnceCell;
11
use parking_lot::RwLock;
12
use vortex_array::ArrayRef;
13
use vortex_array::stats::Precision;
14
use vortex_dtype::{DType, FieldMask};
15
use vortex_error::{SharedVortexResult, VortexError, VortexExpect, VortexResult, vortex_bail};
16
use vortex_expr::ExprRef;
17
use vortex_mask::Mask;
18
use vortex_utils::aliases::hash_set::HashSet;
19

20
use crate::children::LayoutChildren;
21
use crate::segments::{SegmentId, Segments};
22

23
/// Reference-counted handle to a type-erased [`LayoutReader`].
pub type LayoutReaderRef = Arc<dyn LayoutReader>;
24

25
/// A [`LayoutReader`] is used to read a [`crate::Layout`] in a way that can cache state across multiple
26
/// evaluation operations.
///
/// The three `*_evaluation` methods do not produce results directly: each returns a boxed
/// evaluation object ([`PruningEvaluation`], [`MaskEvaluation`] or [`ArrayEvaluation`]) whose
/// `invoke` method takes a [`Mask`] and a `&dyn Segments`.
27
pub trait LayoutReader: 'static + Send + Sync {
28
    /// Returns the name of the layout reader for debugging.
29
    fn name(&self) -> &Arc<str>;
30

31
    /// Returns the un-projected dtype of the layout reader.
32
    fn dtype(&self) -> &DType;
33

34
    /// Returns the number of rows in the layout reader.
35
    /// An inexact count may be larger or smaller than the actual row count.
36
    fn row_count(&self) -> Precision<u64>;
37

38
    /// Register the splits of this layout reader.
    // NOTE(review): presumably implementations insert split row positions, shifted by
    // `row_offset`, into `splits` for the fields selected by `field_mask` — confirm against
    // the implementors, which are not visible here.
39
    // TODO(ngates): this is a temporary API until we make layout readers stream based.
40
    fn register_splits(
41
        &self,
42
        field_mask: &[FieldMask],
43
        row_offset: u64,
44
        splits: &mut BTreeSet<u64>,
45
    ) -> VortexResult<()>;
46

47
    /// Performs an approximate evaluation of the expression against the layout reader.
    ///
    /// Returns a [`PruningEvaluation`] built for `expr` over `row_range`; see that trait for the
    /// contract of the mask it produces.
48
    fn pruning_evaluation(
49
        &self,
50
        row_range: &Range<u64>,
51
        expr: &ExprRef,
52
    ) -> VortexResult<Box<dyn PruningEvaluation>>;
53

54
    /// Performs an exact evaluation of the expression against the layout reader.
    ///
    /// Returns a [`MaskEvaluation`] built for `expr` over `row_range`; see that trait for the
    /// post-conditions on the mask it produces.
55
    fn filter_evaluation(
56
        &self,
57
        row_range: &Range<u64>,
58
        expr: &ExprRef,
59
    ) -> VortexResult<Box<dyn MaskEvaluation>>;
60

61
    /// Evaluates the expression against the layout.
    ///
    /// Returns an [`ArrayEvaluation`] built for `expr` over `row_range`.
62
    fn projection_evaluation(
63
        &self,
64
        row_range: &Range<u64>,
65
        expr: &ExprRef,
66
    ) -> VortexResult<Box<dyn ArrayEvaluation>>;
67
}
68

69
/// A cloneable ([`Shared`]) boxed future that resolves to a [`SharedVortexResult`] holding a [`Mask`].
pub type MaskFuture = Shared<BoxFuture<'static, SharedVortexResult<Mask>>>;
70

71
/// Create a resolved [`MaskFuture`] from a [`Mask`].
72
pub fn mask_future_ready(mask: Mask) -> MaskFuture {
×
73
    async move { Ok::<_, Arc<VortexError>>(mask) }
×
74
        .boxed()
×
75
        .shared()
×
76
}
×
77

78
/// Returns a mask where all false values are proven to be false in the given expression.
79
///
80
/// The returned mask **does not** need to have been intersected with the input mask.
81
pub trait PruningEvaluation: 'static + Send + Sync {
82
    /// Runs the pruning check for `mask` and returns the resulting mask.
    // NOTE(review): `segments` is presumably where implementations fetch the segment data
    // they listed in `required_segments` — confirm against the `Segments` trait.
    fn invoke(&self, mask: Mask, segments: &dyn Segments) -> VortexResult<Mask>;
83

84
    /// Reports the segments this evaluation depends on by adding their ids to `segments`.
    fn required_segments(&self, segments: &mut HashSet<SegmentId>);
85
}
86

87
/// A [`PruningEvaluation`] that returns the input mask unchanged and requires no segments.
pub struct NoOpPruningEvaluation;
88

89
impl PruningEvaluation for NoOpPruningEvaluation {
90
    fn invoke(&self, mask: Mask, _segments: &dyn Segments) -> VortexResult<Mask> {
5,011✔
91
        Ok(mask)
5,011✔
92
    }
5,011✔
93

94
    fn required_segments(&self, _segments: &mut HashSet<SegmentId>) {}
5,052✔
95
}
96

97
/// Refines the given mask, returning a mask equal in length to the input mask.
98
///
99
/// ## Post-conditions
100
///
101
/// The returned mask **MUST** have been intersected with the input mask.
102
pub trait MaskEvaluation: 'static + Send + Sync {
103
    /// Refines `mask` and returns the result, subject to the post-condition on the trait.
    // NOTE(review): `segments` is presumably where implementations fetch the segment data
    // they listed in `required_segments` — confirm against the `Segments` trait.
    fn invoke(&self, mask: Mask, segments: &dyn Segments) -> VortexResult<Mask>;
104

105
    /// Reports the segments this evaluation depends on by adding their ids to `segments`.
    fn required_segments(&self, segments: &mut HashSet<SegmentId>);
106
}
107

108
/// A [`MaskEvaluation`] that returns the input mask unchanged and requires no segments.
pub struct NoOpMaskEvaluation;
109

110
impl MaskEvaluation for NoOpMaskEvaluation {
NEW
111
    fn invoke(&self, mask: Mask, _segments: &dyn Segments) -> VortexResult<Mask> {
×
112
        Ok(mask)
×
113
    }
×
114

NEW
115
    fn required_segments(&self, _segments: &mut HashSet<SegmentId>) {}
×
116
}
117

118
/// Evaluates an expression against an array, returning an array equal in length to the true count
119
/// of the input mask.
120
pub trait ArrayEvaluation: 'static + Send + Sync {
121
    /// Evaluates for the rows selected by `mask` and returns the resulting array.
    // NOTE(review): `segments` is presumably where implementations fetch the segment data
    // they listed in `required_segments` — confirm against the `Segments` trait.
    fn invoke(&self, mask: Mask, segments: &dyn Segments) -> VortexResult<ArrayRef>;
122

123
    /// Reports the segments this evaluation depends on by adding their ids to `segments`.
    fn required_segments(&self, segments: &mut HashSet<SegmentId>);
124
}
125

126
/// Provides semantics equivalent to `LazyLock`, except where segments are bound late.
///
/// The value is produced by a one-shot constructor that receives the `&dyn Segments` passed to
/// the first `get` call. The outcome, success or error, is cached and cloned out to later
/// callers. The state lives behind an `Arc`, so clones of this handle share it.
127
#[derive(Clone)]
128
pub struct LazyWithSegments<T>(Arc<RwLock<LazyWithSegmentsInner<T>>>);
129
/// Boxed one-shot constructor for a [`LazyWithSegments`] value; it is called with the segments
/// supplied to `get` and may fail.
pub type LazyWithSegmentsCtor<T> = Box<dyn FnOnce(&dyn Segments) -> VortexResult<T> + Send + Sync>;
130

131
// Lock-protected state shared by every clone of a `LazyWithSegments`.
struct LazyWithSegmentsInner<T> {
132
    // Cached outcome of running `ctor`; `None` until `get` has run the constructor.
    result: Option<SharedVortexResult<T>>,
133
    // The pending constructor; `get` takes it (leaving `None`) the first time it runs.
    ctor: Option<LazyWithSegmentsCtor<T>>,
134
    // Segment ids accumulated by the `with_*` builders and reported by `required_segments`.
    required_segments: HashSet<SegmentId>,
135
}
136

137
impl<T: Send + Clone> LazyWithSegments<T> {
138
    pub fn new(
14,504✔
139
        ctor: impl FnOnce(&dyn Segments) -> VortexResult<T> + Send + Sync + 'static,
14,504✔
140
    ) -> Self {
14,504✔
141
        Self(Arc::new(RwLock::new(LazyWithSegmentsInner {
14,504✔
142
            result: None,
14,504✔
143
            ctor: Some(Box::new(ctor)),
14,504✔
144
            required_segments: Default::default(),
14,504✔
145
        })))
14,504✔
146
    }
14,504✔
147

148
    pub fn with_required_segments(self, segment_ids: impl IntoIterator<Item = SegmentId>) -> Self {
13,805✔
149
        self.0.write().required_segments.extend(segment_ids);
13,805✔
150
        self
13,805✔
151
    }
13,805✔
152

153
    pub fn with_lazy_required_segments<R>(self, lazy: &LazyWithSegments<R>) -> Self {
699✔
154
        self.0
699✔
155
            .write()
699✔
156
            .required_segments
699✔
157
            .extend(lazy.0.read().required_segments.iter().cloned());
699✔
158
        self
699✔
159
    }
699✔
160

NEW
161
    pub fn with_array_evaluation_segments(self, eval: &dyn ArrayEvaluation) -> Self {
×
NEW
162
        eval.required_segments(&mut self.0.write().required_segments);
×
NEW
163
        self
×
NEW
164
    }
×
165

NEW
166
    pub fn with_mask_evaluation_segments(self, eval: &dyn MaskEvaluation) -> Self {
×
NEW
167
        eval.required_segments(&mut self.0.write().required_segments);
×
NEW
168
        self
×
NEW
169
    }
×
170

171
    pub fn get(&self, segments: &dyn Segments) -> SharedVortexResult<T> {
31,098✔
172
        {
173
            let read = self.0.read();
31,098✔
174
            if let Some(result) = &read.result {
31,098✔
175
                return result.clone();
21,826✔
176
            }
9,272✔
177
        }
178

179
        let mut write = self.0.write();
9,272✔
180
        if let Some(result) = &write.result {
9,272✔
181
            return result.clone();
33✔
182
        }
9,239✔
183

184
        let ctor = write
9,239✔
185
            .ctor
9,239✔
186
            .take()
9,239✔
187
            .vortex_expect("Constructor already consumed");
9,239✔
188
        let result = ctor(segments).map_err(Arc::new);
9,239✔
189
        write.result = Some(result.clone());
9,239✔
190
        result
9,239✔
191
    }
31,098✔
192

193
    pub fn required_segments(&self, segments: &mut HashSet<SegmentId>) {
33,020✔
194
        segments.extend(self.0.read().required_segments.iter().cloned());
33,020✔
195
    }
33,020✔
196
}
197

198
/// Lazily creates and caches one [`LayoutReaderRef`] per child of a layout.
pub struct LazyReaderChildren {
199
    // Source of the child layouts that readers are created from.
    children: Arc<dyn LayoutChildren>,
200
    // TODO(ngates): we may want a hash map of some sort here?
201
    // One slot per child, indexed by child position; each slot is initialised at most once.
    cache: Vec<OnceCell<LayoutReaderRef>>,
202
}
203

204
impl LazyReaderChildren {
205
    pub fn new(children: Arc<dyn LayoutChildren>) -> Self {
2,216✔
206
        let nchildren = children.nchildren();
2,216✔
207
        let cache = (0..nchildren).map(|_| OnceCell::new()).collect();
11,907✔
208
        Self { children, cache }
2,216✔
209
    }
2,216✔
210

211
    pub fn get(
77,293✔
212
        &self,
77,293✔
213
        idx: usize,
77,293✔
214
        dtype: &DType,
77,293✔
215
        name: &Arc<str>,
77,293✔
216
    ) -> VortexResult<&LayoutReaderRef> {
77,293✔
217
        if idx >= self.cache.len() {
77,293✔
218
            vortex_bail!("Child index out of bounds: {} of {}", idx, self.cache.len());
×
219
        }
77,293✔
220

221
        self.cache[idx].get_or_try_init(|| {
77,293✔
222
            let child = self.children.child(idx, dtype)?;
8,048✔
223
            child.new_reader(name.clone())
8,048✔
224
        })
8,048✔
225
    }
77,293✔
226
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc