• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

MitMaro / git-interactive-rebase-tool / 15519513798

08 Jun 2025 02:35PM UTC coverage: 97.589% (+0.3%) from 97.262%
15519513798

push

github

MitMaro
Add aarch64 Linux as tested platform

4858 of 4978 relevant lines covered (97.59%)

2.79 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

86.0
/src/runtime/runtime.rs
1
use std::{
2
        fmt::{Debug, Formatter},
3
        sync::Arc,
4
        thread,
5
};
6

7
use crossbeam_channel::{Receiver, Sender, unbounded};
8
use parking_lot::Mutex;
9

10
use crate::runtime::{Installer, RuntimeError, Status, ThreadStatuses, Threadable};
11

12
const RUNTIME_THREAD_NAME: &str = "runtime";
13

14
/// A system the manages the lifetime of threads. This includes ensuring errors are handled, threads are paused and
15
/// resumed on request and that once the main application is completed, all threads complete and end.
16
pub(crate) struct Runtime<'runtime> {
17
        receiver: Receiver<(String, Status)>,
18
        sender: Sender<(String, Status)>,
19
        thread_statuses: ThreadStatuses,
20
        threadables: Arc<Mutex<Vec<&'runtime mut dyn Threadable>>>,
21
}
22

23
impl<'runtime> Runtime<'runtime> {
24
        /// Create a new instances of the `Runtime`.
25
        #[must_use]
26
        pub(crate) fn new(thread_statuses: ThreadStatuses) -> Self {
1✔
27
                let (sender, receiver) = unbounded();
2✔
28

29
                thread_statuses.register_thread(RUNTIME_THREAD_NAME, Status::Waiting);
1✔
30

31
                Self {
32
                        receiver,
33
                        sender,
34
                        thread_statuses,
35
                        threadables: Arc::new(Mutex::new(vec![])),
2✔
36
                }
37
        }
38

39
        /// Register a new `Threadable`.
40
        pub(crate) fn register(&self, threadable: &'runtime mut (dyn Threadable)) {
1✔
41
                self.threadables.lock().push(threadable);
1✔
42
        }
43

44
        /// Join the runtime thread, waiting for all threads to finish.
45
        ///
46
        /// # Errors
47
        /// Returns and error if any of the threads registered to the runtime produce an error.
48
        #[expect(
49
                clippy::iter_over_hash_type,
50
                reason = "Iteration order does not matter in this situation."
51
        )]
52
        pub(crate) fn join(&self) -> Result<(), RuntimeError> {
1✔
53
                let installer = Installer::new(self.thread_statuses.clone(), self.sender.clone());
1✔
54
                {
55
                        let threadables = self.threadables.lock();
2✔
56
                        for threadable in threadables.iter() {
2✔
57
                                threadable.install(&installer);
2✔
58
                        }
59
                }
60

61
                thread::scope(|scope| {
2✔
62
                        for (name, op) in installer.into_ops().drain() {
4✔
63
                                let _handle = thread::Builder::new()
3✔
64
                                        .name(String::from(name.as_str()))
2✔
65
                                        .spawn_scoped(scope, op)
1✔
66
                                        .map_err(|_err| RuntimeError::ThreadSpawnError(name))?;
1✔
67
                        }
68

69
                        let mut result = Ok(());
1✔
70

71
                        for (name, status) in &self.receiver {
2✔
72
                                match status {
1✔
73
                                        Status::Error(err) => {
1✔
74
                                                // since we entered an error state, we attempt to shutdown the other threads, but
75
                                                // they could fail due to the error state, but keeping the shutdown error is less
76
                                                // important than the original error.
77
                                                let _result = self.shutdown();
1✔
78
                                                result = Err(err);
2✔
79
                                                break;
×
80
                                        },
81
                                        Status::RequestPause => {
×
82
                                                for threadable in self.threadables.lock().iter() {
3✔
83
                                                        threadable.pause();
2✔
84
                                                }
85
                                        },
86
                                        Status::RequestResume => {
×
87
                                                for threadable in self.threadables.lock().iter() {
3✔
88
                                                        threadable.resume();
2✔
89
                                                }
90
                                        },
91
                                        Status::RequestEnd => {
×
92
                                                self.thread_statuses.update_thread(RUNTIME_THREAD_NAME, Status::Ended);
1✔
93
                                                for threadable in self.threadables.lock().iter() {
2✔
94
                                                        threadable.end();
2✔
95
                                                }
96
                                        },
97
                                        Status::New | Status::Busy | Status::Waiting | Status::Ended => {},
×
98
                                }
99

100
                                self.thread_statuses.update_thread(name.as_str(), status);
2✔
101

102
                                if self.thread_statuses.all_ended() {
1✔
103
                                        result = self.shutdown();
2✔
104
                                        break;
×
105
                                }
106
                        }
107
                        result
1✔
108
                })
109
        }
110

111
        fn shutdown(&self) -> Result<(), RuntimeError> {
1✔
112
                if self.thread_statuses.all_ended() {
1✔
113
                        return Ok(());
1✔
114
                }
115

116
                for threadable in self.threadables.lock().iter() {
2✔
117
                        threadable.end();
2✔
118
                }
119
                self.sender
×
120
                        .send((String::from(RUNTIME_THREAD_NAME), Status::Ended))
1✔
121
                        .map_err(|_err| RuntimeError::SendError)
1✔
122
        }
123
}
124

125
impl Debug for Runtime<'_> {
126
        fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
1✔
127
                f.write_str("Runtime {{ ... }}")
1✔
128
        }
129
}
130

131
#[cfg(test)]
132
mod tests {
133
        use std::{
134
                sync::atomic::{AtomicBool, Ordering},
135
                thread::sleep,
136
                time::Duration,
137
        };
138

139
        use claims::assert_err;
140

141
        use super::*;
142

143
        impl Runtime<'_> {
144
                /// Get a cloned copy of the `ThreadStatuses`.
145
                #[must_use]
146
                pub(crate) fn statuses(&self) -> ThreadStatuses {
147
                        self.thread_statuses.clone()
148
                }
149
        }
150

151
        #[test]
152
        fn run_thread_finish() {
153
                struct Thread;
154

155
                impl Thread {
156
                        const fn new() -> Self {
157
                                Self {}
158
                        }
159
                }
160

161
                impl Threadable for Thread {
162
                        fn install(&self, installer: &Installer) {
163
                                installer.spawn("name", |notifier| {
164
                                        move || {
165
                                                notifier.end();
166
                                                notifier.request_end();
167
                                        }
168
                                });
169
                        }
170
                }
171

172
                let runtime = Runtime::new(ThreadStatuses::new());
173
                let mut thread = Thread::new();
174
                runtime.register(&mut thread);
175
                runtime.join().unwrap();
176
                assert!(runtime.statuses().all_ended());
177
        }
178

179
        #[test]
180
        fn run_thread_error() {
181
                struct Thread1;
182

183
                impl Thread1 {
184
                        const fn new() -> Self {
185
                                Self {}
186
                        }
187
                }
188

189
                impl Threadable for Thread1 {
190
                        fn install(&self, installer: &Installer) {
191
                                installer.spawn("name0", |notifier| {
192
                                        move || {
193
                                                notifier.error(RuntimeError::ThreadError(String::from("error")));
194
                                        }
195
                                });
196
                        }
197
                }
198

199
                struct Thread2 {
200
                        ended: Arc<AtomicBool>,
201
                }
202

203
                impl Thread2 {
204
                        fn new() -> Self {
205
                                Self {
206
                                        ended: Arc::new(AtomicBool::new(false)),
207
                                }
208
                        }
209
                }
210

211
                impl Threadable for Thread2 {
212
                        fn install(&self, installer: &Installer) {
213
                                let ended = Arc::clone(&self.ended);
214
                                installer.spawn("name1", |notifier| {
215
                                        move || {
216
                                                while !ended.load(Ordering::Acquire) {
217
                                                        sleep(Duration::from_millis(10));
218
                                                }
219
                                                notifier.end();
220
                                        }
221
                                });
222
                        }
223

224
                        fn end(&self) {
225
                                self.ended.store(true, Ordering::Release);
226
                        }
227
                }
228

229
                let runtime = Runtime::new(ThreadStatuses::new());
230
                let mut thread1 = Thread1::new();
231
                let mut thread2 = Thread2::new();
232
                runtime.register(&mut thread1);
233
                runtime.register(&mut thread2);
234
                assert_err!(runtime.join());
235
        }
236

237
        #[test]
238
        fn run_thread_request_pause() {
239
                struct Thread1;
240

241
                impl Thread1 {
242
                        const fn new() -> Self {
243
                                Self {}
244
                        }
245
                }
246

247
                impl Threadable for Thread1 {
248
                        fn install(&self, installer: &Installer) {
249
                                installer.spawn("name0", |notifier| {
250
                                        move || {
251
                                                notifier.request_pause();
252
                                                notifier.end();
253
                                        }
254
                                });
255
                        }
256
                }
257

258
                struct Thread2 {
259
                        paused: Arc<AtomicBool>,
260
                }
261

262
                impl Thread2 {
263
                        fn new() -> Self {
264
                                Self {
265
                                        paused: Arc::new(AtomicBool::new(false)),
266
                                }
267
                        }
268
                }
269

270
                impl Threadable for Thread2 {
271
                        fn install(&self, installer: &Installer) {
272
                                let paused = Arc::clone(&self.paused);
273
                                installer.spawn("name1", |notifier| {
274
                                        move || {
275
                                                while !paused.load(Ordering::Acquire) {
276
                                                        sleep(Duration::from_millis(10));
277
                                                }
278
                                                notifier.end();
279
                                                notifier.request_end();
280
                                        }
281
                                });
282
                        }
283

284
                        fn pause(&self) {
285
                                self.paused.store(true, Ordering::Release);
286
                        }
287
                }
288

289
                let runtime = Runtime::new(ThreadStatuses::new());
290
                let mut thread1 = Thread1::new();
291
                let mut thread2 = Thread2::new();
292
                runtime.register(&mut thread1);
293
                runtime.register(&mut thread2);
294
                runtime.join().unwrap();
295
                assert!(thread2.paused.load(Ordering::Acquire));
296
        }
297

298
        #[test]
299
        fn run_thread_request_resume() {
300
                struct Thread1;
301

302
                impl Thread1 {
303
                        const fn new() -> Self {
304
                                Self {}
305
                        }
306
                }
307

308
                impl Threadable for Thread1 {
309
                        fn install(&self, installer: &Installer) {
310
                                installer.spawn("name0", |notifier| {
311
                                        move || {
312
                                                notifier.request_resume();
313
                                                notifier.end();
314
                                        }
315
                                });
316
                        }
317
                }
318

319
                struct Thread2 {
320
                        resumed: Arc<AtomicBool>,
321
                }
322

323
                impl Thread2 {
324
                        fn new() -> Self {
325
                                Self {
326
                                        resumed: Arc::new(AtomicBool::new(false)),
327
                                }
328
                        }
329
                }
330

331
                impl Threadable for Thread2 {
332
                        fn install(&self, installer: &Installer) {
333
                                let resumed = Arc::clone(&self.resumed);
334
                                installer.spawn("name1", |notifier| {
335
                                        move || {
336
                                                while !resumed.load(Ordering::Acquire) {
337
                                                        sleep(Duration::from_millis(10));
338
                                                }
339
                                                notifier.end();
340
                                                notifier.request_end();
341
                                        }
342
                                });
343
                        }
344

345
                        fn resume(&self) {
346
                                self.resumed.store(true, Ordering::Release);
347
                        }
348
                }
349

350
                let runtime = Runtime::new(ThreadStatuses::new());
351
                let mut thread1 = Thread1::new();
352
                let mut thread2 = Thread2::new();
353
                runtime.register(&mut thread1);
354
                runtime.register(&mut thread2);
355
                runtime.join().unwrap();
356
                assert!(thread2.resumed.load(Ordering::Acquire));
357
        }
358

359
        #[test]
360
        fn run_thread_request_end() {
361
                struct Thread1;
362

363
                impl Thread1 {
364
                        const fn new() -> Self {
365
                                Self {}
366
                        }
367
                }
368

369
                impl Threadable for Thread1 {
370
                        fn install(&self, installer: &Installer) {
371
                                installer.spawn("name0", |notifier| {
372
                                        move || {
373
                                                notifier.request_end();
374
                                                notifier.end();
375
                                        }
376
                                });
377
                        }
378
                }
379

380
                struct Thread2 {
381
                        ended: Arc<AtomicBool>,
382
                }
383

384
                impl Thread2 {
385
                        fn new() -> Self {
386
                                Self {
387
                                        ended: Arc::new(AtomicBool::new(false)),
388
                                }
389
                        }
390
                }
391

392
                impl Threadable for Thread2 {
393
                        fn install(&self, installer: &Installer) {
394
                                let ended = Arc::clone(&self.ended);
395
                                installer.spawn("name1", |notifier| {
396
                                        move || {
397
                                                while !ended.load(Ordering::Acquire) {
398
                                                        sleep(Duration::from_millis(10));
399
                                                }
400
                                                notifier.end();
401
                                        }
402
                                });
403
                        }
404

405
                        fn end(&self) {
406
                                self.ended.store(true, Ordering::Release);
407
                        }
408
                }
409

410
                let runtime = Runtime::new(ThreadStatuses::new());
411
                let mut thread1 = Thread1::new();
412
                let mut thread2 = Thread2::new();
413
                runtime.register(&mut thread1);
414
                runtime.register(&mut thread2);
415
                runtime.join().unwrap();
416
                assert!(thread2.ended.load(Ordering::Acquire));
417
        }
418

419
        #[test]
420
        fn runnable_debug() {
421
                struct Thread;
422

423
                impl Thread {
424
                        const fn new() -> Self {
425
                                Self {}
426
                        }
427
                }
428

429
                impl Threadable for Thread {
430
                        fn install(&self, installer: &Installer) {
431
                                installer.spawn("name", |notifier| {
432
                                        move || {
433
                                                notifier.end();
434
                                                notifier.request_end();
435
                                        }
436
                                });
437
                        }
438
                }
439

440
                let runtime = Runtime::new(ThreadStatuses::new());
441
                let mut thread = Thread::new();
442
                runtime.register(&mut thread);
443
                runtime.join().unwrap();
444
                assert_eq!(format!("{runtime:?}"), "Runtime {{ ... }}");
445
        }
446
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc