• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

vortex-data / vortex / 16444881771

22 Jul 2025 12:53PM UTC coverage: 81.194% (-0.3%) from 81.523%
16444881771

Pull #3966

github

web-flow
Merge 6b60768a8 into 31ea6b17f
Pull Request #3966: DuckDB Dynamic Expressions

74 of 283 new or added lines in 4 files covered. (26.15%)

17 existing lines in 2 files now uncovered.

42009 of 51739 relevant lines covered (81.19%)

170919.76 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

6.83
/vortex-expr/src/exprs/dynamic.rs
1
// SPDX-License-Identifier: Apache-2.0
2
// SPDX-FileCopyrightText: Copyright the Vortex contributors
3

4
use std::fmt::{Debug, Display};
5
use std::hash::{Hash, Hasher};
6
use std::sync::Arc;
7

8
use parking_lot::Mutex;
9
use vortex_array::arrays::ConstantArray;
10
use vortex_array::compute::{Operator, compare};
11
use vortex_array::{Array, ArrayRef, DeserializeMetadata, IntoArray, ProstMetadata};
12
use vortex_dtype::DType;
13
use vortex_error::{VortexExpect, VortexResult, vortex_bail};
14
use vortex_proto::expr as pb;
15
use vortex_scalar::{Scalar, ScalarValue};
16

17
use crate::traversal::{Node, NodeVisitor, TraversalOrder};
18
use crate::{
19
    AnalysisExpr, ExprEncodingRef, ExprId, ExprRef, IntoExpr, Scope, StatsCatalog, VTable, vtable,
20
};
21

22
vtable!(DynamicComparison);
23

24
/// A dynamic comparison expression can be used to capture a comparison to a value that can change
25
/// during the execution of a query, such as when a compute engine pushes down an ORDER BY + LIMIT
26
/// operation and is able to progressively tighten the bounds of the filter.
27
#[allow(clippy::derived_hash_with_manual_eq)]
28
#[derive(Clone, Debug, Hash)]
29
pub struct DynamicComparisonExpr {
30
    lhs: ExprRef,
31
    operator: Operator,
32
    rhs: Arc<Rhs>,
33
    // Default value for the dynamic comparison.
34
    default: bool,
35
}
36

37
impl PartialEq for DynamicComparisonExpr {
NEW
38
    fn eq(&self, other: &Self) -> bool {
×
NEW
39
        self.lhs.eq(&other.lhs)
×
NEW
40
            && self.operator == other.operator
×
NEW
41
            && self.rhs == other.rhs
×
NEW
42
            && self.default == other.default
×
NEW
43
    }
×
44
}
45
impl Eq for DynamicComparisonExpr {}
46

47
/// Hash and PartialEq are implemented based on the ptr of the value function, such that the
48
/// internal value doesn't impact the hash of an expression tree.
49
struct Rhs {
50
    // The right-hand side value is a function that returns an `Option<ScalarValue>`.
51
    value: Arc<dyn Fn() -> Option<ScalarValue> + Send + Sync>,
52
    // The data type of the right-hand side value.
53
    dtype: DType,
54
}
55

56
impl Hash for Rhs {
NEW
57
    fn hash<H: Hasher>(&self, state: &mut H) {
×
NEW
58
        Arc::as_ptr(&self.value).hash(state);
×
NEW
59
    }
×
60
}
61

62
impl PartialEq for Rhs {
NEW
63
    fn eq(&self, other: &Self) -> bool {
×
NEW
64
        Arc::ptr_eq(&self.value, &other.value)
×
NEW
65
    }
×
66
}
67
impl Eq for Rhs {}
68

69
impl Debug for Rhs {
NEW
70
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
×
NEW
71
        f.debug_struct("Rhs")
×
NEW
72
            .field("value", &"<dyn Fn() -> Option<ScalarValue> + Send + Sync>")
×
NEW
73
            .field("dtype", &self.dtype)
×
NEW
74
            .finish()
×
NEW
75
    }
×
76
}
77

78
pub struct DynamicComparisonExprEncoding;
79

80
impl VTable for DynamicComparisonVTable {
81
    type Expr = DynamicComparisonExpr;
82
    type Encoding = DynamicComparisonExprEncoding;
83
    type Metadata = ProstMetadata<pb::LiteralOpts>;
84

NEW
85
    fn id(_encoding: &Self::Encoding) -> ExprId {
×
NEW
86
        ExprId::new_ref("dynamic")
×
NEW
87
    }
×
88

NEW
89
    fn encoding(_expr: &Self::Expr) -> ExprEncodingRef {
×
NEW
90
        ExprEncodingRef::new_ref(DynamicComparisonExprEncoding.as_ref())
×
NEW
91
    }
×
92

NEW
93
    fn metadata(_expr: &Self::Expr) -> Option<Self::Metadata> {
×
NEW
94
        None
×
NEW
95
    }
×
96

NEW
97
    fn children(expr: &Self::Expr) -> Vec<&ExprRef> {
×
NEW
98
        vec![&expr.lhs]
×
NEW
99
    }
×
100

NEW
101
    fn with_children(expr: &Self::Expr, children: Vec<ExprRef>) -> VortexResult<Self::Expr> {
×
NEW
102
        Ok(DynamicComparisonExpr {
×
NEW
103
            lhs: children[0].clone(),
×
NEW
104
            operator: expr.operator,
×
NEW
105
            rhs: expr.rhs.clone(),
×
NEW
106
            default: expr.default,
×
NEW
107
        })
×
NEW
108
    }
×
109

NEW
110
    fn build(
×
NEW
111
        _encoding: &Self::Encoding,
×
NEW
112
        _metadata: &<Self::Metadata as DeserializeMetadata>::Output,
×
NEW
113
        _children: Vec<ExprRef>,
×
NEW
114
    ) -> VortexResult<Self::Expr> {
×
NEW
115
        vortex_bail!("DynamicComparison expression does not support building from metadata");
×
NEW
116
    }
×
117

NEW
118
    fn evaluate(expr: &Self::Expr, scope: &Scope) -> VortexResult<ArrayRef> {
×
NEW
119
        if let Some(value) = expr.scalar() {
×
NEW
120
            let lhs = expr.lhs.evaluate(scope)?;
×
NEW
121
            let rhs = ConstantArray::new(value, scope.len());
×
NEW
122
            return compare(lhs.as_ref(), rhs.as_ref(), expr.operator);
×
NEW
123
        }
×
124

125
        // Otherwise, we return the default value.
NEW
126
        let lhs = expr.return_dtype(scope.dtype())?;
×
NEW
127
        Ok(ConstantArray::new(
×
NEW
128
            Scalar::new(
×
NEW
129
                DType::Bool(lhs.nullability() | expr.rhs.dtype.nullability()),
×
NEW
130
                expr.default.into(),
×
NEW
131
            ),
×
NEW
132
            scope.len(),
×
NEW
133
        )
×
NEW
134
        .into_array())
×
NEW
135
    }
×
136

NEW
137
    fn return_dtype(expr: &Self::Expr, scope: &DType) -> VortexResult<DType> {
×
NEW
138
        let lhs = expr.lhs.return_dtype(scope)?;
×
NEW
139
        if !expr.rhs.dtype.eq_ignore_nullability(&lhs) {
×
NEW
140
            vortex_bail!(
×
NEW
141
                "Incompatible dtypes for dynamic comparison: expected {} (ignore nullability) but got {}",
×
NEW
142
                &expr.rhs.dtype,
×
143
                lhs
144
            );
NEW
145
        }
×
NEW
146
        Ok(DType::Bool(
×
NEW
147
            lhs.nullability() | expr.rhs.dtype.nullability(),
×
NEW
148
        ))
×
NEW
149
    }
×
150
}
151

152
impl DynamicComparisonExpr {
NEW
153
    pub fn new(
×
NEW
154
        rhs: ExprRef,
×
NEW
155
        operator: Operator,
×
NEW
156
        rhs_value: impl Fn() -> Option<ScalarValue> + Send + Sync + 'static,
×
NEW
157
        rhs_dtype: DType,
×
NEW
158
        default: bool,
×
NEW
159
    ) -> Self {
×
NEW
160
        DynamicComparisonExpr {
×
NEW
161
            lhs: rhs,
×
NEW
162
            operator,
×
NEW
163
            rhs: Arc::new(Rhs {
×
NEW
164
                value: Arc::new(rhs_value),
×
NEW
165
                dtype: rhs_dtype,
×
NEW
166
            }),
×
NEW
167
            default,
×
NEW
168
        }
×
NEW
169
    }
×
170

NEW
171
    pub fn scalar(&self) -> Option<Scalar> {
×
NEW
172
        (self.rhs.value)().map(|v| Scalar::new(self.rhs.dtype.clone(), v))
×
NEW
173
    }
×
174
}
175

176
impl Display for DynamicComparisonExpr {
NEW
177
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
×
NEW
178
        write!(
×
NEW
179
            f,
×
NEW
180
            "{} {} dynamic({})",
×
NEW
181
            &self.lhs, self.operator, &self.rhs.dtype,
×
182
        )
NEW
183
    }
×
184
}
185

186
impl AnalysisExpr for DynamicComparisonExpr {
NEW
187
    fn stat_falsification(&self, catalog: &mut dyn StatsCatalog) -> Option<ExprRef> {
×
NEW
188
        match self.operator {
×
189
            Operator::Gt => Some(
190
                DynamicComparisonExpr {
NEW
191
                    lhs: self.lhs.max(catalog)?,
×
NEW
192
                    operator: Operator::Lte,
×
NEW
193
                    rhs: self.rhs.clone(),
×
NEW
194
                    default: !self.default,
×
195
                }
NEW
196
                .into_expr(),
×
197
            ),
198
            Operator::Gte => Some(
199
                DynamicComparisonExpr {
NEW
200
                    lhs: self.lhs.max(catalog)?,
×
NEW
201
                    operator: Operator::Lt,
×
NEW
202
                    rhs: self.rhs.clone(),
×
NEW
203
                    default: !self.default,
×
204
                }
NEW
205
                .into_expr(),
×
206
            ),
207
            Operator::Lt => Some(
208
                DynamicComparisonExpr {
NEW
209
                    lhs: self.lhs.min(catalog)?,
×
NEW
210
                    operator: Operator::Gte,
×
NEW
211
                    rhs: self.rhs.clone(),
×
NEW
212
                    default: !self.default,
×
213
                }
NEW
214
                .into_expr(),
×
215
            ),
216
            Operator::Lte => Some(
217
                DynamicComparisonExpr {
NEW
218
                    lhs: self.lhs.min(catalog)?,
×
NEW
219
                    operator: Operator::Gt,
×
NEW
220
                    rhs: self.rhs.clone(),
×
NEW
221
                    default: !self.default,
×
222
                }
NEW
223
                .into_expr(),
×
224
            ),
NEW
225
            _ => None,
×
226
        }
NEW
227
    }
×
228
}
229

230
/// A utility for checking whether any dynamic expressions have been updated.
231
pub struct DynamicExprUpdates {
232
    exprs: Box<[DynamicComparisonExpr]>,
233
    // Track the latest observed versions of each dynamic expression, along with a version counter.
234
    prev_versions: Mutex<(u64, Vec<Option<Scalar>>)>,
235
}
236

237
impl DynamicExprUpdates {
238
    pub fn new(expr: &ExprRef) -> Option<Self> {
664✔
239
        #[derive(Default)]
240
        struct Visitor(Vec<DynamicComparisonExpr>);
241

242
        impl NodeVisitor<'_> for Visitor {
243
            type NodeTy = ExprRef;
244

245
            fn visit_down(&mut self, node: &'_ Self::NodeTy) -> VortexResult<TraversalOrder> {
2,040✔
246
                if let Some(dynamic) = node.as_opt::<DynamicComparisonVTable>() {
2,040✔
NEW
247
                    self.0.push(dynamic.clone());
×
248
                }
2,040✔
249
                Ok(TraversalOrder::Continue)
2,040✔
250
            }
2,040✔
251
        }
252

253
        let mut visitor = Visitor::default();
664✔
254
        expr.accept(&mut visitor).vortex_expect("Infallible");
664✔
255

256
        if visitor.0.is_empty() {
664✔
257
            return None;
664✔
NEW
258
        }
×
259

NEW
260
        let exprs = visitor.0.into_boxed_slice();
×
NEW
261
        let prev_versions = exprs
×
NEW
262
            .iter()
×
NEW
263
            .map(|expr| (expr.rhs.value)().map(|v| Scalar::new(expr.rhs.dtype.clone(), v)))
×
NEW
264
            .collect();
×
265

NEW
266
        Some(Self {
×
NEW
267
            exprs,
×
NEW
268
            prev_versions: Mutex::new((0, prev_versions)),
×
NEW
269
        })
×
270
    }
664✔
271

NEW
272
    pub fn version(&self) -> u64 {
×
NEW
273
        let mut guard = self.prev_versions.lock();
×
274

NEW
275
        let mut updated = false;
×
NEW
276
        for (i, expr) in self.exprs.iter().enumerate() {
×
NEW
277
            let current = (expr.rhs.value)().map(|v| Scalar::new(expr.rhs.dtype.clone(), v));
×
NEW
278
            if current != guard.1[i] {
×
NEW
279
                // At least one expression has been updated.
×
NEW
280
                // We don't bail out early in order to avoid false positives for future calls
×
NEW
281
                // to `is_updated`.
×
NEW
282
                updated = true;
×
NEW
283
                guard.1[i] = current;
×
NEW
284
            }
×
285
        }
286

NEW
287
        if updated {
×
NEW
288
            guard.0 += 1;
×
NEW
289
        }
×
290

NEW
291
        guard.0
×
NEW
292
    }
×
293
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc