• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

vortex-data / vortex / 16465419038

23 Jul 2025 08:21AM UTC coverage: 81.135%. Remained the same
16465419038

Pull #3979

github

web-flow
Merge d7f6a6976 into 2b0334c0b
Pull Request #3979: fix: Prevent compressor from choosing SequenceArray for dictionary codes

3 of 3 new or added lines in 1 file covered. (100.0%)

5 existing lines in 1 file now uncovered.

42062 of 51842 relevant lines covered (81.13%)

173532.84 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

92.2
/vortex-btrblocks/src/integer.rs
1
// SPDX-License-Identifier: Apache-2.0
2
// SPDX-FileCopyrightText: Copyright the Vortex contributors
3

4
pub mod dictionary;
5
mod stats;
6

7
use std::fmt::Debug;
8
use std::hash::Hash;
9

10
pub use stats::IntegerStats;
11
use vortex_array::arrays::{ConstantArray, PrimitiveArray, PrimitiveVTable};
12
use vortex_array::compress::downscale_integer_array;
13
use vortex_array::{ArrayRef, IntoArray, ToCanonical};
14
use vortex_dict::DictArray;
15
use vortex_error::{VortexExpect, VortexResult, VortexUnwrap, vortex_bail, vortex_err};
16
use vortex_fastlanes::{FoRArray, bit_width_histogram, bitpack_encode, find_best_bit_width};
17
use vortex_runend::RunEndArray;
18
use vortex_runend::compress::runend_encode;
19
use vortex_scalar::Scalar;
20
use vortex_sequence::sequence_encode;
21
use vortex_sparse::{SparseArray, SparseVTable};
22
use vortex_zigzag::{ZigZagArray, zigzag_encode};
23

24
use crate::integer::dictionary::dictionary_encode;
25
use crate::patches::compress_patches;
26
use crate::{
27
    Compressor, CompressorStats, GenerateStatsOptions, Scheme,
28
    estimate_compression_ratio_with_sampling,
29
};
30

31
pub struct IntCompressor;
32

33
impl Compressor for IntCompressor {
34
    type ArrayVTable = PrimitiveVTable;
35
    type SchemeType = dyn IntegerScheme;
36
    type StatsType = IntegerStats;
37

38
    fn schemes() -> &'static [&'static dyn IntegerScheme] {
16,350✔
39
        &[
16,350✔
40
            &ConstantScheme,
16,350✔
41
            &FORScheme,
16,350✔
42
            &ZigZagScheme,
16,350✔
43
            &BitPackingScheme,
16,350✔
44
            &SparseScheme,
16,350✔
45
            &DictScheme,
16,350✔
46
            &RunEndScheme,
16,350✔
47
            &SequenceScheme,
16,350✔
48
        ]
16,350✔
49
    }
16,350✔
50

51
    fn default_scheme() -> &'static Self::SchemeType {
10,829✔
52
        &UncompressedScheme
10,829✔
53
    }
10,829✔
54

55
    fn dict_scheme_code() -> IntCode {
14,493✔
56
        DICT_SCHEME
14,493✔
57
    }
14,493✔
58
}
59

60
impl IntCompressor {
61
    pub fn compress_no_dict(
946✔
62
        array: &PrimitiveArray,
946✔
63
        is_sample: bool,
946✔
64
        allowed_cascading: usize,
946✔
65
        excludes: &[IntCode],
946✔
66
    ) -> VortexResult<ArrayRef> {
946✔
67
        let stats = IntegerStats::generate_opts(
946✔
68
            array,
946✔
69
            GenerateStatsOptions {
946✔
70
                count_distinct_values: false,
946✔
71
            },
946✔
72
        );
73

74
        let scheme = Self::choose_scheme(&stats, is_sample, allowed_cascading, excludes)?;
946✔
75
        let output = scheme.compress(&stats, is_sample, allowed_cascading, excludes)?;
946✔
76

77
        if output.nbytes() < array.nbytes() {
946✔
78
            Ok(output)
30✔
79
        } else {
80
            log::debug!("resulting tree too large: {}", output.display_tree());
916✔
81
            Ok(array.to_array())
916✔
82
        }
83
    }
946✔
84
}
85

86
pub trait IntegerScheme: Scheme<StatsType = IntegerStats, CodeType = IntCode> {}
87

88
// Auto-impl
89
impl<T> IntegerScheme for T where T: Scheme<StatsType = IntegerStats, CodeType = IntCode> {}
90

91
/// Opaque identifier of an integer compression scheme.
///
/// Schemes report their code through `Scheme::code`, and the codes are what callers place in
/// `excludes` lists to keep a scheme from being considered.
#[derive(Debug, Copy, Clone, Eq, PartialEq, Hash)]
pub struct IntCode(u8);

// One distinct code per scheme defined in this module.
const UNCOMPRESSED_SCHEME: IntCode = IntCode(0);
const CONSTANT_SCHEME: IntCode = IntCode(1);
const FOR_SCHEME: IntCode = IntCode(2);
const ZIGZAG_SCHEME: IntCode = IntCode(3);
const BITPACKING_SCHEME: IntCode = IntCode(4);
const SPARSE_SCHEME: IntCode = IntCode(5);
const DICT_SCHEME: IntCode = IntCode(6);
const RUNEND_SCHEME: IntCode = IntCode(7);
const SEQUENCE_SCHEME: IntCode = IntCode(8);
103

104
/// Leaves the array as it is (the compressor's default scheme).
#[derive(Debug, Copy, Clone)]
pub struct UncompressedScheme;

/// Replaces an array holding a single value with a `ConstantArray`.
#[derive(Debug, Copy, Clone)]
pub struct ConstantScheme;

/// Encodes with `FoRArray` and then bit-packs the encoded values.
#[derive(Debug, Copy, Clone)]
pub struct FORScheme;

/// Zigzag-encodes arrays that contain negative values, then compresses the result further.
#[derive(Debug, Copy, Clone)]
pub struct ZigZagScheme;

/// Bit-packs non-negative values to the best bit width, compressing any patches.
#[derive(Debug, Copy, Clone)]
pub struct BitPackingScheme;

/// Encodes as a `SparseArray` filled with the most frequent value.
#[derive(Debug, Copy, Clone)]
pub struct SparseScheme;

/// Dictionary-encodes the values and compresses the codes further.
#[derive(Debug, Copy, Clone)]
pub struct DictScheme;

/// Run-end encodes the values and compresses both the ends and the run values.
#[derive(Debug, Copy, Clone)]
pub struct RunEndScheme;

/// Encodes arrays accepted by `sequence_encode`.
#[derive(Debug, Copy, Clone)]
pub struct SequenceScheme;
130

131
/// Threshold for the average run length in an array before we consider run-end encoding.
132
const RUN_END_THRESHOLD: u32 = 4;
133

134
impl Scheme for UncompressedScheme {
135
    type StatsType = IntegerStats;
136
    type CodeType = IntCode;
137

138
    fn code(&self) -> IntCode {
×
139
        UNCOMPRESSED_SCHEME
×
140
    }
×
141

142
    fn expected_compression_ratio(
×
143
        &self,
×
144
        _stats: &IntegerStats,
×
145
        _is_sample: bool,
×
146
        _allowed_cascading: usize,
×
147
        _excludes: &[IntCode],
×
148
    ) -> VortexResult<f64> {
×
149
        // no compression
150
        Ok(1.0)
×
151
    }
×
152

153
    fn compress(
10,829✔
154
        &self,
10,829✔
155
        stats: &IntegerStats,
10,829✔
156
        _is_sample: bool,
10,829✔
157
        _allowed_cascading: usize,
10,829✔
158
        _excludes: &[IntCode],
10,829✔
159
    ) -> VortexResult<ArrayRef> {
10,829✔
160
        Ok(stats.source().to_array())
10,829✔
161
    }
10,829✔
162
}
163

164
impl Scheme for ConstantScheme {
165
    type StatsType = IntegerStats;
166
    type CodeType = IntCode;
167

168
    fn code(&self) -> IntCode {
16,350✔
169
        CONSTANT_SCHEME
16,350✔
170
    }
16,350✔
171

172
    fn is_constant(&self) -> bool {
8,641✔
173
        true
8,641✔
174
    }
8,641✔
175

176
    fn expected_compression_ratio(
7,709✔
177
        &self,
7,709✔
178
        stats: &IntegerStats,
7,709✔
179
        is_sample: bool,
7,709✔
180
        _allowed_cascading: usize,
7,709✔
181
        _excludes: &[IntCode],
7,709✔
182
    ) -> VortexResult<f64> {
7,709✔
183
        // Never yield ConstantScheme for a sample, it could be a false-positive.
184
        if is_sample {
7,709✔
185
            return Ok(0.0);
×
186
        }
7,709✔
187

188
        // Only arrays with one distinct values can be constant compressed.
189
        if stats.distinct_values_count != 1 {
7,709✔
190
            return Ok(0.0);
3,761✔
191
        }
3,948✔
192

193
        // Cannot have mix of nulls and non-nulls
194
        if stats.null_count > 0 && stats.value_count > 0 {
3,948✔
195
            return Ok(0.0);
100✔
196
        }
3,848✔
197

198
        Ok(stats.value_count as f64)
3,848✔
199
    }
7,709✔
200

201
    fn compress(
354✔
202
        &self,
354✔
203
        stats: &IntegerStats,
354✔
204
        _is_sample: bool,
354✔
205
        _allowed_cascading: usize,
354✔
206
        _excludes: &[IntCode],
354✔
207
    ) -> VortexResult<ArrayRef> {
354✔
208
        // We only use Constant encoding if the entire array is constant, never if one of
209
        // the child arrays yields a constant value.
210
        let scalar = stats
354✔
211
            .source()
354✔
212
            .as_constant()
354✔
213
            .vortex_expect("constant array expected");
354✔
214

215
        Ok(ConstantArray::new(scalar, stats.src.len()).into_array())
354✔
216
    }
354✔
217
}
218

219
impl Scheme for FORScheme {
220
    type StatsType = IntegerStats;
221
    type CodeType = IntCode;
222

223
    fn code(&self) -> IntCode {
16,350✔
224
        FOR_SCHEME
16,350✔
225
    }
16,350✔
226

227
    fn expected_compression_ratio(
16,350✔
228
        &self,
16,350✔
229
        stats: &IntegerStats,
16,350✔
230
        _is_sample: bool,
16,350✔
231
        allowed_cascading: usize,
16,350✔
232
        _excludes: &[IntCode],
16,350✔
233
    ) -> VortexResult<f64> {
16,350✔
234
        // Only apply if we are not at the leaf
235
        if allowed_cascading == 0 {
16,350✔
236
            return Ok(0.0);
×
237
        }
16,350✔
238

239
        // All-null cannot be FOR compressed.
240
        if stats.value_count == 0 {
16,350✔
241
            return Ok(0.0);
×
242
        }
16,350✔
243

244
        // Only apply when the min is not already zero.
245
        if stats.typed.min_is_zero() {
16,350✔
246
            return Ok(0.0);
7,384✔
247
        }
8,966✔
248

249
        // Difference between max and min
250
        let full_width: u32 = stats.src.ptype().bit_width().try_into().vortex_unwrap();
8,966✔
251
        let bw = match stats.typed.max_minus_min().checked_ilog2() {
8,966✔
252
            Some(l) => l + 1,
3,188✔
253
            // If max-min == 0, it we should use a different compression scheme
254
            // as we don't want to bitpack down to 0 bits.
255
            None => return Ok(0.0),
5,778✔
256
        };
257

258
        // If we're not saving at least 1 byte, don't bother with FOR
259
        if full_width - bw < 8 {
3,188✔
260
            return Ok(0.0);
56✔
261
        }
3,132✔
262

263
        Ok(full_width as f64 / bw as f64)
3,132✔
264
    }
16,350✔
265

266
    fn compress(
3,109✔
267
        &self,
3,109✔
268
        stats: &IntegerStats,
3,109✔
269
        is_sample: bool,
3,109✔
270
        _allowed_cascading: usize,
3,109✔
271
        excludes: &[IntCode],
3,109✔
272
    ) -> VortexResult<ArrayRef> {
3,109✔
273
        let for_array = FoRArray::encode(stats.src.clone())?;
3,109✔
274
        let biased = for_array.encoded().to_primitive()?;
3,109✔
275
        let biased_stats = IntegerStats::generate_opts(
3,109✔
276
            &biased,
3,109✔
277
            GenerateStatsOptions {
3,109✔
278
                count_distinct_values: false,
3,109✔
279
            },
3,109✔
280
        );
281

282
        // Immediately bitpack. If any other scheme was preferable, it would be chosen instead
283
        // of bitpacking.
284
        // NOTE: we could delegate in the future if we had another downstream codec that performs
285
        //  as well.
286
        let compressed = BitPackingScheme.compress(&biased_stats, is_sample, 0, excludes)?;
3,109✔
287

288
        Ok(FoRArray::try_new(compressed, for_array.reference_scalar().clone())?.into_array())
3,109✔
289
    }
3,109✔
290
}
291

292
impl Scheme for ZigZagScheme {
293
    type StatsType = IntegerStats;
294
    type CodeType = IntCode;
295

296
    fn code(&self) -> IntCode {
16,600✔
297
        ZIGZAG_SCHEME
16,600✔
298
    }
16,600✔
299

300
    fn expected_compression_ratio(
16,100✔
301
        &self,
16,100✔
302
        stats: &IntegerStats,
16,100✔
303
        is_sample: bool,
16,100✔
304
        allowed_cascading: usize,
16,100✔
305
        excludes: &[IntCode],
16,100✔
306
    ) -> VortexResult<f64> {
16,100✔
307
        // ZigZag is only useful when we cascade it with another encoding
308
        if allowed_cascading == 0 {
16,100✔
309
            return Ok(0.0);
×
310
        }
16,100✔
311

312
        // Don't try and compress all-null arrays
313
        if stats.value_count == 0 {
16,100✔
314
            return Ok(0.0);
×
315
        }
16,100✔
316

317
        // ZigZag is only useful when there are negative values.
318
        if !stats.typed.min_is_negative() {
16,100✔
319
            return Ok(0.0);
15,850✔
320
        }
250✔
321

322
        // Run compression on a sample to see how it performs.
323
        estimate_compression_ratio_with_sampling(
250✔
324
            self,
250✔
325
            stats,
250✔
326
            is_sample,
250✔
327
            allowed_cascading,
250✔
328
            excludes,
250✔
329
        )
330
    }
16,100✔
331

332
    fn compress(
250✔
333
        &self,
250✔
334
        stats: &IntegerStats,
250✔
335
        is_sample: bool,
250✔
336
        allowed_cascading: usize,
250✔
337
        excludes: &[IntCode],
250✔
338
    ) -> VortexResult<ArrayRef> {
250✔
339
        // Zigzag encode the values, then recursively compress the inner values.
340
        let zag = zigzag_encode(stats.src.clone())?;
250✔
341
        let encoded = zag.encoded().to_primitive()?;
250✔
342

343
        // ZigZag should be after Dict, RunEnd or Sparse.
344
        // We should only do these "container" style compressors once.
345
        let mut new_excludes = vec![
250✔
346
            ZigZagScheme.code(),
250✔
347
            DictScheme.code(),
250✔
348
            RunEndScheme.code(),
250✔
349
            SparseScheme.code(),
250✔
350
        ];
351
        new_excludes.extend_from_slice(excludes);
250✔
352

353
        let compressed =
250✔
354
            IntCompressor::compress(&encoded, is_sample, allowed_cascading - 1, &new_excludes)?;
250✔
355

356
        log::debug!("zigzag output: {}", compressed.display_tree());
250✔
357

358
        Ok(ZigZagArray::try_new(compressed)?.into_array())
250✔
359
    }
250✔
360
}
361

362
impl Scheme for BitPackingScheme {
363
    type StatsType = IntegerStats;
364
    type CodeType = IntCode;
365

366
    fn code(&self) -> IntCode {
16,350✔
367
        BITPACKING_SCHEME
16,350✔
368
    }
16,350✔
369

370
    #[allow(clippy::cast_possible_truncation)]
371
    fn expected_compression_ratio(
16,350✔
372
        &self,
16,350✔
373
        stats: &IntegerStats,
16,350✔
374
        is_sample: bool,
16,350✔
375
        allowed_cascading: usize,
16,350✔
376
        excludes: &[IntCode],
16,350✔
377
    ) -> VortexResult<f64> {
16,350✔
378
        // BitPacking only works for non-negative values
379
        if stats.typed.min_is_negative() {
16,350✔
380
            return Ok(0.0);
250✔
381
        }
16,100✔
382

383
        // Don't compress all-null arrays
384
        if stats.value_count == 0 {
16,100✔
385
            return Ok(0.0);
×
386
        }
16,100✔
387

388
        estimate_compression_ratio_with_sampling(
16,100✔
389
            self,
16,100✔
390
            stats,
16,100✔
391
            is_sample,
16,100✔
392
            allowed_cascading,
16,100✔
393
            excludes,
16,100✔
394
        )
395
    }
16,350✔
396

397
    #[allow(clippy::cast_possible_truncation)]
398
    fn compress(
20,228✔
399
        &self,
20,228✔
400
        stats: &IntegerStats,
20,228✔
401
        _is_sample: bool,
20,228✔
402
        _allowed_cascading: usize,
20,228✔
403
        _excludes: &[IntCode],
20,228✔
404
    ) -> VortexResult<ArrayRef> {
20,228✔
405
        let histogram = bit_width_histogram(stats.source())?;
20,228✔
406
        let bw = find_best_bit_width(stats.source().ptype(), &histogram)?;
20,228✔
407
        // If best bw is determined to be the current bit-width, return the original array.
408
        if bw as usize == stats.source().ptype().bit_width() {
20,228✔
409
            return Ok(stats.source().clone().into_array());
8✔
410
        }
20,220✔
411
        let mut packed = bitpack_encode(stats.source(), bw, Some(&histogram))?;
20,220✔
412

413
        let patches = packed.patches().map(compress_patches).transpose()?;
20,220✔
414
        packed.replace_patches(patches);
20,220✔
415

416
        Ok(packed.into_array())
20,220✔
417
    }
20,228✔
418
}
419

420
impl Scheme for SparseScheme {
421
    type StatsType = IntegerStats;
422
    type CodeType = IntCode;
423

424
    fn code(&self) -> IntCode {
16,607✔
425
        SPARSE_SCHEME
16,607✔
426
    }
16,607✔
427

428
    // We can avoid asserting the encoding tree instead.
429
    fn expected_compression_ratio(
16,086✔
430
        &self,
16,086✔
431
        stats: &IntegerStats,
16,086✔
432
        _is_sample: bool,
16,086✔
433
        _allowed_cascading: usize,
16,086✔
434
        _excludes: &[IntCode],
16,086✔
435
    ) -> VortexResult<f64> {
16,086✔
436
        if stats.value_count == 0 {
16,086✔
437
            // All nulls should use ConstantScheme
438
            return Ok(0.0);
×
439
        }
16,086✔
440

441
        // If the majority is null, will compress well.
442
        if stats.null_count as f64 / stats.src.len() as f64 > 0.9 {
16,086✔
443
            return Ok(stats.src.len() as f64 / stats.value_count as f64);
×
444
        }
16,086✔
445

446
        // See if the top value accounts for >= 90% of the set values.
447
        let (_, top_count) = stats.typed.top_value_and_count();
16,086✔
448

449
        if top_count == stats.value_count {
16,086✔
450
            // top_value is the only value, should use ConstantScheme instead
451
            return Ok(0.0);
5,582✔
452
        }
10,504✔
453

454
        let freq = top_count as f64 / stats.value_count as f64;
10,504✔
455
        if freq >= 0.9 {
10,504✔
456
            // We only store the positions of the non-top values.
457
            return Ok(stats.value_count as f64 / (stats.value_count - top_count) as f64);
5✔
458
        }
10,499✔
459

460
        Ok(0.0)
10,499✔
461
    }
16,086✔
462

463
    fn compress(
7✔
464
        &self,
7✔
465
        stats: &IntegerStats,
7✔
466
        is_sample: bool,
7✔
467
        allowed_cascading: usize,
7✔
468
        excludes: &[IntCode],
7✔
469
    ) -> VortexResult<ArrayRef> {
7✔
470
        assert!(allowed_cascading > 0);
7✔
471
        let (top_pvalue, top_count) = stats.typed.top_value_and_count();
7✔
472
        if top_count as usize == stats.src.len() {
7✔
473
            // top_value is the only value, use ConstantScheme
474
            return Ok(ConstantArray::new(
×
475
                Scalar::primitive_value(
×
476
                    top_pvalue,
×
477
                    top_pvalue.ptype(),
×
478
                    stats.src.dtype().nullability(),
×
479
                ),
×
480
                stats.src.len(),
×
481
            )
×
482
            .into_array());
×
483
        }
7✔
484

485
        let sparse_encoded = SparseArray::encode(
7✔
486
            stats.src.as_ref(),
7✔
487
            Some(Scalar::primitive_value(
7✔
488
                top_pvalue,
7✔
489
                top_pvalue.ptype(),
7✔
490
                stats.src.dtype().nullability(),
7✔
491
            )),
7✔
492
        )?;
×
493

494
        if let Some(sparse) = sparse_encoded.as_opt::<SparseVTable>() {
7✔
495
            // Compress the values
496
            let mut new_excludes = vec![SparseScheme.code()];
7✔
497
            new_excludes.extend_from_slice(excludes);
7✔
498

499
            let compressed_values = IntCompressor::compress_no_dict(
7✔
500
                &sparse.patches().values().to_primitive()?,
7✔
501
                is_sample,
7✔
502
                allowed_cascading - 1,
7✔
503
                &new_excludes,
7✔
504
            )?;
×
505

506
            let indices =
7✔
507
                downscale_integer_array(sparse.patches().indices().clone())?.to_primitive()?;
7✔
508

509
            let compressed_indices = IntCompressor::compress_no_dict(
7✔
510
                &indices,
7✔
511
                is_sample,
7✔
512
                allowed_cascading - 1,
7✔
513
                &new_excludes,
7✔
514
            )?;
×
515

516
            SparseArray::try_new(
7✔
517
                compressed_indices,
7✔
518
                compressed_values,
7✔
519
                sparse.len(),
7✔
520
                sparse.fill_scalar().clone(),
7✔
521
            )
522
            .map(|a| a.into_array())
7✔
523
        } else {
524
            Ok(sparse_encoded)
×
525
        }
526
    }
7✔
527
}
528

529
impl Scheme for DictScheme {
530
    type StatsType = IntegerStats;
531
    type CodeType = IntCode;
532

533
    fn code(&self) -> IntCode {
18,205✔
534
        DICT_SCHEME
18,205✔
535
    }
18,205✔
536

537
    fn expected_compression_ratio(
13,585✔
538
        &self,
13,585✔
539
        stats: &IntegerStats,
13,585✔
540
        _is_sample: bool,
13,585✔
541
        allowed_cascading: usize,
13,585✔
542
        _excludes: &[IntCode],
13,585✔
543
    ) -> VortexResult<f64> {
13,585✔
544
        // Dict should not be terminal.
545
        if allowed_cascading == 0 {
13,585✔
546
            return Ok(0.0);
×
547
        }
13,585✔
548

549
        if stats.value_count == 0 {
13,585✔
550
            return Ok(0.0);
×
551
        }
13,585✔
552

553
        // If > 50% of the values are distinct, skip dict.
554
        if stats.distinct_values_count > stats.value_count / 2 {
13,585✔
555
            return Ok(0.0);
10,335✔
556
        }
3,250✔
557

558
        // Ignore nulls encoding for the estimate. We only focus on values.
559
        let values_size = stats.source().ptype().bit_width() * stats.distinct_values_count as usize;
3,250✔
560

561
        // Assume codes are compressed RLE + BitPacking.
562
        let codes_bw = usize::BITS - stats.distinct_values_count.leading_zeros();
3,250✔
563

564
        let n_runs = stats.value_count / stats.average_run_length;
3,250✔
565

566
        // Assume that codes will either be BitPack or RLE-BitPack
567
        let codes_size_bp = (codes_bw * stats.value_count) as usize;
3,250✔
568
        let codes_size_rle_bp = (codes_bw + 32) * n_runs;
3,250✔
569

570
        let codes_size = usize::min(codes_size_bp, codes_size_rle_bp as usize);
3,250✔
571

572
        let before = stats.value_count as usize * stats.source().ptype().bit_width();
3,250✔
573

574
        Ok(before as f64 / (values_size + codes_size) as f64)
3,250✔
575
    }
13,585✔
576

577
    fn compress(
3✔
578
        &self,
3✔
579
        stats: &IntegerStats,
3✔
580
        is_sample: bool,
3✔
581
        allowed_cascading: usize,
3✔
582
        excludes: &[IntCode],
3✔
583
    ) -> VortexResult<ArrayRef> {
3✔
584
        assert!(allowed_cascading > 0);
3✔
585

586
        // TODO(aduffy): we can be more prescriptive: we know that codes will EITHER be
587
        //    RLE or FOR + BP. Cascading probably wastes some time here.
588

589
        let dict = dictionary_encode(stats)?;
3✔
590

591
        // Cascade the codes child
592
        let mut new_excludes = vec![DICT_SCHEME, SEQUENCE_SCHEME];
3✔
593
        new_excludes.extend_from_slice(excludes);
3✔
594

595
        let compressed_codes = IntCompressor::compress_no_dict(
3✔
596
            &dict.codes().to_primitive()?,
3✔
597
            is_sample,
3✔
598
            allowed_cascading - 1,
3✔
599
            &new_excludes,
3✔
UNCOV
600
        )?;
×
601

602
        Ok(DictArray::try_new(compressed_codes, dict.values().clone())?.into_array())
3✔
603
    }
3✔
604
}
605

606
impl Scheme for RunEndScheme {
607
    type StatsType = IntegerStats;
608
    type CodeType = IntCode;
609

610
    fn code(&self) -> IntCode {
17,509✔
611
        RUNEND_SCHEME
17,509✔
612
    }
17,509✔
613

614
    fn expected_compression_ratio(
14,282✔
615
        &self,
14,282✔
616
        stats: &IntegerStats,
14,282✔
617
        is_sample: bool,
14,282✔
618
        allowed_cascading: usize,
14,282✔
619
        excludes: &[IntCode],
14,282✔
620
    ) -> VortexResult<f64> {
14,282✔
621
        // If the run length is below the threshold, drop it.
622
        if stats.average_run_length < RUN_END_THRESHOLD {
14,282✔
623
            return Ok(0.0);
13,706✔
624
        }
576✔
625

626
        if allowed_cascading == 0 {
576✔
UNCOV
627
            return Ok(0.0);
×
628
        }
576✔
629

630
        // Run compression on a sample, see how it performs.
631
        estimate_compression_ratio_with_sampling(
576✔
632
            self,
576✔
633
            stats,
576✔
634
            is_sample,
576✔
635
            allowed_cascading,
576✔
636
            excludes,
576✔
637
        )
638
    }
14,282✔
639

640
    fn compress(
909✔
641
        &self,
909✔
642
        stats: &IntegerStats,
909✔
643
        is_sample: bool,
909✔
644
        allowed_cascading: usize,
909✔
645
        excludes: &[IntCode],
909✔
646
    ) -> VortexResult<ArrayRef> {
909✔
647
        assert!(allowed_cascading > 0);
909✔
648

649
        // run-end encode the ends
650
        let (ends, values) = runend_encode(&stats.src)?;
909✔
651

652
        let mut new_excludes = vec![RunEndScheme.code(), DictScheme.code()];
909✔
653
        new_excludes.extend_from_slice(excludes);
909✔
654

655
        let ends_stats = IntegerStats::generate_opts(
909✔
656
            &ends,
909✔
657
            GenerateStatsOptions {
909✔
658
                count_distinct_values: false,
909✔
659
            },
909✔
660
        );
661
        let ends_scheme = IntCompressor::choose_scheme(
909✔
662
            &ends_stats,
909✔
663
            is_sample,
909✔
664
            allowed_cascading - 1,
909✔
665
            &new_excludes,
909✔
UNCOV
666
        )?;
×
667
        let compressed_ends =
909✔
668
            ends_scheme.compress(&ends_stats, is_sample, allowed_cascading - 1, &new_excludes)?;
909✔
669

670
        let compressed_values = IntCompressor::compress_no_dict(
909✔
671
            &values.to_primitive()?,
909✔
672
            is_sample,
909✔
673
            allowed_cascading - 1,
909✔
674
            &new_excludes,
909✔
UNCOV
675
        )?;
×
676

677
        Ok(RunEndArray::try_new(compressed_ends, compressed_values)?.into_array())
909✔
678
    }
909✔
679
}
680

681
impl Scheme for SequenceScheme {
682
    type StatsType = IntegerStats;
683
    type CodeType = IntCode;
684

685
    fn code(&self) -> Self::CodeType {
16,350✔
686
        SEQUENCE_SCHEME
16,350✔
687
    }
16,350✔
688

689
    fn expected_compression_ratio(
16,347✔
690
        &self,
16,347✔
691
        stats: &Self::StatsType,
16,347✔
692
        _is_sample: bool,
16,347✔
693
        _allowed_cascading: usize,
16,347✔
694
        _excludes: &[Self::CodeType],
16,347✔
695
    ) -> VortexResult<f64> {
16,347✔
696
        if stats.null_count > 0 {
16,347✔
697
            return Ok(0.0);
421✔
698
        }
15,926✔
699
        // Since two values are required to store base and multiplier the
700
        // compression ratio is divided by 2.
701
        Ok(sequence_encode(&stats.src)?
15,926✔
702
            .map(|_| stats.src.len() as f64 / 2.0)
15,926✔
703
            .unwrap_or(0.0))
15,926✔
704
    }
16,347✔
705

706
    fn compress(
699✔
707
        &self,
699✔
708
        stats: &Self::StatsType,
699✔
709
        _is_sample: bool,
699✔
710
        _allowed_cascading: usize,
699✔
711
        _excludes: &[Self::CodeType],
699✔
712
    ) -> VortexResult<ArrayRef> {
699✔
713
        if stats.null_count > 0 {
699✔
UNCOV
714
            vortex_bail!("sequence encoding does not support nulls");
×
715
        }
699✔
716
        sequence_encode(&stats.src)?.ok_or_else(|| vortex_err!("cannot sequence encode array"))
699✔
717
    }
699✔
718
}
719

720
#[cfg(test)]
mod tests {
    use itertools::Itertools;
    use log::LevelFilter;
    use rand::rngs::StdRng;
    use rand::{RngCore, SeedableRng};
    use vortex_array::arrays::PrimitiveArray;
    use vortex_array::validity::Validity;
    use vortex_array::vtable::ValidityHelper;
    use vortex_array::{Array, IntoArray, ToCanonical};
    use vortex_buffer::{Buffer, BufferMut, buffer, buffer_mut};
    use vortex_dict::DictEncoding;
    use vortex_sequence::SequenceEncoding;
    use vortex_sparse::SparseEncoding;
    use vortex_utils::aliases::hash_set::HashSet;

    use crate::integer::{IntCompressor, IntegerStats, SequenceScheme, SparseScheme};
    use crate::{Compressor, CompressorStats, Scheme};

    /// Compressing an empty array must succeed and yield an empty array.
    #[test]
    fn test_empty() {
        let empty = PrimitiveArray::new(Buffer::<i32>::empty(), Validity::NonNullable);

        let result = IntCompressor::compress(&empty, false, 3, &[]).unwrap();

        assert!(result.is_empty());
    }

    /// Short runs drawn from a handful of distinct values should end up dictionary-encoded.
    #[test]
    fn test_dict_encodable() {
        let mut codes = BufferMut::<i32>::with_capacity(65_535);

        // The six distinct values that the runs are drawn from.
        let numbers = [0, 10, 50, 100, 1000, 3000]
            .into_iter()
            .map(|i| 1234 * i)
            .collect_vec();

        // Write runs of length 0 to 4, each of a randomly picked value. The RNG is seeded,
        // so the input is deterministic.
        let mut rng = StdRng::seed_from_u64(1u64);
        while codes.len() < 64000 {
            let run_length = rng.next_u32() % 5;
            let value = numbers[rng.next_u32() as usize % numbers.len()];
            for _ in 0..run_length {
                codes.push(value);
            }
        }

        let input = codes.freeze().into_array().to_primitive().unwrap();
        let compressed = IntCompressor::compress(&input, false, 3, &[]).unwrap();
        assert_eq!(compressed.encoding_id(), DictEncoding.id());
    }

    /// Smoke test mirroring the WindowName column from ClickBench: one million `-1`s with
    /// 223 positions overwritten by multiples of 5 below 500.
    #[test]
    fn test_window_name() {
        env_logger::builder()
            .filter(None, LevelFilter::Debug)
            .try_init()
            .ok();

        let mut values = buffer_mut![-1i32; 1_000_000];
        let mut visited = HashSet::new();
        let mut rng = StdRng::seed_from_u64(1u64);
        while visited.len() < 223 {
            let random = (rng.next_u32() as usize) % 1_000_000;
            // `insert` returns false for a position that was already used; skip those.
            if !visited.insert(random) {
                continue;
            }
            // Overwrite the position with one of 100 possible values.
            values[random] = 5 * (rng.next_u64() % 100) as i32;
        }

        let array = values.freeze().into_array().to_primitive().unwrap();
        let compressed = IntCompressor::compress(&array, false, 3, &[]).unwrap();
        log::info!("WindowName compressed: {}", compressed.display_tree());
    }

    /// A dominant value plus one null: the result is sparse and round-trips with validity.
    #[test]
    fn sparse_with_nulls() {
        let validity = Validity::from_iter(vec![true, true, true, true, false]);
        let array = PrimitiveArray::new(buffer![189u8, 189, 189, 0, 46], validity);

        let stats = IntegerStats::generate(&array);
        let compressed = SparseScheme.compress(&stats, false, 3, &[]).unwrap();
        assert_eq!(compressed.encoding_id(), SparseEncoding.id());

        let decoded = compressed.to_primitive().unwrap();
        let expected = [189u8, 189, 189, 0, 0];
        assert_eq!(decoded.as_slice::<u8>(), &expected);
        assert_eq!(decoded.validity(), array.validity());
    }

    /// Only the last slot is valid: the result is sparse and round-trips with validity.
    #[test]
    fn sparse_mostly_nulls() {
        let validity = Validity::from_iter(vec![
            false, false, false, false, false, false, false, false, false, false, true,
        ]);
        let array = PrimitiveArray::new(
            buffer![189u8, 189, 189, 189, 189, 189, 189, 189, 189, 0, 46],
            validity,
        );

        let stats = IntegerStats::generate(&array);
        let compressed = SparseScheme.compress(&stats, false, 3, &[]).unwrap();
        assert_eq!(compressed.encoding_id(), SparseEncoding.id());

        let decoded = compressed.to_primitive().unwrap();
        let expected = [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 46];
        assert_eq!(decoded.as_slice::<u8>(), &expected);
        assert_eq!(decoded.validity(), array.validity());
    }

    /// A nullable-typed array without any nulls can still be sequence-encoded.
    #[test]
    fn nullable_sequence() {
        let values = (0i32..20).step_by(7).collect_vec();
        let array = PrimitiveArray::from_option_iter(values.clone().into_iter().map(Some));

        let stats = IntegerStats::generate(&array);
        let compressed = SequenceScheme.compress(&stats, false, 3, &[]).unwrap();
        assert_eq!(compressed.encoding_id(), SequenceEncoding.id());

        let decoded = compressed.to_primitive().unwrap();
        assert_eq!(decoded.as_slice::<i32>(), values.as_slice());
    }
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc