• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

vortex-data / vortex / 16593958537

29 Jul 2025 10:48AM UTC coverage: 82.285% (+0.5%) from 81.796%
16593958537

Pull #4036

github

web-flow
Merge 04147cb0f into 348079fc3
Pull Request #4036: varbinview builder buffer deduplication

146 of 154 new or added lines in 2 files covered. (94.81%)

348 existing lines in 26 files now uncovered.

44470 of 54044 relevant lines covered (82.28%)

169522.95 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

92.72
/vortex-array/src/builders/varbinview.rs
1
// SPDX-License-Identifier: Apache-2.0
2
// SPDX-FileCopyrightText: Copyright the Vortex contributors
3

4
use std::any::Any;
5
use std::cmp::max;
6
use std::sync::Arc;
7

8
use vortex_buffer::{Buffer, BufferMut, ByteBuffer, ByteBufferMut};
9
use vortex_dtype::{DType, Nullability};
10
use vortex_error::{VortexExpect, VortexResult};
11
use vortex_mask::Mask;
12
use vortex_utils::aliases::hash_map::{Entry, HashMap};
13

14
use crate::arrays::{BinaryView, VarBinViewArray};
15
use crate::builders::ArrayBuilder;
16
use crate::builders::lazy_validity_builder::LazyNullBufferBuilder;
17
use crate::{Array, ArrayRef, IntoArray, ToCanonical};
18

19
pub struct VarBinViewBuilder {
20
    views_builder: BufferMut<BinaryView>,
21
    pub null_buffer_builder: LazyNullBufferBuilder,
22
    completed: CompletedBuffers,
23
    in_progress: ByteBufferMut,
24
    nullability: Nullability,
25
    dtype: DType,
26
}
27

28
impl VarBinViewBuilder {
29
    // TODO(joe): add a block growth strategy, from arrow
30
    const BLOCK_SIZE: u32 = 8 * 8 * 1024;
31

32
    pub fn with_capacity(dtype: DType, capacity: usize) -> Self {
7,416✔
33
        Self::new(dtype, capacity, Default::default())
7,416✔
34
    }
7,416✔
35

36
    pub fn with_buffer_deduplication(dtype: DType, capacity: usize) -> Self {
1✔
37
        Self::new(
1✔
38
            dtype,
1✔
39
            capacity,
1✔
40
            CompletedBuffers::Deduplicated(Default::default()),
1✔
41
        )
42
    }
1✔
43

44
    fn new(dtype: DType, capacity: usize, completed: CompletedBuffers) -> Self {
7,417✔
45
        assert!(
7,417✔
46
            matches!(dtype, DType::Utf8(_) | DType::Binary(_)),
7,417✔
47
            "VarBinViewBuilder DType must be Utf8 or Binary."
×
48
        );
49
        Self {
7,417✔
50
            views_builder: BufferMut::<BinaryView>::with_capacity(capacity),
7,417✔
51
            null_buffer_builder: LazyNullBufferBuilder::new(capacity),
7,417✔
52
            completed,
7,417✔
53
            in_progress: ByteBufferMut::empty(),
7,417✔
54
            nullability: dtype.nullability(),
7,417✔
55
            dtype,
7,417✔
56
        }
7,417✔
57
    }
7,417✔
58

59
    fn append_value_view(&mut self, value: &[u8]) {
2,384,610✔
60
        let length =
2,384,610✔
61
            u32::try_from(value.len()).vortex_expect("cannot have a single string >2^32 in length");
2,384,610✔
62
        if length <= 12 {
2,384,610✔
63
            self.views_builder.push(BinaryView::make_view(value, 0, 0));
172,920✔
64
            return;
172,920✔
65
        }
2,211,690✔
66

67
        let required_cap = self.in_progress.len() + value.len();
2,211,690✔
68
        if self.in_progress.capacity() < required_cap {
2,211,690✔
69
            self.flush_in_progress();
1,830✔
70
            let to_reserve = max(value.len(), VarBinViewBuilder::BLOCK_SIZE as usize);
1,830✔
71
            self.in_progress.reserve(to_reserve);
1,830✔
72
        };
2,209,860✔
73

74
        let offset = u32::try_from(self.in_progress.len()).vortex_expect("too many buffers");
2,211,690✔
75
        self.in_progress.extend_from_slice(value);
2,211,690✔
76
        let view = BinaryView::make_view(
2,211,690✔
77
            value,
2,211,690✔
78
            // buffer offset
79
            self.completed.len(),
2,211,690✔
80
            offset,
2,211,690✔
81
        );
82
        self.views_builder.push(view);
2,211,690✔
83
    }
2,384,610✔
84

85
    #[inline]
86
    pub fn append_value<S: AsRef<[u8]>>(&mut self, value: S) {
2,384,610✔
87
        self.append_value_view(value.as_ref());
2,384,610✔
88
        self.null_buffer_builder.append_non_null();
2,384,610✔
89
    }
2,384,610✔
90

91
    #[inline]
92
    pub fn append_option<S: AsRef<[u8]>>(&mut self, value: Option<S>) {
5,762✔
93
        match value {
5,762✔
94
            Some(value) => self.append_value(value),
5,761✔
95
            None => self.append_null(),
1✔
96
        }
97
    }
5,762✔
98

99
    #[inline]
100
    fn flush_in_progress(&mut self) {
16,389✔
101
        if self.in_progress.is_empty() {
16,389✔
102
            return;
14,559✔
103
        }
1,830✔
104
        let block = std::mem::take(&mut self.in_progress).freeze();
1,830✔
105

106
        assert!(block.len() < u32::MAX as usize, "Block too large");
1,830✔
107

108
        let initial_len = self.completed.len();
1,830✔
109
        self.completed.push(block);
1,830✔
110
        assert_eq!(
1,830✔
111
            self.completed.len(),
1,830✔
112
            initial_len + 1,
1,830✔
NEW
113
            "Invalid state, just completed block already exists"
×
114
        );
115
    }
16,389✔
116

117
    pub fn completed_block_count(&self) -> usize {
764✔
118
        self.completed.len() as usize
764✔
119
    }
764✔
120

121
    // Pushes an array of values into the buffer, where the buffers are sections of a
122
    // VarBinView and the views are the BinaryView's of the VarBinView *already with their*
123
    // buffers adjusted.
124
    // The views must all point to sections of the buffers and the validity length must match
125
    // the view length.
126
    /// ## Panics
127
    /// Panics if this builder deduplicates buffers and if any of the given buffers already
128
    /// exists on this builder
129
    pub fn push_buffer_and_adjusted_views(
760✔
130
        &mut self,
760✔
131
        buffer: &[ByteBuffer],
760✔
132
        views: &Buffer<BinaryView>,
760✔
133
        validity_mask: Mask,
760✔
134
    ) {
760✔
135
        self.flush_in_progress();
760✔
136

137
        let expected_completed_len = self.completed.len() as usize + buffer.len();
760✔
138
        self.completed.extend_from_slice(buffer);
760✔
139
        assert_eq!(
760✔
140
            self.completed.len() as usize,
760✔
141
            expected_completed_len,
NEW
142
            "Some buffers already exist",
×
143
        );
144
        self.views_builder.extend_trusted(views.iter().copied());
760✔
145
        self.push_only_validity_mask(validity_mask);
760✔
146

147
        debug_assert_eq!(self.null_buffer_builder.len(), self.views_builder.len())
760✔
148
    }
760✔
149

150
    pub fn finish_into_varbinview(&mut self) -> VarBinViewArray {
7,416✔
151
        self.flush_in_progress();
7,416✔
152
        let buffers = std::mem::take(&mut self.completed);
7,416✔
153

154
        assert_eq!(
7,416✔
155
            self.views_builder.len(),
7,416✔
156
            self.null_buffer_builder.len(),
7,416✔
157
            "View and validity length must match"
×
158
        );
159

160
        let validity = self
7,416✔
161
            .null_buffer_builder
7,416✔
162
            .finish_with_nullability(self.nullability);
7,416✔
163

164
        VarBinViewArray::try_new(
7,416✔
165
            std::mem::take(&mut self.views_builder).freeze(),
7,416✔
166
            buffers.finish(),
7,416✔
167
            std::mem::replace(&mut self.dtype, DType::Null),
7,416✔
168
            validity,
7,416✔
169
        )
170
        .vortex_expect("VarBinViewArray components should be valid.")
7,416✔
171
    }
7,416✔
172
}
173

174
impl VarBinViewBuilder {
175
    // Pushes a validity mask into the builder not affecting the views or buffers
176
    fn push_only_validity_mask(&mut self, validity_mask: Mask) {
7,143✔
177
        self.null_buffer_builder.append_validity_mask(validity_mask);
7,143✔
178
    }
7,143✔
179
}
180

181
impl ArrayBuilder for VarBinViewBuilder {
182
    fn as_any(&self) -> &dyn Any {
×
183
        self
×
184
    }
×
185

186
    fn as_any_mut(&mut self) -> &mut dyn Any {
6,519✔
187
        self
6,519✔
188
    }
6,519✔
189

190
    #[inline]
191
    fn dtype(&self) -> &DType {
25,108✔
192
        &self.dtype
25,108✔
193
    }
25,108✔
194

195
    #[inline]
196
    fn len(&self) -> usize {
15,698✔
197
        self.null_buffer_builder.len()
15,698✔
198
    }
15,698✔
199

200
    #[inline]
201
    fn append_zeros(&mut self, n: usize) {
1✔
202
        self.views_builder.push_n(BinaryView::empty_view(), n);
1✔
203
        self.null_buffer_builder.append_n_non_nulls(n);
1✔
204
    }
1✔
205

206
    #[inline]
207
    fn append_nulls(&mut self, n: usize) {
2,142✔
208
        self.views_builder.push_n(BinaryView::empty_view(), n);
2,142✔
209
        self.null_buffer_builder.append_n_nulls(n);
2,142✔
210
    }
2,142✔
211

212
    #[inline]
213
    fn extend_from_array(&mut self, array: &dyn Array) -> VortexResult<()> {
6,383✔
214
        let array = array.to_varbinview()?;
6,383✔
215
        self.flush_in_progress();
6,383✔
216

217
        let new_indices = self.completed.extend_from_slice(array.buffers());
6,383✔
218

219
        self.views_builder
6,383✔
220
            .extend_trusted(array.views().iter().map(|view| new_indices.map_view(view)));
394,403✔
221

222
        self.push_only_validity_mask(array.validity_mask()?);
6,383✔
223

224
        Ok(())
6,383✔
225
    }
6,383✔
226

227
    fn ensure_capacity(&mut self, capacity: usize) {
×
228
        if capacity > self.views_builder.capacity() {
×
229
            self.views_builder
×
230
                .reserve(capacity - self.views_builder.len());
×
231
            self.null_buffer_builder.ensure_capacity(capacity);
×
232
        }
×
233
    }
×
234

235
    fn set_validity(&mut self, validity: Mask) {
×
236
        self.null_buffer_builder = LazyNullBufferBuilder::new(validity.len());
×
237
        self.null_buffer_builder.append_validity_mask(validity);
×
238
    }
×
239

240
    fn finish(&mut self) -> ArrayRef {
5,433✔
241
        self.finish_into_varbinview().into_array()
5,433✔
242
    }
5,433✔
243
}
244

245
enum CompletedBuffers {
246
    Default(Vec<ByteBuffer>),
247
    Deduplicated(DeduplicatedBuffers),
248
}
249

250
impl Default for CompletedBuffers {
251
    fn default() -> Self {
14,832✔
252
        Self::Default(Vec::new())
14,832✔
253
    }
14,832✔
254
}
255

256
// Self::push enforces len < u32::max
257
#[allow(clippy::cast_possible_truncation)]
258
impl CompletedBuffers {
259
    fn len(&self) -> u32 {
2,219,464✔
260
        match self {
2,219,464✔
261
            Self::Default(buffers) => buffers.len() as u32,
2,219,460✔
262
            Self::Deduplicated(buffers) => buffers.len(),
4✔
263
        }
264
    }
2,219,464✔
265

266
    fn push(&mut self, block: ByteBuffer) -> u32 {
1,830✔
267
        match self {
1,830✔
268
            Self::Default(buffers) => {
1,830✔
269
                assert!(buffers.len() < u32::MAX as usize, "Too many blocks");
1,830✔
270
                buffers.push(block);
1,830✔
271
                self.len()
1,830✔
272
            }
NEW
273
            Self::Deduplicated(buffers) => buffers.push(block),
×
274
        }
275
    }
1,830✔
276

277
    fn extend_from_slice(&mut self, new_buffers: &[ByteBuffer]) -> NewIndices {
7,143✔
278
        match self {
7,143✔
279
            Self::Default(buffers) => {
7,137✔
280
                let offset = buffers.len() as u32;
7,137✔
281
                buffers.extend_from_slice(new_buffers);
7,137✔
282
                NewIndices::ConstantOffset(offset)
7,137✔
283
            }
284
            Self::Deduplicated(buffers) => {
6✔
285
                NewIndices::LookupArray(buffers.extend_from_slice(new_buffers))
6✔
286
            }
287
        }
288
    }
7,143✔
289

290
    fn finish(self) -> Arc<[ByteBuffer]> {
7,416✔
291
        match self {
7,416✔
292
            Self::Default(buffers) => Arc::from(buffers),
7,416✔
NEW
293
            Self::Deduplicated(buffers) => buffers.finish(),
×
294
        }
295
    }
7,416✔
296
}
297

298
/// Describes how the buffer indices of incoming views translate to this builder's indices.
enum NewIndices {
    /// The new index is the old index plus this constant.
    ConstantOffset(u32),
    /// The new index is found by looking up the old index in this table.
    LookupArray(Vec<u32>),
}
304

305
impl NewIndices {
306
    fn map_view(&self, view: &BinaryView) -> BinaryView {
394,403✔
307
        match self {
394,403✔
308
            Self::ConstantOffset(offset) => view.offset_view(*offset),
394,396✔
309
            Self::LookupArray(lookup) => {
7✔
310
                if view.is_inlined() {
7✔
311
                    *view
2✔
312
                } else {
313
                    let new_buffer_idx = lookup[view.as_view().buffer_index() as usize];
5✔
314
                    view.with_buffer_idx(new_buffer_idx)
5✔
315
                }
316
            }
317
        }
318
    }
394,403✔
319
}
320

321
#[derive(Default)]
322
struct DeduplicatedBuffers {
323
    buffers: Vec<ByteBuffer>,
324
    buffer_to_idx: HashMap<BufferId, u32>,
325
}
326

327
impl DeduplicatedBuffers {
328
    // Self::push enforces len < u32::max
329
    #[allow(clippy::cast_possible_truncation)]
330
    fn len(&self) -> u32 {
10✔
331
        self.buffers.len() as u32
10✔
332
    }
10✔
333

334
    /// Push a new block if not seen before. Returns the idx of the block.
335
    fn push(&mut self, block: ByteBuffer) -> u32 {
6✔
336
        assert!(self.buffers.len() < u32::MAX as usize, "Too many blocks");
6✔
337

338
        let initial_len = self.len();
6✔
339
        let id = BufferId::from(&block);
6✔
340
        match self.buffer_to_idx.entry(id) {
6✔
341
            Entry::Occupied(idx) => *idx.get(),
4✔
342
            Entry::Vacant(entry) => {
2✔
343
                let idx = initial_len;
2✔
344
                entry.insert(idx);
2✔
345
                self.buffers.push(block);
2✔
346
                idx
2✔
347
            }
348
        }
349
    }
6✔
350

351
    fn extend_from_slice(&mut self, buffers: &[ByteBuffer]) -> Vec<u32> {
6✔
352
        buffers
6✔
353
            .iter()
6✔
354
            .map(|buffer| self.push(buffer.clone()))
6✔
355
            .collect()
6✔
356
    }
6✔
357

NEW
358
    fn finish(self) -> Arc<[ByteBuffer]> {
×
NEW
359
        Arc::from(self.buffers)
×
NEW
360
    }
×
361
}
362

363
/// Identity of a buffer: where its bytes start and how many there are.
///
/// Two ids are equal when both the address and the length match; contents are not compared.
#[derive(PartialEq, Eq, Hash)]
struct BufferId {
    /// Address of the first byte. Kept as `usize` instead of `*const u8` for `Send`.
    ptr: usize,
    /// Length of the buffer in bytes.
    len: usize,
}
369

370
impl BufferId {
371
    fn from(buffer: &ByteBuffer) -> Self {
6✔
372
        let slice = buffer.as_slice();
6✔
373
        Self {
6✔
374
            ptr: slice.as_ptr() as usize,
6✔
375
            len: slice.len(),
6✔
376
        }
6✔
377
    }
6✔
378
}
379

380
#[cfg(test)]
mod tests {
    use std::str::from_utf8;

    use itertools::Itertools;
    use vortex_dtype::{DType, Nullability};

    use crate::ToCanonical;
    use crate::accessor::ArrayAccessor;
    use crate::arrays::VarBinViewVTable;
    use crate::builders::{ArrayBuilder, VarBinViewBuilder};

    /// The dtype used by every builder in these tests.
    fn nullable_utf8() -> DType {
        DType::Utf8(Nullability::Nullable)
    }

    /// Converts borrowed expectations into the owned form decoded arrays are compared with.
    fn owned(values: &[Option<&str>]) -> Vec<Option<String>> {
        values
            .iter()
            .copied()
            .map(|value| value.map(String::from))
            .collect()
    }

    /// Values, nulls and zeros appended one by one come back in append order.
    #[test]
    fn test_utf8_builder() {
        let mut builder = VarBinViewBuilder::with_capacity(nullable_utf8(), 10);

        builder.append_option(Some("Hello"));
        builder.append_option::<&str>(None);
        builder.append_value("World");

        builder.append_nulls(2);

        builder.append_zeros(2);
        builder.append_value("test");

        let finished = builder.finish();

        let decoded = finished
            .as_::<VarBinViewVTable>()
            .with_iterator(|iter| {
                iter.map(|bytes| bytes.map(|b| from_utf8(b).unwrap().to_owned()))
                    .collect_vec()
            })
            .unwrap();
        assert_eq!(decoded.len(), 8);
        assert_eq!(
            decoded,
            owned(&[
                Some("Hello"),
                None,
                Some("World"),
                None,
                None,
                Some(""),
                Some(""),
                Some("test"),
            ])
        );
    }

    /// Extending from another array splices its elements between directly appended ones.
    #[test]
    fn test_utf8_builder_with_extend() {
        let source = {
            let mut source_builder = VarBinViewBuilder::with_capacity(nullable_utf8(), 10);
            source_builder.append_null();
            source_builder.append_value("Hello2");
            source_builder.finish()
        };
        let mut builder = VarBinViewBuilder::with_capacity(nullable_utf8(), 10);

        builder.append_option(Some("Hello1"));
        builder.extend_from_array(&source).unwrap();
        builder.append_nulls(2);
        builder.append_value("Hello3");

        let finished = builder.finish().to_varbinview().unwrap();

        let decoded = finished
            .with_iterator(|iter| {
                iter.map(|bytes| bytes.map(|b| from_utf8(b).unwrap().to_owned()))
                    .collect_vec()
            })
            .unwrap();
        assert_eq!(decoded.len(), 6);
        assert_eq!(
            decoded,
            owned(&[
                Some("Hello1"),
                None,
                Some("Hello2"),
                None,
                None,
                Some("Hello3"),
            ])
        );
    }

    /// Appending an array, or slices of it, repeatedly stores each backing buffer only once.
    #[test]
    fn test_buffer_deduplication() {
        let first = {
            let mut source_builder = VarBinViewBuilder::with_capacity(nullable_utf8(), 10);
            source_builder.append_value("This is a long string that should not be inlined");
            source_builder.append_value("short string");
            source_builder.finish_into_varbinview()
        };

        assert_eq!(first.buffers().len(), 1);
        let mut dedup = VarBinViewBuilder::with_buffer_deduplication(nullable_utf8(), 10);

        first.append_to_builder(&mut dedup).unwrap();
        assert_eq!(dedup.completed_block_count(), 1);

        // Slices of `first` share its buffer, so the block count must not grow.
        first
            .slice(1, 2)
            .unwrap()
            .append_to_builder(&mut dedup)
            .unwrap();
        first
            .slice(0, 1)
            .unwrap()
            .append_to_builder(&mut dedup)
            .unwrap();
        assert_eq!(dedup.completed_block_count(), 1);

        // Same contents, but built separately: this one counts as a second block.
        let second = {
            let mut source_builder = VarBinViewBuilder::with_capacity(nullable_utf8(), 10);
            source_builder.append_value("This is a long string that should not be inlined");
            source_builder.finish_into_varbinview()
        };

        second.append_to_builder(&mut dedup).unwrap();
        assert_eq!(dedup.completed_block_count(), 2);

        first
            .slice(0, 1)
            .unwrap()
            .append_to_builder(&mut dedup)
            .unwrap();
        second
            .slice(0, 1)
            .unwrap()
            .append_to_builder(&mut dedup)
            .unwrap();
        assert_eq!(dedup.completed_block_count(), 2);
    }
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc