• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

qubit-ltd / rs-progress / 872fdbf4-2cc4-4d47-92f9-6a54529f95aa

08 May 2026 05:07PM UTC coverage: 99.824%. Remained the same
872fdbf4-2cc4-4d47-92f9-6a54529f95aa

push

circleci

Haixing-Hu
chore: bump version to 0.4.1

568 of 569 relevant lines covered (99.82%)

9.92 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

97.96
/src/running/running_progress_loop.rs
1
/*******************************************************************************
2
 *
3
 *    Copyright (c) 2025 - 2026 Haixing Hu.
4
 *
5
 *    SPDX-License-Identifier: Apache-2.0
6
 *
7
 *    Licensed under the Apache License, Version 2.0.
8
 *
9
 ******************************************************************************/
10
use std::{
11
    sync::mpsc::{
12
        self,
13
        Receiver,
14
        RecvTimeoutError,
15
    },
16
    thread,
17
    time::Duration,
18
};
19

20
use crate::{
21
    Progress,
22
    model::ProgressCounters,
23
};
24

25
use super::{
26
    running_progress_guard::RunningProgressGuard,
27
    running_progress_notifier::RunningProgressNotifier,
28
    running_progress_signal::RunningProgressSignal,
29
};
30

31
/// Runs periodic `running` progress reports for work tracked elsewhere.
32
///
33
/// `RunningProgressLoop` is useful when worker threads update shared state and
34
/// a separate reporter thread should periodically emit `running` events. The
35
/// loop owns only the signal receiver. Callers provide a [`Progress`] instance
36
/// and a snapshot closure that converts their domain state into
37
/// [`ProgressCounters`].
38
///
39
/// `RunningProgressLoop` is an internal helper. Public callers should use
40
/// [`Progress::spawn_running_reporter`](crate::Progress::spawn_running_reporter).
41
///
42
/// # Examples
43
///
44
/// ```
45
/// use std::{
46
///     sync::{
47
///         Arc,
48
///         atomic::{
49
///             AtomicUsize,
50
///             Ordering,
51
///         },
52
///     },
53
///     thread,
54
///     time::Duration,
55
/// };
56
///
57
/// use qubit_progress::{
58
///     NoOpProgressReporter,
59
///     Progress,
60
///     ProgressCounters,
61
/// };
62
///
63
/// let reporter = NoOpProgressReporter;
64
/// let completed = Arc::new(AtomicUsize::new(0));
65
///
66
/// thread::scope(|scope| {
67
///     let loop_completed = Arc::clone(&completed);
68
///     let progress = Progress::new(&reporter, Duration::ZERO);
69
///     let running_progress =
70
///         progress.spawn_running_reporter(scope, move || {
71
///             // The background reporter thread does not own the operation
72
///             // state. It only reads a fresh counter snapshot when the
73
///             // interval is due or a worker sends a running point.
74
///             ProgressCounters::new(Some(3))
75
///                 .with_completed_count(loop_completed.load(Ordering::Acquire))
76
///         });
77
///     let progress_point_handle = running_progress.point_handle();
78
///
79
///     // Workers update domain state, then wake the loop. With a zero
80
///     // interval, each running point may emit a `running` event.
81
///     let mut handles = Vec::new();
82
///     for _ in 0..3 {
83
///         let c = Arc::clone(&completed);
84
///         let p = progress_point_handle.clone();
85
///         handles.push(scope.spawn(move || {
86
///             c.fetch_add(1, Ordering::AcqRel);
87
///             assert!(p.report());
88
///         }));
89
///     }
90
///     for h in handles {
91
///         h.join().unwrap();
92
///     }
93
///
94
///     // Stop the loop before leaving the scope. Reporter panics are
95
///     // propagated by `stop_and_join`.
96
///     running_progress.stop_and_join();
97
/// });
98
/// ```
99
///
100
/// # Author
101
///
102
/// Haixing Hu
103
pub(crate) struct RunningProgressLoop {
104
    /// Signal receiver owned by the reporter loop.
105
    signal_receiver: Receiver<RunningProgressSignal>,
106
}
107

108
/// Result of waiting for a running progress loop signal.
109
enum RunningProgressWait {
110
    /// A worker or stop signal was received.
111
    Signal(RunningProgressSignal),
112
    /// No signal arrived before the positive report interval elapsed.
113
    Timeout,
114
    /// All senders were dropped.
115
    Disconnected,
116
}
117

118
impl RunningProgressWait {
119
    /// Returns `true` when the running progress loop should call
120
    /// [`Progress::report_running_if_due`] after this wait result.
121
    #[inline]
122
    fn should_report(self) -> bool {
15✔
123
        match self {
15✔
124
            Self::Timeout => true,
6✔
125
            Self::Disconnected => false,
1✔
126
            Self::Signal(signal) => match signal {
8✔
127
                RunningProgressSignal::RunningPoint => true,
3✔
128
                RunningProgressSignal::Stop => false,
5✔
129
            },
130
        }
131
    }
15✔
132
}
133

134
impl RunningProgressLoop {
135
    /// Spawns a scoped reporter thread for running progress events.
136
    ///
137
    /// # Parameters
138
    ///
139
    /// * `scope` - Thread scope that owns the reporter thread.
140
    /// * `progress` - Progress run used by the reporter thread.
141
    /// * `snapshot` - Closure that returns current counters whenever a
142
    ///   `running` event may be due.
143
    ///
144
    /// # Returns
145
    ///
146
    /// A guard that can create worker point handles and stop the scoped
147
    /// reporter thread.
148
    pub(crate) fn spawn_scoped<'scope, 'env, 'progress, F>(
7✔
149
        scope: &'scope thread::Scope<'scope, 'env>,
7✔
150
        progress: Progress<'progress>,
7✔
151
        snapshot: F,
7✔
152
    ) -> RunningProgressGuard<'scope>
7✔
153
    where
7✔
154
        'progress: 'scope,
7✔
155
        F: FnMut() -> ProgressCounters + Send + 'scope,
7✔
156
    {
157
        let report_points = progress.report_interval().is_zero();
7✔
158
        let (progress_loop, notifier) = Self::channel();
7✔
159
        let progress_thread = scope.spawn(move || {
7✔
160
            progress_loop.run(progress, snapshot);
7✔
161
        });
7✔
162
        RunningProgressGuard::new(notifier, progress_thread, report_points)
7✔
163
    }
7✔
164

165
    /// Creates a paired running progress loop and notifier.
166
    ///
167
    /// # Returns
168
    ///
169
    /// A loop that owns the signal receiver and a notifier that sends wakeup or
170
    /// stop signals to that loop.
171
    pub(crate) fn channel() -> (Self, RunningProgressNotifier) {
7✔
172
        let (signal_sender, signal_receiver) = mpsc::channel();
7✔
173
        (
7✔
174
            Self { signal_receiver },
7✔
175
            RunningProgressNotifier { signal_sender },
7✔
176
        )
7✔
177
    }
7✔
178

179
    /// Runs until a stop signal is received or every notifier is dropped.
180
    ///
181
    /// # Parameters
182
    ///
183
    /// * `progress` - Progress run used to emit `running` events.
184
    /// * `snapshot` - Closure that returns the current counters whenever a
185
    ///   `running` event may be due.
186
    ///
187
    /// # Panics
188
    ///
189
    /// Propagates panics from the configured reporter when a `running` event is
190
    /// due.
191
    pub(crate) fn run<F>(self, mut progress: Progress<'_>, mut snapshot: F)
7✔
192
    where
7✔
193
        F: FnMut() -> ProgressCounters,
7✔
194
    {
195
        let report_interval = progress.report_interval();
7✔
196
        while self.receive_wait(report_interval).should_report() {
15✔
197
            progress.report_running_if_due(snapshot());
8✔
198
        }
8✔
199
    }
7✔
200

201
    /// Waits once on the signal channel and maps the outcome to [`RunningProgressWait`].
202
    ///
203
    /// The calling thread blocks until this wait completes.
204
    ///
205
    /// When `report_interval` is [`Duration::ZERO`], uses [`Receiver::recv`]: the call returns when a
206
    /// [`RunningProgressSignal`] is received or when every notifier sender has been dropped. In this
207
    /// mode no [`RunningProgressWait::Timeout`] is produced.
208
    ///
209
    /// When `report_interval` is positive, uses [`Receiver::recv_timeout`]: if no message arrives
210
    /// before the deadline, returns [`RunningProgressWait::Timeout`] so [`Self::run`] can drive periodic
211
    /// `running` progress; if a message arrives first, returns that signal wrapped in
212
    /// [`RunningProgressWait::Signal`], or [`RunningProgressWait::Disconnected`] if the channel closes.
213
    ///
214
    /// # Parameters
215
    ///
216
    /// * `report_interval` - Configured report interval from the [`Progress`] run passed to [`Self::run`];
217
    ///   [`Duration::ZERO`] selects unbounded waits (event-driven only), otherwise each wait is capped
218
    ///   by this duration and may time out.
219
    ///
220
    /// # Returns
221
    ///
222
    /// * [`RunningProgressWait::Signal`] - The next [`RunningProgressSignal`] from a notifier.
223
    /// * [`RunningProgressWait::Timeout`] - Only when `report_interval` is positive and the wait reached
224
    ///   the deadline without a message.
225
    /// * [`RunningProgressWait::Disconnected`] - The MPSC channel has no senders left.
226
    fn receive_wait(&self, report_interval: Duration) -> RunningProgressWait {
15✔
227
        if report_interval.is_zero() {
15✔
228
            return match self.signal_receiver.recv() {
7✔
229
                Ok(signal) => RunningProgressWait::Signal(signal),
6✔
230
                Err(_) => RunningProgressWait::Disconnected,
1✔
231
            };
232
        }
8✔
233
        match self.signal_receiver.recv_timeout(report_interval) {
8✔
234
            Ok(signal) => RunningProgressWait::Signal(signal),
2✔
235
            Err(RecvTimeoutError::Timeout) => RunningProgressWait::Timeout,
6✔
236
            Err(RecvTimeoutError::Disconnected) => RunningProgressWait::Disconnected,
×
237
        }
238
    }
15✔
239
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc