• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

vortex-data / vortex / 16472348934

23 Jul 2025 01:41PM UTC coverage: 81.221% (+0.1%) from 81.111%
16472348934

Pull #3986

github

web-flow
Merge ebb75e07d into 7fb2171f0
Pull Request #3986: feat[duckdb]: add file pruning and fix scanner

21 of 24 new or added lines in 3 files covered. (87.5%)

19 existing lines in 3 files now uncovered.

42144 of 51888 relevant lines covered (81.22%)

173515.32 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

97.14
/vortex-scan/src/work_queue.rs
1
// SPDX-License-Identifier: Apache-2.0
2
// SPDX-FileCopyrightText: Copyright the Vortex contributors
3

4
use std::sync::Arc;
5
use std::sync::atomic::AtomicUsize;
6
use std::sync::atomic::Ordering::{Relaxed, SeqCst};
7
use std::{iter, thread};
8

9
use crossbeam_deque::{Steal, Stealer, Worker};
10
use crossbeam_queue::SegQueue;
11
use parking_lot::RwLock;
12
use vortex_error::VortexResult;
13

14
/// A factory that produces a vector of tasks.
///
/// The factory is a boxed `FnOnce`, so it can be invoked at most once, and it reports failure
/// through the returned `VortexResult`.
15
pub type TaskFactory<T> = Box<dyn FnOnce() -> VortexResult<Vec<T>> + Send + Sync>;
16

17
/// A work-stealing queue that supports dynamically adding tasks from task factories.
18
///
19
/// Each task factory has affinity to a particular worker. After all factories have been
20
/// constructed, workers will attempt to steal tasks from each other until all tasks are processed.
21
pub struct WorkQueue<T> {
22
    /// State shared with every iterator handed out by `new_iterator`.
    state: Arc<State<T>>,
23
}
24

25
/// State shared between a `WorkQueue` and all of the iterators created from it.
struct State<T> {
26
    /// A queue of factories that lazily produce tasks of type `T`.
27
    task_factories: SegQueue<TaskFactory<T>>,
28

29
    /// The total number of task factories that need to be constructed.
30
    num_factories: usize,
31

32
    /// How many factories have been constructed and had their tasks completely pushed into
33
    /// a worker queue.
    ///
    /// Iterators compare this against `num_factories` to decide whether to keep waiting for
    /// new tasks to steal.
34
    num_factories_constructed: AtomicUsize,
35

36
    /// The vector of stealers, one for each worker.
37
    stealers: RwLock<Vec<Stealer<T>>>,
38
}
39

40
impl<T> State<T> {
41
    /// Loads the first non-empty factory and pushes its tasks into the given worker queue.
42
    ///
43
    /// Returns `true` if any tasks were pushed into the worker. Note that these tasks may have
44
    /// been stolen by the time the worker queue is checked.
45
    fn load_next_factory(&self, worker: &Worker<T>) -> VortexResult<bool> {
2,016✔
46
        let mut next_tasks = None;
2,016✔
47
        while let Some(factory_fn) = self.task_factories.pop() {
2,016✔
48
            let tasks = factory_fn()?;
184✔
49
            self.num_factories_constructed.fetch_add(1, SeqCst);
184✔
50
            if !tasks.is_empty() {
184✔
51
                next_tasks = Some(tasks);
184✔
52
                break;
184✔
NEW
53
            }
×
54
        }
55

56
        if let Some(tasks) = next_tasks.take() {
2,016✔
57
            for task in tasks {
768✔
58
                worker.push(task);
584✔
59
            }
584✔
60
            return Ok(true);
184✔
61
        }
1,832✔
62
        Ok(false)
1,832✔
63
    }
2,016✔
64

65
    /// Attempts to steal work from other workers, returns `true` if work was stolen.
66
    fn steal_work(&self, worker: &Worker<T>) -> Steal<()> {
155,892✔
67
        // Repeatedly attempt to steal work from other workers until there are no retries.
68
        iter::repeat_with(|| {
156,177✔
69
            // This collect tries all stealers, exits early on the first successful steal,
70
            // or else tracks whether any steal requires a retry.
71
            self.stealers
156,177✔
72
                .read()
156,177✔
73
                .iter()
156,177✔
74
                .map(|stealer| stealer.steal_batch(worker))
1,122,600✔
75
                .collect::<Steal<()>>()
156,177✔
76
        })
156,177✔
77
        .find(|steal| !steal.is_retry())
156,177✔
78
        .unwrap_or(Steal::Empty)
155,892✔
79
    }
155,892✔
80
}
81

82
impl<T> WorkQueue<T> {
83
    /// Created with lazily constructed task factories.
84
    pub fn new<I>(factories: I) -> Self
183✔
85
    where
183✔
86
        I: IntoIterator<Item = TaskFactory<T>>,
183✔
87
    {
88
        let queue: SegQueue<TaskFactory<T>> = SegQueue::new();
183✔
89
        for factory in factories.into_iter() {
184✔
90
            queue.push(factory);
184✔
91
        }
184✔
92
        let num_factories = queue.len();
183✔
93

94
        Self {
183✔
95
            state: Arc::new(State {
183✔
96
                task_factories: queue,
183✔
97
                num_factories_constructed: AtomicUsize::new(0),
183✔
98
                num_factories,
183✔
99
                stealers: RwLock::new(Vec::new()),
183✔
100
            }),
183✔
101
        }
183✔
102
    }
183✔
103

104
    /// Creates a new worker to participate.
105
    ///
106
    /// The scan progresses when calling `next` on the iterator.
107
    pub fn new_iterator(&self) -> WorkQueueIterator<T> {
1,450✔
108
        let worker = Worker::new_fifo();
1,450✔
109

110
        // Register the worker with the shared state.
111
        self.state.stealers.write().push(worker.stealer());
1,450✔
112

113
        WorkQueueIterator {
1,450✔
114
            state: self.state.clone(),
1,450✔
115
            worker,
1,450✔
116
        }
1,450✔
117
    }
1,450✔
118
}
119

120
/// Iterator that yields tasks from the work-stealing queue.
///
/// Instances are created by `WorkQueue::new_iterator`; each one owns its own worker queue.
121
pub struct WorkQueueIterator<T> {
122
    /// State shared with the originating `WorkQueue` and the other iterators.
    state: Arc<State<T>>,
123
    /// This iterator's local task queue; `next` pops from it.
    worker: Worker<T>,
124
}
125

126
impl<T> Iterator for WorkQueueIterator<T> {
127
    type Item = VortexResult<T>;
128

129
    fn next(&mut self) -> Option<VortexResult<T>> {
2,034✔
130
        if self.worker.is_empty() {
2,034✔
131
            let next_factory_loaded = match self.state.load_next_factory(&self.worker) {
2,016✔
132
                Ok(next_factory_loaded) => next_factory_loaded,
2,016✔
UNCOV
133
                Err(e) => return Some(Err(e)),
×
134
            };
135

136
            if !next_factory_loaded {
2,016✔
137
                // If there are no more factories to load, then there is at least one worker
138
                // constructing a factory and about to push some tasks.
139
                //
140
                // We sit in a loop trying to steal some of those tasks, or else bail out when
141
                // all scans have been constructed, and we didn't manage to steal anything. To avoid
142
                // spinning too hot, we yield the thread each time we fail to steal work.
143
                while self.state.num_factories_constructed.load(Relaxed) < self.state.num_factories
155,492✔
144
                    || !self.state.steal_work(&self.worker).is_empty()
1,844✔
145
                {
146
                    if self.state.steal_work(&self.worker).is_success() {
154,048✔
147
                        break;
388✔
148
                    } else {
153,660✔
149
                        thread::yield_now();
153,660✔
150
                    }
153,660✔
151
                }
152
            }
184✔
153
        }
18✔
154

155
        Some(Ok(self.worker.pop()?))
2,034✔
156
    }
2,034✔
157
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc