• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

vortex-data / vortex / 16331938722

16 Jul 2025 10:49PM UTC coverage: 80.702% (-0.9%) from 81.557%
16331938722

push

github

web-flow
feat: build with stable rust (#3881)

120 of 173 new or added lines in 28 files covered. (69.36%)

174 existing lines in 102 files now uncovered.

41861 of 51871 relevant lines covered (80.7%)

157487.71 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

91.34
/vortex-btrblocks/src/float.rs
1
// SPDX-License-Identifier: Apache-2.0
2
// SPDX-FileCopyrightText: Copyright the Vortex contributors
3

4
mod dictionary;
5
mod stats;
6

7
use vortex_alp::{ALPArray, ALPEncoding, ALPVTable, RDEncoder};
8
use vortex_array::arrays::{ConstantArray, PrimitiveVTable};
9
use vortex_array::{ArrayRef, IntoArray, ToCanonical};
10
use vortex_dict::DictArray;
11
use vortex_dtype::PType;
12
use vortex_error::{VortexExpect, VortexResult, vortex_panic};
13

14
use self::stats::FloatStats;
15
use crate::float::dictionary::dictionary_encode;
16
use crate::integer::{IntCompressor, IntegerStats};
17
use crate::patches::compress_patches;
18
use crate::{
19
    Compressor, CompressorStats, GenerateStatsOptions, Scheme,
20
    estimate_compression_ratio_with_sampling, integer,
21
};
22

23
/// Marker trait for any [`Scheme`] whose stats type is [`FloatStats`] and whose code type is
/// [`FloatCode`]. It adds no methods of its own.
pub trait FloatScheme: Scheme<StatsType = FloatStats, CodeType = FloatCode> {}
24

25
// Blanket impl: every `Scheme` with the float stats/code associated types is a `FloatScheme`.
impl<T> FloatScheme for T where T: Scheme<StatsType = FloatStats, CodeType = FloatCode> {}
26

27
/// Stateless entry point for float compression; its `Compressor` impl operates on
/// `PrimitiveVTable` arrays and selects among the `FloatScheme`s declared in this module.
pub struct FloatCompressor;
28

29
impl Compressor for FloatCompressor {
30
    type ArrayVTable = PrimitiveVTable;
31
    type SchemeType = dyn FloatScheme;
32
    type StatsType = FloatStats;
33

34
    fn schemes() -> &'static [&'static Self::SchemeType] {
117✔
35
        &[
117✔
36
            &UncompressedScheme,
117✔
37
            &ConstantScheme,
117✔
38
            &ALPScheme,
117✔
39
            &ALPRDScheme,
117✔
40
            &DictScheme,
117✔
41
        ]
117✔
42
    }
117✔
43

44
    fn default_scheme() -> &'static Self::SchemeType {
116✔
45
        &UncompressedScheme
116✔
46
    }
116✔
47

48
    fn dict_scheme_code() -> FloatCode {
117✔
49
        DICT_SCHEME
117✔
50
    }
117✔
51
}
52

53
// Numeric identifiers for the float schemes. Each `Scheme::code` impl in this file returns
// one of these, and callers pass them in `excludes` lists to rule schemes out.
const UNCOMPRESSED_SCHEME: FloatCode = FloatCode(0);
54
const CONSTANT_SCHEME: FloatCode = FloatCode(1);
55
const ALP_SCHEME: FloatCode = FloatCode(2);
56
const ALPRD_SCHEME: FloatCode = FloatCode(3);
57
const DICT_SCHEME: FloatCode = FloatCode(4);
58
// No scheme in this file reports this code. ALP's `compress` looks for it in `excludes` and,
// when present, excludes the integer run-end scheme for the ALP-encoded integers.
const RUNEND_SCHEME: FloatCode = FloatCode(5);
59

60
/// Scheme that returns the source array as-is.
#[derive(Debug, Copy, Clone)]
61
struct UncompressedScheme;
62

63
/// Scheme that replaces the source with a `ConstantArray`.
#[derive(Debug, Copy, Clone)]
64
struct ConstantScheme;
65

66
/// Scheme that ALP-encodes the floats and then compresses the encoded integers.
#[derive(Debug, Copy, Clone)]
67
struct ALPScheme;
68

69
/// Scheme that encodes `f32`/`f64` data with `RDEncoder`.
#[derive(Debug, Copy, Clone)]
70
struct ALPRDScheme;
71

72
/// Scheme that dictionary-encodes the floats and compresses the codes and values separately.
#[derive(Debug, Copy, Clone)]
73
struct DictScheme;
74

75
impl Scheme for UncompressedScheme {
76
    type StatsType = FloatStats;
77
    type CodeType = FloatCode;
78

79
    fn code(&self) -> FloatCode {
117✔
80
        UNCOMPRESSED_SCHEME
117✔
81
    }
117✔
82

83
    fn expected_compression_ratio(
117✔
84
        &self,
117✔
85
        _stats: &Self::StatsType,
117✔
86
        _is_sample: bool,
117✔
87
        _allowed_cascading: usize,
117✔
88
        _excludes: &[FloatCode],
117✔
89
    ) -> VortexResult<f64> {
117✔
90
        Ok(1.0)
117✔
91
    }
117✔
92

93
    fn compress(
116✔
94
        &self,
116✔
95
        stats: &Self::StatsType,
116✔
96
        _is_sample: bool,
116✔
97
        _allowed_cascading: usize,
116✔
98
        _excludes: &[FloatCode],
116✔
99
    ) -> VortexResult<ArrayRef> {
116✔
100
        Ok(stats.source().to_array())
116✔
101
    }
116✔
102
}
103

104
impl Scheme for ConstantScheme {
105
    type StatsType = FloatStats;
106
    type CodeType = FloatCode;
107

108
    fn code(&self) -> FloatCode {
117✔
109
        CONSTANT_SCHEME
117✔
110
    }
117✔
111

112
    fn expected_compression_ratio(
117✔
113
        &self,
117✔
114
        stats: &Self::StatsType,
117✔
115
        is_sample: bool,
117✔
116
        _allowed_cascading: usize,
117✔
117
        _excludes: &[FloatCode],
117✔
118
    ) -> VortexResult<f64> {
117✔
119
        // Never select Constant when sampling
120
        if is_sample {
117✔
121
            return Ok(0.0);
1✔
122
        }
116✔
123

124
        // Can only have 1 distinct value
125
        if stats.distinct_values_count > 1 {
116✔
126
            return Ok(0.0);
40✔
127
        }
76✔
128

129
        // Cannot have mix of nulls and non-nulls
130
        if stats.null_count > 0 && stats.value_count > 0 {
76✔
131
            return Ok(0.0);
×
132
        }
76✔
133

134
        Ok(stats.value_count as f64)
76✔
135
    }
117✔
136

137
    fn compress(
×
138
        &self,
×
139
        stats: &Self::StatsType,
×
140
        _is_sample: bool,
×
141
        _allowed_cascading: usize,
×
142
        _excludes: &[FloatCode],
×
143
    ) -> VortexResult<ArrayRef> {
×
144
        let scalar = stats
×
145
            .source()
×
146
            .as_constant()
×
147
            .vortex_expect("must be constant");
×
148

149
        Ok(ConstantArray::new(scalar, stats.source().len()).into_array())
×
150
    }
×
151
}
152

153
#[derive(Debug, Copy, Clone, Eq, PartialEq, Hash)]
154
/// Identifier for a float compression scheme, wrapping a private `u8`.
pub struct FloatCode(u8);
155

156
impl Scheme for ALPScheme {
157
    type StatsType = FloatStats;
158
    type CodeType = FloatCode;
159

160
    fn code(&self) -> FloatCode {
117✔
161
        ALP_SCHEME
117✔
162
    }
117✔
163

164
    fn expected_compression_ratio(
117✔
165
        &self,
117✔
166
        stats: &Self::StatsType,
117✔
167
        is_sample: bool,
117✔
168
        allowed_cascading: usize,
117✔
169
        excludes: &[FloatCode],
117✔
170
    ) -> VortexResult<f64> {
117✔
171
        // We don't support ALP for f16
172
        if stats.source().ptype() == PType::F16 {
117✔
173
            return Ok(0.0);
19✔
174
        }
98✔
175

176
        if allowed_cascading == 0 {
98✔
177
            // ALP does not compress on its own, we need to be able to cascade it with
178
            // an integer compressor.
179
            return Ok(0.0);
×
180
        }
98✔
181

182
        estimate_compression_ratio_with_sampling(
98✔
183
            self,
98✔
184
            stats,
98✔
185
            is_sample,
98✔
186
            allowed_cascading,
98✔
187
            excludes,
98✔
188
        )
189
    }
117✔
190

191
    fn compress(
98✔
192
        &self,
98✔
193
        stats: &FloatStats,
98✔
194
        is_sample: bool,
98✔
195
        allowed_cascading: usize,
98✔
196
        excludes: &[FloatCode],
98✔
197
    ) -> VortexResult<ArrayRef> {
98✔
198
        let alp_encoded = ALPEncoding
98✔
199
            .encode(&stats.source().to_canonical()?, None)?
98✔
200
            .vortex_expect("Input is a supported floating point array");
98✔
201
        let alp = alp_encoded.as_::<ALPVTable>();
98✔
202
        let alp_ints = alp.encoded().to_primitive()?;
98✔
203

204
        // Compress the ALP ints.
205
        // Patches are not compressed. They should be infrequent, and if they are not then we want
206
        // to keep them linear for easy indexing.
207
        let mut int_excludes = Vec::new();
98✔
208
        if excludes.contains(&DICT_SCHEME) {
98✔
209
            int_excludes.push(integer::DictScheme.code());
2✔
210
        }
97✔
211
        if excludes.contains(&RUNEND_SCHEME) {
98✔
212
            int_excludes.push(integer::RunEndScheme.code());
×
213
        }
98✔
214

215
        let compressed_alp_ints =
98✔
216
            IntCompressor::compress(&alp_ints, is_sample, allowed_cascading - 1, &int_excludes)?;
98✔
217

218
        let patches = alp.patches().map(compress_patches).transpose()?;
98✔
219

220
        Ok(ALPArray::try_new(compressed_alp_ints, alp.exponents(), patches)?.into_array())
98✔
221
    }
98✔
222
}
223

224
impl Scheme for ALPRDScheme {
225
    type StatsType = FloatStats;
226
    type CodeType = FloatCode;
227

228
    fn code(&self) -> FloatCode {
117✔
229
        ALPRD_SCHEME
117✔
230
    }
117✔
231

232
    fn expected_compression_ratio(
117✔
233
        &self,
117✔
234
        stats: &Self::StatsType,
117✔
235
        is_sample: bool,
117✔
236
        allowed_cascading: usize,
117✔
237
        excludes: &[FloatCode],
117✔
238
    ) -> VortexResult<f64> {
117✔
239
        if stats.source().ptype() == PType::F16 {
117✔
240
            return Ok(0.0);
19✔
241
        }
98✔
242

243
        estimate_compression_ratio_with_sampling(
98✔
244
            self,
98✔
245
            stats,
98✔
246
            is_sample,
98✔
247
            allowed_cascading,
98✔
248
            excludes,
98✔
249
        )
250
    }
117✔
251

252
    fn compress(
98✔
253
        &self,
98✔
254
        stats: &Self::StatsType,
98✔
255
        _is_sample: bool,
98✔
256
        _allowed_cascading: usize,
98✔
257
        _excludes: &[FloatCode],
98✔
258
    ) -> VortexResult<ArrayRef> {
98✔
259
        let encoder = match stats.source().ptype() {
98✔
260
            PType::F32 => RDEncoder::new(stats.source().as_slice::<f32>()),
3✔
261
            PType::F64 => RDEncoder::new(stats.source().as_slice::<f64>()),
95✔
262
            ptype => vortex_panic!("cannot ALPRD compress ptype {ptype}"),
×
263
        };
264

265
        let mut alp_rd = encoder.encode(stats.source());
98✔
266

267
        let patches = alp_rd
98✔
268
            .left_parts_patches()
98✔
269
            .map(compress_patches)
98✔
270
            .transpose()?;
98✔
271
        alp_rd.replace_left_parts_patches(patches);
98✔
272

273
        Ok(alp_rd.into_array())
98✔
274
    }
98✔
275
}
276

277
impl Scheme for DictScheme {
278
    type StatsType = FloatStats;
279
    type CodeType = FloatCode;
280

281
    fn code(&self) -> FloatCode {
117✔
282
        DICT_SCHEME
117✔
283
    }
117✔
284

285
    fn expected_compression_ratio(
115✔
286
        &self,
115✔
287
        stats: &Self::StatsType,
115✔
288
        is_sample: bool,
115✔
289
        allowed_cascading: usize,
115✔
290
        excludes: &[FloatCode],
115✔
291
    ) -> VortexResult<f64> {
115✔
292
        if stats.value_count == 0 {
115✔
293
            return Ok(0.0);
19✔
294
        }
96✔
295

296
        // If the array is high cardinality (>50% unique values) skip.
297
        if stats.distinct_values_count > stats.value_count / 2 {
96✔
298
            return Ok(0.0);
95✔
299
        }
1✔
300

301
        // Take a sample and run compression on the sample to determine before/after size.
302
        estimate_compression_ratio_with_sampling(
1✔
303
            self,
1✔
304
            stats,
1✔
305
            is_sample,
1✔
306
            allowed_cascading,
1✔
307
            excludes,
1✔
308
        )
309
    }
115✔
310

311
    fn compress(
2✔
312
        &self,
2✔
313
        stats: &Self::StatsType,
2✔
314
        is_sample: bool,
2✔
315
        allowed_cascading: usize,
2✔
316
        _excludes: &[FloatCode],
2✔
317
    ) -> VortexResult<ArrayRef> {
2✔
318
        let dict_array = dictionary_encode(stats)?;
2✔
319

320
        // Only compress the codes.
321
        let codes_stats = IntegerStats::generate_opts(
2✔
322
            &dict_array.codes().to_primitive()?,
2✔
323
            GenerateStatsOptions {
2✔
324
                count_distinct_values: false,
2✔
325
            },
2✔
326
        );
327
        let codes_scheme = IntCompressor::choose_scheme(
2✔
328
            &codes_stats,
2✔
329
            is_sample,
2✔
330
            allowed_cascading - 1,
2✔
331
            &[integer::DictScheme.code()],
2✔
UNCOV
332
        )?;
×
333
        let compressed_codes = codes_scheme.compress(
2✔
334
            &codes_stats,
2✔
335
            is_sample,
2✔
336
            allowed_cascading - 1,
2✔
337
            &[integer::DictScheme.code()],
2✔
UNCOV
338
        )?;
×
339

340
        let compressed_values = FloatCompressor::compress(
2✔
341
            &dict_array.values().to_primitive()?,
2✔
342
            is_sample,
2✔
343
            allowed_cascading - 1,
2✔
344
            &[DICT_SCHEME],
2✔
345
        )?;
×
346

347
        Ok(DictArray::try_new(compressed_codes, compressed_values)?.into_array())
2✔
348
    }
2✔
349
}
350

351
#[cfg(test)]
mod tests {
    use vortex_array::arrays::PrimitiveArray;
    use vortex_array::validity::Validity;
    use vortex_array::{Array, IntoArray, ToCanonical};
    use vortex_buffer::{Buffer, buffer_mut};

    use crate::float::FloatCompressor;
    use crate::{Compressor, MAX_CASCADE};

    /// Compressing an empty array must succeed and yield an empty array.
    #[test]
    fn test_empty() {
        // Make sure empty array compression does not fail
        let result = FloatCompressor::compress(
            &PrimitiveArray::new(Buffer::<f32>::empty(), Validity::NonNullable),
            false,
            3,
            &[],
        )
        .unwrap();

        assert!(result.is_empty());
    }

    /// Compressing a low-cardinality array must succeed and preserve the element count.
    #[test]
    fn test_compress() {
        let mut values = buffer_mut![1.0f32; 1024];
        // Overwrite every slot with a repeating 0..50 pattern. That leaves only 50 distinct
        // values across 1024 elements (well under the dictionary scheme's 50% cardinality
        // cut-off), and no two neighbouring values are equal, so every run has length 1.
        for i in 0..1024 {
            values[i] = (i % 50) as f32;
        }

        let floats = values.into_array().to_primitive().unwrap();
        let compressed = FloatCompressor::compress(&floats, false, MAX_CASCADE, &[]).unwrap();
        println!("compressed: {}", compressed.display_tree());

        // Whatever encoding was chosen, no elements may be lost or added.
        assert_eq!(compressed.len(), 1024);
    }
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc