• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

vortex-data / vortex / 16988198748

15 Aug 2025 10:31AM UTC coverage: 87.744% (+0.03%) from 87.718%
16988198748

Pull #4180

github

web-flow
Merge 3b547aaf1 into 7a89e7fa1
Pull Request #4180: feat: Support more datafusion features

227 of 271 new or added lines in 5 files covered. (83.76%)

2 existing lines in 1 file now uncovered.

56402 of 64280 relevant lines covered (87.74%)

628997.68 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

75.26
/vortex-datafusion/src/persistent/source.rs
1
// SPDX-License-Identifier: Apache-2.0
2
// SPDX-FileCopyrightText: Copyright the Vortex contributors
3

4
use std::any::Any;
5
use std::fmt::Formatter;
6
use std::sync::{Arc, Weak};
7

8
use dashmap::DashMap;
9
use datafusion::arrow::datatypes::SchemaRef;
10
use datafusion::common::{Result as DFResult, Statistics};
11
use datafusion::config::ConfigOptions;
12
use datafusion::datasource::physical_plan::{FileOpener, FileScanConfig, FileSource};
13
use datafusion::datasource::schema_adapter::{DefaultSchemaAdapterFactory, SchemaAdapterFactory};
14
use datafusion::physical_expr::schema_rewriter::{
15
    DefaultPhysicalExprAdapterFactory, PhysicalExprAdapterFactory,
16
};
17
use datafusion::physical_expr::{PhysicalExprRef, conjunction};
18
use datafusion::physical_plan::filter_pushdown::{
19
    FilterPushdownPropagation, PushedDown, PushedDownPredicate,
20
};
21
use datafusion::physical_plan::metrics::ExecutionPlanMetricsSet;
22
use datafusion::physical_plan::{DisplayFormatType, PhysicalExpr};
23
use object_store::ObjectStore;
24
use object_store::path::Path;
25
use vortex::error::VortexExpect as _;
26
use vortex::file::VORTEX_FILE_EXTENSION;
27
use vortex::layout::LayoutReader;
28
use vortex::metrics::VortexMetrics;
29

30
use super::cache::VortexFileCache;
31
use super::metrics::PARTITION_LABEL;
32
use super::opener::VortexFileOpener;
33
use crate::convert::exprs::can_be_pushed_down;
34

35
/// A config for [`VortexFileOpener`]. Used to create [`DataSourceExec`] based physical plans.
///
/// [`DataSourceExec`]: datafusion::datasource::source::DataSourceExec
#[derive(Clone)]
pub struct VortexSource {
    /// Cache of Vortex files, cloned into every [`VortexFileOpener`] this source creates.
    pub(crate) file_cache: VortexFileCache,
    /// Conjunction of the filters accepted by `try_pushdown_filters`; when set, the
    /// statistics reported by this source are marked inexact.
    pub(crate) predicate: Option<PhysicalExprRef>,
    /// Batch size for the scan. Must be populated (via `with_batch_size`) before
    /// `create_file_opener` is called — it is `vortex_expect`ed there.
    pub(crate) batch_size: Option<usize>,
    /// Statistics projected for this scan. Must be populated (via `with_statistics`)
    /// before `statistics()` is called — it is `vortex_expect`ed there.
    pub(crate) projected_statistics: Option<Statistics>,
    /// This is the file schema the table expects, which is the table's schema without partition columns, and **not** the file's physical schema.
    pub(crate) arrow_file_schema: Option<SchemaRef>,
    /// Schema adapter installed via `with_schema_adapter_factory`; used when opening files.
    pub(crate) schema_adapter_factory: Option<Arc<dyn SchemaAdapterFactory>>,
    /// Physical-expression adapter installed via `with_expr_adapter_factory`; takes
    /// precedence over the one supplied by the [`FileScanConfig`] when opening files.
    pub(crate) expr_adapter_factory: Option<Arc<dyn PhysicalExprAdapterFactory>>,
    /// Root Vortex metrics; per-partition children are derived in `create_file_opener`.
    pub(crate) metrics: VortexMetrics,
    // Kept only to satisfy `FileSource::metrics`; Vortex reports through `metrics` above.
    _unused_df_metrics: ExecutionPlanMetricsSet,
    /// Shared layout readers, the source only lives as long as one scan.
    ///
    /// Sharing the readers allows us to only read every layout once from the file, even across partitions.
    layout_readers: Arc<DashMap<Path, Weak<dyn LayoutReader>>>,
}
55

56
impl VortexSource {
57
    pub(crate) fn new(file_cache: VortexFileCache, metrics: VortexMetrics) -> Self {
360✔
58
        Self {
360✔
59
            file_cache,
360✔
60
            metrics,
360✔
61
            predicate: None,
360✔
62
            batch_size: None,
360✔
63
            projected_statistics: None,
360✔
64
            arrow_file_schema: None,
360✔
65
            schema_adapter_factory: None,
360✔
66
            expr_adapter_factory: None,
360✔
67
            _unused_df_metrics: Default::default(),
360✔
68
            layout_readers: Arc::new(DashMap::default()),
360✔
69
        }
360✔
70
    }
360✔
71

72
    /// Sets a [`PhysicalExprAdapterFactory`] for the [`VortexSource`].
73
    /// Currently, this must be provided in order to filter columns in files that have a different data type from the unified table schema.
74
    ///
75
    /// This factory will take precedence when opening files over instances provided by the [`FileScanConfig`].
NEW
76
    pub fn with_expr_adapter_factory(
×
NEW
77
        &self,
×
NEW
78
        expr_adapter_factory: Arc<dyn PhysicalExprAdapterFactory>,
×
NEW
79
    ) -> Arc<dyn FileSource> {
×
UNCOV
80
        let mut source = self.clone();
×
NEW
81
        source.expr_adapter_factory = Some(expr_adapter_factory);
×
NEW
82
        Arc::new(source)
×
UNCOV
83
    }
×
84
}
85

86
impl FileSource for VortexSource {
87
    fn create_file_opener(
484✔
88
        &self,
484✔
89
        object_store: Arc<dyn ObjectStore>,
484✔
90
        base_config: &FileScanConfig,
484✔
91
        partition: usize,
484✔
92
    ) -> Arc<dyn FileOpener> {
484✔
93
        let partition_metrics = self
484✔
94
            .metrics
484✔
95
            .child_with_tags([(PARTITION_LABEL, partition.to_string())].into_iter());
484✔
96

97
        let batch_size = self
484✔
98
            .batch_size
484✔
99
            .vortex_expect("batch_size must be supplied to VortexSource");
484✔
100

101
        let expr_adapter = self
484✔
102
            .expr_adapter_factory
484✔
103
            .as_ref()
484✔
104
            .or(base_config.expr_adapter_factory.as_ref());
484✔
105
        let schema_adapter = self.schema_adapter_factory.as_ref();
484✔
106

107
        // This match is here to support the behavior defined by [`ListingTable`], see https://github.com/apache/datafusion/issues/16800 for more details.
108
        let (expr_adapter_factory, schema_adapter_factory) = match (expr_adapter, schema_adapter) {
484✔
NEW
109
            (Some(expr_adapter), Some(schema_adapter)) => {
×
NEW
110
                (Some(expr_adapter.clone()), schema_adapter.clone())
×
111
            }
NEW
112
            (Some(expr_adapter), None) => (
×
NEW
113
                Some(expr_adapter.clone()),
×
NEW
114
                Arc::new(DefaultSchemaAdapterFactory) as _,
×
NEW
115
            ),
×
NEW
116
            (None, Some(schema_adapter)) => {
×
117
                // If no `PhysicalExprAdapterFactory` is specified, we only use the provided `SchemaAdapterFactory`
NEW
118
                (None, schema_adapter.clone())
×
119
            }
120
            (None, None) => (
484✔
121
                Some(Arc::new(DefaultPhysicalExprAdapterFactory) as _),
484✔
122
                Arc::new(DefaultSchemaAdapterFactory) as _,
484✔
123
            ),
484✔
124
        };
125

126
        let projection = base_config.file_column_projection_indices().map(Arc::from);
484✔
127

128
        let opener = VortexFileOpener {
484✔
129
            object_store,
484✔
130
            projection,
484✔
131
            filter: self.predicate.clone(),
484✔
132
            expr_adapter_factory,
484✔
133
            schema_adapter_factory,
484✔
134
            partition_fields: base_config.table_partition_cols.clone(),
484✔
135
            logical_schema: base_config.file_schema.clone(),
484✔
136
            file_cache: self.file_cache.clone(),
484✔
137
            batch_size,
484✔
138
            limit: base_config.limit,
484✔
139
            metrics: partition_metrics,
484✔
140
            layout_readers: self.layout_readers.clone(),
484✔
141
        };
484✔
142

143
        Arc::new(opener)
484✔
144
    }
484✔
145

146
    fn as_any(&self) -> &dyn Any {
174✔
147
        self
174✔
148
    }
174✔
149

150
    fn with_batch_size(&self, batch_size: usize) -> Arc<dyn FileSource> {
484✔
151
        let mut source = self.clone();
484✔
152
        source.batch_size = Some(batch_size);
484✔
153
        Arc::new(source)
484✔
154
    }
484✔
155

156
    fn with_schema(&self, schema: SchemaRef) -> Arc<dyn FileSource> {
506✔
157
        let mut source = self.clone();
506✔
158
        source.arrow_file_schema = Some(schema);
506✔
159
        Arc::new(source)
506✔
160
    }
506✔
161

162
    fn with_projection(&self, _config: &FileScanConfig) -> Arc<dyn FileSource> {
484✔
163
        Arc::new(self.clone())
484✔
164
    }
484✔
165

166
    fn with_statistics(&self, statistics: Statistics) -> Arc<dyn FileSource> {
506✔
167
        let mut source = self.clone();
506✔
168
        source.projected_statistics = Some(statistics);
506✔
169
        Arc::new(source)
506✔
170
    }
506✔
171

172
    fn metrics(&self) -> &ExecutionPlanMetricsSet {
1,142✔
173
        &self._unused_df_metrics
1,142✔
174
    }
1,142✔
175

176
    fn statistics(&self) -> DFResult<Statistics> {
3,624✔
177
        let statistics = self
3,624✔
178
            .projected_statistics
3,624✔
179
            .clone()
3,624✔
180
            .vortex_expect("projected_statistics must be set");
3,624✔
181

182
        if self.predicate.is_some() {
3,624✔
183
            Ok(statistics.to_inexact())
1,318✔
184
        } else {
185
            Ok(statistics)
2,306✔
186
        }
187
    }
3,624✔
188

189
    fn file_type(&self) -> &str {
×
190
        VORTEX_FILE_EXTENSION
×
191
    }
×
192

193
    fn fmt_extra(&self, t: DisplayFormatType, f: &mut Formatter) -> std::fmt::Result {
×
194
        match t {
×
195
            DisplayFormatType::Default | DisplayFormatType::Verbose => {
196
                if let Some(ref predicate) = self.predicate {
×
197
                    write!(f, ", predicate: {predicate}")?;
×
198
                }
×
199
            }
200
            // Use TreeRender style key=value formatting to display the predicate
201
            DisplayFormatType::TreeRender => {
202
                if let Some(ref predicate) = self.predicate {
×
NEW
203
                    writeln!(f, "predicate={predicate}")?;
×
204
                };
×
205
            }
206
        }
207
        Ok(())
×
208
    }
×
209

210
    fn try_pushdown_filters(
350✔
211
        &self,
350✔
212
        filters: Vec<Arc<dyn PhysicalExpr>>,
350✔
213
        _config: &ConfigOptions,
350✔
214
    ) -> DFResult<FilterPushdownPropagation<Arc<dyn FileSource>>> {
350✔
215
        let Some(schema) = self.arrow_file_schema.as_ref() else {
350✔
216
            return Ok(FilterPushdownPropagation::with_parent_pushdown_result(
×
217
                vec![PushedDown::No; filters.len()],
×
218
            ));
×
219
        };
220

221
        let mut source = self.clone();
350✔
222

223
        let filters = filters
350✔
224
            .into_iter()
350✔
225
            .map(|expr| {
350✔
226
                if can_be_pushed_down(&expr, schema) {
134✔
227
                    PushedDownPredicate::supported(expr)
126✔
228
                } else {
229
                    PushedDownPredicate::unsupported(expr)
8✔
230
                }
231
            })
134✔
232
            .collect::<Vec<_>>();
350✔
233

234
        if filters
350✔
235
            .iter()
350✔
236
            .all(|p| matches!(p.discriminant, PushedDown::No))
350✔
237
        {
238
            return Ok(FilterPushdownPropagation::with_parent_pushdown_result(
268✔
239
                vec![PushedDown::No; filters.len()],
268✔
240
            ));
268✔
241
        }
82✔
242

243
        let supported = filters
82✔
244
            .iter()
82✔
245
            .filter_map(|p| match p.discriminant {
132✔
246
                PushedDown::Yes => Some(&p.predicate),
126✔
247
                PushedDown::No => None,
6✔
248
            })
132✔
249
            .cloned();
82✔
250

251
        let predicate = match source.predicate {
82✔
NEW
252
            Some(predicate) => conjunction(std::iter::once(predicate).chain(supported)),
×
253
            None => conjunction(supported),
82✔
254
        };
255
        source.predicate = Some(predicate);
82✔
256

257
        let pushdown_propagation = if source.predicate.clone().is_some() {
82✔
258
            FilterPushdownPropagation::with_parent_pushdown_result(
82✔
259
                filters.iter().map(|f| f.discriminant).collect(),
82✔
260
            )
261
            .with_updated_node(Arc::new(source) as _)
82✔
262
        } else {
NEW
263
            FilterPushdownPropagation::with_parent_pushdown_result(vec![
×
NEW
264
                PushedDown::No;
×
NEW
265
                filters.len()
×
266
            ])
267
        };
268

269
        Ok(pushdown_propagation)
82✔
270
    }
350✔
271

272
    fn repartitioned(
138✔
273
        &self,
138✔
274
        target_partitions: usize,
138✔
275
        repartition_file_min_size: usize,
138✔
276
        output_ordering: Option<datafusion::physical_expr::LexOrdering>,
138✔
277
        config: &FileScanConfig,
138✔
278
    ) -> DFResult<Option<FileScanConfig>> {
138✔
279
        if config.file_compression_type.is_compressed() || config.new_lines_in_values {
138✔
NEW
280
            return Ok(None);
×
281
        }
138✔
282

283
        let repartitioned_file_groups_option =
138✔
284
            datafusion::datasource::physical_plan::FileGroupPartitioner::new()
138✔
285
                .with_target_partitions(target_partitions)
138✔
286
                .with_repartition_file_min_size(repartition_file_min_size)
138✔
287
                .with_preserve_order_within_groups(output_ordering.is_some())
138✔
288
                .repartition_file_groups(&config.file_groups);
138✔
289

290
        if let Some(repartitioned_file_groups) = repartitioned_file_groups_option {
138✔
291
            let mut source = config.clone();
44✔
292
            source.file_groups = repartitioned_file_groups;
44✔
293
            return Ok(Some(source));
44✔
294
        }
94✔
295
        Ok(None)
94✔
296
    }
138✔
297

NEW
298
    fn with_schema_adapter_factory(
×
NEW
299
        &self,
×
NEW
300
        factory: Arc<dyn SchemaAdapterFactory>,
×
NEW
301
    ) -> DFResult<Arc<dyn FileSource>> {
×
NEW
302
        let mut source = self.clone();
×
NEW
303
        source.schema_adapter_factory = Some(factory);
×
NEW
304
        Ok(Arc::new(source))
×
NEW
305
    }
×
306

NEW
307
    fn schema_adapter_factory(&self) -> Option<Arc<dyn SchemaAdapterFactory>> {
×
NEW
308
        None
×
NEW
309
    }
×
310
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc