• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

vortex-data / vortex / 16935267080

13 Aug 2025 11:00AM UTC coverage: 24.312% (-63.3%) from 87.658%
16935267080

Pull #4226

github

web-flow
Merge 81b48c7fb into baa6ea202
Pull Request #4226: Support converting TimestampTZ to and from duckdb

0 of 2 new or added lines in 1 file covered. (0.0%)

20666 existing lines in 469 files now uncovered.

8726 of 35892 relevant lines covered (24.31%)

147.74 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

29.05
/vortex-array/src/arrays/chunked/array.rs
1
// SPDX-License-Identifier: Apache-2.0
2
// SPDX-FileCopyrightText: Copyright the Vortex contributors
3

4
//! First-class chunked arrays.
5
//!
6
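//! Vortex is a chunked array library: a [`ChunkedArray`] represents one logical array as an
//! ordered sequence of chunks that all share the same [`DType`].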
7

8
use std::fmt::Debug;
9

10
use futures_util::stream;
11
use itertools::Itertools;
12
use vortex_buffer::{Buffer, BufferMut};
13
use vortex_dtype::DType;
14
use vortex_error::{VortexExpect as _, VortexResult, VortexUnwrap, vortex_bail};
15
use vortex_mask::Mask;
16

17
use crate::arrays::ChunkedVTable;
18
use crate::iter::{ArrayIterator, ArrayIteratorAdapter};
19
use crate::search_sorted::{SearchSorted, SearchSortedSide};
20
use crate::stats::{ArrayStats, StatsSetRef};
21
use crate::stream::{ArrayStream, ArrayStreamAdapter};
22
use crate::vtable::{ArrayVTable, ValidityVTable};
23
use crate::{Array, ArrayRef, IntoArray};
24

25
#[derive(Clone, Debug)]
26
pub struct ChunkedArray {
27
    dtype: DType,
28
    len: usize,
29
    chunk_offsets: Buffer<u64>,
30
    chunks: Vec<ArrayRef>,
31
    stats_set: ArrayStats,
32
}
33

34
impl ChunkedArray {
35
    pub fn try_new(chunks: Vec<ArrayRef>, dtype: DType) -> VortexResult<Self> {
18✔
36
        for chunk in &chunks {
184✔
37
            if chunk.dtype() != &dtype {
166✔
38
                vortex_bail!(MismatchedTypes: dtype, chunk.dtype());
×
39
            }
166✔
40
        }
41

42
        Ok(Self::new_unchecked(chunks, dtype))
18✔
43
    }
18✔
44

45
    pub fn new_unchecked(chunks: Vec<ArrayRef>, dtype: DType) -> Self {
26✔
46
        let nchunks = chunks.len();
26✔
47

48
        let mut chunk_offsets = BufferMut::<u64>::with_capacity(nchunks + 1);
26✔
49
        unsafe { chunk_offsets.push_unchecked(0) }
26✔
50
        let mut curr_offset = 0;
26✔
51
        for c in &chunks {
208✔
52
            curr_offset += c.len() as u64;
182✔
53
            unsafe { chunk_offsets.push_unchecked(curr_offset) }
182✔
54
        }
55
        assert_eq!(chunk_offsets.len(), nchunks + 1);
26✔
56

57
        Self {
26✔
58
            dtype,
26✔
59
            len: curr_offset.try_into().vortex_unwrap(),
26✔
60
            chunk_offsets: chunk_offsets.freeze(),
26✔
61
            chunks,
26✔
62
            stats_set: Default::default(),
26✔
63
        }
26✔
64
    }
26✔
65

66
    // TODO(ngates): remove result
67
    #[inline]
68
    pub fn chunk(&self, idx: usize) -> VortexResult<&ArrayRef> {
6✔
69
        if idx >= self.nchunks() {
6✔
70
            vortex_bail!("chunk index {} > num chunks ({})", idx, self.nchunks());
×
71
        }
6✔
72
        Ok(&self.chunks[idx])
6✔
73
    }
6✔
74

75
    pub fn nchunks(&self) -> usize {
62✔
76
        self.chunks.len()
62✔
77
    }
62✔
78

79
    #[inline]
UNCOV
80
    pub fn chunk_offsets(&self) -> &Buffer<u64> {
×
UNCOV
81
        &self.chunk_offsets
×
UNCOV
82
    }
×
83

UNCOV
84
    pub(crate) fn find_chunk_idx(&self, index: usize) -> (usize, usize) {
×
UNCOV
85
        assert!(index <= self.len(), "Index out of bounds of the array");
×
UNCOV
86
        let index = index as u64;
×
87

88
        // Since there might be duplicate values in offsets because of empty chunks we want to search from right
89
        // and take the last chunk (we subtract 1 since there's a leading 0)
UNCOV
90
        let index_chunk = self
×
UNCOV
91
            .chunk_offsets()
×
UNCOV
92
            .search_sorted(&index, SearchSortedSide::Right)
×
UNCOV
93
            .to_ends_index(self.nchunks() + 1)
×
UNCOV
94
            .saturating_sub(1);
×
UNCOV
95
        let chunk_start = self.chunk_offsets()[index_chunk];
×
96

UNCOV
97
        let index_in_chunk =
×
UNCOV
98
            usize::try_from(index - chunk_start).vortex_expect("Index is too large for usize");
×
UNCOV
99
        (index_chunk, index_in_chunk)
×
UNCOV
100
    }
×
101

102
    pub fn chunks(&self) -> &[ArrayRef] {
24✔
103
        &self.chunks
24✔
104
    }
24✔
105

UNCOV
106
    pub fn non_empty_chunks(&self) -> impl Iterator<Item = &ArrayRef> + '_ {
×
UNCOV
107
        self.chunks().iter().filter(|c| !c.is_empty())
×
UNCOV
108
    }
×
109

UNCOV
110
    pub fn array_iterator(&self) -> impl ArrayIterator + '_ {
×
UNCOV
111
        ArrayIteratorAdapter::new(self.dtype().clone(), self.chunks().iter().cloned().map(Ok))
×
UNCOV
112
    }
×
113

114
    pub fn array_stream(&self) -> impl ArrayStream + '_ {
×
115
        ArrayStreamAdapter::new(
×
116
            self.dtype().clone(),
×
117
            stream::iter(self.chunks().iter().cloned().map(Ok)),
×
118
        )
119
    }
×
120

UNCOV
121
    pub fn rechunk(&self, target_bytesize: u64, target_rowsize: usize) -> VortexResult<Self> {
×
UNCOV
122
        let mut new_chunks = Vec::new();
×
UNCOV
123
        let mut chunks_to_combine = Vec::new();
×
UNCOV
124
        let mut new_chunk_n_bytes = 0;
×
UNCOV
125
        let mut new_chunk_n_elements = 0;
×
UNCOV
126
        for chunk in self.chunks() {
×
UNCOV
127
            let n_bytes = chunk.nbytes();
×
UNCOV
128
            let n_elements = chunk.len();
×
129

UNCOV
130
            if (new_chunk_n_bytes + n_bytes > target_bytesize
×
UNCOV
131
                || new_chunk_n_elements + n_elements > target_rowsize)
×
UNCOV
132
                && !chunks_to_combine.is_empty()
×
133
            {
UNCOV
134
                new_chunks.push(
×
UNCOV
135
                    ChunkedArray::new_unchecked(chunks_to_combine, self.dtype().clone())
×
UNCOV
136
                        .to_canonical()?
×
UNCOV
137
                        .into_array(),
×
138
                );
139

UNCOV
140
                new_chunk_n_bytes = 0;
×
UNCOV
141
                new_chunk_n_elements = 0;
×
UNCOV
142
                chunks_to_combine = Vec::new();
×
UNCOV
143
            }
×
144

UNCOV
145
            if n_bytes > target_bytesize || n_elements > target_rowsize {
×
UNCOV
146
                new_chunks.push(chunk.clone());
×
UNCOV
147
            } else {
×
UNCOV
148
                new_chunk_n_bytes += n_bytes;
×
UNCOV
149
                new_chunk_n_elements += n_elements;
×
UNCOV
150
                chunks_to_combine.push(chunk.clone());
×
UNCOV
151
            }
×
152
        }
153

UNCOV
154
        if !chunks_to_combine.is_empty() {
×
UNCOV
155
            new_chunks.push(
×
UNCOV
156
                ChunkedArray::new_unchecked(chunks_to_combine, self.dtype().clone())
×
UNCOV
157
                    .to_canonical()?
×
UNCOV
158
                    .into_array(),
×
159
            );
160
        }
×
161

UNCOV
162
        Ok(Self::new_unchecked(new_chunks, self.dtype().clone()))
×
UNCOV
163
    }
×
164
}
165

166
impl FromIterator<ArrayRef> for ChunkedArray {
UNCOV
167
    fn from_iter<T: IntoIterator<Item = ArrayRef>>(iter: T) -> Self {
×
UNCOV
168
        let chunks: Vec<ArrayRef> = iter.into_iter().collect();
×
UNCOV
169
        let dtype = chunks
×
UNCOV
170
            .first()
×
UNCOV
171
            .map(|c| c.dtype().clone())
×
UNCOV
172
            .vortex_expect("Cannot infer DType from an empty iterator");
×
UNCOV
173
        Self::try_new(chunks, dtype).vortex_expect("Failed to create chunked array from iterator")
×
UNCOV
174
    }
×
175
}
176

177
impl ArrayVTable<ChunkedVTable> for ChunkedVTable {
178
    fn len(array: &ChunkedArray) -> usize {
72✔
179
        array.len
72✔
180
    }
72✔
181

182
    fn dtype(array: &ChunkedArray) -> &DType {
88✔
183
        &array.dtype
88✔
184
    }
88✔
185

186
    fn stats(array: &ChunkedArray) -> StatsSetRef<'_> {
24✔
187
        array.stats_set.to_ref(array.as_ref())
24✔
188
    }
24✔
189
}
190

191
impl ValidityVTable<ChunkedVTable> for ChunkedVTable {
UNCOV
192
    fn is_valid(array: &ChunkedArray, index: usize) -> VortexResult<bool> {
×
UNCOV
193
        if !array.dtype.is_nullable() {
×
UNCOV
194
            return Ok(true);
×
UNCOV
195
        }
×
UNCOV
196
        let (chunk, offset_in_chunk) = array.find_chunk_idx(index);
×
UNCOV
197
        array.chunk(chunk)?.is_valid(offset_in_chunk)
×
UNCOV
198
    }
×
199

UNCOV
200
    fn all_valid(array: &ChunkedArray) -> VortexResult<bool> {
×
UNCOV
201
        if !array.dtype().is_nullable() {
×
UNCOV
202
            return Ok(true);
×
UNCOV
203
        }
×
UNCOV
204
        for chunk in array.non_empty_chunks() {
×
UNCOV
205
            if !chunk.all_valid()? {
×
UNCOV
206
                return Ok(false);
×
UNCOV
207
            }
×
208
        }
UNCOV
209
        Ok(true)
×
UNCOV
210
    }
×
211

UNCOV
212
    fn all_invalid(array: &ChunkedArray) -> VortexResult<bool> {
×
UNCOV
213
        if !array.dtype().is_nullable() {
×
UNCOV
214
            return Ok(false);
×
UNCOV
215
        }
×
UNCOV
216
        for chunk in array.non_empty_chunks() {
×
UNCOV
217
            if !chunk.all_invalid()? {
×
UNCOV
218
                return Ok(false);
×
UNCOV
219
            }
×
220
        }
UNCOV
221
        Ok(true)
×
UNCOV
222
    }
×
223

UNCOV
224
    fn validity_mask(array: &ChunkedArray) -> VortexResult<Mask> {
×
UNCOV
225
        array
×
UNCOV
226
            .chunks()
×
UNCOV
227
            .iter()
×
UNCOV
228
            .map(|a| a.validity_mask())
×
UNCOV
229
            .try_collect()
×
UNCOV
230
    }
×
231
}
232

233
#[cfg(test)]
mod test {
    use vortex_buffer::buffer;
    use vortex_dtype::{DType, Nullability, PType};
    use vortex_error::VortexResult;

    use crate::array::Array;
    use crate::arrays::chunked::ChunkedArray;
    use crate::arrays::{ChunkedVTable, PrimitiveArray};
    use crate::compute::sub_scalar;
    use crate::validity::Validity;
    use crate::{IntoArray, ToCanonical, assert_arrays_eq};

    /// Three non-nullable `u64` chunks holding `1..=9`, three values per chunk.
    fn chunked_array() -> ChunkedArray {
        ChunkedArray::try_new(
            vec![
                buffer![1u64, 2, 3].into_array(),
                buffer![4u64, 5, 6].into_array(),
                buffer![7u64, 8, 9].into_array(),
            ],
            DType::Primitive(PType::U64, Nullability::NonNullable),
        )
        .unwrap()
    }

    /// Length of every chunk, in order.
    fn chunk_lens(array: &ChunkedArray) -> Vec<usize> {
        array.chunks().iter().map(|c| c.len()).collect()
    }

    #[test]
    fn test_scalar_subtract() {
        let chunked = chunked_array().into_array();
        let array = sub_scalar(&chunked, 1u64.into()).unwrap();

        // The subtraction is applied per chunk and the chunk layout is preserved.
        let chunks_out = array.as_::<ChunkedVTable>().chunks();
        let expected: [&[u64]; 3] = [&[0, 1, 2], &[3, 4, 5], &[6, 7, 8]];
        assert_eq!(chunks_out.len(), expected.len());
        for (chunk, want) in chunks_out.iter().zip(expected) {
            let got = chunk.to_primitive().unwrap().as_slice::<u64>().to_vec();
            assert_eq!(got, want);
        }
    }

    #[test]
    fn test_rechunk_one_chunk() {
        let chunked = ChunkedArray::try_new(
            vec![buffer![0u64].into_array()],
            DType::Primitive(PType::U64, Nullability::NonNullable),
        )
        .unwrap();

        let rechunked = chunked.rechunk(1 << 16, 1 << 16).unwrap();

        assert_arrays_eq!(chunked, rechunked);
    }

    #[test]
    fn test_rechunk_two_chunks() {
        let chunked = ChunkedArray::try_new(
            vec![buffer![0u64].into_array(), buffer![5u64].into_array()],
            DType::Primitive(PType::U64, Nullability::NonNullable),
        )
        .unwrap();

        let rechunked = chunked.rechunk(1 << 16, 1 << 16).unwrap();

        assert_eq!(rechunked.nchunks(), 1);
        assert_arrays_eq!(chunked, rechunked);
    }

    #[test]
    fn test_rechunk_tiny_target_chunks() {
        let chunked = ChunkedArray::try_new(
            vec![
                buffer![0u64, 1, 2, 3].into_array(),
                buffer![4u64, 5].into_array(),
            ],
            DType::Primitive(PType::U64, Nullability::NonNullable),
        )
        .unwrap();

        let rechunked = chunked.rechunk(1 << 16, 5).unwrap();

        // 4 + 2 rows would exceed the 5-row target, so the chunks cannot be merged.
        assert_eq!(rechunked.nchunks(), 2);
        assert_eq!(chunk_lens(&rechunked), vec![4, 2]);
        // A chunk of exactly `target_rowsize` rows is allowed.
        assert!(rechunked.chunks().iter().all(|c| c.len() <= 5));
        assert_arrays_eq!(chunked, rechunked);
    }

    #[test]
    fn test_rechunk_with_too_big_chunk() {
        let chunked = ChunkedArray::try_new(
            vec![
                buffer![0u64, 1, 2].into_array(),
                buffer![42_u64; 6].into_array(),
                buffer![4u64, 5].into_array(),
                buffer![6u64, 7].into_array(),
                buffer![8u64, 9].into_array(),
            ],
            DType::Primitive(PType::U64, Nullability::NonNullable),
        )
        .unwrap();

        let rechunked = chunked.rechunk(1 << 16, 5).unwrap();
        // greedy so should be: [0, 1, 2] [42, 42, 42, 42, 42, 42] [4, 5, 6, 7] [8, 9]

        assert_eq!(rechunked.nchunks(), 4);
        assert_eq!(chunk_lens(&rechunked), vec![3, 6, 4, 2]);
        assert_arrays_eq!(chunked, rechunked);
    }

    #[test]
    fn test_empty_chunks_all_valid() -> VortexResult<()> {
        // Create chunks where some are empty but all non-empty chunks have all valid values
        let chunks = vec![
            PrimitiveArray::new(buffer![1u64, 2, 3], Validity::AllValid).into_array(),
            PrimitiveArray::new(buffer![0u64; 0], Validity::AllValid).into_array(), // empty chunk
            PrimitiveArray::new(buffer![4u64, 5], Validity::AllValid).into_array(),
            PrimitiveArray::new(buffer![0u64; 0], Validity::AllValid).into_array(), // empty chunk
        ];

        let chunked =
            ChunkedArray::try_new(chunks, DType::Primitive(PType::U64, Nullability::Nullable))?;

        // Should be all_valid since all non-empty chunks are all_valid
        assert!(chunked.all_valid()?);
        assert!(!chunked.all_invalid()?);

        Ok(())
    }

    #[test]
    fn test_empty_chunks_all_invalid() -> VortexResult<()> {
        // Create chunks where some are empty but all non-empty chunks have all invalid values
        let chunks = vec![
            PrimitiveArray::new(buffer![1u64, 2], Validity::AllInvalid).into_array(),
            PrimitiveArray::new(buffer![0u64; 0], Validity::AllInvalid).into_array(), // empty chunk
            PrimitiveArray::new(buffer![3u64, 4, 5], Validity::AllInvalid).into_array(),
            PrimitiveArray::new(buffer![0u64; 0], Validity::AllInvalid).into_array(), // empty chunk
        ];

        let chunked =
            ChunkedArray::try_new(chunks, DType::Primitive(PType::U64, Nullability::Nullable))?;

        // Should be all_invalid since all non-empty chunks are all_invalid
        assert!(!chunked.all_valid()?);
        assert!(chunked.all_invalid()?);

        Ok(())
    }

    #[test]
    fn test_empty_chunks_mixed_validity() -> VortexResult<()> {
        // Create chunks with mixed validity including empty chunks
        let chunks = vec![
            PrimitiveArray::new(buffer![1u64, 2], Validity::AllValid).into_array(),
            PrimitiveArray::new(buffer![0u64; 0], Validity::AllValid).into_array(), // empty chunk
            PrimitiveArray::new(buffer![3u64, 4], Validity::AllInvalid).into_array(),
            PrimitiveArray::new(buffer![0u64; 0], Validity::AllInvalid).into_array(), // empty chunk
        ];

        let chunked =
            ChunkedArray::try_new(chunks, DType::Primitive(PType::U64, Nullability::Nullable))?;

        // Should be neither all_valid nor all_invalid
        assert!(!chunked.all_valid()?);
        assert!(!chunked.all_invalid()?);

        Ok(())
    }
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc