vortex-data / vortex / 16505110840

24 Jul 2025 06:40PM UTC coverage: 81.498% (+0.04%) from 81.457%
16505110840 · push · github · web-flow
Remove scan async API (#4005)

Signed-off-by: Nicholas Gates <nick@nickgates.com>

150 of 222 new or added lines in 10 files covered. (67.57%)

10 existing lines in 2 files now uncovered.

42586 of 52254 relevant lines covered (81.5%)

172411.37 hits per line

Source File
/vortex-scan/src/multi_thread.rs (0.0% covered)
// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright the Vortex contributors

use std::iter;
use std::sync::{Arc, LazyLock};

use futures::executor::block_on;
use futures::{StreamExt, stream};
use tokio::runtime::{Builder, Runtime};
use vortex_array::ArrayRef;
use vortex_array::iter::{ArrayIterator, ArrayIteratorAdapter};
use vortex_error::{VortexExpect, VortexResult, vortex_err};

use crate::ScanBuilder;

/// We create an internal Tokio runtime used exclusively for orchestrating work-stealing
/// of CPU-bound work for multithreaded scans.
///
/// It is intentionally not exposed to the user, not configurable, and does not enable I/O or
/// timers.
static CPU_RUNTIME: LazyLock<Runtime> = LazyLock::new(|| {
    Builder::new_multi_thread()
        .thread_name("vortex-multithread-scan")
        .build()
        .vortex_expect("Failed to create a new Tokio runtime")
});

impl ScanBuilder<ArrayRef> {
    /// Execute the scan on multiple worker threads.
    pub fn into_array_iter_multithread(self) -> VortexResult<impl ArrayIterator + Send + 'static> {
        let dtype = self.dtype()?;
        Ok(ArrayIteratorAdapter::new(
            dtype,
            self.into_iter_multithread(|a| a)?,
        ))
    }

    /// Execute the scan on multiple worker threads.
    ///
    /// A `map_fn` can be passed to further transform the results of the scan while still running
    /// on the thread pool.
    pub fn into_iter_multithread<T, F>(
        self,
        map_fn: F,
    ) -> VortexResult<impl Iterator<Item = T> + Send + 'static>
    where
        T: 'static + Send,
        F: Fn(VortexResult<ArrayRef>) -> T + Send + Sync + 'static,
    {
        let concurrency = self.concurrency;
        let num_workers = CPU_RUNTIME.metrics().num_workers();

        let tasks = self.build()?;

        // We need to clone and send the map_fn into each task.
        let map_fn = Arc::new(map_fn);

        let handle = CPU_RUNTIME.handle().clone();
        let mut stream = stream::iter(tasks)
            .map(move |task| {
                let map_fn = map_fn.clone();
                // We don't _need_ to spawn the work here. But it allows Tokio to make progress on
                // the tasks in the background, even if the consumer thread is not calling
                // poll_next.
                handle.spawn(async move { task.await.transpose().map(|t| map_fn(t)) })
            })
            // TODO(ngates): this is very crude indeed. This buffered call essentially controls how
            //  many splits we have in-flight at any given time. We multiply workers by concurrency
            //  to configure per-thread concurrency, which essentially means each thread can make
            //  progress on one split while waiting for the I/O of another split to complete.
            //  In an ideal world, the number of in-flight tasks would be dynamically adjusted
            //  based on how much I/O the tasks _actually_ require. For example, all pruning tasks
            //  could be spawned immediately since they all use a single segment; this would allow
            //  head-room to run ahead and figure out the I/O demands of subsequent tasks.
            .buffered(num_workers * concurrency);

        Ok(
            iter::from_fn(move || block_on(stream.next())).filter_map(|result| {
                result
                    .map_err(|e| vortex_err!("Failed to join on a spawned scan task {e}"))
                    .vortex_expect("Failed to join on a spawned scan task")
            }),
        )
    }
}
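
The `.buffered(num_workers * concurrency)` call above caps how many splits are in flight while still yielding results in task order. The following is a minimal sketch of that idea using only the standard library. It swaps Tokio tasks for plain OS threads, and `buffered_in_order` is a hypothetical helper introduced here for illustration, not part of Vortex or Tokio.

```rust
use std::collections::VecDeque;
use std::thread::{self, JoinHandle};

/// Hypothetical helper (not part of Vortex): runs `jobs` on OS threads, keeping at
/// most `limit` jobs in flight, and yields their results in submission order.
fn buffered_in_order<T, J>(jobs: Vec<J>, limit: usize) -> impl Iterator<Item = T>
where
    T: Send + 'static,
    J: FnOnce() -> T + Send + 'static,
{
    // Treat a limit of zero as one so the iterator always makes progress.
    let limit = limit.max(1);
    let mut pending = jobs.into_iter();
    let mut in_flight: VecDeque<JoinHandle<T>> = VecDeque::new();

    std::iter::from_fn(move || {
        // Top up the window: spawned jobs keep running between calls to `next`.
        while in_flight.len() < limit {
            match pending.next() {
                Some(job) => in_flight.push_back(thread::spawn(job)),
                None => break,
            }
        }
        // Block on the oldest job so results come out in submission order.
        in_flight
            .pop_front()
            .map(|handle| handle.join().expect("worker thread panicked"))
    })
}
```

Calling `buffered_in_order(jobs, 4).collect::<Vec<_>>()` would run at most four jobs at a time. As in the TODO above, the window size is a fixed number here rather than something derived from what each job actually needs.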
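
The spawned future above applies `map_fn` only after `transpose()`, so a task that yields nothing is dropped by the later `filter_map` instead of reaching the caller. The sketch below shows that shape with standard-library types only. It assumes each task resolves to something like `VortexResult<Option<ArrayRef>>`, as the `transpose()` call suggests. `TaskOutput` and `apply_map_fn` are hypothetical names, with `u32` standing in for `ArrayRef` and `String` for the error type.

```rust
/// Hypothetical stand-in for a scan task's output: `Ok(None)` models a task that
/// yields nothing, `Ok(Some(_))` a produced value, and `Err(_)` a failure.
type TaskOutput = Result<Option<u32>, String>;

/// Mirrors `task.await.transpose().map(|t| map_fn(t))`: `map_fn` sees successes and
/// errors, but is never called for a task that yielded nothing.
fn apply_map_fn<T>(
    output: TaskOutput,
    map_fn: impl Fn(Result<u32, String>) -> T,
) -> Option<T> {
    // Result<Option<u32>, String> -> Option<Result<u32, String>>
    output.transpose().map(map_fn)
}
```

Because errors become `Some(Err(..))`, the caller's `map_fn` decides how to surface them, while `None` simply disappears from the output iterator.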