• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

vortex-data / vortex / 16444646940

22 Jul 2025 12:43PM UTC coverage: 81.172% (-0.4%) from 81.523%
16444646940

Pull #3966

github

web-flow
Merge e4c94f925 into 31ea6b17f
Pull Request #3966: [wip] DuckDB Dynamic Expressions

73 of 296 new or added lines in 4 files covered. (24.66%)

17 existing lines in 2 files now uncovered.

42009 of 51753 relevant lines covered (81.17%)

170933.08 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

6.29
/vortex-expr/src/exprs/dynamic.rs
1
// SPDX-License-Identifier: Apache-2.0
2
// SPDX-FileCopyrightText: Copyright the Vortex contributors
3

4
use std::fmt::{Debug, Display};
5
use std::hash::{Hash, Hasher};
6
use std::sync::Arc;
7
use std::sync::atomic::{AtomicU64, Ordering};
8

9
use arc_swap::ArcSwapOption;
10
use parking_lot::Mutex;
11
use vortex_array::arrays::ConstantArray;
12
use vortex_array::compute::{Operator, compare};
13
use vortex_array::{Array, ArrayRef, DeserializeMetadata, IntoArray, ProstMetadata};
14
use vortex_dtype::DType;
15
use vortex_error::{VortexExpect, VortexResult, vortex_bail};
16
use vortex_proto::expr as pb;
17
use vortex_scalar::{Scalar, ScalarValue};
18

19
use crate::traversal::{Node, NodeVisitor, TraversalOrder};
20
use crate::{
21
    AnalysisExpr, ExprEncodingRef, ExprId, ExprRef, IntoExpr, Scope, StatsCatalog, VTable, vtable,
22
};
23

24
vtable!(DynamicComparison);
25

26
/// A dynamic comparison expression can be used to capture a comparison to a value that can change
27
/// during the execution of a query, such as when a compute engine pushes down an ORDER BY + LIMIT
28
/// operation and is able to progressively tighten the bounds of the filter.
29
#[allow(clippy::derived_hash_with_manual_eq)]
30
#[derive(Clone, Debug, Hash)]
31
pub struct DynamicComparisonExpr {
32
    lhs: ExprRef,
33
    operator: Operator,
34
    rhs: Arc<Rhs>,
35
    // Default value for the dynamic comparison.
36
    default: bool,
37
}
38

39
impl PartialEq for DynamicComparisonExpr {
NEW
40
    fn eq(&self, other: &Self) -> bool {
×
NEW
41
        self.lhs.eq(&other.lhs)
×
NEW
42
            && self.operator == other.operator
×
NEW
43
            && self.rhs == other.rhs
×
NEW
44
            && self.default == other.default
×
NEW
45
    }
×
46
}
47
impl Eq for DynamicComparisonExpr {}
48

49
/// Hash and PartialEq are implemented based on the ptr of the value function, such that the
50
/// internal value doesn't impact the hash of an expression tree.
51
struct Rhs {
52
    // The right-hand side value is a function that returns an `Option<ScalarValue>`.
53
    value: Arc<dyn Fn() -> Option<ScalarValue> + Send + Sync>,
54
    // The data type of the right-hand side value.
55
    dtype: DType,
56
    // Tracks how many times the value has changed. Note that this may over-estimate the changes
57
    // in order to avoid lock contention.
58
    version: AtomicU64,
59
    // The previous value of the right-hand side, used to detect changes.
60
    previous_value: ArcSwapOption<Scalar>,
61
}
62

63
impl Hash for Rhs {
NEW
64
    fn hash<H: Hasher>(&self, state: &mut H) {
×
NEW
65
        Arc::as_ptr(&self.value).hash(state);
×
NEW
66
    }
×
67
}
68

69
impl PartialEq for Rhs {
NEW
70
    fn eq(&self, other: &Self) -> bool {
×
NEW
71
        Arc::ptr_eq(&self.value, &other.value)
×
NEW
72
    }
×
73
}
74
impl Eq for Rhs {}
75

76
impl Debug for Rhs {
NEW
77
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
×
NEW
78
        f.debug_struct("Rhs")
×
NEW
79
            .field("value", &"<dyn Fn() -> Option<ScalarValue> + Send + Sync>")
×
NEW
80
            .field("dtype", &self.dtype)
×
NEW
81
            .finish()
×
NEW
82
    }
×
83
}
84

85
pub struct DynamicComparisonExprEncoding;
86

87
impl VTable for DynamicComparisonVTable {
88
    type Expr = DynamicComparisonExpr;
89
    type Encoding = DynamicComparisonExprEncoding;
90
    type Metadata = ProstMetadata<pb::LiteralOpts>;
91

NEW
92
    fn id(_encoding: &Self::Encoding) -> ExprId {
×
NEW
93
        ExprId::new_ref("dynamic")
×
NEW
94
    }
×
95

NEW
96
    fn encoding(_expr: &Self::Expr) -> ExprEncodingRef {
×
NEW
97
        ExprEncodingRef::new_ref(DynamicComparisonExprEncoding.as_ref())
×
NEW
98
    }
×
99

NEW
100
    fn metadata(_expr: &Self::Expr) -> Option<Self::Metadata> {
×
NEW
101
        None
×
NEW
102
    }
×
103

NEW
104
    fn children(expr: &Self::Expr) -> Vec<&ExprRef> {
×
NEW
105
        vec![&expr.lhs]
×
NEW
106
    }
×
107

NEW
108
    fn with_children(expr: &Self::Expr, children: Vec<ExprRef>) -> VortexResult<Self::Expr> {
×
NEW
109
        Ok(DynamicComparisonExpr {
×
NEW
110
            lhs: children[0].clone(),
×
NEW
111
            operator: expr.operator,
×
NEW
112
            rhs: expr.rhs.clone(),
×
NEW
113
            default: expr.default,
×
NEW
114
        })
×
NEW
115
    }
×
116

NEW
117
    fn build(
×
NEW
118
        _encoding: &Self::Encoding,
×
NEW
119
        _metadata: &<Self::Metadata as DeserializeMetadata>::Output,
×
NEW
120
        _children: Vec<ExprRef>,
×
NEW
121
    ) -> VortexResult<Self::Expr> {
×
NEW
122
        vortex_bail!("DynamicComparison expression does not support building from metadata");
×
NEW
123
    }
×
124

NEW
125
    fn evaluate(expr: &Self::Expr, scope: &Scope) -> VortexResult<ArrayRef> {
×
NEW
126
        if let Some(value) = expr.scalar() {
×
NEW
127
            let lhs = expr.lhs.evaluate(scope)?;
×
NEW
128
            let rhs = ConstantArray::new(value, scope.len());
×
NEW
129
            return compare(lhs.as_ref(), rhs.as_ref(), expr.operator);
×
NEW
130
        }
×
131

132
        // Otherwise, we return the default value.
NEW
133
        let lhs = expr.return_dtype(scope.dtype())?;
×
NEW
134
        Ok(ConstantArray::new(
×
NEW
135
            Scalar::new(
×
NEW
136
                DType::Bool(lhs.nullability() | expr.rhs.dtype.nullability()),
×
NEW
137
                expr.default.into(),
×
NEW
138
            ),
×
NEW
139
            scope.len(),
×
NEW
140
        )
×
NEW
141
        .into_array())
×
NEW
142
    }
×
143

NEW
144
    fn return_dtype(expr: &Self::Expr, scope: &DType) -> VortexResult<DType> {
×
NEW
145
        let lhs = expr.lhs.return_dtype(scope)?;
×
NEW
146
        if !expr.rhs.dtype.eq_ignore_nullability(&lhs) {
×
NEW
147
            vortex_bail!(
×
NEW
148
                "Incompatible dtypes for dynamic comparison: expected {} (ignore nullability) but got {}",
×
NEW
149
                &expr.rhs.dtype,
×
150
                lhs
151
            );
NEW
152
        }
×
NEW
153
        Ok(DType::Bool(
×
NEW
154
            lhs.nullability() | expr.rhs.dtype.nullability(),
×
NEW
155
        ))
×
NEW
156
    }
×
157
}
158

159
impl DynamicComparisonExpr {
NEW
160
    pub fn new(
×
NEW
161
        rhs: ExprRef,
×
NEW
162
        operator: Operator,
×
NEW
163
        rhs_value: impl Fn() -> Option<ScalarValue> + Send + Sync + 'static,
×
NEW
164
        rhs_dtype: DType,
×
NEW
165
        default: bool,
×
NEW
166
    ) -> Self {
×
NEW
167
        DynamicComparisonExpr {
×
NEW
168
            lhs: rhs,
×
NEW
169
            operator,
×
NEW
170
            rhs: Arc::new(Rhs {
×
NEW
171
                value: Arc::new(rhs_value),
×
NEW
172
                dtype: rhs_dtype,
×
NEW
173
                version: Default::default(),
×
NEW
174
                previous_value: Default::default(),
×
NEW
175
            }),
×
NEW
176
            default,
×
NEW
177
        }
×
NEW
178
    }
×
179

NEW
180
    pub fn scalar(&self) -> Option<Scalar> {
×
NEW
181
        if let Some(next) = (self.rhs.value)().map(|v| Scalar::new(self.rhs.dtype.clone(), v)) {
×
NEW
182
            if self
×
NEW
183
                .rhs
×
NEW
184
                .previous_value
×
NEW
185
                .load()
×
NEW
186
                .as_deref()
×
NEW
187
                .is_none_or(|prev| prev != &next)
×
188
            {
NEW
189
                log::debug!("Updating dynamic expression to {next}");
×
NEW
190
                self.rhs.version.fetch_add(1, Ordering::Relaxed);
×
NEW
191
                self.rhs.previous_value.store(Some(Arc::new(next.clone())));
×
NEW
192
            }
×
NEW
193
            Some(next)
×
194
        } else {
NEW
195
            None
×
196
        }
NEW
197
    }
×
198
}
199

200
impl Display for DynamicComparisonExpr {
NEW
201
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
×
NEW
202
        write!(
×
NEW
203
            f,
×
NEW
204
            "{} {} dynamic({})",
×
NEW
205
            &self.lhs, self.operator, &self.rhs.dtype,
×
206
        )
NEW
207
    }
×
208
}
209

210
impl AnalysisExpr for DynamicComparisonExpr {
NEW
211
    fn stat_falsification(&self, catalog: &mut dyn StatsCatalog) -> Option<ExprRef> {
×
NEW
212
        match self.operator {
×
213
            Operator::Gt => Some(
214
                DynamicComparisonExpr {
NEW
215
                    lhs: self.lhs.max(catalog)?,
×
NEW
216
                    operator: Operator::Lte,
×
NEW
217
                    rhs: self.rhs.clone(),
×
NEW
218
                    default: !self.default,
×
219
                }
NEW
220
                .into_expr(),
×
221
            ),
222
            Operator::Gte => Some(
223
                DynamicComparisonExpr {
NEW
224
                    lhs: self.lhs.max(catalog)?,
×
NEW
225
                    operator: Operator::Lt,
×
NEW
226
                    rhs: self.rhs.clone(),
×
NEW
227
                    default: !self.default,
×
228
                }
NEW
229
                .into_expr(),
×
230
            ),
231
            Operator::Lt => Some(
232
                DynamicComparisonExpr {
NEW
233
                    lhs: self.lhs.min(catalog)?,
×
NEW
234
                    operator: Operator::Gte,
×
NEW
235
                    rhs: self.rhs.clone(),
×
NEW
236
                    default: !self.default,
×
237
                }
NEW
238
                .into_expr(),
×
239
            ),
240
            Operator::Lte => Some(
241
                DynamicComparisonExpr {
NEW
242
                    lhs: self.lhs.min(catalog)?,
×
NEW
243
                    operator: Operator::Gt,
×
NEW
244
                    rhs: self.rhs.clone(),
×
NEW
245
                    default: !self.default,
×
246
                }
NEW
247
                .into_expr(),
×
248
            ),
NEW
249
            _ => None,
×
250
        }
NEW
251
    }
×
252
}
253

254
/// A utility for checking whether any dynamic expressions have been updated.
255
pub struct DynamicExprUpdates {
256
    exprs: Box<[DynamicComparisonExpr]>,
257
    // Track the latest observed versions of each dynamic expression.
258
    prev_versions: Mutex<Vec<u64>>,
259
    // Create our own aggregated version of the dynamic expressions.
260
    version: AtomicU64,
261
}
262

263
impl DynamicExprUpdates {
264
    pub fn new(expr: &ExprRef) -> Option<Self> {
644✔
265
        #[derive(Default)]
266
        struct Visitor(Vec<DynamicComparisonExpr>);
267

268
        impl NodeVisitor<'_> for Visitor {
269
            type NodeTy = ExprRef;
270

271
            fn visit_down(&mut self, node: &'_ Self::NodeTy) -> VortexResult<TraversalOrder> {
1,980✔
272
                if let Some(dynamic) = node.as_opt::<DynamicComparisonVTable>() {
1,980✔
NEW
273
                    self.0.push(dynamic.clone());
×
274
                }
1,980✔
275
                Ok(TraversalOrder::Continue)
1,980✔
276
            }
1,980✔
277
        }
278

279
        let mut visitor = Visitor::default();
644✔
280
        expr.accept(&mut visitor).vortex_expect("Infallible");
644✔
281

282
        if visitor.0.is_empty() {
644✔
283
            return None;
644✔
NEW
284
        }
×
285

NEW
286
        let exprs = visitor.0.into_boxed_slice();
×
NEW
287
        let prev_versions = exprs
×
NEW
288
            .iter()
×
NEW
289
            .map(|expr| expr.rhs.version.load(Ordering::Relaxed))
×
NEW
290
            .collect();
×
291

NEW
292
        Some(Self {
×
NEW
293
            exprs,
×
NEW
294
            prev_versions: Mutex::new(prev_versions),
×
NEW
295
            version: Default::default(),
×
NEW
296
        })
×
297
    }
644✔
298

NEW
299
    pub fn version(&self) -> u64 {
×
NEW
300
        let mut prev_versions = self.prev_versions.lock();
×
NEW
301
        let mut updated = false;
×
NEW
302
        for (i, expr) in self.exprs.iter().enumerate() {
×
NEW
303
            let current_version = expr.rhs.version.load(Ordering::Relaxed);
×
NEW
304
            if current_version > prev_versions[i] {
×
NEW
305
                prev_versions[i] = current_version;
×
NEW
306
                // At least one expression has been updated.
×
NEW
307
                // We don't bail out early in order to avoid false positives for future calls
×
NEW
308
                // to `is_updated`.
×
NEW
309
                updated = true;
×
NEW
310
            }
×
311
        }
312

NEW
313
        if updated {
×
NEW
314
            self.version.fetch_add(1, Ordering::Relaxed) + 1
×
315
        } else {
NEW
316
            self.version.load(Ordering::Relaxed)
×
317
        }
NEW
318
    }
×
319
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc