• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

vortex-data / vortex / 16574234784

28 Jul 2025 04:10PM UTC coverage: 81.821% (+0.03%) from 81.796%
16574234784

Pull #4036

github

web-flow
Merge 0a7735449 into 904980150
Pull Request #4036: varbinview builder buffer deduplication

97 of 100 new or added lines in 2 files covered. (97.0%)

34 existing lines in 2 files now uncovered.

43370 of 53006 relevant lines covered (81.82%)

170008.6 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

93.38
/vortex-array/src/builders/varbinview.rs
1
// SPDX-License-Identifier: Apache-2.0
2
// SPDX-FileCopyrightText: Copyright the Vortex contributors
3

4
use std::any::Any;
5
use std::cmp::max;
6
use std::sync::Arc;
7

8
use vortex_buffer::{Buffer, BufferMut, ByteBuffer, ByteBufferMut};
9
use vortex_dtype::{DType, Nullability};
10
use vortex_error::{VortexExpect, VortexResult};
11
use vortex_mask::Mask;
12
use vortex_utils::aliases::hash_map::{Entry, HashMap};
13

14
use crate::arrays::{BinaryView, VarBinViewArray};
15
use crate::builders::ArrayBuilder;
16
use crate::builders::lazy_validity_builder::LazyNullBufferBuilder;
17
use crate::{Array, ArrayRef, IntoArray, ToCanonical};
18

19
pub struct VarBinViewBuilder {
20
    views_builder: BufferMut<BinaryView>,
21
    pub null_buffer_builder: LazyNullBufferBuilder,
22
    completed: CompletedBuffers,
23
    in_progress: ByteBufferMut,
24
    nullability: Nullability,
25
    dtype: DType,
26
}
27

28
impl VarBinViewBuilder {
29
    // TODO(joe): add a block growth strategy, from arrow
30
    const BLOCK_SIZE: u32 = 8 * 8 * 1024;
31

32
    pub fn with_capacity(dtype: DType, capacity: usize) -> Self {
6,632✔
33
        assert!(
6,632✔
34
            matches!(dtype, DType::Utf8(_) | DType::Binary(_)),
6,632✔
35
            "VarBinViewBuilder DType must be Utf8 or Binary."
×
36
        );
37
        Self {
6,632✔
38
            views_builder: BufferMut::<BinaryView>::with_capacity(capacity),
6,632✔
39
            null_buffer_builder: LazyNullBufferBuilder::new(capacity),
6,632✔
40
            completed: Default::default(),
6,632✔
41
            in_progress: ByteBufferMut::empty(),
6,632✔
42
            nullability: dtype.nullability(),
6,632✔
43
            dtype,
6,632✔
44
        }
6,632✔
45
    }
6,632✔
46

47
    fn append_value_view(&mut self, value: &[u8]) {
2,312,192✔
48
        let length =
2,312,192✔
49
            u32::try_from(value.len()).vortex_expect("cannot have a single string >2^32 in length");
2,312,192✔
50
        if length <= 12 {
2,312,192✔
51
            self.views_builder.push(BinaryView::make_view(value, 0, 0));
151,875✔
52
            return;
151,875✔
53
        }
2,160,317✔
54

55
        let required_cap = self.in_progress.len() + value.len();
2,160,317✔
56
        if self.in_progress.capacity() < required_cap {
2,160,317✔
57
            self.flush_in_progress();
1,805✔
58
            let to_reserve = max(value.len(), VarBinViewBuilder::BLOCK_SIZE as usize);
1,805✔
59
            self.in_progress.reserve(to_reserve);
1,805✔
60
        };
2,158,514✔
61

62
        let offset = u32::try_from(self.in_progress.len()).vortex_expect("too many buffers");
2,160,317✔
63
        self.in_progress.extend_from_slice(value);
2,160,317✔
64
        let view = BinaryView::make_view(
2,160,317✔
65
            value,
2,160,317✔
66
            // buffer offset
67
            self.completed.len(),
2,160,317✔
68
            offset,
2,160,317✔
69
        );
70
        self.views_builder.push(view);
2,160,317✔
71
    }
2,312,192✔
72

73
    #[inline]
74
    pub fn append_value<S: AsRef<[u8]>>(&mut self, value: S) {
2,312,192✔
75
        self.append_value_view(value.as_ref());
2,312,192✔
76
        self.null_buffer_builder.append_non_null();
2,312,192✔
77
    }
2,312,192✔
78

79
    #[inline]
80
    pub fn append_option<S: AsRef<[u8]>>(&mut self, value: Option<S>) {
5,666✔
81
        match value {
5,666✔
82
            Some(value) => self.append_value(value),
5,665✔
83
            None => self.append_null(),
1✔
84
        }
85
    }
5,666✔
86

87
    #[inline]
88
    fn flush_in_progress(&mut self) {
15,502✔
89
        if self.in_progress.is_empty() {
15,502✔
90
            return;
13,697✔
91
        }
1,805✔
92
        let block = std::mem::take(&mut self.in_progress).freeze();
1,805✔
93

94
        assert!(block.len() < u32::MAX as usize, "Block too large");
1,805✔
95

96
        let initial_len = self.completed.len();
1,805✔
97
        self.completed.push(block);
1,805✔
98
        assert_eq!(
1,805✔
99
            self.completed.len(),
1,805✔
100
            initial_len + 1,
1,805✔
NEW
101
            "Invalid state, just completed block already exists"
×
102
        );
103
    }
15,502✔
104

105
    pub fn completed_block_count(&self) -> usize {
744✔
106
        self.completed.len() as usize
744✔
107
    }
744✔
108

109
    // Pushes an array of values into the buffer, where the buffers are sections of a
110
    // VarBinView and the views are the BinaryView's of the VarBinView *already with their*
111
    // buffers adjusted.
112
    // The views must all point to sections of the buffers and the validity length must match
113
    // the view length.
114
    /// ## Panics
115
    /// Panics if any of the given buffers already exists on this builder
116
    pub fn push_buffer_and_adjusted_views(
740✔
117
        &mut self,
740✔
118
        buffer: &[ByteBuffer],
740✔
119
        views: &Buffer<BinaryView>,
740✔
120
        validity_mask: Mask,
740✔
121
    ) {
740✔
122
        self.flush_in_progress();
740✔
123

124
        let expected_completed_len = self.completed.len() as usize + buffer.len();
740✔
125
        self.completed.extend_from_slice(buffer);
740✔
126
        assert_eq!(
740✔
127
            self.completed.len() as usize,
740✔
128
            expected_completed_len,
NEW
UNCOV
129
            "Some buffers already exist",
×
130
        );
131
        self.views_builder.extend_trusted(views.iter().copied());
740✔
132
        self.push_only_validity_mask(validity_mask);
740✔
133

134
        debug_assert_eq!(self.null_buffer_builder.len(), self.views_builder.len())
740✔
135
    }
740✔
136

137
    pub fn finish_into_varbinview(&mut self) -> VarBinViewArray {
6,631✔
138
        self.flush_in_progress();
6,631✔
139
        let buffers = std::mem::take(&mut self.completed);
6,631✔
140

141
        assert_eq!(
6,631✔
142
            self.views_builder.len(),
6,631✔
143
            self.null_buffer_builder.len(),
6,631✔
UNCOV
144
            "View and validity length must match"
×
145
        );
146

147
        let validity = self
6,631✔
148
            .null_buffer_builder
6,631✔
149
            .finish_with_nullability(self.nullability);
6,631✔
150

151
        VarBinViewArray::try_new(
6,631✔
152
            std::mem::take(&mut self.views_builder).freeze(),
6,631✔
153
            buffers.finish(),
6,631✔
154
            std::mem::replace(&mut self.dtype, DType::Null),
6,631✔
155
            validity,
6,631✔
156
        )
157
        .vortex_expect("VarBinViewArray components should be valid.")
6,631✔
158
    }
6,631✔
159
}
160

161
impl VarBinViewBuilder {
162
    // Pushes a validity mask into the builder not affecting the views or buffers
163
    fn push_only_validity_mask(&mut self, validity_mask: Mask) {
7,066✔
164
        self.null_buffer_builder.append_validity_mask(validity_mask);
7,066✔
165
    }
7,066✔
166
}
167

168
impl ArrayBuilder for VarBinViewBuilder {
UNCOV
169
    fn as_any(&self) -> &dyn Any {
×
UNCOV
170
        self
×
UNCOV
171
    }
×
172

173
    fn as_any_mut(&mut self) -> &mut dyn Any {
6,403✔
174
        self
6,403✔
175
    }
6,403✔
176

177
    #[inline]
178
    fn dtype(&self) -> &DType {
24,736✔
179
        &self.dtype
24,736✔
180
    }
24,736✔
181

182
    #[inline]
183
    fn len(&self) -> usize {
15,530✔
184
        self.null_buffer_builder.len()
15,530✔
185
    }
15,530✔
186

187
    #[inline]
188
    fn append_zeros(&mut self, n: usize) {
1✔
189
        self.views_builder.push_n(BinaryView::empty_view(), n);
1✔
190
        self.null_buffer_builder.append_n_non_nulls(n);
1✔
191
    }
1✔
192

193
    #[inline]
194
    fn append_nulls(&mut self, n: usize) {
2,086✔
195
        self.views_builder.push_n(BinaryView::empty_view(), n);
2,086✔
196
        self.null_buffer_builder.append_n_nulls(n);
2,086✔
197
    }
2,086✔
198

199
    #[inline]
200
    fn extend_from_array(&mut self, array: &dyn Array) -> VortexResult<()> {
6,326✔
201
        let array = array.to_varbinview()?;
6,326✔
202
        self.flush_in_progress();
6,326✔
203

204
        let index_lookup = self.completed.extend_from_slice(array.buffers());
6,326✔
205

206
        self.views_builder
6,326✔
207
            .extend_trusted(array.views().iter().map(|view| {
392,940✔
208
                if view.is_inlined() {
392,940✔
209
                    *view
58,931✔
210
                } else {
211
                    let new_buffer_idx = index_lookup[view.as_view().buffer_index() as usize];
334,009✔
212
                    view.with_buffer_idx(new_buffer_idx)
334,009✔
213
                }
214
            }));
392,940✔
215

216
        self.push_only_validity_mask(array.validity_mask()?);
6,326✔
217

218
        Ok(())
6,326✔
219
    }
6,326✔
220

221
    fn ensure_capacity(&mut self, capacity: usize) {
×
222
        if capacity > self.views_builder.capacity() {
×
223
            self.views_builder
×
224
                .reserve(capacity - self.views_builder.len());
×
225
            self.null_buffer_builder.ensure_capacity(capacity);
×
226
        }
×
227
    }
×
228

229
    fn set_validity(&mut self, validity: Mask) {
×
230
        self.null_buffer_builder = LazyNullBufferBuilder::new(validity.len());
×
231
        self.null_buffer_builder.append_validity_mask(validity);
×
232
    }
×
233

234
    fn finish(&mut self) -> ArrayRef {
4,696✔
235
        self.finish_into_varbinview().into_array()
4,696✔
236
    }
4,696✔
237
}
238

239
#[derive(Default)]
240
struct CompletedBuffers {
241
    buffers: Vec<ByteBuffer>,
242
    buffer_to_idx: HashMap<BufferId, u32>,
243
}
244

245
impl CompletedBuffers {
246
    // Self::push enforces len < u32::max
247
    #[allow(clippy::cast_possible_truncation)]
248
    fn len(&self) -> u32 {
2,186,653✔
249
        self.buffers.len() as u32
2,186,653✔
250
    }
2,186,653✔
251

252
    /// Push a new block if not seen before. Returns the idx of the block.
253
    fn push(&mut self, block: ByteBuffer) -> u32 {
20,502✔
254
        assert!(self.buffers.len() < u32::MAX as usize, "Too many blocks");
20,502✔
255

256
        let initial_len = self.len();
20,502✔
257
        let id = BufferId::from(&block);
20,502✔
258
        match self.buffer_to_idx.entry(id) {
20,502✔
259
            Entry::Occupied(idx) => *idx.get(),
15,521✔
260
            Entry::Vacant(entry) => {
4,981✔
261
                let idx = initial_len;
4,981✔
262
                entry.insert(idx);
4,981✔
263
                self.buffers.push(block);
4,981✔
264
                idx
4,981✔
265
            }
266
        }
267
    }
20,502✔
268

269
    fn extend_from_slice(&mut self, buffers: &[ByteBuffer]) -> Vec<u32> {
7,066✔
270
        buffers
7,066✔
271
            .iter()
7,066✔
272
            .map(|buffer| self.push(buffer.clone()))
18,920✔
273
            .collect()
7,066✔
274
    }
7,066✔
275

276
    fn finish(self) -> Arc<[ByteBuffer]> {
6,631✔
277
        Arc::from(self.buffers)
6,631✔
278
    }
6,631✔
279
}
280

281
/// Identity of a byte buffer, made of the address and length of its contents.
///
/// Two ids compare equal exactly when both the address and the length match.
#[derive(PartialEq, Eq, Hash)]
struct BufferId {
    /// Start address of the bytes.
    // *const u8 stored as usize for `Send`
    ptr: usize,
    /// Number of bytes.
    len: usize,
}
287

288
impl BufferId {
289
    fn from(buffer: &ByteBuffer) -> Self {
20,502✔
290
        let slice = buffer.as_slice();
20,502✔
291
        Self {
20,502✔
292
            ptr: slice.as_ptr() as usize,
20,502✔
293
            len: slice.len(),
20,502✔
294
        }
20,502✔
295
    }
20,502✔
296
}
297

298
#[cfg(test)]
mod tests {
    use std::str::from_utf8;

    use itertools::Itertools;
    use vortex_dtype::{DType, Nullability};

    use crate::ToCanonical;
    use crate::accessor::ArrayAccessor;
    use crate::arrays::VarBinViewVTable;
    use crate::builders::{ArrayBuilder, VarBinViewBuilder};

    /// A nullable UTF-8 builder with the capacity every test in this module uses.
    fn nullable_utf8_builder() -> VarBinViewBuilder {
        VarBinViewBuilder::with_capacity(DType::Utf8(Nullability::Nullable), 10)
    }

    /// Values, nulls and zero-length values appended one by one read back in order.
    #[test]
    fn test_utf8_builder() {
        let mut builder = nullable_utf8_builder();

        builder.append_option(Some("Hello"));
        builder.append_option::<&str>(None);
        builder.append_value("World");

        builder.append_nulls(2);

        builder.append_zeros(2);
        builder.append_value("test");

        let finished = builder.finish();

        let actual = finished
            .as_::<VarBinViewVTable>()
            .with_iterator(|values| {
                values
                    .map(|value| value.map(|bytes| from_utf8(bytes).unwrap().to_string()))
                    .collect_vec()
            })
            .unwrap();

        let expected: Vec<Option<String>> = [
            Some("Hello"),
            None,
            Some("World"),
            None,
            None,
            Some(""),
            Some(""),
            Some("test"),
        ]
        .into_iter()
        .map(|value| value.map(String::from))
        .collect();

        assert_eq!(actual.len(), 8);
        assert_eq!(actual, expected);
    }

    /// Extending from another array interleaves correctly with direct appends.
    #[test]
    fn test_utf8_builder_with_extend() {
        let array = {
            let mut source = nullable_utf8_builder();
            source.append_null();
            source.append_value("Hello2");
            source.finish()
        };
        let mut builder = nullable_utf8_builder();

        builder.append_option(Some("Hello1"));
        builder.extend_from_array(&array).unwrap();
        builder.append_nulls(2);
        builder.append_value("Hello3");

        let finished = builder.finish().to_varbinview().unwrap();

        let actual = finished
            .with_iterator(|values| {
                values
                    .map(|value| value.map(|bytes| from_utf8(bytes).unwrap().to_string()))
                    .collect_vec()
            })
            .unwrap();

        let expected: Vec<Option<String>> =
            [Some("Hello1"), None, Some("Hello2"), None, None, Some("Hello3")]
                .into_iter()
                .map(|value| value.map(String::from))
                .collect();

        assert_eq!(actual.len(), 6);
        assert_eq!(actual, expected);
    }

    /// Appending the same data buffer repeatedly, whole or sliced, stores it only once;
    /// an equal-content buffer from a different array is stored separately.
    #[test]
    fn test_buffer_deduplication() {
        let array = {
            let mut source = nullable_utf8_builder();
            source.append_value("This is a long string that should not be inlined");
            source.append_value("short string");
            source.finish_into_varbinview()
        };

        assert_eq!(array.buffers().len(), 1);
        let mut builder = nullable_utf8_builder();

        array.append_to_builder(&mut builder).unwrap();
        assert_eq!(builder.completed_block_count(), 1);

        for (start, stop) in [(1, 2), (0, 1)] {
            array
                .slice(start, stop)
                .unwrap()
                .append_to_builder(&mut builder)
                .unwrap();
        }
        assert_eq!(builder.completed_block_count(), 1);

        let array2 = {
            let mut source = nullable_utf8_builder();
            source.append_value("This is a long string that should not be inlined");
            source.finish_into_varbinview()
        };

        array2.append_to_builder(&mut builder).unwrap();
        assert_eq!(builder.completed_block_count(), 2);

        for source in [&array, &array2] {
            source
                .slice(0, 1)
                .unwrap()
                .append_to_builder(&mut builder)
                .unwrap();
        }
        assert_eq!(builder.completed_block_count(), 2);
    }
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc