• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

vortex-data / vortex / 16785034283

06 Aug 2025 06:15PM UTC coverage: 83.936% (-0.1%) from 84.031%
16785034283

Pull #4144

github

web-flow
Merge 4f39e8189 into e5eccefcb
Pull Request #4144: [WIP] Datafusion-related crimes

30 of 93 new or added lines in 5 files covered. (32.26%)

12 existing lines in 2 files now uncovered.

48336 of 57587 relevant lines covered (83.94%)

520076.95 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

86.71
/vortex-datafusion/src/persistent/source.rs
1
// SPDX-License-Identifier: Apache-2.0
2
// SPDX-FileCopyrightText: Copyright the Vortex contributors
3

4
use std::any::Any;
5
use std::fmt::Formatter;
6
use std::sync::{Arc, Weak};
7

8
use dashmap::DashMap;
9
use datafusion::arrow::datatypes::SchemaRef;
10
use datafusion::common::{Result as DFResult, Statistics};
11
use datafusion::config::ConfigOptions;
12
use datafusion::datasource::physical_plan::{FileOpener, FileScanConfig, FileSource};
13
use datafusion::physical_plan::filter_pushdown::{
14
    FilterPushdownPropagation, PushedDown, PushedDownPredicate,
15
};
16
use datafusion::physical_plan::metrics::ExecutionPlanMetricsSet;
17
use datafusion::physical_plan::{DisplayFormatType, PhysicalExpr};
18
use object_store::ObjectStore;
19
use object_store::path::Path;
20
use vortex::error::VortexExpect as _;
21
use vortex::expr::{VortexExpr, root};
22
use vortex::file::VORTEX_FILE_EXTENSION;
23
use vortex::layout::LayoutReader;
24
use vortex::metrics::VortexMetrics;
25

26
use super::cache::VortexFileCache;
27
use super::config::{ConfigProjection, FileScanConfigExt};
28
use super::metrics::PARTITION_LABEL;
29
use super::opener::VortexOpener;
30

31
use crate::{can_be_pushed_down, make_vortex_predicate};
32

33
/// A config for [`VortexOpener`]. Used to create [`FileSource`] based physical plans.
///
/// The `with_*` methods defined on this type in this file never mutate in place: each one clones
/// the source and overrides a field on the clone.
34
#[derive(Clone)]
35
pub struct VortexSource {
36
    /// File cache; a clone of it is handed to every opener created from this source.
    pub(crate) file_cache: VortexFileCache,
37
    /// Optional filter, forwarded to the opener as its `filter`. While it is set, `statistics()`
    /// reports its result through `to_inexact()`.
    pub(crate) predicate: Option<Arc<dyn VortexExpr>>,
38
    /// Optional projection expression; `create_file_opener` falls back to `root()` when `None`.
    pub(crate) projection: Option<Arc<dyn VortexExpr>>,
39
    /// Batch size; `create_file_opener` `vortex_expect`s it, so it has to be set beforehand.
    pub(crate) batch_size: Option<usize>,
40
    /// Statistics handed out by `statistics()`, which `vortex_expect`s that this is set.
    pub(crate) projected_statistics: Option<Statistics>,
41
    /// Arrow schema; `create_file_opener` `vortex_expect`s it, and `try_pushdown_filters` declines
    /// every filter while it is `None`.
    pub(crate) arrow_schema: Option<SchemaRef>,
42
    /// Parent metrics; each opener gets a child of it tagged with its partition index.
    pub(crate) metrics: VortexMetrics,
43
    /// Returned by reference from `FileSource::metrics`; nothing in this file records into it.
    _unused_df_metrics: ExecutionPlanMetricsSet,
44
    /// Shared layout readers, the source only lives as long as one scan.
45
    ///
46
    /// Sharing the readers allows us to only read every layout once from the file, even across partitions.
    /// The map sits behind an `Arc`, so clones of this source and the openers it creates all
    /// refer to the same map.
47
    layout_readers: Arc<DashMap<Path, Weak<dyn LayoutReader>>>,
48
}
49

50
impl VortexSource {
51
    pub(crate) fn new(file_cache: VortexFileCache, metrics: VortexMetrics) -> Self {
360✔
52
        Self {
360✔
53
            file_cache,
360✔
54
            metrics,
360✔
55
            projection: None,
360✔
56
            batch_size: None,
360✔
57
            projected_statistics: None,
360✔
58
            arrow_schema: None,
360✔
59
            predicate: None,
360✔
60
            _unused_df_metrics: Default::default(),
360✔
61
            layout_readers: Arc::new(DashMap::default()),
360✔
62
        }
360✔
63
    }
360✔
64

65
    /// Sets a [`VortexExpr`] as a predicate
66
    pub fn with_predicate(&self, predicate: Arc<dyn VortexExpr>) -> Self {
80✔
67
        let mut source = self.clone();
80✔
68
        source.predicate = Some(predicate);
80✔
69
        source
80✔
70
    }
80✔
71
}
72

73
impl FileSource for VortexSource {
74
    fn create_file_opener(
484✔
75
        &self,
484✔
76
        object_store: Arc<dyn ObjectStore>,
484✔
77
        base_config: &FileScanConfig,
484✔
78
        partition: usize,
484✔
79
    ) -> Arc<dyn FileOpener> {
484✔
80
        let partition_metrics = self
484✔
81
            .metrics
484✔
82
            .child_with_tags([(PARTITION_LABEL, partition.to_string())].into_iter());
484✔
83

84
        let batch_size = self
484✔
85
            .batch_size
484✔
86
            .vortex_expect("batch_size must be supplied to VortexSource");
484✔
87

88
        let opener = VortexOpener {
484✔
89
            object_store,
484✔
90
            projection: self.projection.clone().unwrap_or_else(root),
484✔
91
            filter: self.predicate.clone(),
484✔
92
            file_cache: self.file_cache.clone(),
484✔
93
            projected_arrow_schema: self
484✔
94
                .arrow_schema
484✔
95
                .clone()
484✔
96
                .vortex_expect("We should have a schema here"),
484✔
97
            batch_size,
484✔
98
            limit: base_config.limit,
484✔
99
            metrics: partition_metrics,
484✔
100
            layout_readers: self.layout_readers.clone(),
484✔
101
        };
484✔
102

103
        Arc::new(opener)
484✔
104
    }
484✔
105

106
    fn as_any(&self) -> &dyn Any {
174✔
107
        self
174✔
108
    }
174✔
109

110
    fn with_batch_size(&self, batch_size: usize) -> Arc<dyn FileSource> {
484✔
111
        let mut source = self.clone();
484✔
112
        source.batch_size = Some(batch_size);
484✔
113
        Arc::new(source)
484✔
114
    }
484✔
115

116
    fn with_schema(&self, schema: SchemaRef) -> Arc<dyn FileSource> {
502✔
117
        // todo(adam): does this need to the same as `with_projection`?
118
        let mut source = self.clone();
502✔
119
        source.arrow_schema = Some(schema);
502✔
120
        Arc::new(source)
502✔
121
    }
502✔
122

123
    fn with_projection(&self, config: &FileScanConfig) -> Arc<dyn FileSource> {
484✔
124
        let ConfigProjection {
125
            arrow_schema,
484✔
126
            constraints: _constraints,
484✔
127
            statistics,
484✔
128
            projection_expr,
484✔
129
        } = config.project_for_vortex();
484✔
130

131
        let statistics = if self.predicate.is_some() {
484✔
132
            statistics.to_inexact()
276✔
133
        } else {
134
            statistics
208✔
135
        };
136

137
        let mut source = self.clone();
484✔
138
        source.projection = Some(projection_expr);
484✔
139
        source.arrow_schema = Some(arrow_schema);
484✔
140
        source.projected_statistics = Some(statistics);
484✔
141

142
        Arc::new(source)
484✔
143
    }
484✔
144

145
    fn with_statistics(&self, statistics: Statistics) -> Arc<dyn FileSource> {
502✔
146
        let mut source = self.clone();
502✔
147
        source.projected_statistics = Some(statistics);
502✔
148
        Arc::new(source)
502✔
149
    }
502✔
150

151
    fn metrics(&self) -> &ExecutionPlanMetricsSet {
1,142✔
152
        &self._unused_df_metrics
1,142✔
153
    }
1,142✔
154

155
    fn statistics(&self) -> DFResult<Statistics> {
4,126✔
156
        let statistics = self
4,126✔
157
            .projected_statistics
4,126✔
158
            .clone()
4,126✔
159
            .vortex_expect("projected_statistics must be set");
4,126✔
160

161
        if self.predicate.is_some() {
4,126✔
162
            Ok(statistics.to_inexact())
1,580✔
163
        } else {
164
            Ok(statistics)
2,546✔
165
        }
166
    }
4,126✔
167

168
    fn file_type(&self) -> &str {
×
169
        VORTEX_FILE_EXTENSION
×
UNCOV
170
    }
×
171

172
    fn fmt_extra(&self, t: DisplayFormatType, f: &mut Formatter) -> std::fmt::Result {
×
UNCOV
173
        match t {
×
174
            DisplayFormatType::Default | DisplayFormatType::Verbose => {
175
                if let Some(ref predicate) = self.predicate {
×
176
                    write!(f, ", predicate: {predicate}")?;
×
UNCOV
177
                }
×
178
            }
179
            // Use TreeRender style key=value formatting to display the predicate
180
            DisplayFormatType::TreeRender => {
181
                if let Some(ref predicate) = self.predicate {
×
182
                    write!(f, "predicate={predicate}")?;
×
UNCOV
183
                };
×
184
            }
185
        }
186
        Ok(())
×
UNCOV
187
    }
×
188

189
    fn try_pushdown_filters(
350✔
190
        &self,
350✔
191
        filters: Vec<Arc<dyn PhysicalExpr>>,
350✔
192
        _config: &ConfigOptions,
350✔
193
    ) -> DFResult<FilterPushdownPropagation<Arc<dyn FileSource>>> {
350✔
194
        let Some(schema) = self.arrow_schema.as_ref() else {
350✔
195
            return Ok(FilterPushdownPropagation::with_parent_pushdown_result(
×
196
                vec![PushedDown::No; filters.len()],
×
UNCOV
197
            ));
×
198
        };
199

200
        let filters = filters
350✔
201
            .into_iter()
350✔
202
            .map(|expr| {
350✔
203
                if can_be_pushed_down(&expr, schema) {
134✔
204
                    PushedDownPredicate::supported(expr)
116✔
205
                } else {
206
                    PushedDownPredicate::unsupported(expr)
18✔
207
                }
208
            })
134✔
209
            .collect::<Vec<_>>();
350✔
210

211
        if filters
350✔
212
            .iter()
350✔
213
            .all(|p| matches!(p.discriminant, PushedDown::No))
350✔
214
        {
215
            return Ok(FilterPushdownPropagation::with_parent_pushdown_result(
270✔
216
                vec![PushedDown::No; filters.len()],
270✔
217
            ));
270✔
218
        }
80✔
219

220
        let supported = filters
80✔
221
            .iter()
80✔
222
            .filter_map(|p| match p.discriminant {
128✔
223
                PushedDown::Yes => Some(&p.predicate),
116✔
224
                PushedDown::No => None,
12✔
225
            })
128✔
226
            .collect::<Vec<_>>();
80✔
227

228
        match make_vortex_predicate(&supported) {
80✔
229
            Some(predicate) => Ok(FilterPushdownPropagation::with_parent_pushdown_result(
80✔
230
                filters.iter().map(|f| f.discriminant).collect(),
80✔
231
            )
232
            .with_updated_node(Arc::new(self.with_predicate(predicate)))),
80✔
233
            _ => Ok(FilterPushdownPropagation::with_parent_pushdown_result(
×
234
                vec![PushedDown::No; filters.len()],
×
UNCOV
235
            )),
×
236
        }
237
    }
350✔
238
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc