vortex-data / vortex / build 16968296061

14 Aug 2025 02:36PM UTC. Coverage: 85.384% (first build).

Pull Request #4180: feat: Support more datafusion features
Merge 4f1b98ecb into d1c2d9c66 (via GitHub web-flow)

189 of 259 new or added lines in 6 files covered. (72.97%)

55334 of 64806 relevant lines covered (85.38%)

489107.11 hits per line

Source File: /vortex-datafusion/src/persistent/source.rs (52.75% covered)

// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright the Vortex contributors

use std::any::Any;
use std::fmt::Formatter;
use std::sync::{Arc, Weak};

use dashmap::DashMap;
use datafusion::arrow::datatypes::SchemaRef;
use datafusion::common::{Result as DFResult, Statistics};
use datafusion::config::ConfigOptions;
use datafusion::datasource::physical_plan::{FileOpener, FileScanConfig, FileSource};
use datafusion::datasource::schema_adapter::{DefaultSchemaAdapterFactory, SchemaAdapterFactory};
use datafusion::physical_expr::schema_rewriter::DefaultPhysicalExprAdapterFactory;
use datafusion::physical_expr::{PhysicalExprRef, conjunction};
use datafusion::physical_plan::filter_pushdown::{
    FilterPushdownPropagation, PushedDown, PushedDownPredicate,
};
use datafusion::physical_plan::metrics::ExecutionPlanMetricsSet;
use datafusion::physical_plan::{DisplayFormatType, PhysicalExpr};
use object_store::ObjectStore;
use object_store::path::Path;
use vortex::error::VortexExpect as _;
use vortex::file::VORTEX_FILE_EXTENSION;
use vortex::layout::LayoutReader;
use vortex::metrics::VortexMetrics;

use super::cache::VortexFileCache;
use super::metrics::PARTITION_LABEL;
use super::opener::VortexFileOpener;
use crate::convert::exprs::can_be_pushed_down;

/// A config for [`VortexFileOpener`]. Used to create [`DataSourceExec`] based physical plans.
///
/// [`DataSourceExec`]: datafusion_physical_plan::source::DataSourceExec
#[derive(Clone)]
pub struct VortexSource {
    pub(crate) file_cache: VortexFileCache,
    pub(crate) predicate: Option<PhysicalExprRef>,
    pub(crate) batch_size: Option<usize>,
    pub(crate) projected_statistics: Option<Statistics>,
    /// This is the file schema the table expects, which is the table's schema without partition columns, and **not** the file's physical schema.
    pub(crate) arrow_file_schema: Option<SchemaRef>,
    pub(crate) schema_adapter_factory: Option<Arc<dyn SchemaAdapterFactory>>,
    pub(crate) metrics: VortexMetrics,
    _unused_df_metrics: ExecutionPlanMetricsSet,
    /// Shared layout readers, the source only lives as long as one scan.
    ///
    /// Sharing the readers allows us to only read every layout once from the file, even across partitions.
    layout_readers: Arc<DashMap<Path, Weak<dyn LayoutReader>>>,
}

impl VortexSource {
    pub(crate) fn new(file_cache: VortexFileCache, metrics: VortexMetrics) -> Self {
        Self {
            file_cache,
            metrics,
            predicate: None,
            batch_size: None,
            projected_statistics: None,
            arrow_file_schema: None,
            schema_adapter_factory: None,
            _unused_df_metrics: Default::default(),
            layout_readers: Arc::new(DashMap::default()),
        }
    }
}
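
// Sketch (assumption; the actual logic lives in `VortexFileOpener`): the
// per-partition openers can reuse a layout reader through the `Weak` handles
// in `layout_readers` above. `build_reader` here is a hypothetical constructor.
//
//     let reader: Arc<dyn LayoutReader> = match layout_readers
//         .get(&path)
//         .and_then(|entry| entry.value().upgrade())
//     {
//         // Another partition already opened this layout; reuse it.
//         Some(reader) => reader,
//         None => {
//             let reader = build_reader(&path)?; // hypothetical
//             layout_readers.insert(path.clone(), Arc::downgrade(&reader));
//             reader
//         }
//     };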

impl FileSource for VortexSource {
    fn create_file_opener(
        &self,
        object_store: Arc<dyn ObjectStore>,
        base_config: &FileScanConfig,
        partition: usize,
    ) -> Arc<dyn FileOpener> {
        let partition_metrics = self
            .metrics
            .child_with_tags([(PARTITION_LABEL, partition.to_string())].into_iter());

        let batch_size = self
            .batch_size
            .vortex_expect("batch_size must be supplied to VortexSource");

        let expr_adapter = base_config.expr_adapter_factory.as_ref();
        let schema_adapter = self.schema_adapter_factory.as_ref();

        // This match is here to support the behavior defined by [`ListingTable`], see https://github.com/apache/datafusion/issues/16800 for more details.
        let (expr_adapter_factory, schema_adapter_factory) = match (expr_adapter, schema_adapter) {
            (Some(expr_adapter), Some(schema_adapter)) => {
                (Some(expr_adapter.clone()), schema_adapter.clone())
            }
            (Some(expr_adapter), None) => (
                Some(expr_adapter.clone()),
                Arc::new(DefaultSchemaAdapterFactory) as _,
            ),
            (None, Some(schema_adapter)) => {
                // If no `PhysicalExprAdapterFactory` is specified, we only use the provided `SchemaAdapterFactory`
                (None, schema_adapter.clone())
            }
            (None, None) => (
                Some(Arc::new(DefaultPhysicalExprAdapterFactory) as _),
                Arc::new(DefaultSchemaAdapterFactory) as _,
            ),
        };

        let projection = base_config.file_column_projection_indices().map(Arc::from);

        let opener = VortexFileOpener {
            object_store,
            projection,
            filter: self.predicate.clone(),
            expr_adapter_factory,
            schema_adapter_factory,
            partition_fields: base_config.table_partition_cols.clone(),
            logical_schema: base_config.file_schema.clone(),
            file_cache: self.file_cache.clone(),
            batch_size,
            limit: base_config.limit,
            metrics: partition_metrics,
            layout_readers: self.layout_readers.clone(),
        };

        Arc::new(opener)
    }

    fn as_any(&self) -> &dyn Any {
        self
    }

    fn with_batch_size(&self, batch_size: usize) -> Arc<dyn FileSource> {
        let mut source = self.clone();
        source.batch_size = Some(batch_size);
        Arc::new(source)
    }

    fn with_schema(&self, schema: SchemaRef) -> Arc<dyn FileSource> {
        let mut source = self.clone();
        source.arrow_file_schema = Some(schema);
        Arc::new(source)
    }

    fn with_projection(&self, _config: &FileScanConfig) -> Arc<dyn FileSource> {
        Arc::new(self.clone())
    }

    fn with_statistics(&self, statistics: Statistics) -> Arc<dyn FileSource> {
        let mut source = self.clone();
        source.projected_statistics = Some(statistics);
        Arc::new(source)
    }

    fn metrics(&self) -> &ExecutionPlanMetricsSet {
        &self._unused_df_metrics
    }

    fn statistics(&self) -> DFResult<Statistics> {
        let statistics = self
            .projected_statistics
            .clone()
            .vortex_expect("projected_statistics must be set");

        if self.predicate.is_some() {
            Ok(statistics.to_inexact())
        } else {
            Ok(statistics)
        }
    }

    fn file_type(&self) -> &str {
        VORTEX_FILE_EXTENSION
    }

    fn fmt_extra(&self, t: DisplayFormatType, f: &mut Formatter) -> std::fmt::Result {
        match t {
            DisplayFormatType::Default | DisplayFormatType::Verbose => {
                if let Some(ref predicate) = self.predicate {
                    write!(f, ", predicate: {predicate}")?;
                }
            }
            // Use TreeRender style key=value formatting to display the predicate
            DisplayFormatType::TreeRender => {
                if let Some(ref predicate) = self.predicate {
                    writeln!(f, "predicate={predicate}")?;
                };
            }
        }
        Ok(())
    }
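
    // Filter pushdown: each incoming filter is classified with
    // `can_be_pushed_down` against the file schema. Supported filters are
    // AND-ed together via `conjunction` and merged into any existing
    // predicate, while the per-filter Yes/No results are reported back to
    // DataFusion so it knows which filters must still be re-applied above
    // the scan.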

    fn try_pushdown_filters(
        &self,
        filters: Vec<Arc<dyn PhysicalExpr>>,
        _config: &ConfigOptions,
    ) -> DFResult<FilterPushdownPropagation<Arc<dyn FileSource>>> {
        let Some(schema) = self.arrow_file_schema.as_ref() else {
            return Ok(FilterPushdownPropagation::with_parent_pushdown_result(
                vec![PushedDown::No; filters.len()],
            ));
        };

        let mut source = self.clone();

        let filters = filters
            .into_iter()
            .map(|expr| {
                if can_be_pushed_down(&expr, schema) {
                    PushedDownPredicate::supported(expr)
                } else {
                    PushedDownPredicate::unsupported(expr)
                }
            })
            .collect::<Vec<_>>();

        if filters
            .iter()
            .all(|p| matches!(p.discriminant, PushedDown::No))
        {
            return Ok(FilterPushdownPropagation::with_parent_pushdown_result(
                vec![PushedDown::No; filters.len()],
            ));
        }

        let supported = filters
            .iter()
            .filter_map(|p| match p.discriminant {
                PushedDown::Yes => Some(&p.predicate),
                PushedDown::No => None,
            })
            .cloned();

        let predicate = match source.predicate {
            Some(predicate) => conjunction(std::iter::once(predicate).chain(supported)),
            None => conjunction(supported),
        };
        source.predicate = Some(predicate);

        let pushdown_propagation = if source.predicate.clone().is_some() {
            FilterPushdownPropagation::with_parent_pushdown_result(
                filters.iter().map(|f| f.discriminant).collect(),
            )
            .with_updated_node(Arc::new(source) as _)
        } else {
            FilterPushdownPropagation::with_parent_pushdown_result(vec![
                PushedDown::No;
                filters.len()
            ])
        };

        Ok(pushdown_propagation)
    }

    fn repartitioned(
        &self,
        target_partitions: usize,
        repartition_file_min_size: usize,
        output_ordering: Option<datafusion::physical_expr::LexOrdering>,
        config: &FileScanConfig,
    ) -> DFResult<Option<FileScanConfig>> {
        if config.file_compression_type.is_compressed() || config.new_lines_in_values {
            return Ok(None);
        }

        let repartitioned_file_groups_option =
            datafusion::datasource::physical_plan::FileGroupPartitioner::new()
                .with_target_partitions(target_partitions)
                .with_repartition_file_min_size(repartition_file_min_size)
                .with_preserve_order_within_groups(output_ordering.is_some())
                .repartition_file_groups(&config.file_groups);

        if let Some(repartitioned_file_groups) = repartitioned_file_groups_option {
            let mut source = config.clone();
            source.file_groups = repartitioned_file_groups;
            return Ok(Some(source));
        }
        Ok(None)
    }

    fn with_schema_adapter_factory(
        &self,
        factory: Arc<dyn SchemaAdapterFactory>,
    ) -> DFResult<Arc<dyn FileSource>> {
        let mut source = self.clone();
        source.schema_adapter_factory = Some(factory);
        Ok(Arc::new(source))
    }

    fn schema_adapter_factory(&self) -> Option<Arc<dyn SchemaAdapterFactory>> {
        None
    }
}
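
For orientation, here is a minimal sketch of how the builder-style `FileSource` methods above are typically chained before a scan is planned. This is an illustration under assumptions, not code from the pull request: it presumes it lives inside the vortex-datafusion crate (where `VortexSource::new` is visible), that `file_cache`, `metrics`, and `file_schema` come from the caller, and that the batch size of 8192 is an arbitrary choice.

use std::sync::Arc;

use datafusion::arrow::datatypes::SchemaRef;
use datafusion::common::Statistics;
use datafusion::datasource::physical_plan::FileSource;

// `VortexSource`, `VortexFileCache`, and `VortexMetrics` are the crate types
// used in the file above.
fn configure_source(
    file_cache: VortexFileCache,
    metrics: VortexMetrics,
    file_schema: SchemaRef, // table schema without partition columns
) -> Arc<dyn FileSource> {
    VortexSource::new(file_cache, metrics)
        // `batch_size` must be set before `create_file_opener` runs.
        .with_batch_size(8192)
        // Sets `arrow_file_schema`, which `try_pushdown_filters` needs.
        .with_schema(file_schema.clone())
        // `statistics()` expects projected statistics to be present.
        .with_statistics(Statistics::new_unknown(&file_schema))
}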