• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

vortex-data / vortex / 16880005972

11 Aug 2025 12:27PM UTC coverage: 86.29% (+0.08%) from 86.215%
16880005972

Pull #4180

github

web-flow
Merge 80f3edda7 into 8b28a1548
Pull Request #4180: feat: Support more datafusion features

236 of 256 new or added lines in 7 files covered. (92.19%)

1 existing line in 1 file now uncovered.

53523 of 62027 relevant lines covered (86.29%)

546301.19 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

85.19
/vortex-datafusion/src/persistent/source.rs
1
// SPDX-License-Identifier: Apache-2.0
2
// SPDX-FileCopyrightText: Copyright the Vortex contributors
3

4
use std::any::Any;
5
use std::fmt::Formatter;
6
use std::sync::{Arc, Weak};
7

8
use dashmap::DashMap;
9
use datafusion::arrow::datatypes::SchemaRef;
10
use datafusion::common::{Result as DFResult, Statistics};
11
use datafusion::config::ConfigOptions;
12
use datafusion::datasource::physical_plan::{FileOpener, FileScanConfig, FileSource};
13
use datafusion::physical_expr::schema_rewriter::DefaultPhysicalExprAdapterFactory;
14
use datafusion::physical_expr::{PhysicalExprRef, conjunction};
15
use datafusion::physical_plan::filter_pushdown::{
16
    FilterPushdownPropagation, PushedDown, PushedDownPredicate,
17
};
18
use datafusion::physical_plan::metrics::ExecutionPlanMetricsSet;
19
use datafusion::physical_plan::{DisplayFormatType, PhysicalExpr};
20
use object_store::ObjectStore;
21
use object_store::path::Path;
22
use vortex::error::VortexExpect as _;
23
use vortex::file::VORTEX_FILE_EXTENSION;
24
use vortex::layout::LayoutReader;
25
use vortex::metrics::VortexMetrics;
26

27
use super::cache::VortexFileCache;
28
use super::metrics::PARTITION_LABEL;
29
use super::opener::VortexFileOpener;
30
use crate::convert::exprs::can_be_pushed_down;
31

32
/// A config for [`VortexFileOpener`]. Used to create [`DataSourceExec`] based physical plans.
33
///
34
/// [`DataSourceExec`]: datafusion_physical_plan::source::DataSourceExec
35
#[derive(Clone)]
36
pub struct VortexSource {
37
    pub(crate) file_cache: VortexFileCache,
38
    pub(crate) predicate: Option<PhysicalExprRef>,
39
    pub(crate) batch_size: Option<usize>,
40
    pub(crate) projected_statistics: Option<Statistics>,
41
    pub(crate) arrow_file_schema: Option<SchemaRef>,
42
    pub(crate) metrics: VortexMetrics,
43

44
    _unused_df_metrics: ExecutionPlanMetricsSet,
45
    /// Shared layout readers, the source only lives as long as one scan.
46
    ///
47
    /// Sharing the readers allows us to only read every layout once from the file, even across partitions.
48
    layout_readers: Arc<DashMap<Path, Weak<dyn LayoutReader>>>,
49
}
50

51
impl VortexSource {
52
    pub(crate) fn new(file_cache: VortexFileCache, metrics: VortexMetrics) -> Self {
360✔
53
        Self {
360✔
54
            file_cache,
360✔
55
            metrics,
360✔
56
            predicate: None,
360✔
57
            batch_size: None,
360✔
58
            projected_statistics: None,
360✔
59
            arrow_file_schema: None,
360✔
60
            _unused_df_metrics: Default::default(),
360✔
61
            layout_readers: Arc::new(DashMap::default()),
360✔
62
        }
360✔
63
    }
360✔
64
}
65

66
impl FileSource for VortexSource {
67
    fn create_file_opener(
484✔
68
        &self,
484✔
69
        object_store: Arc<dyn ObjectStore>,
484✔
70
        base_config: &FileScanConfig,
484✔
71
        partition: usize,
484✔
72
    ) -> Arc<dyn FileOpener> {
484✔
73
        let partition_metrics = self
484✔
74
            .metrics
484✔
75
            .child_with_tags([(PARTITION_LABEL, partition.to_string())].into_iter());
484✔
76

77
        let batch_size = self
484✔
78
            .batch_size
484✔
79
            .vortex_expect("batch_size must be supplied to VortexSource");
484✔
80

81
        let expr_adapter_factory = base_config
484✔
82
            .expr_adapter_factory
484✔
83
            .clone()
484✔
84
            .unwrap_or_else(|| Arc::new(DefaultPhysicalExprAdapterFactory));
484✔
85

86
        let projection = base_config.file_column_projection_indices().map(Arc::from);
484✔
87

88
        let opener = VortexFileOpener {
484✔
89
            object_store,
484✔
90
            projection,
484✔
91
            filter: self.predicate.clone(),
484✔
92
            expr_adapter_factory,
484✔
93
            partition_fields: base_config.table_partition_cols.clone(),
484✔
94
            logical_schema: base_config.file_schema.clone(),
484✔
95
            file_cache: self.file_cache.clone(),
484✔
96
            batch_size,
484✔
97
            limit: base_config.limit,
484✔
98
            metrics: partition_metrics,
484✔
99
            layout_readers: self.layout_readers.clone(),
484✔
100
        };
484✔
101

102
        Arc::new(opener)
484✔
103
    }
484✔
104

105
    fn as_any(&self) -> &dyn Any {
174✔
106
        self
174✔
107
    }
174✔
108

109
    fn with_batch_size(&self, batch_size: usize) -> Arc<dyn FileSource> {
484✔
110
        let mut source = self.clone();
484✔
111
        source.batch_size = Some(batch_size);
484✔
112
        Arc::new(source)
484✔
113
    }
484✔
114

115
    fn with_schema(&self, schema: SchemaRef) -> Arc<dyn FileSource> {
506✔
116
        let mut source = self.clone();
506✔
117
        source.arrow_file_schema = Some(schema);
506✔
118
        Arc::new(source)
506✔
119
    }
506✔
120

121
    fn with_projection(&self, _config: &FileScanConfig) -> Arc<dyn FileSource> {
484✔
122
        Arc::new(self.clone())
484✔
123
    }
484✔
124

125
    fn with_statistics(&self, statistics: Statistics) -> Arc<dyn FileSource> {
506✔
126
        let mut source = self.clone();
506✔
127
        source.projected_statistics = Some(statistics);
506✔
128
        Arc::new(source)
506✔
129
    }
506✔
130

131
    fn metrics(&self) -> &ExecutionPlanMetricsSet {
1,142✔
132
        &self._unused_df_metrics
1,142✔
133
    }
1,142✔
134

135
    fn statistics(&self) -> DFResult<Statistics> {
3,624✔
136
        let statistics = self
3,624✔
137
            .projected_statistics
3,624✔
138
            .clone()
3,624✔
139
            .vortex_expect("projected_statistics must be set");
3,624✔
140

141
        if self.predicate.is_some() {
3,624✔
142
            Ok(statistics.to_inexact())
1,318✔
143
        } else {
144
            Ok(statistics)
2,306✔
145
        }
146
    }
3,624✔
147

148
    fn file_type(&self) -> &str {
×
149
        VORTEX_FILE_EXTENSION
×
150
    }
×
151

152
    fn fmt_extra(&self, t: DisplayFormatType, f: &mut Formatter) -> std::fmt::Result {
×
153
        match t {
×
154
            DisplayFormatType::Default | DisplayFormatType::Verbose => {
155
                if let Some(ref predicate) = self.predicate {
×
156
                    write!(f, ", predicate: {predicate}")?;
×
157
                }
×
158
            }
159
            // Use TreeRender style key=value formatting to display the predicate
160
            DisplayFormatType::TreeRender => {
161
                if let Some(ref predicate) = self.predicate {
×
NEW
162
                    writeln!(f, "predicate={predicate}")?;
×
163
                };
×
164
            }
165
        }
166
        Ok(())
×
167
    }
×
168

169
    fn try_pushdown_filters(
350✔
170
        &self,
350✔
171
        filters: Vec<Arc<dyn PhysicalExpr>>,
350✔
172
        _config: &ConfigOptions,
350✔
173
    ) -> DFResult<FilterPushdownPropagation<Arc<dyn FileSource>>> {
350✔
174
        let Some(schema) = self.arrow_file_schema.as_ref() else {
350✔
175
            return Ok(FilterPushdownPropagation::with_parent_pushdown_result(
×
176
                vec![PushedDown::No; filters.len()],
×
177
            ));
×
178
        };
179

180
        let mut source = self.clone();
350✔
181

182
        let filters = filters
350✔
183
            .into_iter()
350✔
184
            .map(|expr| {
350✔
185
                if can_be_pushed_down(&expr, schema) {
134✔
186
                    PushedDownPredicate::supported(expr)
126✔
187
                } else {
188
                    PushedDownPredicate::unsupported(expr)
8✔
189
                }
190
            })
134✔
191
            .collect::<Vec<_>>();
350✔
192

193
        if filters
350✔
194
            .iter()
350✔
195
            .all(|p| matches!(p.discriminant, PushedDown::No))
350✔
196
        {
197
            return Ok(FilterPushdownPropagation::with_parent_pushdown_result(
268✔
198
                vec![PushedDown::No; filters.len()],
268✔
199
            ));
268✔
200
        }
82✔
201

202
        let supported = filters
82✔
203
            .iter()
82✔
204
            .filter_map(|p| match p.discriminant {
132✔
205
                PushedDown::Yes => Some(&p.predicate),
126✔
206
                PushedDown::No => None,
6✔
207
            })
132✔
208
            .cloned();
82✔
209

210
        let predicate = match source.predicate {
82✔
NEW
211
            Some(predicate) => conjunction(std::iter::once(predicate).chain(supported)),
×
212
            None => conjunction(supported),
82✔
213
        };
214
        source.predicate = Some(predicate);
82✔
215

216
        let pushdown_propagation = if source.predicate.clone().is_some() {
82✔
217
            FilterPushdownPropagation::with_parent_pushdown_result(
82✔
218
                filters.iter().map(|f| f.discriminant).collect(),
82✔
219
            )
220
            .with_updated_node(Arc::new(source) as _)
82✔
221
        } else {
NEW
222
            FilterPushdownPropagation::with_parent_pushdown_result(vec![
×
NEW
223
                PushedDown::No;
×
NEW
224
                filters.len()
×
225
            ])
226
        };
227

228
        Ok(pushdown_propagation)
82✔
229
    }
350✔
230
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc