• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

vortex-data / vortex / 16935267080

13 Aug 2025 11:00AM UTC coverage: 24.312% (-63.3%) from 87.658%
16935267080

Pull #4226

github

web-flow
Merge 81b48c7fb into baa6ea202
Pull Request #4226: Support converting TimestampTZ to and from duckdb

0 of 2 new or added lines in 1 file covered. (0.0%)

20666 existing lines in 469 files now uncovered.

8726 of 35892 relevant lines covered (24.31%)

147.74 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

68.82
/vortex-scan/src/work_queue.rs
1
// SPDX-License-Identifier: Apache-2.0
2
// SPDX-FileCopyrightText: Copyright the Vortex contributors
3

4
//! A work-stealing iterator that supports dynamically adding tasks from task factories.
5

6
use std::sync::Arc;
7
use std::sync::atomic::{AtomicUsize, Ordering};
8
use std::{iter, thread};
9

10
use crossbeam_deque::{Steal, Stealer, Worker};
11
use crossbeam_queue::SegQueue;
12
use parking_lot::RwLock;
13
use vortex_error::VortexResult;
14

15
/// A factory that produces a vector of tasks.
16
/// The factory is a boxed `FnOnce` closure, so it can be invoked at most once, and it is
/// `Send`, so it may be handed to another thread before being invoked. Invoking it yields
/// either the produced tasks or an error.
pub type TaskFactory<T> = Box<dyn FnOnce() -> VortexResult<Vec<T>> + Send>;
17

18
/// A work-stealing queue that allows for dynamic task addition.
19
pub struct WorkStealingQueue<T> {
20
    state: Arc<State<T>>,
21
}
22

23
impl<T> Clone for WorkStealingQueue<T> {
24
    fn clone(&self) -> Self {
2✔
25
        Self {
2✔
26
            state: self.state.clone(),
2✔
27
        }
2✔
28
    }
2✔
29
}
30

31
impl<T> WorkStealingQueue<T> {
32
    /// Created with lazily constructed task factories.
33
    pub fn new<I>(factories: I) -> Self
2✔
34
    where
2✔
35
        I: IntoIterator<Item = TaskFactory<T>>,
2✔
36
    {
37
        let queue: SegQueue<TaskFactory<T>> = SegQueue::new();
2✔
38
        for factory in factories.into_iter() {
2✔
39
            queue.push(factory);
2✔
40
        }
2✔
41
        let num_factories = queue.len();
2✔
42

43
        Self {
2✔
44
            state: Arc::new(State {
2✔
45
                task_factories: queue,
2✔
46
                num_factories_constructed: AtomicUsize::new(0),
2✔
47
                num_factories,
2✔
48
                stealers: RwLock::new(Vec::new()),
2✔
49
                stealer_offset: Default::default(),
2✔
50
            }),
2✔
51
        }
2✔
52
    }
2✔
53

54
    pub fn new_iterator(self) -> WorkStealingIterator<T> {
2✔
55
        self.state.new_iterator()
2✔
56
    }
2✔
57
}
58

59
/// Shared state for the work queue.
60
struct State<T> {
61
    /// A queue of factories that lazily produce tasks of type `T`.
62
    task_factories: SegQueue<TaskFactory<T>>,
63

64
    /// The total number of task factories that need to be constructed.
65
    num_factories: usize,
66

67
    /// How many factories have been constructed and had their tasks completely pushed into
68
    /// a worker queue.
69
    num_factories_constructed: AtomicUsize,
70

71
    /// The vector of stealers, one for each worker.
72
    stealers: RwLock<Vec<Stealer<T>>>,
73

74
    /// An offset into the stealers vector, used to avoid skewed worker queues when stealing.
75
    stealer_offset: AtomicUsize,
76
}
77

78
impl<T> State<T> {
79
    /// Create a new iterator.
80
    fn new_iterator(self: Arc<Self>) -> WorkStealingIterator<T> {
2✔
81
        let worker = Worker::new_fifo();
2✔
82

83
        // Register the new worker with the shared state.
84
        self.stealers.write().push(worker.stealer());
2✔
85

86
        WorkStealingIterator {
2✔
87
            state: self,
2✔
88
            worker,
2✔
89
        }
2✔
90
    }
2✔
91

92
    /// Loads a factory and pushes its tasks into the given worker queue.
93
    ///
94
    /// Returns `true` if any tasks were pushed into the worker. Note that these tasks may have
95
    /// been stolen by the time the worker queue is checked.
96
    fn load_next_factory(&self, worker: &Worker<T>) -> VortexResult<bool> {
4✔
97
        loop {
98
            if let Some(factory_fn) = self.task_factories.pop() {
4✔
99
                let tasks = factory_fn().inspect_err(|_| {
2✔
100
                    // In case of an error, increment the counter such that all other workers are able to terminate.
101
                    // `num_factories_constructed` is part of the loop condition when workers attempt to steal work.
UNCOV
102
                    self.num_factories_constructed
×
UNCOV
103
                        .fetch_add(1, Ordering::SeqCst);
×
UNCOV
104
                })?;
×
105
                let is_empty = tasks.is_empty();
2✔
106

107
                // Tasks *must* be pushed before `num_factories_constructed` is incremented.
108
                for task in tasks {
4✔
109
                    worker.push(task);
2✔
110
                }
2✔
111

112
                self.num_factories_constructed
2✔
113
                    .fetch_add(1, Ordering::SeqCst);
2✔
114

115
                // Keep looping until we find a factory that has pushed tasks.
116
                if !is_empty {
2✔
117
                    return Ok(true);
2✔
UNCOV
118
                }
×
119
            } else {
120
                return Ok(false);
2✔
121
            }
122
        }
123
    }
4✔
124

125
    /// Reports whether there is any work left to steal.
126
    fn stealers_have_work(&self) -> bool {
2✔
127
        self.stealers
2✔
128
            .read()
2✔
129
            .iter()
2✔
130
            .any(|stealer| !stealer.is_empty())
2✔
131
    }
2✔
132

133
    /// Attempts to steal work from other workers, returns `true` if work was stolen.
UNCOV
134
    fn steal_work(&self, worker: &Worker<T>) -> Steal<()> {
×
135
        // Repeatedly attempt to steal work from other workers until there are no retries.
UNCOV
136
        iter::repeat_with(|| {
×
137
            // This collect tries all stealers, exits early on the first successful steal,
138
            // or else tracks whether any steal requires a retry.
UNCOV
139
            let guard = self.stealers.read();
×
UNCOV
140
            let num_stealers = guard.len();
×
UNCOV
141
            guard
×
UNCOV
142
                .iter()
×
UNCOV
143
                .cycle()
×
UNCOV
144
                .skip(self.stealer_offset.fetch_add(1, Ordering::SeqCst) % num_stealers)
×
UNCOV
145
                .take(num_stealers)
×
UNCOV
146
                .map(|stealer| stealer.steal_batch(worker))
×
UNCOV
147
                .collect::<Steal<()>>()
×
UNCOV
148
        })
×
UNCOV
149
        .find(|steal| !steal.is_retry())
×
UNCOV
150
        .unwrap_or(Steal::Empty)
×
UNCOV
151
    }
×
152
}
153

154
/// A work-stealing iterator that supports dynamically adding tasks from task factories.
155
///
156
/// Each task factory has affinity to a particular worker. After all factories have been
157
/// constructed, workers will attempt to steal tasks from each other until all tasks are processed.
158
///
159
/// Workers are constructed by cloning the iterator.
160
pub struct WorkStealingIterator<T> {
161
    state: Arc<State<T>>,
162
    worker: Worker<T>,
163
}
164

165
impl<T> Clone for WorkStealingIterator<T> {
UNCOV
166
    fn clone(&self) -> Self {
×
UNCOV
167
        self.state.clone().new_iterator()
×
UNCOV
168
    }
×
169
}
170

171
impl<T> Iterator for WorkStealingIterator<T> {
172
    type Item = VortexResult<T>;
173

174
    fn next(&mut self) -> Option<VortexResult<T>> {
4✔
175
        if self.worker.is_empty() {
4✔
176
            let next_factory_loaded = match self.state.load_next_factory(&self.worker) {
4✔
177
                Ok(next_factory_loaded) => next_factory_loaded,
4✔
UNCOV
178
                Err(e) => return Some(Err(e)),
×
179
            };
180

181
            if !next_factory_loaded {
4✔
182
                // If there are no more factories to load, then there is at least one worker
183
                // constructing a factory and about to push some tasks.
184
                //
185
                // We sit in a loop trying to steal some of those tasks, or else bail out when
186
                // all scans have been constructed, and we didn't manage to steal anything. To avoid
187
                // spinning too hot, we yield the thread each time we fail to steal work.
188
                //
189
                // `steal_work` does have the side effect of stealing work, and we only want to loop
190
                // again if the result of an attempt of stealing results with `Retry`, for other cases
191
                // `Empty` and `Success` there is no point in trying again
192
                // Use Acquire ordering to ensure we see all writes from other threads
193
                while self.state.num_factories_constructed.load(Ordering::SeqCst)
2✔
194
                    < self.state.num_factories
2✔
195
                    || self.state.stealers_have_work()
2✔
196
                {
UNCOV
197
                    if self.state.steal_work(&self.worker).is_success() {
×
UNCOV
198
                        break;
×
UNCOV
199
                    } else {
×
UNCOV
200
                        thread::yield_now();
×
UNCOV
201
                    }
×
202
                }
203
            }
2✔
UNCOV
204
        }
×
205

206
        // Attempt to pop a task from the worker queue.
207
        // Another worker may have stolen our tasks by this point. If that's the case, then we've
208
        // already finished loading the factories, and we're down to the last few tasks. Therefore,
209
        // it's ok for us to return `None` and terminate the iterator.
210
        Some(Ok(self.worker.pop()?))
4✔
211
    }
4✔
212
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc