• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

vortex-data / vortex / 16321963551

16 Jul 2025 02:14PM UTC coverage: 81.557% (-0.005%) from 81.562%
16321963551

push

github

web-flow
chore[vortex-scan]: clippy default `MultiScan` (#3889)

Signed-off-by: Joe Isaacs <joe.isaacs@live.co.uk>

1 of 2 new or added lines in 2 files covered. (50.0%)

2 existing lines in 1 file now uncovered.

46758 of 57332 relevant lines covered (81.56%)

145308.46 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

84.48
/vortex-scan/src/multi_scan.rs
1
// SPDX-License-Identifier: Apache-2.0
2
// SPDX-FileCopyrightText: Copyright the Vortex contributors
3

4
use std::sync::Arc;
5

6
use crossbeam_queue::SegQueue;
7
use futures::executor::LocalPool;
8
use futures::future::BoxFuture;
9
use futures::stream::{FuturesUnordered, StreamExt};
10
use vortex_array::ArrayRef;
11
use vortex_error::VortexResult;
12

13
use crate::ScanBuilder;
14

15
type ArrayFuture = BoxFuture<'static, VortexResult<Option<ArrayRef>>>;
16
type ScanBuilderFactory = Arc<SegQueue<Box<dyn FnOnce() -> ScanBuilder<ArrayRef> + Send + Sync>>>;
17

18
/// Coordinator to orchestrate multiple scan operations.
19
///
20
/// `MultiScan` allows to queue multiple scan operations in order to execute
21
/// them in parallel. In particular, this enables scanning multiple files.
22
#[derive(Default)]
23
pub struct MultiScan {
24
    scan_builder_factory: ScanBuilderFactory,
25
}
26

27
impl MultiScan {
UNCOV
28
    pub fn new() -> Self {
×
NEW
29
        Self::default()
×
UNCOV
30
    }
×
31

32
    /// `ScanBuilder`s are passed through closures to decouple how the they are created.
33
    pub fn with_scan_builders<I, F>(self, closures: I) -> Self
183✔
34
    where
183✔
35
        F: FnOnce() -> ScanBuilder<ArrayRef> + 'static + Send + Sync,
183✔
36
        I: IntoIterator<Item = F>,
183✔
37
    {
183✔
38
        for closure in closures.into_iter() {
184✔
39
            self.scan_builder_factory.push(Box::new(closure));
184✔
40
        }
184✔
41

42
        self
183✔
43
    }
183✔
44

45
    /// Creates a new iterator to participate in the scan.
46
    ///
47
    /// The scan progresses when calling `next` on the iterator.
48
    pub fn new_scan_iterator(&self) -> MultiScanIterator {
2,634✔
49
        MultiScanIterator {
2,634✔
50
            scan_builder_factory: self.scan_builder_factory.clone(),
2,634✔
51
            local_pool: LocalPool::new(),
2,634✔
52
            polled_tasks: FuturesUnordered::new(),
2,634✔
53
            task_queue: SegQueue::new(),
2,634✔
54
        }
2,634✔
55
    }
2,634✔
56
}
57

58
/// Scan iterator to participate in a `MultiScan`.
59
pub struct MultiScanIterator {
60
    local_pool: LocalPool,
61
    polled_tasks: FuturesUnordered<ArrayFuture>,
62

63
    /// Thread-safe queue of closures that lazily produce [`ScanBuilder`] instances.
64
    /// This queue is shared across all iterators being created with `new_scan_iterator`.
65
    scan_builder_factory: ScanBuilderFactory,
66
    task_queue: SegQueue<ArrayFuture>,
67
}
68

69
impl MultiScanIterator {
70
    fn pop_scan_task(&self) -> Option<VortexResult<ArrayFuture>> {
3,410✔
71
        if let Some(array_future_tuple) = self.task_queue.pop() {
3,410✔
72
            return Some(Ok(array_future_tuple));
776✔
73
        }
2,634✔
74
        None
2,634✔
75
    }
3,410✔
76
}
77

78
impl Iterator for MultiScanIterator {
79
    type Item = VortexResult<ArrayRef>;
80

81
    fn next(&mut self) -> Option<VortexResult<ArrayRef>> {
3,410✔
82
        loop {
83
            // Queue up tasks if the thread local queue is almost empty.
84
            if self.task_queue.len() <= 4 {
3,410✔
85
                if let Some(scan_builder_fn) = self.scan_builder_factory.pop() {
3,222✔
86
                    let split_tasks = scan_builder_fn().build().ok()?.1;
376✔
87
                    for task in split_tasks {
1,152✔
88
                        self.task_queue.push(Box::pin(task));
776✔
89
                    }
776✔
90
                }
2,846✔
91
                // TODO(Alex): worksteal tasks from other threads
92
            }
188✔
93

94
            if let Some(work_result) = self.pop_scan_task() {
3,410✔
95
                match work_result {
776✔
96
                    Ok(future) => self.polled_tasks.push(future),
776✔
97
                    Err(e) => return Some(Err(e)),
×
98
                }
99
            }
2,634✔
100

101
            if self.task_queue.is_empty() && self.polled_tasks.is_empty() {
3,410✔
102
                // All tasks have been fully processed.
103
                return None;
2,634✔
104
            }
776✔
105

776✔
106
            let result = self.local_pool.run_until(async {
776✔
107
                while let Some(result) = self.polled_tasks.next().await {
776✔
108
                    match result {
776✔
109
                        Ok(Some(array)) => return Some(Ok(array)),
776✔
110
                        Ok(None) => continue,
×
111
                        Err(e) => return Some(Err(e)),
×
112
                    }
113
                }
114
                None
×
115
            });
776✔
116

117
            match result {
776✔
118
                Some(Ok(array)) => return Some(Ok(array)),
776✔
119
                Some(Err(e)) => return Some(Err(e)),
×
120
                None => continue, // Try next batch of futures
×
121
            }
122
        }
123
    }
3,410✔
124
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc