
vortex-data / vortex · build 17008093231

16 Aug 2025 11:54AM UTC · coverage: 87.913% (+0.06%) from 87.855% · push · github · web-flow
fix(deps): update slf4j monorepo to v2.0.17 (patch) (#4258)

This PR contains the following updates:

| Package | Change | Age | Confidence |
|---|---|---|---|
| [org.slf4j:slf4j-api](http://www.slf4j.org) ([source](https://redirect.github.com/qos-ch/slf4j), [changelog](https://www.slf4j.org/news.html)) | `2.0.9` -> `2.0.17` | [![age](https://developer.mend.io/api/mc/badges/age/maven/org.slf4j:slf4j-api/2.0.17?slim=true)](https://docs.renovatebot.com/merge-confidence/) | [![confidence](https://developer.mend.io/api/mc/badges/confidence/maven/org.slf4j:slf4j-api/2.0.9/2.0.17?slim=true)](https://docs.renovatebot.com/merge-confidence/) |
| [org.slf4j:slf4j-simple](http://www.slf4j.org) ([source](https://redirect.github.com/qos-ch/slf4j), [changelog](https://www.slf4j.org/news.html)) | `2.0.9` -> `2.0.17` | [![age](https://developer.mend.io/api/mc/badges/age/maven/org.slf4j:slf4j-simple/2.0.17?slim=true)](https://docs.renovatebot.com/merge-confidence/) | [![confidence](https://developer.mend.io/api/mc/badges/confidence/maven/org.slf4j:slf4j-simple/2.0.9/2.0.17?slim=true)](https://docs.renovatebot.com/merge-confidence/) |

---

### Configuration

📅 **Schedule**: Branch creation - At any time (no schedule defined),
Automerge - At any time (no schedule defined).

🚦 **Automerge**: Enabled.

♻ **Rebasing**: Whenever PR becomes conflicted, or you tick the
rebase/retry checkbox.

🔕 **Ignore**: Close this PR and you won't be reminded about these
updates again.

---

- [ ] <!-- rebase-check -->If you want to rebase/retry this PR, check
this box

---

This PR was generated by [Mend Renovate](https://mend.io/renovate/).
View the [repository job
log](https://developer.mend.io/github/vortex-data/vortex).


Co-authored-by: renovate[bot] <29139614+renovate[bot]@users.noreply.github.com>

56580 of 64359 relevant lines covered (87.91%)

628739.98 hits per line
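Sanity check on the headline number: 56580 / 64359 ≈ 0.87913, i.e. the 87.91% line coverage reported above.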


Source file: `/vortex-datafusion/src/persistent/format.rs` (79.36% covered)
```rust
// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright the Vortex contributors

use std::any::Any;
use std::fmt::{Debug, Formatter};
use std::sync::Arc;

use arrow_schema::{Schema, SchemaRef};
use async_trait::async_trait;
use datafusion_catalog::Session;
use datafusion_common::parsers::CompressionTypeVariant;
use datafusion_common::stats::Precision;
use datafusion_common::{
    ColumnStatistics, DataFusionError, GetExt, Result as DFResult, Statistics,
    config_datafusion_err, not_impl_err,
};
use datafusion_common_runtime::SpawnedTask;
use datafusion_datasource::file::FileSource;
use datafusion_datasource::file_compression_type::FileCompressionType;
use datafusion_datasource::file_format::{FileFormat, FileFormatFactory};
use datafusion_datasource::file_scan_config::{FileScanConfig, FileScanConfigBuilder};
use datafusion_datasource::file_sink_config::FileSinkConfig;
use datafusion_datasource::sink::DataSinkExec;
use datafusion_datasource::source::DataSourceExec;
use datafusion_expr::dml::InsertOp;
use datafusion_physical_expr::LexRequirement;
use datafusion_physical_plan::ExecutionPlan;
use futures::{FutureExt, StreamExt as _, TryStreamExt as _, stream};
use itertools::Itertools;
use object_store::{ObjectMeta, ObjectStore};
use vortex::dtype::arrow::FromArrowType;
use vortex::dtype::{DType, Nullability, PType};
use vortex::error::{VortexExpect, VortexResult, vortex_err};
use vortex::file::VORTEX_FILE_EXTENSION;
use vortex::metrics::VortexMetrics;
use vortex::scalar::Scalar;
use vortex::session::VortexSession;
use vortex::stats;
use vortex::stats::{Stat, StatsSet};

use super::cache::VortexFileCache;
use super::sink::VortexSink;
use super::source::VortexSource;
use crate::PrecisionExt as _;
use crate::convert::TryToDataFusion;

/// Vortex implementation of a DataFusion [`FileFormat`].
pub struct VortexFormat {
    session: Arc<VortexSession>,
    file_cache: VortexFileCache,
    opts: VortexFormatOptions,
}

impl Debug for VortexFormat {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("VortexFormat")
            .field("opts", &self.opts)
            .finish()
    }
}

/// Options to configure the [`VortexFormat`].
#[derive(Debug)]
pub struct VortexFormatOptions {
    /// The size of the in-memory [`vortex::file::Footer`] cache.
    pub footer_cache_size_mb: usize,
    /// The size of the in-memory segment cache.
    pub segment_cache_size_mb: usize,
}

impl Default for VortexFormatOptions {
    fn default() -> Self {
        Self {
            footer_cache_size_mb: 64,
            segment_cache_size_mb: 0,
        }
    }
}

/// Minimal factory to create [`VortexFormat`] instances.
#[derive(Default, Debug)]
pub struct VortexFormatFactory {
    session: Arc<VortexSession>,
}

impl GetExt for VortexFormatFactory {
    fn get_ext(&self) -> String {
        VORTEX_FILE_EXTENSION.to_string()
    }
}

impl FileFormatFactory for VortexFormatFactory {
    #[allow(clippy::disallowed_types)]
    fn create(
        &self,
        _state: &dyn Session,
        format_options: &std::collections::HashMap<String, String>,
    ) -> DFResult<Arc<dyn FileFormat>> {
        if !format_options.is_empty() {
            return Err(config_datafusion_err!(
                "Vortex tables don't support any options"
            ));
        }

        Ok(Arc::new(VortexFormat::new(self.session.clone())))
    }

    fn default(&self) -> Arc<dyn FileFormat> {
        Arc::new(VortexFormat::default())
    }

    fn as_any(&self) -> &dyn Any {
        self
    }
}

impl Default for VortexFormat {
    fn default() -> Self {
        Self::new(Arc::new(VortexSession::default()))
    }
}

impl VortexFormat {
    /// Create a new instance of the [`VortexFormat`].
    pub fn new(session: Arc<VortexSession>) -> Self {
        let opts = VortexFormatOptions::default();
        Self {
            session: session.clone(),
            file_cache: VortexFileCache::new(
                opts.footer_cache_size_mb,
                opts.segment_cache_size_mb,
                session,
            ),
            opts,
        }
    }

    /// Return the format specific configuration
    pub fn options(&self) -> &VortexFormatOptions {
        &self.opts
    }
}

#[async_trait]
impl FileFormat for VortexFormat {
    fn as_any(&self) -> &dyn Any {
        self
    }

    fn compression_type(&self) -> Option<FileCompressionType> {
        None
    }

    fn get_ext(&self) -> String {
        VORTEX_FILE_EXTENSION.to_string()
    }

    fn get_ext_with_compression(
        &self,
        file_compression_type: &FileCompressionType,
    ) -> DFResult<String> {
        match file_compression_type.get_variant() {
            CompressionTypeVariant::UNCOMPRESSED => Ok(self.get_ext()),
            _ => Err(DataFusionError::Internal(
                "Vortex does not support file level compression.".into(),
            )),
        }
    }

    async fn infer_schema(
        &self,
        state: &dyn Session,
        store: &Arc<dyn ObjectStore>,
        objects: &[ObjectMeta],
    ) -> DFResult<SchemaRef> {
        let mut file_schemas = stream::iter(objects.iter().cloned())
            .map(|o| {
                let store = store.clone();
                let cache = self.file_cache.clone();
                SpawnedTask::spawn(async move {
                    let vxf = cache.try_get(&o, store).await?;
                    let inferred_schema = vxf.dtype().to_arrow_schema()?;
                    VortexResult::Ok((o.location, inferred_schema))
                })
                .map(|f| f.vortex_expect("Failed to spawn infer_schema"))
            })
            .buffer_unordered(state.config_options().execution.meta_fetch_concurrency)
            .try_collect::<Vec<_>>()
            .await
            .map_err(|e| DataFusionError::Execution(format!("Failed to infer schema: {e}")))?;

        // Get consistent order of schemas for `Schema::try_merge`, as some filesystems don't have deterministic listing orders
        file_schemas.sort_by(|(l1, _), (l2, _)| l1.cmp(l2));
        let file_schemas = file_schemas.into_iter().map(|(_, schema)| schema);

        Ok(Arc::new(Schema::try_merge(file_schemas)?))
    }

    #[tracing::instrument(skip_all, fields(location = object.location.as_ref()))]
    async fn infer_stats(
        &self,
        _state: &dyn Session,
        store: &Arc<dyn ObjectStore>,
        table_schema: SchemaRef,
        object: &ObjectMeta,
    ) -> DFResult<Statistics> {
        let object = object.clone();
        let store = store.clone();
        let cache = self.file_cache.clone();

        SpawnedTask::spawn(async move {
            let vxf = cache.try_get(&object, store.clone()).await.map_err(|e| {
                DataFusionError::Execution(format!(
                    "Failed to open Vortex file {}: {e}",
                    object.location
                ))
            })?;

            let struct_dtype = vxf
                .dtype()
                .as_struct()
                .vortex_expect("dtype is not a struct");

            // Evaluate the statistics for each column that we are able to return to DataFusion.
            let Some(file_stats) = vxf.file_stats() else {
                // If the file has no column stats, the best we can do is return a row count.
                return Ok(Statistics {
                    num_rows: Precision::Exact(
                        usize::try_from(vxf.row_count())
                            .map_err(|_| vortex_err!("Row count overflow"))
                            .vortex_expect("Row count overflow"),
                    ),
                    total_byte_size: Precision::Absent,
                    column_statistics: vec![ColumnStatistics::default(); struct_dtype.nfields()],
                });
            };

            let stats = table_schema
                .fields()
                .iter()
                .map(|field| struct_dtype.find(field.name()))
                .map(|idx| match idx {
                    None => StatsSet::default(),
                    Some(id) => file_stats[id].clone(),
                })
                .collect_vec();

            let total_byte_size = stats
                .iter()
                .map(|stats_set| {
                    stats_set
                        .get_as::<usize>(Stat::UncompressedSizeInBytes, &PType::U64.into())
                        .unwrap_or_else(|| stats::Precision::inexact(0_usize))
                })
                .fold(stats::Precision::exact(0_usize), |acc, stats_set| {
                    acc.zip(stats_set).map(|(acc, stats_set)| acc + stats_set)
                });

            // Sum up the total byte size across all the columns.
            let total_byte_size = total_byte_size.to_df();

            let column_statistics = stats
                .into_iter()
                .zip(table_schema.fields().iter())
                .map(|(stats_set, field)| {
                    let null_count = stats_set.get_as::<usize>(Stat::NullCount, &PType::U64.into());
                    let min = stats_set.get(Stat::Min).and_then(|n| {
                        n.map(|n| {
                            Scalar::new(
                                Stat::Min
                                    .dtype(&DType::from_arrow(field.as_ref()))
                                    .vortex_expect("must have a valid dtype"),
                                n,
                            )
                            .try_to_df()
                            .ok()
                        })
                        .transpose()
                    });

                    let max = stats_set.get(Stat::Max).and_then(|n| {
                        n.map(|n| {
                            Scalar::new(
                                Stat::Max
                                    .dtype(&DType::from_arrow(field.as_ref()))
                                    .vortex_expect("must have a valid dtype"),
                                n,
                            )
                            .try_to_df()
                            .ok()
                        })
                        .transpose()
                    });

                    ColumnStatistics {
                        null_count: null_count.to_df(),
                        max_value: max.to_df(),
                        min_value: min.to_df(),
                        sum_value: Precision::Absent,
                        distinct_count: stats_set
                            .get_as::<bool>(
                                Stat::IsConstant,
                                &DType::Bool(Nullability::NonNullable),
                            )
                            .and_then(|is_constant| {
                                is_constant.as_exact().map(|_| Precision::Exact(1))
                            })
                            .unwrap_or(Precision::Absent),
                    }
                })
                .collect::<Vec<_>>();

            Ok(Statistics {
                num_rows: Precision::Exact(
                    usize::try_from(vxf.row_count())
                        .map_err(|_| vortex_err!("Row count overflow"))
                        .vortex_expect("Row count overflow"),
                ),
                total_byte_size,
                column_statistics,
            })
        })
        .await
        .vortex_expect("Failed to spawn infer_stats")
    }

    async fn create_physical_plan(
        &self,
        _state: &dyn Session,
        file_scan_config: FileScanConfig,
    ) -> DFResult<Arc<dyn ExecutionPlan>> {
        if !file_scan_config.table_partition_cols.is_empty() {
            return not_impl_err!("Hive style partitioning isn't implemented yet for Vortex");
        }

        if !file_scan_config.output_ordering.is_empty() {
            return not_impl_err!("Vortex doesn't support output ordering");
        }

        let source = VortexSource::new(self.file_cache.clone(), self.session.metrics().clone());
        let source = Arc::new(source);

        Ok(DataSourceExec::from_data_source(
            FileScanConfigBuilder::from(file_scan_config)
                .with_source(source)
                .build(),
        ))
    }

    async fn create_writer_physical_plan(
        &self,
        input: Arc<dyn ExecutionPlan>,
        _state: &dyn Session,
        conf: FileSinkConfig,
        order_requirements: Option<LexRequirement>,
    ) -> DFResult<Arc<dyn ExecutionPlan>> {
        if conf.insert_op != InsertOp::Append {
            return not_impl_err!("Overwrites are not implemented yet for Vortex");
        }

        if !conf.table_partition_cols.is_empty() {
            return not_impl_err!("Hive style partitioning isn't implemented yet for Vortex");
        }

        let schema = conf.output_schema().clone();
        let sink = Arc::new(VortexSink::new(conf, schema));

        Ok(Arc::new(DataSinkExec::new(input, sink, order_requirements)) as _)
    }

    fn file_source(&self) -> Arc<dyn FileSource> {
        Arc::new(VortexSource::new(
            self.file_cache.clone(),
            VortexMetrics::default(),
        ))
    }
}

#[cfg(test)]
mod tests {
    use datafusion::execution::SessionStateBuilder;
    use datafusion::prelude::SessionContext;
    use tempfile::TempDir;

    use super::*;
    use crate::persistent::register_vortex_format_factory;

    #[tokio::test]
    async fn create_table() {
        let dir = TempDir::new().unwrap();

        let factory: VortexFormatFactory = Default::default();
        let mut session_state_builder = SessionStateBuilder::new().with_default_features();
        register_vortex_format_factory(factory, &mut session_state_builder);
        let session = SessionContext::new_with_state(session_state_builder.build());

        let df = session
            .sql(&format!(
                "CREATE EXTERNAL TABLE my_tbl \
                (c1 VARCHAR NOT NULL, c2 INT NOT NULL) \
                STORED AS vortex LOCATION '{}'",
                dir.path().to_str().unwrap()
            ))
            .await
            .unwrap();

        assert_eq!(df.count().await.unwrap(), 0);
    }

    #[tokio::test]
    #[should_panic]
    async fn fail_table_config() {
        let dir = TempDir::new().unwrap();

        let factory: VortexFormatFactory = Default::default();
        let mut session_state_builder = SessionStateBuilder::new().with_default_features();
        register_vortex_format_factory(factory, &mut session_state_builder);
        let session = SessionContext::new_with_state(session_state_builder.build());

        session
            .sql(&format!(
                "CREATE EXTERNAL TABLE my_tbl \
                (c1 VARCHAR NOT NULL, c2 INT NOT NULL) \
                STORED AS vortex LOCATION '{}' \
                OPTIONS( some_key 'value' );",
                dir.path().to_str().unwrap()
            ))
            .await
            .unwrap()
            .collect()
            .await
            .unwrap();
    }
}
```
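For orientation, here is a minimal sketch of wiring the `VortexFormat` defined above into a DataFusion `ListingTable` directly, mirroring what `register_vortex_format_factory` achieves for `STORED AS vortex` tables in the tests. The directory URL, table name, and import path are hypothetical, and the plumbing assumes DataFusion's stock listing-table API; treat it as illustrative rather than the crate's documented entry point:

```rust
use std::sync::Arc;

use datafusion::datasource::listing::{
    ListingOptions, ListingTable, ListingTableConfig, ListingTableUrl,
};
use datafusion::error::Result;
use datafusion::prelude::SessionContext;
use vortex::file::VORTEX_FILE_EXTENSION;

use super::format::VortexFormat; // hypothetical path to the type defined above

/// Query a directory of `.vortex` files without going through SQL DDL.
/// `/data/events/` and `events` are hypothetical stand-ins.
async fn query_vortex_dir(ctx: &SessionContext) -> Result<()> {
    let table_url = ListingTableUrl::parse("file:///data/events/")?;

    // Hand the Vortex FileFormat to a standard DataFusion listing table;
    // this mirrors what the factory does for `STORED AS vortex` tables.
    let options = ListingOptions::new(Arc::new(VortexFormat::default()))
        .with_file_extension(VORTEX_FILE_EXTENSION);

    // This is the point where DataFusion ends up calling
    // `VortexFormat::infer_schema` over the listed objects.
    let state = ctx.state();
    let config = ListingTableConfig::new(table_url)
        .with_listing_options(options)
        .infer_schema(&state)
        .await?;

    ctx.register_table("events", Arc::new(ListingTable::try_new(config)?))?;
    ctx.sql("SELECT COUNT(*) FROM events").await?.show().await
}
```

Going through `ListingTableConfig::infer_schema` is also what exercises the `buffer_unordered` fan-out in `infer_schema` above, one spawned task per file.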