• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

vortex-data / vortex / 16950147456

13 Aug 2025 09:49PM UTC coverage: 87.673% (+0.006%) from 87.667%
16950147456

Pull #4214

github

web-flow
Merge 047b17011 into 5b99decba
Pull Request #4214: feat: streaming stores for decoding run-end `PrimitiveArray`

41 of 41 new or added lines in 2 files covered. (100.0%)

21 existing lines in 1 file now uncovered.

56538 of 64487 relevant lines covered (87.67%)

627527.54 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

87.28
/vortex-buffer/src/buffer_mut.rs
1
// SPDX-License-Identifier: Apache-2.0
2
// SPDX-FileCopyrightText: Copyright the Vortex contributors
3

4
use core::mem::MaybeUninit;
5
use std::any::type_name;
6
use std::fmt::{Debug, Formatter};
7
use std::io::Write;
8
use std::ops::{Deref, DerefMut};
9

10
use bytes::buf::UninitSlice;
11
use bytes::{Buf, BufMut, BytesMut};
12
use vortex_error::{VortexExpect, vortex_panic};
13

14
use crate::debug::TruncatedDebug;
15
use crate::trusted_len::TrustedLen;
16
use crate::{Alignment, Buffer, ByteBufferMut};
17

18
/// A mutable buffer that maintains a runtime-defined alignment through resizing operations.
#[derive(PartialEq, Eq)]
pub struct BufferMut<T> {
    // Underlying byte storage; its data pointer is kept aligned to `alignment` by every
    // (re)allocation path in this file (see `align_empty` / `reserve_allocate`).
    pub(crate) bytes: BytesMut,
    // Number of initialized elements of `T`. Invariant (debug-asserted in `len()`):
    // `length == bytes.len() / size_of::<T>()`.
    pub(crate) length: usize,
    // Runtime alignment of the buffer's start. Invariant: always a multiple of
    // `Alignment::of::<T>()`, checked on construction.
    pub(crate) alignment: Alignment,
    // Marks this type as logically owning `T`s without storing any directly.
    pub(crate) _marker: std::marker::PhantomData<T>,
}
26

27
impl<T> BufferMut<T> {
28
    /// Create a new `BufferMut` with the requested alignment and capacity.
29
    pub fn with_capacity(capacity: usize) -> Self {
2,650,243✔
30
        Self::with_capacity_aligned(capacity, Alignment::of::<T>())
2,650,243✔
31
    }
2,650,243✔
32

33
    /// Create a new `BufferMut` with the requested alignment and capacity.
34
    pub fn with_capacity_aligned(capacity: usize, alignment: Alignment) -> Self {
3,298,077✔
35
        if !alignment.is_aligned_to(Alignment::of::<T>()) {
3,298,077✔
36
            vortex_panic!(
×
37
                "Alignment {} must align to the scalar type's alignment {}",
×
38
                alignment,
39
                align_of::<T>()
40
            );
41
        }
3,298,077✔
42

43
        let mut bytes = BytesMut::with_capacity((capacity * size_of::<T>()) + *alignment);
3,298,077✔
44
        bytes.align_empty(alignment);
3,298,077✔
45

46
        Self {
3,298,077✔
47
            bytes,
3,298,077✔
48
            length: 0,
3,298,077✔
49
            alignment,
3,298,077✔
50
            _marker: Default::default(),
3,298,077✔
51
        }
3,298,077✔
52
    }
3,298,077✔
53

54
    /// Create a new zeroed `BufferMut`.
55
    pub fn zeroed(len: usize) -> Self {
×
56
        Self::zeroed_aligned(len, Alignment::of::<T>())
×
57
    }
×
58

59
    /// Create a new zeroed `BufferMut`.
60
    pub fn zeroed_aligned(len: usize, alignment: Alignment) -> Self {
25,594✔
61
        let mut bytes = BytesMut::zeroed((len * size_of::<T>()) + *alignment);
25,594✔
62
        bytes.advance(bytes.as_ptr().align_offset(*alignment));
25,594✔
63
        unsafe { bytes.set_len(len * size_of::<T>()) };
25,594✔
64
        Self {
25,594✔
65
            bytes,
25,594✔
66
            length: len,
25,594✔
67
            alignment,
25,594✔
68
            _marker: Default::default(),
25,594✔
69
        }
25,594✔
70
    }
25,594✔
71

72
    /// Create a new empty `BufferMut` with the provided alignment.
73
    pub fn empty() -> Self {
272,552✔
74
        Self::empty_aligned(Alignment::of::<T>())
272,552✔
75
    }
272,552✔
76

77
    /// Create a new empty `BufferMut` with the provided alignment.
78
    pub fn empty_aligned(alignment: Alignment) -> Self {
277,142✔
79
        BufferMut::with_capacity_aligned(0, alignment)
277,142✔
80
    }
277,142✔
81

82
    /// Create a new full `BufferMut` with the given value.
83
    pub fn full(item: T, len: usize) -> Self
201,753✔
84
    where
201,753✔
85
        T: Copy,
201,753✔
86
    {
87
        let mut buffer = BufferMut::<T>::with_capacity(len);
201,753✔
88
        buffer.push_n(item, len);
201,753✔
89
        buffer
201,753✔
90
    }
201,753✔
91

92
    /// Create a mutable scalar buffer by copying the contents of the slice.
93
    pub fn copy_from(other: impl AsRef<[T]>) -> Self {
73,107✔
94
        Self::copy_from_aligned(other, Alignment::of::<T>())
73,107✔
95
    }
73,107✔
96

97
    /// Create a mutable scalar buffer with the alignment by copying the contents of the slice.
98
    ///
99
    /// ## Panics
100
    ///
101
    /// Panics when the requested alignment isn't itself aligned to type T.
102
    pub fn copy_from_aligned(other: impl AsRef<[T]>, alignment: Alignment) -> Self {
81,374✔
103
        if !alignment.is_aligned_to(Alignment::of::<T>()) {
81,374✔
104
            vortex_panic!("Given alignment is not aligned to type T")
×
105
        }
81,374✔
106
        let other = other.as_ref();
81,374✔
107
        let mut buffer = Self::with_capacity_aligned(other.len(), alignment);
81,374✔
108
        buffer.extend_from_slice(other);
81,374✔
109
        debug_assert_eq!(buffer.alignment(), alignment);
81,374✔
110
        buffer
81,374✔
111
    }
81,374✔
112

113
    /// Get the alignment of the buffer.
114
    #[inline(always)]
115
    pub fn alignment(&self) -> Alignment {
267,279✔
116
        self.alignment
267,279✔
117
    }
267,279✔
118

119
    /// Returns the length of the buffer.
120
    #[inline(always)]
121
    pub fn len(&self) -> usize {
13,405,286✔
122
        debug_assert_eq!(self.length, self.bytes.len() / size_of::<T>());
13,405,286✔
123
        self.length
13,405,286✔
124
    }
13,405,286✔
125

126
    /// Returns whether the buffer is empty.
127
    #[inline(always)]
128
    pub fn is_empty(&self) -> bool {
28,502✔
129
        self.length == 0
28,502✔
130
    }
28,502✔
131

132
    /// Returns the capacity of the buffer.
133
    #[inline]
134
    pub fn capacity(&self) -> usize {
5,465,860✔
135
        self.bytes.capacity() / size_of::<T>()
5,465,860✔
136
    }
5,465,860✔
137

138
    /// Returns a slice over the buffer of elements of type T.
139
    #[inline]
140
    pub fn as_slice(&self) -> &[T] {
17,007,164✔
141
        let raw_slice = self.bytes.as_ref();
17,007,164✔
142
        // SAFETY: alignment of Buffer is checked on construction
143
        unsafe { std::slice::from_raw_parts(raw_slice.as_ptr().cast(), self.length) }
17,007,164✔
144
    }
17,007,164✔
145

146
    /// Returns a slice over the buffer of elements of type T.
147
    #[inline]
148
    pub fn as_mut_slice(&mut self) -> &mut [T] {
1,938,227✔
149
        let raw_slice = self.bytes.as_mut();
1,938,227✔
150
        // SAFETY: alignment of Buffer is checked on construction
151
        unsafe { std::slice::from_raw_parts_mut(raw_slice.as_mut_ptr().cast(), self.length) }
1,938,227✔
152
    }
1,938,227✔
153

154
    /// Clear the buffer, retaining any existing capacity.
155
    #[inline]
156
    pub fn clear(&mut self) {
7,644✔
157
        unsafe { self.bytes.set_len(0) }
7,644✔
158
        self.length = 0;
7,644✔
159
    }
7,644✔
160

161
    /// Shortens the buffer, keeping the first `len` bytes and dropping the
162
    /// rest.
163
    ///
164
    /// If `len` is greater than the buffer's current length, this has no
165
    /// effect.
166
    ///
167
    /// Existing underlying capacity is preserved.
168
    #[inline]
169
    pub fn truncate(&mut self, len: usize) {
5✔
170
        if len <= self.len() {
5✔
171
            // SAFETY: Shrinking the buffer cannot expose uninitialized bytes.
5✔
172
            unsafe { self.set_len(len) };
5✔
173
        }
5✔
174
    }
5✔
175

176
    /// Reserves capacity for at least `additional` more elements to be inserted in the buffer.
177
    #[inline]
178
    pub fn reserve(&mut self, additional: usize) {
33,256,168✔
179
        let additional_bytes = additional * size_of::<T>();
33,256,168✔
180
        if additional_bytes <= self.bytes.capacity() - self.bytes.len() {
33,256,168✔
181
            // We can fit the additional bytes in the remaining capacity. Nothing to do.
182
            return;
32,663,545✔
183
        }
592,623✔
184

185
        // Otherwise, reserve additional + alignment bytes in case we need to realign the buffer.
186
        self.reserve_allocate(additional);
592,623✔
187
    }
33,256,168✔
188

189
    /// A separate function so we can inline the reserve call's fast path. According to `BytesMut`
190
    /// this has significant performance implications.
191
    fn reserve_allocate(&mut self, additional: usize) {
592,623✔
192
        let new_capacity: usize = ((self.length + additional) * size_of::<T>()) + *self.alignment;
592,623✔
193
        // Make sure we at least double in size each time we re-allocate to amortize the cost
194
        let new_capacity = new_capacity.max(self.bytes.capacity() * 2);
592,623✔
195

196
        let mut bytes = BytesMut::with_capacity(new_capacity);
592,623✔
197
        bytes.align_empty(self.alignment);
592,623✔
198
        bytes.extend_from_slice(&self.bytes);
592,623✔
199
        self.bytes = bytes;
592,623✔
200
    }
592,623✔
201

202
    /// Returns the spare capacity of the buffer as a slice of `MaybeUninit<T>`.
203
    /// Has identical semantics to [`Vec::spare_capacity_mut`].
204
    ///
205
    /// The returned slice can be used to fill the buffer with data (e.g. by
206
    /// reading from a file) before marking the data as initialized using the
207
    /// [`set_len`] method.
208
    ///
209
    /// [`set_len`]: BufferMut::set_len
210
    /// [`Vec::spare_capacity_mut`]: Vec::spare_capacity_mut
211
    ///
212
    /// # Examples
213
    ///
214
    /// ```
215
    /// use vortex_buffer::BufferMut;
216
    ///
217
    /// // Allocate vector big enough for 10 elements.
218
    /// let mut b = BufferMut::<u64>::with_capacity(10);
219
    ///
220
    /// // Fill in the first 3 elements.
221
    /// let uninit = b.spare_capacity_mut();
222
    /// uninit[0].write(0);
223
    /// uninit[1].write(1);
224
    /// uninit[2].write(2);
225
    ///
226
    /// // Mark the first 3 elements of the vector as being initialized.
227
    /// unsafe {
228
    ///     b.set_len(3);
229
    /// }
230
    ///
231
    /// assert_eq!(b.as_slice(), &[0u64, 1, 2]);
232
    /// ```
233
    #[inline]
234
    pub fn spare_capacity_mut(&mut self) -> &mut [MaybeUninit<T>] {
1,314,092✔
235
        let dst = self.bytes.spare_capacity_mut().as_mut_ptr();
1,314,092✔
236
        unsafe {
1,314,092✔
237
            std::slice::from_raw_parts_mut(
1,314,092✔
238
                dst as *mut MaybeUninit<T>,
1,314,092✔
239
                self.capacity() - self.length,
1,314,092✔
240
            )
1,314,092✔
241
        }
1,314,092✔
242
    }
1,314,092✔
243

244
    /// # Safety
245
    /// The caller must ensure that the buffer was properly initialized up to `len`.
246
    #[inline]
247
    pub unsafe fn set_len(&mut self, len: usize) {
2,383,604✔
248
        unsafe { self.bytes.set_len(len * size_of::<T>()) };
2,383,604✔
249
        self.length = len;
2,383,604✔
250
    }
2,383,604✔
251

252
    /// Appends a scalar to the buffer.
253
    #[inline]
254
    pub fn push(&mut self, value: T) {
23,308,887✔
255
        self.reserve(1);
23,308,887✔
256
        unsafe { self.push_unchecked(value) }
23,308,887✔
257
    }
23,308,887✔
258

259
    /// Appends a scalar to the buffer without checking for sufficient capacity.
260
    ///
261
    /// ## Safety
262
    ///
263
    /// The caller must ensure there is sufficient capacity in the array.
264
    #[inline]
265
    pub unsafe fn push_unchecked(&mut self, item: T) {
42,096,044✔
266
        // SAFETY: the caller ensures we have sufficient capacity
267
        unsafe {
268
            let dst: *mut T = self.bytes.spare_capacity_mut().as_mut_ptr().cast();
42,096,044✔
269
            dst.write(item);
42,096,044✔
270
            self.bytes.set_len(self.bytes.len() + size_of::<T>())
42,096,044✔
271
        }
272
        self.length += 1;
42,096,044✔
273
    }
42,096,044✔
274

275
    /// Appends n scalars to the buffer.
276
    ///
277
    /// This function is slightly more optimized than `extend(iter::repeat_n(item, b))`.
278
    #[inline]
279
    pub fn push_n(&mut self, item: T, n: usize)
210,636✔
280
    where
210,636✔
281
        T: Copy,
210,636✔
282
    {
283
        self.reserve(n);
210,636✔
284
        unsafe { self.push_n_unchecked(item, n) }
210,636✔
285
    }
210,636✔
286

287
    /// Appends n scalars to the buffer.
288
    ///
289
    /// ## Safety
290
    ///
291
    /// The caller must ensure there is sufficient capacity in the array.
292
    #[inline]
293
    pub unsafe fn push_n_unchecked(&mut self, item: T, n: usize)
242,928✔
294
    where
242,928✔
295
        T: Copy,
242,928✔
296
    {
297
        #[cfg(target_feature = "avx2")]
298
        {
299
            // Ensure a minimum of 128 bytes to write for AVX streaming stores.
300
            if size_of::<T>() * n >= 128 {
301
                unsafe { avx2::push_n_unchecked(self, item, n) }
302
            } else {
303
                unsafe { scalar::push_n_unchecked(self, item, n) }
304
            }
305
        }
306

307
        #[cfg(not(all(target_arch = "x86_64", target_feature = "avx2")))]
308
        unsafe {
309
            scalar::push_n_unchecked(self, item, n)
242,928✔
310
        }
311
    }
242,928✔
312

313
    /// Appends a slice of type `T`, growing the internal buffer as needed.
314
    ///
315
    /// # Example:
316
    ///
317
    /// ```
318
    /// # use vortex_buffer::BufferMut;
319
    ///
320
    /// let mut builder = BufferMut::<u16>::with_capacity(10);
321
    /// builder.extend_from_slice(&[42, 44, 46]);
322
    ///
323
    /// assert_eq!(builder.len(), 3);
324
    /// ```
325
    #[inline]
326
    pub fn extend_from_slice(&mut self, slice: &[T]) {
8,604,066✔
327
        self.reserve(slice.len());
8,604,066✔
328
        let raw_slice =
8,604,066✔
329
            unsafe { std::slice::from_raw_parts(slice.as_ptr().cast(), size_of_val(slice)) };
8,604,066✔
330
        self.bytes.extend_from_slice(raw_slice);
8,604,066✔
331
        self.length += slice.len();
8,604,066✔
332
    }
8,604,066✔
333

334
    /// Freeze the `BufferMut` into a `Buffer`.
335
    pub fn freeze(self) -> Buffer<T> {
3,129,632✔
336
        Buffer {
3,129,632✔
337
            bytes: self.bytes.freeze(),
3,129,632✔
338
            length: self.length,
3,129,632✔
339
            alignment: self.alignment,
3,129,632✔
340
            _marker: Default::default(),
3,129,632✔
341
        }
3,129,632✔
342
    }
3,129,632✔
343

344
    /// Map each element of the buffer with a closure.
345
    pub fn map_each<R, F>(self, mut f: F) -> BufferMut<R>
9,620✔
346
    where
9,620✔
347
        T: Copy,
9,620✔
348
        F: FnMut(T) -> R,
9,620✔
349
    {
350
        assert_eq!(
9,620✔
351
            size_of::<T>(),
352
            size_of::<R>(),
353
            "Size of T and R do not match"
×
354
        );
355
        // SAFETY: we have checked that `size_of::<T>` == `size_of::<R>`.
356
        let mut buf: BufferMut<R> = unsafe { std::mem::transmute(self) };
9,620✔
357
        buf.iter_mut()
9,620✔
358
            .for_each(|item| *item = f(unsafe { std::mem::transmute_copy(item) }));
261,841,292✔
359
        buf
9,620✔
360
    }
9,620✔
361

362
    /// Return a `BufferMut<T>` with the given alignment. Where possible, this will be zero-copy.
363
    pub fn aligned(self, alignment: Alignment) -> Self {
×
364
        if self.as_ptr().align_offset(*alignment) == 0 {
×
365
            self
×
366
        } else {
367
            Self::copy_from_aligned(self, alignment)
×
368
        }
369
    }
×
370
}
371

372
impl<T> Clone for BufferMut<T> {
373
    fn clone(&self) -> Self {
12,639✔
374
        // NOTE(ngates): we cannot derive Clone since BytesMut copies on clone and the alignment
375
        //  might be messed up.
376
        let mut buffer = BufferMut::<T>::with_capacity_aligned(self.capacity(), self.alignment);
12,639✔
377
        buffer.extend_from_slice(self.as_slice());
12,639✔
378
        buffer
12,639✔
379
    }
12,639✔
380
}
381

382
impl<T: Debug> Debug for BufferMut<T> {
383
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
×
384
        f.debug_struct(&format!("BufferMut<{}>", type_name::<T>()))
×
385
            .field("length", &self.length)
×
386
            .field("alignment", &self.alignment)
×
387
            .field("as_slice", &TruncatedDebug(self.as_slice()))
×
388
            .finish()
×
389
    }
×
390
}
391

392
impl<T> Default for BufferMut<T> {
    /// An empty buffer with `T`'s natural alignment.
    fn default() -> Self {
        Self::empty()
    }
}
397

398
// Dereferences to the initialized portion of the buffer as a typed slice.
impl<T> Deref for BufferMut<T> {
    type Target = [T];

    fn deref(&self) -> &Self::Target {
        self.as_slice()
    }
}
405

406
// Mutable access to the initialized portion of the buffer as a typed slice.
impl<T> DerefMut for BufferMut<T> {
    fn deref_mut(&mut self) -> &mut Self::Target {
        self.as_mut_slice()
    }
}
411

412
// Borrow the initialized elements as a slice.
impl<T> AsRef<[T]> for BufferMut<T> {
    fn as_ref(&self) -> &[T] {
        self.as_slice()
    }
}
417

418
// Borrow the initialized elements as a mutable slice.
impl<T> AsMut<[T]> for BufferMut<T> {
    fn as_mut(&mut self) -> &mut [T] {
        self.as_mut_slice()
    }
}
423

424
impl<T> BufferMut<T> {
    /// Extends the buffer from an arbitrary iterator.
    ///
    /// Writes directly into spare capacity via a raw pointer for as long as capacity
    /// allows, then falls back to `push` (which reserves) for any leftover items.
    fn extend_iter(&mut self, mut iter: impl Iterator<Item = T>) {
        // Attempt to reserve enough memory up-front, although this is only a lower bound.
        let (lower, _) = iter.size_hint();
        self.reserve(lower);

        // Number of element slots we can fill without reallocating.
        let remaining = self.capacity() - self.len();

        let begin: *const T = self.bytes.spare_capacity_mut().as_mut_ptr().cast();
        let mut dst: *mut T = begin.cast_mut();
        for _ in 0..remaining {
            if let Some(item) = iter.next() {
                unsafe {
                    // SAFETY: We know we have enough capacity to write the item.
                    dst.write(item);
                    // Note. we used to have dst.add(iteration).write(item), here.
                    // however this was much slower than just incrementing dst.
                    dst = dst.add(1);
                }
            } else {
                break;
            }
        }

        // Recover the number of elements written from the pointer distance.
        // TODO(joe): replace with ptr_sub when stable
        let length = self.len() + unsafe { dst.byte_offset_from(begin) as usize / size_of::<T>() };
        unsafe { self.set_len(length) };

        // Append remaining elements (the size hint was only a lower bound, so the
        // iterator may still have items; `push` reserves as needed).
        iter.for_each(|item| self.push(item));
    }

    /// An unsafe variant of the `Extend` trait and its `extend` method that receives what the
    /// caller guarantees to be an iterator with a trusted upper bound.
    pub fn extend_trusted<I: TrustedLen<Item = T>>(&mut self, iter: I) {
        // Reserve all memory upfront since it's an exact upper bound
        let (_, high) = iter.size_hint();
        self.reserve(high.vortex_expect("TrustedLen iterator didn't have valid upper bound"));

        let begin: *const T = self.bytes.spare_capacity_mut().as_mut_ptr().cast();
        let mut dst: *mut T = begin.cast_mut();
        iter.for_each(|item| {
            unsafe {
                // SAFETY: We know we have enough capacity to write the item.
                dst.write(item);
                // Note. we used to have dst.add(iteration).write(item), here.
                // however this was much slower than just incrementing dst.
                dst = dst.add(1);
            }
        });
        // Recover the number of elements written from the pointer distance.
        // TODO(joe): replace with ptr_sub when stable
        let length = self.len() + unsafe { dst.byte_offset_from(begin) as usize / size_of::<T>() };
        unsafe { self.set_len(length) };
    }
}
479

480
// Scalar (non-SIMD) fill implementation, used directly on non-AVX2 builds and as the
// small-input / odd-size fallback from the AVX2 path.
mod scalar {
    use std::mem::size_of;

    use super::*;

    /// Appends n scalars to the buffer using scalar (non-SIMD) implementation.
    ///
    /// ## Safety
    ///
    /// The caller must ensure there is sufficient capacity in the buffer.
    #[inline]
    pub unsafe fn push_n_unchecked<T>(buffer: &mut BufferMut<T>, item: T, n: usize)
    where
        T: Copy,
    {
        let mut dst: *mut T = buffer.bytes.spare_capacity_mut().as_mut_ptr().cast();

        // Safety: Sufficient capacity is a precondition.
        unsafe {
            let end = dst.add(n);
            // Simple pointer-walking fill; the compiler is free to vectorize this.
            while dst < end {
                dst.write(item);
                dst = dst.add(1);
            }
            buffer
                .bytes
                .set_len(buffer.bytes.len() + (n * size_of::<T>()));
        }
        // Keep the element count in sync with the byte length set above.
        buffer.length += n;
    }
}
511

512
// FIX: this module uses `std::arch::x86_64` intrinsics, so it must be gated on the
// architecture as well as the feature. Gating on `target_feature = "avx2"` alone would
// fail to compile on any non-x86_64 target that set the feature, and was inconsistent
// with the `#[cfg(not(all(target_arch = "x86_64", target_feature = "avx2")))]` fallback
// at the call site in `BufferMut::push_n_unchecked`.
#[cfg(all(target_arch = "x86_64", target_feature = "avx2"))]
mod avx2 {
    use std::arch::x86_64::{
        __m256i, _mm_sfence, _mm256_set1_epi8, _mm256_set1_epi16, _mm256_set1_epi32,
        _mm256_set1_epi64x, _mm256_storeu_si256, _mm256_stream_si256,
    };
    use std::mem::size_of;

    use super::*;

    /// Appends n copies of item using `_mm256_stream_si256` streaming stores.
    ///
    /// ## Safety
    ///
    /// Caller must ensure sufficient capacity.
    #[inline]
    pub unsafe fn push_n_unchecked<T>(buffer: &mut BufferMut<T>, item: T, n: usize)
    where
        T: Copy,
    {
        let size = size_of::<T>();

        // Splat `item` into a vector register.
        let pattern = unsafe {
            match size {
                1 => {
                    let bytes = std::mem::transmute::<T, [u8; 1]>(item);
                    _mm256_set1_epi8(i8::from_ne_bytes(bytes))
                }
                2 => {
                    let bytes = std::mem::transmute::<T, [u8; 2]>(item);
                    _mm256_set1_epi16(i16::from_ne_bytes(bytes))
                }
                4 => {
                    let bytes = std::mem::transmute::<T, [u8; 4]>(item);
                    _mm256_set1_epi32(i32::from_ne_bytes(bytes))
                }
                8 => {
                    let bytes = std::mem::transmute::<T, [u8; 8]>(item);
                    _mm256_set1_epi64x(i64::from_ne_bytes(bytes))
                }
                _ => {
                    // Element sizes that don't map onto a splat intrinsic fall back to
                    // the scalar loop (which also updates the buffer's lengths).
                    scalar::push_n_unchecked(buffer, item, n);
                    return;
                }
            }
        };

        let total_bytes = n * size;
        let mut ptr = buffer.bytes.spare_capacity_mut().as_mut_ptr() as *mut u8;
        let end = unsafe { ptr.add(total_bytes) };

        if buffer
            .bytes
            .spare_capacity_mut()
            .as_mut_ptr()
            .align_offset(align_of::<__m256i>())
            == 0
        {
            // Safety: Sufficient capacity is a precondition.
            for _ in 0..(total_bytes / 32) {
                unsafe {
                    // Use AVX streaming stores if the buffer is aligned to 32 bytes.
                    _mm256_stream_si256(ptr as *mut __m256i, pattern);
                }
                ptr = unsafe { ptr.add(32) };
            }
            // Ensure all writes become visible.
            unsafe { _mm_sfence() };
        } else {
            // Safety: Sufficient capacity is a precondition.
            for _ in 0..(total_bytes / 32) {
                unsafe {
                    // Use unaligned stores if the buffer is not aligned to 32 bytes.
                    _mm256_storeu_si256(ptr as *mut __m256i, pattern);
                }
                ptr = unsafe { ptr.add(32) };
            }
        }

        // Write the tail (total_bytes % 32 bytes; always a whole number of elements
        // since `size` is 1, 2, 4, or 8 here and therefore divides 32).
        // Safety: Sufficient capacity is a precondition.
        unsafe {
            while ptr < end {
                (ptr as *mut T).write(item);
                ptr = ptr.add(size);
            }
        }

        unsafe { buffer.bytes.set_len(buffer.bytes.len() + total_bytes) };
        buffer.length += n;
    }
}
604

605
impl<T> Extend<T> for BufferMut<T> {
    /// Delegates to the pointer-based fast path in `extend_iter`.
    #[inline]
    fn extend<I: IntoIterator<Item = T>>(&mut self, iter: I) {
        self.extend_iter(iter.into_iter())
    }
}
611

612
// Extending from references copies each element (`T: Copy`) before delegating to the
// same fast path as the owned variant.
impl<'a, T> Extend<&'a T> for BufferMut<T>
where
    T: Copy + 'a,
{
    #[inline]
    fn extend<I: IntoIterator<Item = &'a T>>(&mut self, iter: I) {
        self.extend_iter(iter.into_iter().copied())
    }
}
621

622
impl<T> FromIterator<T> for BufferMut<T> {
    /// Collects into a buffer with `T`'s natural alignment.
    fn from_iter<I: IntoIterator<Item = T>>(iter: I) -> Self {
        // We don't infer the capacity here and just let the first call to `extend` do it for us.
        let mut buffer = Self::with_capacity(0);
        buffer.extend(iter);
        debug_assert_eq!(buffer.alignment(), Alignment::of::<T>());
        buffer
    }
}
631

632
// `bytes::Buf` (read side) for byte buffers. The whole buffer is exposed as a single
// contiguous chunk.
impl Buf for ByteBufferMut {
    fn remaining(&self) -> usize {
        self.len()
    }

    fn chunk(&self) -> &[u8] {
        self.as_slice()
    }

    fn advance(&mut self, cnt: usize) {
        // Only allow advances that keep the start of the buffer on an aligned address;
        // anything else would silently break the alignment invariant.
        if !cnt.is_multiple_of(*self.alignment) {
            vortex_panic!(
                "Cannot advance buffer by {} items, resulting alignment is not {}",
                cnt,
                self.alignment
            );
        }
        // Consuming from the front shrinks both the byte view and the element count.
        self.bytes.advance(cnt);
        self.length -= cnt;
    }
}
653

654
/// As per the BufMut implementation, we must support internal resizing when
655
/// asked to extend the buffer.
656
/// See: <https://github.com/tokio-rs/bytes/issues/131>
657
unsafe impl BufMut for ByteBufferMut {
658
    #[inline]
659
    fn remaining_mut(&self) -> usize {
7,065✔
660
        usize::MAX - self.len()
7,065✔
661
    }
7,065✔
662

663
    #[inline]
UNCOV
664
    unsafe fn advance_mut(&mut self, cnt: usize) {
×
665
        if !cnt.is_multiple_of(*self.alignment) {
×
666
            vortex_panic!(
×
667
                "Cannot advance buffer by {} items, resulting alignment is not {}",
×
668
                cnt,
669
                self.alignment
670
            );
671
        }
×
UNCOV
672
        unsafe { self.bytes.advance_mut(cnt) };
×
673
        self.length -= cnt;
×
674
    }
×
675

676
    #[inline]
677
    fn chunk_mut(&mut self) -> &mut UninitSlice {
×
678
        self.bytes.chunk_mut()
×
UNCOV
679
    }
×
680

UNCOV
681
    fn put<T: Buf>(&mut self, mut src: T)
×
UNCOV
682
    where
×
UNCOV
683
        Self: Sized,
×
684
    {
UNCOV
685
        while src.has_remaining() {
×
686
            let chunk = src.chunk();
×
687
            self.extend_from_slice(chunk);
×
688
            src.advance(chunk.len());
×
UNCOV
689
        }
×
UNCOV
690
    }
×
691

692
    #[inline]
693
    fn put_slice(&mut self, src: &[u8]) {
14,246✔
694
        self.extend_from_slice(src);
14,246✔
695
    }
14,246✔
696

697
    #[inline]
UNCOV
698
    fn put_bytes(&mut self, val: u8, cnt: usize) {
×
UNCOV
699
        self.push_n(val, cnt)
×
UNCOV
700
    }
×
701
}
702

703
/// Extension trait for [`BytesMut`] that provides functions for aligning the buffer.
trait AlignedBytesMut {
    /// Align an empty `BytesMut` to the specified alignment.
    ///
    /// After this call the buffer is still empty, but its data pointer is aligned to
    /// `alignment` and subsequent in-capacity writes keep that alignment.
    ///
    /// ## Panics
    ///
    /// Panics if the buffer is not empty, or if there is not enough capacity to align the buffer.
    fn align_empty(&mut self, alignment: Alignment);
}
712

713
impl AlignedBytesMut for BytesMut {
    fn align_empty(&mut self, alignment: Alignment) {
        // Alignment is only well-defined for an empty buffer: advancing a non-empty one
        // would discard data.
        if !self.is_empty() {
            vortex_panic!("ByteBufferMut must be empty");
        }

        // Number of padding bytes between the allocation start and the first aligned address.
        let padding = self.as_ptr().align_offset(*alignment);
        // Pure capacity guard: the subtraction result is discarded; we only panic if the
        // padding would not fit in the current capacity.
        self.capacity()
            .checked_sub(padding)
            .vortex_expect("Not enough capacity to align buffer");

        // SAFETY: We know the buffer is empty, and we know we have enough capacity, so we can
        // safely set the length to the padding and advance the buffer to the aligned offset.
        unsafe { self.set_len(padding) };
        self.advance(padding);
    }
}
730

731
impl Write for ByteBufferMut {
732
    fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
6,174✔
733
        self.extend_from_slice(buf);
6,174✔
734
        Ok(buf.len())
6,174✔
735
    }
6,174✔
736

UNCOV
737
    fn flush(&mut self) -> std::io::Result<()> {
×
UNCOV
738
        Ok(())
×
UNCOV
739
    }
×
740
}
741

742
#[cfg(test)]
mod test {
    use bytes::{Buf, BufMut};

    use crate::{Alignment, BufferMut, ByteBufferMut, buffer_mut};

    // Capacity must survive a series of pushes and the custom alignment must be retained.
    #[test]
    fn capacity() {
        let mut n = 57;
        let mut buf = BufferMut::<i32>::with_capacity_aligned(n, Alignment::new(1024));
        assert!(buf.capacity() >= 57);

        while n > 0 {
            buf.push(0);
            assert!(buf.capacity() >= n);
            n -= 1
        }

        assert_eq!(buf.alignment(), Alignment::new(1024));
    }

    #[test]
    fn from_iter() {
        let buf = BufferMut::from_iter([0, 10, 20, 30]);
        assert_eq!(buf.as_slice(), &[0, 10, 20, 30]);
    }

    // Covers both the in-capacity fast path and the reserve-then-push fallback of extend_iter.
    #[test]
    fn extend() {
        let mut buf = BufferMut::empty();
        buf.extend([0i32, 10, 20, 30]);
        buf.extend([40, 50, 60]);
        assert_eq!(buf.as_slice(), &[0, 10, 20, 30, 40, 50, 60]);
    }

    #[test]
    fn push() {
        let mut buf = BufferMut::empty();
        buf.push(1);
        buf.push(2);
        buf.push(3);
        assert_eq!(buf.as_slice(), &[1, 2, 3]);
    }

    #[test]
    fn push_n() {
        let mut buf = BufferMut::empty();
        buf.push_n(0, 100);
        assert_eq!(buf.as_slice(), &[0; 100]);
    }

    #[test]
    fn as_mut() {
        let mut buf = buffer_mut![0, 1, 2];
        // Uses DerefMut
        buf[1] = 0;
        // Uses as_mut
        buf.as_mut()[2] = 0;
        assert_eq!(buf.as_slice(), &[0, 0, 0]);
    }

    #[test]
    fn map_each() {
        let buf = buffer_mut![0i32, 1, 2];
        // Add one, and cast to an unsigned u32 in the same closure
        let buf = buf.map_each(|i| (i + 1) as u32);
        assert_eq!(buf.as_slice(), &[1u32, 2, 3]);
    }

    // Exercises the Buf (read-side) implementation, including aligned advance.
    #[test]
    fn bytes_buf() {
        let mut buf = ByteBufferMut::copy_from("helloworld".as_bytes());
        assert_eq!(buf.remaining(), 10);
        assert_eq!(buf.chunk(), b"helloworld");

        Buf::advance(&mut buf, 5);
        assert_eq!(buf.remaining(), 5);
        assert_eq!(buf.as_slice(), b"world");
        assert_eq!(buf.chunk(), b"world");
    }

    // Exercises the BufMut (write-side) implementation.
    #[test]
    fn bytes_buf_mut() {
        let mut buf = ByteBufferMut::copy_from("hello".as_bytes());
        assert_eq!(BufMut::remaining_mut(&buf), usize::MAX - 5);

        BufMut::put_slice(&mut buf, b"world");
        assert_eq!(buf.as_slice(), b"helloworld");
    }

    // The following push_n_* tests use >= 128 bytes so the AVX2 streaming-store path
    // (when compiled in) is exercised, with sizes chosen to leave a non-vector tail.
    #[test]
    fn push_n_u8_large() {
        let mut buf = BufferMut::<u8>::with_capacity_aligned(512, Alignment::new(32));
        buf.push_n(42u8, 512);
        assert_eq!(buf.len(), 512);
        assert!(buf.as_slice().iter().all(|&x| x == 42));
    }

    #[test]
    fn push_n_u16_large() {
        let mut buf = BufferMut::<u16>::with_capacity_aligned(1026, Alignment::new(32));
        buf.push_n(0x1234, 513);
        assert_eq!(buf.len(), 513);
        assert!(buf.as_slice().iter().all(|&x| x == 0x1234));
    }

    #[test]
    fn push_n_u32_large() {
        let mut buf = BufferMut::<u32>::with_capacity_aligned(2056, Alignment::new(32));
        buf.push_n(0x12345678, 514);
        assert_eq!(buf.len(), 514);
        assert!(buf.as_slice().iter().all(|&x| x == 0x12345678));
    }

    #[test]
    fn push_n_u64_large() {
        let mut buf = BufferMut::<u64>::with_capacity_aligned(4120, Alignment::new(32));
        buf.push_n(0x123456789ABCDEF, 515);
        assert_eq!(buf.len(), 515);
        assert!(buf.as_slice().iter().all(|&x| x == 0x123456789ABCDEFu64));
    }
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc