vortex-data / vortex / 16777255093

06 Aug 2025 12:43PM UTC coverage: 84.036% (+0.007%) from 84.029%

Pull Request #4138: fix: `into_iter_multithread` buffered futures usage
Merge c628d6ed8 into 773d4ec96 (github, web-flow)

0 of 4 new or added lines in 1 file covered (0.0%).

1 existing line in 1 file now uncovered.

48340 of 57523 relevant lines covered (84.04%)

520681.07 hits per line
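The headline figures are consistent with the line counts. Dividing covered lines by relevant lines reproduces the reported percentage, and the change is its difference from the 84.029% baseline:

```latex
\frac{48340}{57523} \approx 0.84036 = 84.036\%, \qquad 84.036\% - 84.029\% = +0.007\%
```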

Source File

/vortex-scan/src/multi_thread.rs (file coverage: 0.0%)
Newly uncovered existing line: `let mut stream = stream::iter(tasks)`. New (uncovered) lines: the `.filter_map(move |result| match result {` call and its three match arms.

// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright the Vortex contributors

use std::iter;
use std::sync::{Arc, LazyLock};

use futures::{StreamExt, stream};
use tokio::runtime::{Builder, Runtime};
use vortex_array::ArrayRef;
use vortex_array::iter::{ArrayIterator, ArrayIteratorAdapter};
use vortex_error::{VortexExpect, VortexResult, vortex_err};

use crate::ScanBuilder;

/// We create an internal Tokio runtime used exclusively for orchestrating work-stealing
/// of CPU-bound work for multithreaded scans.
///
/// It is intentionally not exposed to the user, not configurable, and does not enable I/O or
/// timers.
static CPU_RUNTIME: LazyLock<Runtime> = LazyLock::new(|| {
    Builder::new_multi_thread()
        .thread_name("vortex-multithread-scan")
        .build()
        .vortex_expect("Failed to create a new Tokio runtime")
});

impl ScanBuilder<ArrayRef> {
    /// Execute the scan on multiple worker threads.
    pub fn into_array_iter_multithread(self) -> VortexResult<impl ArrayIterator + Send + 'static> {
        let dtype = self.dtype()?;
        Ok(ArrayIteratorAdapter::new(
            dtype,
            self.into_iter_multithread(|a| a)?,
        ))
    }

    /// Execute the scan on multiple worker threads.
    ///
    /// A `map_fn` can be passed to further transform the results of the scan while still running
    /// on the thread pool.
    pub fn into_iter_multithread<T, F>(
        self,
        map_fn: F,
    ) -> VortexResult<impl Iterator<Item = T> + Send + 'static>
    where
        T: 'static + Send,
        F: Fn(VortexResult<ArrayRef>) -> T + Send + Sync + 'static,
    {
        let concurrency = self.concurrency;
        let num_workers = CPU_RUNTIME.metrics().num_workers();

        let tasks = self.build()?;
        // We need to clone and send the map_fn into each task.
        let map_fn = Arc::new(map_fn);

        let mut stream = stream::iter(tasks)
            // TODO(ngates): this is very crude indeed. This buffered call essentially controls how
            //  many splits we have in-flight at any given time. We multiply workers by concurrency
            //  to configure per-thread concurrency, which essentially means each thread can make
            //  progress on one split while waiting for the I/O of another split to complete.
            //  In an ideal world, the number of in-flight tasks would be dynamically adjusted
            //  based on how much I/O the tasks _actually_ require. For example, all pruning tasks
            //  could be spawned immediately since they all use a single segment, which would allow
            //  headroom to run ahead and figure out the I/O demands of subsequent tasks.
            .buffered(num_workers * concurrency);

        Ok(iter::from_fn(move || {
            tokio::task::block_in_place(|| CPU_RUNTIME.handle().block_on(stream.next()))
        })
        .filter_map(move |result| match result {
            Ok(Some(array)) => Some(map_fn(Ok(array))),
            Ok(None) => None,
            Err(e) => Some(map_fn(Err(e))),
        }))
    }
}
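The TODO comment in `into_iter_multithread` describes `buffered(num_workers * concurrency)` as a cap on how many splits are in flight. For example, 8 workers with a concurrency of 2 allow up to 16 splits at once. `StreamExt::buffered` keeps at most `n` futures pending and emits their outputs in the order the underlying stream produced them. The sketch below reproduces that ordered, bounded window with standard-library threads so it runs without Tokio or `futures`. `run_windowed` is a hypothetical name, and spawning one OS thread per job is a simplification for illustration, not how the scan executes its tasks.

```rust
use std::collections::VecDeque;
use std::thread::{self, JoinHandle};

/// Hypothetical helper: runs `jobs` with at most `window` of them in flight
/// and returns their results in submission order. This mirrors the ordering and
/// bounding behaviour of `StreamExt::buffered(window)`, using OS threads instead of futures.
pub fn run_windowed<T, F>(jobs: Vec<F>, window: usize) -> Vec<T>
where
    T: Send + 'static,
    F: FnOnce() -> T + Send + 'static,
{
    assert!(window > 0, "window must be at least 1");
    let mut pending = jobs.into_iter();
    let mut in_flight: VecDeque<JoinHandle<T>> = VecDeque::with_capacity(window);
    let mut results = Vec::new();

    loop {
        // Top up the window until it is full or no jobs remain to start.
        while in_flight.len() < window {
            match pending.next() {
                Some(job) => in_flight.push_back(thread::spawn(job)),
                None => break,
            }
        }
        // Join the oldest job first so results come out in submission order,
        // even if later jobs have already finished.
        match in_flight.pop_front() {
            Some(handle) => results.push(handle.join().expect("job panicked")),
            // The window is empty and nothing is pending, so every job is done.
            None => break,
        }
    }
    results
}
```

Joining the oldest handle first is what keeps the output ordered. A later job that finishes early simply has its result held in the deque until all earlier jobs have been returned.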
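The iterator returned by `into_iter_multithread` pulls one item at a time from the stream through `iter::from_fn`. It then uses `filter_map` to drop `Ok(None)` items and to pass both arrays and errors through `map_fn`. The std-only sketch below isolates that flattening step. `SplitResult`, `flatten_splits`, and the `String` error type are hypothetical stand-ins for the Vortex types, and reading `Ok(None)` as "this split produced no array" is an assumption. As in the listing, `map_fn` is called by whichever thread calls `next()` on the returned iterator.

```rust
/// Hypothetical stand-in for one split's output; `Ok(None)` is assumed
/// to mean the split produced no array.
pub type SplitResult<T> = Result<Option<T>, String>;

/// Adapts a pull-based source of split results into an iterator that skips
/// empty splits and passes both values and errors through `map_fn`.
pub fn flatten_splits<T, U, F>(
    next_split: impl FnMut() -> Option<SplitResult<T>>,
    map_fn: F,
) -> impl Iterator<Item = U>
where
    F: Fn(Result<T, String>) -> U,
{
    // `from_fn` ends the iterator as soon as the source returns `None`.
    std::iter::from_fn(next_split).filter_map(move |result| match result {
        // A split that produced a value: transform it.
        Ok(Some(value)) => Some(map_fn(Ok(value))),
        // A split that produced nothing: skip it without calling `map_fn`.
        Ok(None) => None,
        // Errors are not dropped; `map_fn` decides how to represent them.
        Err(e) => Some(map_fn(Err(e))),
    })
}
```

Because errors are routed through `map_fn` rather than returned separately, the caller sees a single stream of `U` values and chooses how to represent failures.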