• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

vortex-data / vortex / 16481624455

23 Jul 2025 08:55PM UTC coverage: 81.413% (+0.3%) from 81.07%
16481624455

Pull #3973

github

web-flow
Merge 8cba7d598 into 2ddcfbf30
Pull Request #3973: fix: Pruning expressions check NanCount where appropriate

223 of 234 new or added lines in 14 files covered. (95.3%)

33 existing lines in 3 files now uncovered.

42531 of 52241 relevant lines covered (81.41%)

172575.91 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

94.27
/vortex-datafusion/src/persistent/source.rs
1
// SPDX-License-Identifier: Apache-2.0
2
// SPDX-FileCopyrightText: Copyright the Vortex contributors
3

4
use std::any::Any;
5
use std::fmt::Formatter;
6
use std::sync::{Arc, Weak};
7

8
use dashmap::DashMap;
9
use datafusion::arrow::datatypes::SchemaRef;
10
use datafusion::common::{Result as DFResult, Statistics};
11
use datafusion::config::ConfigOptions;
12
use datafusion::datasource::physical_plan::{FileOpener, FileScanConfig, FileSource};
13
use datafusion::physical_plan::filter_pushdown::{
14
    FilterPushdownPropagation, PredicateSupport, PredicateSupports,
15
};
16
use datafusion::physical_plan::metrics::ExecutionPlanMetricsSet;
17
use datafusion::physical_plan::{DisplayFormatType, PhysicalExpr};
18
use object_store::ObjectStore;
19
use object_store::path::Path;
20
use vortex::error::VortexExpect as _;
21
use vortex::expr::pruning::pruning_expr;
22
use vortex::expr::{ExprRef, VortexExpr, and, root};
23
use vortex::file::VORTEX_FILE_EXTENSION;
24
use vortex::layout::LayoutReader;
25
use vortex::metrics::VortexMetrics;
26

27
use super::cache::VortexFileCache;
28
use super::config::{ConfigProjection, FileScanConfigExt};
29
use super::metrics::PARTITION_LABEL;
30
use super::opener::VortexFileOpener;
31
use crate::can_be_pushed_down;
32
use crate::convert::TryFromDataFusion as _;
33

34
/// A config for [`VortexFileOpener`]. Used to create [`DataSourceExec`] based physical plans.
35
///
36
/// [`DataSourceExec`]: datafusion_physical_plan::source::DataSourceExec
37
#[derive(Clone)]
38
pub struct VortexSource {
39
    pub(crate) file_cache: VortexFileCache,
40
    pub(crate) predicate: Option<Arc<dyn VortexExpr>>,
41
    pub(crate) projection: Option<Arc<dyn VortexExpr>>,
42
    pub(crate) batch_size: Option<usize>,
43
    pub(crate) projected_statistics: Option<Statistics>,
44
    pub(crate) arrow_schema: Option<SchemaRef>,
45
    pub(crate) metrics: VortexMetrics,
46
    _unused_df_metrics: ExecutionPlanMetricsSet,
47
    /// Shared layout readers, the source only lives as long as one scan.
48
    ///
49
    /// Sharing the readers allows us to only read every layout once from the file, even across partitions.
50
    layout_readers: Arc<DashMap<Path, Weak<dyn LayoutReader>>>,
51
}
52

53
impl VortexSource {
348✔
54
    pub(crate) fn new(file_cache: VortexFileCache, metrics: VortexMetrics) -> Self {
360✔
55
        Self {
360✔
56
            file_cache,
360✔
57
            metrics,
360✔
58
            projection: None,
360✔
59
            batch_size: None,
360✔
60
            projected_statistics: None,
360✔
61
            arrow_schema: None,
360✔
62
            predicate: None,
360✔
63
            _unused_df_metrics: Default::default(),
360✔
64
            layout_readers: Arc::new(DashMap::default()),
360✔
65
        }
360✔
66
    }
12✔
67

68
    /// Sets a [`VortexExpr`] as a predicate
80✔
69
    pub fn with_predicate(&self, predicate: Arc<dyn VortexExpr>) -> Self {
80✔
70
        let mut source = self.clone();
80✔
71
        source.predicate = Some(predicate);
80✔
72
        source
80✔
73
    }
74
}
75

76
impl FileSource for VortexSource {
482✔
77
    fn create_file_opener(
484✔
78
        &self,
484✔
79
        object_store: Arc<dyn ObjectStore>,
484✔
80
        base_config: &FileScanConfig,
484✔
81
        partition: usize,
484✔
82
    ) -> Arc<dyn FileOpener> {
484✔
83
        let partition_metrics = self
484✔
84
            .metrics
484✔
85
            .child_with_tags([(PARTITION_LABEL, partition.to_string())].into_iter());
2✔
86

482✔
87
        let batch_size = self
484✔
88
            .batch_size
484✔
89
            .vortex_expect("batch_size must be supplied to VortexSource");
2✔
90

482✔
91
        let opener = VortexFileOpener::new(
484✔
92
            object_store,
484✔
93
            self.projection.clone().unwrap_or_else(root),
484✔
94
            self.predicate.clone(),
484✔
95
            self.file_cache.clone(),
484✔
96
            self.arrow_schema
484✔
97
                .clone()
484✔
98
                .vortex_expect("We should have a schema here"),
484✔
99
            batch_size,
484✔
100
            base_config.limit,
484✔
101
            partition_metrics,
484✔
102
            self.layout_readers.clone(),
2✔
103
        );
104

482✔
105
        Arc::new(opener)
484✔
106
    }
2✔
107

174✔
108
    fn as_any(&self) -> &dyn Any {
174✔
109
        self
174✔
110
    }
111

482✔
112
    fn with_batch_size(&self, batch_size: usize) -> Arc<dyn FileSource> {
484✔
113
        let mut source = self.clone();
484✔
114
        source.batch_size = Some(batch_size);
484✔
115
        Arc::new(source)
484✔
116
    }
2✔
117

490✔
118
    fn with_schema(&self, schema: SchemaRef) -> Arc<dyn FileSource> {
12✔
119
        // todo(adam): does this need to the same as `with_projection`?
490✔
120
        let mut source = self.clone();
502✔
121
        source.arrow_schema = Some(schema);
502✔
122
        Arc::new(source)
502✔
123
    }
12✔
124

482✔
125
    fn with_projection(&self, config: &FileScanConfig) -> Arc<dyn FileSource> {
2✔
126
        let ConfigProjection {
482✔
127
            arrow_schema,
484✔
128
            constraints: _constraints,
484✔
129
            statistics,
484✔
130
            projection_expr,
484✔
131
        } = config.project_for_vortex();
2✔
132

482✔
133
        let statistics = if self.predicate.is_some() {
278✔
134
            statistics.to_inexact()
135
        } else {
206✔
136
            statistics
2✔
137
        };
138

482✔
139
        let mut source = self.clone();
484✔
140
        source.projection = Some(projection_expr);
484✔
141
        source.arrow_schema = Some(arrow_schema);
484✔
142
        source.projected_statistics = Some(statistics);
2✔
143

482✔
144
        Arc::new(source)
484✔
145
    }
2✔
146

490✔
147
    fn with_statistics(&self, statistics: Statistics) -> Arc<dyn FileSource> {
502✔
148
        let mut source = self.clone();
502✔
149
        source.projected_statistics = Some(statistics);
502✔
150
        Arc::new(source)
502✔
151
    }
12✔
152

656✔
153
    fn metrics(&self) -> &ExecutionPlanMetricsSet {
658✔
154
        &self._unused_df_metrics
658✔
155
    }
2✔
156

4,106✔
157
    fn statistics(&self) -> DFResult<Statistics> {
4,126✔
158
        let statistics = self
4,126✔
159
            .projected_statistics
4,126✔
160
            .clone()
4,126✔
161
            .vortex_expect("projected_statistics must be set");
20✔
162

4,106✔
163
        if self.predicate.is_some() {
1,600✔
164
            Ok(statistics.to_inexact())
165
        } else {
2,526✔
166
            Ok(statistics)
20✔
167
        }
4,106✔
168
    }
20✔
169

170
    fn file_type(&self) -> &str {
×
171
        VORTEX_FILE_EXTENSION
×
172
    }
173

174
    fn fmt_extra(&self, t: DisplayFormatType, f: &mut Formatter) -> std::fmt::Result {
×
175
        match t {
176
            DisplayFormatType::Default | DisplayFormatType::Verbose => {
NEW
177
                if let Some(predicate) = &self.predicate {
×
NEW
178
                    write!(f, ", predicate={predicate}")?;
×
179
                    if let Some((pruning_predicate, _)) = pruning_expr(predicate) {
180
                        writeln!(f, ", pruning_predicate={pruning_predicate}")?;
181
                    }
NEW
182
                };
×
NEW
183
                Ok(())
×
184
            }
185
            DisplayFormatType::TreeRender => {
186
                if let Some(predicate) = &self.predicate {
NEW
187
                    writeln!(f, "predicate={}", predicate.as_ref())?;
×
UNCOV
188
                }
×
189
                Ok(())
190
            }
174✔
191
        }
174✔
192
    }
174✔
193

174✔
194
    fn try_pushdown_filters(
175✔
195
        &self,
175✔
196
        filters: Vec<Arc<dyn PhysicalExpr>>,
1✔
197
        _config: &ConfigOptions,
1✔
198
    ) -> DFResult<FilterPushdownPropagation<Arc<dyn FileSource>>> {
175✔
199
        let Some(schema) = self.arrow_schema.as_ref() else {
175✔
200
            return Ok(FilterPushdownPropagation::unsupported(filters));
174✔
201
        };
202
        let (supported, unsupported): (Vec<_>, Vec<_>) = filters
175✔
203
            .iter()
81✔
204
            .partition(|expr| can_be_pushed_down(expr, schema));
81✔
205

80✔
206
        match make_vortex_predicate(&supported) {
81✔
207
            Some(predicate) => {
116✔
208
                let supports = PredicateSupports::new(
80✔
209
                    supported
80✔
210
                        .into_iter()
80✔
211
                        .map(|expr| PredicateSupport::Supported(expr.clone()))
80✔
212
                        .chain(
213
                            unsupported
80✔
214
                                .into_iter()
215
                                .map(|expr| PredicateSupport::Unsupported(expr.clone())),
80✔
216
                        )
80✔
217
                        .collect(),
218
                );
94✔
219
                Ok(FilterPushdownPropagation::with_filters(supports)
220
                    .with_updated_node(Arc::new(self.with_predicate(predicate))))
174✔
221
            }
222
            None => Ok(FilterPushdownPropagation::unsupported(filters)),
1✔
223
        }
224
    }
1✔
225
}
174✔
226

174✔
227
// If we cannot convert an expr to a vortex expr, we run no filter, since datafusion
174✔
228
// will rerun the filter expression anyway.
229
pub(crate) fn make_vortex_predicate(
1✔
230
    predicate: &[&Arc<dyn PhysicalExpr>],
175✔
231
) -> Option<Arc<dyn VortexExpr>> {
175✔
232
    // This splits expressions into conjunctions and converts them to vortex expressions.
174✔
233
    // Any inconvertible expressions are dropped since true /\ a == a.
174✔
234
    predicate
175✔
235
        .iter()
1✔
236
        .filter_map(|e| ExprRef::try_from_df(e.as_ref()).ok())
1✔
237
        .reduce(and)
1✔
238
}
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc