• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

vortex-data / vortex / 16387130526

19 Jul 2025 09:05AM UTC coverage: 81.512% (-0.008%) from 81.52%
16387130526

push

github

web-flow
feat: duckdb workstealing (#3927)

Signed-off-by: Alexander Droste <alexander.droste@protonmail.com>

16 of 17 new or added lines in 1 file covered. (94.12%)

185 existing lines in 8 files now uncovered.

42000 of 51526 relevant lines covered (81.51%)

171508.11 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

87.18
/vortex-array/src/patches.rs
1
// SPDX-License-Identifier: Apache-2.0
2
// SPDX-FileCopyrightText: Copyright the Vortex contributors
3

4
use std::cmp::Ordering;
5
use std::fmt::Debug;
6
use std::hash::Hash;
7

8
use itertools::Itertools as _;
9
use num_traits::{NumCast, ToPrimitive};
10
use serde::{Deserialize, Serialize};
11
use vortex_buffer::BufferMut;
12
use vortex_dtype::Nullability::NonNullable;
13
use vortex_dtype::{
14
    DType, NativePType, PType, match_each_integer_ptype, match_each_unsigned_integer_ptype,
15
};
16
use vortex_error::{
17
    VortexError, VortexExpect, VortexResult, VortexUnwrap, vortex_bail, vortex_err,
18
};
19
use vortex_mask::{AllOr, Mask};
20
use vortex_scalar::{PValue, Scalar};
21
use vortex_utils::aliases::hash_map::HashMap;
22

23
use crate::arrays::PrimitiveArray;
24
use crate::compute::{cast, filter, take};
25
use crate::search_sorted::{SearchResult, SearchSorted, SearchSortedSide};
26
use crate::vtable::ValidityHelper;
27
use crate::{Array, ArrayRef, IntoArray, ToCanonical};
28

29
#[derive(Copy, Clone, Serialize, Deserialize, prost::Message)]
30
pub struct PatchesMetadata {
31
    #[prost(uint64, tag = "1")]
32
    len: u64,
33
    #[prost(uint64, tag = "2")]
34
    offset: u64,
35
    #[prost(enumeration = "PType", tag = "3")]
36
    indices_ptype: i32,
37
}
38

39
impl PatchesMetadata {
40
    pub fn new(len: usize, offset: usize, indices_ptype: PType) -> Self {
74✔
41
        Self {
74✔
42
            len: len as u64,
74✔
43
            offset: offset as u64,
74✔
44
            indices_ptype: indices_ptype as i32,
74✔
45
        }
74✔
46
    }
74✔
47

48
    #[inline]
49
    pub fn len(&self) -> usize {
1,324✔
50
        usize::try_from(self.len).vortex_expect("len is a valid usize")
1,324✔
51
    }
1,324✔
52

53
    #[inline]
54
    pub fn is_empty(&self) -> bool {
×
55
        self.len == 0
×
56
    }
×
57

58
    #[inline]
59
    pub fn offset(&self) -> usize {
662✔
60
        usize::try_from(self.offset).vortex_expect("offset is a valid usize")
662✔
61
    }
662✔
62

63
    #[inline]
64
    pub fn indices_dtype(&self) -> DType {
662✔
65
        assert!(
662✔
66
            self.indices_ptype().is_unsigned_int(),
662✔
67
            "Patch indices must be unsigned integers"
×
68
        );
69
        DType::Primitive(self.indices_ptype(), NonNullable)
662✔
70
    }
662✔
71
}
72

73
/// A helper for working with patched arrays.
74
#[derive(Debug, Clone)]
75
pub struct Patches {
76
    array_len: usize,
77
    offset: usize,
78
    indices: ArrayRef,
79
    values: ArrayRef,
80
}
81

82
impl Patches {
83
    pub fn new(array_len: usize, offset: usize, indices: ArrayRef, values: ArrayRef) -> Self {
8,435✔
84
        assert_eq!(
8,435✔
85
            indices.len(),
8,435✔
86
            values.len(),
8,435✔
87
            "Patch indices and values must have the same length"
×
88
        );
89
        assert!(
8,435✔
90
            indices.dtype().is_unsigned_int(),
8,435✔
91
            "Patch indices must be unsigned integers"
×
92
        );
93
        assert!(
8,435✔
94
            indices.len() <= array_len,
8,435✔
95
            "Patch indices must be shorter than the array length"
×
96
        );
97
        assert!(!indices.is_empty(), "Patch indices must not be empty");
8,435✔
98
        let max = usize::try_from(
8,435✔
99
            &indices
8,435✔
100
                .scalar_at(indices.len() - 1)
8,435✔
101
                .vortex_expect("indices are not empty"),
8,435✔
102
        )
103
        .vortex_expect("indices must be a number");
8,435✔
104
        assert!(
8,435✔
105
            max - offset < array_len,
8,435✔
106
            "Patch indices {max:?}, offset {offset} are longer than the array length {array_len}"
×
107
        );
108
        Self::new_unchecked(array_len, offset, indices, values)
8,435✔
109
    }
8,435✔
110

111
    /// Construct new patches without validating any of the arguments
112
    ///
113
    /// # Safety
114
    ///
115
    /// Users have to assert that
116
    /// * Indices and values have the same length
117
    /// * Indices is an unsigned integer type
118
    /// * Indices must be sorted
119
    /// * Last value in indices is smaller than array_len
120
    pub fn new_unchecked(
10,063✔
121
        array_len: usize,
10,063✔
122
        offset: usize,
10,063✔
123
        indices: ArrayRef,
10,063✔
124
        values: ArrayRef,
10,063✔
125
    ) -> Self {
10,063✔
126
        Self {
10,063✔
127
            array_len,
10,063✔
128
            offset,
10,063✔
129
            indices,
10,063✔
130
            values,
10,063✔
131
        }
10,063✔
132
    }
10,063✔
133

134
    pub fn array_len(&self) -> usize {
33,775✔
135
        self.array_len
33,775✔
136
    }
33,775✔
137

138
    pub fn num_patches(&self) -> usize {
1,481✔
139
        self.indices.len()
1,481✔
140
    }
1,481✔
141

142
    pub fn dtype(&self) -> &DType {
3,031✔
143
        self.values.dtype()
3,031✔
144
    }
3,031✔
145

146
    pub fn indices(&self) -> &ArrayRef {
11,109✔
147
        &self.indices
11,109✔
148
    }
11,109✔
149

150
    pub fn into_indices(self) -> ArrayRef {
×
151
        self.indices
×
152
    }
×
153

154
    pub fn indices_mut(&mut self) -> &mut ArrayRef {
×
155
        &mut self.indices
×
156
    }
×
157

158
    pub fn values(&self) -> &ArrayRef {
21,229✔
159
        &self.values
21,229✔
160
    }
21,229✔
161

162
    pub fn into_values(self) -> ArrayRef {
37✔
163
        self.values
37✔
164
    }
37✔
165

166
    pub fn values_mut(&mut self) -> &mut ArrayRef {
×
167
        &mut self.values
×
168
    }
×
169

170
    pub fn offset(&self) -> usize {
84,923✔
171
        self.offset
84,923✔
172
    }
84,923✔
173

174
    pub fn indices_ptype(&self) -> PType {
×
175
        PType::try_from(self.indices.dtype()).vortex_expect("primitive indices")
×
176
    }
×
177

178
    pub fn to_metadata(&self, len: usize, dtype: &DType) -> VortexResult<PatchesMetadata> {
202✔
179
        if self.indices.len() > len {
202✔
180
            vortex_bail!(
×
181
                "Patch indices {} are longer than the array length {}",
×
182
                self.indices.len(),
×
183
                len
184
            );
185
        }
202✔
186
        if self.values.dtype() != dtype {
202✔
187
            vortex_bail!(
×
188
                "Patch values dtype {} does not match array dtype {}",
×
189
                self.values.dtype(),
×
190
                dtype
191
            );
192
        }
202✔
193
        Ok(PatchesMetadata {
202✔
194
            len: self.indices.len() as u64,
202✔
195
            offset: self.offset as u64,
202✔
196
            indices_ptype: PType::try_from(self.indices.dtype()).vortex_expect("primitive indices")
202✔
197
                as i32,
202✔
198
        })
202✔
199
    }
202✔
200

201
    pub fn cast_values(self, values_dtype: &DType) -> VortexResult<Self> {
1,628✔
202
        Ok(Self::new_unchecked(
1,628✔
203
            self.array_len,
1,628✔
204
            self.offset,
1,628✔
205
            self.indices,
1,628✔
206
            cast(&self.values, values_dtype)?,
1,628✔
207
        ))
208
    }
1,628✔
209

210
    /// Get the patched value at a given index if it exists.
211
    pub fn get_patched(&self, index: usize) -> VortexResult<Option<Scalar>> {
394,222✔
212
        if let Some(patch_idx) = self.search_index(index)?.to_found() {
394,222✔
213
            self.values().scalar_at(patch_idx).map(Some)
6,265✔
214
        } else {
215
            Ok(None)
387,957✔
216
        }
217
    }
394,222✔
218

219
    /// Return the insertion point of `index` in the [Self::indices].
220
    pub fn search_index(&self, index: usize) -> VortexResult<SearchResult> {
396,478✔
221
        Ok(self.indices.as_primitive_typed().search_sorted(
396,478✔
222
            &PValue::U64((index + self.offset) as u64),
396,478✔
223
            SearchSortedSide::Left,
396,478✔
224
        ))
396,478✔
225
    }
396,478✔
226

227
    /// Return the search_sorted result for the given target re-mapped into the original indices.
228
    pub fn search_sorted<T: Into<Scalar>>(
11✔
229
        &self,
11✔
230
        target: T,
11✔
231
        side: SearchSortedSide,
11✔
232
    ) -> VortexResult<SearchResult> {
11✔
233
        let target = target.into();
11✔
234

235
        let sr = if self.values().dtype().is_primitive() {
11✔
236
            self.values()
11✔
237
                .as_primitive_typed()
11✔
238
                .search_sorted(&target.as_primitive().pvalue(), side)
11✔
239
        } else {
240
            self.values().search_sorted(&target, side)
×
241
        };
242

243
        let index_idx = sr.to_offsets_index(self.indices().len(), side);
11✔
244
        let index = usize::try_from(&self.indices().scalar_at(index_idx)?)? - self.offset;
11✔
245
        Ok(match sr {
11✔
246
            // If we reached the end of patched values when searching then the result is one after the last patch index
247
            SearchResult::Found(i) => SearchResult::Found(
3✔
248
                if i == self.indices().len() || side == SearchSortedSide::Right {
3✔
249
                    index + 1
2✔
250
                } else {
251
                    index
1✔
252
                },
253
            ),
254
            // If the result is NotFound we should return index that's one after the nearest not found index for the corresponding value
255
            SearchResult::NotFound(i) => {
8✔
256
                SearchResult::NotFound(if i == 0 { index } else { index + 1 })
8✔
257
            }
258
        })
259
    }
11✔
260

261
    /// Returns the minimum patch index
262
    pub fn min_index(&self) -> VortexResult<usize> {
408✔
263
        Ok(usize::try_from(&self.indices().scalar_at(0)?)? - self.offset)
408✔
264
    }
408✔
265

266
    /// Returns the maximum patch index
267
    pub fn max_index(&self) -> VortexResult<usize> {
408✔
268
        Ok(usize::try_from(&self.indices().scalar_at(self.indices().len() - 1)?)? - self.offset)
408✔
269
    }
408✔
270

271
    /// Filter the patches by a mask, resulting in new patches for the filtered array.
272
    pub fn filter(&self, mask: &Mask) -> VortexResult<Option<Self>> {
381✔
273
        match mask.indices() {
381✔
274
            AllOr::All => Ok(Some(self.clone())),
×
275
            AllOr::None => Ok(None),
×
276
            AllOr::Some(mask_indices) => {
381✔
277
                let flat_indices = self.indices().to_primitive()?;
381✔
278
                match_each_unsigned_integer_ptype!(flat_indices.ptype(), |I| {
381✔
279
                    filter_patches_with_mask(
×
280
                        flat_indices.as_slice::<I>(),
×
281
                        self.offset(),
×
282
                        self.values(),
×
283
                        mask_indices,
×
284
                    )
285
                })
286
            }
287
        }
288
    }
381✔
289

290
    /// Slice the patches by a range of the patched array.
291
    pub fn slice(&self, start: usize, stop: usize) -> VortexResult<Option<Self>> {
1,128✔
292
        let patch_start = self.search_index(start)?.to_index();
1,128✔
293
        let patch_stop = self.search_index(stop)?.to_index();
1,128✔
294

295
        if patch_start == patch_stop {
1,128✔
296
            return Ok(None);
237✔
297
        }
891✔
298

299
        // Slice out the values and indices
300
        let values = self.values().slice(patch_start, patch_stop)?;
891✔
301
        let indices = self.indices().slice(patch_start, patch_stop)?;
891✔
302

303
        Ok(Some(Self::new(
891✔
304
            stop - start,
891✔
305
            start + self.offset(),
891✔
306
            indices,
891✔
307
            values,
891✔
308
        )))
891✔
309
    }
1,128✔
310

311
    // https://docs.google.com/spreadsheets/d/1D9vBZ1QJ6mwcIvV5wIL0hjGgVchcEnAyhvitqWu2ugU
312
    const PREFER_MAP_WHEN_PATCHES_OVER_INDICES_LESS_THAN: f64 = 5.0;
313

314
    fn is_map_faster_than_search(&self, take_indices: &PrimitiveArray) -> bool {
482✔
315
        (self.num_patches() as f64 / take_indices.len() as f64)
482✔
316
            < Self::PREFER_MAP_WHEN_PATCHES_OVER_INDICES_LESS_THAN
482✔
317
    }
482✔
318

319
    /// Take the indicies from the patches
320
    ///
321
    /// Any nulls in take_indices are added to the resulting patches.
322
    pub fn take_with_nulls(&self, take_indices: &dyn Array) -> VortexResult<Option<Self>> {
74✔
323
        if take_indices.is_empty() {
74✔
324
            return Ok(None);
×
325
        }
74✔
326

327
        let take_indices = take_indices.to_primitive()?;
74✔
328
        if self.is_map_faster_than_search(&take_indices) {
74✔
329
            self.take_map(take_indices, true)
74✔
330
        } else {
UNCOV
331
            self.take_search(take_indices, true)
×
332
        }
333
    }
74✔
334

335
    /// Take the indices from the patches.
336
    ///
337
    /// Any nulls in take_indices are ignored.
338
    pub fn take(&self, take_indices: &dyn Array) -> VortexResult<Option<Self>> {
408✔
339
        if take_indices.is_empty() {
408✔
UNCOV
340
            return Ok(None);
×
341
        }
408✔
342

343
        let take_indices = take_indices.to_primitive()?;
408✔
344
        if self.is_map_faster_than_search(&take_indices) {
408✔
345
            self.take_map(take_indices, false)
334✔
346
        } else {
347
            self.take_search(take_indices, false)
74✔
348
        }
349
    }
408✔
350

351
    pub fn take_search(
74✔
352
        &self,
74✔
353
        take_indices: PrimitiveArray,
74✔
354
        include_nulls: bool,
74✔
355
    ) -> VortexResult<Option<Self>> {
74✔
356
        let indices = self.indices.to_primitive()?;
74✔
357
        let new_length = take_indices.len();
74✔
358

359
        let Some((new_indices, values_indices)) =
74✔
360
            match_each_unsigned_integer_ptype!(indices.ptype(), |Indices| {
74✔
UNCOV
361
                match_each_integer_ptype!(take_indices.ptype(), |TakeIndices| {
×
UNCOV
362
                    take_search::<_, TakeIndices>(
×
UNCOV
363
                        indices.as_slice::<Indices>(),
×
UNCOV
364
                        take_indices,
×
365
                        self.offset(),
×
UNCOV
366
                        include_nulls,
×
UNCOV
367
                    )?
×
368
                })
369
            })
370
        else {
UNCOV
371
            return Ok(None);
×
372
        };
373

374
        Ok(Some(Self::new(
74✔
375
            new_length,
74✔
376
            0,
377
            new_indices,
74✔
378
            take(self.values(), &values_indices)?,
74✔
379
        )))
380
    }
74✔
381

382
    pub fn take_map(
408✔
383
        &self,
408✔
384
        take_indices: PrimitiveArray,
408✔
385
        include_nulls: bool,
408✔
386
    ) -> VortexResult<Option<Self>> {
408✔
387
        let indices = self.indices.to_primitive()?;
408✔
388
        let new_length = take_indices.len();
408✔
389

390
        let Some((new_sparse_indices, value_indices)) =
371✔
391
            match_each_unsigned_integer_ptype!(indices.ptype(), |Indices| {
408✔
392
                match_each_integer_ptype!(take_indices.ptype(), |TakeIndices| {
37✔
UNCOV
393
                    take_map::<_, TakeIndices>(
×
UNCOV
394
                        indices.as_slice::<Indices>(),
×
UNCOV
395
                        take_indices,
×
396
                        self.offset(),
×
UNCOV
397
                        self.min_index()?,
×
UNCOV
398
                        self.max_index()?,
×
UNCOV
399
                        include_nulls,
×
UNCOV
400
                    )?
×
401
                })
402
            })
403
        else {
404
            return Ok(None);
37✔
405
        };
406

407
        Ok(Some(Patches::new(
371✔
408
            new_length,
371✔
409
            0,
410
            new_sparse_indices,
371✔
411
            take(self.values(), &value_indices)?,
371✔
412
        )))
413
    }
408✔
414

415
    pub fn map_values<F>(self, f: F) -> VortexResult<Self>
16✔
416
    where
16✔
417
        F: FnOnce(ArrayRef) -> VortexResult<ArrayRef>,
16✔
418
    {
419
        let values = f(self.values)?;
16✔
420
        if self.indices.len() != values.len() {
16✔
421
            vortex_bail!(
×
422
                "map_values must preserve length: expected {} received {}",
×
423
                self.indices.len(),
×
424
                values.len()
×
425
            )
426
        }
16✔
427
        Ok(Self::new(self.array_len, self.offset, self.indices, values))
16✔
428
    }
16✔
429
}
430

431
fn take_search<I: NativePType + NumCast + PartialOrd, T: NativePType + NumCast>(
74✔
432
    indices: &[I],
74✔
433
    take_indices: PrimitiveArray,
74✔
434
    indices_offset: usize,
74✔
435
    include_nulls: bool,
74✔
436
) -> VortexResult<Option<(ArrayRef, ArrayRef)>>
74✔
437
where
74✔
438
    usize: TryFrom<T>,
74✔
439
    VortexError: From<<usize as TryFrom<T>>::Error>,
74✔
440
{
441
    let take_indices_validity = take_indices.validity();
74✔
442
    let indices_offset = I::from(indices_offset).vortex_expect("indices_offset out of range");
74✔
443

444
    let (values_indices, new_indices): (BufferMut<u64>, BufferMut<u64>) = take_indices
74✔
445
        .as_slice::<T>()
74✔
446
        .iter()
74✔
447
        .enumerate()
74✔
448
        .filter_map(|(i, &v)| {
259✔
449
            I::from(v)
259✔
450
                .and_then(|v| {
259✔
451
                    // If we have to take nulls the take index doesn't matter, make it 0 for consistency
452
                    if include_nulls && take_indices_validity.is_null(i).vortex_unwrap() {
259✔
UNCOV
453
                        Some(0)
×
454
                    } else {
455
                        indices
259✔
456
                            .search_sorted(&(v + indices_offset), SearchSortedSide::Left)
259✔
457
                            .to_found()
259✔
458
                            .map(|patch_idx| patch_idx as u64)
259✔
459
                    }
460
                })
259✔
461
                .map(|patch_idx| (patch_idx, i as u64))
259✔
462
        })
259✔
463
        .unzip();
74✔
464

465
    if new_indices.is_empty() {
74✔
UNCOV
466
        return Ok(None);
×
467
    }
74✔
468

469
    let new_indices = new_indices.into_array();
74✔
470
    let values_validity = take_indices_validity.take(&new_indices)?;
74✔
471
    Ok(Some((
74✔
472
        new_indices,
74✔
473
        PrimitiveArray::new(values_indices, values_validity).into_array(),
74✔
474
    )))
74✔
475
}
74✔
476

477
fn take_map<I: NativePType + Hash + Eq + TryFrom<usize>, T: NativePType>(
408✔
478
    indices: &[I],
408✔
479
    take_indices: PrimitiveArray,
408✔
480
    indices_offset: usize,
408✔
481
    min_index: usize,
408✔
482
    max_index: usize,
408✔
483
    include_nulls: bool,
408✔
484
) -> VortexResult<Option<(ArrayRef, ArrayRef)>>
408✔
485
where
408✔
486
    usize: TryFrom<T>,
408✔
487
    VortexError: From<<I as TryFrom<usize>>::Error>,
408✔
488
{
489
    let take_indices_validity = take_indices.validity();
408✔
490
    let take_indices = take_indices.as_slice::<T>();
408✔
491
    let offset_i = I::try_from(indices_offset)?;
408✔
492

493
    let sparse_index_to_value_index: HashMap<I, usize> = indices
408✔
494
        .iter()
408✔
495
        .copied()
408✔
496
        .map(|idx| idx - offset_i)
965✔
497
        .enumerate()
408✔
498
        .map(|(value_index, sparse_index)| (sparse_index, value_index))
965✔
499
        .collect();
408✔
500

501
    let (new_sparse_indices, value_indices): (BufferMut<u64>, BufferMut<u64>) = take_indices
408✔
502
        .iter()
408✔
503
        .copied()
408✔
504
        .map(usize::try_from)
408✔
505
        .process_results(|iter| {
408✔
506
            iter.enumerate()
408✔
507
                .filter_map(|(idx_in_take, ti)| {
1,149✔
508
                    // If we have to take nulls the take index doesn't matter, make it 0 for consistency
509
                    if include_nulls && take_indices_validity.is_null(idx_in_take).vortex_unwrap() {
1,149✔
510
                        Some((idx_in_take as u64, 0))
74✔
511
                    } else if ti < min_index || ti > max_index {
1,075✔
512
                        None
371✔
513
                    } else {
514
                        sparse_index_to_value_index
704✔
515
                            .get(
704✔
516
                                &I::try_from(ti)
704✔
517
                                    .vortex_expect("take index is between min and max index"),
704✔
518
                            )
519
                            .map(|value_index| (idx_in_take as u64, *value_index as u64))
704✔
520
                    }
521
                })
1,149✔
522
                .unzip()
408✔
523
        })
408✔
524
        .map_err(|_| vortex_err!("Failed to convert index to usize"))?;
408✔
525

526
    if new_sparse_indices.is_empty() {
408✔
527
        return Ok(None);
37✔
528
    }
371✔
529

530
    let new_sparse_indices = new_sparse_indices.into_array();
371✔
531
    let values_validity = take_indices_validity.take(&new_sparse_indices)?;
371✔
532
    Ok(Some((
371✔
533
        new_sparse_indices,
371✔
534
        PrimitiveArray::new(value_indices, values_validity).into_array(),
371✔
535
    )))
371✔
536
}
408✔
537

538
/// Filter patches with the provided mask (in flattened space).
539
///
540
/// The filter mask may contain indices that are non-patched. The return value of this function
541
/// is a new set of `Patches` with the indices relative to the provided `mask` rank, and the
542
/// patch values.
543
fn filter_patches_with_mask<T: ToPrimitive + Copy + Ord>(
381✔
544
    patch_indices: &[T],
381✔
545
    offset: usize,
381✔
546
    patch_values: &dyn Array,
381✔
547
    mask_indices: &[usize],
381✔
548
) -> VortexResult<Option<Patches>> {
381✔
549
    let true_count = mask_indices.len();
381✔
550
    let mut new_patch_indices = BufferMut::<u64>::with_capacity(true_count);
381✔
551
    let mut new_mask_indices = Vec::with_capacity(true_count);
381✔
552

553
    // Attempt to move the window by `STRIDE` elements on each iteration. This assumes that
554
    // the patches are relatively sparse compared to the overall mask, and so many indices in the
555
    // mask will end up being skipped.
556
    const STRIDE: usize = 4;
557

558
    let mut mask_idx = 0usize;
381✔
559
    let mut true_idx = 0usize;
381✔
560

561
    while mask_idx < patch_indices.len() && true_idx < true_count {
317,512✔
562
        // NOTE: we are searching for overlaps between sorted, unaligned indices in `patch_indices`
563
        //  and `mask_indices`. We assume that Patches are sparse relative to the global space of
564
        //  the mask (which covers both patch and non-patch values of the parent array), and so to
565
        //  quickly jump through regions with no overlap, we attempt to move our pointers by STRIDE
566
        //  elements on each iteration. If we cannot rule out overlap due to min/max values, we
567
        //  fallback to performing a two-way iterator merge.
568
        if (mask_idx + STRIDE) < patch_indices.len() && (true_idx + STRIDE) < mask_indices.len() {
317,131✔
569
            // Load a vector of each into our registers.
570
            let left_min = patch_indices[mask_idx].to_usize().vortex_expect("left_min") - offset;
312,180✔
571
            let left_max = patch_indices[mask_idx + STRIDE]
312,180✔
572
                .to_usize()
312,180✔
573
                .vortex_expect("left_max")
312,180✔
574
                - offset;
312,180✔
575
            let right_min = mask_indices[true_idx];
312,180✔
576
            let right_max = mask_indices[true_idx + STRIDE];
312,180✔
577

578
            if left_min > right_max {
312,180✔
579
                // Advance right side
580
                true_idx += STRIDE;
30,708✔
581
                continue;
30,708✔
582
            } else if right_min > left_max {
281,472✔
583
                mask_idx += STRIDE;
4,362✔
584
                continue;
4,362✔
585
            } else {
277,110✔
586
                // Fallthrough to direct comparison path.
277,110✔
587
            }
277,110✔
588
        }
4,951✔
589

590
        // Two-way sorted iterator merge:
591

592
        let left = patch_indices[mask_idx].to_usize().vortex_expect("left") - offset;
282,061✔
593
        let right = mask_indices[true_idx];
282,061✔
594

595
        match left.cmp(&right) {
282,061✔
596
            Ordering::Less => {
120,810✔
597
                mask_idx += 1;
120,810✔
598
            }
120,810✔
599
            Ordering::Greater => {
157,388✔
600
                true_idx += 1;
157,388✔
601
            }
157,388✔
602
            Ordering::Equal => {
3,863✔
603
                // Save the mask index as well as the positional index.
3,863✔
604
                new_mask_indices.push(mask_idx);
3,863✔
605
                new_patch_indices.push(true_idx as u64);
3,863✔
606

3,863✔
607
                mask_idx += 1;
3,863✔
608
                true_idx += 1;
3,863✔
609
            }
3,863✔
610
        }
611
    }
612

613
    if new_mask_indices.is_empty() {
381✔
614
        return Ok(None);
24✔
615
    }
357✔
616

617
    let new_patch_indices = new_patch_indices.into_array();
357✔
618
    let new_patch_values = filter(
357✔
619
        patch_values,
357✔
620
        &Mask::from_indices(patch_values.len(), new_mask_indices),
357✔
UNCOV
621
    )?;
×
622

623
    Ok(Some(Patches::new(
357✔
624
        true_count,
357✔
625
        0,
357✔
626
        new_patch_indices,
357✔
627
        new_patch_values,
357✔
628
    )))
357✔
629
}
381✔
630

631
#[cfg(test)]
mod test {
    use rstest::{fixture, rstest};
    use vortex_buffer::buffer;
    use vortex_mask::Mask;

    use crate::arrays::PrimitiveArray;
    use crate::patches::Patches;
    use crate::search_sorted::{SearchResult, SearchSortedSide};
    use crate::validity::Validity;
    use crate::{IntoArray, ToCanonical};

    #[test]
    fn test_filter() {
        let patches = Patches::new(
            100,
            0,
            buffer![10u32, 11, 20].into_array(),
            buffer![100, 110, 200].into_array(),
        );

        let filtered = patches
            .filter(&Mask::from_indices(100, vec![10, 20, 30]))
            .unwrap()
            .unwrap();

        let indices = filtered.indices().to_primitive().unwrap();
        let values = filtered.values().to_primitive().unwrap();
        assert_eq!(indices.as_slice::<u64>(), &[0, 1]);
        assert_eq!(values.as_slice::<i32>(), &[100, 200]);
    }

    #[fixture]
    fn patches() -> Patches {
        Patches::new(
            20,
            0,
            buffer![2u64, 9, 15].into_array(),
            PrimitiveArray::new(buffer![33_i32, 44, 55], Validity::AllValid).into_array(),
        )
    }

    #[rstest]
    fn search_larger_than(patches: Patches) {
        let res = patches.search_sorted(66, SearchSortedSide::Left).unwrap();
        assert_eq!(res, SearchResult::NotFound(16));
    }

    #[rstest]
    fn search_less_than(patches: Patches) {
        let res = patches.search_sorted(22, SearchSortedSide::Left).unwrap();
        assert_eq!(res, SearchResult::NotFound(2));
    }

    #[rstest]
    fn search_found(patches: Patches) {
        let res = patches.search_sorted(44, SearchSortedSide::Left).unwrap();
        assert_eq!(res, SearchResult::Found(9));
    }

    #[rstest]
    fn search_not_found_right(patches: Patches) {
        let res = patches.search_sorted(56, SearchSortedSide::Right).unwrap();
        assert_eq!(res, SearchResult::NotFound(16));
    }

    #[rstest]
    fn search_sliced(patches: Patches) {
        let sliced = patches.slice(7, 20).unwrap().unwrap();
        assert_eq!(
            sliced.search_sorted(22, SearchSortedSide::Left).unwrap(),
            SearchResult::NotFound(2)
        );
    }

    #[test]
    fn search_right() {
        let patches = Patches::new(
            6,
            0,
            buffer![0u8, 1, 4, 5].into_array(),
            buffer![-128i8, -98, 8, 50].into_array(),
        );

        assert_eq!(
            patches.search_sorted(-98, SearchSortedSide::Right).unwrap(),
            SearchResult::Found(2)
        );
        assert_eq!(
            patches.search_sorted(50, SearchSortedSide::Right).unwrap(),
            SearchResult::Found(6),
        );
        assert_eq!(
            patches.search_sorted(7, SearchSortedSide::Right).unwrap(),
            SearchResult::NotFound(2),
        );
        assert_eq!(
            patches.search_sorted(51, SearchSortedSide::Right).unwrap(),
            SearchResult::NotFound(6)
        );
    }

    #[test]
    fn search_left() {
        let patches = Patches::new(
            20,
            0,
            buffer![0u64, 1, 17, 18, 19].into_array(),
            buffer![11i32, 22, 33, 44, 55].into_array(),
        );
        assert_eq!(
            patches.search_sorted(30, SearchSortedSide::Left).unwrap(),
            SearchResult::NotFound(2)
        );
        assert_eq!(
            patches.search_sorted(54, SearchSortedSide::Left).unwrap(),
            SearchResult::NotFound(19)
        );
    }

    #[rstest]
    fn take_with_null_indices(patches: Patches) {
        let taken = patches
            .take(
                &PrimitiveArray::new(buffer![9, 0], Validity::from_iter(vec![true, false]))
                    .into_array(),
            )
            .unwrap()
            .unwrap();
        let primitive_values = taken.values().to_primitive().unwrap();
        assert_eq!(taken.array_len(), 2);
        assert_eq!(primitive_values.as_slice::<i32>(), [44]);
        assert_eq!(
            primitive_values.validity_mask().unwrap(),
            Mask::from_iter(vec![true])
        );
    }

    #[rstest]
    fn take_with_nulls_ignores_null_slot_value(patches: Patches) {
        // The null slot holds a negative value; it must still be emitted as a null patch rather
        // than failing the conversion to usize.
        let taken = patches
            .take_with_nulls(
                &PrimitiveArray::new(buffer![-1i32, 9], Validity::from_iter(vec![false, true]))
                    .into_array(),
            )
            .unwrap()
            .unwrap();
        let indices = taken.indices().to_primitive().unwrap();
        let values = taken.values().to_primitive().unwrap();
        assert_eq!(taken.array_len(), 2);
        assert_eq!(indices.as_slice::<u64>(), &[0, 1]);
        assert_eq!(values.as_slice::<i32>()[1], 44);
        assert_eq!(
            values.validity_mask().unwrap(),
            Mask::from_iter(vec![false, true])
        );
    }

    #[test]
    fn take_search_offset_does_not_overflow_index_type() {
        // Five u8 patches per take index force the binary-search take path.
        let patches = Patches::new(
            60,
            240,
            buffer![250u8, 251, 252, 253, 254].into_array(),
            buffer![1i32, 2, 3, 4, 5].into_array(),
        );

        // 20 + 240 = 260 does not fit in u8, so it cannot match any patch.
        assert!(patches.take(&buffer![20u32].into_array()).unwrap().is_none());

        // 12 + 240 = 252 is the third patch.
        let taken = patches.take(&buffer![12u32].into_array()).unwrap().unwrap();
        assert_eq!(
            taken.indices().to_primitive().unwrap().as_slice::<u64>(),
            &[0]
        );
        assert_eq!(
            taken.values().to_primitive().unwrap().as_slice::<i32>(),
            &[3]
        );
    }

    #[test]
    fn test_slice() {
        let values = buffer![15_u32, 135, 13531, 42].into_array();
        let indices = buffer![10_u64, 11, 50, 100].into_array();

        let patches = Patches::new(101, 0, indices, values);

        let sliced = patches.slice(15, 100).unwrap().unwrap();
        assert_eq!(sliced.array_len(), 100 - 15);
        let primitive = sliced.values().to_primitive().unwrap();

        assert_eq!(primitive.as_slice::<u32>(), &[13531]);
    }

    #[test]
    fn doubly_sliced() {
        let values = buffer![15_u32, 135, 13531, 42].into_array();
        let indices = buffer![10_u64, 11, 50, 100].into_array();

        let patches = Patches::new(101, 0, indices, values);

        let sliced = patches.slice(15, 100).unwrap().unwrap();
        assert_eq!(sliced.array_len(), 100 - 15);
        let primitive = sliced.values().to_primitive().unwrap();

        assert_eq!(primitive.as_slice::<u32>(), &[13531]);

        let doubly_sliced = sliced.slice(35, 36).unwrap().unwrap();
        let primitive_doubly_sliced = doubly_sliced.values().to_primitive().unwrap();

        assert_eq!(primitive_doubly_sliced.as_slice::<u32>(), &[13531]);
    }
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc