• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

vortex-data / vortex / 16331938722

16 Jul 2025 10:49PM UTC coverage: 80.702% (-0.9%) from 81.557%
16331938722

push

github

web-flow
feat: build with stable rust (#3881)

120 of 173 new or added lines in 28 files covered. (69.36%)

174 existing lines in 102 files now uncovered.

41861 of 51871 relevant lines covered (80.7%)

157487.71 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

80.77
/vortex-datafusion/src/persistent/format.rs
1
// SPDX-License-Identifier: Apache-2.0
2
// SPDX-FileCopyrightText: Copyright the Vortex contributors
3

4
use std::any::Any;
5
use std::fmt::{Debug, Formatter};
6
use std::sync::Arc;
7

8
use async_trait::async_trait;
9
use datafusion::arrow::datatypes::{Schema, SchemaRef};
10
use datafusion::catalog::Session;
11
use datafusion::common::parsers::CompressionTypeVariant;
12
use datafusion::common::stats::Precision;
13
use datafusion::common::{
14
    ColumnStatistics, DataFusionError, GetExt, Result as DFResult, Statistics,
15
    config_datafusion_err, not_impl_err,
16
};
17
use datafusion::datasource::file_format::file_compression_type::FileCompressionType;
18
use datafusion::datasource::file_format::{FileFormat, FileFormatFactory};
19
use datafusion::datasource::physical_plan::{
20
    FileScanConfig, FileScanConfigBuilder, FileSinkConfig, FileSource,
21
};
22
use datafusion::datasource::sink::DataSinkExec;
23
use datafusion::datasource::source::DataSourceExec;
24
use datafusion::logical_expr::dml::InsertOp;
25
use datafusion::physical_expr::LexRequirement;
26
use datafusion::physical_plan::ExecutionPlan;
27
use futures::{FutureExt, StreamExt as _, TryStreamExt as _, stream};
28
use itertools::Itertools;
29
use object_store::{ObjectMeta, ObjectStore};
30
use vortex::dtype::DType;
31
use vortex::dtype::arrow::FromArrowType;
32
use vortex::error::{VortexExpect, VortexResult, vortex_err};
33
use vortex::file::VORTEX_FILE_EXTENSION;
34
use vortex::metrics::VortexMetrics;
35
use vortex::session::VortexSession;
36
use vortex::stats;
37
use vortex::stats::{Stat, StatsProviderExt, StatsSet};
38

39
use super::cache::VortexFileCache;
40
use super::sink::VortexSink;
41
use super::source::VortexSource;
42
use crate::PrecisionExt as _;
43
use crate::convert::TryToDataFusion;
44

45
/// Vortex implementation of a DataFusion [`FileFormat`].
pub struct VortexFormat {
    // Session handed to every scan source created by this format (see
    // `create_physical_plan`); also forwarded into the file cache.
    session: Arc<VortexSession>,
    // Cache used to open files once and reuse footers/segments across
    // `infer_schema`, `infer_stats`, and scans. Sized from `opts`.
    file_cache: VortexFileCache,
    // Options captured at construction time; `file_cache` is built from them.
    opts: VortexFormatOptions,
}
51

52
impl Debug for VortexFormat {
53
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
×
54
        f.debug_struct("VortexFormat")
×
55
            .field("opts", &self.opts)
×
56
            .finish()
×
57
    }
×
58
}
59

60
/// Options to configure the [`VortexFormat`].
#[derive(Debug)]
pub struct VortexFormatOptions {
    /// The size of the in-memory [`vortex::file::Footer`] cache.
    // NOTE(review): the `_mb` suffix suggests megabytes — confirm against
    // `VortexFileCache::new`, which receives this value directly.
    pub footer_cache_size_mb: usize,
    /// The size of the in-memory segment cache.
    // Defaults to 0 (see `Default` impl), i.e. segment caching disabled.
    pub segment_cache_size_mb: usize,
}
68

69
impl Default for VortexFormatOptions {
70
    fn default() -> Self {
20✔
71
        Self {
20✔
72
            footer_cache_size_mb: 64,
20✔
73
            segment_cache_size_mb: 0,
20✔
74
        }
20✔
75
    }
20✔
76
}
77

78
/// Minimal factory to create [`VortexFormat`] instances.
#[derive(Default, Debug)]
pub struct VortexFormatFactory {
    // Session cloned into each format produced by `create`.
    session: Arc<VortexSession>,
}
83

84
impl GetExt for VortexFormatFactory {
85
    fn get_ext(&self) -> String {
18✔
86
        VORTEX_FILE_EXTENSION.to_string()
18✔
87
    }
18✔
88
}
89

90
impl FileFormatFactory for VortexFormatFactory {
91
    #[allow(clippy::disallowed_types)]
92
    fn create(
3✔
93
        &self,
3✔
94
        _state: &dyn Session,
3✔
95
        format_options: &std::collections::HashMap<String, String>,
3✔
96
    ) -> DFResult<Arc<dyn FileFormat>> {
3✔
97
        if !format_options.is_empty() {
3✔
98
            return Err(config_datafusion_err!(
1✔
99
                "Vortex tables don't support any options"
1✔
100
            ));
1✔
101
        }
2✔
102

103
        Ok(Arc::new(VortexFormat::new(self.session.clone())))
2✔
104
    }
3✔
105

106
    fn default(&self) -> Arc<dyn FileFormat> {
×
107
        Arc::new(VortexFormat::default())
×
108
    }
×
109

110
    fn as_any(&self) -> &dyn Any {
×
111
        self
×
112
    }
×
113
}
114

115
impl Default for VortexFormat {
116
    fn default() -> Self {
18✔
117
        Self::new(Arc::new(VortexSession::default()))
18✔
118
    }
18✔
119
}
120

121
impl VortexFormat {
122
    /// Create a new instance of the [`VortexFormat`].
123
    pub fn new(session: Arc<VortexSession>) -> Self {
20✔
124
        let opts = VortexFormatOptions::default();
20✔
125
        Self {
20✔
126
            session: session.clone(),
20✔
127
            file_cache: VortexFileCache::new(
20✔
128
                opts.footer_cache_size_mb,
20✔
129
                opts.segment_cache_size_mb,
20✔
130
                session,
20✔
131
            ),
20✔
132
            opts,
20✔
133
        }
20✔
134
    }
20✔
135

136
    /// Return the format specific configuration
137
    pub fn options(&self) -> &VortexFormatOptions {
×
138
        &self.opts
×
139
    }
×
140
}
141

142
#[async_trait]
impl FileFormat for VortexFormat {
    fn as_any(&self) -> &dyn Any {
        self
    }

    fn get_ext(&self) -> String {
        VORTEX_FILE_EXTENSION.to_string()
    }

    // Vortex files are never wrapped in a file-level compression codec, so
    // only the uncompressed variant yields a valid extension.
    fn get_ext_with_compression(
        &self,
        file_compression_type: &FileCompressionType,
    ) -> DFResult<String> {
        match file_compression_type.get_variant() {
            CompressionTypeVariant::UNCOMPRESSED => Ok(self.get_ext()),
            _ => Err(DataFusionError::Internal(
                "Vortex does not support file level compression.".into(),
            )),
        }
    }

    // Open every object (concurrently, bounded by DataFusion's
    // `meta_fetch_concurrency`), convert each file's dtype to an Arrow
    // schema, and merge them into a single table schema.
    async fn infer_schema(
        &self,
        state: &dyn Session,
        store: &Arc<dyn ObjectStore>,
        objects: &[ObjectMeta],
    ) -> DFResult<SchemaRef> {
        let mut file_schemas = stream::iter(objects.iter().cloned())
            .map(|o| {
                let store = store.clone();
                let cache = self.file_cache.clone();
                // Spawn so footer reads run on the runtime's worker threads;
                // a join error (panic/cancellation) is treated as fatal.
                tokio::task::spawn(async move {
                    let vxf = cache.try_get(&o, store).await?;
                    let inferred_schema = vxf.dtype().to_arrow_schema()?;
                    VortexResult::Ok((o.location, inferred_schema))
                })
                .map(|f| f.vortex_expect("Failed to spawn infer_schema"))
            })
            .buffer_unordered(state.config_options().execution.meta_fetch_concurrency)
            .try_collect::<Vec<_>>()
            .await
            .map_err(|e| DataFusionError::Execution(format!("Failed to infer schema: {e}")))?;

        // Get consistent order of schemas for `Schema::try_merge`, as some filesystems don't have deterministic listing orders
        file_schemas.sort_by(|(l1, _), (l2, _)| l1.cmp(l2));
        let file_schemas = file_schemas.into_iter().map(|(_, schema)| schema);

        Ok(Arc::new(Schema::try_merge(file_schemas)?))
    }

    // Translate per-column Vortex file statistics into DataFusion
    // `Statistics`, mapping table-schema fields onto the file's struct
    // fields by name.
    #[cfg_attr(feature = "tracing", tracing::instrument(skip_all, fields(location = object.location.as_ref()
    )))]
    async fn infer_stats(
        &self,
        _state: &dyn Session,
        store: &Arc<dyn ObjectStore>,
        table_schema: SchemaRef,
        object: &ObjectMeta,
    ) -> DFResult<Statistics> {
        let object = object.clone();
        let store = store.clone();
        let cache = self.file_cache.clone();
        tokio::task::spawn(async move {
            let vxf = cache.try_get(&object, store.clone()).await.map_err(|e| {
                DataFusionError::Execution(format!(
                    "Failed to open Vortex file {}: {e}",
                    object.location
                ))
            })?;

            let struct_dtype = vxf
                .dtype()
                .as_struct()
                .vortex_expect("dtype is not a struct");

            // Evaluate the statistics for each column that we are able to return to DataFusion.
            let Some(file_stats) = vxf.file_stats() else {
                // If the file has no column stats, the best we can do is return a row count.
                return Ok(Statistics {
                    num_rows: Precision::Exact(
                        usize::try_from(vxf.row_count())
                            .map_err(|_| vortex_err!("Row count overflow"))
                            .vortex_expect("Row count overflow"),
                    ),
                    total_byte_size: Precision::Absent,
                    column_statistics: vec![ColumnStatistics::default(); struct_dtype.nfields()],
                });
            };

            // One StatsSet per table-schema field; fields absent from the
            // file fall back to an empty (all-absent) set.
            let stats = table_schema
                .fields()
                .iter()
                .map(|field| struct_dtype.find(field.name()))
                .map(|idx| match idx {
                    None => StatsSet::default(),
                    Some(id) => file_stats[id].clone(),
                })
                .collect_vec();

            // Missing per-column sizes are folded in as inexact zeros, which
            // downgrades the overall precision from exact to inexact.
            let total_byte_size = stats
                .iter()
                .map(|stats_set| {
                    stats_set
                        .get_as::<usize>(Stat::UncompressedSizeInBytes)
                        .unwrap_or_else(|| stats::Precision::inexact(0_usize))
                })
                .fold(stats::Precision::exact(0_usize), |acc, stats_set| {
                    acc.zip(stats_set).map(|(acc, stats_set)| acc + stats_set)
                });

            // Sum up the total byte size across all the columns.
            let total_byte_size = total_byte_size.to_df();

            let column_statistics = stats
                .into_iter()
                .zip(table_schema.fields().iter())
                .map(|(stats_set, field)| {
                    let null_count = stats_set.get_as::<usize>(Stat::NullCount);
                    // Min/max are decoded as scalars of the field's Arrow
                    // type; conversion failures are dropped (treated as
                    // absent) rather than propagated.
                    let min = stats_set
                        .get_scalar(Stat::Min, &DType::from_arrow(field.as_ref()))
                        .and_then(|n| n.map(|n| n.try_to_df().ok()).transpose());

                    let max = stats_set
                        .get_scalar(Stat::Max, &DType::from_arrow(field.as_ref()))
                        .and_then(|n| n.map(|n| n.try_to_df().ok()).transpose());

                    ColumnStatistics {
                        null_count: null_count.to_df(),
                        max_value: max.to_df(),
                        min_value: min.to_df(),
                        sum_value: Precision::Absent,
                        // A column known (exactly) to be constant has exactly
                        // one distinct value; anything else is Absent.
                        distinct_count: stats_set
                            .get_as::<bool>(Stat::IsConstant)
                            .and_then(|is_constant| {
                                is_constant.as_exact().map(|_| Precision::Exact(1))
                            })
                            .unwrap_or(Precision::Absent),
                    }
                })
                .collect::<Vec<_>>();

            Ok(Statistics {
                num_rows: Precision::Exact(
                    usize::try_from(vxf.row_count())
                        .map_err(|_| vortex_err!("Row count overflow"))
                        .vortex_expect("Row count overflow"),
                ),
                total_byte_size,
                column_statistics,
            })
        })
        .await
        .vortex_expect("Failed to spawn infer_stats")
    }

    // Build the scan plan: reject configurations Vortex doesn't support yet,
    // then wrap the scan config around a `VortexSource`.
    async fn create_physical_plan(
        &self,
        _state: &dyn Session,
        file_scan_config: FileScanConfig,
    ) -> DFResult<Arc<dyn ExecutionPlan>> {
        // Byte-range splits of a single file are not supported.
        if file_scan_config
            .file_groups
            .iter()
            .flat_map(|fg| fg.files())
            .any(|f| f.range.is_some())
        {
            return not_impl_err!("File level partitioning isn't implemented yet for Vortex");
        }

        if !file_scan_config.table_partition_cols.is_empty() {
            return not_impl_err!("Hive style partitioning isn't implemented yet for Vortex");
        }

        if !file_scan_config.output_ordering.is_empty() {
            return not_impl_err!("Vortex doesn't support output ordering");
        }

        let source = VortexSource::new(self.file_cache.clone(), self.session.metrics().clone());
        Ok(DataSourceExec::from_data_source(
            FileScanConfigBuilder::from(file_scan_config)
                .with_source(Arc::new(source))
                .build(),
        ))
    }

    // Build the write plan: only plain appends without partitioning are
    // supported; writes go through a `VortexSink`.
    async fn create_writer_physical_plan(
        &self,
        input: Arc<dyn ExecutionPlan>,
        _state: &dyn Session,
        conf: FileSinkConfig,
        order_requirements: Option<LexRequirement>,
    ) -> DFResult<Arc<dyn ExecutionPlan>> {
        if conf.insert_op != InsertOp::Append {
            return not_impl_err!("Overwrites are not implemented yet for Vortex");
        }

        if !conf.table_partition_cols.is_empty() {
            return not_impl_err!("Hive style partitioning isn't implemented yet for Vortex");
        }

        let schema = conf.output_schema().clone();
        let sink = Arc::new(VortexSink::new(conf, schema));

        Ok(Arc::new(DataSinkExec::new(input, sink, order_requirements)) as _)
    }

    // Default file source used by DataFusion when it constructs the scan
    // itself; note this path uses fresh default metrics rather than the
    // session's (unlike `create_physical_plan`).
    fn file_source(&self) -> Arc<dyn FileSource> {
        Arc::new(VortexSource::new(
            self.file_cache.clone(),
            VortexMetrics::default(),
        ))
    }
}
356

357
#[cfg(test)]
mod tests {
    use datafusion::execution::SessionStateBuilder;
    use datafusion::prelude::SessionContext;
    use tempfile::TempDir;

    use super::*;
    use crate::persistent::register_vortex_format_factory;

    // A registered factory lets `STORED AS vortex` create an (empty)
    // external table against a fresh directory.
    #[tokio::test]
    async fn create_table() {
        let dir = TempDir::new().unwrap();

        let factory: VortexFormatFactory = Default::default();
        let mut session_state_builder = SessionStateBuilder::new().with_default_features();
        register_vortex_format_factory(factory, &mut session_state_builder);
        let session = SessionContext::new_with_state(session_state_builder.build());

        let df = session
            .sql(&format!(
                "CREATE EXTERNAL TABLE my_tbl \
                (c1 VARCHAR NOT NULL, c2 INT NOT NULL) \
                STORED AS vortex LOCATION '{}'",
                dir.path().to_str().unwrap()
            ))
            .await
            .unwrap();

        assert_eq!(df.count().await.unwrap(), 0);
    }

    // Any OPTIONS clause must be rejected (see `VortexFormatFactory::create`);
    // the `.unwrap()` on the failed statement is expected to panic.
    #[tokio::test]
    #[should_panic]
    async fn fail_table_config() {
        let dir = TempDir::new().unwrap();

        let factory: VortexFormatFactory = Default::default();
        let mut session_state_builder = SessionStateBuilder::new().with_default_features();
        register_vortex_format_factory(factory, &mut session_state_builder);
        let session = SessionContext::new_with_state(session_state_builder.build());

        session
            .sql(&format!(
                "CREATE EXTERNAL TABLE my_tbl \
                (c1 VARCHAR NOT NULL, c2 INT NOT NULL) \
                STORED AS vortex LOCATION '{}' \
                OPTIONS( some_key 'value' );",
                dir.path().to_str().unwrap()
            ))
            .await
            .unwrap()
            .collect()
            .await
            .unwrap();
    }
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc