
qubit-ltd / rs-batch / 462cbb8b-595b-48a9-8dc9-23e7b38a6fb6

06 May 2026 10:33AM UTC coverage: 99.851% (-0.04%) from 99.888%
push

circleci

Haixing-Hu
refactor(api)!: consolidate batch outcome model

Remove the legacy BatchExecutionResult types and keep BatchOutcome as the single execution result model.

Make BatchOutcome construction go through validated builders, keep BatchExecutionState internal, and reject duplicate failure indexes.
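The duplicate-index rejection described above could look roughly like the following sketch. Every name here (`Outcome`, `OutcomeBuilder`, `BuildError`, `failure`, `build`) is hypothetical and chosen for illustration; none of it is taken from the rs-batch API, and the real builders may validate more than this.

```rust
use std::collections::HashSet;

/// Hypothetical outcome type; the fields are illustrative only.
#[derive(Debug, PartialEq)]
struct Outcome {
    task_count: usize,
    failed_indexes: Vec<usize>,
}

/// Hypothetical validation error returned by the builder.
#[derive(Debug, PartialEq)]
enum BuildError {
    DuplicateFailureIndex(usize),
}

/// Hypothetical builder: the only way to construct an `Outcome`.
struct OutcomeBuilder {
    task_count: usize,
    failed_indexes: Vec<usize>,
}

impl OutcomeBuilder {
    fn new(task_count: usize) -> Self {
        Self { task_count, failed_indexes: Vec::new() }
    }

    /// Records a failed task index; validation is deferred to `build`.
    fn failure(mut self, index: usize) -> Self {
        self.failed_indexes.push(index);
        self
    }

    /// Validates the recorded state and rejects repeated failure indexes.
    fn build(self) -> Result<Outcome, BuildError> {
        let mut seen = HashSet::new();
        for &index in &self.failed_indexes {
            // `insert` returns false when the index was already present.
            if !seen.insert(index) {
                return Err(BuildError::DuplicateFailureIndex(index));
            }
        }
        Ok(Outcome {
            task_count: self.task_count,
            failed_indexes: self.failed_indexes,
        })
    }
}
```

In this sketch, `OutcomeBuilder::new(3).failure(1).failure(1).build()` returns `Err(BuildError::DuplicateFailureIndex(1))`, while distinct indexes build successfully.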

14 of 14 new or added lines in 3 files covered. (100.0%)

1 existing line in 1 file now uncovered.

671 of 672 relevant lines covered (99.85%)

154.52 hits per line

Source File

/src/executor/batch_executor.rs (97.96% covered)
/*******************************************************************************
 *
 *    Copyright (c) 2025 - 2026 Haixing Hu.
 *
 *    SPDX-License-Identifier: Apache-2.0
 *
 *    Licensed under the Apache License, Version 2.0.
 *
 ******************************************************************************/
use std::fmt::Debug;
use std::sync::Arc;

use crossbeam_queue::SegQueue;
use qubit_function::{
    Callable,
    Runnable,
};

use crate::{
    BatchExecutionError,
    BatchOutcome,
};

use super::{
    BatchCallResult,
    callable_task::CallableTask,
    for_each_task::ForEachTask,
};

/// Executes batches of fallible runnable tasks.
///
pub trait BatchExecutor: Send + Sync {
    /// Executes a batch of runnable tasks.
    ///
    /// # Parameters
    ///
    /// * `tasks` - Task source for the batch. It may be eager or lazy.
    /// * `count` - Declared number of tasks expected from `tasks`.
    ///
    /// # Returns
    ///
    /// `Ok(BatchOutcome)` when the declared task count matches the
    /// source, or `Err(BatchExecutionError)` when the source yields fewer or
    /// more tasks than declared.
    ///
    /// # Errors
    ///
    /// Returns [`BatchExecutionError`] when the source task count does not
    /// match `count`.
    ///
    /// # Panics
    ///
    /// Panics from individual tasks are captured in [`BatchOutcome`].
    /// Panics from the configured [`crate::ProgressReporter`] are propagated to
    /// the caller.
    fn execute<T, E, I>(
        &self,
        tasks: I,
        count: usize,
    ) -> Result<BatchOutcome<E>, BatchExecutionError<E>>
    where
        I: IntoIterator<Item = T>,
        T: Runnable<E> + Send,
        E: Send + Debug;

    /// Executes a batch of callable tasks and collects success values by index.
    ///
    /// # Parameters
    ///
    /// * `tasks` - Callable task source for the batch.
    /// * `count` - Declared number of callables expected from `tasks`.
    ///
    /// # Returns
    ///
    /// A [`BatchCallResult`] containing the normal execution summary plus
    /// optional success values indexed by callable position.
    ///
    /// # Errors
    ///
    /// Returns [`BatchExecutionError`] when the source callable count does not
    /// match `count`.
    ///
    /// # Panics
    ///
    /// Panics from individual callables are captured in the execution result.
    /// Panics from the configured [`crate::ProgressReporter`] are propagated to
    /// the caller.
    fn call<C, R, E, I>(
        &self,
        tasks: I,
        count: usize,
    ) -> Result<BatchCallResult<R, E>, BatchExecutionError<E>>
    where
        I: IntoIterator<Item = C>,
        C: Callable<R, E> + Send,
        R: Send,
        E: Send + Debug,
    {
        let outputs = Arc::new(SegQueue::new());
        // This adapter is lazy: callables are wrapped as runnable tasks only
        // when the executor consumes the iterator. The callables themselves are
        // still executed later by `CallableTask::run`.
        let runnable_tasks = tasks.into_iter().enumerate().map({
            let outputs = Arc::clone(&outputs);
            move |(index, callable)| CallableTask::new(callable, index, Arc::clone(&outputs))
        });
        let outcome = self.execute(runnable_tasks, count)?;
        let values = collect_call_outputs(outputs, count);
        Ok(BatchCallResult::new(outcome, values))
    }

    /// Applies `action` to every `item` by executing a derived task batch.
    ///
    /// # Parameters
    ///
    /// * `items` - Item source to transform into runnable tasks.
    /// * `count` - Declared number of items expected from `items`.
    /// * `action` - Fallible action applied to each item.
    ///
    /// # Returns
    ///
    /// The result returned by [`Self::execute`] for the derived task batch.
    ///
    /// # Errors
    ///
    /// Returns [`BatchExecutionError`] when the source item count does not
    /// match `count`.
    fn for_each<Item, E, I, F>(
        &self,
        items: I,
        count: usize,
        action: F,
    ) -> Result<BatchOutcome<E>, BatchExecutionError<E>>
    where
        I: IntoIterator<Item = Item>,
        Item: Send,
        F: Fn(Item) -> Result<(), E> + Send + Sync,
        E: Send + Debug,
    {
        let action = Arc::new(action);
        let tasks = items
            .into_iter()
            .map(move |item| ForEachTask::new(item, Arc::clone(&action)));
        self.execute(tasks, count)
    }
}

/// Consumes shared callable outputs into an indexed value vector.
///
/// # Parameters
///
/// * `outputs` - Shared output queue filled by callable wrappers.
/// * `count` - Declared callable count used to size the result vector.
///
/// # Returns
///
/// Optional success values indexed by callable position.
///
/// # Panics
///
/// Panics if callable wrappers still hold references to `outputs`, or if a
/// queued output index is outside the declared batch size.
fn collect_call_outputs<R>(outputs: Arc<SegQueue<(usize, R)>>, count: usize) -> Vec<Option<R>> {
    let outputs = match Arc::try_unwrap(outputs) {
        Ok(outputs) => outputs,
        // Reported as uncovered in this build: no test reaches this arm.
        Err(_) => panic!("callable output queue should have a single owner after execution"),
    };
    let mut values = Vec::with_capacity(count);
    values.resize_with(count, || None);
    while let Some((index, value)) = outputs.pop() {
        let slot = values
            .get_mut(index)
            .expect("callable index must be within the declared count");
        *slot = Some(value);
    }
    values
}
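
The index-collection pattern used by `call` and `collect_call_outputs` above can be tried in isolation with the standard library alone. The sketch below is not the crate's implementation: it swaps `crossbeam_queue::SegQueue` for a `Mutex<Vec<_>>`, runs each callable on its own thread instead of going through a `BatchExecutor`, and the names `call_all` and `collect_indexed` are hypothetical. It only illustrates how workers push `(index, value)` pairs into a shared buffer and how `Arc::try_unwrap` reclaims that buffer once every worker clone has been dropped.

```rust
use std::sync::{Arc, Mutex};
use std::thread;

/// Hypothetical stand-in for `collect_call_outputs`, backed by a
/// `Mutex<Vec<_>>` rather than a lock-free queue.
fn collect_indexed<R>(outputs: Arc<Mutex<Vec<(usize, R)>>>, count: usize) -> Vec<Option<R>> {
    // Every worker clone must already be dropped, so the Arc has one owner.
    let outputs = match Arc::try_unwrap(outputs) {
        Ok(mutex) => mutex.into_inner().expect("output lock should not be poisoned"),
        Err(_) => panic!("output buffer should have a single owner after execution"),
    };
    let mut values: Vec<Option<R>> = Vec::with_capacity(count);
    values.resize_with(count, || None);
    for (index, value) in outputs {
        let slot = values
            .get_mut(index)
            .expect("index must be within the declared count");
        *slot = Some(value);
    }
    values
}

/// Runs each callable on its own thread and keeps success values by index.
/// Failed callables leave `None` in their slot.
fn call_all<R, E, F>(callables: Vec<F>) -> Vec<Option<R>>
where
    F: FnOnce() -> Result<R, E> + Send + 'static,
    R: Send + 'static,
    E: 'static,
{
    let count = callables.len();
    let outputs: Arc<Mutex<Vec<(usize, R)>>> = Arc::new(Mutex::new(Vec::new()));
    let handles: Vec<_> = callables
        .into_iter()
        .enumerate()
        .map(|(index, callable)| {
            let outputs = Arc::clone(&outputs);
            thread::spawn(move || {
                if let Ok(value) = callable() {
                    outputs
                        .lock()
                        .expect("output lock should not be poisoned")
                        .push((index, value));
                }
            })
        })
        .collect();
    // Joining drops each worker's Arc clone before the buffer is reclaimed.
    for handle in handles {
        handle.join().expect("worker thread should not panic");
    }
    collect_indexed(outputs, count)
}
```

Passing three boxed closures that return `Ok(10)`, `Err(..)`, and `Ok(30)` yields a vector with values in slots 0 and 2 and `None` in slot 1, regardless of the order in which the threads finish.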
© 2026 Coveralls, Inc