• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

vortex-data / vortex / 16473447330

23 Jul 2025 02:25PM UTC coverage: 81.055% (-0.06%) from 81.11%
16473447330

push

github

web-flow
Add a work-stealing ArrayIterator to drive scans (#3982)

Signed-off-by: Nicholas Gates <nick@nickgates.com>

45 of 93 new or added lines in 6 files covered. (48.39%)

6 existing lines in 1 file now uncovered.

42087 of 51924 relevant lines covered (81.06%)

173654.75 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

73.08
/vortex-scan/src/multi_scan.rs
1
// SPDX-License-Identifier: Apache-2.0
2
// SPDX-FileCopyrightText: Copyright the Vortex contributors
3

4
use futures::executor::LocalPool;
5
use futures::future::BoxFuture;
6
use vortex_error::VortexResult;
7

8
use crate::ScanBuilder;
9
use crate::work_queue::{TaskFactory, WorkStealingIterator, WorkStealingQueue};
10

11
type ArrayFuture<T> = BoxFuture<'static, VortexResult<Option<T>>>;
12

13
/// A multi-scan for executing multiple scans concurrently across workers.
14
#[derive(Clone)]
15
pub struct MultiScan<T> {
16
    queue: WorkStealingQueue<ArrayFuture<T>>,
17
}
18

19
impl<T: 'static + Send> MultiScan<T> {
20
    /// Created with lazily constructed scan builders closures.
21
    pub fn new<I, F>(closures: I) -> Self
183✔
22
    where
183✔
23
        F: FnOnce() -> ScanBuilder<T> + 'static + Send,
183✔
24
        I: IntoIterator<Item = F>,
183✔
25
    {
26
        Self {
27
            queue: WorkStealingQueue::new(
183✔
28
                closures.into_iter().map(|closure| {
184✔
29
                    Box::new(move || closure().build()) as TaskFactory<ArrayFuture<T>>
184✔
30
                }),
184✔
31
            ),
32
        }
33
    }
183✔
34

35
    pub fn new_iterator(self) -> MultiScanIterator<T> {
1,450✔
36
        MultiScanIterator {
1,450✔
37
            inner: self.queue.new_iterator(),
1,450✔
38
            local_pool: LocalPool::new(),
1,450✔
39
        }
1,450✔
40
    }
1,450✔
41
}
42

43
/// Scan iterator to participate in a `MultiScan`.
44
pub struct MultiScanIterator<T> {
45
    inner: WorkStealingIterator<ArrayFuture<T>>,
46
    local_pool: LocalPool,
47
}
48

49
impl<T> Clone for MultiScanIterator<T> {
NEW
50
    fn clone(&self) -> Self {
×
NEW
51
        Self {
×
NEW
52
            inner: self.inner.clone(),
×
NEW
53
            local_pool: Default::default(),
×
NEW
54
        }
×
NEW
55
    }
×
56
}
57

58
impl<T: Send + Sync + 'static> Iterator for MultiScanIterator<T> {
59
    type Item = VortexResult<T>;
60

61
    fn next(&mut self) -> Option<VortexResult<T>> {
2,034✔
62
        match self.inner.next()? {
2,034✔
63
            Ok(task) => self.local_pool.run_until(task).transpose(),
584✔
64
            Err(e) => Some(Err(e)),
×
65
        }
66
    }
2,034✔
67
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc