• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

vortex-data / vortex / 16473447330

23 Jul 2025 02:25PM UTC coverage: 81.055% (-0.06%) from 81.11%
16473447330

push

github

web-flow
Add a work-stealing ArrayIterator to drive scans (#3982)

Signed-off-by: Nicholas Gates <nick@nickgates.com>

45 of 93 new or added lines in 6 files covered. (48.39%)

6 existing lines in 1 file now uncovered.

42087 of 51924 relevant lines covered (81.06%)

173654.75 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

88.31
/vortex-scan/src/work_queue.rs
1
// SPDX-License-Identifier: Apache-2.0
2
// SPDX-FileCopyrightText: Copyright the Vortex contributors
3

4
//! A work-stealing iterator that supports dynamically adding tasks from task factories.
5

6
use std::iter;
7
use std::sync::Arc;
8
use std::sync::atomic::AtomicUsize;
9
use std::sync::atomic::Ordering::{Relaxed, SeqCst};
10

11
use crossbeam_deque::{Steal, Stealer, Worker};
12
use crossbeam_queue::SegQueue;
13
use parking_lot::RwLock;
14
use vortex_error::VortexResult;
15

16
/// A factory that produces a vector of tasks.
17
///
/// The closure is `FnOnce`, so each factory can be invoked at most once, and it is `Send` so
/// that it may be invoked on a different thread from the one that created it. Invoking it
/// either yields the tasks as a `Vec<T>` or fails with the error carried by `VortexResult`.
pub type TaskFactory<T> = Box<dyn FnOnce() -> VortexResult<Vec<T>> + Send>;
18

19
/// A work-stealing queue that allows for dynamic task addition.
20
pub struct WorkStealingQueue<T> {
21
    state: Arc<State<T>>,
22
}
23

24
impl<T> Clone for WorkStealingQueue<T> {
25
    fn clone(&self) -> Self {
1,450✔
26
        Self {
1,450✔
27
            state: self.state.clone(),
1,450✔
28
        }
1,450✔
29
    }
1,450✔
30
}
31

32
impl<T> WorkStealingQueue<T> {
33
    /// Created with lazily constructed task factories.
34
    pub fn new<I>(factories: I) -> Self
183✔
35
    where
183✔
36
        I: IntoIterator<Item = TaskFactory<T>>,
183✔
37
    {
38
        let queue: SegQueue<TaskFactory<T>> = SegQueue::new();
183✔
39
        for factory in factories.into_iter() {
184✔
40
            queue.push(factory);
184✔
41
        }
184✔
42
        let num_factories = queue.len();
183✔
43

44
        Self {
183✔
45
            state: Arc::new(State {
183✔
46
                task_factories: queue,
183✔
47
                num_factories_constructed: AtomicUsize::new(0),
183✔
48
                num_factories,
183✔
49
                stealers: RwLock::new(Vec::new()),
183✔
50
            }),
183✔
51
        }
183✔
52
    }
183✔
53

54
    pub fn new_iterator(self) -> WorkStealingIterator<T> {
1,450✔
55
        self.state.new_iterator()
1,450✔
56
    }
1,450✔
57
}
58

59
/// A work-stealing iterator that supports dynamically adding tasks from task factories.
60
///
61
/// Each task factory has affinity to a particular worker. After all factories have been
62
/// constructed, workers will attempt to steal tasks from each other until all tasks are processed.
63
///
64
/// Workers are constructed by cloning the iterator.
65
pub struct WorkStealingIterator<T> {
66
    state: Arc<State<T>>,
67
    worker: Worker<T>,
68
}
69

70
/// Shared state for the work queue.
71
struct State<T> {
72
    /// A queue of factories that lazily produce tasks of type `T`.
73
    task_factories: SegQueue<TaskFactory<T>>,
74

75
    /// The total number of task factories that need to be constructed.
76
    num_factories: usize,
77

78
    /// How many factories have been constructed and had their tasks completely pushed into
79
    /// a worker queue.
80
    num_factories_constructed: AtomicUsize,
81

82
    /// The vector of stealers, one for each worker.
83
    stealers: RwLock<Vec<Stealer<T>>>,
84
}
85

86
impl<T> State<T> {
87
    /// Create a new iterator.
88
    fn new_iterator(self: Arc<Self>) -> WorkStealingIterator<T> {
1,450✔
89
        let worker = Worker::new_fifo();
1,450✔
90

91
        // Register the new worker with the shared state.
92
        self.stealers.write().push(worker.stealer());
1,450✔
93

94
        WorkStealingIterator {
1,450✔
95
            state: self,
1,450✔
96
            worker,
1,450✔
97
        }
1,450✔
98
    }
1,450✔
99

100
    /// Loads a factory and pushes its tasks into the given worker queue.
101
    ///
102
    /// Returns `true` if any tasks were pushed into the worker. Note that these tasks may have
103
    /// been stolen by the time the worker queue is checked.
104
    fn load_next_factory(&self, worker: &Worker<T>) -> VortexResult<bool> {
2,016✔
105
        if let Some(factory_fn) = self.task_factories.pop() {
2,016✔
106
            let tasks = factory_fn()?;
184✔
107
            for task in tasks {
768✔
108
                worker.push(task);
584✔
109
            }
584✔
110
            self.num_factories_constructed.fetch_add(1, SeqCst);
184✔
111
            Ok(true)
184✔
112
        } else {
113
            Ok(false)
1,832✔
114
        }
115
    }
2,016✔
116

117
    /// Attempts to steal work from other workers, returns `true` if work was stolen.
118
    fn steal_work(&self, worker: &Worker<T>) -> Steal<()> {
1,832✔
119
        // Repeatedly attempt to steal work from other workers until there are no retries.
120
        iter::repeat_with(|| {
2,182✔
121
            // This collect tries all stealers, exits early on the first successful steal,
122
            // or else tracks whether any steal requires a retry.
123
            self.stealers
2,182✔
124
                .read()
2,182✔
125
                .iter()
2,182✔
126
                .map(|stealer| stealer.steal_batch(worker))
14,884✔
127
                .collect::<Steal<()>>()
2,182✔
128
        })
2,182✔
129
        .find(|steal| !steal.is_retry())
2,182✔
130
        .unwrap_or(Steal::Empty)
1,832✔
131
    }
1,832✔
132
}
133

134
impl<T> Clone for WorkStealingIterator<T> {
NEW
135
    fn clone(&self) -> Self {
×
UNCOV
136
        let worker = Worker::new_fifo();
×
137

138
        // Register the new worker with the shared state.
UNCOV
139
        self.state.stealers.write().push(worker.stealer());
×
140

NEW
141
        Self {
×
UNCOV
142
            state: self.state.clone(),
×
UNCOV
143
            worker,
×
UNCOV
144
        }
×
UNCOV
145
    }
×
146
}
147

148
impl<T> Iterator for WorkStealingIterator<T> {
149
    type Item = VortexResult<T>;
150

151
    fn next(&mut self) -> Option<VortexResult<T>> {
2,034✔
152
        if self.worker.is_empty() {
2,034✔
153
            let next_factory_loaded = match self.state.load_next_factory(&self.worker) {
2,016✔
154
                Ok(next_factory_loaded) => next_factory_loaded,
2,016✔
155
                Err(e) => return Some(Err(e)),
×
156
            };
157

158
            if !next_factory_loaded {
2,016✔
159
                // If there are no more factories to load, then there is at least one worker
160
                // constructing a factory and about to push some tasks.
161
                //
162
                // We sit in a loop trying to steal some of those tasks, or else bail out when
163
                // all scans have been constructed, and we didn't manage to steal anything. To avoid
164
                // spinning too hot, we yield the thread each time we fail to steal work.
165
                //
166
                // `steal_work` does have the side effect of stealing work, and we only want to loop
167
                // again if the result of an attempt of stealing results with `Retry`, for other cases
168
                // `Empty` and `Success` there is no point in trying again
169
                while self.state.num_factories_constructed.load(Relaxed) < self.state.num_factories
5,899,053✔
170
                    || self.state.steal_work(&self.worker).is_retry()
1,832✔
171
                {
5,897,221✔
172
                    std::thread::yield_now();
5,897,221✔
173
                }
5,897,221✔
174
            }
184✔
175
        }
18✔
176

177
        Some(Ok(self.worker.pop()?))
2,034✔
178
    }
2,034✔
179
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc