• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

vortex-data / vortex / 16935267080

13 Aug 2025 11:00AM UTC coverage: 24.312% (-63.3%) from 87.658%
16935267080

Pull #4226

github

web-flow
Merge 81b48c7fb into baa6ea202
Pull Request #4226: Support converting TimestampTZ to and from duckdb

0 of 2 new or added lines in 1 file covered. (0.0%)

20666 existing lines in 469 files now uncovered.

8726 of 35892 relevant lines covered (24.31%)

147.74 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

51.72
/vortex-btrblocks/src/integer.rs
1
// SPDX-License-Identifier: Apache-2.0
2
// SPDX-FileCopyrightText: Copyright the Vortex contributors
3

4
pub mod dictionary;
5
mod stats;
6

7
use std::fmt::Debug;
8
use std::hash::Hash;
9

10
pub use stats::IntegerStats;
11
use vortex_array::arrays::{ConstantArray, PrimitiveArray, PrimitiveVTable};
12
use vortex_array::compress::downscale_integer_array;
13
use vortex_array::{ArrayRef, IntoArray, ToCanonical};
14
use vortex_dict::DictArray;
15
use vortex_error::{VortexExpect, VortexResult, VortexUnwrap, vortex_bail, vortex_err};
16
use vortex_fastlanes::{FoRArray, bit_width_histogram, bitpack_encode, find_best_bit_width};
17
use vortex_runend::RunEndArray;
18
use vortex_runend::compress::runend_encode;
19
use vortex_scalar::Scalar;
20
use vortex_sequence::sequence_encode;
21
use vortex_sparse::{SparseArray, SparseVTable};
22
use vortex_zigzag::{ZigZagArray, zigzag_encode};
23

24
use crate::integer::dictionary::dictionary_encode;
25
use crate::patches::compress_patches;
26
use crate::{
27
    Compressor, CompressorStats, GenerateStatsOptions, Scheme,
28
    estimate_compression_ratio_with_sampling,
29
};
30

31
pub struct IntCompressor;
32

33
impl Compressor for IntCompressor {
34
    type ArrayVTable = PrimitiveVTable;
35
    type SchemeType = dyn IntegerScheme;
36
    type StatsType = IntegerStats;
37

38
    fn schemes() -> &'static [&'static dyn IntegerScheme] {
30✔
39
        &[
30✔
40
            &ConstantScheme,
30✔
41
            &FORScheme,
30✔
42
            &ZigZagScheme,
30✔
43
            &BitPackingScheme,
30✔
44
            &SparseScheme,
30✔
45
            &DictScheme,
30✔
46
            &RunEndScheme,
30✔
47
            &SequenceScheme,
30✔
48
        ]
30✔
49
    }
30✔
50

51
    fn default_scheme() -> &'static Self::SchemeType {
18✔
52
        &UncompressedScheme
18✔
53
    }
18✔
54

55
    fn dict_scheme_code() -> IntCode {
30✔
56
        DICT_SCHEME
30✔
57
    }
30✔
58
}
59

60
impl IntCompressor {
UNCOV
61
    pub fn compress_no_dict(
×
UNCOV
62
        array: &PrimitiveArray,
×
UNCOV
63
        is_sample: bool,
×
UNCOV
64
        allowed_cascading: usize,
×
UNCOV
65
        excludes: &[IntCode],
×
UNCOV
66
    ) -> VortexResult<ArrayRef> {
×
UNCOV
67
        let stats = IntegerStats::generate_opts(
×
UNCOV
68
            array,
×
UNCOV
69
            GenerateStatsOptions {
×
UNCOV
70
                count_distinct_values: false,
×
UNCOV
71
            },
×
72
        );
73

UNCOV
74
        let scheme = Self::choose_scheme(&stats, is_sample, allowed_cascading, excludes)?;
×
UNCOV
75
        let output = scheme.compress(&stats, is_sample, allowed_cascading, excludes)?;
×
76

UNCOV
77
        if output.nbytes() < array.nbytes() {
×
UNCOV
78
            Ok(output)
×
79
        } else {
UNCOV
80
            log::debug!("resulting tree too large: {}", output.display_tree());
×
UNCOV
81
            Ok(array.to_array())
×
82
        }
UNCOV
83
    }
×
84
}
85

86
pub trait IntegerScheme: Scheme<StatsType = IntegerStats, CodeType = IntCode> {}
87

88
// Auto-impl
89
impl<T> IntegerScheme for T where T: Scheme<StatsType = IntegerStats, CodeType = IntCode> {}
90

91
/// Identifier of an integer compression scheme.
///
/// Codes are placed in the `excludes` lists passed to child compression, which the schemes in
/// this module use to avoid re-applying a scheme beneath itself.
#[derive(Clone, Copy, Debug, Hash, PartialEq, Eq)]
pub struct IntCode(u8);

// One code per integer scheme; each scheme's `code()` returns its constant below.
const UNCOMPRESSED_SCHEME: IntCode = IntCode(0);
const CONSTANT_SCHEME: IntCode = IntCode(1);
const FOR_SCHEME: IntCode = IntCode(2);
const ZIGZAG_SCHEME: IntCode = IntCode(3);
const BITPACKING_SCHEME: IntCode = IntCode(4);
const SPARSE_SCHEME: IntCode = IntCode(5);
const DICT_SCHEME: IntCode = IntCode(6);
const RUNEND_SCHEME: IntCode = IntCode(7);
const SEQUENCE_SCHEME: IntCode = IntCode(8);
103

104
/// Leaves the array as-is.
#[derive(Clone, Copy, Debug)]
pub struct UncompressedScheme;

/// Replaces an array whose values are all identical with a single constant.
#[derive(Clone, Copy, Debug)]
pub struct ConstantScheme;

/// Frame-of-reference encoding followed by bit-packing of the biased values.
#[derive(Clone, Copy, Debug)]
pub struct FORScheme;

/// ZigZag-encodes signed values, then compresses the encoded values further.
#[derive(Clone, Copy, Debug)]
pub struct ZigZagScheme;

/// Bit-packs non-negative values into a narrower bit width.
#[derive(Clone, Copy, Debug)]
pub struct BitPackingScheme;

/// Stores only the positions whose value differs from the most frequent value.
#[derive(Clone, Copy, Debug)]
pub struct SparseScheme;

/// Dictionary encoding with separately compressed codes.
#[derive(Clone, Copy, Debug)]
pub struct DictScheme;

/// Run-end encoding with separately compressed ends and values.
#[derive(Clone, Copy, Debug)]
pub struct RunEndScheme;

/// Sequence encoding (a base plus a multiplier) for arrays that follow such a progression.
#[derive(Clone, Copy, Debug)]
pub struct SequenceScheme;

/// Threshold for the average run length in an array before we consider run-end encoding.
const RUN_END_THRESHOLD: u32 = 4;
133

134
impl Scheme for UncompressedScheme {
135
    type StatsType = IntegerStats;
136
    type CodeType = IntCode;
137

138
    fn code(&self) -> IntCode {
×
139
        UNCOMPRESSED_SCHEME
×
140
    }
×
141

142
    fn expected_compression_ratio(
×
143
        &self,
×
144
        _stats: &IntegerStats,
×
145
        _is_sample: bool,
×
146
        _allowed_cascading: usize,
×
147
        _excludes: &[IntCode],
×
148
    ) -> VortexResult<f64> {
×
149
        // no compression
150
        Ok(1.0)
×
151
    }
×
152

153
    fn compress(
18✔
154
        &self,
18✔
155
        stats: &IntegerStats,
18✔
156
        _is_sample: bool,
18✔
157
        _allowed_cascading: usize,
18✔
158
        _excludes: &[IntCode],
18✔
159
    ) -> VortexResult<ArrayRef> {
18✔
160
        Ok(stats.source().to_array())
18✔
161
    }
18✔
162
}
163

164
impl Scheme for ConstantScheme {
165
    type StatsType = IntegerStats;
166
    type CodeType = IntCode;
167

168
    fn code(&self) -> IntCode {
30✔
169
        CONSTANT_SCHEME
30✔
170
    }
30✔
171

172
    fn is_constant(&self) -> bool {
10✔
173
        true
10✔
174
    }
10✔
175

176
    fn expected_compression_ratio(
20✔
177
        &self,
20✔
178
        stats: &IntegerStats,
20✔
179
        is_sample: bool,
20✔
180
        _allowed_cascading: usize,
20✔
181
        _excludes: &[IntCode],
20✔
182
    ) -> VortexResult<f64> {
20✔
183
        // Never yield ConstantScheme for a sample, it could be a false-positive.
184
        if is_sample {
20✔
185
            return Ok(0.0);
×
186
        }
20✔
187

188
        // Only arrays with one distinct values can be constant compressed.
189
        if stats.distinct_values_count != 1 {
20✔
190
            return Ok(0.0);
8✔
191
        }
12✔
192

193
        // Cannot have mix of nulls and non-nulls
194
        if stats.null_count > 0 && stats.value_count > 0 {
12✔
UNCOV
195
            return Ok(0.0);
×
196
        }
12✔
197

198
        Ok(stats.value_count as f64)
12✔
199
    }
20✔
200

UNCOV
201
    fn compress(
×
UNCOV
202
        &self,
×
UNCOV
203
        stats: &IntegerStats,
×
UNCOV
204
        _is_sample: bool,
×
UNCOV
205
        _allowed_cascading: usize,
×
UNCOV
206
        _excludes: &[IntCode],
×
UNCOV
207
    ) -> VortexResult<ArrayRef> {
×
208
        // We only use Constant encoding if the entire array is constant, never if one of
209
        // the child arrays yields a constant value.
UNCOV
210
        let scalar = stats
×
UNCOV
211
            .source()
×
UNCOV
212
            .as_constant()
×
UNCOV
213
            .vortex_expect("constant array expected");
×
214

UNCOV
215
        Ok(ConstantArray::new(scalar, stats.src.len()).into_array())
×
UNCOV
216
    }
×
217
}
218

219
impl Scheme for FORScheme {
220
    type StatsType = IntegerStats;
221
    type CodeType = IntCode;
222

223
    fn code(&self) -> IntCode {
30✔
224
        FOR_SCHEME
30✔
225
    }
30✔
226

227
    fn expected_compression_ratio(
30✔
228
        &self,
30✔
229
        stats: &IntegerStats,
30✔
230
        _is_sample: bool,
30✔
231
        allowed_cascading: usize,
30✔
232
        _excludes: &[IntCode],
30✔
233
    ) -> VortexResult<f64> {
30✔
234
        // Only apply if we are not at the leaf
235
        if allowed_cascading == 0 {
30✔
236
            return Ok(0.0);
×
237
        }
30✔
238

239
        // All-null cannot be FOR compressed.
240
        if stats.value_count == 0 {
30✔
241
            return Ok(0.0);
×
242
        }
30✔
243

244
        // Only apply when the min is not already zero.
245
        if stats.typed.min_is_zero() {
30✔
246
            return Ok(0.0);
18✔
247
        }
12✔
248

249
        // Difference between max and min
250
        let full_width: u32 = stats.src.ptype().bit_width().try_into().vortex_unwrap();
12✔
251
        let bw = match stats.typed.max_minus_min().checked_ilog2() {
12✔
252
            Some(l) => l + 1,
4✔
253
            // If max-min == 0, it we should use a different compression scheme
254
            // as we don't want to bitpack down to 0 bits.
255
            None => return Ok(0.0),
8✔
256
        };
257

258
        // If we're not saving at least 1 byte, don't bother with FOR
259
        if full_width - bw < 8 {
4✔
UNCOV
260
            return Ok(0.0);
×
261
        }
4✔
262

263
        Ok(full_width as f64 / bw as f64)
4✔
264
    }
30✔
265

266
    fn compress(
4✔
267
        &self,
4✔
268
        stats: &IntegerStats,
4✔
269
        is_sample: bool,
4✔
270
        _allowed_cascading: usize,
4✔
271
        excludes: &[IntCode],
4✔
272
    ) -> VortexResult<ArrayRef> {
4✔
273
        let for_array = FoRArray::encode(stats.src.clone())?;
4✔
274
        let biased = for_array.encoded().to_primitive()?;
4✔
275
        let biased_stats = IntegerStats::generate_opts(
4✔
276
            &biased,
4✔
277
            GenerateStatsOptions {
4✔
278
                count_distinct_values: false,
4✔
279
            },
4✔
280
        );
281

282
        // Immediately bitpack. If any other scheme was preferable, it would be chosen instead
283
        // of bitpacking.
284
        // NOTE: we could delegate in the future if we had another downstream codec that performs
285
        //  as well.
286
        let compressed = BitPackingScheme.compress(&biased_stats, is_sample, 0, excludes)?;
4✔
287

288
        Ok(FoRArray::try_new(compressed, for_array.reference_scalar().clone())?.into_array())
4✔
289
    }
4✔
290
}
291

292
impl Scheme for ZigZagScheme {
293
    type StatsType = IntegerStats;
294
    type CodeType = IntCode;
295

296
    fn code(&self) -> IntCode {
30✔
297
        ZIGZAG_SCHEME
30✔
298
    }
30✔
299

300
    fn expected_compression_ratio(
30✔
301
        &self,
30✔
302
        stats: &IntegerStats,
30✔
303
        is_sample: bool,
30✔
304
        allowed_cascading: usize,
30✔
305
        excludes: &[IntCode],
30✔
306
    ) -> VortexResult<f64> {
30✔
307
        // ZigZag is only useful when we cascade it with another encoding
308
        if allowed_cascading == 0 {
30✔
309
            return Ok(0.0);
×
310
        }
30✔
311

312
        // Don't try and compress all-null arrays
313
        if stats.value_count == 0 {
30✔
314
            return Ok(0.0);
×
315
        }
30✔
316

317
        // ZigZag is only useful when there are negative values.
318
        if !stats.typed.min_is_negative() {
30✔
319
            return Ok(0.0);
30✔
UNCOV
320
        }
×
321

322
        // Run compression on a sample to see how it performs.
UNCOV
323
        estimate_compression_ratio_with_sampling(
×
UNCOV
324
            self,
×
UNCOV
325
            stats,
×
UNCOV
326
            is_sample,
×
UNCOV
327
            allowed_cascading,
×
UNCOV
328
            excludes,
×
329
        )
330
    }
30✔
331

UNCOV
332
    fn compress(
×
UNCOV
333
        &self,
×
UNCOV
334
        stats: &IntegerStats,
×
UNCOV
335
        is_sample: bool,
×
UNCOV
336
        allowed_cascading: usize,
×
UNCOV
337
        excludes: &[IntCode],
×
UNCOV
338
    ) -> VortexResult<ArrayRef> {
×
339
        // Zigzag encode the values, then recursively compress the inner values.
UNCOV
340
        let zag = zigzag_encode(stats.src.clone())?;
×
UNCOV
341
        let encoded = zag.encoded().to_primitive()?;
×
342

343
        // ZigZag should be after Dict, RunEnd or Sparse.
344
        // We should only do these "container" style compressors once.
UNCOV
345
        let mut new_excludes = vec![
×
UNCOV
346
            ZigZagScheme.code(),
×
UNCOV
347
            DictScheme.code(),
×
UNCOV
348
            RunEndScheme.code(),
×
UNCOV
349
            SparseScheme.code(),
×
350
        ];
UNCOV
351
        new_excludes.extend_from_slice(excludes);
×
352

UNCOV
353
        let compressed =
×
UNCOV
354
            IntCompressor::compress(&encoded, is_sample, allowed_cascading - 1, &new_excludes)?;
×
355

UNCOV
356
        log::debug!("zigzag output: {}", compressed.display_tree());
×
357

UNCOV
358
        Ok(ZigZagArray::try_new(compressed)?.into_array())
×
UNCOV
359
    }
×
360
}
361

362
impl Scheme for BitPackingScheme {
363
    type StatsType = IntegerStats;
364
    type CodeType = IntCode;
365

366
    fn code(&self) -> IntCode {
30✔
367
        BITPACKING_SCHEME
30✔
368
    }
30✔
369

370
    #[allow(clippy::cast_possible_truncation)]
371
    fn expected_compression_ratio(
30✔
372
        &self,
30✔
373
        stats: &IntegerStats,
30✔
374
        is_sample: bool,
30✔
375
        allowed_cascading: usize,
30✔
376
        excludes: &[IntCode],
30✔
377
    ) -> VortexResult<f64> {
30✔
378
        // BitPacking only works for non-negative values
379
        if stats.typed.min_is_negative() {
30✔
UNCOV
380
            return Ok(0.0);
×
381
        }
30✔
382

383
        // Don't compress all-null arrays
384
        if stats.value_count == 0 {
30✔
385
            return Ok(0.0);
×
386
        }
30✔
387

388
        estimate_compression_ratio_with_sampling(
30✔
389
            self,
30✔
390
            stats,
30✔
391
            is_sample,
30✔
392
            allowed_cascading,
30✔
393
            excludes,
30✔
394
        )
395
    }
30✔
396

397
    #[allow(clippy::cast_possible_truncation)]
398
    fn compress(
38✔
399
        &self,
38✔
400
        stats: &IntegerStats,
38✔
401
        _is_sample: bool,
38✔
402
        _allowed_cascading: usize,
38✔
403
        _excludes: &[IntCode],
38✔
404
    ) -> VortexResult<ArrayRef> {
38✔
405
        let histogram = bit_width_histogram(stats.source())?;
38✔
406
        let bw = find_best_bit_width(stats.source().ptype(), &histogram)?;
38✔
407
        // If best bw is determined to be the current bit-width, return the original array.
408
        if bw as usize == stats.source().ptype().bit_width() {
38✔
UNCOV
409
            return Ok(stats.source().clone().into_array());
×
410
        }
38✔
411
        let mut packed = bitpack_encode(stats.source(), bw, Some(&histogram))?;
38✔
412

413
        let patches = packed.patches().map(compress_patches).transpose()?;
38✔
414
        packed.replace_patches(patches);
38✔
415

416
        Ok(packed.into_array())
38✔
417
    }
38✔
418
}
419

420
impl Scheme for SparseScheme {
421
    type StatsType = IntegerStats;
422
    type CodeType = IntCode;
423

424
    fn code(&self) -> IntCode {
30✔
425
        SPARSE_SCHEME
30✔
426
    }
30✔
427

428
    // We can avoid asserting the encoding tree instead.
429
    fn expected_compression_ratio(
30✔
430
        &self,
30✔
431
        stats: &IntegerStats,
30✔
432
        _is_sample: bool,
30✔
433
        _allowed_cascading: usize,
30✔
434
        _excludes: &[IntCode],
30✔
435
    ) -> VortexResult<f64> {
30✔
436
        if stats.value_count == 0 {
30✔
437
            // All nulls should use ConstantScheme
438
            return Ok(0.0);
×
439
        }
30✔
440

441
        // If the majority is null, will compress well.
442
        if stats.null_count as f64 / stats.src.len() as f64 > 0.9 {
30✔
443
            return Ok(stats.src.len() as f64 / stats.value_count as f64);
×
444
        }
30✔
445

446
        // See if the top value accounts for >= 90% of the set values.
447
        let (_, top_count) = stats.typed.top_value_and_count();
30✔
448

449
        if top_count == stats.value_count {
30✔
450
            // top_value is the only value, should use ConstantScheme instead
451
            return Ok(0.0);
18✔
452
        }
12✔
453

454
        let freq = top_count as f64 / stats.value_count as f64;
12✔
455
        if freq >= 0.9 {
12✔
456
            // We only store the positions of the non-top values.
UNCOV
457
            return Ok(stats.value_count as f64 / (stats.value_count - top_count) as f64);
×
458
        }
12✔
459

460
        Ok(0.0)
12✔
461
    }
30✔
462

UNCOV
463
    fn compress(
×
UNCOV
464
        &self,
×
UNCOV
465
        stats: &IntegerStats,
×
UNCOV
466
        is_sample: bool,
×
UNCOV
467
        allowed_cascading: usize,
×
UNCOV
468
        excludes: &[IntCode],
×
UNCOV
469
    ) -> VortexResult<ArrayRef> {
×
UNCOV
470
        assert!(allowed_cascading > 0);
×
UNCOV
471
        let (top_pvalue, top_count) = stats.typed.top_value_and_count();
×
UNCOV
472
        if top_count as usize == stats.src.len() {
×
473
            // top_value is the only value, use ConstantScheme
474
            return Ok(ConstantArray::new(
×
475
                Scalar::primitive_value(
×
476
                    top_pvalue,
×
477
                    top_pvalue.ptype(),
×
478
                    stats.src.dtype().nullability(),
×
479
                ),
×
480
                stats.src.len(),
×
481
            )
×
482
            .into_array());
×
UNCOV
483
        }
×
484

UNCOV
485
        let sparse_encoded = SparseArray::encode(
×
UNCOV
486
            stats.src.as_ref(),
×
UNCOV
487
            Some(Scalar::primitive_value(
×
UNCOV
488
                top_pvalue,
×
UNCOV
489
                top_pvalue.ptype(),
×
UNCOV
490
                stats.src.dtype().nullability(),
×
UNCOV
491
            )),
×
492
        )?;
×
493

UNCOV
494
        if let Some(sparse) = sparse_encoded.as_opt::<SparseVTable>() {
×
495
            // Compress the values
UNCOV
496
            let mut new_excludes = vec![SparseScheme.code()];
×
UNCOV
497
            new_excludes.extend_from_slice(excludes);
×
498

UNCOV
499
            let compressed_values = IntCompressor::compress_no_dict(
×
UNCOV
500
                &sparse.patches().values().to_primitive()?,
×
UNCOV
501
                is_sample,
×
UNCOV
502
                allowed_cascading - 1,
×
UNCOV
503
                &new_excludes,
×
504
            )?;
×
505

UNCOV
506
            let indices =
×
UNCOV
507
                downscale_integer_array(sparse.patches().indices().clone())?.to_primitive()?;
×
508

UNCOV
509
            let compressed_indices = IntCompressor::compress_no_dict(
×
UNCOV
510
                &indices,
×
UNCOV
511
                is_sample,
×
UNCOV
512
                allowed_cascading - 1,
×
UNCOV
513
                &new_excludes,
×
514
            )?;
×
515

UNCOV
516
            SparseArray::try_new(
×
UNCOV
517
                compressed_indices,
×
UNCOV
518
                compressed_values,
×
UNCOV
519
                sparse.len(),
×
UNCOV
520
                sparse.fill_scalar().clone(),
×
521
            )
UNCOV
522
            .map(|a| a.into_array())
×
523
        } else {
524
            Ok(sparse_encoded)
×
525
        }
UNCOV
526
    }
×
527
}
528

529
impl Scheme for DictScheme {
530
    type StatsType = IntegerStats;
531
    type CodeType = IntCode;
532

533
    fn code(&self) -> IntCode {
30✔
534
        DICT_SCHEME
30✔
535
    }
30✔
536

537
    fn expected_compression_ratio(
30✔
538
        &self,
30✔
539
        stats: &IntegerStats,
30✔
540
        _is_sample: bool,
30✔
541
        allowed_cascading: usize,
30✔
542
        _excludes: &[IntCode],
30✔
543
    ) -> VortexResult<f64> {
30✔
544
        // Dict should not be terminal.
545
        if allowed_cascading == 0 {
30✔
546
            return Ok(0.0);
×
547
        }
30✔
548

549
        if stats.value_count == 0 {
30✔
550
            return Ok(0.0);
×
551
        }
30✔
552

553
        // If > 50% of the values are distinct, skip dict.
554
        if stats.distinct_values_count > stats.value_count / 2 {
30✔
555
            return Ok(0.0);
30✔
UNCOV
556
        }
×
557

558
        // Ignore nulls encoding for the estimate. We only focus on values.
UNCOV
559
        let values_size = stats.source().ptype().bit_width() * stats.distinct_values_count as usize;
×
560

561
        // Assume codes are compressed RLE + BitPacking.
UNCOV
562
        let codes_bw = usize::BITS - stats.distinct_values_count.leading_zeros();
×
563

UNCOV
564
        let n_runs = stats.value_count / stats.average_run_length;
×
565

566
        // Assume that codes will either be BitPack or RLE-BitPack
UNCOV
567
        let codes_size_bp = (codes_bw * stats.value_count) as usize;
×
UNCOV
568
        let codes_size_rle_bp = (codes_bw + 32) * n_runs;
×
569

UNCOV
570
        let codes_size = usize::min(codes_size_bp, codes_size_rle_bp as usize);
×
571

UNCOV
572
        let before = stats.value_count as usize * stats.source().ptype().bit_width();
×
573

UNCOV
574
        Ok(before as f64 / (values_size + codes_size) as f64)
×
575
    }
30✔
576

UNCOV
577
    fn compress(
×
UNCOV
578
        &self,
×
UNCOV
579
        stats: &IntegerStats,
×
UNCOV
580
        is_sample: bool,
×
UNCOV
581
        allowed_cascading: usize,
×
UNCOV
582
        excludes: &[IntCode],
×
UNCOV
583
    ) -> VortexResult<ArrayRef> {
×
UNCOV
584
        assert!(allowed_cascading > 0);
×
585

586
        // TODO(aduffy): we can be more prescriptive: we know that codes will EITHER be
587
        //    RLE or FOR + BP. Cascading probably wastes some time here.
588

UNCOV
589
        let dict = dictionary_encode(stats)?;
×
590

591
        // Cascade the codes child
592
        // Don't allow SequenceArray as the codes child as it merely adds extra indirection without actually compressing data.
UNCOV
593
        let mut new_excludes = vec![DICT_SCHEME, SEQUENCE_SCHEME];
×
UNCOV
594
        new_excludes.extend_from_slice(excludes);
×
595

UNCOV
596
        let compressed_codes = IntCompressor::compress_no_dict(
×
UNCOV
597
            &dict.codes().to_primitive()?,
×
UNCOV
598
            is_sample,
×
UNCOV
599
            allowed_cascading - 1,
×
UNCOV
600
            &new_excludes,
×
601
        )?;
×
602

UNCOV
603
        Ok(DictArray::try_new(compressed_codes, dict.values().clone())?.into_array())
×
UNCOV
604
    }
×
605
}
606

607
impl Scheme for RunEndScheme {
608
    type StatsType = IntegerStats;
609
    type CodeType = IntCode;
610

611
    fn code(&self) -> IntCode {
30✔
612
        RUNEND_SCHEME
30✔
613
    }
30✔
614

615
    fn expected_compression_ratio(
30✔
616
        &self,
30✔
617
        stats: &IntegerStats,
30✔
618
        is_sample: bool,
30✔
619
        allowed_cascading: usize,
30✔
620
        excludes: &[IntCode],
30✔
621
    ) -> VortexResult<f64> {
30✔
622
        // If the run length is below the threshold, drop it.
623
        if stats.average_run_length < RUN_END_THRESHOLD {
30✔
624
            return Ok(0.0);
30✔
UNCOV
625
        }
×
626

UNCOV
627
        if allowed_cascading == 0 {
×
628
            return Ok(0.0);
×
UNCOV
629
        }
×
630

631
        // Run compression on a sample, see how it performs.
UNCOV
632
        estimate_compression_ratio_with_sampling(
×
UNCOV
633
            self,
×
UNCOV
634
            stats,
×
UNCOV
635
            is_sample,
×
UNCOV
636
            allowed_cascading,
×
UNCOV
637
            excludes,
×
638
        )
639
    }
30✔
640

UNCOV
641
    fn compress(
×
UNCOV
642
        &self,
×
UNCOV
643
        stats: &IntegerStats,
×
UNCOV
644
        is_sample: bool,
×
UNCOV
645
        allowed_cascading: usize,
×
UNCOV
646
        excludes: &[IntCode],
×
UNCOV
647
    ) -> VortexResult<ArrayRef> {
×
UNCOV
648
        assert!(allowed_cascading > 0);
×
649

650
        // run-end encode the ends
UNCOV
651
        let (ends, values) = runend_encode(&stats.src)?;
×
652

UNCOV
653
        let mut new_excludes = vec![RunEndScheme.code(), DictScheme.code()];
×
UNCOV
654
        new_excludes.extend_from_slice(excludes);
×
655

UNCOV
656
        let ends_stats = IntegerStats::generate_opts(
×
UNCOV
657
            &ends,
×
UNCOV
658
            GenerateStatsOptions {
×
UNCOV
659
                count_distinct_values: false,
×
UNCOV
660
            },
×
661
        );
UNCOV
662
        let ends_scheme = IntCompressor::choose_scheme(
×
UNCOV
663
            &ends_stats,
×
UNCOV
664
            is_sample,
×
UNCOV
665
            allowed_cascading - 1,
×
UNCOV
666
            &new_excludes,
×
667
        )?;
×
UNCOV
668
        let compressed_ends =
×
UNCOV
669
            ends_scheme.compress(&ends_stats, is_sample, allowed_cascading - 1, &new_excludes)?;
×
670

UNCOV
671
        let compressed_values = IntCompressor::compress_no_dict(
×
UNCOV
672
            &values.to_primitive()?,
×
UNCOV
673
            is_sample,
×
UNCOV
674
            allowed_cascading - 1,
×
UNCOV
675
            &new_excludes,
×
676
        )?;
×
677

UNCOV
678
        Ok(RunEndArray::try_new(compressed_ends, compressed_values)?.into_array())
×
UNCOV
679
    }
×
680
}
681

682
impl Scheme for SequenceScheme {
683
    type StatsType = IntegerStats;
684
    type CodeType = IntCode;
685

686
    fn code(&self) -> Self::CodeType {
30✔
687
        SEQUENCE_SCHEME
30✔
688
    }
30✔
689

690
    fn expected_compression_ratio(
30✔
691
        &self,
30✔
692
        stats: &Self::StatsType,
30✔
693
        _is_sample: bool,
30✔
694
        _allowed_cascading: usize,
30✔
695
        _excludes: &[Self::CodeType],
30✔
696
    ) -> VortexResult<f64> {
30✔
697
        if stats.null_count > 0 {
30✔
UNCOV
698
            return Ok(0.0);
×
699
        }
30✔
700
        // Since two values are required to store base and multiplier the
701
        // compression ratio is divided by 2.
702
        Ok(sequence_encode(&stats.src)?
30✔
703
            .map(|_| stats.src.len() as f64 / 2.0)
30✔
704
            .unwrap_or(0.0))
30✔
705
    }
30✔
706

707
    fn compress(
4✔
708
        &self,
4✔
709
        stats: &Self::StatsType,
4✔
710
        _is_sample: bool,
4✔
711
        _allowed_cascading: usize,
4✔
712
        _excludes: &[Self::CodeType],
4✔
713
    ) -> VortexResult<ArrayRef> {
4✔
714
        if stats.null_count > 0 {
4✔
715
            vortex_bail!("sequence encoding does not support nulls");
×
716
        }
4✔
717
        sequence_encode(&stats.src)?.ok_or_else(|| vortex_err!("cannot sequence encode array"))
4✔
718
    }
4✔
719
}
720

721
#[cfg(test)]
mod tests {
    use itertools::Itertools;
    use log::LevelFilter;
    use rand::rngs::StdRng;
    use rand::{RngCore, SeedableRng};
    use vortex_array::arrays::PrimitiveArray;
    use vortex_array::validity::Validity;
    use vortex_array::vtable::ValidityHelper;
    use vortex_array::{Array, IntoArray, ToCanonical};
    use vortex_buffer::{Buffer, BufferMut, buffer, buffer_mut};
    use vortex_dict::DictEncoding;
    use vortex_sequence::SequenceEncoding;
    use vortex_sparse::SparseEncoding;
    use vortex_utils::aliases::hash_set::HashSet;

    use crate::integer::{IntCompressor, IntegerStats, SequenceScheme, SparseScheme};
    use crate::{Compressor, CompressorStats, Scheme};

    #[test]
    fn test_empty() {
        // Make sure empty array compression does not fail
        let result = IntCompressor::compress(
            &PrimitiveArray::new(Buffer::<i32>::empty(), Validity::NonNullable),
            false,
            3,
            &[],
        )
        .unwrap();

        assert!(result.is_empty());
    }

    #[test]
    fn test_dict_encodable() {
        let mut codes = BufferMut::<i32>::with_capacity(65_535);
        // Write runs of random length (0..=4) of values drawn at random from a handful of
        // distinct numbers.

        let numbers = [0, 10, 50, 100, 1000, 3000]
            .into_iter()
            .map(|i| 1234 * i)
            .collect_vec();

        let mut rng = StdRng::seed_from_u64(1u64);
        while codes.len() < 64000 {
            let run_length = rng.next_u32() % 5;
            let value = numbers[rng.next_u32() as usize % numbers.len()];
            for _ in 0..run_length {
                codes.push(value);
            }
        }

        let primitive = codes.freeze().into_array().to_primitive().unwrap();
        let compressed = IntCompressor::compress(&primitive, false, 3, &[]).unwrap();
        assert_eq!(compressed.encoding_id(), DictEncoding.id());

        // Compression must be lossless.
        let decoded = compressed.to_primitive().unwrap();
        assert_eq!(decoded.as_slice::<i32>(), primitive.as_slice::<i32>());
    }

    #[test]
    fn test_window_name() {
        env_logger::builder()
            .filter(None, LevelFilter::Debug)
            .try_init()
            .ok();

        // A test that's meant to mirror the WindowName column from ClickBench.
        let mut values = buffer_mut![-1i32; 1_000_000];
        let mut visited = HashSet::new();
        let mut rng = StdRng::seed_from_u64(1u64);
        while visited.len() < 223 {
            let random = (rng.next_u32() as usize) % 1_000_000;
            if visited.contains(&random) {
                continue;
            }
            visited.insert(random);
            // Each chosen position gets one of 100 possible values (multiples of 5).
            values[random] = 5 * (rng.next_u64() % 100) as i32;
        }

        let array = values.freeze().into_array().to_primitive().unwrap();
        let compressed = IntCompressor::compress(&array, false, 3, &[]).unwrap();
        log::info!("WindowName compressed: {}", compressed.display_tree());

        // Compression must be lossless.
        let decoded = compressed.to_primitive().unwrap();
        assert_eq!(decoded.as_slice::<i32>(), array.as_slice::<i32>());
    }

    #[test]
    fn sparse_with_nulls() {
        let array = PrimitiveArray::new(
            buffer![189u8, 189, 189, 0, 46],
            Validity::from_iter(vec![true, true, true, true, false]),
        );
        let compressed = SparseScheme
            .compress(&IntegerStats::generate(&array), false, 3, &[])
            .unwrap();
        assert_eq!(compressed.encoding_id(), SparseEncoding.id());
        let decoded = compressed.to_primitive().unwrap();
        let expected = [189u8, 189, 189, 0, 0];
        assert_eq!(decoded.as_slice::<u8>(), &expected);
        assert_eq!(decoded.validity(), array.validity());
    }

    #[test]
    fn sparse_mostly_nulls() {
        let array = PrimitiveArray::new(
            buffer![189u8, 189, 189, 189, 189, 189, 189, 189, 189, 0, 46],
            Validity::from_iter(vec![
                false, false, false, false, false, false, false, false, false, false, true,
            ]),
        );
        let compressed = SparseScheme
            .compress(&IntegerStats::generate(&array), false, 3, &[])
            .unwrap();
        assert_eq!(compressed.encoding_id(), SparseEncoding.id());
        let decoded = compressed.to_primitive().unwrap();
        let expected = [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 46];
        assert_eq!(decoded.as_slice::<u8>(), &expected);
        assert_eq!(decoded.validity(), array.validity());
    }

    #[test]
    fn nullable_sequence() {
        let values = (0i32..20).step_by(7).collect_vec();
        let array = PrimitiveArray::from_option_iter(values.clone().into_iter().map(Some));
        let compressed = SequenceScheme
            .compress(&IntegerStats::generate(&array), false, 3, &[])
            .unwrap();
        assert_eq!(compressed.encoding_id(), SequenceEncoding.id());
        let decoded = compressed.to_primitive().unwrap();
        assert_eq!(decoded.as_slice::<i32>(), values.as_slice());
    }
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc