• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

vortex-data / vortex / 16312033955

16 Jul 2025 06:23AM UTC coverage: 81.567% (-0.07%) from 81.636%
16312033955

Pull #3863

github

web-flow
Merge 43dae2416 into 29d232388
Pull Request #3863: feature: multi-file-scan thread-safe iterator

192 of 203 new or added lines in 3 files covered. (94.58%)

25 existing lines in 1 file now uncovered.

46778 of 57349 relevant lines covered (81.57%)

145096.29 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

85.92
/vortex-scan/src/iterator.rs
1
// SPDX-License-Identifier: Apache-2.0
2
// SPDX-FileCopyrightText: Copyright the Vortex contributors
3

4
use core::panic;
5

6
use crossbeam_queue::SegQueue;
7
use dashmap::DashMap;
8
use futures::executor::LocalPool;
9
use futures::future::BoxFuture;
10
use futures::stream::{FuturesUnordered, StreamExt};
11
use vortex_array::ArrayRef;
12
use vortex_error::VortexResult;
13

14
use crate::ScanBuilder;
15

16
type ArrayFuture = BoxFuture<'static, VortexResult<Option<ArrayRef>>>;
17

18
// struct MultiScan<S, A> {
19
//     readers: SegQueue<Box<dyn FnOnce() -> (S, ScanBuilder<A>)>>,
20
//     scan_state: Vec<Option<S>>,
21
// }
22

23
// impl MultiScan {
24
//   fn into_stream(self) -> impl Stream<Item = (S, A)> {}
25
// }
26

27
/// Iterator over the split tasks of several scans that can be driven concurrently
/// from multiple threads through a shared `&self`.
///
/// Callers identify themselves with a `thread_id`; every id in `0..num_threads`
/// (as passed to `MultiFileIterator::new`) owns one entry in each of the per-thread
/// maps below. Scan builder closures live in a single shared queue and are expanded
/// into split tasks by whichever thread pops them first.
pub struct MultiFileIterator {
    /// Per-thread executor that `next` uses (`run_until`) to block on in-flight tasks.
    local_pools: DashMap<usize, LocalPool>,
    /// Shared queue of deferred scan builder constructors, one entry per closure
    /// registered through `with_scan_builders`.
    scan_builder_fns: SegQueue<Box<dyn FnOnce() -> ScanBuilder<ArrayRef>>>,
    /// Per-thread set of split-task futures currently being polled.
    polled_tasks: DashMap<usize, FuturesUnordered<ArrayFuture>>,
    /// Per-thread queue of split-task futures waiting to be polled.
    task_queues: DashMap<usize, SegQueue<ArrayFuture>>,
}
33

34
impl MultiFileIterator {
35
    pub fn new(num_threads: usize) -> Self {
359✔
36
        let task_queues = DashMap::new();
359✔
37
        let polled_tasks = DashMap::new();
359✔
38
        let local_pools = DashMap::new();
359✔
39
        for thread_id in 0..num_threads {
2,872✔
40
            task_queues.insert(thread_id, SegQueue::new());
2,872✔
41
            polled_tasks.insert(thread_id, FuturesUnordered::new());
2,872✔
42
            local_pools.insert(thread_id, LocalPool::new());
2,872✔
43
        }
2,872✔
44

45
        Self {
359✔
46
            task_queues,
359✔
47
            scan_builder_fns: SegQueue::new(),
359✔
48
            local_pools,
359✔
49
            polled_tasks,
359✔
50
        }
359✔
51
    }
359✔
52

53
    pub fn with_scan_builders<I, F>(self, closures: I) -> Self
183✔
54
    where
183✔
55
        F: FnOnce() -> ScanBuilder<ArrayRef> + 'static,
183✔
56
        I: IntoIterator<Item = F>,
183✔
57
    {
183✔
58
        for closure in closures.into_iter() {
184✔
59
            self.scan_builder_fns.push(Box::new(closure));
184✔
60
        }
184✔
61

62
        self
183✔
63
    }
183✔
64

65
    fn pop_scan_task(&self, preferred_thread: usize) -> Option<VortexResult<ArrayFuture>> {
3,410✔
66
        if let Some(queue) = self.task_queues.get(&preferred_thread) {
3,410✔
67
            if let Some(array_future_tuple) = queue.pop() {
3,410✔
68
                return Some(Ok(array_future_tuple));
776✔
69
            }
2,634✔
NEW
70
        }
×
71
        None
2,634✔
72
    }
3,410✔
73
}
74

75
impl MultiFileIterator {
76
    /// `next` is not implemented in terms of `impl Iterator` as `self`
77
    /// needs to be immutable in order to be shared across threads.
78
    pub fn next(&self, thread_id: usize) -> Option<VortexResult<ArrayRef>> {
3,410✔
79
        let Some(task_queue) = self.task_queues.get(&thread_id) else {
3,410✔
NEW
80
            panic!("Thread local queue not found");
×
81
        };
82

83
        let Some(mut polled_tasks) = self.polled_tasks.get_mut(&thread_id) else {
3,410✔
NEW
84
            panic!("Thread local processed tasks not found");
×
85
        };
86

87
        let Some(mut local_pool) = self.local_pools.get_mut(&thread_id) else {
3,410✔
NEW
88
            panic!("Thread local pool not found");
×
89
        };
90

91
        loop {
92
            // Queue up tasks if the thread local queue is almost empty.
93
            if task_queue.len() <= 4 {
3,410✔
94
                if let Some(scan_builder_fn) = self.scan_builder_fns.pop() {
3,222✔
95
                    let split_tasks = scan_builder_fn().build().ok()?.1;
376✔
96
                    for task in split_tasks {
1,152✔
97
                        task_queue.push(Box::pin(task));
776✔
98
                    }
776✔
99
                }
2,846✔
100
                // TODO(Alex): worksteal tasks from other threads
101
            }
188✔
102

103
            // Poll one future at a time. Polling multiple futures at
104
            // the same time leads to contention within a layout reader.
105
            if let Some(work_result) = self.pop_scan_task(thread_id) {
3,410✔
106
                match work_result {
776✔
107
                    Ok(future) => polled_tasks.push(future),
776✔
NEW
108
                    Err(e) => return Some(Err(e)),
×
109
                }
110
            }
2,634✔
111

112
            if task_queue.is_empty() && polled_tasks.is_empty() {
3,410✔
113
                // All tasks have been fully processed.
114
                return None;
2,634✔
115
            }
776✔
116

776✔
117
            let result = local_pool.run_until(async {
776✔
118
                while let Some(result) = polled_tasks.next().await {
776✔
119
                    match result {
776✔
120
                        Ok(Some(array)) => return Some(Ok(array)),
776✔
NEW
121
                        Ok(None) => continue,
×
NEW
122
                        Err(e) => return Some(Err(e)),
×
123
                    }
124
                }
NEW
125
                None
×
126
            });
776✔
127

128
            match result {
776✔
129
                Some(Ok(array)) => return Some(Ok(array)),
776✔
NEW
130
                Some(Err(e)) => return Some(Err(e)),
×
NEW
131
                None => continue, // Try next batch of futures
×
132
            }
133
        }
134
    }
3,410✔
135
}
136

137
// SAFETY: These impls are asserted by hand because the fields are not
// automatically `Send`/`Sync`:
// - the boxed scan builder closures in `scan_builder_fns` carry no `Send` bound;
// - `LocalPool` is presumably `!Send` (TODO confirm against the futures crate).
// Per-thread state is only reached through DashMap guards keyed by `thread_id`,
// so concurrent access to one entry is serialized by the map's locks.
// NOTE(review): a closure registered via `with_scan_builders` may be popped and
// invoked on a different OS thread than the one that created it. That is only
// sound if every such closure is actually `Send`. Consider adding a `Send` bound
// to `with_scan_builders` and to the `scan_builder_fns` element type.
unsafe impl Send for MultiFileIterator {}
unsafe impl Sync for MultiFileIterator {}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc