• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

vortex-data / vortex / 16331938722

16 Jul 2025 10:49PM UTC coverage: 80.702% (-0.9%) from 81.557%
16331938722

push

github

web-flow
feat: build with stable rust (#3881)

120 of 173 new or added lines in 28 files covered. (69.36%)

174 existing lines in 102 files now uncovered.

41861 of 51871 relevant lines covered (80.7%)

157487.71 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

92.2
/vortex-btrblocks/src/integer.rs
1
// SPDX-License-Identifier: Apache-2.0
2
// SPDX-FileCopyrightText: Copyright the Vortex contributors
3

4
pub mod dictionary;
5
mod stats;
6

7
use std::fmt::Debug;
8
use std::hash::Hash;
9

10
pub use stats::IntegerStats;
11
use vortex_array::arrays::{ConstantArray, PrimitiveArray, PrimitiveVTable};
12
use vortex_array::compress::downscale_integer_array;
13
use vortex_array::{ArrayRef, IntoArray, ToCanonical};
14
use vortex_dict::DictArray;
15
use vortex_error::{VortexExpect, VortexResult, VortexUnwrap, vortex_bail, vortex_err};
16
use vortex_fastlanes::{FoRArray, bit_width_histogram, bitpack_encode, find_best_bit_width};
17
use vortex_runend::RunEndArray;
18
use vortex_runend::compress::runend_encode;
19
use vortex_scalar::Scalar;
20
use vortex_sequence::sequence_encode;
21
use vortex_sparse::{SparseArray, SparseVTable};
22
use vortex_zigzag::{ZigZagArray, zigzag_encode};
23

24
use crate::integer::dictionary::dictionary_encode;
25
use crate::patches::compress_patches;
26
use crate::{
27
    Compressor, CompressorStats, GenerateStatsOptions, Scheme,
28
    estimate_compression_ratio_with_sampling,
29
};
30

31
/// Stateless entry point for compressing integer primitive arrays; its behaviour comes from the
/// `Compressor` implementation and the `compress_no_dict` helper defined in this file.
pub struct IntCompressor;
32

33
impl Compressor for IntCompressor {
34
    type ArrayVTable = PrimitiveVTable;
35
    type SchemeType = dyn IntegerScheme;
36
    type StatsType = IntegerStats;
37

38
    fn schemes() -> &'static [&'static dyn IntegerScheme] {
15,771✔
39
        &[
15,771✔
40
            &ConstantScheme,
15,771✔
41
            &FORScheme,
15,771✔
42
            &ZigZagScheme,
15,771✔
43
            &BitPackingScheme,
15,771✔
44
            &SparseScheme,
15,771✔
45
            &DictScheme,
15,771✔
46
            &RunEndScheme,
15,771✔
47
            &SequenceScheme,
15,771✔
48
        ]
15,771✔
49
    }
15,771✔
50

51
    fn default_scheme() -> &'static Self::SchemeType {
10,401✔
52
        &UncompressedScheme
10,401✔
53
    }
10,401✔
54

55
    fn dict_scheme_code() -> IntCode {
14,001✔
56
        DICT_SCHEME
14,001✔
57
    }
14,001✔
58
}
59

60
impl IntCompressor {
61
    pub fn compress_no_dict(
902✔
62
        array: &PrimitiveArray,
902✔
63
        is_sample: bool,
902✔
64
        allowed_cascading: usize,
902✔
65
        excludes: &[IntCode],
902✔
66
    ) -> VortexResult<ArrayRef> {
902✔
67
        let stats = IntegerStats::generate_opts(
902✔
68
            array,
902✔
69
            GenerateStatsOptions {
902✔
70
                count_distinct_values: false,
902✔
71
            },
902✔
72
        );
73

74
        let scheme = Self::choose_scheme(&stats, is_sample, allowed_cascading, excludes)?;
902✔
75
        let output = scheme.compress(&stats, is_sample, allowed_cascading, excludes)?;
902✔
76

77
        if output.nbytes() < array.nbytes() {
902✔
78
            Ok(output)
29✔
79
        } else {
80
            log::debug!("resulting tree too large: {}", output.display_tree());
873✔
81
            Ok(array.to_array())
873✔
82
        }
83
    }
902✔
84
}
85

86
/// Marker trait for a `Scheme` whose statistics type is `IntegerStats` and whose code type is
/// `IntCode`. It declares no methods of its own.
pub trait IntegerScheme: Scheme<StatsType = IntegerStats, CodeType = IntCode> {}
87

88
// Auto-impl
89
// Blanket implementation: every type satisfying the supertrait bound is an `IntegerScheme`,
// so individual schemes never implement the marker trait by hand.
impl<T> IntegerScheme for T where T: Scheme<StatsType = IntegerStats, CodeType = IntCode> {}
90

91
/// Identifier for an integer compression scheme, wrapping a private `u8`.
///
/// Values of this type are what callers pass in `excludes` lists to rule schemes out.
#[derive(Debug, Copy, Clone, Eq, PartialEq, Hash)]
92
pub struct IntCode(u8);
93

94
// One distinct code per integer scheme. Each scheme's `code()` method in this file returns the
// constant with the matching name.
const UNCOMPRESSED_SCHEME: IntCode = IntCode(0);
95
const CONSTANT_SCHEME: IntCode = IntCode(1);
96
const FOR_SCHEME: IntCode = IntCode(2);
97
const ZIGZAG_SCHEME: IntCode = IntCode(3);
98
const BITPACKING_SCHEME: IntCode = IntCode(4);
99
const SPARSE_SCHEME: IntCode = IntCode(5);
100
const DICT_SCHEME: IntCode = IntCode(6);
101
const RUNEND_SCHEME: IntCode = IntCode(7);
102
const SEQUENCE_SCHEME: IntCode = IntCode(8);
103

104
/// Scheme that returns the source array without encoding it.
#[derive(Debug, Copy, Clone)]
105
pub struct UncompressedScheme;
106

107
/// Scheme that replaces a single-valued array with a `ConstantArray`.
#[derive(Debug, Copy, Clone)]
108
pub struct ConstantScheme;
109

110
/// Frame-of-reference scheme: encodes with `FoRArray` and bit-packs the encoded child.
#[derive(Debug, Copy, Clone)]
111
pub struct FORScheme;
112

113
/// Scheme that zigzag-encodes the values and then compresses the encoded child further.
#[derive(Debug, Copy, Clone)]
114
pub struct ZigZagScheme;
115

116
/// Scheme that bit-packs values to the width chosen by `find_best_bit_width`.
#[derive(Debug, Copy, Clone)]
117
pub struct BitPackingScheme;
118

119
/// Scheme that sparse-encodes the array, using its most frequent value as the fill value.
#[derive(Debug, Copy, Clone)]
120
pub struct SparseScheme;
121

122
/// Scheme that dictionary-encodes the array and then compresses the codes child.
#[derive(Debug, Copy, Clone)]
123
pub struct DictScheme;
124

125
/// Scheme that run-end encodes the array and then compresses the ends and values children.
#[derive(Debug, Copy, Clone)]
126
pub struct RunEndScheme;
127

128
/// Scheme that encodes the array with `sequence_encode` when that encoder accepts it.
#[derive(Debug, Copy, Clone)]
129
pub struct SequenceScheme;
130

131
/// Threshold for the average run length in an array before we consider run-end encoding.
132
const RUN_END_THRESHOLD: u32 = 4;
133

134
impl Scheme for UncompressedScheme {
135
    type StatsType = IntegerStats;
136
    type CodeType = IntCode;
137

138
    fn code(&self) -> IntCode {
×
139
        UNCOMPRESSED_SCHEME
×
140
    }
×
141

142
    fn expected_compression_ratio(
×
143
        &self,
×
144
        _stats: &IntegerStats,
×
145
        _is_sample: bool,
×
146
        _allowed_cascading: usize,
×
147
        _excludes: &[IntCode],
×
148
    ) -> VortexResult<f64> {
×
149
        // no compression
150
        Ok(1.0)
×
151
    }
×
152

153
    fn compress(
10,401✔
154
        &self,
10,401✔
155
        stats: &IntegerStats,
10,401✔
156
        _is_sample: bool,
10,401✔
157
        _allowed_cascading: usize,
10,401✔
158
        _excludes: &[IntCode],
10,401✔
159
    ) -> VortexResult<ArrayRef> {
10,401✔
160
        Ok(stats.source().to_array())
10,401✔
161
    }
10,401✔
162
}
163

164
impl Scheme for ConstantScheme {
165
    type StatsType = IntegerStats;
166
    type CodeType = IntCode;
167

168
    fn code(&self) -> IntCode {
15,771✔
169
        CONSTANT_SCHEME
15,771✔
170
    }
15,771✔
171

172
    fn is_constant(&self) -> bool {
8,092✔
173
        true
8,092✔
174
    }
8,092✔
175

176
    fn expected_compression_ratio(
7,679✔
177
        &self,
7,679✔
178
        stats: &IntegerStats,
7,679✔
179
        is_sample: bool,
7,679✔
180
        _allowed_cascading: usize,
7,679✔
181
        _excludes: &[IntCode],
7,679✔
182
    ) -> VortexResult<f64> {
7,679✔
183
        // Never yield ConstantScheme for a sample, it could be a false-positive.
184
        if is_sample {
7,679✔
185
            return Ok(0.0);
×
186
        }
7,679✔
187

188
        // Only arrays with one distinct values can be constant compressed.
189
        if stats.distinct_values_count != 1 {
7,679✔
190
            return Ok(0.0);
3,952✔
191
        }
3,727✔
192

193
        // Cannot have mix of nulls and non-nulls
194
        if stats.null_count > 0 && stats.value_count > 0 {
3,727✔
195
            return Ok(0.0);
95✔
196
        }
3,632✔
197

198
        Ok(stats.value_count as f64)
3,632✔
199
    }
7,679✔
200

201
    fn compress(
328✔
202
        &self,
328✔
203
        stats: &IntegerStats,
328✔
204
        _is_sample: bool,
328✔
205
        _allowed_cascading: usize,
328✔
206
        _excludes: &[IntCode],
328✔
207
    ) -> VortexResult<ArrayRef> {
328✔
208
        // We only use Constant encoding if the entire array is constant, never if one of
209
        // the child arrays yields a constant value.
210
        let scalar = stats
328✔
211
            .source()
328✔
212
            .as_constant()
328✔
213
            .vortex_expect("constant array expected");
328✔
214

215
        Ok(ConstantArray::new(scalar, stats.src.len()).into_array())
328✔
216
    }
328✔
217
}
218

219
impl Scheme for FORScheme {
220
    type StatsType = IntegerStats;
221
    type CodeType = IntCode;
222

223
    fn code(&self) -> IntCode {
15,771✔
224
        FOR_SCHEME
15,771✔
225
    }
15,771✔
226

227
    fn expected_compression_ratio(
15,771✔
228
        &self,
15,771✔
229
        stats: &IntegerStats,
15,771✔
230
        _is_sample: bool,
15,771✔
231
        allowed_cascading: usize,
15,771✔
232
        _excludes: &[IntCode],
15,771✔
233
    ) -> VortexResult<f64> {
15,771✔
234
        // Only apply if we are not at the leaf
235
        if allowed_cascading == 0 {
15,771✔
236
            return Ok(0.0);
×
237
        }
15,771✔
238

239
        // All-null cannot be FOR compressed.
240
        if stats.value_count == 0 {
15,771✔
241
            return Ok(0.0);
×
242
        }
15,771✔
243

244
        // Only apply when the min is not already zero.
245
        if stats.typed.min_is_zero() {
15,771✔
246
            return Ok(0.0);
7,070✔
247
        }
8,701✔
248

249
        // Difference between max and min
250
        let full_width: u32 = stats.src.ptype().bit_width().try_into().vortex_unwrap();
8,701✔
251
        let bw = match stats.typed.max_minus_min().checked_ilog2() {
8,701✔
252
            Some(l) => l + 1,
3,151✔
253
            // If max-min == 0, it we should use a different compression scheme
254
            // as we don't want to bitpack down to 0 bits.
255
            None => return Ok(0.0),
5,550✔
256
        };
257

258
        // If we're not saving at least 1 byte, don't bother with FOR
259
        if full_width - bw < 8 {
3,151✔
260
            return Ok(0.0);
56✔
261
        }
3,095✔
262

263
        Ok(full_width as f64 / bw as f64)
3,095✔
264
    }
15,771✔
265

266
    fn compress(
3,072✔
267
        &self,
3,072✔
268
        stats: &IntegerStats,
3,072✔
269
        is_sample: bool,
3,072✔
270
        _allowed_cascading: usize,
3,072✔
271
        excludes: &[IntCode],
3,072✔
272
    ) -> VortexResult<ArrayRef> {
3,072✔
273
        let for_array = FoRArray::encode(stats.src.clone())?;
3,072✔
274
        let biased = for_array.encoded().to_primitive()?;
3,072✔
275
        let biased_stats = IntegerStats::generate_opts(
3,072✔
276
            &biased,
3,072✔
277
            GenerateStatsOptions {
3,072✔
278
                count_distinct_values: false,
3,072✔
279
            },
3,072✔
280
        );
281

282
        // Immediately bitpack. If any other scheme was preferable, it would be chosen instead
283
        // of bitpacking.
284
        // NOTE: we could delegate in the future if we had another downstream codec that performs
285
        //  as well.
286
        let compressed = BitPackingScheme.compress(&biased_stats, is_sample, 0, excludes)?;
3,072✔
287

288
        Ok(FoRArray::try_new(compressed, for_array.reference_scalar().clone())?.into_array())
3,072✔
289
    }
3,072✔
290
}
291

292
impl Scheme for ZigZagScheme {
293
    type StatsType = IntegerStats;
294
    type CodeType = IntCode;
295

296
    fn code(&self) -> IntCode {
16,009✔
297
        ZIGZAG_SCHEME
16,009✔
298
    }
16,009✔
299

300
    fn expected_compression_ratio(
15,533✔
301
        &self,
15,533✔
302
        stats: &IntegerStats,
15,533✔
303
        is_sample: bool,
15,533✔
304
        allowed_cascading: usize,
15,533✔
305
        excludes: &[IntCode],
15,533✔
306
    ) -> VortexResult<f64> {
15,533✔
307
        // ZigZag is only useful when we cascade it with another encoding
308
        if allowed_cascading == 0 {
15,533✔
309
            return Ok(0.0);
×
310
        }
15,533✔
311

312
        // Don't try and compress all-null arrays
313
        if stats.value_count == 0 {
15,533✔
314
            return Ok(0.0);
×
315
        }
15,533✔
316

317
        // ZigZag is only useful when there are negative values.
318
        if !stats.typed.min_is_negative() {
15,533✔
319
            return Ok(0.0);
15,295✔
320
        }
238✔
321

322
        // Run compression on a sample to see how it performs.
323
        estimate_compression_ratio_with_sampling(
238✔
324
            self,
238✔
325
            stats,
238✔
326
            is_sample,
238✔
327
            allowed_cascading,
238✔
328
            excludes,
238✔
329
        )
330
    }
15,533✔
331

332
    fn compress(
238✔
333
        &self,
238✔
334
        stats: &IntegerStats,
238✔
335
        is_sample: bool,
238✔
336
        allowed_cascading: usize,
238✔
337
        excludes: &[IntCode],
238✔
338
    ) -> VortexResult<ArrayRef> {
238✔
339
        // Zigzag encode the values, then recursively compress the inner values.
340
        let zag = zigzag_encode(stats.src.clone())?;
238✔
341
        let encoded = zag.encoded().to_primitive()?;
238✔
342

343
        // ZigZag should be after Dict, RunEnd or Sparse.
344
        // We should only do these "container" style compressors once.
345
        let mut new_excludes = vec![
238✔
346
            ZigZagScheme.code(),
238✔
347
            DictScheme.code(),
238✔
348
            RunEndScheme.code(),
238✔
349
            SparseScheme.code(),
238✔
350
        ];
351
        new_excludes.extend_from_slice(excludes);
238✔
352

353
        let compressed =
238✔
354
            IntCompressor::compress(&encoded, is_sample, allowed_cascading - 1, &new_excludes)?;
238✔
355

356
        log::debug!("zigzag output: {}", compressed.display_tree());
238✔
357

358
        Ok(ZigZagArray::try_new(compressed)?.into_array())
238✔
359
    }
238✔
360
}
361

362
impl Scheme for BitPackingScheme {
363
    type StatsType = IntegerStats;
364
    type CodeType = IntCode;
365

366
    fn code(&self) -> IntCode {
15,771✔
367
        BITPACKING_SCHEME
15,771✔
368
    }
15,771✔
369

370
    #[allow(clippy::cast_possible_truncation)]
371
    fn expected_compression_ratio(
15,771✔
372
        &self,
15,771✔
373
        stats: &IntegerStats,
15,771✔
374
        is_sample: bool,
15,771✔
375
        allowed_cascading: usize,
15,771✔
376
        excludes: &[IntCode],
15,771✔
377
    ) -> VortexResult<f64> {
15,771✔
378
        // BitPacking only works for non-negative values
379
        if stats.typed.min_is_negative() {
15,771✔
380
            return Ok(0.0);
238✔
381
        }
15,533✔
382

383
        // Don't compress all-null arrays
384
        if stats.value_count == 0 {
15,533✔
385
            return Ok(0.0);
×
386
        }
15,533✔
387

388
        estimate_compression_ratio_with_sampling(
15,533✔
389
            self,
15,533✔
390
            stats,
15,533✔
391
            is_sample,
15,533✔
392
            allowed_cascading,
15,533✔
393
            excludes,
15,533✔
394
        )
395
    }
15,771✔
396

397
    #[allow(clippy::cast_possible_truncation)]
398
    fn compress(
19,580✔
399
        &self,
19,580✔
400
        stats: &IntegerStats,
19,580✔
401
        _is_sample: bool,
19,580✔
402
        _allowed_cascading: usize,
19,580✔
403
        _excludes: &[IntCode],
19,580✔
404
    ) -> VortexResult<ArrayRef> {
19,580✔
405
        let histogram = bit_width_histogram(stats.source())?;
19,580✔
406
        let bw = find_best_bit_width(stats.source().ptype(), &histogram)?;
19,580✔
407
        // If best bw is determined to be the current bit-width, return the original array.
408
        if bw as usize == stats.source().ptype().bit_width() {
19,580✔
409
            return Ok(stats.source().clone().into_array());
8✔
410
        }
19,572✔
411
        let mut packed = bitpack_encode(stats.source(), bw, Some(&histogram))?;
19,572✔
412

413
        let patches = packed.patches().map(compress_patches).transpose()?;
19,572✔
414
        packed.replace_patches(patches);
19,572✔
415

416
        Ok(packed.into_array())
19,572✔
417
    }
19,580✔
418
}
419

420
impl Scheme for SparseScheme {
421
    type StatsType = IntegerStats;
422
    type CodeType = IntCode;
423

424
    fn code(&self) -> IntCode {
16,016✔
425
        SPARSE_SCHEME
16,016✔
426
    }
16,016✔
427

428
    // We can avoid asserting the encoding tree instead.
429
    fn expected_compression_ratio(
15,519✔
430
        &self,
15,519✔
431
        stats: &IntegerStats,
15,519✔
432
        _is_sample: bool,
15,519✔
433
        _allowed_cascading: usize,
15,519✔
434
        _excludes: &[IntCode],
15,519✔
435
    ) -> VortexResult<f64> {
15,519✔
436
        if stats.value_count == 0 {
15,519✔
437
            // All nulls should use ConstantScheme
438
            return Ok(0.0);
×
439
        }
15,519✔
440

441
        // If the majority is null, will compress well.
442
        if stats.null_count as f64 / stats.src.len() as f64 > 0.9 {
15,519✔
443
            return Ok(stats.src.len() as f64 / stats.value_count as f64);
×
444
        }
15,519✔
445

446
        // See if the top value accounts for >= 90% of the set values.
447
        let (_, top_count) = stats.typed.top_value_and_count();
15,519✔
448

449
        if top_count == stats.value_count {
15,519✔
450
            // top_value is the only value, should use ConstantScheme instead
451
            return Ok(0.0);
5,150✔
452
        }
10,369✔
453

454
        let freq = top_count as f64 / stats.value_count as f64;
10,369✔
455
        if freq >= 0.9 {
10,369✔
456
            // We only store the positions of the non-top values.
457
            return Ok(stats.value_count as f64 / (stats.value_count - top_count) as f64);
5✔
458
        }
10,364✔
459

460
        Ok(0.0)
10,364✔
461
    }
15,519✔
462

463
    fn compress(
7✔
464
        &self,
7✔
465
        stats: &IntegerStats,
7✔
466
        is_sample: bool,
7✔
467
        allowed_cascading: usize,
7✔
468
        excludes: &[IntCode],
7✔
469
    ) -> VortexResult<ArrayRef> {
7✔
470
        assert!(allowed_cascading > 0);
7✔
471
        let (top_pvalue, top_count) = stats.typed.top_value_and_count();
7✔
472
        if top_count as usize == stats.src.len() {
7✔
473
            // top_value is the only value, use ConstantScheme
474
            return Ok(ConstantArray::new(
×
475
                Scalar::primitive_value(
×
476
                    top_pvalue,
×
477
                    top_pvalue.ptype(),
×
478
                    stats.src.dtype().nullability(),
×
479
                ),
×
480
                stats.src.len(),
×
481
            )
×
482
            .into_array());
×
483
        }
7✔
484

485
        let sparse_encoded = SparseArray::encode(
7✔
486
            stats.src.as_ref(),
7✔
487
            Some(Scalar::primitive_value(
7✔
488
                top_pvalue,
7✔
489
                top_pvalue.ptype(),
7✔
490
                stats.src.dtype().nullability(),
7✔
491
            )),
7✔
UNCOV
492
        )?;
×
493

494
        if let Some(sparse) = sparse_encoded.as_opt::<SparseVTable>() {
7✔
495
            // Compress the values
496
            let mut new_excludes = vec![SparseScheme.code()];
7✔
497
            new_excludes.extend_from_slice(excludes);
7✔
498

499
            let compressed_values = IntCompressor::compress_no_dict(
7✔
500
                &sparse.patches().values().to_primitive()?,
7✔
501
                is_sample,
7✔
502
                allowed_cascading - 1,
7✔
503
                &new_excludes,
7✔
504
            )?;
×
505

506
            let indices =
7✔
507
                downscale_integer_array(sparse.patches().indices().clone())?.to_primitive()?;
7✔
508

509
            let compressed_indices = IntCompressor::compress_no_dict(
7✔
510
                &indices,
7✔
511
                is_sample,
7✔
512
                allowed_cascading - 1,
7✔
513
                &new_excludes,
7✔
UNCOV
514
            )?;
×
515

516
            SparseArray::try_new(
7✔
517
                compressed_indices,
7✔
518
                compressed_values,
7✔
519
                sparse.len(),
7✔
520
                sparse.fill_scalar().clone(),
7✔
521
            )
522
            .map(|a| a.into_array())
7✔
523
        } else {
524
            Ok(sparse_encoded)
×
525
        }
526
    }
7✔
527
}
528

529
impl Scheme for DictScheme {
530
    type StatsType = IntegerStats;
531
    type CodeType = IntCode;
532

533
    fn code(&self) -> IntCode {
17,511✔
534
        DICT_SCHEME
17,511✔
535
    }
17,511✔
536

537
    fn expected_compression_ratio(
13,164✔
538
        &self,
13,164✔
539
        stats: &IntegerStats,
13,164✔
540
        _is_sample: bool,
13,164✔
541
        allowed_cascading: usize,
13,164✔
542
        _excludes: &[IntCode],
13,164✔
543
    ) -> VortexResult<f64> {
13,164✔
544
        // Dict should not be terminal.
545
        if allowed_cascading == 0 {
13,164✔
546
            return Ok(0.0);
×
547
        }
13,164✔
548

549
        if stats.value_count == 0 {
13,164✔
550
            return Ok(0.0);
×
551
        }
13,164✔
552

553
        // If > 50% of the values are distinct, skip dict.
554
        if stats.distinct_values_count > stats.value_count / 2 {
13,164✔
555
            return Ok(0.0);
10,126✔
556
        }
3,038✔
557

558
        // Ignore nulls encoding for the estimate. We only focus on values.
559
        let values_size = stats.source().ptype().bit_width() * stats.distinct_values_count as usize;
3,038✔
560

561
        // Assume codes are compressed RLE + BitPacking.
562
        let codes_bw = usize::BITS - stats.distinct_values_count.leading_zeros();
3,038✔
563

564
        let n_runs = stats.value_count / stats.average_run_length;
3,038✔
565

566
        // Assume that codes will either be BitPack or RLE-BitPack
567
        let codes_size_bp = (codes_bw * stats.value_count) as usize;
3,038✔
568
        let codes_size_rle_bp = (codes_bw + 32) * n_runs;
3,038✔
569

570
        let codes_size = usize::min(codes_size_bp, codes_size_rle_bp as usize);
3,038✔
571

572
        let before = stats.value_count as usize * stats.source().ptype().bit_width();
3,038✔
573

574
        Ok(before as f64 / (values_size + codes_size) as f64)
3,038✔
575
    }
13,164✔
576

577
    fn compress(
3✔
578
        &self,
3✔
579
        stats: &IntegerStats,
3✔
580
        is_sample: bool,
3✔
581
        allowed_cascading: usize,
3✔
582
        excludes: &[IntCode],
3✔
583
    ) -> VortexResult<ArrayRef> {
3✔
584
        assert!(allowed_cascading > 0);
3✔
585

586
        // TODO(aduffy): we can be more prescriptive: we know that codes will EITHER be
587
        //    RLE or FOR + BP. Cascading probably wastes some time here.
588

589
        let dict = dictionary_encode(stats)?;
3✔
590

591
        // Cascade the codes child
592
        let mut new_excludes = vec![DICT_SCHEME];
3✔
593
        new_excludes.extend_from_slice(excludes);
3✔
594

595
        let compressed_codes = IntCompressor::compress_no_dict(
3✔
596
            &dict.codes().to_primitive()?,
3✔
597
            is_sample,
3✔
598
            allowed_cascading - 1,
3✔
599
            &new_excludes,
3✔
600
        )?;
×
601

602
        Ok(DictArray::try_new(compressed_codes, dict.values().clone())?.into_array())
3✔
603
    }
3✔
604
}
605

606
impl Scheme for RunEndScheme {
607
    type StatsType = IntegerStats;
608
    type CodeType = IntCode;
609

610
    fn code(&self) -> IntCode {
16,875✔
611
        RUNEND_SCHEME
16,875✔
612
    }
16,875✔
613

614
    fn expected_compression_ratio(
13,801✔
615
        &self,
13,801✔
616
        stats: &IntegerStats,
13,801✔
617
        is_sample: bool,
13,801✔
618
        allowed_cascading: usize,
13,801✔
619
        excludes: &[IntCode],
13,801✔
620
    ) -> VortexResult<f64> {
13,801✔
621
        // If the run length is below the threshold, drop it.
622
        if stats.average_run_length < RUN_END_THRESHOLD {
13,801✔
623
            return Ok(0.0);
13,248✔
624
        }
553✔
625

626
        if allowed_cascading == 0 {
553✔
627
            return Ok(0.0);
×
628
        }
553✔
629

630
        // Run compression on a sample, see how it performs.
631
        estimate_compression_ratio_with_sampling(
553✔
632
            self,
553✔
633
            stats,
553✔
634
            is_sample,
553✔
635
            allowed_cascading,
553✔
636
            excludes,
553✔
637
        )
638
    }
13,801✔
639

640
    fn compress(
866✔
641
        &self,
866✔
642
        stats: &IntegerStats,
866✔
643
        is_sample: bool,
866✔
644
        allowed_cascading: usize,
866✔
645
        excludes: &[IntCode],
866✔
646
    ) -> VortexResult<ArrayRef> {
866✔
647
        assert!(allowed_cascading > 0);
866✔
648

649
        // run-end encode the ends
650
        let (ends, values) = runend_encode(&stats.src)?;
866✔
651

652
        let mut new_excludes = vec![RunEndScheme.code(), DictScheme.code()];
866✔
653
        new_excludes.extend_from_slice(excludes);
866✔
654

655
        let ends_stats = IntegerStats::generate_opts(
866✔
656
            &ends,
866✔
657
            GenerateStatsOptions {
866✔
658
                count_distinct_values: false,
866✔
659
            },
866✔
660
        );
661
        let ends_scheme = IntCompressor::choose_scheme(
866✔
662
            &ends_stats,
866✔
663
            is_sample,
866✔
664
            allowed_cascading - 1,
866✔
665
            &new_excludes,
866✔
UNCOV
666
        )?;
×
667
        let compressed_ends =
866✔
668
            ends_scheme.compress(&ends_stats, is_sample, allowed_cascading - 1, &new_excludes)?;
866✔
669

670
        let compressed_values = IntCompressor::compress_no_dict(
866✔
671
            &values.to_primitive()?,
866✔
672
            is_sample,
866✔
673
            allowed_cascading - 1,
866✔
674
            &new_excludes,
866✔
675
        )?;
×
676

677
        Ok(RunEndArray::try_new(compressed_ends, compressed_values)?.into_array())
866✔
678
    }
866✔
679
}
680

681
impl Scheme for SequenceScheme {
682
    type StatsType = IntegerStats;
683
    type CodeType = IntCode;
684

685
    fn code(&self) -> Self::CodeType {
15,771✔
686
        SEQUENCE_SCHEME
15,771✔
687
    }
15,771✔
688

689
    fn expected_compression_ratio(
15,771✔
690
        &self,
15,771✔
691
        stats: &Self::StatsType,
15,771✔
692
        _is_sample: bool,
15,771✔
693
        _allowed_cascading: usize,
15,771✔
694
        _excludes: &[Self::CodeType],
15,771✔
695
    ) -> VortexResult<f64> {
15,771✔
696
        if stats.null_count > 0 {
15,771✔
697
            return Ok(0.0);
362✔
698
        }
15,409✔
699
        // Since two values are required to store base and multiplier the
700
        // compression ratio is divided by 2.
701
        Ok(sequence_encode(&stats.src)?
15,409✔
702
            .map(|_| stats.src.len() as f64 / 2.0)
15,409✔
703
            .unwrap_or(0.0))
15,409✔
704
    }
15,771✔
705

706
    fn compress(
675✔
707
        &self,
675✔
708
        stats: &Self::StatsType,
675✔
709
        _is_sample: bool,
675✔
710
        _allowed_cascading: usize,
675✔
711
        _excludes: &[Self::CodeType],
675✔
712
    ) -> VortexResult<ArrayRef> {
675✔
713
        if stats.null_count > 0 {
675✔
714
            vortex_bail!("sequence encoding does not support nulls");
×
715
        }
675✔
716
        sequence_encode(&stats.src)?.ok_or_else(|| vortex_err!("cannot sequence encode array"))
675✔
717
    }
675✔
718
}
719

720
#[cfg(test)]
mod tests {
    use itertools::Itertools;
    use log::LevelFilter;
    use rand::rngs::StdRng;
    use rand::{RngCore, SeedableRng};
    use vortex_array::arrays::PrimitiveArray;
    use vortex_array::validity::Validity;
    use vortex_array::vtable::ValidityHelper;
    use vortex_array::{Array, IntoArray, ToCanonical};
    use vortex_buffer::{Buffer, BufferMut, buffer, buffer_mut};
    use vortex_sequence::SequenceEncoding;
    use vortex_sparse::SparseEncoding;
    use vortex_utils::aliases::hash_set::HashSet;

    use crate::integer::{IntCompressor, IntegerStats, SequenceScheme, SparseScheme};
    use crate::{Compressor, CompressorStats, Scheme};

    /// Compressing an empty array must succeed and yield an empty array.
    #[test]
    fn test_empty() {
        let empty = PrimitiveArray::new(Buffer::<i32>::empty(), Validity::NonNullable);
        let result = IntCompressor::compress(&empty, false, 3, &[]).unwrap();

        assert!(result.is_empty());
    }

    /// Smoke test: short runs (length 0 to 4) drawn from six distinct values compress
    /// without error.
    #[test]
    fn test_dict_encodable() {
        let mut codes = BufferMut::<i32>::with_capacity(65_535);

        let numbers = [0, 10, 50, 100, 1000, 3000]
            .into_iter()
            .map(|i| 1234 * i)
            .collect_vec();

        let mut rng = StdRng::seed_from_u64(1u64);
        while codes.len() < 64000 {
            // Draw order matters for reproducibility: run length first, then the value.
            let run_length = rng.next_u32() % 5;
            let pick = rng.next_u32() as usize % numbers.len();
            let value = numbers[pick];
            for _ in 0..run_length {
                codes.push(value);
            }
        }

        let primitive = codes.freeze().into_array().to_primitive().unwrap();
        let compressed = IntCompressor::compress(&primitive, false, 3, &[]).unwrap();
        log::info!("compressed values: {}", compressed.display_tree());
    }

    /// Smoke test meant to mirror the WindowName column from ClickBench: one million `-1`
    /// values with 223 randomly placed exceptions.
    #[test]
    fn test_window_name() {
        env_logger::builder()
            .filter(None, LevelFilter::Debug)
            .try_init()
            .ok();

        let mut values = buffer_mut![-1i32; 1_000_000];
        let mut visited = HashSet::new();
        let mut rng = StdRng::seed_from_u64(1u64);
        while visited.len() < 223 {
            let position = (rng.next_u32() as usize) % 1_000_000;
            // `insert` returns false for a position that was already taken; retry then.
            if !visited.insert(position) {
                continue;
            }
            // Each exception is a multiple of 5 below 500.
            values[position] = 5 * (rng.next_u64() % 100) as i32;
        }

        let array = values.freeze().into_array().to_primitive().unwrap();
        let compressed = IntCompressor::compress(&array, false, 3, &[]).unwrap();
        log::info!("WindowName compressed: {}", compressed.display_tree());
    }

    /// Sparse encoding round-trips an array whose last element is null.
    #[test]
    fn sparse_with_nulls() {
        let validity = Validity::from_iter(vec![true, true, true, true, false]);
        let array = PrimitiveArray::new(buffer![189u8, 189, 189, 0, 46], validity);

        let stats = IntegerStats::generate(&array);
        let compressed = SparseScheme.compress(&stats, false, 3, &[]).unwrap();
        assert_eq!(compressed.encoding_id(), SparseEncoding.id());

        let decoded = compressed.to_primitive().unwrap();
        let expected = [189u8, 189, 189, 0, 0];
        assert_eq!(decoded.as_slice::<u8>(), &expected);
        assert_eq!(decoded.validity(), array.validity());
    }

    /// Sparse encoding round-trips an array where only the last element is non-null.
    #[test]
    fn sparse_mostly_nulls() {
        let validity = Validity::from_iter(vec![
            false, false, false, false, false, false, false, false, false, false, true,
        ]);
        let array = PrimitiveArray::new(
            buffer![189u8, 189, 189, 189, 189, 189, 189, 189, 189, 0, 46],
            validity,
        );

        let stats = IntegerStats::generate(&array);
        let compressed = SparseScheme.compress(&stats, false, 3, &[]).unwrap();
        assert_eq!(compressed.encoding_id(), SparseEncoding.id());

        let decoded = compressed.to_primitive().unwrap();
        let expected = [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 46];
        assert_eq!(decoded.as_slice::<u8>(), &expected);
        assert_eq!(decoded.validity(), array.validity());
    }

    /// A nullable-typed array without actual nulls can still be sequence encoded.
    #[test]
    fn nullable_sequence() {
        let values = (0i32..20).step_by(7).collect_vec();
        let array = PrimitiveArray::from_option_iter(values.clone().into_iter().map(Some));

        let stats = IntegerStats::generate(&array);
        let compressed = SequenceScheme.compress(&stats, false, 3, &[]).unwrap();
        assert_eq!(compressed.encoding_id(), SequenceEncoding.id());

        let decoded = compressed.to_primitive().unwrap();
        assert_eq!(decoded.as_slice::<i32>(), values.as_slice());
    }
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc