• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

qubit-ltd / rs-progress / 3fd04109-8b83-4a5d-9403-b050209ee9fa

08 May 2026 03:58PM UTC coverage: 99.817% (+0.002%) from 99.815%
3fd04109-8b83-4a5d-9403-b050209ee9fa

push

circleci

Haixing-Hu
chore(release): bump crate version to 0.3.4

- update package version in Cargo.toml from 0.3.3 to 0.3.4
- refresh Cargo.lock package metadata to keep lockstep with crate version

545 of 546 relevant lines covered (99.82%)

9.16 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

97.96
/src/running/running_progress_loop.rs
1
/*******************************************************************************
2
 *
3
 *    Copyright (c) 2025 - 2026 Haixing Hu.
4
 *
5
 *    SPDX-License-Identifier: Apache-2.0
6
 *
7
 *    Licensed under the Apache License, Version 2.0.
8
 *
9
 ******************************************************************************/
10
use std::{
11
    sync::mpsc::{
12
        self,
13
        Receiver,
14
        RecvTimeoutError,
15
    },
16
    thread,
17
    time::Duration,
18
};
19

20
use crate::{
21
    Progress,
22
    model::ProgressCounters,
23
};
24

25
use super::{
26
    running_progress_notifier::RunningProgressNotifier,
27
    running_progress_signal::RunningProgressSignal,
28
    scoped_running_progress::ScopedRunningProgress,
29
};
30

31
/// Runs periodic `running` progress reports for work tracked elsewhere.
32
///
33
/// `RunningProgressLoop` is useful when worker threads update shared state and
34
/// a separate reporter thread should periodically emit `running` events. The
35
/// loop owns only the signal receiver. Callers provide a [`Progress`] instance
36
/// and a snapshot closure that converts their domain state into
37
/// [`ProgressCounters`].
38
///
39
/// Use [`Self::spawn_scoped`] when the reporter thread can be scoped to the
40
/// operation call. It returns a [`ScopedRunningProgress`] guard and cloneable
41
/// [`crate::RunningProgressPointHandle`] handles for workers. Use [`Self::channel`]
42
/// only when callers need to own the lower-level loop and notifier directly.
43
///
44
/// # Examples
45
///
46
/// ```
47
/// use std::{
48
///     sync::{
49
///         Arc,
50
///         atomic::{
51
///             AtomicUsize,
52
///             Ordering,
53
///         },
54
///     },
55
///     thread,
56
///     time::Duration,
57
/// };
58
///
59
/// use qubit_progress::{
60
///     NoOpProgressReporter,
61
///     Progress,
62
///     ProgressCounters,
63
///     RunningProgressLoop,
64
/// };
65
///
66
/// let reporter = NoOpProgressReporter;
67
/// let completed = Arc::new(AtomicUsize::new(0));
68
///
69
/// thread::scope(|scope| {
70
///     let loop_completed = Arc::clone(&completed);
71
///     let progress = Progress::new(&reporter, Duration::ZERO);
72
///     let running_progress =
73
///         RunningProgressLoop::spawn_scoped(scope, progress, move || {
74
///             // The background reporter thread does not own the operation
75
///             // state. It only reads a fresh counter snapshot when the
76
///             // interval is due or a worker sends a running point.
77
///             ProgressCounters::new(Some(3))
78
///                 .with_completed_count(loop_completed.load(Ordering::Acquire))
79
///         });
80
///     let progress_point_handle = running_progress.point_handle();
81
///
82
///     // Worker code updates domain state first, then wakes the loop. With a
83
///     // zero interval, each running point may emit a `running` event.
84
///     for _ in 0..3 {
85
///         completed.fetch_add(1, Ordering::AcqRel);
86
///         assert!(progress_point_handle.report());
87
///     }
88
///
89
///     // Stop the loop before leaving the scope. Reporter panics are
90
///     // propagated by `stop_and_join`.
91
///     running_progress.stop_and_join();
92
/// });
93
/// ```
94
///
95
/// # Author
96
///
97
/// Haixing Hu
98
pub struct RunningProgressLoop {
99
    /// Signal receiver owned by the reporter loop.
100
    signal_receiver: Receiver<RunningProgressSignal>,
101
}
102

103
/// Result of waiting for a running progress loop signal.
104
enum RunningProgressWait {
105
    /// A worker or stop signal was received.
106
    Signal(RunningProgressSignal),
107
    /// No signal arrived before the positive report interval elapsed.
108
    Timeout,
109
    /// All senders were dropped.
110
    Disconnected,
111
}
112

113
impl RunningProgressWait {
114
    /// Returns `true` when the running progress loop should call
115
    /// [`Progress::report_running_if_due`] after this wait result.
116
    #[inline]
117
    fn should_report(self) -> bool {
14✔
118
        match self {
14✔
119
            Self::Timeout => true,
6✔
120
            Self::Disconnected => false,
1✔
121
            Self::Signal(signal) => match signal {
7✔
122
                RunningProgressSignal::RunningPoint => true,
3✔
123
                RunningProgressSignal::Stop => false,
4✔
124
            },
125
        }
126
    }
14✔
127
}
128

129
impl RunningProgressLoop {
130
    /// Spawns a scoped reporter thread for running progress events.
131
    ///
132
    /// # Parameters
133
    ///
134
    /// * `scope` - Thread scope that owns the reporter thread.
135
    /// * `progress` - Progress run used by the reporter thread.
136
    /// * `snapshot` - Closure that returns current counters whenever a
137
    ///   `running` event may be due.
138
    ///
139
    /// # Returns
140
    ///
141
    /// A guard that can create worker point handles and stop the scoped
142
    /// reporter thread.
143
    pub fn spawn_scoped<'scope, 'env, 'progress, F>(
3✔
144
        scope: &'scope thread::Scope<'scope, 'env>,
3✔
145
        progress: Progress<'progress>,
3✔
146
        snapshot: F,
3✔
147
    ) -> ScopedRunningProgress<'scope>
3✔
148
    where
3✔
149
        'progress: 'scope,
3✔
150
        F: FnMut() -> ProgressCounters + Send + 'scope,
3✔
151
    {
152
        let report_points = progress.report_interval().is_zero();
3✔
153
        let (progress_loop, notifier) = Self::channel();
3✔
154
        let progress_thread = scope.spawn(move || {
3✔
155
            progress_loop.run(progress, snapshot);
3✔
156
        });
3✔
157
        ScopedRunningProgress::new(notifier, progress_thread, report_points)
3✔
158
    }
3✔
159

160
    /// Creates a paired running progress loop and notifier.
161
    ///
162
    /// # Returns
163
    ///
164
    /// A loop that owns the signal receiver and a notifier that sends wakeup or
165
    /// stop signals to that loop.
166
    pub fn channel() -> (Self, RunningProgressNotifier) {
8✔
167
        let (signal_sender, signal_receiver) = mpsc::channel();
8✔
168
        (
8✔
169
            Self { signal_receiver },
8✔
170
            RunningProgressNotifier { signal_sender },
8✔
171
        )
8✔
172
    }
8✔
173

174
    /// Runs until a stop signal is received or every notifier is dropped.
175
    ///
176
    /// # Parameters
177
    ///
178
    /// * `progress` - Progress run used to emit `running` events.
179
    /// * `snapshot` - Closure that returns the current counters whenever a
180
    ///   `running` event may be due.
181
    ///
182
    /// # Panics
183
    ///
184
    /// Propagates panics from the configured reporter when a `running` event is
185
    /// due.
186
    pub fn run<F>(self, mut progress: Progress<'_>, mut snapshot: F)
6✔
187
    where
6✔
188
        F: FnMut() -> ProgressCounters,
6✔
189
    {
190
        let report_interval = progress.report_interval();
6✔
191
        while self.receive_wait(report_interval).should_report() {
14✔
192
            progress.report_running_if_due(snapshot());
8✔
193
        }
8✔
194
    }
6✔
195

196
    /// Waits once on the signal channel and maps the outcome to [`RunningProgressWait`].
197
    ///
198
    /// The calling thread blocks until this wait completes.
199
    ///
200
    /// When `report_interval` is [`Duration::ZERO`], uses [`Receiver::recv`]: the call returns when a
201
    /// [`RunningProgressSignal`] is received or when every notifier sender has been dropped. In this
202
    /// mode no [`RunningProgressWait::Timeout`] is produced.
203
    ///
204
    /// When `report_interval` is positive, uses [`Receiver::recv_timeout`]: if no message arrives
205
    /// before the deadline, returns [`RunningProgressWait::Timeout`] so [`Self::run`] can drive periodic
206
    /// `running` progress; if a message arrives first, returns that signal wrapped in
207
    /// [`RunningProgressWait::Signal`], or [`RunningProgressWait::Disconnected`] if the channel closes.
208
    ///
209
    /// # Parameters
210
    ///
211
    /// * `report_interval` - Configured report interval from the [`Progress`] run passed to [`Self::run`];
212
    ///   [`Duration::ZERO`] selects unbounded waits (event-driven only), otherwise each wait is capped
213
    ///   by this duration and may time out.
214
    ///
215
    /// # Returns
216
    ///
217
    /// * [`RunningProgressWait::Signal`] - The next [`RunningProgressSignal`] from a notifier.
218
    /// * [`RunningProgressWait::Timeout`] - Only when `report_interval` is positive and the wait reached
219
    ///   the deadline without a message.
220
    /// * [`RunningProgressWait::Disconnected`] - The MPSC channel has no senders left.
221
    fn receive_wait(&self, report_interval: Duration) -> RunningProgressWait {
14✔
222
        if report_interval.is_zero() {
14✔
223
            return match self.signal_receiver.recv() {
6✔
224
                Ok(signal) => RunningProgressWait::Signal(signal),
5✔
225
                Err(_) => RunningProgressWait::Disconnected,
1✔
226
            };
227
        }
8✔
228
        match self.signal_receiver.recv_timeout(report_interval) {
8✔
229
            Ok(signal) => RunningProgressWait::Signal(signal),
2✔
230
            Err(RecvTimeoutError::Timeout) => RunningProgressWait::Timeout,
6✔
231
            Err(RecvTimeoutError::Disconnected) => RunningProgressWait::Disconnected,
×
232
        }
233
    }
14✔
234
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc