vortex-data / vortex / build 16968654568

14 Aug 2025 02:50PM UTC. Coverage: 87.702% (first build).

Pull Request #4180: feat: Support more datafusion features
Merge fcec0147f into d1c2d9c66 (via github / web-flow)

221 of 259 new or added lines in 6 files covered (85.33%)
56768 of 64728 relevant lines covered (87.7%)
636071.79 hits per line
Source File: /vortex-datafusion/src/persistent/source.rs (78.02% covered)
// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright the Vortex contributors

use std::any::Any;
use std::fmt::Formatter;
use std::sync::{Arc, Weak};

use dashmap::DashMap;
use datafusion::arrow::datatypes::SchemaRef;
use datafusion::common::{Result as DFResult, Statistics};
use datafusion::config::ConfigOptions;
use datafusion::datasource::physical_plan::{FileOpener, FileScanConfig, FileSource};
use datafusion::datasource::schema_adapter::{DefaultSchemaAdapterFactory, SchemaAdapterFactory};
use datafusion::physical_expr::schema_rewriter::DefaultPhysicalExprAdapterFactory;
use datafusion::physical_expr::{PhysicalExprRef, conjunction};
use datafusion::physical_plan::filter_pushdown::{
    FilterPushdownPropagation, PushedDown, PushedDownPredicate,
};
use datafusion::physical_plan::metrics::ExecutionPlanMetricsSet;
use datafusion::physical_plan::{DisplayFormatType, PhysicalExpr};
use object_store::ObjectStore;
use object_store::path::Path;
use vortex::error::VortexExpect as _;
use vortex::file::VORTEX_FILE_EXTENSION;
use vortex::layout::LayoutReader;
use vortex::metrics::VortexMetrics;

use super::cache::VortexFileCache;
use super::metrics::PARTITION_LABEL;
use super::opener::VortexFileOpener;
use crate::convert::exprs::can_be_pushed_down;

/// A config for [`VortexFileOpener`]. Used to create [`DataSourceExec`] based physical plans.
///
/// [`DataSourceExec`]: datafusion_physical_plan::source::DataSourceExec
#[derive(Clone)]
pub struct VortexSource {
    pub(crate) file_cache: VortexFileCache,
    pub(crate) predicate: Option<PhysicalExprRef>,
    pub(crate) batch_size: Option<usize>,
    pub(crate) projected_statistics: Option<Statistics>,
    /// This is the file schema the table expects, which is the table's schema without partition columns, and **not** the file's physical schema.
    pub(crate) arrow_file_schema: Option<SchemaRef>,
    pub(crate) schema_adapter_factory: Option<Arc<dyn SchemaAdapterFactory>>,
    pub(crate) metrics: VortexMetrics,
    _unused_df_metrics: ExecutionPlanMetricsSet,
    /// Shared layout readers, the source only lives as long as one scan.
    ///
    /// Sharing the readers allows us to only read every layout once from the file, even across partitions.
    layout_readers: Arc<DashMap<Path, Weak<dyn LayoutReader>>>,
}

impl VortexSource {
    pub(crate) fn new(file_cache: VortexFileCache, metrics: VortexMetrics) -> Self {
        Self {
            file_cache,
            metrics,
            predicate: None,
            batch_size: None,
            projected_statistics: None,
            arrow_file_schema: None,
            schema_adapter_factory: None,
            _unused_df_metrics: Default::default(),
            layout_readers: Arc::new(DashMap::default()),
        }
    }
}

impl FileSource for VortexSource {
    fn create_file_opener(
        &self,
        object_store: Arc<dyn ObjectStore>,
        base_config: &FileScanConfig,
        partition: usize,
    ) -> Arc<dyn FileOpener> {
        let partition_metrics = self
            .metrics
            .child_with_tags([(PARTITION_LABEL, partition.to_string())].into_iter());

        let batch_size = self
            .batch_size
            .vortex_expect("batch_size must be supplied to VortexSource");

        let expr_adapter = base_config.expr_adapter_factory.as_ref();
        let schema_adapter = self.schema_adapter_factory.as_ref();

        // This match is here to support the behavior defined by [`ListingTable`], see https://github.com/apache/datafusion/issues/16800 for more details.
        let (expr_adapter_factory, schema_adapter_factory) = match (expr_adapter, schema_adapter) {
            (Some(expr_adapter), Some(schema_adapter)) => {
                (Some(expr_adapter.clone()), schema_adapter.clone())
            }
            (Some(expr_adapter), None) => (
                Some(expr_adapter.clone()),
                Arc::new(DefaultSchemaAdapterFactory) as _,
            ),
            (None, Some(schema_adapter)) => {
                // If no `PhysicalExprAdapterFactory` is specified, we only use the provided `SchemaAdapterFactory`
                (None, schema_adapter.clone())
            }
            (None, None) => (
                Some(Arc::new(DefaultPhysicalExprAdapterFactory) as _),
                Arc::new(DefaultSchemaAdapterFactory) as _,
            ),
        };

        let projection = base_config.file_column_projection_indices().map(Arc::from);

        let opener = VortexFileOpener {
            object_store,
            projection,
            filter: self.predicate.clone(),
            expr_adapter_factory,
            schema_adapter_factory,
            partition_fields: base_config.table_partition_cols.clone(),
            logical_schema: base_config.file_schema.clone(),
            file_cache: self.file_cache.clone(),
            batch_size,
            limit: base_config.limit,
            metrics: partition_metrics,
            layout_readers: self.layout_readers.clone(),
        };

        Arc::new(opener)
    }

    fn as_any(&self) -> &dyn Any {
        self
    }

    fn with_batch_size(&self, batch_size: usize) -> Arc<dyn FileSource> {
        let mut source = self.clone();
        source.batch_size = Some(batch_size);
        Arc::new(source)
    }

    fn with_schema(&self, schema: SchemaRef) -> Arc<dyn FileSource> {
        let mut source = self.clone();
        source.arrow_file_schema = Some(schema);
        Arc::new(source)
    }

    fn with_projection(&self, _config: &FileScanConfig) -> Arc<dyn FileSource> {
        Arc::new(self.clone())
    }

    fn with_statistics(&self, statistics: Statistics) -> Arc<dyn FileSource> {
        let mut source = self.clone();
        source.projected_statistics = Some(statistics);
        Arc::new(source)
    }

    fn metrics(&self) -> &ExecutionPlanMetricsSet {
        &self._unused_df_metrics
    }

    fn statistics(&self) -> DFResult<Statistics> {
        let statistics = self
            .projected_statistics
            .clone()
            .vortex_expect("projected_statistics must be set");

        if self.predicate.is_some() {
            Ok(statistics.to_inexact())
        } else {
            Ok(statistics)
        }
    }

    fn file_type(&self) -> &str {
        VORTEX_FILE_EXTENSION
    }

    fn fmt_extra(&self, t: DisplayFormatType, f: &mut Formatter) -> std::fmt::Result {
        match t {
            DisplayFormatType::Default | DisplayFormatType::Verbose => {
                if let Some(ref predicate) = self.predicate {
                    write!(f, ", predicate: {predicate}")?;
                }
            }
            // Use TreeRender style key=value formatting to display the predicate
            DisplayFormatType::TreeRender => {
                if let Some(ref predicate) = self.predicate {
                    writeln!(f, "predicate={predicate}")?;
                };
            }
        }
        Ok(())
    }

    fn try_pushdown_filters(
        &self,
        filters: Vec<Arc<dyn PhysicalExpr>>,
        _config: &ConfigOptions,
    ) -> DFResult<FilterPushdownPropagation<Arc<dyn FileSource>>> {
        let Some(schema) = self.arrow_file_schema.as_ref() else {
            return Ok(FilterPushdownPropagation::with_parent_pushdown_result(
                vec![PushedDown::No; filters.len()],
            ));
        };

        let mut source = self.clone();

        let filters = filters
            .into_iter()
            .map(|expr| {
                if can_be_pushed_down(&expr, schema) {
                    PushedDownPredicate::supported(expr)
                } else {
                    PushedDownPredicate::unsupported(expr)
                }
            })
            .collect::<Vec<_>>();

        if filters
            .iter()
            .all(|p| matches!(p.discriminant, PushedDown::No))
        {
            return Ok(FilterPushdownPropagation::with_parent_pushdown_result(
                vec![PushedDown::No; filters.len()],
            ));
        }

        let supported = filters
            .iter()
            .filter_map(|p| match p.discriminant {
                PushedDown::Yes => Some(&p.predicate),
                PushedDown::No => None,
            })
            .cloned();

        let predicate = match source.predicate {
            Some(predicate) => conjunction(std::iter::once(predicate).chain(supported)),
            None => conjunction(supported),
        };
        source.predicate = Some(predicate);

        let pushdown_propagation = if source.predicate.clone().is_some() {
            FilterPushdownPropagation::with_parent_pushdown_result(
                filters.iter().map(|f| f.discriminant).collect(),
            )
            .with_updated_node(Arc::new(source) as _)
        } else {
            FilterPushdownPropagation::with_parent_pushdown_result(vec![
                PushedDown::No;
                filters.len()
            ])
        };

        Ok(pushdown_propagation)
    }

    fn repartitioned(
        &self,
        target_partitions: usize,
        repartition_file_min_size: usize,
        output_ordering: Option<datafusion::physical_expr::LexOrdering>,
        config: &FileScanConfig,
    ) -> DFResult<Option<FileScanConfig>> {
        if config.file_compression_type.is_compressed() || config.new_lines_in_values {
            return Ok(None);
        }

        let repartitioned_file_groups_option =
            datafusion::datasource::physical_plan::FileGroupPartitioner::new()
                .with_target_partitions(target_partitions)
                .with_repartition_file_min_size(repartition_file_min_size)
                .with_preserve_order_within_groups(output_ordering.is_some())
                .repartition_file_groups(&config.file_groups);

        if let Some(repartitioned_file_groups) = repartitioned_file_groups_option {
            let mut source = config.clone();
            source.file_groups = repartitioned_file_groups;
            return Ok(Some(source));
        }
        Ok(None)
    }

    fn with_schema_adapter_factory(
        &self,
        factory: Arc<dyn SchemaAdapterFactory>,
    ) -> DFResult<Arc<dyn FileSource>> {
        let mut source = self.clone();
        source.schema_adapter_factory = Some(factory);
        Ok(Arc::new(source))
    }

    fn schema_adapter_factory(&self) -> Option<Arc<dyn SchemaAdapterFactory>> {
        None
    }
}
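
The `with_*` methods above follow DataFusion's builder pattern for `FileSource`: the planner clones the source and supplies batch size, schema, and statistics before `create_file_opener` runs, which is why that method can `vortex_expect` a batch size. The following is a rough, crate-internal sketch of how those calls compose, using only methods defined in this file; it is not how DataFusion actually drives the source, and it assumes a `VortexFileCache`, `VortexMetrics`, and `SchemaRef` are already in hand.

// Hypothetical, crate-internal sketch; in practice DataFusion's planner invokes
// these trait methods itself. Assumes the imports at the top of the file above.
fn example_source(
    file_cache: VortexFileCache,
    metrics: VortexMetrics,
    schema: SchemaRef,
) -> Arc<dyn FileSource> {
    VortexSource::new(file_cache, metrics)
        // batch_size must be supplied before create_file_opener is called,
        // otherwise the vortex_expect above panics.
        .with_batch_size(8192)
        // The logical file schema: the table schema minus partition columns.
        .with_schema(schema)
}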
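
The `layout_readers` map is the mechanism behind the struct's doc comment: readers are stored as `Weak` handles keyed by file path, so partitions of the same scan can share a live reader without the map keeping any reader alive after the scan ends. Below is a minimal sketch of that upgrade-or-insert pattern; `shared_reader` and `build_reader` are hypothetical stand-ins for whatever `VortexFileOpener` actually does, not code from this crate.

// Minimal sketch (not part of the file above) of the Weak-handle sharing pattern
// implied by `layout_readers`.
use std::sync::{Arc, Weak};

use dashmap::DashMap;
use object_store::path::Path;

fn shared_reader<R>(
    readers: &DashMap<Path, Weak<R>>,
    path: &Path,
    build_reader: impl FnOnce() -> R,
) -> Arc<R> {
    // Fast path: another partition already built a reader for this file and it
    // is still alive, so upgrade the Weak handle and reuse it.
    if let Some(existing) = readers.get(path) {
        if let Some(reader) = existing.upgrade() {
            return reader;
        }
    }

    // Slow path: build a fresh reader and publish only a Weak handle, so the map
    // never keeps a reader alive once the scan that owns it finishes. Two
    // partitions may race here; the later insert simply wins, which is harmless.
    let reader = Arc::new(build_reader());
    readers.insert(path.clone(), Arc::downgrade(&reader));
    reader
}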