• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

vortex-data / vortex / 16346697784

17 Jul 2025 01:39PM UTC coverage: 80.707% (+0.003%) from 80.704%
16346697784

Pull #3901

github

web-flow
Merge d01e093a5 into d53d06603
Pull Request #3901: feat: duckdb global conversion cache

40 of 56 new or added lines in 6 files covered. (71.43%)

5 existing lines in 3 files now uncovered.

41878 of 51889 relevant lines covered (80.71%)

157474.88 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

85.94
/vortex-scan/src/multi_scan.rs
1
// SPDX-License-Identifier: Apache-2.0
2
// SPDX-FileCopyrightText: Copyright the Vortex contributors
3

4
use std::sync::Arc;
5

6
use crossbeam_queue::SegQueue;
7
use futures::executor::LocalPool;
8
use futures::future::BoxFuture;
9
use futures::stream::{FuturesUnordered, StreamExt};
10
use vortex_array::ArrayRef;
11
use vortex_error::VortexResult;
12

13
use crate::ScanBuilder;
14

15
type ArrayFuture<S> = BoxFuture<'static, VortexResult<Option<(ArrayRef, S)>>>;
16
type ScanBuilderFactory<S> =
17
    Arc<SegQueue<Box<(dyn FnOnce() -> ScanBuilder<(ArrayRef, S)> + Send + Sync)>>>;
18

19
/// Coordinator to orchestrate multiple scan operations.
20
///
21
/// `MultiScan` allows to queue multiple scan operations in order to execute
22
/// them in parallel. In particular, this enables scanning multiple files.
23
pub struct MultiScan<S> {
24
    scan_builder_factory: ScanBuilderFactory<S>,
25
}
26

27
impl<S> Default for MultiScan<S> {
NEW
28
    fn default() -> Self {
×
NEW
29
        Self::new()
×
NEW
30
    }
×
31
}
32

33
impl<S> MultiScan<S> {
34
    pub fn new() -> Self {
183✔
35
        Self {
183✔
36
            scan_builder_factory: Arc::new(SegQueue::new()),
183✔
37
        }
183✔
38
    }
183✔
39

40
    /// Add lazily constructed scan builders paired with their corresponding states.
41
    pub fn with_scan_builders<I, F>(self, closures: I) -> Self
183✔
42
    where
183✔
43
        F: FnOnce() -> ScanBuilder<(ArrayRef, S)> + 'static + Send + Sync,
183✔
44
        I: IntoIterator<Item = F>,
183✔
45
    {
46
        for closure in closures.into_iter() {
184✔
47
            self.scan_builder_factory.push(Box::new(closure));
184✔
48
        }
184✔
49

50
        self
183✔
51
    }
183✔
52

53
    /// Creates a new iterator to participate in the scan.
54
    ///
55
    /// The scan progresses when calling `next` on the iterator.
56
    pub fn new_scan_iterator(&self) -> MultiScanIterator<S> {
1,450✔
57
        MultiScanIterator {
1,450✔
58
            scan_builder_factory: self.scan_builder_factory.clone(),
1,450✔
59
            local_pool: LocalPool::new(),
1,450✔
60
            polled_tasks: FuturesUnordered::new(),
1,450✔
61
            task_queue: SegQueue::new(),
1,450✔
62
        }
1,450✔
63
    }
1,450✔
64
}
65

66
/// Scan iterator to participate in a `MultiScan`.
67
pub struct MultiScanIterator<S> {
68
    local_pool: LocalPool,
69
    polled_tasks: FuturesUnordered<ArrayFuture<S>>,
70

71
    /// Thread-safe queue of closures that lazily produce [`ScanBuilder`] instances.
72
    /// This queue is shared across all iterators being created with `new_scan_iterator`.
73
    scan_builder_factory: ScanBuilderFactory<S>,
74
    task_queue: SegQueue<ArrayFuture<S>>,
75
}
76

77
impl<S> MultiScanIterator<S> {
78
    fn pop_scan_task(&self) -> Option<VortexResult<ArrayFuture<S>>> {
2,034✔
79
        if let Some(task_with_state) = self.task_queue.pop() {
2,034✔
80
            return Some(Ok(task_with_state));
584✔
81
        }
1,450✔
82
        None
1,450✔
83
    }
2,034✔
84
}
85

86
impl<S: Send + Sync + 'static> Iterator for MultiScanIterator<S> {
87
    type Item = VortexResult<(ArrayRef, S)>;
88

89
    fn next(&mut self) -> Option<VortexResult<(ArrayRef, S)>> {
2,034✔
90
        loop {
91
            // Queue up tasks if the thread local queue is almost empty.
92
            if self.task_queue.len() <= 4 {
2,034✔
93
                if let Some(scan_builder_fn) = self.scan_builder_factory.pop() {
1,846✔
94
                    let split_tasks = scan_builder_fn().build().ok()?.1;
184✔
95
                    for task in split_tasks {
768✔
96
                        self.task_queue.push(Box::pin(task));
584✔
97
                    }
584✔
98
                }
1,662✔
99
                // TODO(Alex): worksteal tasks from other threads
100
            }
188✔
101

102
            if let Some(work_result) = self.pop_scan_task() {
2,034✔
103
                match work_result {
584✔
104
                    Ok(task) => {
584✔
105
                        self.polled_tasks.push(task);
584✔
106
                    }
584✔
UNCOV
107
                    Err(e) => return Some(Err(e)),
×
108
                }
109
            }
1,450✔
110

111
            if self.task_queue.is_empty() && self.polled_tasks.is_empty() {
2,034✔
112
                // All tasks have been fully processed.
113
                return None;
1,450✔
114
            }
584✔
115

116
            let result = self.local_pool.run_until(async {
584✔
117
                while let Some(result) = self.polled_tasks.next().await {
584✔
118
                    match result {
584✔
119
                        Ok(Some(array)) => return Some(Ok(array)),
584✔
120
                        Ok(None) => continue,
×
121
                        Err(e) => return Some(Err(e)),
×
122
                    }
123
                }
124
                None
×
125
            });
584✔
126

127
            match result {
584✔
128
                Some(Ok(array)) => {
584✔
129
                    return Some(Ok(array));
584✔
130
                }
131
                Some(Err(e)) => return Some(Err(e)),
×
132
                None => continue, // Try next batch of futures
×
133
            }
134
        }
135
    }
2,034✔
136
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc