• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

vortex-data / vortex / 16389458957

19 Jul 2025 02:06PM UTC coverage: 81.516% (+0.004%) from 81.512%
16389458957

Pull #3938

github

web-flow
Merge 65b2c864a into 45f202aaf
Pull Request #3938: perf: fine tune workstealing

37 of 38 new or added lines in 2 files covered. (97.37%)

1 existing line in 1 file now uncovered.

42011 of 51537 relevant lines covered (81.52%)

171776.32 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

98.28
/vortex-scan/src/multi_scan.rs
1
// SPDX-License-Identifier: Apache-2.0
2
// SPDX-FileCopyrightText: Copyright the Vortex contributors
3

4
use std::sync::Arc;
5
use std::sync::atomic::AtomicUsize;
6
use std::sync::atomic::Ordering::{Relaxed, SeqCst};
7

8
use crossbeam_deque::{Steal, Stealer, Worker};
9
use crossbeam_queue::SegQueue;
10
use futures::executor::LocalPool;
11
use futures::future::BoxFuture;
12
use parking_lot::RwLock;
13
use vortex_error::VortexResult;
14

15
use crate::ScanBuilder;
16

17
/// Boxed, `'static` future resolving to `VortexResult<Option<T>>`; this is the
/// task type stored in the per-iterator work-stealing queues.
type ArrayFuture<T> = BoxFuture<'static, VortexResult<Option<T>>>;
18
/// Shared (`Arc`) `SegQueue` of boxed one-shot closures, each of which produces
/// a `ScanBuilder<T>` when invoked.
type ScanBuilderFactory<T> = Arc<SegQueue<Box<dyn FnOnce() -> ScanBuilder<T> + Send + Sync>>>;
19

20
/// Coordinator to orchestrate multiple scan operations.
21
///
22
/// `MultiScan` allows to queue multiple scan operations in order to execute
23
/// them in parallel. In particular, this enables scanning multiple files.
24
#[derive(Default)]
25
pub struct MultiScan<T> {
26
    scan_builder_count: usize,
27
    scan_builders_constructed: Arc<AtomicUsize>,
28

29
    scan_builder_factory: ScanBuilderFactory<T>,
30
    stealers: Arc<RwLock<Vec<Stealer<ArrayFuture<T>>>>>,
31
    next_stealer_id: Arc<AtomicUsize>,
32
}
33

34
impl<T> MultiScan<T> {
35
    /// Created with lazily constructed scan builders closures.
36
    pub fn new<I, F>(closures: I) -> Self
183✔
37
    where
183✔
38
        F: FnOnce() -> ScanBuilder<T> + 'static + Send + Sync,
183✔
39
        I: IntoIterator<Item = F>,
183✔
40
    {
41
        let scan_builder_factory: ScanBuilderFactory<T> = Arc::new(SegQueue::new());
183✔
42
        for closure in closures.into_iter() {
184✔
43
            scan_builder_factory.push(Box::new(closure));
184✔
44
        }
184✔
45

46
        Self {
183✔
47
            scan_builder_count: scan_builder_factory.len(),
183✔
48
            scan_builders_constructed: Arc::new(AtomicUsize::new(0)),
183✔
49
            scan_builder_factory,
183✔
50
            stealers: Arc::new(RwLock::new(Vec::new())),
183✔
51
            next_stealer_id: Arc::new(AtomicUsize::new(0)),
183✔
52
        }
183✔
53
    }
183✔
54

55
    /// Creates a new iterator to participate in the scan.
56
    ///
57
    /// The scan progresses when calling `next` on the iterator.
58
    pub fn new_scan_iterator(&self) -> MultiScanIterator<T> {
1,450✔
59
        let worker = Worker::new_fifo();
1,450✔
60
        self.stealers.write().push(worker.stealer());
1,450✔
61

62
        MultiScanIterator {
1,450✔
63
            scan_builder_count: self.scan_builder_count,
1,450✔
64
            scan_builders_constructed: self.scan_builders_constructed.clone(),
1,450✔
65
            scan_builder_factory: self.scan_builder_factory.clone(),
1,450✔
66
            local_pool: LocalPool::new(),
1,450✔
67
            stealers: self.stealers.clone(),
1,450✔
68
            next_stealer_id: self.next_stealer_id.clone(),
1,450✔
69
            worker,
1,450✔
70
        }
1,450✔
71
    }
1,450✔
72
}
73

74
/// Scan iterator to participate in a `MultiScan`.
75
pub struct MultiScanIterator<T> {
76
    scan_builder_count: usize,
77
    scan_builders_constructed: Arc<AtomicUsize>,
78

79
    local_pool: LocalPool,
80
    worker: Worker<ArrayFuture<T>>,
81
    stealers: Arc<RwLock<Vec<Stealer<ArrayFuture<T>>>>>,
82
    next_stealer_id: Arc<AtomicUsize>,
83

84
    /// Thread-safe queue of closures that lazily produce [`ScanBuilder`] instances.
85
    /// This queue is shared across all iterators being created with `new_scan_iterator`.
86
    scan_builder_factory: ScanBuilderFactory<T>,
87
}
88

89
impl<T: Send + Sync + 'static> Iterator for MultiScanIterator<T> {
90
    type Item = VortexResult<T>;
91

92
    fn next(&mut self) -> Option<VortexResult<T>> {
2,034✔
93
        // Queue up tasks if the thread local queue is empty.
94
        if self.worker.is_empty() {
2,034✔
95
            if let Some(scan_builder_fn) = self.scan_builder_factory.pop() {
2,018✔
96
                match scan_builder_fn().build() {
184✔
97
                    Ok(tasks) => {
184✔
98
                        for task in tasks {
768✔
99
                            self.worker.push(Box::pin(task));
584✔
100
                        }
584✔
101
                        self.scan_builders_constructed.fetch_add(1, Relaxed);
184✔
102
                    }
NEW
103
                    Err(err) => return Some(Err(err)),
×
104
                }
105
            } else {
106
                'outer_loop: while self.scan_builders_constructed.load(Relaxed)
177,190✔
107
                    < self.scan_builder_count
177,190✔
108
                    || self
1,838✔
109
                        .stealers
1,838✔
110
                        .read()
1,838✔
111
                        .iter()
1,838✔
112
                        .any(|stealer| !stealer.is_empty())
11,328✔
113
                {
114
                    // Round robin to ensure work is not always stolen from the same worker.
115
                    let stealers = self.stealers.read();
175,740✔
116
                    let steal_id = self.next_stealer_id.fetch_add(1, SeqCst);
175,740✔
117
                    for idx in 0..stealers.len() {
1,201,450✔
118
                        let idx = (steal_id + idx) % stealers.len();
1,201,450✔
119
                        let stealer = &stealers[idx];
1,201,450✔
120
                        if let Steal::Success(_) = stealer.steal_batch(&self.worker) {
1,201,450✔
121
                            break 'outer_loop;
384✔
122
                        }
1,201,066✔
123
                    }
124
                }
125
            }
126
        }
16✔
127

128
        let task = self.worker.pop()?;
2,034✔
129
        self.local_pool.run_until(task).transpose()
584✔
130
    }
2,034✔
131
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc