• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

vortex-data / vortex / 16318538666

16 Jul 2025 11:44AM UTC coverage: 81.562% (-0.07%) from 81.631%
16318538666

push

github

web-flow
feature: multi-file-scan thread-safe iterator (#3863)

This PR implements a multi-scan operator which can be driven from
multiple threads through `MultiScanIterator`s. Work-stealing will be
implemented in a follow-up. Further, the implementation will move to a global
conversion cache shared across all threads.

---------

Signed-off-by: Alexander Droste <alexander.droste@protonmail.com>

180 of 187 new or added lines in 3 files covered. (96.26%)

26 existing lines in 2 files now uncovered.

46763 of 57334 relevant lines covered (81.56%)

145250.55 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

90.0
/vortex-scan/src/multi_scan.rs
1
// SPDX-License-Identifier: Apache-2.0
2
// SPDX-FileCopyrightText: Copyright the Vortex contributors
3

4
use std::sync::Arc;
5

6
use crossbeam_queue::SegQueue;
7
use futures::executor::LocalPool;
8
use futures::future::BoxFuture;
9
use futures::stream::{FuturesUnordered, StreamExt};
10
use vortex_array::ArrayRef;
11
use vortex_error::VortexResult;
12

13
use crate::ScanBuilder;
14

15
type ArrayFuture = BoxFuture<'static, VortexResult<Option<ArrayRef>>>;
16
type ScanBuilderFactory = Arc<SegQueue<Box<dyn FnOnce() -> ScanBuilder<ArrayRef> + Send + Sync>>>;
17

18
/// Coordinator to orchestrate multiple scan operations.
19
///
20
/// `MultiScan` allows to queue multiple scan operations in order to execute
21
/// them in parallel. In particular, this enables scanning multiple files.
22
pub struct MultiScan {
23
    scan_builder_factory: ScanBuilderFactory,
24
}
25

26
impl MultiScan {
27
    pub fn new() -> Self {
359✔
28
        Self {
359✔
29
            scan_builder_factory: Arc::new(SegQueue::new()),
359✔
30
        }
359✔
31
    }
359✔
32

33
    /// `ScanBuilder`s are passed through closures to decouple how the they are created.
34
    pub fn with_scan_builders<I, F>(self, closures: I) -> Self
183✔
35
    where
183✔
36
        F: FnOnce() -> ScanBuilder<ArrayRef> + 'static + Send + Sync,
183✔
37
        I: IntoIterator<Item = F>,
183✔
38
    {
183✔
39
        for closure in closures.into_iter() {
184✔
40
            self.scan_builder_factory.push(Box::new(closure));
184✔
41
        }
184✔
42

43
        self
183✔
44
    }
183✔
45

46
    /// Creates a new iterator to participate in the scan.
47
    ///
48
    /// The scan progresses when calling `next` on the iterator.
49
    pub fn new_scan_iterator(&self) -> MultiScanIterator {
2,634✔
50
        MultiScanIterator {
2,634✔
51
            scan_builder_factory: self.scan_builder_factory.clone(),
2,634✔
52
            local_pool: LocalPool::new(),
2,634✔
53
            polled_tasks: FuturesUnordered::new(),
2,634✔
54
            task_queue: SegQueue::new(),
2,634✔
55
        }
2,634✔
56
    }
2,634✔
57
}
58

59
/// Scan iterator to participate in a `MultiScan`.
60
pub struct MultiScanIterator {
61
    local_pool: LocalPool,
62
    polled_tasks: FuturesUnordered<ArrayFuture>,
63

64
    /// Thread-safe queue of closures that lazily produce [`ScanBuilder`] instances.
65
    /// This queue is shared across all iterators being created with `new_scan_iterator`.
66
    scan_builder_factory: ScanBuilderFactory,
67
    task_queue: SegQueue<ArrayFuture>,
68
}
69

70
impl MultiScanIterator {
71
    fn pop_scan_task(&self) -> Option<VortexResult<ArrayFuture>> {
3,410✔
72
        if let Some(array_future_tuple) = self.task_queue.pop() {
3,410✔
73
            return Some(Ok(array_future_tuple));
776✔
74
        }
2,634✔
75
        None
2,634✔
76
    }
3,410✔
77
}
78

79
impl Iterator for MultiScanIterator {
80
    type Item = VortexResult<ArrayRef>;
81

82
    fn next(&mut self) -> Option<VortexResult<ArrayRef>> {
3,410✔
83
        loop {
84
            // Queue up tasks if the thread local queue is almost empty.
85
            if self.task_queue.len() <= 4 {
3,410✔
86
                if let Some(scan_builder_fn) = self.scan_builder_factory.pop() {
3,222✔
87
                    let split_tasks = scan_builder_fn().build().ok()?.1;
376✔
88
                    for task in split_tasks {
1,152✔
89
                        self.task_queue.push(Box::pin(task));
776✔
90
                    }
776✔
91
                }
2,846✔
92
                // TODO(Alex): worksteal tasks from other threads
93
            }
188✔
94

95
            if let Some(work_result) = self.pop_scan_task() {
3,410✔
96
                match work_result {
776✔
97
                    Ok(future) => self.polled_tasks.push(future),
776✔
NEW
98
                    Err(e) => return Some(Err(e)),
×
99
                }
100
            }
2,634✔
101

102
            if self.task_queue.is_empty() && self.polled_tasks.is_empty() {
3,410✔
103
                // All tasks have been fully processed.
104
                return None;
2,634✔
105
            }
776✔
106

776✔
107
            let result = self.local_pool.run_until(async {
776✔
108
                while let Some(result) = self.polled_tasks.next().await {
776✔
109
                    match result {
776✔
110
                        Ok(Some(array)) => return Some(Ok(array)),
776✔
NEW
111
                        Ok(None) => continue,
×
NEW
112
                        Err(e) => return Some(Err(e)),
×
113
                    }
114
                }
NEW
115
                None
×
116
            });
776✔
117

118
            match result {
776✔
119
                Some(Ok(array)) => return Some(Ok(array)),
776✔
NEW
120
                Some(Err(e)) => return Some(Err(e)),
×
NEW
121
                None => continue, // Try next batch of futures
×
122
            }
123
        }
124
    }
3,410✔
125
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc