• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

vortex-data / vortex / 16466067776

23 Jul 2025 08:52AM UTC coverage: 81.142% (+0.007%) from 81.135%
16466067776

push

github

web-flow
Extract work queue logic (#3978)

Pull out the work queue logic from multi-scan so we can reuse it for a
single-scan, and possibly for array exporting too.

Signed-off-by: Nicholas Gates <nick@nickgates.com>

71 of 73 new or added lines in 3 files covered. (97.26%)

42081 of 51861 relevant lines covered (81.14%)

173455.43 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

98.41
/vortex-scan/src/work_queue.rs
1
// SPDX-License-Identifier: Apache-2.0
2
// SPDX-FileCopyrightText: Copyright the Vortex contributors
3

4
use std::iter;
5
use std::sync::Arc;
6
use std::sync::atomic::AtomicUsize;
7
use std::sync::atomic::Ordering::{Relaxed, SeqCst};
8

9
use crossbeam_deque::{Steal, Stealer, Worker};
10
use crossbeam_queue::SegQueue;
11
use parking_lot::RwLock;
12
use vortex_error::VortexResult;
13

14
/// A factory that produces a vector of tasks.
15
pub type TaskFactory<T> = Box<dyn FnOnce() -> VortexResult<Vec<T>> + Send + Sync>;
16

17
/// A work-stealing queue that supports dynamically adding tasks from task factories.
18
///
19
/// Each task factory has affinity to a particular worker. After all factories have been
20
/// constructed, workers will attempt to steal tasks from each other until all tasks are processed.
21
pub struct WorkQueue<T> {
22
    state: Arc<State<T>>,
23
}
24

25
struct State<T> {
26
    /// A queue of factories that lazily produce tasks of type `T`.
27
    task_factories: SegQueue<TaskFactory<T>>,
28

29
    /// The total number of task factories that need to be constructed.
30
    num_factories: usize,
31

32
    /// How many factories have been constructed and had their tasks completely pushed into
33
    /// a worker queue.
34
    num_factories_constructed: AtomicUsize,
35

36
    /// The vector of stealers, one for each worker.
37
    stealers: RwLock<Vec<Stealer<T>>>,
38
}
39

40
impl<T> State<T> {
41
    /// Loads a factory and pushes its tasks into the given worker queue.
42
    ///
43
    /// Returns `true` if any tasks were pushed into the worker. Note that these tasks may have
44
    /// been stolen by the time the worker queue is checked.
45
    fn load_next_factory(&self, worker: &Worker<T>) -> VortexResult<bool> {
2,020✔
46
        if let Some(factory_fn) = self.task_factories.pop() {
2,020✔
47
            let tasks = factory_fn()?;
184✔
48
            for task in tasks {
768✔
49
                worker.push(task);
584✔
50
            }
584✔
51
            self.num_factories_constructed.fetch_add(1, SeqCst);
184✔
52
            Ok(true)
184✔
53
        } else {
54
            Ok(false)
1,836✔
55
        }
56
    }
2,020✔
57

58
    /// Attempts to steal work from other workers, returns `true` if work was stolen.
59
    fn steal_work(&self, worker: &Worker<T>) -> Steal<()> {
148,516✔
60
        // Repeatedly attempt to steal work from other workers until there are no retries.
61
        iter::repeat_with(|| {
148,744✔
62
            // This collect tries all stealers, exits early on the first successful steal,
63
            // or else tracks whether any steal requires a retry.
64
            self.stealers
148,744✔
65
                .read()
148,744✔
66
                .iter()
148,744✔
67
                .map(|stealer| stealer.steal_batch(worker))
1,088,235✔
68
                .collect::<Steal<()>>()
148,744✔
69
        })
148,744✔
70
        .find(|steal| !steal.is_retry())
148,744✔
71
        .unwrap_or(Steal::Empty)
148,516✔
72
    }
148,516✔
73
}
74

75
impl<T> WorkQueue<T> {
76
    /// Created with lazily constructed task factories.
77
    pub fn new<I>(factories: I) -> Self
183✔
78
    where
183✔
79
        I: IntoIterator<Item = TaskFactory<T>>,
183✔
80
    {
81
        let queue: SegQueue<TaskFactory<T>> = SegQueue::new();
183✔
82
        for factory in factories.into_iter() {
184✔
83
            queue.push(factory);
184✔
84
        }
184✔
85
        let num_factories = queue.len();
183✔
86

87
        Self {
183✔
88
            state: Arc::new(State {
183✔
89
                task_factories: queue,
183✔
90
                num_factories_constructed: AtomicUsize::new(0),
183✔
91
                num_factories,
183✔
92
                stealers: RwLock::new(Vec::new()),
183✔
93
            }),
183✔
94
        }
183✔
95
    }
183✔
96

97
    /// Creates a new worker to participate.
98
    ///
99
    /// The scan progresses when calling `next` on the iterator.
100
    pub fn new_iterator(&self) -> WorkQueueIterator<T> {
1,450✔
101
        let worker = Worker::new_fifo();
1,450✔
102

103
        // Register the worker with the shared state.
104
        self.state.stealers.write().push(worker.stealer());
1,450✔
105

106
        WorkQueueIterator {
1,450✔
107
            state: self.state.clone(),
1,450✔
108
            worker,
1,450✔
109
        }
1,450✔
110
    }
1,450✔
111
}
112

113
/// Iterator yield tasks from the work-stealing queue.
114
pub struct WorkQueueIterator<T> {
115
    state: Arc<State<T>>,
116
    worker: Worker<T>,
117
}
118

119
impl<T> Iterator for WorkQueueIterator<T> {
120
    type Item = VortexResult<T>;
121

122
    fn next(&mut self) -> Option<VortexResult<T>> {
2,034✔
123
        if self.worker.is_empty() {
2,034✔
124
            let next_factory_loaded = match self.state.load_next_factory(&self.worker) {
2,020✔
125
                Ok(next_factory_loaded) => next_factory_loaded,
2,020✔
NEW
126
                Err(e) => return Some(Err(e)),
×
127
            };
128

129
            if !next_factory_loaded {
2,020✔
130
                // If there are no more factories to load, then there is at least one worker
131
                // constructing a factory and about to push some tasks.
132
                //
133
                // We sit in a loop trying to steal some of those tasks, or else bail out when
134
                // all scans have been constructed, and we didn't manage to steal anything. To avoid
135
                // spinning too hot, we yield the thread each time we fail to steal work.
136
                while self.state.num_factories_constructed.load(Relaxed) < self.state.num_factories
148,206✔
137
                    || !self.state.steal_work(&self.worker).is_empty()
1,752✔
138
                {
139
                    if self.state.steal_work(&self.worker).is_success() {
146,764✔
140
                        break;
394✔
141
                    } else {
146,370✔
142
                        std::thread::yield_now();
146,370✔
143
                    }
146,370✔
144
                }
145
            }
184✔
146
        }
14✔
147

148
        Some(Ok(self.worker.pop()?))
2,034✔
149
    }
2,034✔
150
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc