• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

vortex-data / vortex / 16472752025

23 Jul 2025 01:57PM UTC coverage: 81.221% (+0.1%) from 81.111%
16472752025

Pull #3986

github

web-flow
Merge e7036b994 into 7fb2171f0
Pull Request #3986: feat[duckdb]: add file pruning and fix scanner

24 of 27 new or added lines in 3 files covered. (88.89%)

2 existing lines in 2 files now uncovered.

42145 of 51889 relevant lines covered (81.22%)

173504.45 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

96.97
/vortex-scan/src/work_queue.rs
1
// SPDX-License-Identifier: Apache-2.0
2
// SPDX-FileCopyrightText: Copyright the Vortex contributors
3

4
use std::sync::Arc;
5
use std::sync::atomic::AtomicUsize;
6
use std::sync::atomic::Ordering::{Relaxed, SeqCst};
7
use std::{iter, thread};
8

9
use crossbeam_deque::{Steal, Stealer, Worker};
10
use crossbeam_queue::SegQueue;
11
use parking_lot::RwLock;
12
use vortex_error::VortexResult;
13

14
/// A factory that produces a vector of tasks.
15
pub type TaskFactory<T> = Box<dyn FnOnce() -> VortexResult<Vec<T>> + Send + Sync>;
16

17
/// A work-stealing queue that supports dynamically adding tasks from task factories.
18
///
19
/// Each task factory has affinity to a particular worker. After all factories have been
20
/// constructed, workers will attempt to steal tasks from each other until all tasks are processed.
21
pub struct WorkQueue<T> {
22
    state: Arc<State<T>>,
23
}
24

25
struct State<T> {
26
    /// A queue of factories that lazily produce tasks of type `T`.
27
    task_factories: SegQueue<TaskFactory<T>>,
28

29
    /// The total number of task factories that need to be constructed.
30
    num_factories: usize,
31

32
    /// How many factories have been constructed and had their tasks completely pushed into
33
    /// a worker queue.
34
    num_factories_constructed: AtomicUsize,
35

36
    /// The vector of stealers, one for each worker.
37
    stealers: RwLock<Vec<Stealer<T>>>,
38
}
39

40
impl<T> State<T> {
41
    /// Loads the first non-empty factory and pushes its tasks into the given worker queue.
42
    ///
43
    /// Returns `true` if any tasks were pushed into the worker. Note that these tasks may have
44
    /// been stolen by the time the worker queue is checked.
45
    fn load_next_factory(&self, worker: &Worker<T>) -> VortexResult<bool> {
2,014✔
46
        loop {
47
            if let Some(factory_fn) = self.task_factories.pop() {
2,014✔
48
                let tasks = factory_fn()?;
184✔
49
                let is_empty = tasks.is_empty();
184✔
50
                // Tasks must be pushed before `num_factories_constructed` is incremented, these
51
                // requires a happens-before relation
52
                for task in tasks {
768✔
53
                    worker.push(task);
584✔
54
                }
584✔
55
                self.num_factories_constructed.fetch_add(1, SeqCst);
184✔
56

57
                // Keep looping until we find a factory that has pushed tasks.
58
                if !is_empty {
184✔
59
                    return Ok(true);
184✔
NEW
UNCOV
60
                }
×
61
            } else {
62
                return Ok(false);
1,830✔
63
            }
64
        }
65
    }
2,014✔
66

67
    /// Attempts to steal work from other workers, returns `true` if work was stolen.
68
    fn steal_work(&self, worker: &Worker<T>) -> Steal<()> {
145,273✔
69
        // Repeatedly attempt to steal work from other workers until there are no retries.
70
        iter::repeat_with(|| {
145,546✔
71
            // This collect tries all stealers, exits early on the first successful steal,
72
            // or else tracks whether any steal requires a retry.
73
            self.stealers
145,546✔
74
                .read()
145,546✔
75
                .iter()
145,546✔
76
                .map(|stealer| stealer.steal_batch(worker))
1,056,208✔
77
                .collect::<Steal<()>>()
145,546✔
78
        })
145,546✔
79
        .find(|steal| !steal.is_retry())
145,546✔
80
        .unwrap_or(Steal::Empty)
145,273✔
81
    }
145,273✔
82
}
83

84
impl<T> WorkQueue<T> {
85
    /// Created with lazily constructed task factories.
86
    pub fn new<I>(factories: I) -> Self
183✔
87
    where
183✔
88
        I: IntoIterator<Item = TaskFactory<T>>,
183✔
89
    {
90
        let queue: SegQueue<TaskFactory<T>> = SegQueue::new();
183✔
91
        for factory in factories.into_iter() {
184✔
92
            queue.push(factory);
184✔
93
        }
184✔
94
        let num_factories = queue.len();
183✔
95

96
        Self {
183✔
97
            state: Arc::new(State {
183✔
98
                task_factories: queue,
183✔
99
                num_factories_constructed: AtomicUsize::new(0),
183✔
100
                num_factories,
183✔
101
                stealers: RwLock::new(Vec::new()),
183✔
102
            }),
183✔
103
        }
183✔
104
    }
183✔
105

106
    /// Creates a new worker to participate.
107
    ///
108
    /// The scan progresses when calling `next` on the iterator.
109
    pub fn new_iterator(&self) -> WorkQueueIterator<T> {
1,450✔
110
        let worker = Worker::new_fifo();
1,450✔
111

112
        // Register the worker with the shared state.
113
        self.state.stealers.write().push(worker.stealer());
1,450✔
114

115
        WorkQueueIterator {
1,450✔
116
            state: self.state.clone(),
1,450✔
117
            worker,
1,450✔
118
        }
1,450✔
119
    }
1,450✔
120
}
121

122
/// Iterator yield tasks from the work-stealing queue.
123
pub struct WorkQueueIterator<T> {
124
    state: Arc<State<T>>,
125
    worker: Worker<T>,
126
}
127

128
impl<T> Iterator for WorkQueueIterator<T> {
129
    type Item = VortexResult<T>;
130

131
    fn next(&mut self) -> Option<VortexResult<T>> {
2,034✔
132
        if self.worker.is_empty() {
2,034✔
133
            let next_factory_loaded = match self.state.load_next_factory(&self.worker) {
2,014✔
134
                Ok(next_factory_loaded) => next_factory_loaded,
2,014✔
135
                Err(e) => return Some(Err(e)),
×
136
            };
137

138
            if !next_factory_loaded {
2,014✔
139
                // If there are no more factories to load, then there is at least one worker
140
                // constructing a factory and about to push some tasks.
141
                //
142
                // We sit in a loop trying to steal some of those tasks, or else bail out when
143
                // all scans have been constructed, and we didn't manage to steal anything. To avoid
144
                // spinning too hot, we yield the thread each time we fail to steal work.
145
                while self.state.num_factories_constructed.load(Relaxed) < self.state.num_factories
145,009✔
146
                    || !self.state.steal_work(&self.worker).is_empty()
1,706✔
147
                {
148
                    if self.state.steal_work(&self.worker).is_success() {
143,567✔
149
                        break;
388✔
150
                    } else {
143,179✔
151
                        thread::yield_now();
143,179✔
152
                    }
143,179✔
153
                }
154
            }
184✔
155
        }
20✔
156

157
        Some(Ok(self.worker.pop()?))
2,034✔
158
    }
2,034✔
159
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc