• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

MitMaro / git-interactive-rebase-tool / 13798245227

11 Mar 2025 09:09PM UTC coverage: 97.311% (-0.01%) from 97.325%
13798245227

push

github

MitMaro
Fix ordering in build.rs

4596 of 4723 relevant lines covered (97.31%)

2.74 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

86.79
/src/runtime/runtime.rs
1
use std::{
2
        fmt::{Debug, Formatter},
3
        sync::Arc,
4
        thread,
5
};
6

7
use crossbeam_channel::{Receiver, Sender, unbounded};
8
use parking_lot::Mutex;
9

10
use crate::runtime::{Installer, RuntimeError, Status, ThreadStatuses, Threadable};
11

12
const RUNTIME_THREAD_NAME: &str = "runtime";
13

14
/// A system the manages the lifetime of threads. This includes ensuring errors are handled, threads are paused and
15
/// resumed on request and that once the main application is completed, all threads complete and end.
16
pub(crate) struct Runtime<'runtime> {
17
        receiver: Receiver<(String, Status)>,
18
        sender: Sender<(String, Status)>,
19
        thread_statuses: ThreadStatuses,
20
        threadables: Arc<Mutex<Vec<&'runtime mut dyn Threadable>>>,
21
}
22

23
impl<'runtime> Runtime<'runtime> {
24
        /// Create a new instances of the `Runtime`.
25
        #[must_use]
26
        pub(crate) fn new(thread_statuses: ThreadStatuses) -> Self {
1✔
27
                let (sender, receiver) = unbounded();
2✔
28

29
                thread_statuses.register_thread(RUNTIME_THREAD_NAME, Status::Waiting);
1✔
30

31
                Self {
32
                        receiver,
33
                        sender,
34
                        thread_statuses,
35
                        threadables: Arc::new(Mutex::new(vec![])),
2✔
36
                }
37
        }
38

39
        /// Register a new `Threadable`.
40
        pub(crate) fn register(&self, threadable: &'runtime mut (dyn Threadable)) {
1✔
41
                self.threadables.lock().push(threadable);
1✔
42
        }
43

44
        /// Join the runtime thread, waiting for all threads to finish.
45
        ///
46
        /// # Errors
47
        /// Returns and error if any of the threads registered to the runtime produce an error.
48
        #[expect(
49
                clippy::iter_over_hash_type,
50
                reason = "Iteration order does not matter in this situation."
51
        )]
52
        pub(crate) fn join(&self) -> Result<(), RuntimeError> {
1✔
53
                let installer = Installer::new(self.thread_statuses.clone(), self.sender.clone());
1✔
54
                {
55
                        let threadables = self.threadables.lock();
2✔
56
                        for threadable in threadables.iter() {
2✔
57
                                threadable.install(&installer);
2✔
58
                        }
59
                }
60
                let mut handles = vec![];
1✔
61

62
                for (name, op) in installer.into_ops().drain() {
5✔
63
                        handles.push(
1✔
64
                                thread::Builder::new()
4✔
65
                                        .name(String::from(name.as_str()))
2✔
66
                                        .spawn(op)
1✔
67
                                        .map_err(|_err| RuntimeError::ThreadSpawnError(name))?,
1✔
68
                        );
69
                }
70

71
                let mut result = Ok(());
1✔
72

73
                for (name, status) in &self.receiver {
2✔
74
                        match status {
1✔
75
                                Status::Error(err) => {
1✔
76
                                        // since we entered an error state, we attempt to shutdown the other threads, but
77
                                        // they could fail due to the error state, but keeping the shutdown error is less
78
                                        // important than the original error.
79
                                        let _result = self.shutdown();
1✔
80
                                        result = Err(err);
2✔
81
                                        break;
×
82
                                },
83
                                Status::RequestPause => {
×
84
                                        for threadable in self.threadables.lock().iter() {
3✔
85
                                                threadable.pause();
2✔
86
                                        }
87
                                },
88
                                Status::RequestResume => {
×
89
                                        for threadable in self.threadables.lock().iter() {
3✔
90
                                                threadable.resume();
2✔
91
                                        }
92
                                },
93
                                Status::RequestEnd => {
×
94
                                        self.thread_statuses.update_thread(RUNTIME_THREAD_NAME, Status::Ended);
1✔
95
                                        for threadable in self.threadables.lock().iter() {
2✔
96
                                                threadable.end();
2✔
97
                                        }
98
                                },
99
                                Status::New | Status::Busy | Status::Waiting | Status::Ended => {},
×
100
                        }
101

102
                        self.thread_statuses.update_thread(name.as_str(), status);
2✔
103

104
                        if self.thread_statuses.all_ended() {
1✔
105
                                result = self.shutdown();
2✔
106
                                break;
×
107
                        }
108
                }
109

110
                while let Some(handle) = handles.pop() {
2✔
111
                        let _result = handle.join();
2✔
112
                }
113

114
                result
1✔
115
        }
116

117
        fn shutdown(&self) -> Result<(), RuntimeError> {
1✔
118
                if self.thread_statuses.all_ended() {
1✔
119
                        return Ok(());
1✔
120
                }
121

122
                for threadable in self.threadables.lock().iter() {
2✔
123
                        threadable.end();
2✔
124
                }
125
                self.sender
1✔
126
                        .send((String::from(RUNTIME_THREAD_NAME), Status::Ended))
1✔
127
                        .map_err(|_err| RuntimeError::SendError)
×
128
        }
129
}
130

131
impl Debug for Runtime<'_> {
132
        fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
1✔
133
                f.write_str("Runtime {{ ... }}")
1✔
134
        }
135
}
136

137
#[cfg(test)]
138
mod tests {
139
        use std::{
140
                sync::atomic::{AtomicBool, Ordering},
141
                thread::sleep,
142
                time::Duration,
143
        };
144

145
        use claims::assert_err;
146

147
        use super::*;
148

149
        impl Runtime<'_> {
150
                /// Get a cloned copy of the `ThreadStatuses`.
151
                #[must_use]
152
                pub(crate) fn statuses(&self) -> ThreadStatuses {
153
                        self.thread_statuses.clone()
154
                }
155
        }
156

157
        #[test]
158
        fn run_thread_finish() {
159
                struct Thread;
160

161
                impl Thread {
162
                        const fn new() -> Self {
163
                                Self {}
164
                        }
165
                }
166

167
                impl Threadable for Thread {
168
                        fn install(&self, installer: &Installer) {
169
                                installer.spawn("name", |notifier| {
170
                                        move || {
171
                                                notifier.end();
172
                                                notifier.request_end();
173
                                        }
174
                                });
175
                        }
176
                }
177

178
                let runtime = Runtime::new(ThreadStatuses::new());
179
                let mut thread = Thread::new();
180
                runtime.register(&mut thread);
181
                runtime.join().unwrap();
182
                assert!(runtime.statuses().all_ended());
183
        }
184

185
        #[test]
186
        fn run_thread_error() {
187
                struct Thread1;
188

189
                impl Thread1 {
190
                        const fn new() -> Self {
191
                                Self {}
192
                        }
193
                }
194

195
                impl Threadable for Thread1 {
196
                        fn install(&self, installer: &Installer) {
197
                                installer.spawn("name0", |notifier| {
198
                                        move || {
199
                                                notifier.error(RuntimeError::ThreadError(String::from("error")));
200
                                        }
201
                                });
202
                        }
203
                }
204

205
                struct Thread2 {
206
                        ended: Arc<AtomicBool>,
207
                }
208

209
                impl Thread2 {
210
                        fn new() -> Self {
211
                                Self {
212
                                        ended: Arc::new(AtomicBool::new(false)),
213
                                }
214
                        }
215
                }
216

217
                impl Threadable for Thread2 {
218
                        fn install(&self, installer: &Installer) {
219
                                let ended = Arc::clone(&self.ended);
220
                                installer.spawn("name1", |notifier| {
221
                                        move || {
222
                                                while !ended.load(Ordering::Acquire) {
223
                                                        sleep(Duration::from_millis(10));
224
                                                }
225
                                                notifier.end();
226
                                        }
227
                                });
228
                        }
229

230
                        fn end(&self) {
231
                                self.ended.store(true, Ordering::Release);
232
                        }
233
                }
234

235
                let runtime = Runtime::new(ThreadStatuses::new());
236
                let mut thread1 = Thread1::new();
237
                let mut thread2 = Thread2::new();
238
                runtime.register(&mut thread1);
239
                runtime.register(&mut thread2);
240
                assert_err!(runtime.join());
241
        }
242

243
        #[test]
244
        fn run_thread_request_pause() {
245
                struct Thread1;
246

247
                impl Thread1 {
248
                        const fn new() -> Self {
249
                                Self {}
250
                        }
251
                }
252

253
                impl Threadable for Thread1 {
254
                        fn install(&self, installer: &Installer) {
255
                                installer.spawn("name0", |notifier| {
256
                                        move || {
257
                                                notifier.request_pause();
258
                                                notifier.end();
259
                                        }
260
                                });
261
                        }
262
                }
263

264
                struct Thread2 {
265
                        paused: Arc<AtomicBool>,
266
                }
267

268
                impl Thread2 {
269
                        fn new() -> Self {
270
                                Self {
271
                                        paused: Arc::new(AtomicBool::new(false)),
272
                                }
273
                        }
274
                }
275

276
                impl Threadable for Thread2 {
277
                        fn install(&self, installer: &Installer) {
278
                                let paused = Arc::clone(&self.paused);
279
                                installer.spawn("name1", |notifier| {
280
                                        move || {
281
                                                while !paused.load(Ordering::Acquire) {
282
                                                        sleep(Duration::from_millis(10));
283
                                                }
284
                                                notifier.end();
285
                                                notifier.request_end();
286
                                        }
287
                                });
288
                        }
289

290
                        fn pause(&self) {
291
                                self.paused.store(true, Ordering::Release);
292
                        }
293
                }
294

295
                let runtime = Runtime::new(ThreadStatuses::new());
296
                let mut thread1 = Thread1::new();
297
                let mut thread2 = Thread2::new();
298
                runtime.register(&mut thread1);
299
                runtime.register(&mut thread2);
300
                runtime.join().unwrap();
301
                assert!(thread2.paused.load(Ordering::Acquire));
302
        }
303

304
        #[test]
305
        fn run_thread_request_resume() {
306
                struct Thread1;
307

308
                impl Thread1 {
309
                        const fn new() -> Self {
310
                                Self {}
311
                        }
312
                }
313

314
                impl Threadable for Thread1 {
315
                        fn install(&self, installer: &Installer) {
316
                                installer.spawn("name0", |notifier| {
317
                                        move || {
318
                                                notifier.request_resume();
319
                                                notifier.end();
320
                                        }
321
                                });
322
                        }
323
                }
324

325
                struct Thread2 {
326
                        resumed: Arc<AtomicBool>,
327
                }
328

329
                impl Thread2 {
330
                        fn new() -> Self {
331
                                Self {
332
                                        resumed: Arc::new(AtomicBool::new(false)),
333
                                }
334
                        }
335
                }
336

337
                impl Threadable for Thread2 {
338
                        fn install(&self, installer: &Installer) {
339
                                let resumed = Arc::clone(&self.resumed);
340
                                installer.spawn("name1", |notifier| {
341
                                        move || {
342
                                                while !resumed.load(Ordering::Acquire) {
343
                                                        sleep(Duration::from_millis(10));
344
                                                }
345
                                                notifier.end();
346
                                                notifier.request_end();
347
                                        }
348
                                });
349
                        }
350

351
                        fn resume(&self) {
352
                                self.resumed.store(true, Ordering::Release);
353
                        }
354
                }
355

356
                let runtime = Runtime::new(ThreadStatuses::new());
357
                let mut thread1 = Thread1::new();
358
                let mut thread2 = Thread2::new();
359
                runtime.register(&mut thread1);
360
                runtime.register(&mut thread2);
361
                runtime.join().unwrap();
362
                assert!(thread2.resumed.load(Ordering::Acquire));
363
        }
364

365
        #[test]
366
        fn run_thread_request_end() {
367
                struct Thread1;
368

369
                impl Thread1 {
370
                        const fn new() -> Self {
371
                                Self {}
372
                        }
373
                }
374

375
                impl Threadable for Thread1 {
376
                        fn install(&self, installer: &Installer) {
377
                                installer.spawn("name0", |notifier| {
378
                                        move || {
379
                                                notifier.request_end();
380
                                                notifier.end();
381
                                        }
382
                                });
383
                        }
384
                }
385

386
                struct Thread2 {
387
                        ended: Arc<AtomicBool>,
388
                }
389

390
                impl Thread2 {
391
                        fn new() -> Self {
392
                                Self {
393
                                        ended: Arc::new(AtomicBool::new(false)),
394
                                }
395
                        }
396
                }
397

398
                impl Threadable for Thread2 {
399
                        fn install(&self, installer: &Installer) {
400
                                let ended = Arc::clone(&self.ended);
401
                                installer.spawn("name1", |notifier| {
402
                                        move || {
403
                                                while !ended.load(Ordering::Acquire) {
404
                                                        sleep(Duration::from_millis(10));
405
                                                }
406
                                                notifier.end();
407
                                        }
408
                                });
409
                        }
410

411
                        fn end(&self) {
412
                                self.ended.store(true, Ordering::Release);
413
                        }
414
                }
415

416
                let runtime = Runtime::new(ThreadStatuses::new());
417
                let mut thread1 = Thread1::new();
418
                let mut thread2 = Thread2::new();
419
                runtime.register(&mut thread1);
420
                runtime.register(&mut thread2);
421
                runtime.join().unwrap();
422
                assert!(thread2.ended.load(Ordering::Acquire));
423
        }
424

425
        #[test]
426
        fn runnable_debug() {
427
                struct Thread;
428

429
                impl Thread {
430
                        const fn new() -> Self {
431
                                Self {}
432
                        }
433
                }
434

435
                impl Threadable for Thread {
436
                        fn install(&self, installer: &Installer) {
437
                                installer.spawn("name", |notifier| {
438
                                        move || {
439
                                                notifier.end();
440
                                                notifier.request_end();
441
                                        }
442
                                });
443
                        }
444
                }
445

446
                let runtime = Runtime::new(ThreadStatuses::new());
447
                let mut thread = Thread::new();
448
                runtime.register(&mut thread);
449
                runtime.join().unwrap();
450
                assert_eq!(format!("{runtime:?}"), "Runtime {{ ... }}");
451
        }
452
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc