• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

vortex-data / vortex / 16386778560

19 Jul 2025 08:20AM UTC coverage: 48.981%. First build
16386778560

Pull #3927

github

web-flow
Merge e05f29752 into 371d49976
Pull Request #3927: feat: duckdb workstealing

16 of 18 new or added lines in 1 file covered. (88.89%)

18211 of 37180 relevant lines covered (48.98%)

155204.12 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

93.75
/vortex-scan/src/multi_scan.rs
1
// SPDX-License-Identifier: Apache-2.0
2
// SPDX-FileCopyrightText: Copyright the Vortex contributors
3

4
use std::sync::Arc;
5
use std::sync::atomic::AtomicUsize;
6
use std::sync::atomic::Ordering::SeqCst;
7

8
use crossbeam_deque::{Stealer, Worker};
9
use crossbeam_queue::SegQueue;
10
use futures::executor::LocalPool;
11
use futures::future::BoxFuture;
12
use parking_lot::RwLock;
13
use vortex_error::VortexResult;
14

15
use crate::ScanBuilder;
16

17
/// A boxed, `Send`, `'static` scan task that resolves to an optional `T` or an error.
///
/// Tasks produced by `ScanBuilder::build` are pinned into this form so they can be stored
/// in (and stolen between) the per-iterator work queues.
type ArrayFuture<T> = BoxFuture<'static, VortexResult<Option<T>>>;
18
/// Shared multi-producer/multi-consumer queue of one-shot closures, each of which lazily
/// constructs a [`ScanBuilder`].
///
/// Despite the singular name this is a queue of factories: every closure is popped and
/// invoked at most once, by whichever iterator dequeues it.
type ScanBuilderFactory<T> = Arc<SegQueue<Box<dyn FnOnce() -> ScanBuilder<T> + Send + Sync>>>;
19

20
/// Coordinator to orchestrate multiple scan operations.
21
///
22
/// `MultiScan` allows to queue multiple scan operations in order to execute
23
/// them in parallel. In particular, this enables scanning multiple files.
24
#[derive(Default)]
25
pub struct MultiScan<T> {
26
    scan_builder_factory: ScanBuilderFactory<T>,
27
    stealers: Arc<RwLock<Vec<Stealer<ArrayFuture<T>>>>>,
28
    next_stealer_id: Arc<AtomicUsize>,
29
}
30

31
impl<T> MultiScan<T> {
32
    pub fn new() -> Self {
172✔
33
        Self {
172✔
34
            scan_builder_factory: Arc::new(SegQueue::new()),
172✔
35
            stealers: Arc::new(RwLock::new(Vec::new())),
172✔
36
            next_stealer_id: Arc::new(AtomicUsize::new(0)),
172✔
37
        }
172✔
38
    }
172✔
39

40
    /// Add lazily constructed scan builders paired with their corresponding states.
41
    pub fn with_scan_builders<I, F>(self, closures: I) -> Self
172✔
42
    where
172✔
43
        F: FnOnce() -> ScanBuilder<T> + 'static + Send + Sync,
172✔
44
        I: IntoIterator<Item = F>,
172✔
45
    {
46
        for closure in closures.into_iter() {
172✔
47
            self.scan_builder_factory.push(Box::new(closure));
172✔
48
        }
172✔
49

50
        self
172✔
51
    }
172✔
52

53
    /// Creates a new iterator to participate in the scan.
54
    ///
55
    /// The scan progresses when calling `next` on the iterator.
56
    pub fn new_scan_iterator(&self) -> MultiScanIterator<T> {
1,376✔
57
        let worker = Worker::new_fifo();
1,376✔
58
        self.stealers.write().push(worker.stealer());
1,376✔
59

60
        MultiScanIterator {
1,376✔
61
            scan_builder_factory: self.scan_builder_factory.clone(),
1,376✔
62
            local_pool: LocalPool::new(),
1,376✔
63
            stealers: self.stealers.clone(),
1,376✔
64
            next_stealer_id: self.next_stealer_id.clone(),
1,376✔
65
            worker,
1,376✔
66
        }
1,376✔
67
    }
1,376✔
68
}
69

70
/// Scan iterator to participate in a `MultiScan`.
71
pub struct MultiScanIterator<T> {
72
    local_pool: LocalPool,
73
    worker: Worker<ArrayFuture<T>>,
74
    stealers: Arc<RwLock<Vec<Stealer<ArrayFuture<T>>>>>,
75
    next_stealer_id: Arc<AtomicUsize>,
76

77
    /// Thread-safe queue of closures that lazily produce [`ScanBuilder`] instances.
78
    /// This queue is shared across all iterators being created with `new_scan_iterator`.
79
    scan_builder_factory: ScanBuilderFactory<T>,
80
}
81

82
impl<T: Send + Sync + 'static> Iterator for MultiScanIterator<T> {
83
    type Item = VortexResult<T>;
84

85
    fn next(&mut self) -> Option<VortexResult<T>> {
1,948✔
86
        // Queue up tasks if the thread local queue is empty.
87
        if self.worker.is_empty() {
1,948✔
88
            if let Some(scan_builder_fn) = self.scan_builder_factory.pop() {
1,548✔
89
                match scan_builder_fn().build() {
172✔
90
                    Ok(tasks) => {
172✔
91
                        for task in tasks {
744✔
92
                            self.worker.push(Box::pin(task));
572✔
93
                        }
572✔
94
                    }
95
                    Err(err) => return Some(Err(err)),
×
96
                }
97
            } else {
98
                let stealer_count = self.stealers.read().len();
1,376✔
99

100
                for _ in 0..stealer_count {
1,376✔
101
                    // Round robin to ensure work is not always stolen from the same worker.
102
                    let stealer_id = self.next_stealer_id.fetch_add(1, SeqCst) % stealer_count;
8,584✔
103
                    let stealer = &self.stealers.read()[stealer_id];
8,584✔
104
                    if !stealer.is_empty() {
8,584✔
105
                        // Steal ~half of the work and push it into `worker`.
NEW
106
                        _ = stealer.steal_batch(&self.worker);
×
NEW
107
                        break;
×
108
                    }
8,584✔
109
                }
110
            }
111
        }
400✔
112

113
        let task = self.worker.pop()?;
1,948✔
114

115
        self.local_pool.run_until(task).transpose()
572✔
116
    }
1,948✔
117
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc