• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

vortex-data / vortex / 16268681402

14 Jul 2025 01:49PM UTC coverage: 81.464% (+0.2%) from 81.235%
16268681402

Pull #3856

github

web-flow
Merge 3f50333da into 52555adbb
Pull Request #3856: fix: Drain prefetch buffer

15 of 27 new or added lines in 2 files covered. (55.56%)

115 existing lines in 13 files now uncovered.

46110 of 56602 relevant lines covered (81.46%)

146503.08 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

93.61
/vortex-btrblocks/src/integer.rs
1
// SPDX-License-Identifier: Apache-2.0
2
// SPDX-FileCopyrightText: Copyright the Vortex contributors
3

4
pub mod dictionary;
5
mod stats;
6

7
use std::fmt::Debug;
8
use std::hash::Hash;
9

10
pub use stats::IntegerStats;
11
use vortex_array::arrays::{ConstantArray, PrimitiveArray, PrimitiveVTable};
12
use vortex_array::compress::downscale_integer_array;
13
use vortex_array::{ArrayRef, IntoArray, ToCanonical};
14
use vortex_dict::DictArray;
15
use vortex_error::{VortexExpect, VortexResult, VortexUnwrap, vortex_bail, vortex_err};
16
use vortex_fastlanes::{FoRArray, bit_width_histogram, bitpack_encode, find_best_bit_width};
17
use vortex_runend::RunEndArray;
18
use vortex_runend::compress::runend_encode;
19
use vortex_scalar::Scalar;
20
use vortex_sequence::sequence_encode;
21
use vortex_sparse::{SparseArray, SparseVTable};
22
use vortex_zigzag::{ZigZagArray, zigzag_encode};
23

24
use crate::integer::dictionary::dictionary_encode;
25
use crate::patches::compress_patches;
26
use crate::{
27
    Compressor, CompressorStats, GenerateStatsOptions, Scheme,
28
    estimate_compression_ratio_with_sampling,
29
};
30

31
pub struct IntCompressor;
32

33
impl Compressor for IntCompressor {
34
    type ArrayVTable = PrimitiveVTable;
35
    type SchemeType = dyn IntegerScheme;
36
    type StatsType = IntegerStats;
37

38
    fn schemes() -> &'static [&'static dyn IntegerScheme] {
15,657✔
39
        &[
15,657✔
40
            &ConstantScheme,
15,657✔
41
            &FORScheme,
15,657✔
42
            &ZigZagScheme,
15,657✔
43
            &BitPackingScheme,
15,657✔
44
            &SparseScheme,
15,657✔
45
            &DictScheme,
15,657✔
46
            &RunEndScheme,
15,657✔
47
            &SequenceScheme,
15,657✔
48
        ]
15,657✔
49
    }
15,657✔
50

51
    fn default_scheme() -> &'static Self::SchemeType {
10,382✔
52
        &UncompressedScheme
10,382✔
53
    }
10,382✔
54

55
    fn dict_scheme_code() -> IntCode {
13,887✔
56
        DICT_SCHEME
13,887✔
57
    }
13,887✔
58
}
59

60
impl IntCompressor {
61
    pub fn compress_no_dict(
902✔
62
        array: &PrimitiveArray,
902✔
63
        is_sample: bool,
902✔
64
        allowed_cascading: usize,
902✔
65
        excludes: &[IntCode],
902✔
66
    ) -> VortexResult<ArrayRef> {
902✔
67
        let stats = IntegerStats::generate_opts(
902✔
68
            array,
902✔
69
            GenerateStatsOptions {
902✔
70
                count_distinct_values: false,
902✔
71
            },
902✔
72
        );
902✔
73

74
        let scheme = Self::choose_scheme(&stats, is_sample, allowed_cascading, excludes)?;
902✔
75
        let output = scheme.compress(&stats, is_sample, allowed_cascading, excludes)?;
902✔
76

77
        if output.nbytes() < array.nbytes() {
902✔
78
            Ok(output)
29✔
79
        } else {
80
            log::debug!("resulting tree too large: {}", output.tree_display());
873✔
81
            Ok(array.to_array())
873✔
82
        }
83
    }
902✔
84
}
85

86
pub trait IntegerScheme: Scheme<StatsType = IntegerStats, CodeType = IntCode> {}
87

88
// Auto-impl
89
impl<T> IntegerScheme for T where T: Scheme<StatsType = IntegerStats, CodeType = IntCode> {}
90

91
/// Compact identifier for an integer compression scheme.
#[derive(Clone, Copy, Debug, Hash, PartialEq, Eq)]
pub struct IntCode(u8);

// Identifiers returned by each scheme's `code()` implementation.
const UNCOMPRESSED_SCHEME: IntCode = IntCode(0);
const CONSTANT_SCHEME: IntCode = IntCode(1);
const FOR_SCHEME: IntCode = IntCode(2);
const ZIGZAG_SCHEME: IntCode = IntCode(3);
const BITPACKING_SCHEME: IntCode = IntCode(4);
const SPARSE_SCHEME: IntCode = IntCode(5);
const DICT_SCHEME: IntCode = IntCode(6);
const RUNEND_SCHEME: IntCode = IntCode(7);
const SEQUENCE_SCHEME: IntCode = IntCode(8);
103

104
/// Keeps the source array as-is.
#[derive(Clone, Copy, Debug)]
pub struct UncompressedScheme;

/// Replaces an array holding one repeated value with a `ConstantArray`.
#[derive(Clone, Copy, Debug)]
pub struct ConstantScheme;

/// Frame-of-reference encoding whose biased values are then bit-packed.
#[derive(Clone, Copy, Debug)]
pub struct FORScheme;

/// ZigZag-encodes values containing negatives, then compresses the result recursively.
#[derive(Clone, Copy, Debug)]
pub struct ZigZagScheme;

/// Packs values into the bit width picked from a bit-width histogram; any patches are
/// compressed as well.
#[derive(Clone, Copy, Debug)]
pub struct BitPackingScheme;

/// Stores only the positions and values that differ from the most frequent value.
#[derive(Clone, Copy, Debug)]
pub struct SparseScheme;

/// Dictionary-encodes the values and cascades compression into the codes.
#[derive(Clone, Copy, Debug)]
pub struct DictScheme;

/// Run-end encoding with both the run ends and run values compressed further.
#[derive(Clone, Copy, Debug)]
pub struct RunEndScheme;

/// Encodes arrays that `sequence_encode` recognises (base plus multiplier).
#[derive(Clone, Copy, Debug)]
pub struct SequenceScheme;

/// Minimum average run length before run-end encoding is considered.
const RUN_END_THRESHOLD: u32 = 4;
133

134
impl Scheme for UncompressedScheme {
135
    type StatsType = IntegerStats;
136
    type CodeType = IntCode;
137

138
    fn code(&self) -> IntCode {
×
139
        UNCOMPRESSED_SCHEME
×
140
    }
×
141

142
    fn expected_compression_ratio(
×
143
        &self,
×
144
        _stats: &IntegerStats,
×
145
        _is_sample: bool,
×
UNCOV
146
        _allowed_cascading: usize,
×
UNCOV
147
        _excludes: &[IntCode],
×
UNCOV
148
    ) -> VortexResult<f64> {
×
UNCOV
149
        // no compression
×
UNCOV
150
        Ok(1.0)
×
UNCOV
151
    }
×
152

153
    fn compress(
10,382✔
154
        &self,
10,382✔
155
        stats: &IntegerStats,
10,382✔
156
        _is_sample: bool,
10,382✔
157
        _allowed_cascading: usize,
10,382✔
158
        _excludes: &[IntCode],
10,382✔
159
    ) -> VortexResult<ArrayRef> {
10,382✔
160
        Ok(stats.source().to_array())
10,382✔
161
    }
10,382✔
162
}
163

164
impl Scheme for ConstantScheme {
165
    type StatsType = IntegerStats;
166
    type CodeType = IntCode;
167

168
    fn code(&self) -> IntCode {
15,657✔
169
        CONSTANT_SCHEME
15,657✔
170
    }
15,657✔
171

172
    fn is_constant(&self) -> bool {
8,092✔
173
        true
8,092✔
174
    }
8,092✔
175

176
    fn expected_compression_ratio(
7,565✔
177
        &self,
7,565✔
178
        stats: &IntegerStats,
7,565✔
179
        is_sample: bool,
7,565✔
180
        _allowed_cascading: usize,
7,565✔
181
        _excludes: &[IntCode],
7,565✔
182
    ) -> VortexResult<f64> {
7,565✔
183
        // Never yield ConstantScheme for a sample, it could be a false-positive.
7,565✔
184
        if is_sample {
7,565✔
UNCOV
185
            return Ok(0.0);
×
186
        }
7,565✔
187

7,565✔
188
        // Only arrays with one distinct values can be constant compressed.
7,565✔
189
        if stats.distinct_values_count != 1 {
7,565✔
190
            return Ok(0.0);
3,857✔
191
        }
3,708✔
192

3,708✔
193
        // Cannot have mix of nulls and non-nulls
3,708✔
194
        if stats.null_count > 0 && stats.value_count > 0 {
3,708✔
195
            return Ok(0.0);
95✔
196
        }
3,613✔
197

3,613✔
198
        Ok(stats.value_count as f64)
3,613✔
199
    }
7,565✔
200

201
    fn compress(
309✔
202
        &self,
309✔
203
        stats: &IntegerStats,
309✔
204
        _is_sample: bool,
309✔
205
        _allowed_cascading: usize,
309✔
206
        _excludes: &[IntCode],
309✔
207
    ) -> VortexResult<ArrayRef> {
309✔
208
        // We only use Constant encoding if the entire array is constant, never if one of
309✔
209
        // the child arrays yields a constant value.
309✔
210
        let scalar = stats
309✔
211
            .source()
309✔
212
            .as_constant()
309✔
213
            .vortex_expect("constant array expected");
309✔
214

309✔
215
        Ok(ConstantArray::new(scalar, stats.src.len()).into_array())
309✔
216
    }
309✔
217
}
218

219
impl Scheme for FORScheme {
220
    type StatsType = IntegerStats;
221
    type CodeType = IntCode;
222

223
    fn code(&self) -> IntCode {
15,657✔
224
        FOR_SCHEME
15,657✔
225
    }
15,657✔
226

227
    fn expected_compression_ratio(
15,657✔
228
        &self,
15,657✔
229
        stats: &IntegerStats,
15,657✔
230
        _is_sample: bool,
15,657✔
231
        allowed_cascading: usize,
15,657✔
232
        _excludes: &[IntCode],
15,657✔
233
    ) -> VortexResult<f64> {
15,657✔
234
        // Only apply if we are not at the leaf
15,657✔
235
        if allowed_cascading == 0 {
15,657✔
UNCOV
236
            return Ok(0.0);
×
237
        }
15,657✔
238

15,657✔
239
        // All-null cannot be FOR compressed.
15,657✔
240
        if stats.value_count == 0 {
15,657✔
UNCOV
241
            return Ok(0.0);
×
242
        }
15,657✔
243

15,657✔
244
        // Only apply when the min is not already zero.
15,657✔
245
        if stats.typed.min_is_zero() {
15,657✔
246
            return Ok(0.0);
6,994✔
247
        }
8,663✔
248

8,663✔
249
        // Difference between max and min
8,663✔
250
        let full_width: u32 = stats.src.ptype().bit_width().try_into().vortex_unwrap();
8,663✔
251
        let bw = match stats.typed.max_minus_min().checked_ilog2() {
8,663✔
252
            Some(l) => l + 1,
3,113✔
253
            // If max-min == 0, it we should use a different compression scheme
254
            // as we don't want to bitpack down to 0 bits.
255
            None => return Ok(0.0),
5,550✔
256
        };
257

258
        // If we're not saving at least 1 byte, don't bother with FOR
259
        if full_width - bw < 8 {
3,113✔
260
            return Ok(0.0);
56✔
261
        }
3,057✔
262

3,057✔
263
        Ok(full_width as f64 / bw as f64)
3,057✔
264
    }
15,657✔
265

266
    fn compress(
3,034✔
267
        &self,
3,034✔
268
        stats: &IntegerStats,
3,034✔
269
        is_sample: bool,
3,034✔
270
        _allowed_cascading: usize,
3,034✔
271
        excludes: &[IntCode],
3,034✔
272
    ) -> VortexResult<ArrayRef> {
3,034✔
273
        let for_array = FoRArray::encode(stats.src.clone())?;
3,034✔
274
        let biased = for_array.encoded().to_primitive()?;
3,034✔
275
        let biased_stats = IntegerStats::generate_opts(
3,034✔
276
            &biased,
3,034✔
277
            GenerateStatsOptions {
3,034✔
278
                count_distinct_values: false,
3,034✔
279
            },
3,034✔
280
        );
3,034✔
281

282
        // Immediately bitpack. If any other scheme was preferable, it would be chosen instead
283
        // of bitpacking.
284
        // NOTE: we could delegate in the future if we had another downstream codec that performs
285
        //  as well.
286
        let compressed = BitPackingScheme.compress(&biased_stats, is_sample, 0, excludes)?;
3,034✔
287

288
        Ok(FoRArray::try_new(compressed, for_array.reference_scalar().clone())?.into_array())
3,034✔
289
    }
3,034✔
290
}
291

292
impl Scheme for ZigZagScheme {
293
    type StatsType = IntegerStats;
294
    type CodeType = IntCode;
295

296
    fn code(&self) -> IntCode {
15,895✔
297
        ZIGZAG_SCHEME
15,895✔
298
    }
15,895✔
299

300
    fn expected_compression_ratio(
15,419✔
301
        &self,
15,419✔
302
        stats: &IntegerStats,
15,419✔
303
        is_sample: bool,
15,419✔
304
        allowed_cascading: usize,
15,419✔
305
        excludes: &[IntCode],
15,419✔
306
    ) -> VortexResult<f64> {
15,419✔
307
        // ZigZag is only useful when we cascade it with another encoding
15,419✔
308
        if allowed_cascading == 0 {
15,419✔
UNCOV
309
            return Ok(0.0);
×
310
        }
15,419✔
311

15,419✔
312
        // Don't try and compress all-null arrays
15,419✔
313
        if stats.value_count == 0 {
15,419✔
UNCOV
314
            return Ok(0.0);
×
315
        }
15,419✔
316

15,419✔
317
        // ZigZag is only useful when there are negative values.
15,419✔
318
        if !stats.typed.min_is_negative() {
15,419✔
319
            return Ok(0.0);
15,181✔
320
        }
238✔
321

238✔
322
        // Run compression on a sample to see how it performs.
238✔
323
        estimate_compression_ratio_with_sampling(
238✔
324
            self,
238✔
325
            stats,
238✔
326
            is_sample,
238✔
327
            allowed_cascading,
238✔
328
            excludes,
238✔
329
        )
238✔
330
    }
15,419✔
331

332
    fn compress(
238✔
333
        &self,
238✔
334
        stats: &IntegerStats,
238✔
335
        is_sample: bool,
238✔
336
        allowed_cascading: usize,
238✔
337
        excludes: &[IntCode],
238✔
338
    ) -> VortexResult<ArrayRef> {
238✔
339
        // Zigzag encode the values, then recursively compress the inner values.
340
        let zag = zigzag_encode(stats.src.clone())?;
238✔
341
        let encoded = zag.encoded().to_primitive()?;
238✔
342

343
        // ZigZag should be after Dict, RunEnd or Sparse.
344
        // We should only do these "container" style compressors once.
345
        let mut new_excludes = vec![
238✔
346
            ZigZagScheme.code(),
238✔
347
            DictScheme.code(),
238✔
348
            RunEndScheme.code(),
238✔
349
            SparseScheme.code(),
238✔
350
        ];
238✔
351
        new_excludes.extend_from_slice(excludes);
238✔
352

353
        let compressed =
238✔
354
            IntCompressor::compress(&encoded, is_sample, allowed_cascading - 1, &new_excludes)?;
238✔
355

356
        log::debug!("zigzag output: {}", compressed.tree_display());
238✔
357

358
        Ok(ZigZagArray::try_new(compressed)?.into_array())
238✔
359
    }
238✔
360
}
361

362
impl Scheme for BitPackingScheme {
363
    type StatsType = IntegerStats;
364
    type CodeType = IntCode;
365

366
    fn code(&self) -> IntCode {
15,657✔
367
        BITPACKING_SCHEME
15,657✔
368
    }
15,657✔
369

370
    #[allow(clippy::cast_possible_truncation)]
371
    fn expected_compression_ratio(
15,657✔
372
        &self,
15,657✔
373
        stats: &IntegerStats,
15,657✔
374
        is_sample: bool,
15,657✔
375
        allowed_cascading: usize,
15,657✔
376
        excludes: &[IntCode],
15,657✔
377
    ) -> VortexResult<f64> {
15,657✔
378
        // BitPacking only works for non-negative values
15,657✔
379
        if stats.typed.min_is_negative() {
15,657✔
380
            return Ok(0.0);
238✔
381
        }
15,419✔
382

15,419✔
383
        // Don't compress all-null arrays
15,419✔
384
        if stats.value_count == 0 {
15,419✔
UNCOV
385
            return Ok(0.0);
×
386
        }
15,419✔
387

15,419✔
388
        estimate_compression_ratio_with_sampling(
15,419✔
389
            self,
15,419✔
390
            stats,
15,419✔
391
            is_sample,
15,419✔
392
            allowed_cascading,
15,419✔
393
            excludes,
15,419✔
394
        )
15,419✔
395
    }
15,657✔
396

397
    #[allow(clippy::cast_possible_truncation)]
398
    fn compress(
19,428✔
399
        &self,
19,428✔
400
        stats: &IntegerStats,
19,428✔
401
        _is_sample: bool,
19,428✔
402
        _allowed_cascading: usize,
19,428✔
403
        _excludes: &[IntCode],
19,428✔
404
    ) -> VortexResult<ArrayRef> {
19,428✔
405
        let histogram = bit_width_histogram(stats.source())?;
19,428✔
406
        let bw = find_best_bit_width(stats.source().ptype(), &histogram)?;
19,428✔
407
        // If best bw is determined to be the current bit-width, return the original array.
408
        if bw as usize == stats.source().ptype().bit_width() {
19,428✔
409
            return Ok(stats.source().clone().into_array());
8✔
410
        }
19,420✔
411
        let mut packed = bitpack_encode(stats.source(), bw, Some(&histogram))?;
19,420✔
412

413
        let patches = packed.patches().map(compress_patches).transpose()?;
19,420✔
414
        packed.replace_patches(patches);
19,420✔
415

19,420✔
416
        Ok(packed.into_array())
19,420✔
417
    }
19,428✔
418
}
419

420
impl Scheme for SparseScheme {
421
    type StatsType = IntegerStats;
422
    type CodeType = IntCode;
423

424
    fn code(&self) -> IntCode {
15,902✔
425
        SPARSE_SCHEME
15,902✔
426
    }
15,902✔
427

428
    // We can avoid asserting the encoding tree instead.
429
    fn expected_compression_ratio(
15,405✔
430
        &self,
15,405✔
431
        stats: &IntegerStats,
15,405✔
432
        _is_sample: bool,
15,405✔
433
        _allowed_cascading: usize,
15,405✔
434
        _excludes: &[IntCode],
15,405✔
435
    ) -> VortexResult<f64> {
15,405✔
436
        if stats.value_count == 0 {
15,405✔
437
            // All nulls should use ConstantScheme
UNCOV
438
            return Ok(0.0);
×
439
        }
15,405✔
440

15,405✔
441
        // If the majority is null, will compress well.
15,405✔
442
        if stats.null_count as f64 / stats.src.len() as f64 > 0.9 {
15,405✔
UNCOV
443
            return Ok(stats.src.len() as f64 / stats.value_count as f64);
×
444
        }
15,405✔
445

15,405✔
446
        // See if the top value accounts for >= 90% of the set values.
15,405✔
447
        let (_, top_count) = stats.typed.top_value_and_count();
15,405✔
448

15,405✔
449
        if top_count == stats.value_count {
15,405✔
450
            // top_value is the only value, should use ConstantScheme instead
451
            return Ok(0.0);
5,131✔
452
        }
10,274✔
453

10,274✔
454
        let freq = top_count as f64 / stats.value_count as f64;
10,274✔
455
        if freq >= 0.9 {
10,274✔
456
            // We only store the positions of the non-top values.
457
            return Ok(stats.value_count as f64 / (stats.value_count - top_count) as f64);
5✔
458
        }
10,269✔
459

10,269✔
460
        Ok(0.0)
10,269✔
461
    }
15,405✔
462

463
    fn compress(
7✔
464
        &self,
7✔
465
        stats: &IntegerStats,
7✔
466
        is_sample: bool,
7✔
467
        allowed_cascading: usize,
7✔
468
        excludes: &[IntCode],
7✔
469
    ) -> VortexResult<ArrayRef> {
7✔
470
        assert!(allowed_cascading > 0);
7✔
471
        let (top_pvalue, top_count) = stats.typed.top_value_and_count();
7✔
472
        if top_count as usize == stats.src.len() {
7✔
473
            // top_value is the only value, use ConstantScheme
474
            return Ok(ConstantArray::new(
×
475
                Scalar::primitive_value(
×
476
                    top_pvalue,
×
UNCOV
477
                    top_pvalue.ptype(),
×
UNCOV
478
                    stats.src.dtype().nullability(),
×
UNCOV
479
                ),
×
UNCOV
480
                stats.src.len(),
×
UNCOV
481
            )
×
UNCOV
482
            .into_array());
×
483
        }
7✔
484

485
        let sparse_encoded = SparseArray::encode(
7✔
486
            stats.src.as_ref(),
7✔
487
            Some(Scalar::primitive_value(
7✔
488
                top_pvalue,
7✔
489
                top_pvalue.ptype(),
7✔
490
                stats.src.dtype().nullability(),
7✔
491
            )),
7✔
492
        )?;
7✔
493

494
        if let Some(sparse) = sparse_encoded.as_opt::<SparseVTable>() {
7✔
495
            // Compress the values
496
            let mut new_excludes = vec![SparseScheme.code()];
7✔
497
            new_excludes.extend_from_slice(excludes);
7✔
498

499
            let compressed_values = IntCompressor::compress_no_dict(
7✔
500
                &sparse.patches().values().to_primitive()?,
7✔
501
                is_sample,
7✔
502
                allowed_cascading - 1,
7✔
503
                &new_excludes,
7✔
UNCOV
504
            )?;
×
505

506
            let indices =
7✔
507
                downscale_integer_array(sparse.patches().indices().clone())?.to_primitive()?;
7✔
508

509
            let compressed_indices = IntCompressor::compress_no_dict(
7✔
510
                &indices,
7✔
511
                is_sample,
7✔
512
                allowed_cascading - 1,
7✔
513
                &new_excludes,
7✔
514
            )?;
7✔
515

516
            SparseArray::try_new(
7✔
517
                compressed_indices,
7✔
518
                compressed_values,
7✔
519
                sparse.len(),
7✔
520
                sparse.fill_scalar().clone(),
7✔
521
            )
7✔
522
            .map(|a| a.into_array())
7✔
523
        } else {
UNCOV
524
            Ok(sparse_encoded)
×
525
        }
526
    }
7✔
527
}
528

529
impl Scheme for DictScheme {
530
    type StatsType = IntegerStats;
531
    type CodeType = IntCode;
532

533
    fn code(&self) -> IntCode {
17,397✔
534
        DICT_SCHEME
17,397✔
535
    }
17,397✔
536

537
    fn expected_compression_ratio(
13,050✔
538
        &self,
13,050✔
539
        stats: &IntegerStats,
13,050✔
540
        _is_sample: bool,
13,050✔
541
        allowed_cascading: usize,
13,050✔
542
        _excludes: &[IntCode],
13,050✔
543
    ) -> VortexResult<f64> {
13,050✔
544
        // Dict should not be terminal.
13,050✔
545
        if allowed_cascading == 0 {
13,050✔
UNCOV
546
            return Ok(0.0);
×
547
        }
13,050✔
548

13,050✔
549
        if stats.value_count == 0 {
13,050✔
UNCOV
550
            return Ok(0.0);
×
551
        }
13,050✔
552

13,050✔
553
        // If > 50% of the values are distinct, skip dict.
13,050✔
554
        if stats.distinct_values_count > stats.value_count / 2 {
13,050✔
555
            return Ok(0.0);
10,031✔
556
        }
3,019✔
557

3,019✔
558
        // Ignore nulls encoding for the estimate. We only focus on values.
3,019✔
559
        let values_size = stats.source().ptype().bit_width() * stats.distinct_values_count as usize;
3,019✔
560

3,019✔
561
        // Assume codes are compressed RLE + BitPacking.
3,019✔
562
        let codes_bw = usize::BITS - stats.distinct_values_count.leading_zeros();
3,019✔
563

3,019✔
564
        let n_runs = stats.value_count / stats.average_run_length;
3,019✔
565

3,019✔
566
        // Assume that codes will either be BitPack or RLE-BitPack
3,019✔
567
        let codes_size_bp = (codes_bw * stats.value_count) as usize;
3,019✔
568
        let codes_size_rle_bp = (codes_bw + 32) * n_runs;
3,019✔
569

3,019✔
570
        let codes_size = usize::min(codes_size_bp, codes_size_rle_bp as usize);
3,019✔
571

3,019✔
572
        let before = stats.value_count as usize * stats.source().ptype().bit_width();
3,019✔
573

3,019✔
574
        Ok(before as f64 / (values_size + codes_size) as f64)
3,019✔
575
    }
13,050✔
576

577
    fn compress(
3✔
578
        &self,
3✔
579
        stats: &IntegerStats,
3✔
580
        is_sample: bool,
3✔
581
        allowed_cascading: usize,
3✔
582
        excludes: &[IntCode],
3✔
583
    ) -> VortexResult<ArrayRef> {
3✔
584
        assert!(allowed_cascading > 0);
3✔
585

586
        // TODO(aduffy): we can be more prescriptive: we know that codes will EITHER be
587
        //    RLE or FOR + BP. Cascading probably wastes some time here.
588

589
        let dict = dictionary_encode(stats)?;
3✔
590

591
        // Cascade the codes child
592
        let mut new_excludes = vec![DICT_SCHEME];
3✔
593
        new_excludes.extend_from_slice(excludes);
3✔
594

595
        let compressed_codes = IntCompressor::compress_no_dict(
3✔
596
            &dict.codes().to_primitive()?,
3✔
597
            is_sample,
3✔
598
            allowed_cascading - 1,
3✔
599
            &new_excludes,
3✔
UNCOV
600
        )?;
×
601

602
        Ok(DictArray::try_new(compressed_codes, dict.values().clone())?.into_array())
3✔
603
    }
3✔
604
}
605

606
impl Scheme for RunEndScheme {
607
    type StatsType = IntegerStats;
608
    type CodeType = IntCode;
609

610
    fn code(&self) -> IntCode {
16,761✔
611
        RUNEND_SCHEME
16,761✔
612
    }
16,761✔
613

614
    fn expected_compression_ratio(
13,687✔
615
        &self,
13,687✔
616
        stats: &IntegerStats,
13,687✔
617
        is_sample: bool,
13,687✔
618
        allowed_cascading: usize,
13,687✔
619
        excludes: &[IntCode],
13,687✔
620
    ) -> VortexResult<f64> {
13,687✔
621
        // If the run length is below the threshold, drop it.
13,687✔
622
        if stats.average_run_length < RUN_END_THRESHOLD {
13,687✔
623
            return Ok(0.0);
13,134✔
624
        }
553✔
625

553✔
626
        if allowed_cascading == 0 {
553✔
UNCOV
627
            return Ok(0.0);
×
628
        }
553✔
629

553✔
630
        // Run compression on a sample, see how it performs.
553✔
631
        estimate_compression_ratio_with_sampling(
553✔
632
            self,
553✔
633
            stats,
553✔
634
            is_sample,
553✔
635
            allowed_cascading,
553✔
636
            excludes,
553✔
637
        )
553✔
638
    }
13,687✔
639

640
    fn compress(
866✔
641
        &self,
866✔
642
        stats: &IntegerStats,
866✔
643
        is_sample: bool,
866✔
644
        allowed_cascading: usize,
866✔
645
        excludes: &[IntCode],
866✔
646
    ) -> VortexResult<ArrayRef> {
866✔
647
        assert!(allowed_cascading > 0);
866✔
648

649
        // run-end encode the ends
650
        let (ends, values) = runend_encode(&stats.src)?;
866✔
651

652
        let mut new_excludes = vec![RunEndScheme.code(), DictScheme.code()];
866✔
653
        new_excludes.extend_from_slice(excludes);
866✔
654

866✔
655
        let ends_stats = IntegerStats::generate_opts(
866✔
656
            &ends,
866✔
657
            GenerateStatsOptions {
866✔
658
                count_distinct_values: false,
866✔
659
            },
866✔
660
        );
866✔
661
        let ends_scheme = IntCompressor::choose_scheme(
866✔
662
            &ends_stats,
866✔
663
            is_sample,
866✔
664
            allowed_cascading - 1,
866✔
665
            &new_excludes,
866✔
666
        )?;
866✔
667
        let compressed_ends =
866✔
668
            ends_scheme.compress(&ends_stats, is_sample, allowed_cascading - 1, &new_excludes)?;
866✔
669

670
        let compressed_values = IntCompressor::compress_no_dict(
866✔
671
            &values.to_primitive()?,
866✔
672
            is_sample,
866✔
673
            allowed_cascading - 1,
866✔
674
            &new_excludes,
866✔
UNCOV
675
        )?;
×
676

677
        Ok(RunEndArray::try_new(compressed_ends, compressed_values)?.into_array())
866✔
678
    }
866✔
679
}
680

681
impl Scheme for SequenceScheme {
682
    type StatsType = IntegerStats;
683
    type CodeType = IntCode;
684

685
    fn code(&self) -> Self::CodeType {
15,657✔
686
        SEQUENCE_SCHEME
15,657✔
687
    }
15,657✔
688

689
    fn expected_compression_ratio(
15,657✔
690
        &self,
15,657✔
691
        stats: &Self::StatsType,
15,657✔
692
        _is_sample: bool,
15,657✔
693
        _allowed_cascading: usize,
15,657✔
694
        _excludes: &[Self::CodeType],
15,657✔
695
    ) -> VortexResult<f64> {
15,657✔
696
        if stats.null_count > 0 {
15,657✔
697
            return Ok(0.0);
362✔
698
        }
15,295✔
699
        // Since two values are required to store base and multiplier the
15,295✔
700
        // compression ratio is divided by 2.
15,295✔
701
        Ok(sequence_encode(&stats.src)?
15,295✔
702
            .map(|_| stats.src.len() as f64 / 2.0)
15,295✔
703
            .unwrap_or(0.0))
15,295✔
704
    }
15,657✔
705

706
    fn compress(
637✔
707
        &self,
637✔
708
        stats: &Self::StatsType,
637✔
709
        _is_sample: bool,
637✔
710
        _allowed_cascading: usize,
637✔
711
        _excludes: &[Self::CodeType],
637✔
712
    ) -> VortexResult<ArrayRef> {
637✔
713
        if stats.null_count > 0 {
637✔
UNCOV
714
            vortex_bail!("sequence encoding does not support nulls");
×
715
        }
637✔
716
        sequence_encode(&stats.src)?.ok_or_else(|| vortex_err!("cannot sequence encode array"))
637✔
717
    }
637✔
718
}
719

720
#[cfg(test)]
mod tests {
    use itertools::Itertools;
    use log::LevelFilter;
    use rand::rngs::StdRng;
    use rand::{RngCore, SeedableRng};
    use vortex_array::arrays::PrimitiveArray;
    use vortex_array::validity::Validity;
    use vortex_array::vtable::ValidityHelper;
    use vortex_array::{Array, IntoArray, ToCanonical};
    use vortex_buffer::{Buffer, BufferMut, buffer, buffer_mut};
    use vortex_sequence::SequenceEncoding;
    use vortex_sparse::SparseEncoding;
    use vortex_utils::aliases::hash_set::HashSet;

    use crate::integer::{IntCompressor, IntegerStats, SequenceScheme, SparseScheme};
    use crate::{Compressor, CompressorStats, Scheme};

    /// Compressing an empty array must succeed and yield an empty array.
    #[test]
    fn test_empty() {
        let empty = PrimitiveArray::new(Buffer::<i32>::empty(), Validity::NonNullable);
        let result = IntCompressor::compress(&empty, false, 3, &[]).unwrap();

        assert!(result.is_empty());
    }

    /// Random runs (length 0 to 4) drawn from a handful of distinct values.
    #[test]
    fn test_dict_encodable() {
        let palette = [0, 10, 50, 100, 1000, 3000].map(|i| 1234 * i);

        let mut codes = BufferMut::<i32>::with_capacity(65_535);
        let mut rng = StdRng::seed_from_u64(1u64);
        while codes.len() < 64000 {
            // Draw the run length before the value so the RNG sequence is fixed.
            let run_len = rng.next_u32() % 5;
            let value = palette[rng.next_u32() as usize % palette.len()];
            (0..run_len).for_each(|_| codes.push(value));
        }

        let primitive = codes.freeze().into_array().to_primitive().unwrap();
        let compressed = IntCompressor::compress(&primitive, false, 3, &[]).unwrap();
        log::info!("compressed values: {}", compressed.tree_display());
    }

    /// Mirrors ClickBench's WindowName column.
    ///
    /// The column is mostly `-1`, with 223 distinct random positions set to multiples of
    /// 5 below 500.
    #[test]
    fn test_window_name() {
        env_logger::builder()
            .filter(None, LevelFilter::Debug)
            .try_init()
            .ok();

        let mut values = buffer_mut![-1i32; 1_000_000];
        let mut touched = HashSet::new();
        let mut rng = StdRng::seed_from_u64(1u64);
        while touched.len() < 223 {
            let pos = (rng.next_u32() as usize) % 1_000_000;
            // A value is only drawn for positions seen for the first time.
            if touched.insert(pos) {
                values[pos] = 5 * (rng.next_u64() % 100) as i32;
            }
        }

        let array = values.freeze().into_array().to_primitive().unwrap();
        let compressed = IntCompressor::compress(&array, false, 3, &[]).unwrap();
        log::info!("WindowName compressed: {}", compressed.tree_display());
    }

    /// Sparse encoding must round-trip values and validity when a null is present.
    #[test]
    fn sparse_with_nulls() {
        let array = PrimitiveArray::new(
            buffer![189u8, 189, 189, 0, 46],
            Validity::from_iter(vec![true, true, true, true, false]),
        );
        let stats = IntegerStats::generate(&array);
        let compressed = SparseScheme.compress(&stats, false, 3, &[]).unwrap();
        assert_eq!(compressed.encoding_id(), SparseEncoding.id());

        let decoded = compressed.to_primitive().unwrap();
        assert_eq!(decoded.as_slice::<u8>(), &[189u8, 189, 189, 0, 0]);
        assert_eq!(decoded.validity(), array.validity());
    }

    /// Sparse encoding must round-trip an array that is almost entirely null.
    #[test]
    fn sparse_mostly_nulls() {
        let array = PrimitiveArray::new(
            buffer![189u8, 189, 189, 189, 189, 189, 189, 189, 189, 0, 46],
            Validity::from_iter(vec![
                false, false, false, false, false, false, false, false, false, false, true,
            ]),
        );
        let stats = IntegerStats::generate(&array);
        let compressed = SparseScheme.compress(&stats, false, 3, &[]).unwrap();
        assert_eq!(compressed.encoding_id(), SparseEncoding.id());

        let decoded = compressed.to_primitive().unwrap();
        assert_eq!(
            decoded.as_slice::<u8>(),
            &[0u8, 0, 0, 0, 0, 0, 0, 0, 0, 0, 46]
        );
        assert_eq!(decoded.validity(), array.validity());
    }

    /// A nullable-typed array without actual nulls can still be sequence encoded.
    #[test]
    fn nullable_sequence() {
        let values = (0i32..20).step_by(7).collect_vec();
        let array = PrimitiveArray::from_option_iter(values.iter().copied().map(Some));
        let stats = IntegerStats::generate(&array);
        let compressed = SequenceScheme.compress(&stats, false, 3, &[]).unwrap();
        assert_eq!(compressed.encoding_id(), SequenceEncoding.id());

        let decoded = compressed.to_primitive().unwrap();
        assert_eq!(decoded.as_slice::<i32>(), values.as_slice());
    }
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc