• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

vortex-data / vortex / 16935267080

13 Aug 2025 11:00AM UTC coverage: 24.312% (-63.3%) from 87.658%
16935267080

Pull #4226

github

web-flow
Merge 81b48c7fb into baa6ea202
Pull Request #4226: Support converting TimestampTZ to and from duckdb

0 of 2 new or added lines in 1 file covered. (0.0%)

20666 existing lines in 469 files now uncovered.

8726 of 35892 relevant lines covered (24.31%)

147.74 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/vortex-expr/src/exprs/merge.rs
1
// SPDX-License-Identifier: Apache-2.0
2
// SPDX-FileCopyrightText: Copyright the Vortex contributors
3

4
use std::fmt::Display;
5
use std::hash::Hash;
6

7
use itertools::Itertools as _;
8
use vortex_array::arrays::StructArray;
9
use vortex_array::validity::Validity;
10
use vortex_array::{Array, ArrayRef, DeserializeMetadata, EmptyMetadata, IntoArray, ToCanonical};
11
use vortex_dtype::{DType, FieldNames, Nullability, StructFields};
12
use vortex_error::{VortexExpect as _, VortexResult, vortex_bail};
13

14
use crate::{AnalysisExpr, ExprEncodingRef, ExprId, ExprRef, IntoExpr, Scope, VTable, vtable};
15

16
// Invokes the crate's `vtable!` macro for `Merge`. Presumably this is what declares the
// `MergeVTable` type implemented below — confirm against the macro definition.
vtable!(Merge);
17

18
/// Merge zero or more expressions that ALL return structs.
19
///
20
/// If any field names are duplicated, the field from later expressions wins.
21
///
22
/// NOTE: Fields are not recursively merged, i.e. the later field REPLACES the earlier field.
23
/// This makes struct fields behaviour consistent with other dtypes.
24
#[allow(clippy::derived_hash_with_manual_eq)]
25
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
26
pub struct MergeExpr {
27
    values: Vec<ExprRef>,
28
    nullability: Nullability,
29
}
30

31
/// Encoding type for [`MergeExpr`]; used as the `Encoding` associated type of the merge vtable.
pub struct MergeExprEncoding;
32

33
impl VTable for MergeVTable {
34
    type Expr = MergeExpr;
35
    type Encoding = MergeExprEncoding;
36
    type Metadata = EmptyMetadata;
37

UNCOV
38
    fn id(_encoding: &Self::Encoding) -> ExprId {
×
UNCOV
39
        ExprId::new_ref("merge")
×
UNCOV
40
    }
×
41

UNCOV
42
    fn encoding(_expr: &Self::Expr) -> ExprEncodingRef {
×
UNCOV
43
        ExprEncodingRef::new_ref(MergeExprEncoding.as_ref())
×
UNCOV
44
    }
×
45

UNCOV
46
    fn metadata(_expr: &Self::Expr) -> Option<Self::Metadata> {
×
UNCOV
47
        Some(EmptyMetadata)
×
UNCOV
48
    }
×
49

UNCOV
50
    fn children(expr: &Self::Expr) -> Vec<&ExprRef> {
×
UNCOV
51
        expr.values.iter().collect()
×
UNCOV
52
    }
×
53

UNCOV
54
    fn with_children(expr: &Self::Expr, children: Vec<ExprRef>) -> VortexResult<Self::Expr> {
×
UNCOV
55
        Ok(MergeExpr {
×
UNCOV
56
            values: children,
×
UNCOV
57
            nullability: expr.nullability,
×
UNCOV
58
        })
×
UNCOV
59
    }
×
60

UNCOV
61
    fn build(
×
UNCOV
62
        _encoding: &Self::Encoding,
×
UNCOV
63
        _metadata: &<Self::Metadata as DeserializeMetadata>::Output,
×
UNCOV
64
        children: Vec<ExprRef>,
×
UNCOV
65
    ) -> VortexResult<Self::Expr> {
×
UNCOV
66
        if children.is_empty() {
×
67
            vortex_bail!(
×
68
                "Merge expression must have at least one child, got: {:?}",
×
69
                children
70
            );
UNCOV
71
        }
×
UNCOV
72
        Ok(MergeExpr {
×
UNCOV
73
            values: children,
×
UNCOV
74
            nullability: Nullability::NonNullable, // Default to non-nullable
×
UNCOV
75
        })
×
UNCOV
76
    }
×
77

UNCOV
78
    fn evaluate(expr: &Self::Expr, scope: &Scope) -> VortexResult<ArrayRef> {
×
UNCOV
79
        let len = scope.len();
×
UNCOV
80
        let value_arrays = expr
×
UNCOV
81
            .values
×
UNCOV
82
            .iter()
×
UNCOV
83
            .map(|value_expr| value_expr.unchecked_evaluate(scope))
×
UNCOV
84
            .process_results(|it| it.collect::<Vec<_>>())?;
×
85

86
        // Collect fields in order of appearance. Later fields overwrite earlier fields.
UNCOV
87
        let mut field_names = Vec::new();
×
UNCOV
88
        let mut arrays = Vec::new();
×
89

UNCOV
90
        for value_array in value_arrays.iter() {
×
91
            // TODO(marko): When nullable, we need to merge struct validity into field validity.
UNCOV
92
            if value_array.dtype().is_nullable() {
×
93
                todo!("merge nullable structs");
×
UNCOV
94
            }
×
UNCOV
95
            if !value_array.dtype().is_struct() {
×
96
                vortex_bail!("merge expects non-nullable struct input");
×
UNCOV
97
            }
×
98

UNCOV
99
            let struct_array = value_array.to_struct()?;
×
100

UNCOV
101
            for (i, field_name) in struct_array.names().iter().enumerate() {
×
UNCOV
102
                let array = struct_array.fields()[i].clone();
×
103

104
                // Update or insert field.
UNCOV
105
                if let Some(idx) = field_names.iter().position(|name| name == field_name) {
×
UNCOV
106
                    arrays[idx] = array;
×
UNCOV
107
                } else {
×
UNCOV
108
                    field_names.push(field_name.clone());
×
UNCOV
109
                    arrays.push(array);
×
UNCOV
110
                }
×
111
            }
112
        }
113

UNCOV
114
        let validity = match expr.nullability {
×
UNCOV
115
            Nullability::NonNullable => Validity::NonNullable,
×
UNCOV
116
            Nullability::Nullable => Validity::AllValid,
×
117
        };
118
        Ok(
UNCOV
119
            StructArray::try_new(FieldNames::from(field_names), arrays, len, validity)?
×
UNCOV
120
                .into_array(),
×
121
        )
UNCOV
122
    }
×
123

UNCOV
124
    fn return_dtype(expr: &Self::Expr, scope: &DType) -> VortexResult<DType> {
×
UNCOV
125
        let mut field_names = Vec::new();
×
UNCOV
126
        let mut arrays = Vec::new();
×
127

UNCOV
128
        for value in expr.values.iter() {
×
UNCOV
129
            let dtype = value.return_dtype(scope)?;
×
UNCOV
130
            if !dtype.is_struct() {
×
131
                vortex_bail!("merge expects non-nullable struct input");
×
UNCOV
132
            }
×
133

UNCOV
134
            let struct_dtype = dtype
×
UNCOV
135
                .as_struct()
×
UNCOV
136
                .vortex_expect("merge expects struct input");
×
137

UNCOV
138
            for i in 0..struct_dtype.nfields() {
×
UNCOV
139
                let field_name = struct_dtype.field_name(i).vortex_expect("never OOB");
×
UNCOV
140
                let field_dtype = struct_dtype.field_by_index(i).vortex_expect("never OOB");
×
UNCOV
141
                if let Some(idx) = field_names.iter().position(|name| name == field_name) {
×
UNCOV
142
                    arrays[idx] = field_dtype;
×
UNCOV
143
                } else {
×
UNCOV
144
                    field_names.push(field_name.clone());
×
UNCOV
145
                    arrays.push(field_dtype);
×
UNCOV
146
                }
×
147
            }
148
        }
149

UNCOV
150
        Ok(DType::Struct(
×
UNCOV
151
            StructFields::new(FieldNames::from(field_names), arrays),
×
UNCOV
152
            expr.nullability,
×
UNCOV
153
        ))
×
UNCOV
154
    }
×
155
}
156

157
impl MergeExpr {
UNCOV
158
    pub fn new(values: Vec<ExprRef>, nullability: Nullability) -> Self {
×
UNCOV
159
        MergeExpr {
×
UNCOV
160
            values,
×
UNCOV
161
            nullability,
×
UNCOV
162
        }
×
UNCOV
163
    }
×
164

165
    pub fn new_expr(values: Vec<ExprRef>, nullability: Nullability) -> ExprRef {
×
166
        Self::new(values, nullability).into_expr()
×
167
    }
×
168

UNCOV
169
    pub fn nullability(&self) -> Nullability {
×
UNCOV
170
        self.nullability
×
UNCOV
171
    }
×
172
}
173

UNCOV
174
pub fn merge(
×
UNCOV
175
    elements: impl IntoIterator<Item = impl Into<ExprRef>>,
×
UNCOV
176
    nullability: Nullability,
×
UNCOV
177
) -> ExprRef {
×
UNCOV
178
    let values = elements.into_iter().map(|value| value.into()).collect_vec();
×
UNCOV
179
    MergeExpr::new(values, nullability).into_expr()
×
UNCOV
180
}
×
181

182
impl Display for MergeExpr {
183
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
×
184
        write!(
×
185
            f,
×
186
            "merge({}){}",
×
187
            self.values.iter().format(", "),
×
188
            self.nullability
189
        )
190
    }
×
191
}
192

193
// `MergeExpr` overrides nothing from `AnalysisExpr`; it relies on whatever defaults the trait supplies.
impl AnalysisExpr for MergeExpr {}
194

195
#[cfg(test)]
mod tests {
    use vortex_array::arrays::{PrimitiveArray, StructArray};
    use vortex_array::{Array, IntoArray, ToCanonical};
    use vortex_buffer::buffer;
    use vortex_dtype::Nullability;
    use vortex_error::{VortexResult, vortex_bail};

    use crate::{MergeExpr, Scope, get_item, root};

    /// Builds a struct array from `(name, array)` pairs and converts it with `into_array()`.
    macro_rules! struct_of {
        ($(($name:expr, $field:expr)),+ $(,)?) => {
            StructArray::from_fields(&[$(($name, $field)),+])
                .unwrap()
                .into_array()
        };
    }

    /// Walks `field_path` through nested structs and returns the final field as a primitive
    /// array. Fails on an empty path.
    fn primitive_field(array: &dyn Array, field_path: &[&str]) -> VortexResult<PrimitiveArray> {
        let Some((first, rest)) = field_path.split_first() else {
            vortex_bail!("empty field path");
        };

        let mut current = array.to_struct()?.field_by_name(first)?.clone();
        for name in rest {
            current = current.to_struct()?.field_by_name(name)?.clone();
        }
        current.to_primitive()
    }

    #[test]
    pub fn test_merge() {
        let test_array = struct_of![
            (
                "0",
                struct_of![
                    ("a", buffer![0, 0, 0].into_array()),
                    ("b", buffer![1, 1, 1].into_array()),
                ]
            ),
            (
                "1",
                struct_of![
                    ("b", buffer![2, 2, 2].into_array()),
                    ("c", buffer![3, 3, 3].into_array()),
                ]
            ),
            (
                "2",
                struct_of![
                    ("d", buffer![4, 4, 4].into_array()),
                    ("e", buffer![5, 5, 5].into_array()),
                ]
            ),
        ];

        let expr = MergeExpr::new(
            vec![
                get_item("0", root()),
                get_item("1", root()),
                get_item("2", root()),
            ],
            Nullability::NonNullable,
        );
        let actual_array = expr.evaluate(&Scope::new(test_array)).unwrap();

        assert_eq!(
            actual_array.as_struct_typed().names(),
            &["a", "b", "c", "d", "e"].into()
        );

        // "b" appears in both "0" and "1"; the value from "1" must win.
        let expected_fields = [
            ("a", [0i32, 0, 0]),
            ("b", [2, 2, 2]),
            ("c", [3, 3, 3]),
            ("d", [4, 4, 4]),
            ("e", [5, 5, 5]),
        ];
        for (name, expected) in expected_fields {
            assert_eq!(
                primitive_field(&actual_array, &[name])
                    .unwrap()
                    .as_slice::<i32>(),
                expected
            );
        }
    }

    #[test]
    pub fn test_empty_merge() {
        let test_array = struct_of![("a", buffer![0, 1, 2].into_array())];
        let expr = MergeExpr::new(Vec::new(), Nullability::NonNullable);

        let actual_array = expr.evaluate(&Scope::new(test_array.clone())).unwrap();
        assert_eq!(actual_array.len(), test_array.len());
        assert_eq!(actual_array.as_struct_typed().nfields(), 0);
    }

    #[test]
    pub fn test_nested_merge() {
        // Nested structs are not merged!

        let test_array = struct_of![
            (
                "0",
                struct_of![(
                    "a",
                    struct_of![
                        ("x", buffer![0, 0, 0].into_array()),
                        ("y", buffer![1, 1, 1].into_array()),
                    ]
                )]
            ),
            (
                "1",
                struct_of![("a", struct_of![("x", buffer![0, 0, 0].into_array())])]
            ),
        ];

        let expr = MergeExpr::new(
            vec![get_item("0", root()), get_item("1", root())],
            Nullability::NonNullable,
        );
        let actual_array = expr
            .evaluate(&Scope::new(test_array.clone()))
            .unwrap()
            .to_struct()
            .unwrap();

        let merged_a = actual_array
            .field_by_name("a")
            .unwrap()
            .to_struct()
            .unwrap();
        let nested_names = merged_a
            .names()
            .iter()
            .map(|name| name.as_ref())
            .collect::<Vec<_>>();
        assert_eq!(nested_names, vec!["x"]);
    }

    #[test]
    pub fn test_merge_order() {
        let test_array = struct_of![
            (
                "0",
                struct_of![
                    ("a", buffer![0, 0, 0].into_array()),
                    ("c", buffer![1, 1, 1].into_array()),
                ]
            ),
            (
                "1",
                struct_of![
                    ("b", buffer![2, 2, 2].into_array()),
                    ("d", buffer![3, 3, 3].into_array()),
                ]
            ),
        ];

        let expr = MergeExpr::new(
            vec![get_item("0", root()), get_item("1", root())],
            Nullability::NonNullable,
        );
        let actual_array = expr
            .evaluate(&Scope::new(test_array.clone()))
            .unwrap()
            .to_struct()
            .unwrap();

        assert_eq!(actual_array.names(), &["a", "c", "b", "d"].into());
    }

    #[test]
    pub fn test_merge_nullable() {
        let test_array = struct_of![(
            "0",
            struct_of![
                ("a", buffer![0, 0, 0].into_array()),
                ("b", buffer![1, 1, 1].into_array()),
            ]
        )];

        let expr = MergeExpr::new(vec![get_item("0", root())], Nullability::Nullable);
        let actual_array = expr.evaluate(&Scope::new(test_array.clone())).unwrap();
        assert!(actual_array.dtype().is_nullable());
    }
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc