• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

vortex-data / vortex / 16935267080

13 Aug 2025 11:00AM UTC coverage: 24.312% (-63.3%) from 87.658%
16935267080

Pull #4226

github

web-flow
Merge 81b48c7fb into baa6ea202
Pull Request #4226: Support converting TimestampTZ to and from duckdb

0 of 2 new or added lines in 1 file covered. (0.0%)

20666 existing lines in 469 files now uncovered.

8726 of 35892 relevant lines covered (24.31%)

147.74 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

70.75
/vortex-btrblocks/src/float.rs
1
// SPDX-License-Identifier: Apache-2.0
2
// SPDX-FileCopyrightText: Copyright the Vortex contributors
3

4
mod dictionary;
5
mod stats;
6

7
use vortex_alp::{ALPArray, ALPEncoding, ALPVTable, RDEncoder};
8
use vortex_array::arrays::{ConstantArray, PrimitiveVTable};
9
use vortex_array::{ArrayRef, IntoArray, ToCanonical};
10
use vortex_dict::DictArray;
11
use vortex_dtype::PType;
12
use vortex_error::{VortexExpect, VortexResult, vortex_panic};
13

14
use self::stats::FloatStats;
15
use crate::float::dictionary::dictionary_encode;
16
use crate::integer::{IntCompressor, IntegerStats};
17
use crate::patches::compress_patches;
18
use crate::{
19
    Compressor, CompressorStats, GenerateStatsOptions, Scheme,
20
    estimate_compression_ratio_with_sampling, integer,
21
};
22

23
pub trait FloatScheme: Scheme<StatsType = FloatStats, CodeType = FloatCode> {}
24

25
impl<T> FloatScheme for T where T: Scheme<StatsType = FloatStats, CodeType = FloatCode> {}
26

27
pub struct FloatCompressor;
28

29
impl Compressor for FloatCompressor {
30
    type ArrayVTable = PrimitiveVTable;
31
    type SchemeType = dyn FloatScheme;
32
    type StatsType = FloatStats;
33

34
    fn schemes() -> &'static [&'static Self::SchemeType] {
10✔
35
        &[
10✔
36
            &UncompressedScheme,
10✔
37
            &ConstantScheme,
10✔
38
            &ALPScheme,
10✔
39
            &ALPRDScheme,
10✔
40
            &DictScheme,
10✔
41
        ]
10✔
42
    }
10✔
43

44
    fn default_scheme() -> &'static Self::SchemeType {
6✔
45
        &UncompressedScheme
6✔
46
    }
6✔
47

48
    fn dict_scheme_code() -> FloatCode {
10✔
49
        DICT_SCHEME
10✔
50
    }
10✔
51
}
52

53
const UNCOMPRESSED_SCHEME: FloatCode = FloatCode(0);
54
const CONSTANT_SCHEME: FloatCode = FloatCode(1);
55
const ALP_SCHEME: FloatCode = FloatCode(2);
56
const ALPRD_SCHEME: FloatCode = FloatCode(3);
57
const DICT_SCHEME: FloatCode = FloatCode(4);
58
const RUNEND_SCHEME: FloatCode = FloatCode(5);
59

60
#[derive(Debug, Copy, Clone)]
61
struct UncompressedScheme;
62

63
#[derive(Debug, Copy, Clone)]
64
struct ConstantScheme;
65

66
#[derive(Debug, Copy, Clone)]
67
struct ALPScheme;
68

69
#[derive(Debug, Copy, Clone)]
70
struct ALPRDScheme;
71

72
#[derive(Debug, Copy, Clone)]
73
struct DictScheme;
74

75
impl Scheme for UncompressedScheme {
76
    type StatsType = FloatStats;
77
    type CodeType = FloatCode;
78

79
    fn code(&self) -> FloatCode {
10✔
80
        UNCOMPRESSED_SCHEME
10✔
81
    }
10✔
82

83
    fn expected_compression_ratio(
10✔
84
        &self,
10✔
85
        _stats: &Self::StatsType,
10✔
86
        _is_sample: bool,
10✔
87
        _allowed_cascading: usize,
10✔
88
        _excludes: &[FloatCode],
10✔
89
    ) -> VortexResult<f64> {
10✔
90
        Ok(1.0)
10✔
91
    }
10✔
92

93
    fn compress(
6✔
94
        &self,
6✔
95
        stats: &Self::StatsType,
6✔
96
        _is_sample: bool,
6✔
97
        _allowed_cascading: usize,
6✔
98
        _excludes: &[FloatCode],
6✔
99
    ) -> VortexResult<ArrayRef> {
6✔
100
        Ok(stats.source().to_array())
6✔
101
    }
6✔
102
}
103

104
impl Scheme for ConstantScheme {
105
    type StatsType = FloatStats;
106
    type CodeType = FloatCode;
107

108
    fn code(&self) -> FloatCode {
10✔
109
        CONSTANT_SCHEME
10✔
110
    }
10✔
111

112
    fn expected_compression_ratio(
10✔
113
        &self,
10✔
114
        stats: &Self::StatsType,
10✔
115
        is_sample: bool,
10✔
116
        _allowed_cascading: usize,
10✔
117
        _excludes: &[FloatCode],
10✔
118
    ) -> VortexResult<f64> {
10✔
119
        // Never select Constant when sampling
120
        if is_sample {
10✔
UNCOV
121
            return Ok(0.0);
×
122
        }
10✔
123

124
        // Can only have 1 distinct value
125
        if stats.distinct_values_count > 1 {
10✔
126
            return Ok(0.0);
4✔
127
        }
6✔
128

129
        // Cannot have mix of nulls and non-nulls
130
        if stats.null_count > 0 && stats.value_count > 0 {
6✔
131
            return Ok(0.0);
×
132
        }
6✔
133

134
        Ok(stats.value_count as f64)
6✔
135
    }
10✔
136

137
    fn compress(
×
138
        &self,
×
139
        stats: &Self::StatsType,
×
140
        _is_sample: bool,
×
141
        _allowed_cascading: usize,
×
142
        _excludes: &[FloatCode],
×
143
    ) -> VortexResult<ArrayRef> {
×
144
        let scalar = stats
×
145
            .source()
×
146
            .as_constant()
×
147
            .vortex_expect("must be constant");
×
148

149
        Ok(ConstantArray::new(scalar, stats.source().len()).into_array())
×
150
    }
×
151
}
152

153
/// Compact identifier for a float compression scheme, used for selection and exclusion.
#[derive(Clone, Copy, Debug, Hash, PartialEq, Eq)]
pub struct FloatCode(u8);
155

156
impl Scheme for ALPScheme {
157
    type StatsType = FloatStats;
158
    type CodeType = FloatCode;
159

160
    fn code(&self) -> FloatCode {
10✔
161
        ALP_SCHEME
10✔
162
    }
10✔
163

164
    fn expected_compression_ratio(
10✔
165
        &self,
10✔
166
        stats: &Self::StatsType,
10✔
167
        is_sample: bool,
10✔
168
        allowed_cascading: usize,
10✔
169
        excludes: &[FloatCode],
10✔
170
    ) -> VortexResult<f64> {
10✔
171
        // We don't support ALP for f16
172
        if stats.source().ptype() == PType::F16 {
10✔
UNCOV
173
            return Ok(0.0);
×
174
        }
10✔
175

176
        if allowed_cascading == 0 {
10✔
177
            // ALP does not compress on its own, we need to be able to cascade it with
178
            // an integer compressor.
179
            return Ok(0.0);
×
180
        }
10✔
181

182
        estimate_compression_ratio_with_sampling(
10✔
183
            self,
10✔
184
            stats,
10✔
185
            is_sample,
10✔
186
            allowed_cascading,
10✔
187
            excludes,
10✔
188
        )
189
    }
10✔
190

191
    fn compress(
14✔
192
        &self,
14✔
193
        stats: &FloatStats,
14✔
194
        is_sample: bool,
14✔
195
        allowed_cascading: usize,
14✔
196
        excludes: &[FloatCode],
14✔
197
    ) -> VortexResult<ArrayRef> {
14✔
198
        let alp_encoded = ALPEncoding
14✔
199
            .encode(&stats.source().to_canonical()?, None)?
14✔
200
            .vortex_expect("Input is a supported floating point array");
14✔
201
        let alp = alp_encoded.as_::<ALPVTable>();
14✔
202
        let alp_ints = alp.encoded().to_primitive()?;
14✔
203

204
        // Compress the ALP ints.
205
        // Patches are not compressed. They should be infrequent, and if they are not then we want
206
        // to keep them linear for easy indexing.
207
        let mut int_excludes = Vec::new();
14✔
208
        if excludes.contains(&DICT_SCHEME) {
14✔
UNCOV
209
            int_excludes.push(integer::DictScheme.code());
×
210
        }
14✔
211
        if excludes.contains(&RUNEND_SCHEME) {
14✔
212
            int_excludes.push(integer::RunEndScheme.code());
×
213
        }
14✔
214

215
        let compressed_alp_ints =
14✔
216
            IntCompressor::compress(&alp_ints, is_sample, allowed_cascading - 1, &int_excludes)?;
14✔
217

218
        let patches = alp.patches().map(compress_patches).transpose()?;
14✔
219

220
        Ok(ALPArray::try_new(compressed_alp_ints, alp.exponents(), patches)?.into_array())
14✔
221
    }
14✔
222
}
223

224
impl Scheme for ALPRDScheme {
225
    type StatsType = FloatStats;
226
    type CodeType = FloatCode;
227

228
    fn code(&self) -> FloatCode {
10✔
229
        ALPRD_SCHEME
10✔
230
    }
10✔
231

232
    fn expected_compression_ratio(
10✔
233
        &self,
10✔
234
        stats: &Self::StatsType,
10✔
235
        is_sample: bool,
10✔
236
        allowed_cascading: usize,
10✔
237
        excludes: &[FloatCode],
10✔
238
    ) -> VortexResult<f64> {
10✔
239
        if stats.source().ptype() == PType::F16 {
10✔
UNCOV
240
            return Ok(0.0);
×
241
        }
10✔
242

243
        estimate_compression_ratio_with_sampling(
10✔
244
            self,
10✔
245
            stats,
10✔
246
            is_sample,
10✔
247
            allowed_cascading,
10✔
248
            excludes,
10✔
249
        )
250
    }
10✔
251

252
    fn compress(
10✔
253
        &self,
10✔
254
        stats: &Self::StatsType,
10✔
255
        _is_sample: bool,
10✔
256
        _allowed_cascading: usize,
10✔
257
        _excludes: &[FloatCode],
10✔
258
    ) -> VortexResult<ArrayRef> {
10✔
259
        let encoder = match stats.source().ptype() {
10✔
260
            PType::F32 => RDEncoder::new(stats.source().as_slice::<f32>()),
8✔
261
            PType::F64 => RDEncoder::new(stats.source().as_slice::<f64>()),
2✔
262
            ptype => vortex_panic!("cannot ALPRD compress ptype {ptype}"),
×
263
        };
264

265
        let mut alp_rd = encoder.encode(stats.source());
10✔
266

267
        let patches = alp_rd
10✔
268
            .left_parts_patches()
10✔
269
            .map(compress_patches)
10✔
270
            .transpose()?;
10✔
271
        alp_rd.replace_left_parts_patches(patches);
10✔
272

273
        Ok(alp_rd.into_array())
10✔
274
    }
10✔
275
}
276

277
impl Scheme for DictScheme {
278
    type StatsType = FloatStats;
279
    type CodeType = FloatCode;
280

281
    fn code(&self) -> FloatCode {
10✔
282
        DICT_SCHEME
10✔
283
    }
10✔
284

285
    fn expected_compression_ratio(
10✔
286
        &self,
10✔
287
        stats: &Self::StatsType,
10✔
288
        is_sample: bool,
10✔
289
        allowed_cascading: usize,
10✔
290
        excludes: &[FloatCode],
10✔
291
    ) -> VortexResult<f64> {
10✔
292
        if stats.value_count == 0 {
10✔
UNCOV
293
            return Ok(0.0);
×
294
        }
10✔
295

296
        // If the array is high cardinality (>50% unique values) skip.
297
        if stats.distinct_values_count > stats.value_count / 2 {
10✔
298
            return Ok(0.0);
10✔
UNCOV
299
        }
×
300

301
        // Take a sample and run compression on the sample to determine before/after size.
UNCOV
302
        estimate_compression_ratio_with_sampling(
×
UNCOV
303
            self,
×
UNCOV
304
            stats,
×
UNCOV
305
            is_sample,
×
UNCOV
306
            allowed_cascading,
×
UNCOV
307
            excludes,
×
308
        )
309
    }
10✔
310

UNCOV
311
    fn compress(
×
UNCOV
312
        &self,
×
UNCOV
313
        stats: &Self::StatsType,
×
UNCOV
314
        is_sample: bool,
×
UNCOV
315
        allowed_cascading: usize,
×
UNCOV
316
        _excludes: &[FloatCode],
×
UNCOV
317
    ) -> VortexResult<ArrayRef> {
×
UNCOV
318
        let dict_array = dictionary_encode(stats)?;
×
319

320
        // Only compress the codes.
UNCOV
321
        let codes_stats = IntegerStats::generate_opts(
×
UNCOV
322
            &dict_array.codes().to_primitive()?,
×
UNCOV
323
            GenerateStatsOptions {
×
UNCOV
324
                count_distinct_values: false,
×
UNCOV
325
            },
×
326
        );
UNCOV
327
        let codes_scheme = IntCompressor::choose_scheme(
×
UNCOV
328
            &codes_stats,
×
UNCOV
329
            is_sample,
×
UNCOV
330
            allowed_cascading - 1,
×
UNCOV
331
            &[integer::DictScheme.code(), integer::SequenceScheme.code()],
×
332
        )?;
×
UNCOV
333
        let compressed_codes = codes_scheme.compress(
×
UNCOV
334
            &codes_stats,
×
UNCOV
335
            is_sample,
×
UNCOV
336
            allowed_cascading - 1,
×
UNCOV
337
            &[integer::DictScheme.code()],
×
338
        )?;
×
339

UNCOV
340
        let compressed_values = FloatCompressor::compress(
×
UNCOV
341
            &dict_array.values().to_primitive()?,
×
UNCOV
342
            is_sample,
×
UNCOV
343
            allowed_cascading - 1,
×
UNCOV
344
            &[DICT_SCHEME],
×
345
        )?;
×
346

UNCOV
347
        Ok(DictArray::try_new(compressed_codes, compressed_values)?.into_array())
×
UNCOV
348
    }
×
349
}
350

351
#[cfg(test)]
mod tests {
    use vortex_array::arrays::PrimitiveArray;
    use vortex_array::validity::Validity;
    use vortex_array::{Array, IntoArray, ToCanonical};
    use vortex_buffer::{Buffer, buffer_mut};

    use crate::float::FloatCompressor;
    use crate::{Compressor, MAX_CASCADE};

    #[test]
    fn test_empty() {
        // Make sure empty array compression does not fail
        let result = FloatCompressor::compress(
            &PrimitiveArray::new(Buffer::<f32>::empty(), Validity::NonNullable),
            false,
            3,
            &[],
        )
        .unwrap();

        assert!(result.is_empty());
    }

    #[test]
    fn test_compress() {
        let mut values = buffer_mut![0.0f32; 1024];
        // Cycle through 50 distinct values (0.0..=49.0). Adjacent positions always differ, so
        // the average run length is 1. That makes the data dictionary-friendly and run-end
        // unfriendly.
        for i in 0..1024 {
            values[i] = (i % 50) as f32;
        }

        let floats = values.into_array().to_primitive().unwrap();
        let compressed = FloatCompressor::compress(&floats, false, MAX_CASCADE, &[]).unwrap();
        println!("compressed: {}", compressed.display_tree());

        // Whatever scheme is chosen, compression must be lossless.
        assert_eq!(compressed.len(), floats.len());
        let decompressed = compressed.to_primitive().unwrap();
        assert_eq!(decompressed.as_slice::<f32>(), floats.as_slice::<f32>());
    }
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc