• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

vortex-data / vortex / 16445759128

22 Jul 2025 01:29PM UTC coverage: 81.191% (-0.3%) from 81.523%
16445759128

Pull #3966

github

web-flow
Merge e4e661d55 into 31ea6b17f
Pull Request #3966: DuckDB Dynamic Expressions

74 of 285 new or added lines in 4 files covered. (25.96%)

17 existing lines in 2 files now uncovered.

42009 of 51741 relevant lines covered (81.19%)

170922.35 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

87.05
/vortex-datafusion/src/persistent/source.rs
1
// SPDX-License-Identifier: Apache-2.0
2
// SPDX-FileCopyrightText: Copyright the Vortex contributors
3

4
use std::any::Any;
5
use std::fmt::Formatter;
6
use std::sync::{Arc, Weak};
7

8
use dashmap::DashMap;
9
use datafusion::arrow::datatypes::SchemaRef;
10
use datafusion::common::{Result as DFResult, Statistics};
11
use datafusion::config::ConfigOptions;
12
use datafusion::datasource::physical_plan::{FileOpener, FileScanConfig, FileSource};
13
use datafusion::physical_plan::filter_pushdown::{
14
    FilterPushdownPropagation, PredicateSupport, PredicateSupports,
15
};
16
use datafusion::physical_plan::metrics::ExecutionPlanMetricsSet;
17
use datafusion::physical_plan::{DisplayFormatType, PhysicalExpr};
18
use object_store::ObjectStore;
19
use object_store::path::Path;
20
use vortex::error::VortexExpect as _;
21
use vortex::expr::pruning::pruning_expr;
22
use vortex::expr::{ExprRef, VortexExpr, and, root};
23
use vortex::file::VORTEX_FILE_EXTENSION;
24
use vortex::layout::LayoutReader;
25
use vortex::metrics::VortexMetrics;
26

27
use super::cache::VortexFileCache;
28
use super::config::{ConfigProjection, FileScanConfigExt};
29
use super::metrics::PARTITION_LABEL;
30
use super::opener::VortexFileOpener;
31
use crate::can_be_pushed_down;
32
use crate::convert::TryFromDataFusion as _;
33

34
/// A config for [`VortexFileOpener`]. Used to create [`DataSourceExec`] based physical plans.
35
///
36
/// [`DataSourceExec`]: datafusion_physical_plan::source::DataSourceExec
37
#[derive(Clone)]
38
pub struct VortexSource {
39
    pub(crate) file_cache: VortexFileCache,
40
    pub(crate) predicate: Option<Arc<dyn VortexExpr>>,
41
    pub(crate) projection: Option<Arc<dyn VortexExpr>>,
42
    pub(crate) batch_size: Option<usize>,
43
    pub(crate) projected_statistics: Option<Statistics>,
44
    pub(crate) arrow_schema: Option<SchemaRef>,
45
    pub(crate) metrics: VortexMetrics,
46
    _unused_df_metrics: ExecutionPlanMetricsSet,
47
    /// Shared layout readers, the source only lives as long as one scan.
48
    ///
49
    /// Sharing the readers allows us to only read every layout once from the file, even across partitions.
50
    layout_readers: Arc<DashMap<Path, Weak<dyn LayoutReader>>>,
51
}
52

53
impl VortexSource {
54
    pub(crate) fn new(file_cache: VortexFileCache, metrics: VortexMetrics) -> Self {
360✔
55
        Self {
360✔
56
            file_cache,
360✔
57
            metrics,
360✔
58
            projection: None,
360✔
59
            batch_size: None,
360✔
60
            projected_statistics: None,
360✔
61
            arrow_schema: None,
360✔
62
            predicate: None,
360✔
63
            _unused_df_metrics: Default::default(),
360✔
64
            layout_readers: Arc::new(DashMap::default()),
360✔
65
        }
360✔
66
    }
360✔
67

68
    /// Sets a [`VortexExpr`] as a predicate
69
    pub fn with_predicate(&self, predicate: Arc<dyn VortexExpr>) -> Self {
80✔
70
        let mut source = self.clone();
80✔
71
        source.predicate = Some(predicate);
80✔
72
        source
80✔
73
    }
80✔
74
}
75

76
impl FileSource for VortexSource {
77
    fn create_file_opener(
484✔
78
        &self,
484✔
79
        object_store: Arc<dyn ObjectStore>,
484✔
80
        base_config: &FileScanConfig,
484✔
81
        partition: usize,
484✔
82
    ) -> Arc<dyn FileOpener> {
484✔
83
        let partition_metrics = self
484✔
84
            .metrics
484✔
85
            .child_with_tags([(PARTITION_LABEL, partition.to_string())].into_iter());
484✔
86

87
        let batch_size = self
484✔
88
            .batch_size
484✔
89
            .vortex_expect("batch_size must be supplied to VortexSource");
484✔
90

91
        let opener = VortexFileOpener::new(
484✔
92
            object_store,
484✔
93
            self.projection.clone().unwrap_or_else(root),
484✔
94
            self.predicate.clone(),
484✔
95
            self.file_cache.clone(),
484✔
96
            self.arrow_schema
484✔
97
                .clone()
484✔
98
                .vortex_expect("We should have a schema here"),
484✔
99
            batch_size,
484✔
100
            base_config.limit,
484✔
101
            partition_metrics,
484✔
102
            self.layout_readers.clone(),
484✔
103
        );
104

105
        Arc::new(opener)
484✔
106
    }
484✔
107

108
    fn as_any(&self) -> &dyn Any {
174✔
109
        self
174✔
110
    }
174✔
111

112
    fn with_batch_size(&self, batch_size: usize) -> Arc<dyn FileSource> {
484✔
113
        let mut source = self.clone();
484✔
114
        source.batch_size = Some(batch_size);
484✔
115
        Arc::new(source)
484✔
116
    }
484✔
117

118
    fn with_schema(&self, schema: SchemaRef) -> Arc<dyn FileSource> {
502✔
119
        // todo(adam): does this need to the same as `with_projection`?
120
        let mut source = self.clone();
502✔
121
        source.arrow_schema = Some(schema);
502✔
122
        Arc::new(source)
502✔
123
    }
502✔
124

125
    fn with_projection(&self, config: &FileScanConfig) -> Arc<dyn FileSource> {
484✔
126
        let ConfigProjection {
127
            arrow_schema,
484✔
128
            constraints: _constraints,
484✔
129
            statistics,
484✔
130
            projection_expr,
484✔
131
        } = config.project_for_vortex();
484✔
132

133
        let statistics = if self.predicate.is_some() {
484✔
134
            statistics.to_inexact()
276✔
135
        } else {
136
            statistics
208✔
137
        };
138

139
        let mut source = self.clone();
484✔
140
        source.projection = Some(projection_expr);
484✔
141
        source.arrow_schema = Some(arrow_schema);
484✔
142
        source.projected_statistics = Some(statistics);
484✔
143

144
        Arc::new(source)
484✔
145
    }
484✔
146

147
    fn with_statistics(&self, statistics: Statistics) -> Arc<dyn FileSource> {
502✔
148
        let mut source = self.clone();
502✔
149
        source.projected_statistics = Some(statistics);
502✔
150
        Arc::new(source)
502✔
151
    }
502✔
152

153
    fn metrics(&self) -> &ExecutionPlanMetricsSet {
658✔
154
        &self._unused_df_metrics
658✔
155
    }
658✔
156

157
    fn statistics(&self) -> DFResult<Statistics> {
4,126✔
158
        let statistics = self
4,126✔
159
            .projected_statistics
4,126✔
160
            .clone()
4,126✔
161
            .vortex_expect("projected_statistics must be set");
4,126✔
162

163
        if self.predicate.is_some() {
4,126✔
164
            Ok(statistics.to_inexact())
1,580✔
165
        } else {
166
            Ok(statistics)
2,546✔
167
        }
168
    }
4,126✔
169

170
    fn file_type(&self) -> &str {
×
UNCOV
171
        VORTEX_FILE_EXTENSION
×
UNCOV
172
    }
×
173

UNCOV
174
    fn fmt_extra(&self, t: DisplayFormatType, f: &mut Formatter) -> std::fmt::Result {
×
UNCOV
175
        match t {
×
176
            DisplayFormatType::Default | DisplayFormatType::Verbose => {
UNCOV
177
                if let Some(predicate) = &self.predicate {
×
178
                    write!(f, ", predicate={predicate}")?;
×
UNCOV
179
                    if let Some((pruning_predicate, _)) = pruning_expr(predicate) {
×
UNCOV
180
                        writeln!(f, ", pruning_predicate={pruning_predicate}")?;
×
UNCOV
181
                    }
×
UNCOV
182
                };
×
UNCOV
183
                Ok(())
×
184
            }
185
            DisplayFormatType::TreeRender => {
UNCOV
186
                if let Some(predicate) = &self.predicate {
×
UNCOV
187
                    writeln!(f, "predicate={}", predicate.as_ref())?;
×
UNCOV
188
                }
×
UNCOV
189
                Ok(())
×
190
            }
191
        }
UNCOV
192
    }
×
193

194
    fn try_pushdown_filters(
175✔
195
        &self,
175✔
196
        filters: Vec<Arc<dyn PhysicalExpr>>,
175✔
197
        _config: &ConfigOptions,
175✔
198
    ) -> DFResult<FilterPushdownPropagation<Arc<dyn FileSource>>> {
175✔
199
        let Some(schema) = self.arrow_schema.as_ref() else {
175✔
UNCOV
200
            return Ok(FilterPushdownPropagation::unsupported(filters));
×
201
        };
202
        let (supported, unsupported): (Vec<_>, Vec<_>) = filters
175✔
203
            .iter()
175✔
204
            .partition(|expr| can_be_pushed_down(expr, schema));
175✔
205

206
        match make_vortex_predicate(&supported) {
175✔
207
            Some(predicate) => {
80✔
208
                let supports = PredicateSupports::new(
80✔
209
                    supported
80✔
210
                        .into_iter()
80✔
211
                        .map(|expr| PredicateSupport::Supported(expr.clone()))
116✔
212
                        .chain(
80✔
213
                            unsupported
80✔
214
                                .into_iter()
80✔
215
                                .map(|expr| PredicateSupport::Unsupported(expr.clone())),
80✔
216
                        )
217
                        .collect(),
80✔
218
                );
219
                Ok(FilterPushdownPropagation::with_filters(supports)
80✔
220
                    .with_updated_node(Arc::new(self.with_predicate(predicate))))
80✔
221
            }
222
            None => Ok(FilterPushdownPropagation::unsupported(filters)),
95✔
223
        }
224
    }
175✔
225
}
226

227
// If we cannot convert an expr to a vortex expr, we run no filter, since datafusion
228
// will rerun the filter expression anyway.
229
pub(crate) fn make_vortex_predicate(
175✔
230
    predicate: &[&Arc<dyn PhysicalExpr>],
175✔
231
) -> Option<Arc<dyn VortexExpr>> {
175✔
232
    // This splits expressions into conjunctions and converts them to vortex expressions.
233
    // Any inconvertible expressions are dropped since true /\ a == a.
234
    predicate
175✔
235
        .iter()
175✔
236
        .filter_map(|e| ExprRef::try_from_df(e.as_ref()).ok())
175✔
237
        .reduce(and)
175✔
238
}
175✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc