
vortex-data / vortex, build 16331938722 (push via github by web-flow)
16 Jul 2025 10:49PM UTC. Coverage: 80.702% (-0.9% from 81.557%)
Commit: feat: build with stable rust (#3881)

120 of 173 new or added lines in 28 files covered (69.36%).
174 existing lines in 102 files now uncovered.
41861 of 51871 relevant lines covered (80.7%).
157487.71 hits per line.
Source file: /vortex-buffer/src/buffer_mut.rs (86.22% covered)
// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright the Vortex contributors

use core::mem::MaybeUninit;
use std::any::type_name;
use std::fmt::{Debug, Formatter};
use std::io::Write;
use std::ops::{Deref, DerefMut};

use bytes::buf::UninitSlice;
use bytes::{Buf, BufMut, BytesMut};
use vortex_error::{VortexExpect, vortex_panic};

use crate::debug::TruncatedDebug;
use crate::trusted_len::TrustedLen;
use crate::{Alignment, Buffer, ByteBufferMut};

/// A mutable buffer that maintains a runtime-defined alignment through resizing operations.
#[derive(PartialEq, Eq)]
pub struct BufferMut<T> {
    pub(crate) bytes: BytesMut,
    pub(crate) length: usize,
    pub(crate) alignment: Alignment,
    pub(crate) _marker: std::marker::PhantomData<T>,
}
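
// A minimal usage sketch (illustrative, assuming the constructors defined below): the
// buffer keeps the alignment it was constructed with as it grows.
//
//     let mut buf = BufferMut::<u32>::with_capacity_aligned(4, Alignment::new(64));
//     buf.extend_from_slice(&[1, 2, 3, 4]);
//     assert_eq!(buf.alignment(), Alignment::new(64));
//     assert_eq!(buf.as_slice(), &[1, 2, 3, 4]);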

impl<T> BufferMut<T> {
    /// Create a new `BufferMut` with the requested capacity and the natural alignment of `T`.
    pub fn with_capacity(capacity: usize) -> Self {
        Self::with_capacity_aligned(capacity, Alignment::of::<T>())
    }

    /// Create a new `BufferMut` with the requested alignment and capacity.
    pub fn with_capacity_aligned(capacity: usize, alignment: Alignment) -> Self {
        if !alignment.is_aligned_to(Alignment::of::<T>()) {
            vortex_panic!(
                "Alignment {} must align to the scalar type's alignment {}",
                alignment,
                align_of::<T>()
            );
        }

        let mut bytes = BytesMut::with_capacity((capacity * size_of::<T>()) + *alignment);
        bytes.align_empty(alignment);

        Self {
            bytes,
            length: 0,
            alignment,
            _marker: Default::default(),
        }
    }

    /// Create a new zeroed `BufferMut` with the natural alignment of `T`.
    pub fn zeroed(len: usize) -> Self {
        Self::zeroed_aligned(len, Alignment::of::<T>())
    }

    /// Create a new zeroed `BufferMut` with the provided alignment.
    pub fn zeroed_aligned(len: usize, alignment: Alignment) -> Self {
        let mut bytes = BytesMut::zeroed((len * size_of::<T>()) + *alignment);
        bytes.advance(bytes.as_ptr().align_offset(*alignment));
        unsafe { bytes.set_len(len * size_of::<T>()) };
        Self {
            bytes,
            length: len,
            alignment,
            _marker: Default::default(),
        }
    }

    /// Create a new empty `BufferMut` with the natural alignment of `T`.
    pub fn empty() -> Self {
        Self::empty_aligned(Alignment::of::<T>())
    }

    /// Create a new empty `BufferMut` with the provided alignment.
    pub fn empty_aligned(alignment: Alignment) -> Self {
        BufferMut::with_capacity_aligned(0, alignment)
    }

    /// Create a new `BufferMut` filled with `len` copies of the given value.
    pub fn full(item: T, len: usize) -> Self
    where
        T: Copy,
    {
        let mut buffer = BufferMut::<T>::with_capacity(len);
        buffer.push_n(item, len);
        buffer
    }

    /// Create a mutable scalar buffer by copying the contents of the slice.
    pub fn copy_from(other: impl AsRef<[T]>) -> Self {
        Self::copy_from_aligned(other, Alignment::of::<T>())
    }

    /// Create a mutable scalar buffer with the given alignment by copying the contents of the slice.
    ///
    /// ## Panics
    ///
    /// Panics when the requested alignment isn't itself aligned to type T.
    pub fn copy_from_aligned(other: impl AsRef<[T]>, alignment: Alignment) -> Self {
        if !alignment.is_aligned_to(Alignment::of::<T>()) {
            vortex_panic!("Given alignment is not aligned to type T")
        }
        let other = other.as_ref();
        let mut buffer = Self::with_capacity_aligned(other.len(), alignment);
        buffer.extend_from_slice(other);
        debug_assert_eq!(buffer.alignment(), alignment);
        buffer
    }

113
    /// Get the alignment of the buffer.
114
    #[inline(always)]
115
    pub fn alignment(&self) -> Alignment {
41,883✔
116
        self.alignment
41,883✔
117
    }
41,883✔
118

119
    /// Returns the length of the buffer.
120
    #[inline(always)]
121
    pub fn len(&self) -> usize {
5,376,375✔
122
        debug_assert_eq!(self.length, self.bytes.len() / size_of::<T>());
5,376,375✔
123
        self.length
5,376,375✔
124
    }
5,376,375✔
125

126
    /// Returns whether the buffer is empty.
127
    #[inline(always)]
128
    pub fn is_empty(&self) -> bool {
12,787✔
129
        self.length == 0
12,787✔
130
    }
12,787✔
131

132
    /// Returns the capacity of the buffer.
133
    #[inline]
134
    pub fn capacity(&self) -> usize {
817,371✔
135
        self.bytes.capacity() / size_of::<T>()
817,371✔
136
    }
817,371✔
137

138
    /// Returns a slice over the buffer of elements of type T.
139
    #[inline]
140
    pub fn as_slice(&self) -> &[T] {
11,294,208✔
141
        let raw_slice = self.bytes.as_ref();
11,294,208✔
142
        // SAFETY: alignment of Buffer is checked on construction
143
        unsafe { std::slice::from_raw_parts(raw_slice.as_ptr().cast(), self.length) }
11,294,208✔
144
    }
11,294,208✔
145

146
    /// Returns a slice over the buffer of elements of type T.
147
    #[inline]
148
    pub fn as_mut_slice(&mut self) -> &mut [T] {
286,234✔
149
        let raw_slice = self.bytes.as_mut();
286,234✔
150
        // SAFETY: alignment of Buffer is checked on construction
151
        unsafe { std::slice::from_raw_parts_mut(raw_slice.as_mut_ptr().cast(), self.length) }
286,234✔
152
    }
286,234✔
153

154
    /// Clear the buffer, retaining any existing capacity.
155
    #[inline]
156
    pub fn clear(&mut self) {
3✔
157
        unsafe { self.bytes.set_len(0) }
3✔
158
        self.length = 0;
3✔
159
    }
3✔
160

161
    /// Shortens the buffer, keeping the first `len` bytes and dropping the
162
    /// rest.
163
    ///
164
    /// If `len` is greater than the buffer's current length, this has no
165
    /// effect.
166
    ///
167
    /// Existing underlying capacity is preserved.
168
    #[inline]
169
    pub fn truncate(&mut self, len: usize) {
2✔
170
        if len <= self.len() {
2✔
171
            // SAFETY: Shrinking the buffer cannot expose uninitialized bytes.
2✔
172
            unsafe { self.set_len(len) };
2✔
173
        }
2✔
174
    }
2✔
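
    // A minimal `truncate` sketch (illustrative, assuming the API above):
    //
    //     let mut buf = BufferMut::<u8>::copy_from([1u8, 2, 3, 4, 5]);
    //     buf.truncate(2);
    //     assert_eq!(buf.as_slice(), &[1, 2]);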

    /// Reserves capacity for at least `additional` more elements to be inserted in the buffer.
    #[inline]
    pub fn reserve(&mut self, additional: usize) {
        let additional_bytes = additional * size_of::<T>();
        if additional_bytes <= self.bytes.capacity() - self.bytes.len() {
            // We can fit the additional bytes in the remaining capacity. Nothing to do.
            return;
        }

        // Otherwise, reserve additional + alignment bytes in case we need to realign the buffer.
        self.reserve_allocate(additional);
    }

    /// A separate function so we can inline the reserve call's fast path. According to `BytesMut`
    /// this has significant performance implications.
    fn reserve_allocate(&mut self, additional: usize) {
        let new_capacity: usize = ((self.length + additional) * size_of::<T>()) + *self.alignment;
        // Make sure we at least double in size each time we re-allocate to amortize the cost
        let new_capacity = new_capacity.max(self.bytes.capacity() * 2);

        let mut bytes = BytesMut::with_capacity(new_capacity);
        bytes.align_empty(self.alignment);
        bytes.extend_from_slice(&self.bytes);
        self.bytes = bytes;
    }

202
    /// Returns the spare capacity of the buffer as a slice of `MaybeUninit<T>`.
203
    /// Has identical semantics to [`Vec::spare_capacity_mut`].
204
    ///
205
    /// The returned slice can be used to fill the buffer with data (e.g. by
206
    /// reading from a file) before marking the data as initialized using the
207
    /// [`set_len`] method.
208
    ///
209
    /// [`set_len`]: BufferMut::set_len
210
    /// [`Vec::spare_capacity_mut`]: Vec::spare_capacity_mut
211
    ///
212
    /// # Examples
213
    ///
214
    /// ```
215
    /// use vortex_buffer::BufferMut;
216
    ///
217
    /// // Allocate vector big enough for 10 elements.
218
    /// let mut b = BufferMut::<u64>::with_capacity(10);
219
    ///
220
    /// // Fill in the first 3 elements.
221
    /// let uninit = b.spare_capacity_mut();
222
    /// uninit[0].write(0);
223
    /// uninit[1].write(1);
224
    /// uninit[2].write(2);
225
    ///
226
    /// // Mark the first 3 elements of the vector as being initialized.
227
    /// unsafe {
228
    ///     b.set_len(3);
229
    /// }
230
    ///
231
    /// assert_eq!(b.as_slice(), &[0u64, 1, 2]);
232
    /// ```
233
    #[inline]
234
    pub fn spare_capacity_mut(&mut self) -> &mut [MaybeUninit<T>] {
670,038✔
235
        let dst = self.bytes.spare_capacity_mut().as_mut_ptr();
670,038✔
236
        unsafe {
670,038✔
237
            std::slice::from_raw_parts_mut(
670,038✔
238
                dst as *mut MaybeUninit<T>,
670,038✔
239
                self.capacity() - self.length,
670,038✔
240
            )
670,038✔
241
        }
670,038✔
242
    }
670,038✔
243

244
    /// # Safety
245
    /// The caller must ensure that the buffer was properly initialized up to `len`.
246
    #[inline]
247
    pub unsafe fn set_len(&mut self, len: usize) {
342,322✔
248
        unsafe { self.bytes.set_len(len * size_of::<T>()) };
342,322✔
249
        self.length = len;
342,322✔
250
    }
342,322✔
251

252
    /// Appends a scalar to the buffer.
253
    #[inline]
254
    pub fn push(&mut self, value: T) {
10,780,858✔
255
        self.reserve(1);
10,780,858✔
256
        unsafe { self.push_unchecked(value) }
10,780,858✔
257
    }
10,780,858✔
258

259
    /// Appends a scalar to the buffer without checking for sufficient capacity.
260
    ///
261
    /// ## Safety
262
    ///
263
    /// The caller must ensure there is sufficient capacity in the array.
264
    #[inline]
265
    pub unsafe fn push_unchecked(&mut self, item: T) {
22,150,640✔
266
        // SAFETY: the caller ensures we have sufficient capacity
267
        unsafe {
268
            let dst: *mut T = self.bytes.spare_capacity_mut().as_mut_ptr().cast();
22,150,640✔
269
            dst.write(item);
22,150,640✔
270
            self.bytes.set_len(self.bytes.len() + size_of::<T>())
22,150,640✔
271
        }
272
        self.length += 1;
22,150,640✔
273
    }
22,150,640✔
274

275
    /// Appends n scalars to the buffer.
276
    ///
277
    /// This function is slightly more optimized than `extend(iter::repeat_n(item, b))`.
278
    #[inline]
279
    pub fn push_n(&mut self, item: T, n: usize)
8,805✔
280
    where
8,805✔
281
        T: Copy,
8,805✔
282
    {
283
        self.reserve(n);
8,805✔
284
        unsafe { self.push_n_unchecked(item, n) }
8,805✔
285
    }
8,805✔
286

287
    /// Appends n scalars to the buffer.
288
    ///
289
    /// ## Safety
290
    ///
291
    /// The caller must ensure there is sufficient capacity in the array.
292
    #[inline]
293
    pub unsafe fn push_n_unchecked(&mut self, item: T, n: usize)
9,417✔
294
    where
9,417✔
295
        T: Copy,
9,417✔
296
    {
297
        let mut dst: *mut T = self.bytes.spare_capacity_mut().as_mut_ptr().cast();
9,417✔
298
        // SAFETY: we checked the capacity in the reserve call
299
        unsafe {
300
            let end = dst.add(n);
9,417✔
301
            while dst < end {
40,371,098✔
302
                dst.write(item);
40,361,681✔
303
                dst = dst.add(1);
40,361,681✔
304
            }
40,361,681✔
305
            self.bytes.set_len(self.bytes.len() + (n * size_of::<T>()));
9,417✔
306
        }
307
        self.length += n;
9,417✔
308
    }
9,417✔
309

310
    /// Appends a slice of type `T`, growing the internal buffer as needed.
311
    ///
312
    /// # Example:
313
    ///
314
    /// ```
315
    /// # use vortex_buffer::BufferMut;
316
    ///
317
    /// let mut builder = BufferMut::<u16>::with_capacity(10);
318
    /// builder.extend_from_slice(&[42, 44, 46]);
319
    ///
320
    /// assert_eq!(builder.len(), 3);
321
    /// ```
322
    #[inline]
323
    pub fn extend_from_slice(&mut self, slice: &[T]) {
3,402,153✔
324
        self.reserve(slice.len());
3,402,153✔
325
        let raw_slice =
3,402,153✔
326
            unsafe { std::slice::from_raw_parts(slice.as_ptr().cast(), size_of_val(slice)) };
3,402,153✔
327
        self.bytes.extend_from_slice(raw_slice);
3,402,153✔
328
        self.length += slice.len();
3,402,153✔
329
    }
3,402,153✔
330

331
    /// Freeze the `BufferMut` into a `Buffer`.
332
    pub fn freeze(self) -> Buffer<T> {
511,967✔
333
        Buffer {
511,967✔
334
            bytes: self.bytes.freeze(),
511,967✔
335
            length: self.length,
511,967✔
336
            alignment: self.alignment,
511,967✔
337
            _marker: Default::default(),
511,967✔
338
        }
511,967✔
339
    }
511,967✔
340

341
    /// Map each element of the buffer with a closure.
342
    pub fn map_each<R, F>(self, mut f: F) -> BufferMut<R>
5,179✔
343
    where
5,179✔
344
        T: Copy,
5,179✔
345
        F: FnMut(T) -> R,
5,179✔
346
    {
347
        assert_eq!(
5,179✔
348
            size_of::<T>(),
349
            size_of::<R>(),
350
            "Size of T and R do not match"
×
351
        );
352
        // SAFETY: we have checked that `size_of::<T>` == `size_of::<R>`.
353
        let mut buf: BufferMut<R> = unsafe { std::mem::transmute(self) };
5,179✔
354
        buf.iter_mut()
5,179✔
355
            .for_each(|item| *item = f(unsafe { std::mem::transmute_copy(item) }));
177,532,423✔
356
        buf
5,179✔
357
    }
5,179✔
358

359
    /// Return a `BufferMut<T>` with the given alignment. Where possible, this will be zero-copy.
360
    pub fn aligned(self, alignment: Alignment) -> Self {
×
361
        if self.as_ptr().align_offset(*alignment) == 0 {
×
362
            self
×
363
        } else {
364
            Self::copy_from_aligned(self, alignment)
×
365
        }
366
    }
×
367
}
368

369
impl<T> Clone for BufferMut<T> {
370
    fn clone(&self) -> Self {
1,993✔
371
        // NOTE(ngates): we cannot derive Clone since BytesMut copies on clone and the alignment
372
        //  might be messed up.
373
        let mut buffer = BufferMut::<T>::with_capacity_aligned(self.capacity(), self.alignment);
1,993✔
374
        buffer.extend_from_slice(self.as_slice());
1,993✔
375
        buffer
1,993✔
376
    }
1,993✔
377
}
378

379
impl<T: Debug> Debug for BufferMut<T> {
380
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
×
381
        f.debug_struct(&format!("BufferMut<{}>", type_name::<T>()))
×
382
            .field("length", &self.length)
×
383
            .field("alignment", &self.alignment)
×
384
            .field("as_slice", &TruncatedDebug(self.as_slice()))
×
385
            .finish()
×
386
    }
×
387
}
388

389
impl<T> Default for BufferMut<T> {
390
    fn default() -> Self {
36,601✔
391
        Self::empty()
36,601✔
392
    }
36,601✔
393
}
394

395
impl<T> Deref for BufferMut<T> {
396
    type Target = [T];
397

398
    fn deref(&self) -> &Self::Target {
11,290,990✔
399
        self.as_slice()
11,290,990✔
400
    }
11,290,990✔
401
}
402

403
impl<T> DerefMut for BufferMut<T> {
404
    fn deref_mut(&mut self) -> &mut Self::Target {
286,233✔
405
        self.as_mut_slice()
286,233✔
406
    }
286,233✔
407
}
408

409
impl<T> AsRef<[T]> for BufferMut<T> {
410
    fn as_ref(&self) -> &[T] {
16✔
411
        self.as_slice()
16✔
412
    }
16✔
413
}
414

415
impl<T> AsMut<[T]> for BufferMut<T> {
416
    fn as_mut(&mut self) -> &mut [T] {
1✔
417
        self.as_mut_slice()
1✔
418
    }
1✔
419
}
420


impl<T> BufferMut<T> {
    fn extend_iter(&mut self, mut iter: impl Iterator<Item = T>) {
        // Attempt to reserve enough memory up-front, although this is only a lower bound.
        let (lower, _) = iter.size_hint();
        self.reserve(lower);

        let remaining = self.capacity() - self.len();

        let begin: *const T = self.bytes.spare_capacity_mut().as_mut_ptr().cast();
        let mut dst: *mut T = begin.cast_mut();
        for _ in 0..remaining {
            if let Some(item) = iter.next() {
                unsafe {
                    // SAFETY: We know we have enough capacity to write the item.
                    dst.write(item);
                    // Note: we used to have dst.add(iteration).write(item) here, but it was
                    // much slower than simply incrementing dst.
                    dst = dst.add(1);
                }
            } else {
                break;
            }
        }

        // TODO(joe): replace with ptr_sub when stable
        let length = self.len() + unsafe { dst.byte_offset_from(begin) as usize / size_of::<T>() };
        unsafe { self.set_len(length) };

        // Append remaining elements
        iter.for_each(|item| self.push(item));
    }

    /// An unsafe variant of the `Extend` trait and its `extend` method that receives what the
    /// caller guarantees to be an iterator with a trusted upper bound.
    pub fn extend_trusted<I: TrustedLen<Item = T>>(&mut self, iter: I) {
        // Reserve all memory upfront since it's an exact upper bound
        let (_, high) = iter.size_hint();
        self.reserve(high.vortex_expect("TrustedLen iterator didn't have valid upper bound"));

        let begin: *const T = self.bytes.spare_capacity_mut().as_mut_ptr().cast();
        let mut dst: *mut T = begin.cast_mut();
        iter.for_each(|item| {
            unsafe {
                // SAFETY: We know we have enough capacity to write the item.
                dst.write(item);
                // Note: we used to have dst.add(iteration).write(item) here, but it was
                // much slower than simply incrementing dst.
                dst = dst.add(1);
            }
        });
        // TODO(joe): replace with ptr_sub when stable
        let length = self.len() + unsafe { dst.byte_offset_from(begin) as usize / size_of::<T>() };
        unsafe { self.set_len(length) };
    }
}

impl<T> Extend<T> for BufferMut<T> {
    #[inline]
    fn extend<I: IntoIterator<Item = T>>(&mut self, iter: I) {
        self.extend_iter(iter.into_iter())
    }
}

impl<'a, T> Extend<&'a T> for BufferMut<T>
where
    T: Copy + 'a,
{
    #[inline]
    fn extend<I: IntoIterator<Item = &'a T>>(&mut self, iter: I) {
        self.extend_iter(iter.into_iter().copied())
    }
}

impl<T> FromIterator<T> for BufferMut<T> {
    fn from_iter<I: IntoIterator<Item = T>>(iter: I) -> Self {
        // We don't infer the capacity here and just let the first call to `extend` do it for us.
        let mut buffer = Self::with_capacity(0);
        buffer.extend(iter);
        debug_assert_eq!(buffer.alignment(), Alignment::of::<T>());
        buffer
    }
}

impl Buf for ByteBufferMut {
    fn remaining(&self) -> usize {
        self.len()
    }

    fn chunk(&self) -> &[u8] {
        self.as_slice()
    }

    fn advance(&mut self, cnt: usize) {
        if !cnt.is_multiple_of(*self.alignment) {
            vortex_panic!(
                "Cannot advance buffer by {} items, resulting alignment is not {}",
                cnt,
                self.alignment
            );
        }
        self.bytes.advance(cnt);
        self.length -= cnt;
    }
}

/// As per the BufMut implementation, we must support internal resizing when
/// asked to extend the buffer.
/// See: <https://github.com/tokio-rs/bytes/issues/131>
unsafe impl BufMut for ByteBufferMut {
    #[inline]
    fn remaining_mut(&self) -> usize {
        usize::MAX - self.len()
    }

    #[inline]
    unsafe fn advance_mut(&mut self, cnt: usize) {
        if !cnt.is_multiple_of(*self.alignment) {
            vortex_panic!(
                "Cannot advance buffer by {} items, resulting alignment is not {}",
                cnt,
                self.alignment
            );
        }
        unsafe { self.bytes.advance_mut(cnt) };
        self.length -= cnt;
    }

    #[inline]
    fn chunk_mut(&mut self) -> &mut UninitSlice {
        self.bytes.chunk_mut()
    }

    fn put<T: Buf>(&mut self, mut src: T)
    where
        Self: Sized,
    {
        while src.has_remaining() {
            let chunk = src.chunk();
            self.extend_from_slice(chunk);
            src.advance(chunk.len());
        }
    }

    #[inline]
    fn put_slice(&mut self, src: &[u8]) {
        self.extend_from_slice(src);
    }

    #[inline]
    fn put_bytes(&mut self, val: u8, cnt: usize) {
        self.push_n(val, cnt)
    }
}

/// Extension trait for [`BytesMut`] that provides functions for aligning the buffer.
576
trait AlignedBytesMut {
577
    /// Align an empty `BytesMut` to the specified alignment.
578
    ///
579
    /// ## Panics
580
    ///
581
    /// Panics if the buffer is not empty, or if there is not enough capacity to align the buffer.
582
    fn align_empty(&mut self, alignment: Alignment);
583
}
584

585
impl AlignedBytesMut for BytesMut {
586
    fn align_empty(&mut self, alignment: Alignment) {
734,550✔
587
        if !self.is_empty() {
734,550✔
588
            vortex_panic!("ByteBufferMut must be empty");
×
589
        }
734,550✔
590

591
        let padding = self.as_ptr().align_offset(*alignment);
734,550✔
592
        self.capacity()
734,550✔
593
            .checked_sub(padding)
734,550✔
594
            .vortex_expect("Not enough capacity to align buffer");
734,550✔
595

596
        // SAFETY: We know the buffer is empty, and we know we have enough capacity, so we can
597
        // safely set the length to the padding and advance the buffer to the aligned offset.
598
        unsafe { self.set_len(padding) };
734,550✔
599
        self.advance(padding);
734,550✔
600
    }
734,550✔
601
}
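
// A worked sketch of the padding computation above (illustrative): if a fresh allocation
// happens to start at address 0x1008 and the requested alignment is 64, `align_offset`
// returns 0x1040 - 0x1008 = 56, so 56 bytes of padding are skipped and the usable region
// begins at the aligned address 0x1040.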

impl Write for ByteBufferMut {
    fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
        self.extend_from_slice(buf);
        Ok(buf.len())
    }

    fn flush(&mut self) -> std::io::Result<()> {
        Ok(())
    }
}
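
// A minimal `std::io::Write` sketch (illustrative, assuming the impl above): writes append
// to the buffer and never fail, so `write_all` always succeeds.
//
//     use std::io::Write;
//     let mut buf = ByteBufferMut::empty();
//     buf.write_all(b"hello").unwrap();
//     assert_eq!(buf.as_slice(), b"hello");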

#[cfg(test)]
mod test {
    use bytes::{Buf, BufMut};

    use crate::{Alignment, BufferMut, ByteBufferMut, buffer_mut};

    #[test]
    fn capacity() {
        let mut n = 57;
        let mut buf = BufferMut::<i32>::with_capacity_aligned(n, Alignment::new(1024));
        assert!(buf.capacity() >= 57);

        while n > 0 {
            buf.push(0);
            assert!(buf.capacity() >= n);
            n -= 1
        }

        assert_eq!(buf.alignment(), Alignment::new(1024));
    }

    #[test]
    fn from_iter() {
        let buf = BufferMut::from_iter([0, 10, 20, 30]);
        assert_eq!(buf.as_slice(), &[0, 10, 20, 30]);
    }

    #[test]
    fn extend() {
        let mut buf = BufferMut::empty();
        buf.extend([0i32, 10, 20, 30]);
        buf.extend([40, 50, 60]);
        assert_eq!(buf.as_slice(), &[0, 10, 20, 30, 40, 50, 60]);
    }

    #[test]
    fn push() {
        let mut buf = BufferMut::empty();
        buf.push(1);
        buf.push(2);
        buf.push(3);
        assert_eq!(buf.as_slice(), &[1, 2, 3]);
    }

    #[test]
    fn push_n() {
        let mut buf = BufferMut::empty();
        buf.push_n(0, 100);
        assert_eq!(buf.as_slice(), &[0; 100]);
    }

    #[test]
    fn as_mut() {
        let mut buf = buffer_mut![0, 1, 2];
        // Uses DerefMut
        buf[1] = 0;
        // Uses as_mut
        buf.as_mut()[2] = 0;
        assert_eq!(buf.as_slice(), &[0, 0, 0]);
    }

    #[test]
    fn map_each() {
        let buf = buffer_mut![0i32, 1, 2];
        // Add one, and cast to an unsigned u32 in the same closure
        let buf = buf.map_each(|i| (i + 1) as u32);
        assert_eq!(buf.as_slice(), &[1u32, 2, 3]);
    }

    #[test]
    fn bytes_buf() {
        let mut buf = ByteBufferMut::copy_from("helloworld".as_bytes());
        assert_eq!(buf.remaining(), 10);
        assert_eq!(buf.chunk(), b"helloworld");

        Buf::advance(&mut buf, 5);
        assert_eq!(buf.remaining(), 5);
        assert_eq!(buf.as_slice(), b"world");
        assert_eq!(buf.chunk(), b"world");
    }

    #[test]
    fn bytes_buf_mut() {
        let mut buf = ByteBufferMut::copy_from("hello".as_bytes());
        assert_eq!(BufMut::remaining_mut(&buf), usize::MAX - 5);

        BufMut::put_slice(&mut buf, b"world");
        assert_eq!(buf.as_slice(), b"helloworld");
    }
}