• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

vortex-data / vortex / 16946454057

13 Aug 2025 06:49PM UTC coverage: 87.673% (+0.006%) from 87.667%
16946454057

Pull #4214

github

web-flow
Merge 45e9cb840 into 8a5fd601e
Pull Request #4214: feat: streaming stores for decoding run-end `PrimitiveArray`

47 of 47 new or added lines in 2 files covered. (100.0%)

26 existing lines in 1 file now uncovered.

56538 of 64487 relevant lines covered (87.67%)

627586.57 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

87.28
/vortex-buffer/src/buffer_mut.rs
1
// SPDX-License-Identifier: Apache-2.0
2
// SPDX-FileCopyrightText: Copyright the Vortex contributors
3

4
use core::mem::MaybeUninit;
5
use std::any::type_name;
6
#[cfg(all(target_arch = "x86_64", target_feature = "avx2"))]
7
use std::arch::x86_64::{
8
    __m256i, _mm_sfence, _mm256_set1_epi8, _mm256_set1_epi16, _mm256_set1_epi32,
9
    _mm256_set1_epi64x, _mm256_stream_si256,
10
};
11
use std::fmt::{Debug, Formatter};
12
use std::io::Write;
13
use std::ops::{Deref, DerefMut};
14

15
use bytes::buf::UninitSlice;
16
use bytes::{Buf, BufMut, BytesMut};
17
use vortex_error::{VortexExpect, vortex_panic};
18

19
use crate::debug::TruncatedDebug;
20
use crate::trusted_len::TrustedLen;
21
use crate::{Alignment, Buffer, ByteBufferMut};
22

23
/// A mutable buffer that maintains a runtime-defined alignment through resizing operations.
24
#[derive(PartialEq, Eq)]
25
pub struct BufferMut<T> {
26
    pub(crate) bytes: BytesMut,
27
    pub(crate) length: usize,
28
    pub(crate) alignment: Alignment,
29
    pub(crate) _marker: std::marker::PhantomData<T>,
30
}
31

32
impl<T> BufferMut<T> {
33
    /// Create a new `BufferMut` with the requested alignment and capacity.
34
    pub fn with_capacity(capacity: usize) -> Self {
2,650,264✔
35
        Self::with_capacity_aligned(capacity, Alignment::of::<T>())
2,650,264✔
36
    }
2,650,264✔
37

38
    /// Create a new `BufferMut` with the requested alignment and capacity.
39
    pub fn with_capacity_aligned(capacity: usize, alignment: Alignment) -> Self {
3,298,131✔
40
        if !alignment.is_aligned_to(Alignment::of::<T>()) {
3,298,131✔
UNCOV
41
            vortex_panic!(
×
UNCOV
42
                "Alignment {} must align to the scalar type's alignment {}",
×
43
                alignment,
44
                align_of::<T>()
45
            );
46
        }
3,298,131✔
47

48
        let mut bytes = BytesMut::with_capacity((capacity * size_of::<T>()) + *alignment);
3,298,131✔
49
        bytes.align_empty(alignment);
3,298,131✔
50

51
        Self {
3,298,131✔
52
            bytes,
3,298,131✔
53
            length: 0,
3,298,131✔
54
            alignment,
3,298,131✔
55
            _marker: Default::default(),
3,298,131✔
56
        }
3,298,131✔
57
    }
3,298,131✔
58

59
    /// Create a new zeroed `BufferMut`.
UNCOV
60
    pub fn zeroed(len: usize) -> Self {
×
UNCOV
61
        Self::zeroed_aligned(len, Alignment::of::<T>())
×
UNCOV
62
    }
×
63

64
    /// Create a new zeroed `BufferMut`.
65
    pub fn zeroed_aligned(len: usize, alignment: Alignment) -> Self {
25,594✔
66
        let mut bytes = BytesMut::zeroed((len * size_of::<T>()) + *alignment);
25,594✔
67
        bytes.advance(bytes.as_ptr().align_offset(*alignment));
25,594✔
68
        unsafe { bytes.set_len(len * size_of::<T>()) };
25,594✔
69
        Self {
25,594✔
70
            bytes,
25,594✔
71
            length: len,
25,594✔
72
            alignment,
25,594✔
73
            _marker: Default::default(),
25,594✔
74
        }
25,594✔
75
    }
25,594✔
76

77
    /// Create a new empty `BufferMut` with the provided alignment.
78
    pub fn empty() -> Self {
272,582✔
79
        Self::empty_aligned(Alignment::of::<T>())
272,582✔
80
    }
272,582✔
81

82
    /// Create a new empty `BufferMut` with the provided alignment.
83
    pub fn empty_aligned(alignment: Alignment) -> Self {
277,172✔
84
        BufferMut::with_capacity_aligned(0, alignment)
277,172✔
85
    }
277,172✔
86

87
    /// Create a new full `BufferMut` with the given value.
88
    pub fn full(item: T, len: usize) -> Self
201,753✔
89
    where
201,753✔
90
        T: Copy,
201,753✔
91
    {
92
        let mut buffer = BufferMut::<T>::with_capacity(len);
201,753✔
93
        buffer.push_n(item, len);
201,753✔
94
        buffer
201,753✔
95
    }
201,753✔
96

97
    /// Create a mutable scalar buffer by copying the contents of the slice.
98
    pub fn copy_from(other: impl AsRef<[T]>) -> Self {
73,095✔
99
        Self::copy_from_aligned(other, Alignment::of::<T>())
73,095✔
100
    }
73,095✔
101

102
    /// Create a mutable scalar buffer with the alignment by copying the contents of the slice.
103
    ///
104
    /// ## Panics
105
    ///
106
    /// Panics when the requested alignment isn't itself aligned to type T.
107
    pub fn copy_from_aligned(other: impl AsRef<[T]>, alignment: Alignment) -> Self {
81,362✔
108
        if !alignment.is_aligned_to(Alignment::of::<T>()) {
81,362✔
UNCOV
109
            vortex_panic!("Given alignment is not aligned to type T")
×
110
        }
81,362✔
111
        let other = other.as_ref();
81,362✔
112
        let mut buffer = Self::with_capacity_aligned(other.len(), alignment);
81,362✔
113
        buffer.extend_from_slice(other);
81,362✔
114
        debug_assert_eq!(buffer.alignment(), alignment);
81,362✔
115
        buffer
81,362✔
116
    }
81,362✔
117

118
    /// Get the alignment of the buffer.
119
    #[inline(always)]
120
    pub fn alignment(&self) -> Alignment {
267,288✔
121
        self.alignment
267,288✔
122
    }
267,288✔
123

124
    /// Returns the length of the buffer.
125
    #[inline(always)]
126
    pub fn len(&self) -> usize {
13,403,336✔
127
        debug_assert_eq!(self.length, self.bytes.len() / size_of::<T>());
13,403,336✔
128
        self.length
13,403,336✔
129
    }
13,403,336✔
130

131
    /// Returns whether the buffer is empty.
132
    #[inline(always)]
133
    pub fn is_empty(&self) -> bool {
28,502✔
134
        self.length == 0
28,502✔
135
    }
28,502✔
136

137
    /// Returns the capacity of the buffer.
138
    #[inline]
139
    pub fn capacity(&self) -> usize {
5,468,854✔
140
        self.bytes.capacity() / size_of::<T>()
5,468,854✔
141
    }
5,468,854✔
142

143
    /// Returns a slice over the buffer of elements of type T.
144
    #[inline]
145
    pub fn as_slice(&self) -> &[T] {
17,009,265✔
146
        let raw_slice = self.bytes.as_ref();
17,009,265✔
147
        // SAFETY: alignment of Buffer is checked on construction
148
        unsafe { std::slice::from_raw_parts(raw_slice.as_ptr().cast(), self.length) }
17,009,265✔
149
    }
17,009,265✔
150

151
    /// Returns a slice over the buffer of elements of type T.
152
    #[inline]
153
    pub fn as_mut_slice(&mut self) -> &mut [T] {
1,938,227✔
154
        let raw_slice = self.bytes.as_mut();
1,938,227✔
155
        // SAFETY: alignment of Buffer is checked on construction
156
        unsafe { std::slice::from_raw_parts_mut(raw_slice.as_mut_ptr().cast(), self.length) }
1,938,227✔
157
    }
1,938,227✔
158

159
    /// Clear the buffer, retaining any existing capacity.
160
    #[inline]
161
    pub fn clear(&mut self) {
7,644✔
162
        unsafe { self.bytes.set_len(0) }
7,644✔
163
        self.length = 0;
7,644✔
164
    }
7,644✔
165

166
    /// Shortens the buffer, keeping the first `len` bytes and dropping the
167
    /// rest.
168
    ///
169
    /// If `len` is greater than the buffer's current length, this has no
170
    /// effect.
171
    ///
172
    /// Existing underlying capacity is preserved.
173
    #[inline]
174
    pub fn truncate(&mut self, len: usize) {
5✔
175
        if len <= self.len() {
5✔
176
            // SAFETY: Shrinking the buffer cannot expose uninitialized bytes.
5✔
177
            unsafe { self.set_len(len) };
5✔
178
        }
5✔
179
    }
5✔
180

181
    /// Reserves capacity for at least `additional` more elements to be inserted in the buffer.
182
    #[inline]
183
    pub fn reserve(&mut self, additional: usize) {
33,254,110✔
184
        let additional_bytes = additional * size_of::<T>();
33,254,110✔
185
        if additional_bytes <= self.bytes.capacity() - self.bytes.len() {
33,254,110✔
186
            // We can fit the additional bytes in the remaining capacity. Nothing to do.
187
            return;
32,661,466✔
188
        }
592,644✔
189

190
        // Otherwise, reserve additional + alignment bytes in case we need to realign the buffer.
191
        self.reserve_allocate(additional);
592,644✔
192
    }
33,254,110✔
193

194
    /// A separate function so we can inline the reserve call's fast path. According to `BytesMut`
195
    /// this has significant performance implications.
196
    fn reserve_allocate(&mut self, additional: usize) {
592,644✔
197
        let new_capacity: usize = ((self.length + additional) * size_of::<T>()) + *self.alignment;
592,644✔
198
        // Make sure we at least double in size each time we re-allocate to amortize the cost
199
        let new_capacity = new_capacity.max(self.bytes.capacity() * 2);
592,644✔
200

201
        let mut bytes = BytesMut::with_capacity(new_capacity);
592,644✔
202
        bytes.align_empty(self.alignment);
592,644✔
203
        bytes.extend_from_slice(&self.bytes);
592,644✔
204
        self.bytes = bytes;
592,644✔
205
    }
592,644✔
206

207
    /// Returns the spare capacity of the buffer as a slice of `MaybeUninit<T>`.
208
    /// Has identical semantics to [`Vec::spare_capacity_mut`].
209
    ///
210
    /// The returned slice can be used to fill the buffer with data (e.g. by
211
    /// reading from a file) before marking the data as initialized using the
212
    /// [`set_len`] method.
213
    ///
214
    /// [`set_len`]: BufferMut::set_len
215
    /// [`Vec::spare_capacity_mut`]: Vec::spare_capacity_mut
216
    ///
217
    /// # Examples
218
    ///
219
    /// ```
220
    /// use vortex_buffer::BufferMut;
221
    ///
222
    /// // Allocate vector big enough for 10 elements.
223
    /// let mut b = BufferMut::<u64>::with_capacity(10);
224
    ///
225
    /// // Fill in the first 3 elements.
226
    /// let uninit = b.spare_capacity_mut();
227
    /// uninit[0].write(0);
228
    /// uninit[1].write(1);
229
    /// uninit[2].write(2);
230
    ///
231
    /// // Mark the first 3 elements of the vector as being initialized.
232
    /// unsafe {
233
    ///     b.set_len(3);
234
    /// }
235
    ///
236
    /// assert_eq!(b.as_slice(), &[0u64, 1, 2]);
237
    /// ```
238
    #[inline]
239
    pub fn spare_capacity_mut(&mut self) -> &mut [MaybeUninit<T>] {
1,317,035✔
240
        let dst = self.bytes.spare_capacity_mut().as_mut_ptr();
1,317,035✔
241
        unsafe {
1,317,035✔
242
            std::slice::from_raw_parts_mut(
1,317,035✔
243
                dst as *mut MaybeUninit<T>,
1,317,035✔
244
                self.capacity() - self.length,
1,317,035✔
245
            )
1,317,035✔
246
        }
1,317,035✔
247
    }
1,317,035✔
248

249
    /// # Safety
250
    /// The caller must ensure that the buffer was properly initialized up to `len`.
251
    #[inline]
252
    pub unsafe fn set_len(&mut self, len: usize) {
2,381,618✔
253
        unsafe { self.bytes.set_len(len * size_of::<T>()) };
2,381,618✔
254
        self.length = len;
2,381,618✔
255
    }
2,381,618✔
256

257
    /// Appends a scalar to the buffer.
258
    #[inline]
259
    pub fn push(&mut self, value: T) {
23,308,872✔
260
        self.reserve(1);
23,308,872✔
261
        unsafe { self.push_unchecked(value) }
23,308,872✔
262
    }
23,308,872✔
263

264
    /// Appends a scalar to the buffer without checking for sufficient capacity.
265
    ///
266
    /// ## Safety
267
    ///
268
    /// The caller must ensure there is sufficient capacity in the array.
269
    #[inline]
270
    pub unsafe fn push_unchecked(&mut self, item: T) {
42,096,029✔
271
        // SAFETY: the caller ensures we have sufficient capacity
272
        unsafe {
273
            let dst: *mut T = self.bytes.spare_capacity_mut().as_mut_ptr().cast();
42,096,029✔
274
            dst.write(item);
42,096,029✔
275
            self.bytes.set_len(self.bytes.len() + size_of::<T>())
42,096,029✔
276
        }
277
        self.length += 1;
42,096,029✔
278
    }
42,096,029✔
279

280
    /// Appends n scalars to the buffer.
281
    ///
282
    /// This function is slightly more optimized than `extend(iter::repeat_n(item, b))`.
283
    #[inline]
284
    pub fn push_n(&mut self, item: T, n: usize)
210,636✔
285
    where
210,636✔
286
        T: Copy,
210,636✔
287
    {
288
        self.reserve(n);
210,636✔
289
        unsafe { self.push_n_unchecked(item, n) }
210,636✔
290
    }
210,636✔
291

292
    /// Appends n scalars to the buffer.
293
    ///
294
    /// ## Safety
295
    ///
296
    /// The caller must ensure there is sufficient capacity in the array.
297
    #[inline]
298
    pub unsafe fn push_n_unchecked(&mut self, item: T, n: usize)
242,928✔
299
    where
242,928✔
300
        T: Copy,
242,928✔
301
    {
302
        #[cfg(all(target_arch = "x86_64", target_feature = "avx2"))]
303
        {
304
            // Ensure a minimum of 128 bytes to write for AVX streaming stores.
305
            if size_of::<T>() * n >= 128 {
306
                unsafe { avx::push_n_unchecked(self, item, n) }
307
            } else {
308
                unsafe { scalar::push_n_unchecked(self, item, n) }
309
            }
310
        }
311

312
        #[cfg(not(all(target_arch = "x86_64", target_feature = "avx2")))]
313
        unsafe {
314
            scalar::push_n_unchecked(self, item, n)
242,928✔
315
        }
316
    }
242,928✔
317

318
    /// Appends a slice of type `T`, growing the internal buffer as needed.
319
    ///
320
    /// # Example:
321
    ///
322
    /// ```
323
    /// # use vortex_buffer::BufferMut;
324
    ///
325
    /// let mut builder = BufferMut::<u16>::with_capacity(10);
326
    /// builder.extend_from_slice(&[42, 44, 46]);
327
    ///
328
    /// assert_eq!(builder.len(), 3);
329
    /// ```
330
    #[inline]
331
    pub fn extend_from_slice(&mut self, slice: &[T]) {
8,604,054✔
332
        self.reserve(slice.len());
8,604,054✔
333
        let raw_slice =
8,604,054✔
334
            unsafe { std::slice::from_raw_parts(slice.as_ptr().cast(), size_of_val(slice)) };
8,604,054✔
335
        self.bytes.extend_from_slice(raw_slice);
8,604,054✔
336
        self.length += slice.len();
8,604,054✔
337
    }
8,604,054✔
338

339
    /// Freeze the `BufferMut` into a `Buffer`.
340
    pub fn freeze(self) -> Buffer<T> {
3,129,665✔
341
        Buffer {
3,129,665✔
342
            bytes: self.bytes.freeze(),
3,129,665✔
343
            length: self.length,
3,129,665✔
344
            alignment: self.alignment,
3,129,665✔
345
            _marker: Default::default(),
3,129,665✔
346
        }
3,129,665✔
347
    }
3,129,665✔
348

349
    /// Map each element of the buffer with a closure.
350
    pub fn map_each<R, F>(self, mut f: F) -> BufferMut<R>
9,620✔
351
    where
9,620✔
352
        T: Copy,
9,620✔
353
        F: FnMut(T) -> R,
9,620✔
354
    {
355
        assert_eq!(
9,620✔
356
            size_of::<T>(),
357
            size_of::<R>(),
UNCOV
358
            "Size of T and R do not match"
×
359
        );
360
        // SAFETY: we have checked that `size_of::<T>` == `size_of::<R>`.
361
        let mut buf: BufferMut<R> = unsafe { std::mem::transmute(self) };
9,620✔
362
        buf.iter_mut()
9,620✔
363
            .for_each(|item| *item = f(unsafe { std::mem::transmute_copy(item) }));
263,063,234✔
364
        buf
9,620✔
365
    }
9,620✔
366

367
    /// Return a `BufferMut<T>` with the given alignment. Where possible, this will be zero-copy.
UNCOV
368
    pub fn aligned(self, alignment: Alignment) -> Self {
×
369
        if self.as_ptr().align_offset(*alignment) == 0 {
×
UNCOV
370
            self
×
371
        } else {
UNCOV
372
            Self::copy_from_aligned(self, alignment)
×
373
        }
UNCOV
374
    }
×
375
}
376

377
impl<T> Clone for BufferMut<T> {
378
    fn clone(&self) -> Self {
12,639✔
379
        // NOTE(ngates): we cannot derive Clone since BytesMut copies on clone and the alignment
380
        //  might be messed up.
381
        let mut buffer = BufferMut::<T>::with_capacity_aligned(self.capacity(), self.alignment);
12,639✔
382
        buffer.extend_from_slice(self.as_slice());
12,639✔
383
        buffer
12,639✔
384
    }
12,639✔
385
}
386

387
impl<T: Debug> Debug for BufferMut<T> {
388
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
×
389
        f.debug_struct(&format!("BufferMut<{}>", type_name::<T>()))
×
UNCOV
390
            .field("length", &self.length)
×
UNCOV
391
            .field("alignment", &self.alignment)
×
UNCOV
392
            .field("as_slice", &TruncatedDebug(self.as_slice()))
×
UNCOV
393
            .finish()
×
UNCOV
394
    }
×
395
}
396

397
impl<T> Default for BufferMut<T> {
398
    fn default() -> Self {
191,415✔
399
        Self::empty()
191,415✔
400
    }
191,415✔
401
}
402

403
impl<T> Deref for BufferMut<T> {
404
    type Target = [T];
405

406
    fn deref(&self) -> &Self::Target {
16,994,209✔
407
        self.as_slice()
16,994,209✔
408
    }
16,994,209✔
409
}
410

411
impl<T> DerefMut for BufferMut<T> {
412
    fn deref_mut(&mut self) -> &mut Self::Target {
1,938,226✔
413
        self.as_mut_slice()
1,938,226✔
414
    }
1,938,226✔
415
}
416

417
impl<T> AsRef<[T]> for BufferMut<T> {
418
    fn as_ref(&self) -> &[T] {
5✔
419
        self.as_slice()
5✔
420
    }
5✔
421
}
422

423
impl<T> AsMut<[T]> for BufferMut<T> {
424
    fn as_mut(&mut self) -> &mut [T] {
1✔
425
        self.as_mut_slice()
1✔
426
    }
1✔
427
}
428

429
impl<T> BufferMut<T> {
430
    fn extend_iter(&mut self, mut iter: impl Iterator<Item = T>) {
261,404✔
431
        // Attempt to reserve enough memory up-front, although this is only a lower bound.
432
        let (lower, _) = iter.size_hint();
261,404✔
433
        self.reserve(lower);
261,404✔
434

435
        let remaining = self.capacity() - self.len();
261,404✔
436

437
        let begin: *const T = self.bytes.spare_capacity_mut().as_mut_ptr().cast();
261,404✔
438
        let mut dst: *mut T = begin.cast_mut();
261,404✔
439
        for _ in 0..remaining {
261,404✔
440
            if let Some(item) = iter.next() {
86,722,017✔
441
                unsafe {
86,497,145✔
442
                    // SAFETY: We know we have enough capacity to write the item.
86,497,145✔
443
                    dst.write(item);
86,497,145✔
444
                    // Note. we used to have dst.add(iteration).write(item), here.
86,497,145✔
445
                    // however this was much slower than just incrementing dst.
86,497,145✔
446
                    dst = dst.add(1);
86,497,145✔
447
                }
86,497,145✔
448
            } else {
449
                break;
224,872✔
450
            }
451
        }
452

453
        // TODO(joe): replace with ptr_sub when stable
454
        let length = self.len() + unsafe { dst.byte_offset_from(begin) as usize / size_of::<T>() };
261,404✔
455
        unsafe { self.set_len(length) };
261,404✔
456

457
        // Append remaining elements
458
        iter.for_each(|item| self.push(item));
261,404✔
459
    }
261,404✔
460

461
    /// An unsafe variant of the `Extend` trait and its `extend` method that receives what the
462
    /// caller guarantees to be an iterator with a trusted upper bound.
463
    pub fn extend_trusted<I: TrustedLen<Item = T>>(&mut self, iter: I) {
95,280✔
464
        // Reserve all memory upfront since it's an exact upper bound
465
        let (_, high) = iter.size_hint();
95,280✔
466
        self.reserve(high.vortex_expect("TrustedLen iterator didn't have valid upper bound"));
95,280✔
467

468
        let begin: *const T = self.bytes.spare_capacity_mut().as_mut_ptr().cast();
95,280✔
469
        let mut dst: *mut T = begin.cast_mut();
95,280✔
470
        iter.for_each(|item| {
39,238,312✔
471
            unsafe {
39,238,312✔
472
                // SAFETY: We know we have enough capacity to write the item.
39,238,312✔
473
                dst.write(item);
39,238,312✔
474
                // Note. we used to have dst.add(iteration).write(item), here.
39,238,312✔
475
                // however this was much slower than just incrementing dst.
39,238,312✔
476
                dst = dst.add(1);
39,238,312✔
477
            }
39,238,312✔
478
        });
39,238,312✔
479
        // TODO(joe): replace with ptr_sub when stable
480
        let length = self.len() + unsafe { dst.byte_offset_from(begin) as usize / size_of::<T>() };
95,280✔
481
        unsafe { self.set_len(length) };
95,280✔
482
    }
95,280✔
483
}
484

485
mod scalar {
486
    use std::mem::size_of;
487

488
    use super::*;
489

490
    /// Appends n scalars to the buffer using scalar (non-SIMD) implementation.
491
    ///
492
    /// ## Safety
493
    ///
494
    /// The caller must ensure there is sufficient capacity in the buffer.
495
    #[inline]
496
    pub unsafe fn push_n_unchecked<T>(buffer: &mut BufferMut<T>, item: T, n: usize)
242,928✔
497
    where
242,928✔
498
        T: Copy,
242,928✔
499
    {
500
        let mut dst: *mut T = buffer.bytes.spare_capacity_mut().as_mut_ptr().cast();
242,928✔
501

502
        // Safety: Sufficient capacity is a precondition.
503
        unsafe {
504
            let end = dst.add(n);
242,928✔
505
            while dst < end {
45,788,056✔
506
                dst.write(item);
45,545,128✔
507
                dst = dst.add(1);
45,545,128✔
508
            }
45,545,128✔
509
            buffer
242,928✔
510
                .bytes
242,928✔
511
                .set_len(buffer.bytes.len() + (n * size_of::<T>()));
242,928✔
512
        }
513
        buffer.length += n;
242,928✔
514
    }
242,928✔
515
}
516

517
#[cfg(all(target_arch = "x86_64", target_feature = "avx2"))]
518
mod avx {
519
    use std::mem::size_of;
520

521
    use super::*;
522

523
    /// Appends n copies of item using `_mm256_stream_si256` streaming stores.
524
    ///
525
    /// ## Safety
526
    ///
527
    /// Caller must ensure sufficient capacity.
528
    #[inline]
529
    pub unsafe fn push_n_unchecked<T>(buffer: &mut BufferMut<T>, item: T, n: usize)
530
    where
531
        T: Copy,
532
    {
533
        // Only use AVX streaming stores if the buffer is aligned to 32 bytes.
534
        if buffer
535
            .bytes
536
            .spare_capacity_mut()
537
            .as_mut_ptr()
538
            .align_offset(align_of::<__m256i>())
539
            != 0
540
        {
541
            unsafe {
542
                scalar::push_n_unchecked(buffer, item, n);
543
            }
544
            panic!("Buffer is not aligned to 32 bytes");
545
            return;
546
        }
547

548
        let size = size_of::<T>();
549

550
        // Splat `item` into a vector register.
551
        let pattern = unsafe {
552
            match size {
553
                1 => _mm256_set1_epi8(std::ptr::read(&raw const item as *const i8)),
554
                2 => _mm256_set1_epi16(std::ptr::read(&raw const item as *const i16)),
555
                4 => _mm256_set1_epi32(std::ptr::read(&raw const item as *const i32)),
556
                8 => _mm256_set1_epi64x(std::ptr::read(&raw const item as *const i64)),
557
                _ => {
558
                    scalar::push_n_unchecked(buffer, item, n);
559
                    return;
560
                }
561
            }
562
        };
563

564
        let total_bytes = n * size;
565
        let mut ptr = buffer.bytes.spare_capacity_mut().as_mut_ptr() as *mut u8;
566
        let end = unsafe { ptr.add(total_bytes) };
567

568
        // Safety: Sufficient capacity is a precondition.
569
        for _ in 0..(total_bytes / 32) {
570
            unsafe {
571
                _mm256_stream_si256(ptr as *mut __m256i, pattern);
572
            }
573
            ptr = unsafe { ptr.add(32) };
574
        }
575

576
        // Ensure all writes are visible before updating buffer state.
577
        unsafe { _mm_sfence() };
578

579
        // Safety: Sufficient capacity is a precondition.
580
        unsafe {
581
            while ptr < end {
582
                (ptr as *mut T).write(item);
583
                ptr = ptr.add(size);
584
            }
585
        }
586

587
        unsafe { buffer.bytes.set_len(buffer.bytes.len() + total_bytes) };
588
        buffer.length += n;
589
    }
590
}
591

592
impl<T> Extend<T> for BufferMut<T> {
593
    #[inline]
594
    fn extend<I: IntoIterator<Item = T>>(&mut self, iter: I) {
261,404✔
595
        self.extend_iter(iter.into_iter())
261,404✔
596
    }
261,404✔
597
}
598

599
impl<'a, T> Extend<&'a T> for BufferMut<T>
600
where
601
    T: Copy + 'a,
602
{
603
    #[inline]
UNCOV
604
    fn extend<I: IntoIterator<Item = &'a T>>(&mut self, iter: I) {
×
605
        self.extend_iter(iter.into_iter().copied())
×
606
    }
×
607
}
608

609
impl<T> FromIterator<T> for BufferMut<T> {
610
    fn from_iter<I: IntoIterator<Item = T>>(iter: I) -> Self {
185,933✔
611
        // We don't infer the capacity here and just let the first call to `extend` do it for us.
612
        let mut buffer = Self::with_capacity(0);
185,933✔
613
        buffer.extend(iter);
185,933✔
614
        debug_assert_eq!(buffer.alignment(), Alignment::of::<T>());
185,933✔
615
        buffer
185,925✔
616
    }
185,925✔
617
}
618

619
impl Buf for ByteBufferMut {
620
    fn remaining(&self) -> usize {
2✔
621
        self.len()
2✔
622
    }
2✔
623

624
    fn chunk(&self) -> &[u8] {
2✔
625
        self.as_slice()
2✔
626
    }
2✔
627

628
    fn advance(&mut self, cnt: usize) {
1✔
629
        if !cnt.is_multiple_of(*self.alignment) {
1✔
UNCOV
630
            vortex_panic!(
×
631
                "Cannot advance buffer by {} items, resulting alignment is not {}",
×
632
                cnt,
633
                self.alignment
634
            );
635
        }
1✔
636
        self.bytes.advance(cnt);
1✔
637
        self.length -= cnt;
1✔
638
    }
1✔
639
}
640

641
/// As per the BufMut implementation, we must support internal resizing when
642
/// asked to extend the buffer.
643
/// See: <https://github.com/tokio-rs/bytes/issues/131>
644
unsafe impl BufMut for ByteBufferMut {
645
    #[inline]
646
    fn remaining_mut(&self) -> usize {
7,065✔
647
        usize::MAX - self.len()
7,065✔
648
    }
7,065✔
649

650
    #[inline]
UNCOV
651
    unsafe fn advance_mut(&mut self, cnt: usize) {
×
652
        if !cnt.is_multiple_of(*self.alignment) {
×
653
            vortex_panic!(
×
654
                "Cannot advance buffer by {} items, resulting alignment is not {}",
×
655
                cnt,
656
                self.alignment
657
            );
UNCOV
658
        }
×
659
        unsafe { self.bytes.advance_mut(cnt) };
×
660
        self.length -= cnt;
×
661
    }
×
662

663
    #[inline]
UNCOV
664
    fn chunk_mut(&mut self) -> &mut UninitSlice {
×
665
        self.bytes.chunk_mut()
×
666
    }
×
667

UNCOV
668
    fn put<T: Buf>(&mut self, mut src: T)
×
669
    where
×
670
        Self: Sized,
×
671
    {
UNCOV
672
        while src.has_remaining() {
×
673
            let chunk = src.chunk();
×
674
            self.extend_from_slice(chunk);
×
675
            src.advance(chunk.len());
×
676
        }
×
677
    }
×
678

679
    #[inline]
680
    fn put_slice(&mut self, src: &[u8]) {
14,246✔
681
        self.extend_from_slice(src);
14,246✔
682
    }
14,246✔
683

684
    #[inline]
UNCOV
685
    fn put_bytes(&mut self, val: u8, cnt: usize) {
×
686
        self.push_n(val, cnt)
×
687
    }
×
688
}
689

690
/// Extension trait for [`BytesMut`] that provides functions for aligning the buffer.
691
trait AlignedBytesMut {
692
    /// Align an empty `BytesMut` to the specified alignment.
693
    ///
694
    /// ## Panics
695
    ///
696
    /// Panics if the buffer is not empty, or if there is not enough capacity to align the buffer.
697
    fn align_empty(&mut self, alignment: Alignment);
698
}
699

700
impl AlignedBytesMut for BytesMut {
701
    fn align_empty(&mut self, alignment: Alignment) {
4,201,525✔
702
        if !self.is_empty() {
4,201,525✔
UNCOV
703
            vortex_panic!("ByteBufferMut must be empty");
×
704
        }
4,201,525✔
705

706
        let padding = self.as_ptr().align_offset(*alignment);
4,201,525✔
707
        self.capacity()
4,201,525✔
708
            .checked_sub(padding)
4,201,525✔
709
            .vortex_expect("Not enough capacity to align buffer");
4,201,525✔
710

711
        // SAFETY: We know the buffer is empty, and we know we have enough capacity, so we can
712
        // safely set the length to the padding and advance the buffer to the aligned offset.
713
        unsafe { self.set_len(padding) };
4,201,525✔
714
        self.advance(padding);
4,201,525✔
715
    }
4,201,525✔
716
}
717

718
impl Write for ByteBufferMut {
719
    fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
6,174✔
720
        self.extend_from_slice(buf);
6,174✔
721
        Ok(buf.len())
6,174✔
722
    }
6,174✔
723

UNCOV
724
    fn flush(&mut self) -> std::io::Result<()> {
×
725
        Ok(())
×
726
    }
×
727
}
728

729
#[cfg(test)]
730
mod test {
731
    use bytes::{Buf, BufMut};
732

733
    use crate::{Alignment, BufferMut, ByteBufferMut, buffer_mut};
734

735
    #[test]
736
    fn capacity() {
1✔
737
        let mut n = 57;
1✔
738
        let mut buf = BufferMut::<i32>::with_capacity_aligned(n, Alignment::new(1024));
1✔
739
        assert!(buf.capacity() >= 57);
1✔
740

741
        while n > 0 {
58✔
742
            buf.push(0);
57✔
743
            assert!(buf.capacity() >= n);
57✔
744
            n -= 1
57✔
745
        }
746

747
        assert_eq!(buf.alignment(), Alignment::new(1024));
1✔
748
    }
1✔
749

750
    #[test]
751
    fn from_iter() {
1✔
752
        let buf = BufferMut::from_iter([0, 10, 20, 30]);
1✔
753
        assert_eq!(buf.as_slice(), &[0, 10, 20, 30]);
1✔
754
    }
1✔
755

756
    #[test]
757
    fn extend() {
1✔
758
        let mut buf = BufferMut::empty();
1✔
759
        buf.extend([0i32, 10, 20, 30]);
1✔
760
        buf.extend([40, 50, 60]);
1✔
761
        assert_eq!(buf.as_slice(), &[0, 10, 20, 30, 40, 50, 60]);
1✔
762
    }
1✔
763

764
    #[test]
765
    fn push() {
1✔
766
        let mut buf = BufferMut::empty();
1✔
767
        buf.push(1);
1✔
768
        buf.push(2);
1✔
769
        buf.push(3);
1✔
770
        assert_eq!(buf.as_slice(), &[1, 2, 3]);
1✔
771
    }
1✔
772

773
    #[test]
774
    fn push_n() {
1✔
775
        let mut buf = BufferMut::empty();
1✔
776
        buf.push_n(0, 100);
1✔
777
        assert_eq!(buf.as_slice(), &[0; 100]);
1✔
778
    }
1✔
779

780
    #[test]
781
    fn as_mut() {
1✔
782
        let mut buf = buffer_mut![0, 1, 2];
1✔
783
        // Uses DerefMut
784
        buf[1] = 0;
1✔
785
        // Uses as_mut
786
        buf.as_mut()[2] = 0;
1✔
787
        assert_eq!(buf.as_slice(), &[0, 0, 0]);
1✔
788
    }
1✔
789

790
    #[test]
791
    fn map_each() {
1✔
792
        let buf = buffer_mut![0i32, 1, 2];
1✔
793
        // Add one, and cast to an unsigned u32 in the same closure
794
        let buf = buf.map_each(|i| (i + 1) as u32);
3✔
795
        assert_eq!(buf.as_slice(), &[1u32, 2, 3]);
1✔
796
    }
1✔
797

798
    #[test]
799
    fn bytes_buf() {
1✔
800
        let mut buf = ByteBufferMut::copy_from("helloworld".as_bytes());
1✔
801
        assert_eq!(buf.remaining(), 10);
1✔
802
        assert_eq!(buf.chunk(), b"helloworld");
1✔
803

804
        Buf::advance(&mut buf, 5);
1✔
805
        assert_eq!(buf.remaining(), 5);
1✔
806
        assert_eq!(buf.as_slice(), b"world");
1✔
807
        assert_eq!(buf.chunk(), b"world");
1✔
808
    }
1✔
809

810
    #[test]
811
    fn bytes_buf_mut() {
1✔
812
        let mut buf = ByteBufferMut::copy_from("hello".as_bytes());
1✔
813
        assert_eq!(BufMut::remaining_mut(&buf), usize::MAX - 5);
1✔
814

815
        BufMut::put_slice(&mut buf, b"world");
1✔
816
        assert_eq!(buf.as_slice(), b"helloworld");
1✔
817
    }
1✔
818

819
    #[test]
820
    fn push_n_u8_large() {
1✔
821
        let mut buf = BufferMut::<u8>::with_capacity_aligned(512, Alignment::new(32));
1✔
822
        buf.push_n(42u8, 512);
1✔
823
        assert_eq!(buf.len(), 512);
1✔
824
        assert!(buf.as_slice().iter().all(|&x| x == 42));
512✔
825
    }
1✔
826

827
    #[test]
828
    fn push_n_u16_large() {
1✔
829
        let mut buf = BufferMut::<u16>::with_capacity_aligned(1026, Alignment::new(32));
1✔
830
        buf.push_n(0x1234, 513);
1✔
831
        assert_eq!(buf.len(), 513);
1✔
832
        assert!(buf.as_slice().iter().all(|&x| x == 0x1234));
513✔
833
    }
1✔
834

835
    #[test]
836
    fn push_n_u32_large() {
1✔
837
        let mut buf = BufferMut::<u32>::with_capacity_aligned(2056, Alignment::new(32));
1✔
838
        buf.push_n(0x12345678, 514);
1✔
839
        assert_eq!(buf.len(), 514);
1✔
840
        assert!(buf.as_slice().iter().all(|&x| x == 0x12345678));
514✔
841
    }
1✔
842

843
    #[test]
844
    fn push_n_u64_large() {
1✔
845
        let mut buf = BufferMut::<u64>::with_capacity_aligned(4120, Alignment::new(32));
1✔
846
        buf.push_n(0x123456789ABCDEF, 515);
1✔
847
        assert_eq!(buf.len(), 515);
1✔
848
        assert!(buf.as_slice().iter().all(|&x| x == 0x123456789ABCDEFu64));
515✔
849
    }
1✔
850
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc