• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

vortex-data / vortex / 16277982668

14 Jul 2025 09:12PM UTC coverage: 81.564% (+0.4%) from 81.147%
16277982668

Pull #3852

github

web-flow
Merge e78f6e62e into b0be264bf
Pull Request #3852: feat: call optimize in compressor

3 of 3 new or added lines in 1 file covered. (100.0%)

381 existing lines in 36 files now uncovered.

46289 of 56752 relevant lines covered (81.56%)

157514.17 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

88.82
/vortex-expr/src/exprs/merge.rs
1
// SPDX-License-Identifier: Apache-2.0
2
// SPDX-FileCopyrightText: Copyright the Vortex contributors
3

4
use std::fmt::Display;
5
use std::hash::Hash;
6

7
use itertools::Itertools as _;
8
use vortex_array::arrays::StructArray;
9
use vortex_array::validity::Validity;
10
use vortex_array::{Array, ArrayRef, DeserializeMetadata, EmptyMetadata, IntoArray, ToCanonical};
11
use vortex_dtype::{DType, FieldNames, Nullability, StructFields};
12
use vortex_error::{VortexExpect as _, VortexResult, vortex_bail};
13

14
use crate::{AnalysisExpr, ExprEncodingRef, ExprId, ExprRef, IntoExpr, Scope, VTable, vtable};
15

16
// Declares the vtable plumbing for the `Merge` expression.
// NOTE(review): presumably this macro generates the `MergeVTable` type that the
// `impl VTable for MergeVTable` in this file implements — confirm against `vtable!`.
vtable!(Merge);
17

18
/// Merge zero or more expressions that ALL return structs.
19
///
20
/// If any field names are duplicated, the field from later expressions wins.
21
///
22
/// NOTE: Fields are not recursively merged, i.e. the later field REPLACES the earlier field.
23
/// This makes struct fields behaviour consistent with other dtypes.
24
#[allow(clippy::derived_hash_with_manual_eq)]
25
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
26
pub struct MergeExpr {
27
    values: Vec<ExprRef>,
28
    nullability: Nullability,
29
}
30

31
/// Encoding handle for [`MergeExpr`]. It is a unit type: merge has no encoding-specific state.
pub struct MergeExprEncoding;
32

33
impl VTable for MergeVTable {
34
    type Expr = MergeExpr;
35
    type Encoding = MergeExprEncoding;
36
    type Metadata = EmptyMetadata;
37

38
    fn id(_encoding: &Self::Encoding) -> ExprId {
123✔
39
        ExprId::new_ref("merge")
123✔
40
    }
123✔
41

42
    fn encoding(_expr: &Self::Expr) -> ExprEncodingRef {
×
43
        ExprEncodingRef::new_ref(MergeExprEncoding.as_ref())
×
44
    }
×
45

46
    fn metadata(_expr: &Self::Expr) -> Option<Self::Metadata> {
×
47
        Some(EmptyMetadata)
×
48
    }
×
49

50
    fn children(expr: &Self::Expr) -> Vec<&ExprRef> {
26✔
51
        expr.values.iter().collect()
26✔
52
    }
26✔
53

54
    fn with_children(expr: &Self::Expr, children: Vec<ExprRef>) -> VortexResult<Self::Expr> {
1✔
55
        Ok(MergeExpr {
1✔
56
            values: children,
1✔
57
            nullability: expr.nullability,
1✔
58
        })
1✔
59
    }
1✔
60

61
    fn build(
×
62
        _encoding: &Self::Encoding,
×
63
        _metadata: &<Self::Metadata as DeserializeMetadata>::Output,
×
64
        children: Vec<ExprRef>,
×
65
    ) -> VortexResult<Self::Expr> {
×
66
        if children.is_empty() {
×
67
            vortex_bail!(
×
68
                "Merge expression must have at least one child, got: {:?}",
×
69
                children
×
70
            );
×
71
        }
×
72
        Ok(MergeExpr {
×
73
            values: children,
×
74
            nullability: Nullability::NonNullable, // Default to non-nullable
×
75
        })
×
76
    }
×
77

78
    fn evaluate(expr: &Self::Expr, scope: &Scope) -> VortexResult<ArrayRef> {
5✔
79
        let len = scope.len();
5✔
80
        let value_arrays = expr
5✔
81
            .values
5✔
82
            .iter()
5✔
83
            .map(|value_expr| value_expr.unchecked_evaluate(scope))
8✔
84
            .process_results(|it| it.collect::<Vec<_>>())?;
5✔
85

86
        // Collect fields in order of appearance. Later fields overwrite earlier fields.
87
        let mut field_names = Vec::new();
5✔
88
        let mut arrays = Vec::new();
5✔
89

90
        for value_array in value_arrays.iter() {
8✔
91
            // TODO(marko): When nullable, we need to merge struct validity into field validity.
92
            if value_array.dtype().is_nullable() {
8✔
93
                todo!("merge nullable structs");
×
94
            }
8✔
95
            if !value_array.dtype().is_struct() {
8✔
96
                vortex_bail!("merge expects non-nullable struct input");
×
97
            }
8✔
98

99
            let struct_array = value_array.to_struct()?;
8✔
100

101
            for (i, field_name) in struct_array.names().iter().enumerate() {
14✔
102
                let array = struct_array.fields()[i].clone();
14✔
103

104
                // Update or insert field.
105
                if let Some(idx) = field_names.iter().position(|name| name == field_name) {
20✔
106
                    arrays[idx] = array;
2✔
107
                } else {
12✔
108
                    field_names.push(field_name.clone());
12✔
109
                    arrays.push(array);
12✔
110
                }
12✔
111
            }
112
        }
113

114
        let validity = match expr.nullability {
5✔
115
            Nullability::NonNullable => Validity::NonNullable,
4✔
116
            Nullability::Nullable => Validity::AllValid,
1✔
117
        };
118
        Ok(
119
            StructArray::try_new(FieldNames::from(field_names), arrays, len, validity)?
5✔
120
                .into_array(),
5✔
121
        )
122
    }
5✔
123

124
    fn return_dtype(expr: &Self::Expr, scope: &DType) -> VortexResult<DType> {
5✔
125
        let mut field_names = Vec::new();
5✔
126
        let mut arrays = Vec::new();
5✔
127

128
        for value in expr.values.iter() {
8✔
129
            let dtype = value.return_dtype(scope)?;
8✔
130
            if !dtype.is_struct() {
8✔
131
                vortex_bail!("merge expects non-nullable struct input");
×
132
            }
8✔
133

8✔
134
            let struct_dtype = dtype
8✔
135
                .as_struct()
8✔
136
                .vortex_expect("merge expects struct input");
8✔
137

138
            for i in 0..struct_dtype.nfields() {
14✔
139
                let field_name = struct_dtype.field_name(i).vortex_expect("never OOB");
14✔
140
                let field_dtype = struct_dtype.field_by_index(i).vortex_expect("never OOB");
14✔
141
                if let Some(idx) = field_names.iter().position(|name| name == field_name) {
20✔
142
                    arrays[idx] = field_dtype;
2✔
143
                } else {
12✔
144
                    field_names.push(field_name.clone());
12✔
145
                    arrays.push(field_dtype);
12✔
146
                }
12✔
147
            }
148
        }
149

150
        Ok(DType::Struct(
5✔
151
            StructFields::new(FieldNames::from(field_names), arrays),
5✔
152
            expr.nullability,
5✔
153
        ))
5✔
154
    }
5✔
155
}
156

157
impl MergeExpr {
158
    pub fn new(values: Vec<ExprRef>, nullability: Nullability) -> Self {
8✔
159
        MergeExpr {
8✔
160
            values,
8✔
161
            nullability,
8✔
162
        }
8✔
163
    }
8✔
164

UNCOV
165
    pub fn new_expr(values: Vec<ExprRef>, nullability: Nullability) -> ExprRef {
×
UNCOV
166
        Self::new(values, nullability).into_expr()
×
UNCOV
167
    }
×
168

169
    pub fn nullability(&self) -> Nullability {
3✔
170
        self.nullability
3✔
171
    }
3✔
172
}
173

174
pub fn merge(
3✔
175
    elements: impl IntoIterator<Item = impl Into<ExprRef>>,
3✔
176
    nullability: Nullability,
3✔
177
) -> ExprRef {
3✔
178
    let values = elements.into_iter().map(|value| value.into()).collect_vec();
5✔
179
    MergeExpr::new(values, nullability).into_expr()
3✔
180
}
3✔
181

182
impl Display for MergeExpr {
183
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
×
184
        write!(
×
185
            f,
×
186
            "merge({}){}",
×
UNCOV
187
            self.values.iter().format(", "),
×
UNCOV
188
            self.nullability
×
UNCOV
189
        )
×
UNCOV
190
    }
×
191
}
192

193
/// This impl overrides nothing, so `MergeExpr` uses whatever defaults `AnalysisExpr` provides.
impl AnalysisExpr for MergeExpr {}
194

195
#[cfg(test)]
mod tests {
    use vortex_array::arrays::{PrimitiveArray, StructArray};
    use vortex_array::{Array, IntoArray, ToCanonical};
    use vortex_buffer::buffer;
    use vortex_dtype::Nullability;
    use vortex_error::{VortexResult, vortex_bail};

    use crate::{MergeExpr, Scope, get_item, root};

    /// Follows `field_path` through nested struct fields of `array` and returns the leaf as a
    /// primitive array. Fails if the path is empty or any step is not a struct field.
    fn primitive_field(array: &dyn Array, field_path: &[&str]) -> VortexResult<PrimitiveArray> {
        let Some((head, tail)) = field_path.split_first() else {
            vortex_bail!("empty field path");
        };

        let mut current = array.to_struct()?.field_by_name(head)?.clone();
        for name in tail {
            current = current.to_struct()?.field_by_name(name)?.clone();
        }
        current.to_primitive()
    }

    #[test]
    pub fn test_merge() {
        let expr = MergeExpr::new(
            vec![
                get_item("0", root()),
                get_item("1", root()),
                get_item("2", root()),
            ],
            Nullability::NonNullable,
        );

        let first = StructArray::from_fields(&[
            ("a", buffer![0, 0, 0].into_array()),
            ("b", buffer![1, 1, 1].into_array()),
        ])
        .unwrap()
        .into_array();
        let second = StructArray::from_fields(&[
            ("b", buffer![2, 2, 2].into_array()),
            ("c", buffer![3, 3, 3].into_array()),
        ])
        .unwrap()
        .into_array();
        let third = StructArray::from_fields(&[
            ("d", buffer![4, 4, 4].into_array()),
            ("e", buffer![5, 5, 5].into_array()),
        ])
        .unwrap()
        .into_array();

        let test_array = StructArray::from_fields(&[("0", first), ("1", second), ("2", third)])
            .unwrap()
            .into_array();
        let actual_array = expr.evaluate(&Scope::new(test_array)).unwrap();

        assert_eq!(
            actual_array.as_struct_typed().names(),
            &["a", "b", "c", "d", "e"].into()
        );

        // `b` comes from the second child: later children win on duplicate names.
        for (name, expected) in [
            ("a", [0, 0, 0]),
            ("b", [2, 2, 2]),
            ("c", [3, 3, 3]),
            ("d", [4, 4, 4]),
            ("e", [5, 5, 5]),
        ] {
            assert_eq!(
                primitive_field(&actual_array, &[name])
                    .unwrap()
                    .as_slice::<i32>(),
                expected
            );
        }
    }

    #[test]
    pub fn test_empty_merge() {
        let expr = MergeExpr::new(Vec::new(), Nullability::NonNullable);

        let input = StructArray::from_fields(&[("a", buffer![0, 1, 2].into_array())])
            .unwrap()
            .into_array();
        let merged = expr.evaluate(&Scope::new(input.clone())).unwrap();

        assert_eq!(merged.len(), input.len());
        assert_eq!(merged.as_struct_typed().nfields(), 0);
    }

    #[test]
    pub fn test_nested_merge() {
        // Nested structs are not merged: the later `a` replaces the earlier `a` wholesale.
        let expr = MergeExpr::new(
            vec![get_item("0", root()), get_item("1", root())],
            Nullability::NonNullable,
        );

        let inner_xy = StructArray::from_fields(&[
            ("x", buffer![0, 0, 0].into_array()),
            ("y", buffer![1, 1, 1].into_array()),
        ])
        .unwrap()
        .into_array();
        let inner_x = StructArray::from_fields(&[("x", buffer![0, 0, 0].into_array())])
            .unwrap()
            .into_array();

        let first = StructArray::from_fields(&[("a", inner_xy)])
            .unwrap()
            .into_array();
        let second = StructArray::from_fields(&[("a", inner_x)])
            .unwrap()
            .into_array();
        let test_array = StructArray::from_fields(&[("0", first), ("1", second)])
            .unwrap()
            .into_array();

        let actual_array = expr
            .evaluate(&Scope::new(test_array))
            .unwrap()
            .to_struct()
            .unwrap();

        let nested = actual_array
            .field_by_name("a")
            .unwrap()
            .to_struct()
            .unwrap();
        let nested_names: Vec<&str> = nested.names().iter().map(|name| name.as_ref()).collect();
        assert_eq!(nested_names, vec!["x"]);
    }

    #[test]
    pub fn test_merge_order() {
        let expr = MergeExpr::new(
            vec![get_item("0", root()), get_item("1", root())],
            Nullability::NonNullable,
        );

        let first = StructArray::from_fields(&[
            ("a", buffer![0, 0, 0].into_array()),
            ("c", buffer![1, 1, 1].into_array()),
        ])
        .unwrap()
        .into_array();
        let second = StructArray::from_fields(&[
            ("b", buffer![2, 2, 2].into_array()),
            ("d", buffer![3, 3, 3].into_array()),
        ])
        .unwrap()
        .into_array();
        let test_array = StructArray::from_fields(&[("0", first), ("1", second)])
            .unwrap()
            .into_array();

        let merged = expr
            .evaluate(&Scope::new(test_array))
            .unwrap()
            .to_struct()
            .unwrap();

        // Fields keep the order in which they were first seen, not a sorted order.
        assert_eq!(merged.names(), &["a", "c", "b", "d"].into());
    }

    #[test]
    pub fn test_merge_nullable() {
        let expr = MergeExpr::new(vec![get_item("0", root())], Nullability::Nullable);

        let inner = StructArray::from_fields(&[
            ("a", buffer![0, 0, 0].into_array()),
            ("b", buffer![1, 1, 1].into_array()),
        ])
        .unwrap()
        .into_array();
        let test_array = StructArray::from_fields(&[("0", inner)])
            .unwrap()
            .into_array();

        let merged = expr.evaluate(&Scope::new(test_array)).unwrap();
        assert!(merged.dtype().is_nullable());
    }
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc