• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

vortex-data / vortex / 16366682730

18 Jul 2025 09:04AM UTC coverage: 81.031% (+0.05%) from 80.979%
16366682730

push

github

web-flow
feat: duckdb global conversion cache (#3901)

Signed-off-by: Alexander Droste <alexander.droste@protonmail.com>

44 of 47 new or added lines in 6 files covered. (93.62%)

2 existing lines in 2 files now uncovered.

42013 of 51848 relevant lines covered (81.03%)

168816.08 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

90.16
/vortex-scan/src/multi_scan.rs
1
// SPDX-License-Identifier: Apache-2.0
2
// SPDX-FileCopyrightText: Copyright the Vortex contributors
3

4
use std::sync::Arc;
5

6
use crossbeam_queue::SegQueue;
7
use futures::executor::LocalPool;
8
use futures::future::BoxFuture;
9
use futures::stream::{FuturesUnordered, StreamExt};
10
use vortex_error::VortexResult;
11

12
use crate::ScanBuilder;
13

14
/// Boxed `'static` future that resolves to `Ok(Some(T))`, `Ok(None)`, or an error.
type ArrayFuture<T> = BoxFuture<'static, VortexResult<Option<T>>>;
15
/// Reference-counted queue of boxed one-shot closures, each producing a `ScanBuilder<T>`.
type ScanBuilderFactory<T> = Arc<SegQueue<Box<(dyn FnOnce() -> ScanBuilder<T> + Send + Sync)>>>;
16

17
/// Coordinator to orchestrate multiple scan operations.
18
///
19
/// `MultiScan` allows to queue multiple scan operations in order to execute
20
/// them in parallel. In particular, this enables scanning multiple files.
21
#[derive(Default)]
22
pub struct MultiScan<T> {
23
    scan_builder_factory: ScanBuilderFactory<T>,
24
}
25

26
impl<T> MultiScan<T> {
27
    pub fn new() -> Self {
183✔
28
        Self {
183✔
29
            scan_builder_factory: Arc::new(SegQueue::new()),
183✔
30
        }
183✔
31
    }
183✔
32

33
    /// Add lazily constructed scan builders paired with their corresponding states.
34
    pub fn with_scan_builders<I, F>(self, closures: I) -> Self
183✔
35
    where
183✔
36
        F: FnOnce() -> ScanBuilder<T> + 'static + Send + Sync,
183✔
37
        I: IntoIterator<Item = F>,
183✔
38
    {
39
        for closure in closures.into_iter() {
184✔
40
            self.scan_builder_factory.push(Box::new(closure));
184✔
41
        }
184✔
42

43
        self
183✔
44
    }
183✔
45

46
    /// Creates a new iterator to participate in the scan.
47
    ///
48
    /// The scan progresses when calling `next` on the iterator.
49
    pub fn new_scan_iterator(&self) -> MultiScanIterator<T> {
1,450✔
50
        MultiScanIterator {
1,450✔
51
            scan_builder_factory: self.scan_builder_factory.clone(),
1,450✔
52
            local_pool: LocalPool::new(),
1,450✔
53
            polled_tasks: FuturesUnordered::new(),
1,450✔
54
            task_queue: SegQueue::new(),
1,450✔
55
        }
1,450✔
56
    }
1,450✔
57
}
58

59
/// Scan iterator to participate in a `MultiScan`.
60
pub struct MultiScanIterator<T> {
61
    local_pool: LocalPool,
62
    polled_tasks: FuturesUnordered<ArrayFuture<T>>,
63

64
    /// Thread-safe queue of closures that lazily produce [`ScanBuilder`] instances.
65
    /// This queue is shared across all iterators being created with `new_scan_iterator`.
66
    scan_builder_factory: ScanBuilderFactory<T>,
67
    task_queue: SegQueue<ArrayFuture<T>>,
68
}
69

70
impl<T> MultiScanIterator<T> {
71
    fn pop_scan_task(&self) -> Option<VortexResult<ArrayFuture<T>>> {
2,034✔
72
        if let Some(task_with_state) = self.task_queue.pop() {
2,034✔
73
            return Some(Ok(task_with_state));
584✔
74
        }
1,450✔
75
        None
1,450✔
76
    }
2,034✔
77
}
78

79
impl<T: Send + Sync + 'static> Iterator for MultiScanIterator<T> {
80
    type Item = VortexResult<T>;
81

82
    fn next(&mut self) -> Option<VortexResult<T>> {
2,034✔
83
        loop {
84
            // Queue up tasks if the thread local queue is almost empty.
85
            if self.task_queue.len() <= 4 {
2,034✔
86
                if let Some(scan_builder_fn) = self.scan_builder_factory.pop() {
1,846✔
87
                    let split_tasks = scan_builder_fn().build().ok()?.1;
184✔
88
                    for task in split_tasks {
768✔
89
                        self.task_queue.push(Box::pin(task));
584✔
90
                    }
584✔
91
                }
1,662✔
92
                // TODO(Alex): worksteal tasks from other threads
93
            }
188✔
94

95
            if let Some(work_result) = self.pop_scan_task() {
2,034✔
96
                match work_result {
584✔
97
                    Ok(task) => {
584✔
98
                        self.polled_tasks.push(task);
584✔
99
                    }
584✔
UNCOV
100
                    Err(e) => return Some(Err(e)),
×
101
                }
102
            }
1,450✔
103

104
            if self.task_queue.is_empty() && self.polled_tasks.is_empty() {
2,034✔
105
                // All tasks have been fully processed.
106
                return None;
1,450✔
107
            }
584✔
108

109
            let result = self.local_pool.run_until(async {
584✔
110
                while let Some(result) = self.polled_tasks.next().await {
584✔
111
                    match result {
584✔
112
                        Ok(Some(array)) => return Some(Ok(array)),
584✔
113
                        Ok(None) => continue,
×
114
                        Err(e) => return Some(Err(e)),
×
115
                    }
116
                }
117
                None
×
118
            });
584✔
119

120
            match result {
584✔
121
                Some(Ok(array)) => {
584✔
122
                    return Some(Ok(array));
584✔
123
                }
124
                Some(Err(e)) => return Some(Err(e)),
×
125
                None => continue, // Try next batch of futures
×
126
            }
127
        }
128
    }
2,034✔
129
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc