• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

vortex-data / vortex / 16935267080

13 Aug 2025 11:00AM UTC coverage: 24.312% (-63.3%) from 87.658%
16935267080

Pull #4226

github

web-flow
Merge 81b48c7fb into baa6ea202
Pull Request #4226: Support converting TimestampTZ to and from duckdb

0 of 2 new or added lines in 1 file covered. (0.0%)

20666 existing lines in 469 files now uncovered.

8726 of 35892 relevant lines covered (24.31%)

147.74 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

76.32
/vortex-layout/src/sequence.rs
1
// SPDX-License-Identifier: Apache-2.0
2
// SPDX-FileCopyrightText: Copyright the Vortex contributors
3

4
use std::cmp::Ordering;
5
use std::collections::BTreeSet;
6
use std::fmt;
7
use std::hash::{Hash, Hasher};
8
use std::pin::Pin;
9
use std::sync::Arc;
10
use std::task::{Context, Poll, Waker};
11

12
use parking_lot::Mutex;
13
use vortex_error::VortexExpect;
14
use vortex_utils::aliases::hash_map::HashMap;
15

16
use crate::segments::SegmentId;
17

18
/// A hierarchical sequence identifier that exists within a shared universe.
19
///
20
/// SequenceIds form a collision-free universe where each ID is represented as a vector
21
/// of indices (e.g., `[0, 1, 2]`). The API design prevents collisions by only allowing
22
/// new IDs to be created through controlled advancement or descent operations.
23
///
24
/// # Hierarchy and Ordering
25
///
26
/// IDs are hierarchical and lexicographically ordered:
27
/// - `[0]` < `[0, 0]` < `[0, 1]` < `[1]` < `[1, 0]`
28
/// - A parent ID like `[0, 1]` can spawn children `[0, 1, 0]`, `[0, 1, 1]`, etc.
29
/// - Sibling IDs are created by advancing: `[0, 0]` → `[0, 1]` → `[0, 2]`
30
///
31
/// # Drop Ordering
32
///
33
/// When a SequenceId is dropped, it may wake futures waiting for ordering guarantees.
34
/// The `collapse()` method leverages this to provide deterministic ordering of
35
/// recursively created sequence IDs.
36
pub struct SequenceId {
37
    id: Vec<usize>,
38
    universe: Arc<Mutex<SequenceUniverse>>,
39
}
40

41
impl PartialEq for SequenceId {
42
    fn eq(&self, other: &Self) -> bool {
×
43
        self.id == other.id
×
44
    }
×
45
}
46

47
impl Eq for SequenceId {}
48

49
impl PartialOrd for SequenceId {
50
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
×
51
        Some(self.cmp(other))
×
52
    }
×
53
}
54

55
impl Ord for SequenceId {
56
    fn cmp(&self, other: &Self) -> Ordering {
×
57
        self.id.cmp(&other.id)
×
58
    }
×
59
}
60

61
impl Hash for SequenceId {
62
    fn hash<H: Hasher>(&self, state: &mut H) {
×
63
        self.id.hash(state);
×
64
    }
×
65
}
66

67
impl fmt::Debug for SequenceId {
68
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
×
69
        f.debug_struct("SequenceId").field("id", &self.id).finish()
×
70
    }
×
71
}
72

73
impl SequenceId {
74
    /// Creates a new root sequence universe starting with ID `[0]`.
75
    ///
76
    /// Each call to `root()` creates an independent universe with no ordering
77
    /// guarantees between separate root instances. Within a single universe,
78
    /// all IDs are strictly ordered.
79
    pub fn root() -> SequencePointer {
2✔
80
        SequencePointer(SequenceId::new(vec![0], Default::default()))
2✔
81
    }
2✔
82

83
    /// Creates a child sequence by descending one level in the hierarchy.
84
    ///
85
    /// If this SequenceId has ID `[1, 2]`, this method creates the first child
86
    /// `[1, 2, 0]` and returns a `SequencePointer` that can generate siblings
87
    /// `[1, 2, 1]`, `[1, 2, 2]`, etc.
88
    ///
89
    /// # Ownership
90
    ///
91
    /// This method consumes `self`, as the parent ID is no longer needed once
92
    /// we've descended to work with its children.
93
    pub fn descend(self) -> SequencePointer {
32✔
94
        let mut id = self.id.clone();
32✔
95
        id.push(0);
32✔
96
        SequencePointer(SequenceId::new(id, self.universe.clone()))
32✔
97
    }
32✔
98

99
    /// Waits until all SequenceIds with IDs lexicographically smaller than this one are dropped.
100
    ///
101
    /// This async method provides ordering guarantees by ensuring all "prior" sequences
102
    /// in the universe have been dropped before returning. Combined with the collision-free
103
    /// API, this guarantees that for this universe no sequences lexicographically smaller than
104
    /// this one will ever be created again.
105
    ///
106
    /// # Ordering Guarantee
107
    ///
108
    /// Once `collapse()` returns, you can be certain that:
109
    /// - All sequences with smaller IDs have been dropped
110
    /// - No new sequences with smaller IDs can ever be created (due to collision prevention)
111
    /// - The returned `SegmentId` is monotonically increasing within this universe
112
    ///
113
    /// # Use Cases
114
    ///
115
    /// This is particularly useful for ordering recursively created work:
116
    /// - Recursive algorithms that spawn child tasks
117
    /// - Ensuring deterministic processing order across concurrent operations  
118
    /// - Converting hierarchical sequence identifiers to linear segment identifiers
119
    ///
120
    /// # Returns
121
    ///
122
    /// A monotonically increasing `SegmentId` that can be used for ordered storage
123
    /// or processing. Each successful collapse within a universe produces a larger
124
    /// `SegmentId` than the previous one.
125
    pub async fn collapse(self) -> SegmentId {
8✔
126
        WaitSequenceFuture(self).await
8✔
127
    }
8✔
128

129
    /// This is intentionally not pub. [SequencePointer::advance] is the only allowed way to create
130
    /// [SequenceId] instances
131
    fn new(id: Vec<usize>, universe: Arc<Mutex<SequenceUniverse>>) -> Self {
74✔
132
        // NOTE: This is the only place we construct a SequenceId, and
133
        // we immediately add it to the universe.
134
        let res = Self { id, universe };
74✔
135
        res.universe.lock().add(&res);
74✔
136
        res
74✔
137
    }
74✔
138
}
139

140
impl Drop for SequenceId {
141
    fn drop(&mut self) {
74✔
142
        let waker = self.universe.lock().remove(self);
74✔
143
        if let Some(w) = waker {
74✔
144
            w.wake();
8✔
145
        }
66✔
146
    }
74✔
147
}
148

149
/// A pointer that can advance through sibling sequence IDs.
150
///
151
/// SequencePointer is the only mechanism for creating new SequenceIds within
152
/// a universe.
153
pub struct SequencePointer(SequenceId);
154

155
impl SequencePointer {
156
    /// Advances to the next sibling sequence and returns the current one.
157
    ///
158
    /// # Ownership
159
    ///
160
    /// This method requires `&mut self` because it advances the internal state
161
    /// to point to the next sibling position.
162
    pub fn advance(&mut self) -> SequenceId {
40✔
163
        let mut next_id = self.0.id.clone();
40✔
164

165
        // increment x.y.z -> x.y.(z + 1)
166
        let last = next_id.last_mut();
40✔
167
        let last = last.vortex_expect("must have at least one element");
40✔
168
        *last += 1;
40✔
169
        let next_sibling = SequenceId::new(next_id, self.0.universe.clone());
40✔
170
        std::mem::replace(&mut self.0, next_sibling)
40✔
171
    }
40✔
172

173
    /// Converts this pointer into its current SequenceId, consuming the pointer.
174
    ///
175
    /// This method is useful when you want to access the current SequenceId
176
    /// without advancing to the next sibling. Once downgraded, you cannot
177
    /// create additional siblings from this pointer.
UNCOV
178
    pub fn downgrade(self) -> SequenceId {
×
UNCOV
179
        self.0
×
UNCOV
180
    }
×
181
}
182

183
#[derive(Default)]
184
struct SequenceUniverse {
185
    active: BTreeSet<Vec<usize>>,
186
    wakers: HashMap<Vec<usize>, Waker>,
187
    next_segment_id: SegmentId,
188
}
189

190
impl SequenceUniverse {
191
    fn add(&mut self, sequence_id: &SequenceId) {
74✔
192
        self.active.insert(sequence_id.id.clone());
74✔
193
    }
74✔
194

195
    fn remove(&mut self, sequence_id: &SequenceId) -> Option<Waker> {
74✔
196
        self.active.remove(&sequence_id.id);
74✔
197
        let Some(first) = self.active.first() else {
74✔
198
            // last sequence finished, we must have no pending futures
199
            assert!(self.wakers.is_empty(), "all wakers must have been removed");
2✔
200
            return None;
2✔
201
        };
202
        self.wakers.remove(first)
72✔
203
    }
74✔
204

205
    pub fn next_segment_id(&mut self) -> SegmentId {
8✔
206
        let res = self.next_segment_id;
8✔
207
        self.next_segment_id = SegmentId::from(*res + 1);
8✔
208
        res
8✔
209
    }
8✔
210
}
211

212
struct WaitSequenceFuture(SequenceId);
213

214
impl Future for WaitSequenceFuture {
215
    type Output = SegmentId;
216

217
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
16✔
218
        let mut guard = self.0.universe.lock();
16✔
219
        let current_first = guard
16✔
220
            .active
16✔
221
            .first()
16✔
222
            .cloned()
16✔
223
            .vortex_expect("if we have a future, we must have at least one active sequence");
16✔
224
        if self.0.id == current_first {
16✔
225
            return Poll::Ready(guard.next_segment_id());
8✔
226
        }
8✔
227
        guard.wakers.insert(self.0.id.clone(), cx.waker().clone());
8✔
228
        Poll::Pending
8✔
229
    }
16✔
230
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc