• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

vortex-data / vortex / 16296457299

15 Jul 2025 02:42PM UTC coverage: 81.416% (-0.07%) from 81.486%
16296457299

Pull #3863

github

web-flow
Merge 285298a6f into 6564035a4
Pull Request #3863: chore: multi-file-scan thread-safe iterator

189 of 200 new or added lines in 3 files covered. (94.5%)

25 existing lines in 1 file now uncovered.

46319 of 56892 relevant lines covered (81.42%)

146414.86 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

85.92
/vortex-scan/src/iterator.rs
1
// SPDX-License-Identifier: Apache-2.0
2
// SPDX-FileCopyrightText: Copyright the Vortex contributors
3

4
use core::panic;
5

6
use crossbeam_queue::SegQueue;
7
use dashmap::DashMap;
8
use futures::executor::LocalPool;
9
use futures::future::BoxFuture;
10
use futures::stream::{FuturesUnordered, StreamExt};
11
use vortex_array::ArrayRef;
12
use vortex_error::VortexResult;
13

14
use crate::ScanBuilder;
15

16
/// A boxed, `'static` future for a single scan task. It resolves to
/// `Ok(Some(array))`, `Ok(None)`, or an error.
type ArrayFuture = BoxFuture<'static, VortexResult<Option<ArrayRef>>>;
17

18
pub struct MultiFileIterator {
19
    local_pools: DashMap<usize, LocalPool>,
20
    scan_builder_fns: SegQueue<Box<dyn FnOnce() -> ScanBuilder<ArrayRef>>>,
21
    polled_tasks: DashMap<usize, FuturesUnordered<ArrayFuture>>,
22
    task_queues: DashMap<usize, SegQueue<ArrayFuture>>,
23
}
24

25
impl MultiFileIterator {
26
    pub fn new(num_threads: usize) -> Self {
359✔
27
        let thread_queues = DashMap::new();
359✔
28
        let processed_tasks = DashMap::new();
359✔
29
        let local_pools = DashMap::new();
359✔
30
        for thread_id in 0..num_threads {
2,872✔
31
            thread_queues.insert(thread_id, SegQueue::new());
2,872✔
32
            processed_tasks.insert(thread_id, FuturesUnordered::new());
2,872✔
33
            local_pools.insert(thread_id, LocalPool::new());
2,872✔
34
        }
2,872✔
35

36
        Self {
359✔
37
            task_queues: thread_queues,
359✔
38
            scan_builder_fns: SegQueue::new(),
359✔
39
            local_pools,
359✔
40
            polled_tasks: processed_tasks,
359✔
41
        }
359✔
42
    }
359✔
43

44
    pub fn with_scan_builders<I, F>(self, closures: I) -> Self
183✔
45
    where
183✔
46
        F: FnOnce() -> ScanBuilder<ArrayRef> + 'static,
183✔
47
        I: IntoIterator<Item = F>,
183✔
48
    {
183✔
49
        for closure in closures.into_iter() {
184✔
50
            self.scan_builder_fns.push(Box::new(closure));
184✔
51
        }
184✔
52

53
        self
183✔
54
    }
183✔
55

56
    fn pop_scan_task(&self, preferred_thread: usize) -> Option<VortexResult<ArrayFuture>> {
3,410✔
57
        if let Some(queue) = self.task_queues.get(&preferred_thread) {
3,410✔
58
            if let Some(array_future_tuple) = queue.pop() {
3,410✔
59
                return Some(Ok(array_future_tuple));
776✔
60
            }
2,634✔
NEW
61
        }
×
62
        None
2,634✔
63
    }
3,410✔
64
}
65

66
impl MultiFileIterator {
67
    /// `next` is not implemented in terms of `impl Iterator` as `self`
68
    /// needs to be immutable in order to be shared across threads.
69
    pub fn next(&self, thread_id: usize) -> Option<VortexResult<ArrayRef>> {
3,410✔
70
        let Some(task_queue) = self.task_queues.get(&thread_id) else {
3,410✔
NEW
71
            panic!("Thread local queue not found");
×
72
        };
73

74
        let Some(mut polled_tasks) = self.polled_tasks.get_mut(&thread_id) else {
3,410✔
NEW
75
            panic!("Thread local processed tasks not found");
×
76
        };
77

78
        let Some(mut local_pool) = self.local_pools.get_mut(&thread_id) else {
3,410✔
NEW
79
            panic!("Thread local pool not found");
×
80
        };
81

82
        loop {
83
            // Queue up tasks if the thread local queue is almost empty.
84
            if task_queue.len() <= 4 {
3,410✔
85
                if let Some(scan_builder_fn) = self.scan_builder_fns.pop() {
3,222✔
86
                    let split_tasks = scan_builder_fn().build().ok()?.1;
376✔
87
                    for task in split_tasks {
1,152✔
88
                        task_queue.push(Box::pin(task));
776✔
89
                    }
776✔
90
                }
2,846✔
91
                // TODO(Alex): worksteal tasks from other threads
92
            }
188✔
93

94
            // Poll one future at a time. Polling multiple futures at
95
            // the same time leads to contention within a layout reader.
96
            if let Some(work_result) = self.pop_scan_task(thread_id) {
3,410✔
97
                match work_result {
776✔
98
                    Ok(future) => polled_tasks.push(future),
776✔
NEW
99
                    Err(e) => return Some(Err(e)),
×
100
                }
101
            }
2,634✔
102

103
            if task_queue.is_empty() && polled_tasks.is_empty() {
3,410✔
104
                // All tasks have been fully processed.
105
                return None;
2,634✔
106
            }
776✔
107

776✔
108
            let result = local_pool.run_until(async {
776✔
109
                while let Some(result) = polled_tasks.next().await {
776✔
110
                    match result {
776✔
111
                        Ok(Some(array)) => return Some(Ok(array)),
776✔
NEW
112
                        Ok(None) => continue,
×
NEW
113
                        Err(e) => return Some(Err(e)),
×
114
                    }
115
                }
NEW
116
                None
×
117
            });
776✔
118

119
            match result {
776✔
120
                Some(Ok(array)) => return Some(Ok(array)),
776✔
NEW
121
                Some(Err(e)) => return Some(Err(e)),
×
NEW
122
                None => continue, // Try next batch of futures
×
123
            }
124
        }
125
    }
3,410✔
126
}
127

128
// NOTE(review): no SAFETY justification is given for this impl. The struct
// stores `Box<dyn FnOnce() -> ScanBuilder<ArrayRef>>` without a `Send` bound
// and per-thread `LocalPool`s (presumably `!Send` — confirm), so soundness
// appears to rely on callers upholding invariants the compiler cannot check.
// TODO: document those invariants here.
unsafe impl Send for MultiFileIterator {}
129
// NOTE(review): no SAFETY justification is given for this impl. Shared access
// goes through `&self` methods that index per-thread state by a caller-supplied
// thread id; presumably each id must be used by only one OS thread at a time —
// confirm and document that requirement here.
unsafe impl Sync for MultiFileIterator {}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc