• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

vortex-data / vortex / 16486479294

24 Jul 2025 02:42AM UTC coverage: 81.018% (-0.05%) from 81.067%
16486479294

push

github

web-flow
fix: Pruning expressions check NanCount where appropriate (#3973)

Fixes #3958 

The NanCount stat is now checked during pruning evaluation whenever it is present
(i.e. for float columns and literals).

- [x] Verify fixes fuzz failure
- [x] Add unit tests

Signed-off-by: Andrew Duffy <andrew@a10y.dev>

---------

Signed-off-by: Andrew Duffy <andrew@a10y.dev>
Signed-off-by: Robert Kruszewski <github@robertk.io>
Co-authored-by: Robert Kruszewski <github@robertk.io>

106 of 132 new or added lines in 12 files covered. (80.3%)

4 existing lines in 2 files now uncovered.

42012 of 51855 relevant lines covered (81.02%)

173899.63 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

89.63
/vortex-datafusion/src/persistent/source.rs
1
// SPDX-License-Identifier: Apache-2.0
2
// SPDX-FileCopyrightText: Copyright the Vortex contributors
3

4
use std::any::Any;
5
use std::fmt::Formatter;
6
use std::sync::{Arc, Weak};
7

8
use dashmap::DashMap;
9
use datafusion::arrow::datatypes::SchemaRef;
10
use datafusion::common::{Result as DFResult, Statistics};
11
use datafusion::config::ConfigOptions;
12
use datafusion::datasource::physical_plan::{FileOpener, FileScanConfig, FileSource};
13
use datafusion::physical_plan::filter_pushdown::{
14
    FilterPushdownPropagation, PredicateSupport, PredicateSupports,
15
};
16
use datafusion::physical_plan::metrics::ExecutionPlanMetricsSet;
17
use datafusion::physical_plan::{DisplayFormatType, PhysicalExpr};
18
use object_store::ObjectStore;
19
use object_store::path::Path;
20
use vortex::error::VortexExpect as _;
21
use vortex::expr::{ExprRef, VortexExpr, and, root};
22
use vortex::file::VORTEX_FILE_EXTENSION;
23
use vortex::layout::LayoutReader;
24
use vortex::metrics::VortexMetrics;
25

26
use super::cache::VortexFileCache;
27
use super::config::{ConfigProjection, FileScanConfigExt};
28
use super::metrics::PARTITION_LABEL;
29
use super::opener::VortexFileOpener;
30
use crate::can_be_pushed_down;
31
use crate::convert::TryFromDataFusion as _;
32

33
/// A config for [`VortexFileOpener`]. Used to create [`DataSourceExec`] based physical plans.
34
///
35
/// [`DataSourceExec`]: datafusion_physical_plan::source::DataSourceExec
36
#[derive(Clone)]
37
pub struct VortexSource {
38
    pub(crate) file_cache: VortexFileCache,
39
    pub(crate) predicate: Option<Arc<dyn VortexExpr>>,
40
    pub(crate) projection: Option<Arc<dyn VortexExpr>>,
41
    pub(crate) batch_size: Option<usize>,
42
    pub(crate) projected_statistics: Option<Statistics>,
43
    pub(crate) arrow_schema: Option<SchemaRef>,
44
    pub(crate) metrics: VortexMetrics,
45
    _unused_df_metrics: ExecutionPlanMetricsSet,
46
    /// Shared layout readers, the source only lives as long as one scan.
47
    ///
48
    /// Sharing the readers allows us to only read every layout once from the file, even across partitions.
49
    layout_readers: Arc<DashMap<Path, Weak<dyn LayoutReader>>>,
50
}
51

52
impl VortexSource {
53
    pub(crate) fn new(file_cache: VortexFileCache, metrics: VortexMetrics) -> Self {
360✔
54
        Self {
360✔
55
            file_cache,
360✔
56
            metrics,
360✔
57
            projection: None,
360✔
58
            batch_size: None,
360✔
59
            projected_statistics: None,
360✔
60
            arrow_schema: None,
360✔
61
            predicate: None,
360✔
62
            _unused_df_metrics: Default::default(),
360✔
63
            layout_readers: Arc::new(DashMap::default()),
360✔
64
        }
360✔
65
    }
360✔
66

67
    /// Sets a [`VortexExpr`] as a predicate
68
    pub fn with_predicate(&self, predicate: Arc<dyn VortexExpr>) -> Self {
80✔
69
        let mut source = self.clone();
80✔
70
        source.predicate = Some(predicate);
80✔
71
        source
80✔
72
    }
80✔
73
}
74

75
impl FileSource for VortexSource {
76
    fn create_file_opener(
484✔
77
        &self,
484✔
78
        object_store: Arc<dyn ObjectStore>,
484✔
79
        base_config: &FileScanConfig,
484✔
80
        partition: usize,
484✔
81
    ) -> Arc<dyn FileOpener> {
484✔
82
        let partition_metrics = self
484✔
83
            .metrics
484✔
84
            .child_with_tags([(PARTITION_LABEL, partition.to_string())].into_iter());
484✔
85

86
        let batch_size = self
484✔
87
            .batch_size
484✔
88
            .vortex_expect("batch_size must be supplied to VortexSource");
484✔
89

90
        let opener = VortexFileOpener::new(
484✔
91
            object_store,
484✔
92
            self.projection.clone().unwrap_or_else(root),
484✔
93
            self.predicate.clone(),
484✔
94
            self.file_cache.clone(),
484✔
95
            self.arrow_schema
484✔
96
                .clone()
484✔
97
                .vortex_expect("We should have a schema here"),
484✔
98
            batch_size,
484✔
99
            base_config.limit,
484✔
100
            partition_metrics,
484✔
101
            self.layout_readers.clone(),
484✔
102
        );
103

104
        Arc::new(opener)
484✔
105
    }
484✔
106

107
    fn as_any(&self) -> &dyn Any {
174✔
108
        self
174✔
109
    }
174✔
110

111
    fn with_batch_size(&self, batch_size: usize) -> Arc<dyn FileSource> {
484✔
112
        let mut source = self.clone();
484✔
113
        source.batch_size = Some(batch_size);
484✔
114
        Arc::new(source)
484✔
115
    }
484✔
116

117
    fn with_schema(&self, schema: SchemaRef) -> Arc<dyn FileSource> {
502✔
118
        // todo(adam): does this need to the same as `with_projection`?
119
        let mut source = self.clone();
502✔
120
        source.arrow_schema = Some(schema);
502✔
121
        Arc::new(source)
502✔
122
    }
502✔
123

124
    fn with_projection(&self, config: &FileScanConfig) -> Arc<dyn FileSource> {
484✔
125
        let ConfigProjection {
126
            arrow_schema,
484✔
127
            constraints: _constraints,
484✔
128
            statistics,
484✔
129
            projection_expr,
484✔
130
        } = config.project_for_vortex();
484✔
131

132
        let statistics = if self.predicate.is_some() {
484✔
133
            statistics.to_inexact()
276✔
134
        } else {
135
            statistics
208✔
136
        };
137

138
        let mut source = self.clone();
484✔
139
        source.projection = Some(projection_expr);
484✔
140
        source.arrow_schema = Some(arrow_schema);
484✔
141
        source.projected_statistics = Some(statistics);
484✔
142

143
        Arc::new(source)
484✔
144
    }
484✔
145

146
    fn with_statistics(&self, statistics: Statistics) -> Arc<dyn FileSource> {
502✔
147
        let mut source = self.clone();
502✔
148
        source.projected_statistics = Some(statistics);
502✔
149
        Arc::new(source)
502✔
150
    }
502✔
151

152
    fn metrics(&self) -> &ExecutionPlanMetricsSet {
658✔
153
        &self._unused_df_metrics
658✔
154
    }
658✔
155

156
    fn statistics(&self) -> DFResult<Statistics> {
4,126✔
157
        let statistics = self
4,126✔
158
            .projected_statistics
4,126✔
159
            .clone()
4,126✔
160
            .vortex_expect("projected_statistics must be set");
4,126✔
161

162
        if self.predicate.is_some() {
4,126✔
163
            Ok(statistics.to_inexact())
1,580✔
164
        } else {
165
            Ok(statistics)
2,546✔
166
        }
167
    }
4,126✔
168

169
    fn file_type(&self) -> &str {
×
170
        VORTEX_FILE_EXTENSION
×
171
    }
×
172

173
    fn fmt_extra(&self, t: DisplayFormatType, f: &mut Formatter) -> std::fmt::Result {
×
174
        match t {
×
175
            DisplayFormatType::Default | DisplayFormatType::Verbose => {
NEW
176
                if let Some(ref predicate) = self.predicate {
×
NEW
177
                    write!(f, ", predicate: {predicate}")?;
×
NEW
178
                }
×
179
            }
180
            // Use TreeRender style key=value formatting to display the predicate
181
            DisplayFormatType::TreeRender => {
NEW
182
                if let Some(ref predicate) = self.predicate {
×
NEW
183
                    write!(f, "predicate={predicate}")?;
×
NEW
184
                };
×
185
            }
186
        }
NEW
187
        Ok(())
×
UNCOV
188
    }
×
189

190
    fn try_pushdown_filters(
175✔
191
        &self,
175✔
192
        filters: Vec<Arc<dyn PhysicalExpr>>,
175✔
193
        _config: &ConfigOptions,
175✔
194
    ) -> DFResult<FilterPushdownPropagation<Arc<dyn FileSource>>> {
175✔
195
        let Some(schema) = self.arrow_schema.as_ref() else {
175✔
196
            return Ok(FilterPushdownPropagation::unsupported(filters));
×
197
        };
198
        let (supported, unsupported): (Vec<_>, Vec<_>) = filters
175✔
199
            .iter()
175✔
200
            .partition(|expr| can_be_pushed_down(expr, schema));
175✔
201

202
        match make_vortex_predicate(&supported) {
175✔
203
            Some(predicate) => {
80✔
204
                let supports = PredicateSupports::new(
80✔
205
                    supported
80✔
206
                        .into_iter()
80✔
207
                        .map(|expr| PredicateSupport::Supported(expr.clone()))
116✔
208
                        .chain(
80✔
209
                            unsupported
80✔
210
                                .into_iter()
80✔
211
                                .map(|expr| PredicateSupport::Unsupported(expr.clone())),
80✔
212
                        )
213
                        .collect(),
80✔
214
                );
215
                Ok(FilterPushdownPropagation::with_filters(supports)
80✔
216
                    .with_updated_node(Arc::new(self.with_predicate(predicate))))
80✔
217
            }
218
            None => Ok(FilterPushdownPropagation::unsupported(filters)),
95✔
219
        }
220
    }
175✔
221
}
222

223
// If we cannot convert an expr to a vortex expr, we run no filter, since datafusion
224
// will rerun the filter expression anyway.
225
pub(crate) fn make_vortex_predicate(
175✔
226
    predicate: &[&Arc<dyn PhysicalExpr>],
175✔
227
) -> Option<Arc<dyn VortexExpr>> {
175✔
228
    // This splits expressions into conjunctions and converts them to vortex expressions.
229
    // Any inconvertible expressions are dropped since true /\ a == a.
230
    predicate
175✔
231
        .iter()
175✔
232
        .filter_map(|e| ExprRef::try_from_df(e.as_ref()).ok())
175✔
233
        .reduce(and)
175✔
234
}
175✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc