• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

vortex-data / vortex / 16471957488

23 Jul 2025 01:25PM UTC coverage: 81.108% (-0.003%) from 81.111%
16471957488

Pull #3984

github

web-flow
Merge fc5ad86fd into 7fb2171f0
Pull Request #3984: prevent double stealing

2 of 3 new or added lines in 1 file covered. (66.67%)

1 existing line in 1 file now uncovered.

42077 of 51878 relevant lines covered (81.11%)

173032.66 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

96.67
/vortex-scan/src/work_queue.rs
1
// SPDX-License-Identifier: Apache-2.0
2
// SPDX-FileCopyrightText: Copyright the Vortex contributors
3

4
use std::iter;
5
use std::sync::Arc;
6
use std::sync::atomic::AtomicUsize;
7
use std::sync::atomic::Ordering::{Relaxed, SeqCst};
8

9
use crossbeam_deque::{Steal, Stealer, Worker};
10
use crossbeam_queue::SegQueue;
11
use parking_lot::RwLock;
12
use vortex_error::VortexResult;
13

14
/// A factory that produces a vector of tasks.
///
/// The boxed closure is `FnOnce`, so a given factory can be invoked at most once. It either
/// yields its tasks as a `Vec<T>` or fails with the error carried by `VortexResult`. The
/// `Send + Sync` bounds are part of the alias itself.
15
pub type TaskFactory<T> = Box<dyn FnOnce() -> VortexResult<Vec<T>> + Send + Sync>;
16

17
/// A work-stealing queue whose tasks are produced lazily by task factories.
18
///
19
/// A factory's tasks are pushed onto the queue of whichever worker ran that factory. A worker
20
/// that finds its own queue empty and no factory left to run attempts to steal tasks from the
/// other workers while some factory has not yet been counted as constructed.
///
/// NOTE(review): the steal loop in `WorkQueueIterator::next` only runs while
/// `num_factories_constructed < num_factories`, so no stealing is attempted once every factory
/// has been constructed -- confirm this is the intended behavior.
21
pub struct WorkQueue<T> {
22
    /// Shared state; each iterator created by `new_iterator` holds a clone of this `Arc`.
    state: Arc<State<T>>,
23
}
24

25
/// State shared between a `WorkQueue` and every iterator created from it.
struct State<T> {
26
    /// Factories that have not been run yet; each one lazily produces tasks of type `T`.
27
    task_factories: SegQueue<TaskFactory<T>>,
28

29
    /// The total number of task factories that need to be constructed. Set once, from the
    /// length of `task_factories`, when the queue is created.
30
    num_factories: usize,
31

32
    /// How many factories have been constructed and had their tasks completely pushed into
33
    /// a worker queue. A factory that returns an error is not counted here.
34
    num_factories_constructed: AtomicUsize,
35

36
    /// The vector of stealers, one for each worker. An entry is appended each time
    /// `WorkQueue::new_iterator` is called.
37
    stealers: RwLock<Vec<Stealer<T>>>,
38
}
39

40
impl<T> State<T> {
41
    /// Loads a factory and pushes its tasks into the given worker queue.
42
    ///
43
    /// Returns `true` if any tasks were pushed into the worker. Note that these tasks may have
44
    /// been stolen by the time the worker queue is checked.
45
    fn load_next_factory(&self, worker: &Worker<T>) -> VortexResult<bool> {
1,634✔
46
        if let Some(factory_fn) = self.task_factories.pop() {
1,634✔
47
            let tasks = factory_fn()?;
184✔
48
            for task in tasks {
768✔
49
                worker.push(task);
584✔
50
            }
584✔
51
            self.num_factories_constructed.fetch_add(1, SeqCst);
184✔
52
            Ok(true)
184✔
53
        } else {
54
            Ok(false)
1,450✔
55
        }
56
    }
1,634✔
57

58
    /// Attempts to steal work from other workers, returns `true` if work was stolen.
59
    fn steal_work(&self, worker: &Worker<T>) -> Steal<()> {
1,259✔
60
        // Repeatedly attempt to steal work from other workers until there are no retries.
61
        iter::repeat_with(|| {
1,259✔
62
            // This collect tries all stealers, exits early on the first successful steal,
63
            // or else tracks whether any steal requires a retry.
64
            self.stealers
1,259✔
65
                .read()
1,259✔
66
                .iter()
1,259✔
67
                .map(|stealer| stealer.steal_batch(worker))
7,285✔
68
                .collect::<Steal<()>>()
1,259✔
69
        })
1,259✔
70
        .find(|steal| !steal.is_retry())
1,259✔
71
        .unwrap_or(Steal::Empty)
1,259✔
72
    }
1,259✔
73
}
74

75
impl<T> WorkQueue<T> {
76
    /// Created with lazily constructed task factories.
77
    pub fn new<I>(factories: I) -> Self
183✔
78
    where
183✔
79
        I: IntoIterator<Item = TaskFactory<T>>,
183✔
80
    {
81
        let queue: SegQueue<TaskFactory<T>> = SegQueue::new();
183✔
82
        for factory in factories.into_iter() {
184✔
83
            queue.push(factory);
184✔
84
        }
184✔
85
        let num_factories = queue.len();
183✔
86

87
        Self {
183✔
88
            state: Arc::new(State {
183✔
89
                task_factories: queue,
183✔
90
                num_factories_constructed: AtomicUsize::new(0),
183✔
91
                num_factories,
183✔
92
                stealers: RwLock::new(Vec::new()),
183✔
93
            }),
183✔
94
        }
183✔
95
    }
183✔
96

97
    /// Creates a new worker to participate.
98
    ///
99
    /// The scan progresses when calling `next` on the iterator.
100
    pub fn new_iterator(&self) -> WorkQueueIterator<T> {
1,450✔
101
        let worker = Worker::new_fifo();
1,450✔
102

103
        // Register the worker with the shared state.
104
        self.state.stealers.write().push(worker.stealer());
1,450✔
105

106
        WorkQueueIterator {
1,450✔
107
            state: self.state.clone(),
1,450✔
108
            worker,
1,450✔
109
        }
1,450✔
110
    }
1,450✔
111
}
112

113
/// Iterator yielding tasks from the work-stealing queue.
///
/// Each item is a `VortexResult<T>`: a task, or the error returned by a task factory that this
/// iterator ran.
114
pub struct WorkQueueIterator<T> {
115
    /// State shared with the `WorkQueue` and with every other iterator created from it.
    state: Arc<State<T>>,
116
    /// This iterator's own worker queue; `next` pops tasks from it.
    worker: Worker<T>,
117
}
118

119
impl<T> Iterator for WorkQueueIterator<T> {
120
    type Item = VortexResult<T>;
121

122
    fn next(&mut self) -> Option<VortexResult<T>> {
2,034✔
123
        if self.worker.is_empty() {
2,034✔
124
            let next_factory_loaded = match self.state.load_next_factory(&self.worker) {
1,634✔
125
                Ok(next_factory_loaded) => next_factory_loaded,
1,634✔
126
                Err(e) => return Some(Err(e)),
×
127
            };
128

129
            if !next_factory_loaded {
1,634✔
130
                // If there are no more factories to load, then there is at least one worker
131
                // constructing a factory and about to push some tasks.
132
                //
133
                // We sit in a loop trying to steal some of those tasks, or else bail out when
134
                // all scans have been constructed, and we didn't manage to steal anything. To avoid
135
                // spinning too hot, we yield the thread each time we fail to steal work.
136
                while self.state.num_factories_constructed.load(Relaxed) < self.state.num_factories
1,450✔
137
                {
138
                    match self.state.steal_work(&self.worker) {
1,259✔
139
                        Steal::Success(_) | Steal::Empty => break,
1,259✔
NEW
UNCOV
140
                        Steal::Retry => std::thread::yield_now(),
×
141
                    }
142
                }
143
            }
184✔
144
        }
400✔
145

146
        Some(Ok(self.worker.pop()?))
2,034✔
147
    }
2,034✔
148
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc