• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

vortex-data / vortex / 16992684502

15 Aug 2025 02:56PM UTC coverage: 87.875% (+0.2%) from 87.72%
16992684502

Pull #2456

github

web-flow
Merge 2d540e578 into 4a23f65b3
Pull Request #2456: feat: basic BoolBuffer / BoolBufferMut

1275 of 1428 new or added lines in 110 files covered. (89.29%)

334 existing lines in 31 files now uncovered.

57169 of 65057 relevant lines covered (87.88%)

658056.52 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

72.51
/vortex-datafusion/src/persistent/source.rs
1
// SPDX-License-Identifier: Apache-2.0
2
// SPDX-FileCopyrightText: Copyright the Vortex contributors
3

4
use std::any::Any;
5
use std::fmt::Formatter;
6
use std::sync::{Arc, Weak};
7

8
use arrow_schema::SchemaRef;
9
use dashmap::DashMap;
10
use datafusion_common::config::ConfigOptions;
11
use datafusion_common::{Result as DFResult, Statistics};
12
use datafusion_datasource::file::FileSource;
13
use datafusion_datasource::file_scan_config::FileScanConfig;
14
use datafusion_datasource::file_stream::FileOpener;
15
use datafusion_datasource::schema_adapter::{DefaultSchemaAdapterFactory, SchemaAdapterFactory};
16
use datafusion_physical_expr::schema_rewriter::{
17
    DefaultPhysicalExprAdapterFactory, PhysicalExprAdapterFactory,
18
};
19
use datafusion_physical_expr::{PhysicalExprRef, conjunction};
20
use datafusion_physical_plan::filter_pushdown::{
21
    FilterPushdownPropagation, PushedDown, PushedDownPredicate,
22
};
23
use datafusion_physical_plan::metrics::ExecutionPlanMetricsSet;
24
use datafusion_physical_plan::{DisplayFormatType, PhysicalExpr};
25
use object_store::ObjectStore;
26
use object_store::path::Path;
27
use vortex::error::VortexExpect as _;
28
use vortex::file::VORTEX_FILE_EXTENSION;
29
use vortex::layout::LayoutReader;
30
use vortex::metrics::VortexMetrics;
31

32
use super::cache::VortexFileCache;
33
use super::metrics::PARTITION_LABEL;
34
use super::opener::VortexOpener;
35
use crate::convert::exprs::can_be_pushed_down;
36

37
/// A config for [`VortexFileOpener`]. Used to create [`DataSourceExec`] based physical plans.
38
///
39
/// [`DataSourceExec`]: datafusion::datasource::source::DataSourceExec
40
#[derive(Clone)]
41
pub struct VortexSource {
42
    pub(crate) file_cache: VortexFileCache,
43
    pub(crate) predicate: Option<PhysicalExprRef>,
44
    pub(crate) batch_size: Option<usize>,
45
    pub(crate) projected_statistics: Option<Statistics>,
46
    /// This is the file schema the table expects, which is the table's schema without partition columns, and **not** the file's physical schema.
47
    pub(crate) arrow_file_schema: Option<SchemaRef>,
48
    pub(crate) schema_adapter_factory: Option<Arc<dyn SchemaAdapterFactory>>,
49
    pub(crate) expr_adapter_factory: Option<Arc<dyn PhysicalExprAdapterFactory>>,
50
    pub(crate) metrics: VortexMetrics,
51
    _unused_df_metrics: ExecutionPlanMetricsSet,
52
    /// Shared layout readers, the source only lives as long as one scan.
53
    ///
54
    /// Sharing the readers allows us to only read every layout once from the file, even across partitions.
55
    layout_readers: Arc<DashMap<Path, Weak<dyn LayoutReader>>>,
56
}
57

58
impl VortexSource {
59
    pub(crate) fn new(file_cache: VortexFileCache, metrics: VortexMetrics) -> Self {
360✔
60
        Self {
360✔
61
            file_cache,
360✔
62
            metrics,
360✔
63
            predicate: None,
360✔
64
            batch_size: None,
360✔
65
            projected_statistics: None,
360✔
66
            arrow_file_schema: None,
360✔
67
            schema_adapter_factory: None,
360✔
68
            expr_adapter_factory: None,
360✔
69
            _unused_df_metrics: Default::default(),
360✔
70
            layout_readers: Arc::new(DashMap::default()),
360✔
71
        }
360✔
72
    }
360✔
73

74
    /// Sets a [`PhysicalExprAdapterFactory`] for the [`VortexSource`].
75
    /// Currently, this must be provided in order to filter columns in files that have a different data type from the unified table schema.
76
    ///
77
    /// This factory will take precedence when opening files over instances provided by the [`FileScanConfig`].
UNCOV
78
    pub fn with_expr_adapter_factory(
×
UNCOV
79
        &self,
×
UNCOV
80
        expr_adapter_factory: Arc<dyn PhysicalExprAdapterFactory>,
×
UNCOV
81
    ) -> Arc<dyn FileSource> {
×
UNCOV
82
        let mut source = self.clone();
×
UNCOV
83
        source.expr_adapter_factory = Some(expr_adapter_factory);
×
UNCOV
84
        Arc::new(source)
×
UNCOV
85
    }
×
86
}
87

88
impl FileSource for VortexSource {
89
    fn create_file_opener(
484✔
90
        &self,
484✔
91
        object_store: Arc<dyn ObjectStore>,
484✔
92
        base_config: &FileScanConfig,
484✔
93
        partition: usize,
484✔
94
    ) -> Arc<dyn FileOpener> {
484✔
95
        let partition_metrics = self
484✔
96
            .metrics
484✔
97
            .child_with_tags([(PARTITION_LABEL, partition.to_string())].into_iter());
484✔
98

99
        let batch_size = self
484✔
100
            .batch_size
484✔
101
            .vortex_expect("batch_size must be supplied to VortexSource");
484✔
102

103
        let expr_adapter = self
484✔
104
            .expr_adapter_factory
484✔
105
            .as_ref()
484✔
106
            .or(base_config.expr_adapter_factory.as_ref());
484✔
107
        let schema_adapter = self.schema_adapter_factory.as_ref();
484✔
108

109
        // This match is here to support the behavior defined by [`ListingTable`], see https://github.com/apache/datafusion/issues/16800 for more details.
110
        let (expr_adapter_factory, schema_adapter_factory) = match (expr_adapter, schema_adapter) {
484✔
UNCOV
111
            (Some(expr_adapter), Some(schema_adapter)) => {
×
UNCOV
112
                (Some(expr_adapter.clone()), schema_adapter.clone())
×
113
            }
UNCOV
114
            (Some(expr_adapter), None) => (
×
UNCOV
115
                Some(expr_adapter.clone()),
×
UNCOV
116
                Arc::new(DefaultSchemaAdapterFactory) as _,
×
UNCOV
117
            ),
×
UNCOV
118
            (None, Some(schema_adapter)) => {
×
119
                // If no `PhysicalExprAdapterFactory` is specified, we only use the provided `SchemaAdapterFactory`
UNCOV
120
                (None, schema_adapter.clone())
×
121
            }
122
            (None, None) => (
484✔
123
                Some(Arc::new(DefaultPhysicalExprAdapterFactory) as _),
484✔
124
                Arc::new(DefaultSchemaAdapterFactory) as _,
484✔
125
            ),
484✔
126
        };
127

128
        let projection = base_config.file_column_projection_indices().map(Arc::from);
484✔
129

130
        let opener = VortexOpener {
484✔
131
            object_store,
484✔
132
            projection,
484✔
133
            filter: self.predicate.clone(),
484✔
134
            expr_adapter_factory,
484✔
135
            schema_adapter_factory,
484✔
136
            partition_fields: base_config.table_partition_cols.clone(),
484✔
137
            logical_schema: base_config.file_schema.clone(),
484✔
138
            file_cache: self.file_cache.clone(),
484✔
139
            batch_size,
484✔
140
            limit: base_config.limit,
484✔
141
            metrics: partition_metrics,
484✔
142
            layout_readers: self.layout_readers.clone(),
484✔
143
        };
484✔
144

145
        Arc::new(opener)
484✔
146
    }
484✔
147

148
    fn as_any(&self) -> &dyn Any {
174✔
149
        self
174✔
150
    }
174✔
151

152
    fn with_batch_size(&self, batch_size: usize) -> Arc<dyn FileSource> {
484✔
153
        let mut source = self.clone();
484✔
154
        source.batch_size = Some(batch_size);
484✔
155
        Arc::new(source)
484✔
156
    }
484✔
157

158
    fn with_schema(&self, schema: SchemaRef) -> Arc<dyn FileSource> {
506✔
159
        let mut source = self.clone();
506✔
160
        source.arrow_file_schema = Some(schema);
506✔
161
        Arc::new(source)
506✔
162
    }
506✔
163

164
    fn with_projection(&self, _config: &FileScanConfig) -> Arc<dyn FileSource> {
484✔
165
        Arc::new(self.clone())
484✔
166
    }
484✔
167

168
    fn with_statistics(&self, statistics: Statistics) -> Arc<dyn FileSource> {
506✔
169
        let mut source = self.clone();
506✔
170
        source.projected_statistics = Some(statistics);
506✔
171
        Arc::new(source)
506✔
172
    }
506✔
173

174
    fn metrics(&self) -> &ExecutionPlanMetricsSet {
1,142✔
175
        &self._unused_df_metrics
1,142✔
176
    }
1,142✔
177

178
    fn statistics(&self) -> DFResult<Statistics> {
3,624✔
179
        let statistics = self
3,624✔
180
            .projected_statistics
3,624✔
181
            .clone()
3,624✔
182
            .vortex_expect("projected_statistics must be set");
3,624✔
183

184
        if self.predicate.is_some() {
3,624✔
185
            Ok(statistics.to_inexact())
1,318✔
186
        } else {
187
            Ok(statistics)
2,306✔
188
        }
189
    }
3,624✔
190

UNCOV
191
    fn file_type(&self) -> &str {
×
UNCOV
192
        VORTEX_FILE_EXTENSION
×
UNCOV
193
    }
×
194

UNCOV
195
    fn fmt_extra(&self, t: DisplayFormatType, f: &mut Formatter) -> std::fmt::Result {
×
196
        match t {
×
197
            DisplayFormatType::Default | DisplayFormatType::Verbose => {
198
                if let Some(ref predicate) = self.predicate {
×
UNCOV
199
                    write!(f, ", predicate: {predicate}")?;
×
UNCOV
200
                }
×
201
            }
202
            // Use TreeRender style key=value formatting to display the predicate
203
            DisplayFormatType::TreeRender => {
UNCOV
204
                if let Some(ref predicate) = self.predicate {
×
UNCOV
205
                    writeln!(f, "predicate={predicate}")?;
×
UNCOV
206
                };
×
207
            }
208
        }
UNCOV
209
        Ok(())
×
UNCOV
210
    }
×
211

212
    fn try_pushdown_filters(
350✔
213
        &self,
350✔
214
        filters: Vec<Arc<dyn PhysicalExpr>>,
350✔
215
        _config: &ConfigOptions,
350✔
216
    ) -> DFResult<FilterPushdownPropagation<Arc<dyn FileSource>>> {
350✔
217
        let Some(schema) = self.arrow_file_schema.as_ref() else {
350✔
UNCOV
218
            return Ok(FilterPushdownPropagation::with_parent_pushdown_result(
×
UNCOV
219
                vec![PushedDown::No; filters.len()],
×
UNCOV
220
            ));
×
221
        };
222

223
        let mut source = self.clone();
350✔
224

225
        let filters = filters
350✔
226
            .into_iter()
350✔
227
            .map(|expr| {
350✔
228
                if can_be_pushed_down(&expr, schema) {
134✔
229
                    PushedDownPredicate::supported(expr)
126✔
230
                } else {
231
                    PushedDownPredicate::unsupported(expr)
8✔
232
                }
233
            })
134✔
234
            .collect::<Vec<_>>();
350✔
235

236
        if filters
350✔
237
            .iter()
350✔
238
            .all(|p| matches!(p.discriminant, PushedDown::No))
350✔
239
        {
240
            return Ok(FilterPushdownPropagation::with_parent_pushdown_result(
268✔
241
                vec![PushedDown::No; filters.len()],
268✔
242
            ));
268✔
243
        }
82✔
244

245
        let supported = filters
82✔
246
            .iter()
82✔
247
            .filter_map(|p| match p.discriminant {
132✔
248
                PushedDown::Yes => Some(&p.predicate),
126✔
249
                PushedDown::No => None,
6✔
250
            })
132✔
251
            .cloned();
82✔
252

253
        let predicate = match source.predicate {
82✔
UNCOV
254
            Some(predicate) => conjunction(std::iter::once(predicate).chain(supported)),
×
255
            None => conjunction(supported),
82✔
256
        };
257
        source.predicate = Some(predicate);
82✔
258

259
        let pushdown_propagation = if source.predicate.clone().is_some() {
82✔
260
            FilterPushdownPropagation::with_parent_pushdown_result(
82✔
261
                filters.iter().map(|f| f.discriminant).collect(),
82✔
262
            )
263
            .with_updated_node(Arc::new(source) as _)
82✔
264
        } else {
UNCOV
265
            FilterPushdownPropagation::with_parent_pushdown_result(vec![
×
UNCOV
266
                PushedDown::No;
×
UNCOV
267
                filters.len()
×
268
            ])
269
        };
270

271
        Ok(pushdown_propagation)
82✔
272
    }
350✔
273

UNCOV
274
    fn with_schema_adapter_factory(
×
UNCOV
275
        &self,
×
UNCOV
276
        factory: Arc<dyn SchemaAdapterFactory>,
×
UNCOV
277
    ) -> DFResult<Arc<dyn FileSource>> {
×
UNCOV
278
        let mut source = self.clone();
×
UNCOV
279
        source.schema_adapter_factory = Some(factory);
×
UNCOV
280
        Ok(Arc::new(source))
×
UNCOV
281
    }
×
282

UNCOV
283
    fn schema_adapter_factory(&self) -> Option<Arc<dyn SchemaAdapterFactory>> {
×
UNCOV
284
        None
×
UNCOV
285
    }
×
286
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc