• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

vortex-data / vortex / 16419023817

21 Jul 2025 01:56PM UTC coverage: 81.673%. First build
16419023817

Pull #3876

github

web-flow
Merge 3389f94e0 into 56156aa03
Pull Request #3876: feat[layout]: replace register_splits with a layout splits stream

726 of 777 new or added lines in 17 files covered. (93.44%)

42577 of 52131 relevant lines covered (81.67%)

169631.2 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

90.32
/vortex-layout/src/masks/intersection.rs
1
// SPDX-License-Identifier: Apache-2.0
2
// SPDX-FileCopyrightText: Copyright the Vortex contributors
3

4
use std::ops::BitAnd;
5

6
use vortex_error::{VortexExpect, VortexResult};
7
use vortex_mask::Mask;
8

9
use super::BoxMaskIterator;
10

11
/// An iterator that merges multiple mask iterators while performing an intersection.
///
/// Each call to `next` yields the bitwise AND of aligned slices taken from the
/// current mask of every input iterator. The yielded slice is as long as the
/// shortest remaining portion among the current masks, so inputs may chunk
/// their masks differently.
12
pub struct IntersectionMaskIterator {
13
    // The input mask iterators; slot `i` of `current` belongs to `iterators[i]`.
    iterators: Vec<BoxMaskIterator>,
14
    // For each iterator, we keep track of the current mask and the offset within it.
    // A `None` slot means the next mask must be pulled from the matching iterator.
15
    current: Vec<Option<(Mask, usize)>>,
16
    // Set once an input iterator errors or is exhausted; `next` then returns `None`.
    finished: bool,
17
}
18

19
impl IntersectionMaskIterator {
20
    pub fn new(iterators: Vec<BoxMaskIterator>) -> Self {
1,321✔
21
        assert!(!iterators.is_empty(), "must have at least one iterator");
1,321✔
22
        let iter_count = iterators.len();
1,321✔
23
        Self {
1,321✔
24
            iterators,
1,321✔
25
            current: vec![None; iter_count],
1,321✔
26
            finished: false,
1,321✔
27
        }
1,321✔
28
    }
1,321✔
29

30
    /// Finds the minimum remaining length across all current masks
31
    fn min_remaining_length(&self) -> Option<usize> {
3,623✔
32
        self.current
3,623✔
33
            .iter()
3,623✔
34
            .filter_map(|opt| opt.as_ref())
12,528✔
35
            .map(|(mask, offset)| mask.len().saturating_sub(*offset))
12,528✔
36
            .min()
3,623✔
37
    }
3,623✔
38

39
    /// Checks if all iterators have current masks available
40
    fn all_masks_available(&self) -> bool {
3,623✔
41
        self.current.iter().all(|opt| opt.is_some())
12,528✔
42
    }
3,623✔
43

44
    /// Advances all offsets by the given amount
45
    fn advance_offsets(&mut self, advance_by: usize) {
3,623✔
46
        for current_item in &mut self.current {
16,151✔
47
            if let Some((mask, offset)) = current_item {
12,528✔
48
                *offset += advance_by;
12,528✔
49
                // If we've consumed the entire mask, clear it
50
                if *offset >= mask.len() {
12,528✔
51
                    *current_item = None;
9,871✔
52
                }
9,871✔
NEW
53
            }
×
54
        }
55
    }
3,623✔
56

57
    /// Performs intersection of current mask slices
58
    fn intersect_current_slices(&self, slice_length: usize) -> Mask {
3,623✔
59
        let mut result_mask = None;
3,623✔
60

61
        for (mask, offset) in self.current.iter().filter_map(|opt| opt.as_ref()) {
12,528✔
62
            let slice = mask.slice(*offset, slice_length);
12,528✔
63

64
            match result_mask {
12,528✔
65
                None => result_mask = Some(slice),
3,623✔
66
                Some(current) => {
8,905✔
67
                    result_mask = Some(current.bitand(&slice));
8,905✔
68
                }
8,905✔
69
            }
70
        }
71

72
        result_mask.vortex_expect("no masks")
3,623✔
73
    }
3,623✔
74

75
    /// Tries to fill any empty slots in the current array
76
    /// Returns true if any iterator is exhausted (which means intersection is complete)
77
    fn try_fill_empty_slots(&mut self) -> VortexResult<bool> {
4,944✔
78
        let mut any_exhausted = false;
4,944✔
79
        for i in 0..self.iterators.len() {
15,797✔
80
            if self.current[i].is_none() {
15,797✔
81
                match self.iterators[i].next() {
13,140✔
82
                    Some(Ok(mask)) => {
9,871✔
83
                        self.current[i] = Some((mask, 0));
9,871✔
84
                    }
9,871✔
NEW
85
                    Some(Err(e)) => {
×
NEW
86
                        return Err(e);
×
87
                    }
88
                    None => {
3,269✔
89
                        // Iterator is exhausted - for intersection, if ANY iterator is exhausted,
3,269✔
90
                        // the intersection is complete
3,269✔
91
                        any_exhausted = true;
3,269✔
92
                    }
3,269✔
93
                }
94
            }
2,657✔
95
        }
96
        Ok(any_exhausted)
4,944✔
97
    }
4,944✔
98
}
99

100
impl Iterator for IntersectionMaskIterator {
101
    type Item = VortexResult<Mask>;
102

103
    fn next(&mut self) -> Option<Self::Item> {
4,944✔
104
        if self.finished {
4,944✔
NEW
105
            return None;
×
106
        }
4,944✔
107

108
        loop {
109
            // Try to fill any empty slots
110
            let any_exhausted = match self.try_fill_empty_slots() {
4,944✔
111
                Ok(exhausted) => exhausted,
4,944✔
NEW
112
                Err(e) => {
×
NEW
113
                    self.finished = true;
×
NEW
114
                    return Some(Err(e));
×
115
                }
116
            };
117

118
            // If any iterator is exhausted, intersection is complete
119
            if any_exhausted {
4,944✔
120
                self.finished = true;
1,321✔
121
                return None;
1,321✔
122
            }
3,623✔
123

124
            // Check if all iterators are exhausted
125
            if self.current.iter().all(|opt| opt.is_none()) {
3,623✔
NEW
126
                self.finished = true;
×
NEW
127
                return None;
×
128
            }
3,623✔
129

130
            // If we don't have masks from all iterators, we can't proceed
131
            if !self.all_masks_available() {
3,623✔
132
                // This should not happen if try_fill_empty_slots worked correctly
NEW
133
                self.finished = true;
×
NEW
134
                return None;
×
135
            }
3,623✔
136

137
            // Find the minimum remaining length
138
            let min_length = match self.min_remaining_length() {
3,623✔
139
                Some(len) if len > 0 => len,
3,623✔
140
                _ => {
141
                    // All current masks are exhausted, continue to fetch new ones
NEW
142
                    continue;
×
143
                }
144
            };
145

146
            // Perform intersection on the slice
147
            let result_mask = self.intersect_current_slices(min_length);
3,623✔
148
            // Advance all offsets
149
            self.advance_offsets(min_length);
3,623✔
150

151
            return Some(Ok(result_mask));
3,623✔
152
        }
153
    }
4,944✔
154
}
155

156
#[cfg(test)]
impl IntersectionMaskIterator {
    /// Test helper: wraps each inner `Vec<Mask>` in an infallible boxed mask
    /// iterator and builds an `IntersectionMaskIterator` over them.
    pub fn from_mask_vecs(mask_vecs: Vec<Vec<Mask>>) -> Self {
        let mut boxed: Vec<BoxMaskIterator> = Vec::with_capacity(mask_vecs.len());
        for masks in mask_vecs {
            boxed.push(Box::new(masks.into_iter().map(Ok)) as BoxMaskIterator);
        }
        Self::new(boxed)
    }
}
168

169
#[cfg(test)]
mod tests {
    use itertools::Itertools;

    use super::*;

    /// Builds a `Mask` from a slice of booleans.
    fn mask_of(bits: &[bool]) -> Mask {
        Mask::from_iter(bits.iter().copied())
    }

    /// Two inputs with one equally sized mask each produce a single chunk
    /// holding the element-wise AND.
    #[test]
    fn test_iterator_basic_intersection() {
        let lhs = mask_of(&[true, true, false, true]);
        let rhs = mask_of(&[true, false, true, true]);

        let results =
            IntersectionMaskIterator::from_mask_vecs(vec![vec![lhs], vec![rhs]])
                .collect::<Vec<_>>();
        assert_eq!(results.len(), 1);

        let first = results[0].as_ref().unwrap();
        let expected = [true, false, false, true]; // Intersection
        assert_eq!(first.to_boolean_buffer().iter().collect_vec(), expected);
    }

    /// One input split into two short masks and another holding one long mask
    /// produce two chunks whose concatenation is the element-wise AND.
    #[test]
    fn test_iterator_different_sized_masks() {
        let head = mask_of(&[true, true]);
        let tail = mask_of(&[true, false]);
        let whole = mask_of(&[true, false, true, true]);

        let results =
            IntersectionMaskIterator::from_mask_vecs(vec![vec![head, tail], vec![whole]])
                .collect::<Vec<_>>();
        assert_eq!(results.len(), 2);

        // Concatenate the bits of every yielded chunk, in order.
        let mut flattened: Vec<bool> = Vec::new();
        for chunk in results {
            let chunk = chunk.unwrap();
            flattened.extend(chunk.to_boolean_buffer().iter());
        }

        let expected = [true, false, true, false];
        assert_eq!(flattened, expected);
    }
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc