• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

vortex-data / vortex / 16474031294

23 Jul 2025 02:48PM UTC coverage: 81.11% (+0.1%) from 80.988%
16474031294

push

github

web-flow
feat[duckdb]: add file pruning and fix scanner (#3986)

Signed-off-by: Joe Isaacs <joe.isaacs@live.co.uk>

39 of 43 new or added lines in 3 files covered. (90.7%)

1 existing line in 1 file now uncovered.

42126 of 51937 relevant lines covered (81.11%)

173376.96 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

94.32
/vortex-scan/src/work_queue.rs
1
// SPDX-License-Identifier: Apache-2.0
2
// SPDX-FileCopyrightText: Copyright the Vortex contributors
3

4
//! A work-stealing iterator that supports dynamically adding tasks from task factories.
5

6
use std::sync::Arc;
7
use std::sync::atomic::AtomicUsize;
8
use std::sync::atomic::Ordering::{Relaxed, SeqCst};
9
use std::{iter, thread};
10

11
use crossbeam_deque::{Steal, Stealer, Worker};
12
use crossbeam_queue::SegQueue;
13
use parking_lot::RwLock;
14
use vortex_error::VortexResult;
15

16
/// A factory that produces a vector of tasks.
///
/// The closure is boxed, can be invoked only once (`FnOnce`), and is `Send` so that it can be
/// handed to whichever worker thread ends up running it. A failing factory reports its error
/// through the returned `VortexResult`.
17
pub type TaskFactory<T> = Box<dyn FnOnce() -> VortexResult<Vec<T>> + Send>;
18

19
/// A work-stealing queue that allows for dynamic task addition.
///
/// This type is a handle: every clone shares the same underlying state through an `Arc`.
/// Call `new_iterator` to register a worker and start consuming tasks.
20
pub struct WorkStealingQueue<T> {
21
    /// Shared state, reference-counted across all queue handles and iterators.
    state: Arc<State<T>>,
22
}
23

24
impl<T> Clone for WorkStealingQueue<T> {
25
    fn clone(&self) -> Self {
1,450✔
26
        Self {
1,450✔
27
            state: self.state.clone(),
1,450✔
28
        }
1,450✔
29
    }
1,450✔
30
}
31

32
impl<T> WorkStealingQueue<T> {
33
    /// Created with lazily constructed task factories.
34
    pub fn new<I>(factories: I) -> Self
183✔
35
    where
183✔
36
        I: IntoIterator<Item = TaskFactory<T>>,
183✔
37
    {
38
        let queue: SegQueue<TaskFactory<T>> = SegQueue::new();
183✔
39
        for factory in factories.into_iter() {
184✔
40
            queue.push(factory);
184✔
41
        }
184✔
42
        let num_factories = queue.len();
183✔
43

44
        Self {
183✔
45
            state: Arc::new(State {
183✔
46
                task_factories: queue,
183✔
47
                num_factories_constructed: AtomicUsize::new(0),
183✔
48
                num_factories,
183✔
49
                stealers: RwLock::new(Vec::new()),
183✔
50
                stealer_offset: Default::default(),
183✔
51
            }),
183✔
52
        }
183✔
53
    }
183✔
54

55
    pub fn new_iterator(self) -> WorkStealingIterator<T> {
1,450✔
56
        self.state.new_iterator()
1,450✔
57
    }
1,450✔
58
}
59

60
/// Shared state for the work queue.
///
/// A single instance is shared, behind an `Arc`, by every `WorkStealingQueue` handle and
/// every `WorkStealingIterator` created from it.
61
struct State<T> {
62
    /// A queue of factories that lazily produce tasks of type `T`.
    ///
    /// Workers pop factories from here one at a time when their local queue is empty.
63
    task_factories: SegQueue<TaskFactory<T>>,
64

65
    /// The total number of task factories that need to be constructed.
    ///
    /// Recorded once when the queue is created and never modified afterwards.
66
    num_factories: usize,
67

68
    /// How many factories have been constructed and had their tasks completely pushed into
69
    /// a worker queue.
    ///
    /// Workers with no local work keep trying to steal while this is below `num_factories`.
70
    num_factories_constructed: AtomicUsize,
71

72
    /// The vector of stealers, one for each worker.
    ///
    /// A stealer is appended each time an iterator is created. NOTE(review): nothing visible
    /// in this file removes a stealer when its iterator is dropped; confirm that is intended.
73
    stealers: RwLock<Vec<Stealer<T>>>,
74

75
    /// An offset into the stealers vector, used to avoid skewed worker queues when stealing.
    ///
    /// Incremented on every steal attempt and reduced modulo the number of stealers to pick
    /// the stealer that is tried first.
76
    stealer_offset: AtomicUsize,
77
}
78

79
impl<T> State<T> {
80
    /// Create a new iterator.
81
    fn new_iterator(self: Arc<Self>) -> WorkStealingIterator<T> {
1,450✔
82
        let worker = Worker::new_fifo();
1,450✔
83

84
        // Register the new worker with the shared state.
85
        self.stealers.write().push(worker.stealer());
1,450✔
86

87
        WorkStealingIterator {
1,450✔
88
            state: self,
1,450✔
89
            worker,
1,450✔
90
        }
1,450✔
91
    }
1,450✔
92

93
    /// Loads a factory and pushes its tasks into the given worker queue.
94
    ///
95
    /// Returns `true` if any tasks were pushed into the worker. Note that these tasks may have
96
    /// been stolen by the time the worker queue is checked.
97
    fn load_next_factory(&self, worker: &Worker<T>) -> VortexResult<bool> {
2,014✔
98
        loop {
99
            if let Some(factory_fn) = self.task_factories.pop() {
2,014✔
100
                let tasks = factory_fn()?;
184✔
101
                let is_empty = tasks.is_empty();
184✔
102
                // Tasks **must** be pushed before `num_factories_constructed` is incremented.
103
                for task in tasks {
768✔
104
                    worker.push(task);
584✔
105
                }
584✔
106
                self.num_factories_constructed.fetch_add(1, SeqCst);
184✔
107

108
                // Keep looping until we find a factory that has pushed tasks.
109
                if !is_empty {
184✔
110
                    return Ok(true);
184✔
NEW
111
                }
×
112
            } else {
113
                return Ok(false);
1,830✔
114
            }
115
        }
116
    }
2,014✔
117

118
    /// Reports whether there is any work left to steal.
119
    fn stealers_have_work(&self) -> bool {
1,724✔
120
        self.stealers
1,724✔
121
            .read()
1,724✔
122
            .iter()
1,724✔
123
            .any(|stealer| !stealer.is_empty())
11,625✔
124
    }
1,724✔
125

126
    /// Attempts to steal work from other workers, returns `true` if work was stolen.
127
    fn steal_work(&self, worker: &Worker<T>) -> Steal<()> {
170,699✔
128
        // Repeatedly attempt to steal work from other workers until there are no retries.
129
        iter::repeat_with(|| {
171,146✔
130
            // This collect tries all stealers, exits early on the first successful steal,
131
            // or else tracks whether any steal requires a retry.
132
            let guard = self.stealers.read();
171,146✔
133
            let num_stealers = guard.len();
171,146✔
134
            guard
171,146✔
135
                .iter()
171,146✔
136
                .cycle()
171,146✔
137
                .skip(self.stealer_offset.fetch_add(1, Relaxed) % num_stealers)
171,146✔
138
                .take(num_stealers)
171,146✔
139
                .map(|stealer| stealer.steal_batch(worker))
1,281,250✔
140
                .collect::<Steal<()>>()
171,146✔
141
        })
171,146✔
142
        .find(|steal| !steal.is_retry())
171,146✔
143
        .unwrap_or(Steal::Empty)
170,699✔
144
    }
170,699✔
145
}
146

147
/// A work-stealing iterator that supports dynamically adding tasks from task factories.
148
///
149
/// The tasks produced by a factory are pushed into the local queue of the worker that loaded
150
/// that factory. Once no factories are left to load, workers attempt to steal tasks from each
151
/// other until all tasks are processed.
152
/// Workers are constructed by cloning the iterator or via `WorkStealingQueue::new_iterator`.
153
pub struct WorkStealingIterator<T> {
154
    /// State shared with every other worker of the same queue.
    state: Arc<State<T>>,
155
    /// This worker's local task queue; other workers steal from it through its stealer.
    worker: Worker<T>,
156
}
157

158
impl<T> Clone for WorkStealingIterator<T> {
159
    fn clone(&self) -> Self {
×
NEW
160
        self.state.clone().new_iterator()
×
UNCOV
161
    }
×
162
}
163

164
impl<T> Iterator for WorkStealingIterator<T> {
165
    type Item = VortexResult<T>;
166

167
    fn next(&mut self) -> Option<VortexResult<T>> {
2,034✔
168
        if self.worker.is_empty() {
2,034✔
169
            let next_factory_loaded = match self.state.load_next_factory(&self.worker) {
2,014✔
170
                Ok(next_factory_loaded) => next_factory_loaded,
2,014✔
171
                Err(e) => return Some(Err(e)),
×
172
            };
173

174
            if !next_factory_loaded {
2,014✔
175
                // If there are no more factories to load, then there is at least one worker
176
                // constructing a factory and about to push some tasks.
177
                //
178
                // We sit in a loop trying to steal some of those tasks, or else bail out when
179
                // all scans have been constructed, and we didn't manage to steal anything. To avoid
180
                // spinning too hot, we yield the thread each time we fail to steal work.
181
                //
182
                // `steal_work` does have the side effect of stealing work, and we only want to loop
183
                // again if the result of an attempt of stealing results with `Retry`, for other cases
184
                // `Empty` and `Success` there is no point in trying again
185
                while self.state.num_factories_constructed.load(Relaxed) < self.state.num_factories
172,147✔
186
                    || self.state.stealers_have_work()
1,724✔
187
                {
188
                    if self.state.steal_work(&self.worker).is_success() {
170,699✔
189
                        break;
382✔
190
                    } else {
170,317✔
191
                        thread::yield_now();
170,317✔
192
                    }
170,317✔
193
                }
194
            }
184✔
195
        }
20✔
196

197
        // Attempt to pop a task from the worker queue.
198
        // Another worker may have stolen our tasks by this point. If that's the case, then we've
199
        // already finished loading the factories, and we're down to the last few tasks. Therefore,
200
        // it's ok for us to return `None` and terminate the iterator.
201
        Some(Ok(self.worker.pop()?))
2,034✔
202
    }
2,034✔
203
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc