• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

vortex-data / vortex / 16331938722

16 Jul 2025 10:49PM UTC coverage: 80.702% (-0.9%) from 81.557%
16331938722

push

github

web-flow
feat: build with stable rust (#3881)

120 of 173 new or added lines in 28 files covered. (69.36%)

174 existing lines in 102 files now uncovered.

41861 of 51871 relevant lines covered (80.7%)

157487.71 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

86.79
/vortex-array/src/serde.rs
1
// SPDX-License-Identifier: Apache-2.0
2
// SPDX-FileCopyrightText: Copyright the Vortex contributors
3

4
use std::fmt::{Debug, Formatter};
5
use std::iter;
6
use std::sync::Arc;
7

8
use flatbuffers::{FlatBufferBuilder, Follow, WIPOffset, root};
9
use itertools::Itertools;
10
use vortex_buffer::{Alignment, ByteBuffer};
11
use vortex_dtype::{DType, TryFromBytes};
12
use vortex_error::{
13
    VortexError, VortexExpect, VortexResult, vortex_bail, vortex_err, vortex_panic,
14
};
15
use vortex_flatbuffers::array::Compression;
16
use vortex_flatbuffers::{
17
    FlatBuffer, FlatBufferRoot, ReadFlatBuffer, WriteFlatBuffer, array as fba,
18
};
19

20
use crate::stats::StatsSet;
21
use crate::{Array, ArrayContext, ArrayRef, ArrayVisitor, ArrayVisitorExt};
22

23
/// Options for serializing an array.
///
/// Defaults (via `#[derive(Default)]`) are `offset = 0` and `include_padding = false`.
#[derive(Default, Debug)]
pub struct SerializeOptions {
    /// The starting position within an external stream or file. This offset is used to compute
    /// appropriate padding to enable zero-copy reads.
    pub offset: usize,
    /// Whether to include sufficient zero-copy padding.
    pub include_padding: bool,
}
32

33
impl dyn Array + '_ {
34
    /// Serialize the array into a sequence of byte buffers that should be written contiguously.
35
    /// This function returns a vec to avoid copying data buffers.
36
    ///
37
    /// Optionally, padding can be included to guarantee buffer alignment and ensure zero-copy
38
    /// reads within the context of an external file or stream. In this case, the alignment of
39
    /// the first byte buffer should be respected when writing the buffers to the stream or file.
40
    ///
41
    /// The format of this blob is a sequence of data buffers, possible with prefixed padding,
42
    /// followed by a flatbuffer containing an [`fba::Array`] message, and ending with a
43
    /// little-endian u32 describing the length of the flatbuffer message.
44
    pub fn serialize(
6,983✔
45
        &self,
6,983✔
46
        ctx: &ArrayContext,
6,983✔
47
        options: &SerializeOptions,
6,983✔
48
    ) -> VortexResult<Vec<ByteBuffer>> {
6,983✔
49
        // Collect all array buffers
50
        let array_buffers = self
6,983✔
51
            .depth_first_traversal()
6,983✔
52
            .flat_map(|f| f.buffers())
25,503✔
53
            .collect::<Vec<_>>();
6,983✔
54

55
        // Allocate result buffers, including a possible padding buffer for each.
56
        let mut buffers = vec![];
6,983✔
57
        let mut fb_buffers = Vec::with_capacity(buffers.capacity());
6,983✔
58

59
        // If we're including padding, we need to find the maximum required buffer alignment.
60
        let max_alignment = array_buffers
6,983✔
61
            .iter()
6,983✔
62
            .map(|buf| buf.alignment())
22,809✔
63
            .chain(iter::once(FlatBuffer::alignment()))
6,983✔
64
            .max()
6,983✔
65
            .unwrap_or_else(FlatBuffer::alignment);
6,983✔
66

67
        // Create a shared buffer of zeros we can use for padding
68
        let zeros = ByteBuffer::zeroed(*max_alignment);
6,983✔
69

70
        // We push an empty buffer with the maximum alignment, so then subsequent buffers
71
        // will be aligned. For subsequent buffers, we always push a 1-byte alignment.
72
        buffers.push(ByteBuffer::zeroed_aligned(0, max_alignment));
6,983✔
73

74
        // Keep track of where we are in the "file" to calculate padding.
75
        let mut pos = options.offset;
6,983✔
76

77
        // Push all the array buffers with padding as necessary.
78
        for buffer in array_buffers {
29,792✔
79
            let padding = if options.include_padding {
22,809✔
80
                let padding = pos.next_multiple_of(*buffer.alignment()) - pos;
22,664✔
81
                if padding > 0 {
22,664✔
82
                    pos += padding;
5,464✔
83
                    buffers.push(zeros.slice(0..padding));
5,464✔
84
                }
17,200✔
85
                padding
22,664✔
86
            } else {
87
                0
145✔
88
            };
89

90
            fb_buffers.push(fba::Buffer::new(
22,809✔
91
                u16::try_from(padding).vortex_expect("padding fits into u16"),
22,809✔
92
                buffer.alignment().exponent(),
22,809✔
93
                Compression::None,
94
                u32::try_from(buffer.len())
22,809✔
95
                    .map_err(|_| vortex_err!("All buffers must fit into u32 for serialization"))?,
22,809✔
96
            ));
97

98
            pos += buffer.len();
22,809✔
99
            buffers.push(buffer.aligned(Alignment::none()));
22,809✔
100
        }
101

102
        // Set up the flatbuffer builder
103
        let mut fbb = FlatBufferBuilder::new();
6,983✔
104
        let root = ArrayNodeFlatBuffer::try_new(ctx, self)?;
6,983✔
105
        let fb_root = root.write_flatbuffer(&mut fbb);
6,983✔
106
        let fb_buffers = fbb.create_vector(&fb_buffers);
6,983✔
107
        let fb_array = fba::Array::create(
6,983✔
108
            &mut fbb,
6,983✔
109
            &fba::ArrayArgs {
6,983✔
110
                root: Some(fb_root),
6,983✔
111
                buffers: Some(fb_buffers),
6,983✔
112
            },
6,983✔
113
        );
114
        fbb.finish_minimal(fb_array);
6,983✔
115
        let (fb_vec, fb_start) = fbb.collapse();
6,983✔
116
        let fb_end = fb_vec.len();
6,983✔
117
        let fb_buffer = ByteBuffer::from(fb_vec).slice(fb_start..fb_end);
6,983✔
118
        let fb_length = fb_buffer.len();
6,983✔
119

120
        if options.include_padding {
6,983✔
121
            let padding = pos.next_multiple_of(*FlatBuffer::alignment()) - pos;
6,838✔
122
            if padding > 0 {
6,838✔
123
                buffers.push(zeros.slice(0..padding));
2,682✔
124
            }
4,156✔
125
        }
145✔
126
        buffers.push(fb_buffer);
6,983✔
127

128
        // Finally, we write down the u32 length for the flatbuffer.
129
        buffers.push(ByteBuffer::from(
6,983✔
130
            u32::try_from(fb_length)
6,983✔
131
                .map_err(|_| vortex_err!("Array metadata flatbuffer must fit into u32 for serialization. Array encoding tree is too large."))?
6,983✔
132
                .to_le_bytes()
6,983✔
133
                .to_vec(),
6,983✔
134
        ));
135

136
        Ok(buffers)
6,983✔
137
    }
6,983✔
138
}
139

140
/// A utility struct for creating an [`fba::ArrayNode`] flatbuffer.
pub struct ArrayNodeFlatBuffer<'a> {
    /// Encoding registry used to resolve each array's encoding to an index.
    ctx: &'a ArrayContext,
    /// The array (sub)tree serialized by this node.
    array: &'a dyn Array,
    /// Index of this array's first buffer within the flattened depth-first buffer sequence.
    buffer_idx: u16,
}
146

147
impl<'a> ArrayNodeFlatBuffer<'a> {
148
    pub fn try_new(ctx: &'a ArrayContext, array: &'a dyn Array) -> VortexResult<Self> {
6,983✔
149
        // Depth-first traversal of the array to ensure it supports serialization.
150
        for child in array.depth_first_traversal() {
25,503✔
151
            if child.metadata()?.is_none() {
25,503✔
152
                vortex_bail!(
×
153
                    "Array {} does not support serialization",
×
154
                    child.encoding_id()
×
155
                );
156
            }
25,503✔
157
        }
158
        Ok(Self {
6,983✔
159
            ctx,
6,983✔
160
            array,
6,983✔
161
            buffer_idx: 0,
6,983✔
162
        })
6,983✔
163
    }
6,983✔
164
}
165

166
// Marker impl: allows ArrayNodeFlatBuffer to serve as the root table of a flatbuffer message.
impl FlatBufferRoot for ArrayNodeFlatBuffer<'_> {}
167

168
impl WriteFlatBuffer for ArrayNodeFlatBuffer<'_> {
    type Target<'t> = fba::ArrayNode<'t>;

    /// Recursively writes this array node (metadata, child nodes, buffer indices, stats)
    /// into `fbb`, returning the offset of the created [`fba::ArrayNode`] table.
    fn write_flatbuffer<'fb>(
        &self,
        fbb: &mut FlatBufferBuilder<'fb>,
    ) -> WIPOffset<Self::Target<'fb>> {
        let encoding = self.ctx.encoding_idx(&self.array.encoding());
        let metadata = self
            .array
            .metadata()
            // TODO(ngates): add try_write_flatbuffer
            // Both expects are safe here only because try_new already walked the tree and
            // bailed on any node whose metadata() was Err or None.
            .vortex_expect("Failed to serialize metadata")
            .vortex_expect("Validated that all arrays support serialization");
        let metadata = Some(fbb.create_vector(metadata.as_slice()));

        // Assign buffer indices for all child arrays.
        let nbuffers = u16::try_from(self.array.nbuffers())
            .vortex_expect("Array can have at most u16::MAX buffers");
        // Children's buffers start immediately after this node's own buffers in the
        // depth-first flattened buffer sequence.
        let mut child_buffer_idx = self.buffer_idx + nbuffers;

        let children = &self
            .array
            .children()
            .iter()
            .map(|child| {
                // Update the number of buffers required.
                let msg = ArrayNodeFlatBuffer {
                    ctx: self.ctx,
                    array: child,
                    buffer_idx: child_buffer_idx,
                }
                .write_flatbuffer(fbb);
                // Advance past everything the child subtree consumed (recursive count),
                // checking that the running index still fits in u16.
                child_buffer_idx = u16::try_from(child.nbuffers_recursive())
                    .ok()
                    .and_then(|nbuffers| nbuffers.checked_add(child_buffer_idx))
                    .vortex_expect("Too many buffers (u16) for Array");
                msg
            })
            .collect::<Vec<_>>();
        let children = Some(fbb.create_vector(children));

        // This node's buffers occupy the contiguous index range [buffer_idx, buffer_idx + nbuffers).
        let buffers = Some(fbb.create_vector_from_iter((0..nbuffers).map(|i| i + self.buffer_idx)));
        let stats = Some(self.array.statistics().to_owned().write_flatbuffer(fbb));

        fba::ArrayNode::create(
            fbb,
            &fba::ArrayNodeArgs {
                encoding,
                metadata,
                children,
                buffers,
                stats,
            },
        )
    }
}
225

226
/// To minimize the serialized form, arrays do not persist their own dtype and length. Instead,
/// parent arrays pass this information down during deserialization. This trait abstracts
/// over either a serialized [`crate::serde::ArrayParts`] or the
/// in-memory [`crate::data::ArrayData`].
pub trait ArrayChildren {
    /// Returns the nth child of the array with the given dtype and length.
    fn get(&self, index: usize, dtype: &DType, len: usize) -> VortexResult<ArrayRef>;

    /// The number of children.
    fn len(&self) -> usize;

    /// Returns true if there are no children.
    fn is_empty(&self) -> bool {
        self.len() == 0
    }
}
242

243
/// [`ArrayParts`] represents a parsed but not-yet-decoded deserialized [`Array`].
/// It contains all the information from the serialized form, without anything extra. i.e.
/// it is missing a [`DType`] and `len`, and the `encoding_id` is not yet resolved to a concrete
/// vtable.
///
/// An [`ArrayParts`] can be fully decoded into an [`ArrayRef`] using the `decode` function.
#[derive(Clone)]
pub struct ArrayParts {
    // Typed as fb::ArrayNode
    flatbuffer: FlatBuffer,
    // The location of the current fb::ArrayNode within `flatbuffer`; child() produces
    // clones of this struct that differ only in this location.
    flatbuffer_loc: usize,
    // All data buffers for the whole tree, shared (cheaply clonable) across child views.
    buffers: Arc<[ByteBuffer]>,
}
257

258
impl Debug for ArrayParts {
259
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
×
260
        f.debug_struct("ArrayParts")
×
261
            .field("encoding_id", &self.encoding_id())
×
262
            .field("children", &(0..self.nchildren()).map(|i| self.child(i)))
×
263
            .field(
×
264
                "buffers",
×
265
                &(0..self.nbuffers()).map(|i| self.buffer(i).ok()),
×
266
            )
267
            .field("metadata", &self.metadata())
×
268
            .finish()
×
269
    }
×
270
}
271

272
impl ArrayParts {
    /// Decode an [`ArrayParts`] into an [`ArrayRef`].
    ///
    /// The caller supplies the `dtype` and `len`, which are deliberately not stored in the
    /// serialized form (parents pass them down). Fails if the encoding id is unknown to
    /// `ctx` or if the encoding's `build` rejects the parts.
    pub fn decode(&self, ctx: &ArrayContext, dtype: &DType, len: usize) -> VortexResult<ArrayRef> {
        let encoding_id = self.flatbuffer().encoding();
        let vtable = ctx
            .lookup_encoding(encoding_id)
            .ok_or_else(|| vortex_err!("Unknown encoding: {}", encoding_id))?;

        // Resolve this node's buffer indices into actual byte buffers, failing on any
        // out-of-range index.
        let buffers: Vec<_> = (0..self.nbuffers())
            .map(|idx| self.buffer(idx))
            .try_collect()?;

        // Children are decoded lazily by the vtable through the ArrayChildren adapter.
        let children = ArrayPartsChildren { parts: self, ctx };

        let decoded = vtable.build(dtype, len, self.metadata(), &buffers, &children)?;

        // Post-conditions on the vtable's build: a mismatch here is a bug in the encoding
        // implementation, not a recoverable deserialization error, hence assert (panic).
        assert_eq!(
            decoded.len(),
            len,
            "Array decoded from {} has incorrect length {}, expected {}",
            vtable.id(),
            decoded.len(),
            len
        );
        assert_eq!(
            decoded.dtype(),
            dtype,
            "Array decoded from {} has incorrect dtype {}, expected {}",
            vtable.id(),
            decoded.dtype(),
            dtype,
        );
        assert_eq!(
            decoded.encoding_id(),
            vtable.id(),
            "Array decoded from {} has incorrect encoding {}",
            vtable.id(),
            decoded.encoding_id(),
        );

        // Populate statistics from the serialized array.
        if let Some(stats) = self.flatbuffer().stats() {
            let decoded_statistics = decoded.statistics();
            StatsSet::read_flatbuffer(&stats)?
                .into_iter()
                .for_each(|(stat, val)| decoded_statistics.set(stat, val));
        }

        Ok(decoded)
    }

    /// Returns the array encoding.
    pub fn encoding_id(&self) -> u16 {
        self.flatbuffer().encoding()
    }

    /// Returns the array metadata bytes.
    ///
    /// Returns an empty slice when the flatbuffer carries no metadata field.
    pub fn metadata(&self) -> &[u8] {
        self.flatbuffer()
            .metadata()
            .map(|metadata| metadata.bytes())
            .unwrap_or(&[])
    }

    /// Returns the number of children.
    pub fn nchildren(&self) -> usize {
        self.flatbuffer()
            .children()
            .map_or(0, |children| children.len())
    }

    /// Returns the nth child of the array.
    ///
    /// Panics (rather than erroring) if `idx` is out of range — callers are expected to
    /// bound-check with [`Self::nchildren`].
    pub fn child(&self, idx: usize) -> ArrayParts {
        let children = self
            .flatbuffer()
            .children()
            .vortex_expect("Expected array to have children");
        if idx >= children.len() {
            vortex_panic!(
                "Invalid child index {} for array with {} children",
                idx,
                children.len()
            );
        }
        self.with_root(children.get(idx))
    }

    /// Returns the number of buffers.
    pub fn nbuffers(&self) -> usize {
        self.flatbuffer()
            .buffers()
            .map_or(0, |buffers| buffers.len())
    }

    /// Returns the nth buffer of the current array.
    ///
    /// `idx` indexes this node's buffer list; the stored entry is itself an index into the
    /// shared, tree-wide `self.buffers` slice.
    pub fn buffer(&self, idx: usize) -> VortexResult<ByteBuffer> {
        let buffer_idx = self
            .flatbuffer()
            .buffers()
            .ok_or_else(|| vortex_err!("Array has no buffers"))?
            .get(idx);
        self.buffers
            .get(buffer_idx as usize)
            .cloned()
            .ok_or_else(|| {
                vortex_err!(
                    "Invalid buffer index {} for array with {} buffers",
                    buffer_idx,
                    self.nbuffers()
                )
            })
    }

    /// Returns the root ArrayNode flatbuffer.
    fn flatbuffer(&self) -> fba::ArrayNode<'_> {
        // SAFETY: presumably `flatbuffer_loc` always originates from a verified root
        // (`TryFrom<ByteBuffer>`) or from a child table of one (`with_root`), so it points
        // at a valid ArrayNode table — TODO confirm no other construction path exists.
        unsafe { fba::ArrayNode::follow(self.flatbuffer.as_ref(), self.flatbuffer_loc) }
    }

    /// Returns a new [`ArrayParts`] with the given node as the root
    // TODO(ngates): we may want a wrapper that avoids this clone.
    fn with_root(&self, root: fba::ArrayNode) -> Self {
        let mut this = self.clone();
        this.flatbuffer_loc = root._tab.loc();
        this
    }
}
398

399
// Adapter exposing an ArrayParts' serialized children through the ArrayChildren trait,
// so encodings can lazily decode each child during ArrayParts::decode.
struct ArrayPartsChildren<'a> {
    // The parsed-but-undecoded array whose children are exposed.
    parts: &'a ArrayParts,
    // Encoding context used to decode each child.
    ctx: &'a ArrayContext,
}
403

404
impl ArrayChildren for ArrayPartsChildren<'_> {
405
    fn get(&self, index: usize, dtype: &DType, len: usize) -> VortexResult<ArrayRef> {
12,230✔
406
        self.parts.child(index).decode(self.ctx, dtype, len)
12,230✔
407
    }
12,230✔
408

409
    fn len(&self) -> usize {
17,401✔
410
        self.parts.nchildren()
17,401✔
411
    }
17,401✔
412
}
413

414
impl TryFrom<ByteBuffer> for ArrayParts {
    type Error = VortexError;

    /// Parse a serialized blob (as produced by `serialize`) into an [`ArrayParts`].
    ///
    /// Layout parsed, from the end backwards: a trailing little-endian u32 giving the
    /// flatbuffer length, the flatbuffer message itself, and before it the (possibly
    /// padded) data buffers described by the flatbuffer's buffer table.
    fn try_from(value: ByteBuffer) -> Result<Self, Self::Error> {
        // The final 4 bytes contain the length of the flatbuffer.
        if value.len() < 4 {
            vortex_bail!("ArrayParts buffer is too short");
        }

        // We align each buffer individually, so we remove alignment requirements on the buffer.
        let value = value.aligned(Alignment::none());

        let fb_length = u32::try_from_le_bytes(&value.as_slice()[value.len() - 4..])? as usize;
        if value.len() < 4 + fb_length {
            vortex_bail!("ArrayParts buffer is too short for flatbuffer");
        }

        // The flatbuffer sits immediately before the 4-byte length footer.
        let fb_offset = value.len() - 4 - fb_length;
        let fb_buffer = value.slice(fb_offset..fb_offset + fb_length);
        // align_from re-aligns (copying if necessary) so flatbuffer verification is valid.
        let fb_buffer = FlatBuffer::align_from(fb_buffer);

        let fb_array = root::<fba::Array>(fb_buffer.as_ref())?;
        let fb_root = fb_array.root().vortex_expect("Array must have a root node");

        // Walk the buffer descriptors front-to-back, tracking a running byte offset.
        // NOTE(review): the descriptors' padding/length fields are not validated against
        // `fb_offset`; a corrupt blob could make `slice` below panic — confirm callers
        // treat this input as trusted.
        let mut offset = 0;
        let buffers: Arc<[ByteBuffer]> = fb_array
            .buffers()
            .unwrap_or_default()
            .iter()
            .map(|fb_buffer| {
                // Skip padding
                offset += fb_buffer.padding() as usize;

                let buffer_len = fb_buffer.length() as usize;

                // Extract a buffer and ensure it's aligned, copying if necessary
                let buffer = value
                    .slice(offset..(offset + buffer_len))
                    .aligned(Alignment::from_exponent(fb_buffer.alignment_exponent()));

                offset += buffer_len;
                buffer
            })
            .collect();

        Ok(ArrayParts {
            flatbuffer: fb_buffer.clone(),
            flatbuffer_loc: fb_root._tab.loc(),
            buffers,
        })
    }
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc