• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

vortex-data / vortex / 16935267080

13 Aug 2025 11:00AM UTC coverage: 24.312% (-63.3%) from 87.658%
16935267080

Pull #4226

github

web-flow
Merge 81b48c7fb into baa6ea202
Pull Request #4226: Support converting TimestampTZ to and from duckdb

0 of 2 new or added lines in 1 file covered. (0.0%)

20666 existing lines in 469 files now uncovered.

8726 of 35892 relevant lines covered (24.31%)

147.74 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/vortex-array/src/builders/varbinview.rs
1
// SPDX-License-Identifier: Apache-2.0
2
// SPDX-FileCopyrightText: Copyright the Vortex contributors
3

4
use std::any::Any;
5
use std::cmp::max;
6
use std::sync::Arc;
7

8
use vortex_buffer::{Buffer, BufferMut, ByteBuffer, ByteBufferMut};
9
use vortex_dtype::{DType, Nullability};
10
use vortex_error::{VortexExpect, VortexResult};
11
use vortex_mask::Mask;
12
use vortex_utils::aliases::hash_map::{Entry, HashMap};
13

14
use crate::arrays::{BinaryView, VarBinViewArray};
15
use crate::builders::ArrayBuilder;
16
use crate::builders::lazy_validity_builder::LazyNullBufferBuilder;
17
use crate::{Array, ArrayRef, IntoArray, ToCanonical};
18

19
pub struct VarBinViewBuilder {
20
    views_builder: BufferMut<BinaryView>,
21
    pub null_buffer_builder: LazyNullBufferBuilder,
22
    completed: CompletedBuffers,
23
    in_progress: ByteBufferMut,
24
    nullability: Nullability,
25
    dtype: DType,
26
}
27

28
impl VarBinViewBuilder {
29
    // TODO(joe): add a block growth strategy, from arrow
30
    const BLOCK_SIZE: u32 = 8 * 8 * 1024;
31

UNCOV
32
    pub fn with_capacity(dtype: DType, capacity: usize) -> Self {
×
UNCOV
33
        Self::new(dtype, capacity, Default::default())
×
UNCOV
34
    }
×
35

UNCOV
36
    pub fn with_buffer_deduplication(dtype: DType, capacity: usize) -> Self {
×
UNCOV
37
        Self::new(
×
UNCOV
38
            dtype,
×
UNCOV
39
            capacity,
×
UNCOV
40
            CompletedBuffers::Deduplicated(Default::default()),
×
41
        )
UNCOV
42
    }
×
43

UNCOV
44
    fn new(dtype: DType, capacity: usize, completed: CompletedBuffers) -> Self {
×
UNCOV
45
        assert!(
×
UNCOV
46
            matches!(dtype, DType::Utf8(_) | DType::Binary(_)),
×
47
            "VarBinViewBuilder DType must be Utf8 or Binary."
×
48
        );
UNCOV
49
        Self {
×
UNCOV
50
            views_builder: BufferMut::<BinaryView>::with_capacity(capacity),
×
UNCOV
51
            null_buffer_builder: LazyNullBufferBuilder::new(capacity),
×
UNCOV
52
            completed,
×
UNCOV
53
            in_progress: ByteBufferMut::empty(),
×
UNCOV
54
            nullability: dtype.nullability(),
×
UNCOV
55
            dtype,
×
UNCOV
56
        }
×
UNCOV
57
    }
×
58

UNCOV
59
    fn append_value_view(&mut self, value: &[u8]) {
×
UNCOV
60
        let length =
×
UNCOV
61
            u32::try_from(value.len()).vortex_expect("cannot have a single string >2^32 in length");
×
UNCOV
62
        if length <= 12 {
×
UNCOV
63
            self.views_builder.push(BinaryView::make_view(value, 0, 0));
×
UNCOV
64
            return;
×
UNCOV
65
        }
×
66

UNCOV
67
        let required_cap = self.in_progress.len() + value.len();
×
UNCOV
68
        if self.in_progress.capacity() < required_cap {
×
UNCOV
69
            self.flush_in_progress();
×
UNCOV
70
            let to_reserve = max(value.len(), VarBinViewBuilder::BLOCK_SIZE as usize);
×
UNCOV
71
            self.in_progress.reserve(to_reserve);
×
UNCOV
72
        };
×
73

UNCOV
74
        let offset = u32::try_from(self.in_progress.len()).vortex_expect("too many buffers");
×
UNCOV
75
        self.in_progress.extend_from_slice(value);
×
UNCOV
76
        let view = BinaryView::make_view(
×
UNCOV
77
            value,
×
78
            // buffer offset
UNCOV
79
            self.completed.len(),
×
UNCOV
80
            offset,
×
81
        );
UNCOV
82
        self.views_builder.push(view);
×
UNCOV
83
    }
×
84

85
    #[inline]
UNCOV
86
    pub fn append_value<S: AsRef<[u8]>>(&mut self, value: S) {
×
UNCOV
87
        self.append_value_view(value.as_ref());
×
UNCOV
88
        self.null_buffer_builder.append_non_null();
×
UNCOV
89
    }
×
90

91
    #[inline]
UNCOV
92
    pub fn append_option<S: AsRef<[u8]>>(&mut self, value: Option<S>) {
×
UNCOV
93
        match value {
×
UNCOV
94
            Some(value) => self.append_value(value),
×
UNCOV
95
            None => self.append_null(),
×
96
        }
UNCOV
97
    }
×
98

99
    #[inline]
UNCOV
100
    fn flush_in_progress(&mut self) {
×
UNCOV
101
        if self.in_progress.is_empty() {
×
UNCOV
102
            return;
×
UNCOV
103
        }
×
UNCOV
104
        let block = std::mem::take(&mut self.in_progress).freeze();
×
105

UNCOV
106
        assert!(block.len() < u32::MAX as usize, "Block too large");
×
107

UNCOV
108
        let initial_len = self.completed.len();
×
UNCOV
109
        self.completed.push(block);
×
UNCOV
110
        assert_eq!(
×
UNCOV
111
            self.completed.len(),
×
UNCOV
112
            initial_len + 1,
×
113
            "Invalid state, just completed block already exists"
×
114
        );
UNCOV
115
    }
×
116

UNCOV
117
    pub fn completed_block_count(&self) -> usize {
×
UNCOV
118
        self.completed.len() as usize
×
UNCOV
119
    }
×
120

121
    // Pushes an array of values into the buffer, where the buffers are sections of a
122
    // VarBinView and the views are the BinaryView's of the VarBinView *already with their*
123
    // buffers adjusted.
124
    // The views must all point to sections of the buffers and the validity length must match
125
    // the view length.
126
    /// ## Panics
127
    /// Panics if this builder deduplicates buffers and if any of the given buffers already
128
    /// exists on this builder
UNCOV
129
    pub fn push_buffer_and_adjusted_views(
×
UNCOV
130
        &mut self,
×
UNCOV
131
        buffer: &[ByteBuffer],
×
UNCOV
132
        views: &Buffer<BinaryView>,
×
UNCOV
133
        validity_mask: Mask,
×
UNCOV
134
    ) {
×
UNCOV
135
        self.flush_in_progress();
×
136

UNCOV
137
        let expected_completed_len = self.completed.len() as usize + buffer.len();
×
UNCOV
138
        self.completed.extend_from_slice(buffer);
×
UNCOV
139
        assert_eq!(
×
UNCOV
140
            self.completed.len() as usize,
×
141
            expected_completed_len,
142
            "Some buffers already exist",
×
143
        );
UNCOV
144
        self.views_builder.extend_trusted(views.iter().copied());
×
UNCOV
145
        self.push_only_validity_mask(validity_mask);
×
146

UNCOV
147
        debug_assert_eq!(self.null_buffer_builder.len(), self.views_builder.len())
×
UNCOV
148
    }
×
149

UNCOV
150
    pub fn finish_into_varbinview(&mut self) -> VarBinViewArray {
×
UNCOV
151
        self.flush_in_progress();
×
UNCOV
152
        let buffers = std::mem::take(&mut self.completed);
×
153

UNCOV
154
        assert_eq!(
×
UNCOV
155
            self.views_builder.len(),
×
UNCOV
156
            self.null_buffer_builder.len(),
×
157
            "View and validity length must match"
×
158
        );
159

UNCOV
160
        let validity = self
×
UNCOV
161
            .null_buffer_builder
×
UNCOV
162
            .finish_with_nullability(self.nullability);
×
163

UNCOV
164
        VarBinViewArray::try_new(
×
UNCOV
165
            std::mem::take(&mut self.views_builder).freeze(),
×
UNCOV
166
            buffers.finish(),
×
UNCOV
167
            std::mem::replace(&mut self.dtype, DType::Null),
×
UNCOV
168
            validity,
×
169
        )
UNCOV
170
        .vortex_expect("VarBinViewArray components should be valid.")
×
UNCOV
171
    }
×
172
}
173

174
impl VarBinViewBuilder {
175
    // Pushes a validity mask into the builder not affecting the views or buffers
UNCOV
176
    fn push_only_validity_mask(&mut self, validity_mask: Mask) {
×
UNCOV
177
        self.null_buffer_builder.append_validity_mask(validity_mask);
×
UNCOV
178
    }
×
179
}
180

181
impl ArrayBuilder for VarBinViewBuilder {
182
    fn as_any(&self) -> &dyn Any {
×
183
        self
×
184
    }
×
185

UNCOV
186
    fn as_any_mut(&mut self) -> &mut dyn Any {
×
UNCOV
187
        self
×
UNCOV
188
    }
×
189

190
    #[inline]
UNCOV
191
    fn dtype(&self) -> &DType {
×
UNCOV
192
        &self.dtype
×
UNCOV
193
    }
×
194

195
    #[inline]
UNCOV
196
    fn len(&self) -> usize {
×
UNCOV
197
        self.null_buffer_builder.len()
×
UNCOV
198
    }
×
199

200
    #[inline]
UNCOV
201
    fn append_zeros(&mut self, n: usize) {
×
UNCOV
202
        self.views_builder.push_n(BinaryView::empty_view(), n);
×
UNCOV
203
        self.null_buffer_builder.append_n_non_nulls(n);
×
UNCOV
204
    }
×
205

206
    #[inline]
UNCOV
207
    fn append_nulls(&mut self, n: usize) {
×
UNCOV
208
        self.views_builder.push_n(BinaryView::empty_view(), n);
×
UNCOV
209
        self.null_buffer_builder.append_n_nulls(n);
×
UNCOV
210
    }
×
211

212
    #[inline]
UNCOV
213
    fn extend_from_array(&mut self, array: &dyn Array) -> VortexResult<()> {
×
UNCOV
214
        let array = array.to_varbinview()?;
×
UNCOV
215
        self.flush_in_progress();
×
216

UNCOV
217
        let new_indices = self.completed.extend_from_slice(array.buffers());
×
218

UNCOV
219
        match new_indices {
×
UNCOV
220
            NewIndices::ConstantOffset(offset) => {
×
UNCOV
221
                self.views_builder
×
UNCOV
222
                    .extend_trusted(array.views().iter().map(|view| view.offset_view(offset)));
×
223
            }
UNCOV
224
            NewIndices::LookupArray(lookup) => {
×
UNCOV
225
                self.views_builder
×
UNCOV
226
                    .extend_trusted(array.views().iter().map(|view| {
×
UNCOV
227
                        if view.is_inlined() {
×
UNCOV
228
                            *view
×
229
                        } else {
UNCOV
230
                            let new_buffer_idx = lookup[view.as_view().buffer_index() as usize];
×
UNCOV
231
                            view.with_buffer_idx(new_buffer_idx)
×
232
                        }
UNCOV
233
                    }));
×
234
            }
235
        }
236

UNCOV
237
        self.push_only_validity_mask(array.validity_mask()?);
×
238

UNCOV
239
        Ok(())
×
UNCOV
240
    }
×
241

242
    fn ensure_capacity(&mut self, capacity: usize) {
×
243
        if capacity > self.views_builder.capacity() {
×
244
            self.views_builder
×
245
                .reserve(capacity - self.views_builder.len());
×
246
            self.null_buffer_builder.ensure_capacity(capacity);
×
247
        }
×
248
    }
×
249

UNCOV
250
    fn set_validity(&mut self, validity: Mask) {
×
UNCOV
251
        self.null_buffer_builder = LazyNullBufferBuilder::new(validity.len());
×
UNCOV
252
        self.null_buffer_builder.append_validity_mask(validity);
×
UNCOV
253
    }
×
254

UNCOV
255
    fn finish(&mut self) -> ArrayRef {
×
UNCOV
256
        self.finish_into_varbinview().into_array()
×
UNCOV
257
    }
×
258
}
259

260
enum CompletedBuffers {
261
    Default(Vec<ByteBuffer>),
262
    Deduplicated(DeduplicatedBuffers),
263
}
264

265
impl Default for CompletedBuffers {
UNCOV
266
    fn default() -> Self {
×
UNCOV
267
        Self::Default(Vec::new())
×
UNCOV
268
    }
×
269
}
270

271
// Self::push enforces len < u32::max
272
#[allow(clippy::cast_possible_truncation)]
273
impl CompletedBuffers {
UNCOV
274
    fn len(&self) -> u32 {
×
UNCOV
275
        match self {
×
UNCOV
276
            Self::Default(buffers) => buffers.len() as u32,
×
UNCOV
277
            Self::Deduplicated(buffers) => buffers.len(),
×
278
        }
UNCOV
279
    }
×
280

UNCOV
281
    fn push(&mut self, block: ByteBuffer) -> u32 {
×
UNCOV
282
        match self {
×
UNCOV
283
            Self::Default(buffers) => {
×
UNCOV
284
                assert!(buffers.len() < u32::MAX as usize, "Too many blocks");
×
UNCOV
285
                buffers.push(block);
×
UNCOV
286
                self.len()
×
287
            }
288
            Self::Deduplicated(buffers) => buffers.push(block),
×
289
        }
UNCOV
290
    }
×
291

UNCOV
292
    fn extend_from_slice(&mut self, new_buffers: &[ByteBuffer]) -> NewIndices {
×
UNCOV
293
        match self {
×
UNCOV
294
            Self::Default(buffers) => {
×
UNCOV
295
                let offset = buffers.len() as u32;
×
UNCOV
296
                buffers.extend_from_slice(new_buffers);
×
UNCOV
297
                NewIndices::ConstantOffset(offset)
×
298
            }
UNCOV
299
            Self::Deduplicated(buffers) => {
×
UNCOV
300
                NewIndices::LookupArray(buffers.extend_from_slice(new_buffers))
×
301
            }
302
        }
UNCOV
303
    }
×
304

UNCOV
305
    fn finish(self) -> Arc<[ByteBuffer]> {
×
UNCOV
306
        match self {
×
UNCOV
307
            Self::Default(buffers) => Arc::from(buffers),
×
UNCOV
308
            Self::Deduplicated(buffers) => buffers.finish(),
×
309
        }
UNCOV
310
    }
×
311
}
312

313
/// Describes how buffer indices of incoming views translate to indices in the builder.
enum NewIndices {
    /// Add this constant to an old buffer index to obtain the new one.
    ConstantOffset(u32),
    /// Index this table with an old buffer index to obtain the new one.
    LookupArray(Vec<u32>),
}
319

320
#[derive(Default)]
321
struct DeduplicatedBuffers {
322
    buffers: Vec<ByteBuffer>,
323
    buffer_to_idx: HashMap<BufferId, u32>,
324
}
325

326
impl DeduplicatedBuffers {
327
    // Self::push enforces len < u32::max
328
    #[allow(clippy::cast_possible_truncation)]
UNCOV
329
    fn len(&self) -> u32 {
×
UNCOV
330
        self.buffers.len() as u32
×
UNCOV
331
    }
×
332

333
    /// Push a new block if not seen before. Returns the idx of the block.
UNCOV
334
    fn push(&mut self, block: ByteBuffer) -> u32 {
×
UNCOV
335
        assert!(self.buffers.len() < u32::MAX as usize, "Too many blocks");
×
336

UNCOV
337
        let initial_len = self.len();
×
UNCOV
338
        let id = BufferId::from(&block);
×
UNCOV
339
        match self.buffer_to_idx.entry(id) {
×
UNCOV
340
            Entry::Occupied(idx) => *idx.get(),
×
UNCOV
341
            Entry::Vacant(entry) => {
×
UNCOV
342
                let idx = initial_len;
×
UNCOV
343
                entry.insert(idx);
×
UNCOV
344
                self.buffers.push(block);
×
UNCOV
345
                idx
×
346
            }
347
        }
UNCOV
348
    }
×
349

UNCOV
350
    fn extend_from_slice(&mut self, buffers: &[ByteBuffer]) -> Vec<u32> {
×
UNCOV
351
        buffers
×
UNCOV
352
            .iter()
×
UNCOV
353
            .map(|buffer| self.push(buffer.clone()))
×
UNCOV
354
            .collect()
×
UNCOV
355
    }
×
356

UNCOV
357
    fn finish(self) -> Arc<[ByteBuffer]> {
×
UNCOV
358
        Arc::from(self.buffers)
×
UNCOV
359
    }
×
360
}
361

362
/// Identity of a buffer: the address and length of its byte slice, not its contents.
#[derive(PartialEq, Eq, Hash)]
struct BufferId {
    /// Start address of the slice; a `*const u8` stored as `usize` for `Send`.
    ptr: usize,
    /// Length of the slice in bytes.
    len: usize,
}
368

369
impl BufferId {
UNCOV
370
    fn from(buffer: &ByteBuffer) -> Self {
×
UNCOV
371
        let slice = buffer.as_slice();
×
UNCOV
372
        Self {
×
UNCOV
373
            ptr: slice.as_ptr() as usize,
×
UNCOV
374
            len: slice.len(),
×
UNCOV
375
        }
×
UNCOV
376
    }
×
377
}
378

379
#[cfg(test)]
mod tests {
    use std::str::from_utf8;

    use itertools::Itertools;
    use vortex_dtype::{DType, Nullability};

    use crate::ToCanonical;
    use crate::accessor::ArrayAccessor;
    use crate::arrays::VarBinViewVTable;
    use crate::builders::{ArrayBuilder, VarBinViewBuilder};

    /// Mixes every append flavour and checks the decoded contents element by element.
    #[test]
    fn test_utf8_builder() {
        let mut builder = VarBinViewBuilder::with_capacity(DType::Utf8(Nullability::Nullable), 10);

        builder.append_option(Some("Hello"));
        builder.append_option::<&str>(None);
        builder.append_value("World");
        builder.append_nulls(2);
        builder.append_zeros(2);
        builder.append_value("test");

        let finished = builder.finish();
        let decoded = finished
            .as_::<VarBinViewVTable>()
            .with_iterator(|iter| {
                iter.map(|item| item.map(|bytes| from_utf8(bytes).unwrap().to_string()))
                    .collect_vec()
            })
            .unwrap();

        // `append_zeros` yields valid empty strings, `append_nulls` yields nulls.
        let expected = vec![
            Some("Hello".to_string()),
            None,
            Some("World".to_string()),
            None,
            None,
            Some("".to_string()),
            Some("".to_string()),
            Some("test".to_string()),
        ];
        assert_eq!(decoded.len(), 8);
        assert_eq!(decoded, expected);
    }

    /// Extending from another array must splice its elements in at the current position.
    #[test]
    fn test_utf8_builder_with_extend() {
        let source = {
            let mut inner =
                VarBinViewBuilder::with_capacity(DType::Utf8(Nullability::Nullable), 10);
            inner.append_null();
            inner.append_value("Hello2");
            inner.finish()
        };
        let mut builder = VarBinViewBuilder::with_capacity(DType::Utf8(Nullability::Nullable), 10);

        builder.append_option(Some("Hello1"));
        builder.extend_from_array(&source).unwrap();
        builder.append_nulls(2);
        builder.append_value("Hello3");

        let finished = builder.finish().to_varbinview().unwrap();
        let decoded = finished
            .with_iterator(|iter| {
                iter.map(|item| item.map(|bytes| from_utf8(bytes).unwrap().to_string()))
                    .collect_vec()
            })
            .unwrap();

        let expected = vec![
            Some("Hello1".to_string()),
            None,
            Some("Hello2".to_string()),
            None,
            None,
            Some("Hello3".to_string()),
        ];
        assert_eq!(decoded.len(), 6);
        assert_eq!(decoded, expected);
    }

    /// A deduplicating builder must store each distinct source buffer exactly once, no matter
    /// how many times (slices of) the same array are appended.
    #[test]
    fn test_buffer_deduplication() {
        let first = {
            let mut inner =
                VarBinViewBuilder::with_capacity(DType::Utf8(Nullability::Nullable), 10);
            inner.append_value("This is a long string that should not be inlined");
            inner.append_value("short string");
            inner.finish_into_varbinview()
        };
        assert_eq!(first.buffers().len(), 1);

        let mut builder =
            VarBinViewBuilder::with_buffer_deduplication(DType::Utf8(Nullability::Nullable), 10);

        first.append_to_builder(&mut builder).unwrap();
        assert_eq!(builder.completed_block_count(), 1);

        // Slices of `first` share its buffer, so the block count must not change.
        first
            .slice(1, 2)
            .unwrap()
            .append_to_builder(&mut builder)
            .unwrap();
        first
            .slice(0, 1)
            .unwrap()
            .append_to_builder(&mut builder)
            .unwrap();
        assert_eq!(builder.completed_block_count(), 1);

        // Same contents, but built separately: this is a different buffer.
        let second = {
            let mut inner =
                VarBinViewBuilder::with_capacity(DType::Utf8(Nullability::Nullable), 10);
            inner.append_value("This is a long string that should not be inlined");
            inner.finish_into_varbinview()
        };

        second.append_to_builder(&mut builder).unwrap();
        assert_eq!(builder.completed_block_count(), 2);

        // Both buffers are already known; appending slices of either adds nothing.
        first
            .slice(0, 1)
            .unwrap()
            .append_to_builder(&mut builder)
            .unwrap();
        second
            .slice(0, 1)
            .unwrap()
            .append_to_builder(&mut builder)
            .unwrap();
        assert_eq!(builder.completed_block_count(), 2);
    }
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc